RabbitMQ

RabbitMQ 란? 주요 용어, Round-Robin, Fair dispatch, Acknowledgment

솔솔 2024. 1. 21. 22:27
반응형

사전적 정의

  • 오픈 소스 메시지 브로커 소프트웨어 (메시지 지향 미들웨어)로서, AMQP(Advanced Message Queuing Protocol)를 구현하였으며 그 이후로 STOMP, MQTT 등의 프로토콜을 지원하기 위해 플러그인 구조와 함께 확장되고 있다.
  • 메시지를 생산하는 생산자(Producer)가 메시지를 큐에 저장해 두면, 메시지를 수신하는 소비자(Consumer)가 메시지를 가져와 처리하는 Publish/Subscribe 방식의 메시지 전달 브로커이다.

 

RabbitMQ 란?

  • AMQP를 구현한 오픈 소스 메시지 브로커
  • AMQP 란?
    • Advanced Message Queuing Protocol로 client application과 미들웨어 브로커와의 메시지를 주고받기 위한 프로토콜, Message Queue의 오픈소스에 기반한 표준 프로토콜을 의미
  • 메시지 브로커 란?
    • 메시지를 받아 적절히 처리하고 나면 즉시 or 짧은 시간 내에 메시지가 삭제되는 구조
    • 예 ) Redis Queue, RabbitMQ
  • 이벤트 브로커 란?
    • 이벤트 or 메시지라고 불리는 레코드를 딱 한 개만 저장하고 인덱스로 관리, 업무상 필요한 시간 동안 이벤트 보존 가능
    • 예 ) Kafka, AWS의 키네시스
  •  RabbitMQ는 메시지 브로커 서비스로, 불특정 다수의 사용자가 다른 사람에게 메시지를 전달하려고 메시지 브로커에 메시지를 전달하면 메시지 브로커가 메시지를 받아서 보관하고 있다가 해당 메시지를 받으려는 사람이 보관한 메시지를 꺼내가는 형태의 서비스
  • 쉬운 예로 우체통이나 편지함을 생각해보자. 편지를 보내려는 사람이 편지를 써서 우체통에 넣으면 받는 사람이 우체통에서 편지를 꺼내가는 구조
    • 보내는 사람 = 메시지를 보내는 사람
    • 받는 사람 = 메시지를 꺼내는 사람
    • 우체통 = 메시지 브로커
  • RabbitMQ 이외에도 kafka, zeroMQ, activeMQ와 같은 많은 종류의 메시지 큐가 존재
  • Producer로부터 메시지를 받아 Consumer에게 라우트 하는 것이 주된 역할
  • 많은 사용자에게 전달하거나, 요청에 대한 처리 시간이 길 때 해당 요청을 다른 API에게 위임하고 빠른 응답을 할 때 많이 사용
  • Message Queue를 사용하여 애플리케이션 간 결합도를 낮출 수 있음
  • 작업 큐, 발행 및 구독, 라우팅, 원격 프로시저 호출 등의 모델을 구현
  • RabbitMQ는 channel이라는 개념을 통해 하나의 TCP 연결을 공유해서 사용할 수 있는 기능을 제공한다. 하지만 멀티 스레드, 멀티 프로세스를 사용하는 작업에서는 각각 별도의 Channel을 열고 사용하는 것이 바람직하다.

 

