RabbitMQ Python
✅ Sending (송신)
#!/usr/bin/env python
import pika
# 유저 ID / PW 등록
credentials = pika.PlainCredentials(
username='<username>',
password='<password>'
)
# 브로커와 연결
connection = pika.BlockingConnection(
pika.ConnectionParameters(
'<Host IP>',
credentials=credentials
)
)
channel = connection.channel()
channel.queue_declare(queue='<큐 이름>')
# 브로커에 메시지 전송
channel.basic_publish(exchange='<exchange 이름>',
routing_key='<큐 이름>',
body='<보내려는 메시지>')
print(" [x] Sent <보내려는 메시지>"),
✅ Receiving (수신)
import json
import pika
import consts
# 유저 ID / PW 등록
credentials = pika.PlainCredentials(
username='<username>',
password='<password>'
)
# 브로커와 연결
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='<host IP>',
credentials=credentials
)
)
channel = connection.channel()
# 큐 이름 설정
channel.queue_declare(
queue='<queue name>'
)
# 데이터가 던져질 때 실행되는 함수
def callback(ch, method, properties, body):
data = json.loads(body)
# value01 = data.get('key01')
# value02 = data.get('key02')
print(" [x] Received %r" % body)
# auto_ack 를 False 로 하면 정상 수신 후에도 큐에 데이터를 남겨두어
# 명시적으로 ack 를 날리지 않으면 잘 받은 메시지도 큐에서 삭제되지 않아서
# 메모리가 낭비될 수 있다.
channel.basic_consume(on_message_callback=callback,
queue='<queue name>',
auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
✅ 배분 방식
- 라운드 로빈(RR) 방식:
- 모든 컨슈머가 돌아가면서 메시지를 수신한다. 평균적으로 각각 같은 개수의 메시지를 받도록 수행된다.
- 프로듀서가 1, 2, 3, 4, 5 를 보냄
컨슈머 a, b, c 가 있을 때
a 는 1, 4 받음
b 는 2, 5 받음
c 는 3 받음
✅ Consumer 의 auto_ack 설정
ack
는acknowledgement
의 약자로 일반적으로 전송이 성공했는지 여부를 확인하기 위해 전송되는 패킷을 말한다.auto_ack=true
→ 수신시 자동으로 큐에서 삭제auto_ack=false
→ 수신해도 사용자가 수동으로 ack 생성시까지 큐에 들고있음
(대신 수동 ack 날리기 전까지 다시 받아오기 가능)auto_ack
를false
로 설정했을 때,ack
를 날리지 않으면 큐에 메시지가 계속 쌓이게 되고, 일정 이상 쌓이면 메모리가 터질 수 있다. 🤯
✅ 수동 ack 전송법
# callback 함수 마지막에 ch.basic_ack() 함수를 호출하면 된다.
# 만약 ack 함수가 호출되지 않거나 실행 도중 프로세스가 종료되면,
# 큐에 해당 데이터가 계속 남아있게 되어 메모리 문제가 생길 수 있다.
def callback(ch, method, properties, body):
data = json.loads(body)
# value01 = data.get('key01')
# value02 = data.get('key02')
##### Do someting #####
print(" [x] Received %r" % body)
####### 이 부분이 ack 를 전송해주는 부분 #######
ch.basic_ack(delivery_tag=method.delivery_tag)
다음 명령어를 통해 메시지 큐 브로커 노드에서 ack를 아직 받지 못한 메시지 리스트를 볼 수 있다.
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
참고: https://www.rabbitmq.com/tutorials/tutorial-two-python.html
'Review' 카테고리의 다른 글
Git flow 전략 (0) | 2022.05.27 |
---|---|
흥미로운 SOTA 인공지능 오픈소스들.. (0) | 2022.05.27 |
패스워드 관리 툴: pass (0) | 2022.05.27 |