반응형
사전적 정의
- 오픈 소스 메시지 브로커 소프트웨어 (메시지 지향 미들웨어)로서, AMQP(Advanced Message Queuing Protocol)를 구현하였으며 그 이후로 STOMP, MQTT 등의 프로토콜을 지원하기 위해 플러그인 구조와 함께 확장되고 있다.
- 메시지를 생산하는 생산자(Producer)가 메시지를 큐에 저장해 두면, 메시지를 수신하는 소비자(Consumer)가 메시지를 가져와 처리하는 Publish/Subscribe 방식의 메시지 전달 브로커이다.
RabbitMQ 란?
- AMQP를 구현한 오픈 소스 메시지 브로커
- AMQP 란?
- Advanced Message Queuing Protocol로 client application과 미들웨어 브로커와의 메시지를 주고받기 위한 프로토콜, Message Queue의 오픈소스에 기반한 표준 프로토콜을 의미
- 메시지 브로커 란?
- 메시지를 받아 적절히 처리하고 나면 즉시 or 짧은 시간 내에 메시지가 삭제되는 구조
- 예 ) Redis Queue, RabbitMQ
- 이벤트 브로커 란?
- 이벤트 or 메시지라고 불리는 레코드를 딱 한 개만 저장하고 인덱스로 관리, 업무상 필요한 시간 동안 이벤트 보존 가능
- 예 ) Kafka, AWS의 키네시스
- RabbitMQ는 메시지 브로커 서비스로, 불특정 다수의 사용자가 다른 사람에게 메시지를 전달하려고 메시지 브로커에 메시지를 전달하면 메시지 브로커가 메시지를 받아서 보관하고 있다가 해당 메시지를 받으려는 사람이 보관한 메시지를 꺼내가는 형태의 서비스
- 쉬운 예로 우체통이나 편지함을 생각해보자. 편지를 보내려는 사람이 편지를 써서 우체통에 넣으면 받는 사람이 우체통에서 편지를 꺼내가는 구조
- 보내는 사람 = 메시지를 보내는 사람
- 받는 사람 = 메시지를 꺼내는 사람
- 우체통 = 메시지 브로커
- RabbitMQ 이외에도 kafka, zeroMQ, activeMQ와 같은 많은 종류의 메시지 큐가 존재
- Producer로부터 메시지를 받아 Consumer에게 라우트 하는 것이 주된 역할
- 많은 사용자에게 전달하거나, 요청에 대한 처리 시간이 길 때 해당 요청을 다른 API에게 위임하고 빠른 응답을 할 때 많이 사용
- Message Queue를 사용하여 애플리케이션 간 결합도를 낮출 수 있음
- 작업 큐, 발행 및 구독, 라우팅, 원격 프로시저 호출 등의 모델을 구현
- RabbitMQ는 channel이라는 개념을 통해 하나의 TCP 연결을 공유해서 사용할 수 있는 기능을 제공한다. 하지만 멀티 스레드, 멀티 프로세스를 사용하는 작업에서는 각각 별도의 Channel을 열고 사용하는 것이 바람직하다.
RabbitMQ를 사용해야하는 이유?
- 서버 - 클라이언트 간 양방향 통신
- 비동기처리를 위한 메시지큐 브로커
- 분산처리를 고려한 MQ (Cluster, Federation)
- 고가용성 보장 (High Avaliabilty)
- 메시지를 받는 Consumer 서버가 죽더라도 메시지가 사라지지 않고, 큐에 저장되어 있기 때문에 서버가 다시 살아나면 큐를 읽어들이면서 이벤트 처리 가능
- 그렇기 때문에 일부가 실패할 경우, 전체에 영향을 받지 않고 메시지 순서가 보장되며 누락될 위험이 거의 없다.
- 큐에 들어오는 메시지를 별도로 관리할 수 있으며, 한눈에 볼 수 있는 실시간 그래프 등 확인이 가능해 관리에 용이
- 다양한 언어 지원으로 개발 라이브러리를 제공한다. (ex. Java, C#, JavaScript, node.js, erlang 등)
- RabbitMQ 서버의 안정성과 동시성이 매우 뛰어나다.
- 작업이 처리된 후 데이터를 삭제한다.
- 초당 수만 건의 메시지는 큰 문제 없이 전송 가능하다.
RabbitMQ 기본적인 흐름
- Producer가 메시지를 생성하여 전송
- Exchange가 어떤 Queue에 전달할지 Routing
- Queue들이 메시지를 순차적으로 쌓음
- Consumer가 Queue에 대한 Binding을 가지고 있다가 메시지를 Queue에서 수신
주요 용어
- Producer
- 메시지를 생성하고 발송하는 주체
- 메시지의 내용은 Queue에 저장되는데 Queue에 직접 접근하지 않고, 항상 Exchange를 통해 접근
- Publish
- Producer가 메시지를 보냄
- Exchange
- Producer들에게서 전달받은 메시지들을 어떤 Queue들에게 발송할지 결정하는 객체
- 메시지가 Queue에 직접 전달되지 않고, exchange type이라는 속성으로 전달
- 일종의 라우터 개념으로 4가지의 타입 존재
- 특정 Queue에 보낼지, 여러 Queue에 보낼지, 아니면 그냥 제거될지 등을 선택
Exchange 종류
- Direct Exchange : 지정된 RoutingKey를 가진 Queue에만 메시지를 전달
- Fanout Exchange : 알려진 모든 Queue에 메시지를 전달
- Topic Exchange : 지정된 패턴 바인딩에 일치하는 Queue에만 메시지 전달
- Headers Exchange : 헤더에 포함된 key=value의 일치조건에 따라 메시지 전달
- Binding
- Exchange와 Queue를 연결해주는 것
- 생성한 Exchange를 Queue에 매핑을 해줘야 exchange가 메시지를 받으면 Queue에 메시지를 보낼 수 있는데 이를 Binding이라 한다.
- Exchange에게 메시지를 라우팅할 규칙을 지정하는 행위로 Binding이 되어있어야 Exchange가 Queue에게 메시지를 전달 할 수 있다.
- 특정 조건에 맞는 메시지를 특정 큐에 전송하도록 설정할 수 있는데, 해당 Exchange 타입에 맞게 설정되어야함 ( Exchange : Queue ⇒ m : n binding 가능 (Multiple binding))
channel.queueBind(queueName, EXCHANGE_NAME, routingKey)
* Multiple Binding일 경우, RoutingKey로 binding 처리하면 된다.
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
queueName : 생성한 Queue Name
exchangeName : 생성한 exchangeName
routingKey : routingKey
- Routing
- Exchange가 Queue에 메시지를 전달하는 과정
- 모든 메시지를 받는 것이 아니라 특정 종류의 메시지를 받을 수 있도록 멀티캐스트
- RoutingKey
- Exchange와 Queue가 Binding될 때 Exchange가 Queue에 메시지를 전달할지 결정하는 기준
- exchange type = direct인 경우, RoutingKey값이 Binding시 설정된 RoutingKey값과 일치
- exchange type = topic인 경우, RoutingKey값이 Binding시 설정된 패턴에 매칭
- Queue
- 메시지를 저장하는 버퍼
- Exchange는 Producer로부터 전달받은 메시지를 Binding 하여 Queue에게 전달한다.
- 이때, Queue는 전달받은 메시지들을 Consumer에게 전달하며, Queue는 반드시 미리 정의해야 사용할 수 있다. Queue의 메시지는 공평하게 전달하기 위해 라운드로빈(RR)로 전달된다.
- Producer들이 발송한 메시지들이 Consumer가 소비하기 전까지 보관되는 장소
- Queue는 이름으로 구분되는데 같은 이름과 같은 설정으로 생성하면 에러 없이 기존 Queue에 연결되지만, 같은 이름 다른 설정으로 생성할 경우 에러가 발생한다.
- Queue는 Consumer들에게 전달되기 전에 메모리나 디스크에 저장해놓는다.
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
* queueDeclare : Queue를 선언하는 메서드
* String queue : queue 이름
=> Queue이름은 각자 생성해도 되고, channel.queueDeclare().getQueue()로 랜덤 생성가능
* boolean durable : 서버 재시작에도 살아남을 튼튼한 큐로 선언할 것 인지 여부 (내구성)
=> 해당 큐를 생성할 때 내구성을 선택하면 이후에 수정할 수 없음
=> publish와 consume가 동일해야한다.
* boolean exclusive : 현재의 연결에 한정되는 베타적인 큐로 선언할 것인지 여부, 하나의 연결에서만 사용되며 해당 연결이 닫힐 때 대기열이 삭제됨
* boolean autoDelete : 사용되지 않을 때 서버에 의해 자동 삭제되는 큐로 선언할 것인지 여부
* Map<String,Object> arguments : 큐를 구성하는 다른 속성 (옵션, 메시지 TTL, Max Length 등과 같은 브로커별 추가 기능 및 플러그인에서 사용)
- Consumer
- 메시지를 수신하는 주체로, Queue에 직접 접근하여 메시지를 가져옴
- 동일 업무를 처리하는 Consumer는 보통 하나의 Queue를 바라본다. (중복방지)
- 동일 업무를 처리하는 Consumer가 여러개인 경우, 같은 Queue를 바라보게 하면 자동으로 메시지를 분배하여 전달한다. (RoundRobin)
- Message Broker와 많은 연결을 맺는 것은 바람직하지 않다.
- Subscribe
- Consumer가 메시지를 수신하기 위해 Queue를 실시간으로 리스닝하도록 만듦.
메시지 분배 ( Round-Robin dispatching )
- RabbitMQ는 Consumer가 병렬처리를 쉽게 할 수 있도록 같은 Queue를 바라보고 있는 Consumer에게 메시지를 균등 분배한다.
- 즉, 첫번째 메시지는 Consumer1에게 두번째 메시지는 Consumer2에게 분배 (중복 처리 하지 않도록 1명에게만 전달)
- 같은 Queue에 메시지를 전달해주기만 하면 되기 때문에 수평확장이 가능하다.
공평한 분배 ( Fair dispatch )
- 하나의 Queue에 여러 Consumer가 존재할 경우, Queue는 기본적으로 라운드 로빈 방식으로 메시지를 분배한다.
- 여러 Consumer에게 번갈아가면서 메시지를 전달하지만 데이터 크기 등에 의해 완전히 공평할 수는 없다. 그렇기 때문에 Queue에 업무를 할당하다보면 한쪽에 몰릴 수가 있다.
- 이를 방지하기 위해 Prefetch Count 설정으로 Consumer에게 한번에 하나 이상의 메시지를 주지 않도록 지시할 수 있다.
- 즉, 이전 메시지를 처리하고 승인(ack)할 때 까지 해당 Consumer에게 새 메시지를 발송하지 않고 다른 Consumer에게 전송한다.
- Prefetch Count는 동시에 보내지는 메시지 양이다.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
메시지 수신 통보 ( Acknowledgment )
- 메시지에 대한 응답
- RabbitMQ에서 Producer가 MQ에 Message를 전송한 다음 MQ로 부터 ACK를 받는 기법을 Producer Confirm라고 명칭한다. 이와 유사하게 MQ가 Consumer에게 Message를 전송한 다음 Consumer로 부터 ACK를 받는 기법을 Consumer Acknowledgement라고 명칭한다.
- ACK 기법은 Message가 최소 한번 이상은 전달되는 것을 보장한다.
- Message를 전송 후 ACK를 받지 못하면 동일한 Message를 재전송하기 때문에 수신자는 동일한 Message를 2번 이상 받을 수 있다.
서버 재기동으로 인한 Message Queue 및 Message 보존
- 어떤 이유에서든지 RabbitMQ 서버 종료 후 재기동 되면 기본적으로 기존의 Queue는 모두 날아가버린다. 이런 상황을 방지하기 위해 durable 을 사용하면 된다.
- Message Durability
- Queue 생성시 durable 속성을 true 로 선언
- 해당 Queue 생성시 내구성을 선택하면 추후 수정할 수 없음
- publish와 consumer의 설정이 동일해야한다.
- Producer 메시지를 전송할 때 MessageProperties.PERSISTENT_TEXT_PLAN 옵션 설정
RPC 원격 프로시저 호출
- RabbitMQ는 Request-Response로 Client와 Server를 이어주기 위해 RPC 라는 개념으로 기능을 제공한다.
- Client의 request를 Server에 전달하고, Server가 처리한 결과를 알맞은 Client 에 응답으로 전달
1. Producer 작업
String replyQueueName = channel.queueDeclare().getQueue();
String requestQueueName= "rpc_queue";
BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
* BasicProperties : 자동 생성된 소유자 클래스의 내부 클래스인 AMQP
* 속성은 다양하지만 대부분 아래의 속성만 사용
* deliveryMode : 메시지를 지속적 또는 일시적으로 표시
* contentType : 인코딩의 MIME 유형 설정 > ex) application/json
* replyTo : Callback Queue의 이름을 지정
* correlationId : RPC 응답을 요청과 상관 시키는데 유용
=> 응답을 받았지만 응답이 어느 요청에 속하는지 명확하지 않은 새로운 문제가 발생해서 correlationId로 고유한 값 설정
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
- RPC의 이점
- 서버처리
- 서버 처리속도가 느려서 성능을 증가시키려고 할 때, 균등분배로 이루어지기 때문에 RPC 서버를 하나 더 두고 같은 Request Queue를 바라보게 하면 된다.
- Client
- 임시 Queue를 생성해서 client마다 다른 queue를 사용하기 때문에 하나의 메시지를 개별 Round Trip으로 처리를 위해 queueDeclare같은 동기처리 요청이 필요없다.
- 서버처리
- RPC 구성시 고려할 점
- 돌아가는 서버가 없을 경우 client 처리
- 요청 timeout시 client 처리
- 서버 exception이나 오동작시 client에게 이를 어떻게 전달할지
- invalid한 데이터가 서버로 전달 되었을 때의 처리
반응형
'RabbitMQ' 카테고리의 다른 글
RabbitMQ + DockerCompose 실행시키기 _ 환경설정(2) (1) | 2024.01.22 |
---|---|
RabbitMQ + Docker Compose 실행시키기 (1) (1) | 2024.01.18 |