RabbitMQ를 사용해야하는 이유?

  • 서버 - 클라이언트 간 양방향 통신
  • 비동기처리를 위한 메시지큐 브로커
  • 분산처리를 고려한 MQ (Cluster, Federation)
  • 고가용성 보장 (High Avaliabilty)
  • 메시지를 받는 Consumer 서버가 죽더라도 메시지가 사라지지 않고, 큐에 저장되어 있기 때문에 서버가 다시 살아나면 큐를 읽어들이면서 이벤트 처리 가능
  • 그렇기 때문에 일부가 실패할 경우, 전체에 영향을 받지 않고 메시지 순서가 보장되며 누락될 위험이 거의 없다.
  • 큐에 들어오는 메시지를 별도로 관리할 수 있으며, 한눈에 볼 수 있는 실시간 그래프 등 확인이 가능해 관리에 용이
  • 다양한 언어 지원으로 개발 라이브러리를 제공한다. (ex. Java, C#, JavaScript, node.js, erlang 등)
  • RabbitMQ 서버의 안정성과 동시성이 매우 뛰어나다.
  • 작업이 처리된 후 데이터를 삭제한다.
  • 초당 수만 건의 메시지는 큰 문제 없이 전송 가능하다.

 

RabbitMQ 기본적인 흐름

 

  1. Producer가 메시지를 생성하여 전송
  2. Exchange가 어떤 Queue에 전달할지 Routing
  3. Queue들이 메시지를 순차적으로 쌓음
  4. Consumer가 Queue에 대한 Binding을 가지고 있다가 메시지를 Queue에서 수신

 

주요 용어

  • Producer
    • 메시지를 생성하고 발송하는 주체
    • 메시지의 내용은 Queue에 저장되는데 Queue에 직접 접근하지 않고, 항상 Exchange를 통해 접근
  • Publish
    • Producer가 메시지를 보냄
  • Exchange
    • Producer들에게서 전달받은 메시지들을 어떤 Queue들에게 발송할지 결정하는 객체
    • 메시지가 Queue에 직접 전달되지 않고, exchange type이라는 속성으로 전달
    • 일종의 라우터 개념으로 4가지의 타입 존재
    • 특정 Queue에 보낼지, 여러 Queue에 보낼지, 아니면 그냥 제거될지 등을 선택
Exchange 종류
- Direct Exchange : 지정된 RoutingKey를 가진 Queue에만 메시지를 전달
- Fanout Exchange : 알려진 모든 Queue에 메시지를 전달
- Topic Exchange : 지정된 패턴 바인딩에 일치하는 Queue에만 메시지 전달
- Headers Exchange : 헤더에 포함된 key=value의 일치조건에 따라 메시지 전달

 

  •  
  • Binding
    • Exchange와 Queue를 연결해주는 것
    • 생성한 Exchange를 Queue에 매핑을 해줘야 exchange가 메시지를 받으면 Queue에 메시지를 보낼 수 있는데 이를 Binding이라 한다.
    • Exchange에게 메시지를 라우팅할 규칙을 지정하는 행위로 Binding이 되어있어야 Exchange가 Queue에게 메시지를 전달 할 수 있다.
    • 특정 조건에 맞는 메시지를 특정 큐에 전송하도록 설정할 수 있는데, 해당 Exchange 타입에 맞게 설정되어야함 ( Exchange : Queue ⇒ m : n binding 가능 (Multiple binding))
channel.queueBind(queueName, EXCHANGE_NAME, routingKey)

* Multiple Binding일 경우, RoutingKey로 binding 처리하면 된다.
	channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
	channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);

queueName : 생성한 Queue Name
exchangeName : 생성한 exchangeName
routingKey : routingKey
  • Routing
    • Exchange가 Queue에 메시지를 전달하는 과정
    • 모든 메시지를 받는 것이 아니라 특정 종류의 메시지를 받을 수 있도록 멀티캐스트
  •  
  • RoutingKey
    • Exchange와 Queue가 Binding될 때 Exchange가 Queue에 메시지를 전달할지 결정하는 기준
    • exchange type = direct인 경우, RoutingKey값이 Binding시 설정된 RoutingKey값과 일치
    • exchange type = topic인 경우, RoutingKey값이 Binding시 설정된 패턴에 매칭
  • Queue
    • 메시지를 저장하는 버퍼
    • Exchange는 Producer로부터 전달받은 메시지를 Binding 하여 Queue에게 전달한다.
    • 이때, Queue는 전달받은 메시지들을 Consumer에게 전달하며, Queue는 반드시 미리 정의해야 사용할 수 있다. Queue의 메시지는 공평하게 전달하기 위해 라운드로빈(RR)로 전달된다.
    • Producer들이 발송한 메시지들이 Consumer가 소비하기 전까지 보관되는 장소
    • Queue는 이름으로 구분되는데 같은 이름과 같은 설정으로 생성하면 에러 없이 기존 Queue에 연결되지만, 같은 이름 다른 설정으로 생성할 경우 에러가 발생한다.
    • Queue는 Consumer들에게 전달되기 전에 메모리나 디스크에 저장해놓는다.
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

* queueDeclare : Queue를 선언하는 메서드
	* String queue : queue 이름
		=> Queue이름은 각자 생성해도 되고, channel.queueDeclare().getQueue()로 랜덤 생성가능
	* boolean durable : 서버 재시작에도 살아남을 튼튼한 큐로 선언할 것 인지 여부 (내구성)
		=> 해당 큐를 생성할 때 내구성을 선택하면 이후에 수정할 수 없음
		=> publish와 consume가 동일해야한다.
	* boolean exclusive : 현재의 연결에 한정되는 베타적인 큐로 선언할 것인지 여부, 하나의 연결에서만 사용되며 해당 연결이 닫힐 때 대기열이 삭제됨
	* boolean autoDelete : 사용되지 않을 때 서버에 의해 자동 삭제되는 큐로 선언할 것인지 여부
	* Map<String,Object> arguments : 큐를 구성하는 다른 속성 (옵션, 메시지 TTL, Max Length 등과 같은 브로커별 추가 기능 및 플러그인에서 사용)
  • Consumer
    • 메시지를 수신하는 주체로, Queue에 직접 접근하여 메시지를 가져옴
    • 동일 업무를 처리하는 Consumer는 보통 하나의 Queue를 바라본다. (중복방지)
    • 동일 업무를 처리하는 Consumer가 여러개인 경우, 같은 Queue를 바라보게 하면 자동으로 메시지를 분배하여 전달한다. (RoundRobin)
    • Message Broker와 많은 연결을 맺는 것은 바람직하지 않다.
  • Subscribe
    • Consumer가 메시지를 수신하기 위해 Queue를 실시간으로 리스닝하도록 만듦.

 

