본문 바로가기

Review

RabbitMQ 메시지 Python 에서 송수신하기

RabbitMQ Python

✅ Sending (송신)

#!/usr/bin/env python
import pika

# 유저 ID / PW 등록
credentials = pika.PlainCredentials(
    username='<username>',
    password='<password>'
)

# 브로커와 연결
connection = pika.BlockingConnection(
        pika.ConnectionParameters(
                '<Host IP>',
                credentials=credentials
        )
)
channel = connection.channel()
channel.queue_declare(queue='<큐 이름>')

# 브로커에 메시지 전송
channel.basic_publish(exchange='<exchange 이름>',
                      routing_key='<큐 이름>',
                      body='<보내려는 메시지>')
print(" [x] Sent <보내려는 메시지>"),

✅ Receiving (수신)

import json
import pika
import consts

# 유저 ID / PW 등록
credentials = pika.PlainCredentials(
    username='<username>',
    password='<password>'
)
# 브로커와 연결
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='<host IP>',
        credentials=credentials
    )
)
channel = connection.channel()

# 큐 이름 설정
channel.queue_declare(
    queue='<queue name>'
)

# 데이터가 던져질 때 실행되는 함수
def callback(ch, method, properties, body):
    data = json.loads(body)
    # value01 = data.get('key01')
    # value02 = data.get('key02')
    print(" [x] Received %r" % body)

# auto_ack 를 False 로 하면 정상 수신 후에도 큐에 데이터를 남겨두어
# 명시적으로 ack 를 날리지 않으면 잘 받은 메시지도 큐에서 삭제되지 않아서
# 메모리가 낭비될 수 있다.
channel.basic_consume(on_message_callback=callback,
                      queue='<queue name>',
                      auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

✅ 배분 방식

  • 라운드 로빈(RR) 방식:
    • 모든 컨슈머가 돌아가면서 메시지를 수신한다. 평균적으로 각각 같은 개수의 메시지를 받도록 수행된다.
    • 프로듀서가 1, 2, 3, 4, 5 를 보냄
      컨슈머 a, b, c 가 있을 때
      a 는 1, 4 받음
      b 는 2, 5 받음
      c 는 3 받음

✅ Consumer 의 auto_ack 설정

  • ackacknowledgement 의 약자로 일반적으로 전송이 성공했는지 여부를 확인하기 위해 전송되는 패킷을 말한다.
  • auto_ack=true → 수신시 자동으로 큐에서 삭제
  • auto_ack=false → 수신해도 사용자가 수동으로 ack 생성시까지 큐에 들고있음
    (대신 수동 ack 날리기 전까지 다시 받아오기 가능)
  • auto_ackfalse 로 설정했을 때, ack 를 날리지 않으면 큐에 메시지가 계속 쌓이게 되고, 일정 이상 쌓이면 메모리가 터질 수 있다. 🤯

✅ 수동 ack 전송법

# callback 함수 마지막에 ch.basic_ack() 함수를 호출하면 된다.
# 만약 ack 함수가 호출되지 않거나 실행 도중 프로세스가 종료되면,
# 큐에 해당 데이터가 계속 남아있게 되어 메모리 문제가 생길 수 있다.
def callback(ch, method, properties, body):
    data = json.loads(body)
    # value01 = data.get('key01')
    # value02 = data.get('key02')

        ##### Do someting #####

    print(" [x] Received %r" % body)
        ####### 이 부분이 ack 를 전송해주는 부분 #######
        ch.basic_ack(delivery_tag=method.delivery_tag)

다음 명령어를 통해 메시지 큐 브로커 노드에서 ack를 아직 받지 못한 메시지 리스트를 볼 수 있다.

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

참고: https://www.rabbitmq.com/tutorials/tutorial-two-python.html

'Review' 카테고리의 다른 글

Git flow 전략  (0) 2022.05.27
흥미로운 SOTA 인공지능 오픈소스들..  (0) 2022.05.27
패스워드 관리 툴: pass  (0) 2022.05.27