RabbitMQ와 Decoupling 서비스를 통한 서버 부하 분산
서론
여행 기록 관리 플랫폼 프로젝트를 진행하며 이미지 관련 기능 사용 시 서버의 부하량 해결을 위해 RabbitMQ의 도입을 논의하였다. 또한, 필자는 이전부터 RabbitMQ, Kafka와 같은 AMQP를 도입한 프로젝트들을 많이 보았고, Websocket을 통한 메시지 처리를 진행하며 Redis pub/sub 구조를 학습하며 메시지 브로커를 사용하는 MOM(Message Oriented Middleware) 구조에 대해 알아본 경험이 있다. 당시에도 MOM 구조와 RabbitMQ에 대한 호기심이 있어 이와 관련된 내용을 찾아보았다. 많은 블로그 등에서 성능적인 부분에서 이점을 가지고 올 수 있다는 내용이 많았지만 와닿는 글은 그렇게 많지 않았던 것 같다. 또한, 메시지 브로커를 사용하여 비동기 작업 수행하게 되는데 “스프링 내에서 쓰레드를 만들어 비동기 작업을 수행하는 것과 차이점이 뭐지?”라는 것과 같은 의문이 들기도 하였다.
이번 포스팅에서는 RabbitMQ를 통해 AMQP의 개념과 특징을 알아보고, Spring에 RabbitMQ를 적용시켜 실제 사용은 어떻게 이루어지는 지 알아볼 예정이다. 또한, 별개의 스프링 애플리케이션을 실행시켜 분산(Decoupling) 서비스 구조를 임시로 구성해 부하량 차이를 확인할 예정이다.
AMQP
AMQP(Advanced Message Queuing Protocol)은 메시지 지향 미들웨어(MOM)에서 통신을 위한 표준 프로토콜이다.
시스템 간 안정적이고 효율적인 메시지 전달을 위해 사용되는 프로토콜로 다음과 같은 특징을 가진다.
AMQP는 특정 프로그램이나 언어에 종속되지 않는다. 메시징 제공자와 클라이언트(수신자)의 동작에 대해 각기 다른 벤더들의 구현체가 상호 운용될 수 있을 정도의 권한을 부여하여 플랫폼 독립성을 가진다.
Exchange와 Queue를 사용하여 메시지를 다양한 방식으로 라우팅 가능하다. 이를 통해 Publisher-Subscriber, Point-to-Point 등의 메시징 패턴을 구성할 수 있다.
메시지 전달 시 오류가 일어나더라도 이에 대한 대처가 가능하여 메시지 전달의 신뢰성을 보장한다. 또한, 메시징 순서 보장 및 트랙잭션을 가능하게 한다.
Exchange를 통해 여러가지 라우팅 방식(Direct, Topic, Fanout, etc)을 제공하여, 복잡한 메시징 요구사항을 충족할 수 있다.
SSL/TLS를 사용한 보안 및 사용자 인증 기능을 제공한다.
AMQP의 주요 구성 요소
AMQP는 위와 같이 Producer, Exchange, Queue, Consumer로 구성되어 있다.
큰 흐름은 (1) Procuder에서 메시지를 발행하면, 메시지 브로커 역할을 하는 AMQP 구현체로 메시지가 전달된다. 메시지 브로커 내부에서는 (2) Exchange가 가장 먼저 메시지를 받아 적절한 Queue로 메시지를 라우팅한다. 이 때, 어떠한 Exchange가 어떠한 Queue로 연결될 지에 대한 설정을 바운딩(Bounding)이라 한다.
(3) Queue에 전달된 메시지는 Consumer에게 전달된다. 경우에 따라, (4) Consumer에서 메시지 수신 후 ACK 신호를 메시지 브로커로 전달하여 메시지가 잘 수신된 것을 확인한 경우 큐에서 메시지를 제거하기도 한다.
전체적인 흐름은 Redis pub/sub에서 다루었던 메시지 브로커의 역할과 동일하다. Producer가 생산한 메시지를 Consumer에게 전달할 때, 중간에 위치한 Message Broker에서 이를 제어한다. 그렇다면 메시지 브로커를 사용하는 것의 장점은 무엇일까?
메시지 브로커의 사용 이점
필자가 이전에 작성하였던 WebSocket, STOMP, Redis pub/sub에서는 단순히 Websocket을 사용해 특정 Topic을 구독하고 있는 여러 사용자(Subscriber)에게 메시지를 전달하는 목적으로 Redis pub/sub 구조를 도입하였다. 단순히 현재 기능 구현의 목적에 맞기 때문에 도입하였던 것이었기에 기능상이나 성능 상의 이점과 같은 부분들을 간과하였던 것 같다. AMQP 사용 시 아래와 같은 이점들이 존재한다.
- 비동기 작업을 통한 사용자 응답 시간 개선 및 작업 관심사 분리
- 여러 서버에서 플랫폼 종속성없이 메시지 전달 가능
- 메시지 전달 실패 시 재시도 및 장애 분리와 같은 안전성 보장
- MSA와 같은 분산환경을 구성하여 Producer, Consumer를 별도 배치하여 부하 분산 효과 기대
위와 같은 이점 외에도 다양한 이점들이 존재한다. 이번 포스팅에서는 Producer와 Consumer를 독립 배포한 상황을 가정하여 성능 상의 이점을 확인하고자 한다.
RabbitMQ
RabbitMQ는 AMQP의 구현체 중 하나로 완성도 높은 메시징 브로커 환경을 제공한다. 또한, 스프링부트에서 springboot-starter-amqp 디펜던시를 제공하기 때문에 사용 상 편리함도 존재한다.
RabbitMQ는 AMQP 1.0 및 MQTT 5.0을 포함하여 여러 개방형 표준 프로토콜을 지원하고, 다양한 프로그래밍 언어를 통해 클라이언트를 구성할 수 있어 벤더 종속성 탈피가 가능하다.
또한, 전달이 실패한 메시지(dead letter)에 대한 재시도 로직 및 급작스러운 중단 시 메시지 내용을 저장하여 재실행 후 메시지를 전달할 수 있도록하여 안정성을 보장한다.
이번 절에서는 RabbitMQ 내에 Exchange, Bounding, Queue가 어떻게 구성되어 있는지 확인해볼 것이다.
RabbitMQ 설치
Docker를 통하여 RabbitMQ를 실행한다.
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
5672
는 RabbitMQ가 실행되는 포트번호를 의미하며, 15672
는 RabbitMQ를 관리하기 위한 웹 콘솔 포트 번호이다.
-e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin
참고로 위의 옵션을 설정하여 RabbitMQ 접속을 위한 Username과 Password를 설정할 수 있다. 기본적으로는 ‘guest’, ‘guest’로 적용이 된다.
Exchange
Type | Durability |
![]() |
![]() |
RabbitMQ의 Exchange에서는 Name, Type, Durability, Auto Delete와 같은 속성이 존재한다.
Type은 해당 Exchange가 라우팅하는 방식을 나타낸다.
Type | Description |
Direct | 라우팅 키가 정확히 일치하는 Queue로 메시지 전송 |
Fan-out | 해당 Exchange와 연결된 모든 Queue로 메시지 전송 |
Headers | {key:value}로 이루어진 header 값을 기준으로 일치하는 Queue로 메시지 전송 |
Topic | 라우팅 키의 패턴이 일치하는 Queue로 메시지 전송 |
x-local-random | 해당 Exchange와 연결된 Queue 중 랜덤으로 선택하여 메시지 전송 |
위와 같은 5가지 타입이 있으며 x-local-random은 RabbitMQ 4.0부터 도입된 Exchange이다. 상황에 따라 적절한 타입의 Exchange를 생성하여 사용하며, 이번 실습에서는 Direct Exchange를 생성해 라우팅 키가 정확히 일치하는 Queue로 메시지를 전달하여 처리할 것이다. 각 타입의 Exchange를 나타낸 포스팅에서 해당 개념을 다이어그램으로 확인할 수 있다.
Durability는 RabbitMQ가 재시작되었을 때, 해당 Exchange가 계속해서 남아있을 수 있도록 디스크에 저장할지 여부를 나타낸다.
- Durable - Exchange를 디스크에 저장
- Transient - Exchange를 메모리에 저장
Durable은 Exchange를 디스크에 저장하여 RabbitMQ가 재시작되더라도 해당 Exchange를 다시 사용할 수 있다. Transient(임시의)는 RabbitMQ가 재시작되었을 때 해당 Exchange는 소멸된다. Transient는 임시 작업이나 테스트 등에서 사용하며, 디스크에 쓰는 연산이 없기에 빠르다. 그러나 운영용 서비스에서는 안정성이 중요하며, 서버를 재시작하였을 때도 동일한 Exchange가 존재하여야 의도한대로 작업이 동작하기 때문에 실습에서는 Durable 속성을 사용할 것이다.
Auto Delete 설정은 Exchange에 메시지가 모두 소비되어 더 이상 라우팅할 메시지가 없는 경우 자동으로 해당 Exchange를 지워주는 설정이다. 기본적으로는 false
로 설정된다.
Queue
Type | Durability |
![]() |
![]() |
Queue에도 Name, Type, Durability, Auto Delete 속성이 존재한다.
Type은 Queue의 형태를 나타낸다.
Type | Description |
Default for Virtual Host | 웹 콘솔 상에서 생성하는 Queue |
Classic | 기본 Queue 유형, 비복제 FIFO 구조 사용 |
Quorum | Raft 합의 알고리즘 기반 복제 FIFO 구조 사용 |
Stream | 대용량 메시지 스트림 처리에 최적화된 Queue |
Queue는 4가지 타입이 존재하며 일반적으로는 기본 Queue의 유형인 Classic Queue를 사용한다. Classic Queue와 Quorum Queue는 비복제-복제의 차이를 가진다. 여기서 말하는 복제는 클러스터링을 통해 여러 RabbitMQ를 구동시킨 경우, 상태를 복제한 Queue를 의미한다. Quorum Queue는 시스템이
그 외 Durability와 Auto Delete는 Exchange에서 이야기한 속성과 동일한 동작을 나타내는 속성이다.
SpringBoot with RabbitMQ
이제 실제로 RabbitMQ를 사용해보고자 한다. 해당 절은 앞서 Docker를 통한 RabbitMQ 실행을 전제로 하고 있다.
implementation 'org.springframework.boot:spring-boot-starter-amqp'
Spring AMQP 디펜던시를 프로젝트 생성 시 또는 수동으로 추가한다. spring-boot-starter-amqp에는 RabbitMQ에 대한 내용도 포함되어 있다.
RabbitMQ를 스프링 프로젝트에 적용시키기 위하여 아래와 같은 작업들이 필요하다.
- application.yml에 프로퍼티 값 추가
- RabbitMqProperties 작성
- RabbitMqConfig 작성
- Producer(Publisher) 작성
- 메시지 전달을 위한 Chat DTO 작성
- Consumer(Subscriber) 작성
해당 절에서 각 단계별로 실습을 진행한다.
--- # SpringBoot Auto Configuration
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
--- # RabbitMQ Custom Setting
rabbitmq:
exchange-name: sample-exchange
queue-name: sample-queue
routing-key: key
이후 application.yml
파일에 위와 같이 설정값들을 작성해준다. spring.rabbitmq.*
는 SpringBoot에서 제공하는 Auto Configruation에 사용되는 값이다. 여러 포스팅에서 SpringBoot의 Auto Configuration 기능을 사용하지 않고, 직접 해당 설정값들을 바인딩해 사용하는 경우가 많지만 기본적으로 해당 프로퍼티 값만 정의해주면 RabbitMQ와 연결이 진행된다.
또한, 해당 서비스에서 사용할 exchange, queue, routing-key를 설정하기 위해 커스텀 프로퍼티도 정의하여 준다.
RabbitMqProperties
@Component
@ConfigurationProperties(prefix = "rabbitmq")
@Getter @Setter
public class RabbitMqProperties {
private String exchangeName;
private String queueName;
private String routingKey;
}
RabbitProperties
에서는 application.yml에서 설정한 커스텀 프로퍼티(Exchange, Queue, Routing-key의 이름)들을 바인딩받는다.
RabbitMqConfig
@Configuration
@RequiredArgsConstructor
public class RabbitMqConfig {
private final RabbitMqProperties rabbitMqProperties;
/**
* Queue를 등록
* - Queue의 이름과 Durability 속성을 인자로 받아 Queue 객체 생성
*/
@Bean
public Queue queue() {
return new Queue(rabbitMqProperties.getQueueName(), true);
}
/**
* Direct Exchange를 등록
* - Exchange의 이름을 인자로 받아 DirectExchange 객체 생성
* - 기본적으로 Durability는 true
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(rabbitMqProperties.getExchangeName());
}
/**
* Queue-DirectExchange를 연결
*
* @param queue 연결 대상 Queue
* @param directExchange 연결 대상 Exchange
*/
@Bean
public Binding binding(Queue queue, DirectExchange directExchange) {
return BindingBuilder.bind(queue) // 바인딩할 Queue
.to(directExchange) // 바인딩할 exchange
.with(rabbitMqProperties.getRoutingKey()); // Queue-Exchange 바운딩에 사용되는 라우팅 키
}
/**
* RabbitMQ로 메시지 발행을 위한 RabbitTemplate 등록
* - 메시지 변환을 위해 Jackson2JsonMessageConverter 설정
*
* @param connectionFactory RabbitMQ 연결을 위한 ConnectionFactory - Auto Configuration에 의해 이미 등록
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); // 이걸 굳이 ? 그냥 new 키워드 넣어도 되는거 아닌가.
return rabbitTemplate;
}
/**
* Listener 생성을 위한 SimpleListenerContainerFactory 등록
*
* @param connectionFactory RabbitMQ 연결을 위한 ConnectionFactory - Auto Configuration에 의해 이미 등록
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);
factory.setPrefetchCount(5);
return factory;
}
/**
* 메시지 컨버터
*/
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
RabbitMqConfig
에서는 RabbitMQ 연결 및 사용을 위한 각종 Bean을 등록한다.
우선 Queue
와 DirectExchange
를 등록하고, 이를 연결할 Binding
객체도 등록한다.

RabbitMQ 콘솔에서도 확인할 수 있듯이 설정한 이름으로 새로운 Exchange와 Queue가 등록된 것을 알 수 있다.
그리고, RabbitMQ로 메시지 발행(Produce, Publish)를 위한 RabbitTemplate
를 등록한다. 메시지 변환을 위해서 MessageConveter
의 구현체를 사용하여야 하기 때문에 Jackson2JsonMessageConverter
빈을 등록하여 메시지 컨버터로 설정한다.
스프링 상에서 메시지 소비자(Consumer)의 역할을 수행하기 위해 @RabbitListener
어노테이션을 통해서 메시지 리스너를 설정할 수 있다. 이 때, 메시지 리스너가 생성되는 설정을 정의하는 SimpleRabbitListenerContainerFactory
를 등록하여야한다. 인자로 받는 ConnectionFactory
는 SpringBoot Auto Configuration에 의해서 기본적으로 등록된 ChachingConnectionFactory
Bean을 사용하게 된다.
SpringBoot에서 지원하는 Auto Configuration 기능에 의해 직접 ConnectionFactory
를 Bean으로 등록하지 않은 경우 CachingConnectionFactory
가 Bean으로 등록되게 된다.
connectionFactory
와 messageConvert
를 설정하고, Listener 당 최소 Consumer 수와 최대 Consumer 수를 설정한다. Prefetch는 Consumer가 한 번에 Queue에서 가지고올 수 있는 메시지의 양을 의미한다.
앞서, AMQP의 개념에서 설명하였을 때는 Consumer 밖에 없었지만, 스프링에서는 Listener라는 개념이 추가되었다. Listener는 Consumer를 가지고 있는 형태로 각 리스너 당 설정한 만큼의 Consumer를 가지고 있게 된다. 이는 차후 애플리케이션 실행 후 쓰레드 로그와 시스템 다이어그램을 통해 추가적으로 설명한다.
RabbitMqProducer
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {
private final RabbitTemplate rabbitTemplate;
private final RabbitMqProperties rabbitMqProperties;
/**
* 메시지를 RabbitMQ로 발행(전달)
* - Exchange, Routing Key, 메시지를 인자로 전달
*
* @param message Object -> JSON 으로 직렬화
*/
public void send(Object message) {
rabbitTemplate.convertAndSend(
rabbitMqProperties.getExchangeName(), rabbitMqProperties.getRoutingKey(), message
);
}
}
RabbitMqProducer
는 스프링에서 메시지 브로커로 메시지를 발행하는 기능을 담당하는 클래스이다. 앞서 Bean으로 등록한 RabbitTemplate
의 convertAndSend()
메서드를 사용하여 메시지를 발행한다.
Exchange마다 여러 Routing Key를 통해 여러 Queue와 바인딩 될 수 있다. 따라서, 메시지 발행 시에 Exchange name과 Routing Key를 전달하여야한다.
Chat
public record Chat (
String username,
String message
) {
}
RabbitMqSubscriber
@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqSubscriber {
/**
* 메시지 브로커(Queue)에서 전달받은 메시지를 처리
* - @RabbitListener 어노테이션
* - queues 메시지를 전달받을 Queue의 이름
* - containerFactory Listener를 생성할 containerFactory Bean의 이름
*
* @param chat 전달받은 메시지
*/
@RabbitListener(queues = "${rabbitmq.queue-name}", containerFactory = "rabbitListenerContainerFactory")
public void handleMessage(Chat chat) {
log.info("{} - [{}] {}", Thread.currentThread().getName(), chat.username(), chat.message());
}
}
RabbitMqSubscriber
는 Consumer의 역할을 담당한다. 그러나, 정확하게는 각 Listener 내에 Consumer가 위치하기 때문에 해당 클래스명은 Subscriber로 사용한다.
RabbitMQController
@Slf4j
@RestController
@RequiredArgsConstructor
public class RabbitMqController {
private final RabbitMqMessagePublisher rabbitMqMessagePublisher;
@PostMapping("/send")
public ResponseEntity<?> send(@RequestBody Chat chat) {
rabbitMqMessagePublisher.send(chat);
return ResponseEntity.ok().build();
}
}
테스트를 위해 POST/send
요청 시 본문의 Chat
을 메시지 브로커로 발행한다.
테스트
curl -X POST http://localhost:8080/send \
-H "Content-Type: application/json" \
-d '{"username": "kim", "message": "hello"}' &
curl -X POST http://localhost:8080/send \
-H "Content-Type: application/json" \
-d '{"username": "lee", "message": "hello"}' &
curl -X POST http://localhost:8080/send \
-H "Content-Type: application/json" \
-d '{"username": "park", "message": "hello"}'
위 테스트 결과에서 알 수 있듯이, Listener인 RabbitListenerContainer
를 통해서 메시지가 전달된 것을 알 수 있다.
현재는 위와 같이 단일 서버(애플리케이션)에서 Producer와 Consumer의 역할을 모두 수행하고 있다. Producer와 Consumer를 동일한 서버에서 처리하는 경우와 분리하는 경우에 대한 부하 테스트는 Decoupling 서비스를 통한 부하 분산에서 자세히 서술한다.
RabbitListenerContainerFactory
RabbitListenerContainerFactory
는 이름에서 알 수 있듯이 RabbitListener(Container)를 생성하는 역할을 담당하는 클래스이다. 앞서 RabbitMqConfig
클래스에서 해당 팩토리 객체를 Bean으로 등록하였다. 또한, @RabbitListener
어노테이션을 사용해 리스너를 생성할 때 containerFactory
를 지정해주었다.
// RabbitMqConfig
/**
* Listener 생성을 위한 SimpleListenerContainerFactory 등록
*
* @param connectionFactory RabbitMQ 연결을 위한 ConnectionFactory - Auto Configuration에 의해 이미 등록
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);
factory.setPrefetchCount(5);
return factory;
}
@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqMessageSubscriber {
/**
* 메시지 브로커(Queue)에서 전달받은 메시지를 처리
* - @RabbitListener 어노테이션
* - queues 메시지를 전달받을 Queue의 이름
* - containerFactory Listener를 생성할 containerFactory Bean의 이름
*
* @param chat 전달받은 메시지
*/
@RabbitListener(queues = "${rabbitmq.queue-name}", containerFactory = "rabbitListenerContainerFactory")
public void handleMessage(Chat chat) {
log.info("{} - [{}] {}", Thread.currentThread().getName(), chat.username(), chat.message());
}
}
즉, handleMessage(Chat chat)
메서드에 붙은 어노테이션에 의해 생성되는 리스너는 rabbitListenerContainerFactory
Bean에 의해 생성된다.
따라서, Spring Autoconfiguration에 의해 설정한 CachingConnectionFactory
를 사용해 RabbitMQ와 연결되고, 메시지 변환에 Jacknson2JsonMessageConverter
를 사용하며, 리스너 당 기본 Consumer 수는 2개, 최대 Consumer 수는 4개이다. 또한, 한 번에 큐에서 가져오는 메시지양은 5개이다.
이 구조를 자세히 표현하면 위 다이어그램과 같다.
스프링에서 각 리스너의 Consumer는 별도의 쓰레드로 관리된다. 즉, 해당 쓰레드를 통해서 메시지 브로커에서 전달된 메시지를 소비하게 된다. 이를 확인하기 위해 RabbitMqSubscriber
에 다른 리스너들을 정의한다.
@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqMessageSubscriber {
@RabbitListener(queues = "${rabbitmq.queue-name}", containerFactory = "rabbitListenerContainerFactory")
public void handleMessage(Chat chat) {
log.info("{} - [{}] {}", Thread.currentThread().getName(), chat.username(), chat.message());
}
@RabbitListener(queues = "${rabbitmq.queue-name}", containerFactory = "rabbitListenerContainerFactory")
public void handleMessage(Chat chat) {
log.info("{} - [{}] {}", Thread.currentThread().getName(), chat.username(), chat.message());
}
@RabbitListener(queues = "${rabbitmq.queue-name}", containerFactory = "rabbitListenerContainerFactory")
public void handleMessage(Chat chat) {
log.info("{} - [{}] {}", Thread.currentThread().getName(), chat.username(), chat.message());
}
}
현재 3개의 리스너를 정의하였다. RabbitListenerContainerFactory
Bean 설정에 따라 최소 2개, 최대 4개의 consumer를 가지게 된다.
위 로그에서 확인할 수 있듯이 xxxContainer# {Listener 인덱스}-{Consumer 번호}
로 쓰레드가 생성되어 메시지를 소비하게 된다. 3개의 리스너가 정의되어 있으므로 컨테이너 인덱스는 #0
, #1
, #2
, 최대 Consumer 수가 4개이므로 -1
, -2
, -3
, -4
로 나타나게 된다.
Listener Concurrency
Spring AMQP 공식문서에 따르면, SimpleMessageListenerContainer
는 Single Queue로 시작하나, 앞서 우리가 설정한 것과 같이 ListenerContainerFactory에서 ListenerContainer 생성 옵션을 설정할 수 있다고 명시되어 있다.
Spring AMQP 1.3.0 버전 이후 Listener Concurrency
가 도입되며 동적으로 Consumer 수를 조절할 수 있게 되었다. ‘자동 스케일링’ 기능을 통해 부하량(workload)에 따라 동적으로 Consumer의 수를 조절할 수 있다.
- Consumer가 10번 연속 활성화된 경우
- Consumer가 시작 이후 10초가 지난 경우
Consumer의 수가 maxConcurrentConsumers
에 도달하지 않은 경우 위 조건을 만족하면 Consumer의 수가 증가하게 된다. 활성 상태라는 것은 batchSize * recieveTimeout
milliseconds 동안 하나의 메시지라도 수신한 경우를 칭한다.
- Consumer가 10번 연속 Timeout된 경우
- Consumer가 정지 후 최소 60초가 초과된 경우
Consumer의 수가 concurrentConsumers
보다 많으면서 위 조건을 만족한 경우 Consumer의 수가 감소하게 된다.
Performance Test
이제 RabbitMQ를 적용한 서비스의 성능 테스트를 진행하고자 한다.
많은 블로그 등에서 RabbitMQ를 도입하여 성능 상의 이점을 얻을 수 있었다는 글은 보았다. 그러나, Producer-Consumer를 분리하여 얻은 성능 상의 이점이 아니라, 단순히 스프링 내에서 비동기 쓰레드 작업이 이루어지는 경우에 반해 RabbitMQ를 사용하여 불필요한 컨텍스트 스위칭이나 프로세스의 점유율이 감소하기 때문에 발생하는 CPU 사용량 감소 현상이라 생각한다.
RabbitMQ 공식 홈페이지에 따르면 다음과 같은 사용 사례를 언급하고 있다.
Decoupling Service란, Producer와 Consumer를 별도의 서버로 분리하여 부하(load spike)를 분산시키는 방법이다.
여러 인스턴스를 실행시켜 디커풀링 서비스를 테스트하는 것에는 제한이 있어, 로컬에서 RabbitMqSubscriber
만 정의한 다른 스프링 애플리케이션을 8081
포트로 실행시켜 테스트를 진행한다.
Producer-Consumer를 분리한 테스트 환경은 위와 같다. 테스트 시에는 환경에 제약이 있어 위와 같이 단일 PC 내에 2개의 스프링 애플리케이션을 실행하여 테스트를 진행하였지만, RabbitMQ는 클라이언트의 플랫폼 독립성을 보장하기 때문에 Spring 뿐만 아니라 기타 서비스를 도입하여도 무방하다. 또한, 분산 환경에서는 단일 PC가 아닌 여러 서버에서 RabbitMQ를 통한 메시지 전달이 가능할 것이다.
동일 서버 내 Producer-Consumer 배치 | 별도 서버에 Producer-Consumer 분리 |
![]() |
![]() |
33.3% |
17.5% |
Decoupling Service(별도 서버에 Producer-Consumer 분리)의 경우에는 Producer를 포함하는 애플리케이션의 CPU 사용량만 측정하였다.
결과적으로 Decoupling Service에서 약 47.45%의 성능 개선이 가능하였다.
최근 많이 언급되는 MSA와 같은 분산 환경에서 Consumer를 별도의 서버로 구성하여 비동기 작업에 대한 서버 부하를 분산시킬 수 있는 효과를 기대할 수 있을 것이다.
Summary
이메일 전송 등과 같이 시간이 오래 걸리는 작업은 비동기 처리를 통하여 사용자 응답 시간을 개선할 필요가 있다. 프로모션 메일 전송이나 전체 알림과 같은 서비스들은 대량의 데이터가 전송되어야하는 작업이다. 해당 작업들이 서버 내에서 단순 비동기 작업으로 처리할 경우 애플리케이션의 CPU 사용량 증가, 메모리 과다 사용, 쓰레드 과다 사용과 같은 문제가 발생할 수 있다. RabbitMQ와 같은 MOM을 도입하여 메시징 작업을 서버 애플리케이션과 분리할 경우, 성능 및 장애 분리와 같은 이점을 얻을 수 있을 것이다. 또한, Producer와 Consumer를 분리한 Decoupling Service를 도입함으로써 장애 분리, 서버 부하 분산이라는 이점과 함께, 여러 종류의 클라이언트들을 통해 메시지 소비도 가능할 것이다.
해당 포스팅에서 AMQP의 기본 개념과 RabbitMQ를 통해 서버 애플리케이션 부하를 분산시키는 실습까지 진행하여보았다. 그러나, 앞서 언급한 장애 분리의 개념은 적용해보지 않았다. 오류로 인하여 소비자에게 제대로 전달되지 않은 메시지(dead-letter)의 경우 dlx(dead-letter-exchange), dlq(dead-letter-queue)를 통하여 재시도 로직을 수행할 수 있다. 해당 내용은 차후 포스팅에서 다뤄볼 예정이다.