메시지 분배 ( Round-Robin dispatching )

  • RabbitMQ는 Consumer가 병렬처리를 쉽게 할 수 있도록 같은 Queue를 바라보고 있는 Consumer에게 메시지를 균등 분배한다.
  • 즉, 첫번째 메시지는 Consumer1에게 두번째 메시지는 Consumer2에게 분배 (중복 처리 하지 않도록 1명에게만 전달)
  • 같은 Queue에 메시지를 전달해주기만 하면 되기 때문에 수평확장이 가능하다.

 

 

공평한 분배 ( Fair dispatch )

  • 하나의 Queue에 여러 Consumer가 존재할 경우, Queue는 기본적으로 라운드 로빈 방식으로 메시지를 분배한다.
  • 여러 Consumer에게 번갈아가면서 메시지를 전달하지만 데이터 크기 등에 의해 완전히 공평할 수는 없다. 그렇기 때문에 Queue에 업무를 할당하다보면 한쪽에 몰릴 수가 있다.
  • 이를 방지하기 위해 Prefetch Count 설정으로 Consumer에게 한번에 하나 이상의 메시지를 주지 않도록 지시할 수 있다.
  • 즉, 이전 메시지를 처리하고 승인(ack)할 때 까지 해당 Consumer에게 새 메시지를 발송하지 않고 다른 Consumer에게 전송한다.
  • Prefetch Count는 동시에 보내지는 메시지 양이다.
int prefetchCount = 1;
channel.basicQos(prefetchCount);

 

 

메시지 수신 통보 ( Acknowledgment )

  • 메시지에 대한 응답
  • RabbitMQ에서 Producer가 MQ에 Message를 전송한 다음 MQ로 부터 ACK를 받는 기법을 Producer Confirm라고 명칭한다. 이와 유사하게 MQ가 Consumer에게 Message를 전송한 다음 Consumer로 부터 ACK를 받는 기법을 Consumer Acknowledgement라고 명칭한다.
  • ACK 기법은 Message가 최소 한번 이상은 전달되는 것을 보장한다.
  • Message를 전송 후 ACK를 받지 못하면 동일한 Message를 재전송하기 때문에 수신자는 동일한 Message를 2번 이상 받을 수 있다.

 

서버 재기동으로 인한 Message Queue 및 Message 보존

  • 어떤 이유에서든지 RabbitMQ 서버 종료 후 재기동 되면 기본적으로 기존의 Queue는 모두 날아가버린다. 이런 상황을 방지하기 위해 durable 을 사용하면 된다.
  • Message Durability
    • Queue 생성시 durable 속성을 true 로 선언
    • 해당 Queue 생성시 내구성을 선택하면 추후 수정할 수 없음
    • publish와 consumer의 설정이 동일해야한다.
    • Producer 메시지를 전송할 때 MessageProperties.PERSISTENT_TEXT_PLAN 옵션 설정

 

RPC 원격 프로시저 호출

  • RabbitMQ는 Request-Response로 Client와 Server를 이어주기 위해 RPC 라는 개념으로 기능을 제공한다.
  • Client의 request를 Server에 전달하고, Server가 처리한 결과를 알맞은 Client 에 응답으로 전달
1. Producer 작업
    String replyQueueName = channel.queueDeclare().getQueue();
    String requestQueueName= "rpc_queue";

    BasicProperties props = new BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
    * BasicProperties : 자동 생성된 소유자 클래스의 내부 클래스인 AMQP
    * 속성은 다양하지만 대부분 아래의 속성만 사용
        * deliveryMode : 메시지를 지속적 또는 일시적으로 표시
	* contentType : 인코딩의 MIME 유형 설정 > ex) application/json
	* replyTo : Callback Queue의 이름을 지정
	* correlationId : RPC 응답을 요청과 상관 시키는데 유용
		=> 응답을 받았지만 응답이 어느 요청에 속하는지 명확하지 않은 새로운 문제가 발생해서 correlationId로 고유한 값 설정

    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

 

 

  • RPC의 이점
    •  서버처리
      • 서버 처리속도가 느려서 성능을 증가시키려고 할 때, 균등분배로 이루어지기 때문에 RPC 서버를 하나 더 두고 같은 Request Queue를 바라보게 하면 된다.
    • Client
      • 임시 Queue를 생성해서 client마다 다른 queue를 사용하기 때문에 하나의 메시지를 개별 Round Trip으로 처리를 위해 queueDeclare같은 동기처리 요청이 필요없다.
  • RPC 구성시 고려할 점
    • 돌아가는 서버가 없을 경우 client 처리
    • 요청 timeout시 client 처리
    • 서버 exception이나 오동작시 client에게 이를 어떻게 전달할지
    • invalid한 데이터가 서버로 전달 되었을 때의 처리
반응형