rabbitmq-expert by martinholovsky/claude-skills-generator
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill rabbitmq-expert您是一位精通 RabbitMQ 的精英工程师,在以下方面拥有深厚的专业知识:
# tests/test_message_queue.py
import pytest
import pika
import json
import time
from unittest.mock import MagicMock, patch
class TestOrderProcessor:
    """Test order message processing against RabbitMQ.

    Unit tests drive OrderConsumer through a mocked channel; integration
    tests are marked ``integration`` (matching the documented
    ``pytest -m integration`` invocation) and are skipped automatically
    when no broker is reachable on localhost.
    """

    @pytest.fixture
    def mock_channel(self):
        """Create a mock channel for unit tests."""
        channel = MagicMock()
        channel.basic_qos = MagicMock()
        channel.basic_consume = MagicMock()
        channel.basic_ack = MagicMock()
        channel.basic_nack = MagicMock()
        return channel

    @pytest.fixture
    def rabbitmq_connection(self):
        """Create a real connection for integration tests.

        Skips the test (instead of erroring) when RabbitMQ is unavailable.
        """
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host='localhost',
                    connection_attempts=3,
                    retry_delay=1,
                )
            )
        except pika.exceptions.AMQPConnectionError:
            pytest.skip("RabbitMQ 不可用")
        # yield outside the try so exceptions raised by the test body are
        # not misreported as a connection failure / skip.
        yield connection
        connection.close()

    def test_message_acknowledged_on_success(self, mock_channel):
        """Successful processing must ack exactly once and never nack."""
        from app.consumers import OrderConsumer

        consumer = OrderConsumer(mock_channel)
        message = json.dumps({"order_id": 123, "status": "pending"})
        # Fake delivery metadata carrying the delivery tag.
        method = MagicMock()
        method.delivery_tag = 1

        consumer.process_message(mock_channel, method, None, message.encode())

        mock_channel.basic_ack.assert_called_once_with(delivery_tag=1)
        mock_channel.basic_nack.assert_not_called()

    def test_message_rejected_to_dlx_on_failure(self, mock_channel):
        """Unparseable messages must be nacked without requeue (routed to DLX)."""
        from app.consumers import OrderConsumer

        consumer = OrderConsumer(mock_channel)
        invalid_message = b"invalid json"
        method = MagicMock()
        method.delivery_tag = 2

        consumer.process_message(mock_channel, method, None, invalid_message)

        # requeue=False sends the message to the dead-letter exchange.
        mock_channel.basic_nack.assert_called_once_with(
            delivery_tag=2,
            requeue=False,
        )

    def test_prefetch_count_configured(self, mock_channel):
        """setup() must apply the configured prefetch count via basic_qos."""
        from app.consumers import OrderConsumer

        consumer = OrderConsumer(mock_channel, prefetch_count=10)
        consumer.setup()
        mock_channel.basic_qos.assert_called_once_with(prefetch_count=10)

    @pytest.mark.integration
    def test_publisher_confirms_enabled(self, rabbitmq_connection):
        """Integration: publishing with confirms enabled must not raise."""
        channel = rabbitmq_connection.channel()
        channel.confirm_delivery()
        channel.queue_declare(queue='test_confirms', durable=True)
        try:
            channel.basic_publish(
                exchange='',
                routing_key='test_confirms',
                body=b'test message',
                properties=pika.BasicProperties(delivery_mode=2),
            )
        finally:
            # Clean up the test queue even when the publish/confirm fails.
            channel.queue_delete(queue='test_confirms')

    @pytest.mark.integration
    def test_dlx_receives_rejected_messages(self, rabbitmq_connection):
        """Integration: a nacked (requeue=False) message must reach the DLX queue."""
        channel = rabbitmq_connection.channel()
        # Dead-letter exchange and the queue bound to it.
        channel.exchange_declare(exchange='test_dlx', exchange_type='fanout')
        channel.queue_declare(queue='test_dead_letters')
        channel.queue_bind(exchange='test_dlx', queue='test_dead_letters')
        # Main queue wired to the DLX.
        channel.queue_declare(
            queue='test_main',
            arguments={'x-dead-letter-exchange': 'test_dlx'},
        )
        try:
            channel.basic_publish(
                exchange='',
                routing_key='test_main',
                body=b'will be rejected',
            )
            method, props, body = channel.basic_get('test_main')
            # Fail loudly if the message never arrived instead of silently
            # skipping the nack and asserting against an empty DLX queue.
            assert method is not None, "message never reached test_main"
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            # Give the broker a moment to dead-letter the message.
            time.sleep(0.1)
            method, props, body = channel.basic_get('test_dead_letters')
            assert body == b'will be rejected'
        finally:
            channel.queue_delete(queue='test_main')
            channel.queue_delete(queue='test_dead_letters')
            channel.exchange_delete(exchange='test_dlx')
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
# app/consumers.py
import json
import logging

logger = logging.getLogger(__name__)


class OrderConsumer:
    """Order-message consumer: acks successes, dead-letters failures."""

    def __init__(self, channel, prefetch_count=1):
        # The channel is injected, which keeps the class unit-testable.
        self.channel = channel
        self.prefetch_count = prefetch_count

    def setup(self):
        """Apply per-channel QoS so at most prefetch_count messages are in flight."""
        self.channel.basic_qos(prefetch_count=self.prefetch_count)

    def process_message(self, ch, method, properties, body):
        """Consume one delivery, acking on success and nacking (no requeue) on failure."""
        tag = method.delivery_tag
        try:
            payload = json.loads(body)
            self._handle_order(payload)
            ch.basic_ack(delivery_tag=tag)
            logger.info(f"已处理订单: {payload.get('order_id')}")
        except json.JSONDecodeError as e:
            logger.error(f"无效的 JSON: {e}")
            # A malformed payload can never succeed: route it to the DLX.
            ch.basic_nack(delivery_tag=tag, requeue=False)
        except Exception as e:
            logger.error(f"处理失败: {e}")
            ch.basic_nack(delivery_tag=tag, requeue=False)

    def _handle_order(self, order):
        """Business logic for order processing (stub)."""
        pass
测试通过后,为以下方面进行重构:
# 运行单元测试
pytest tests/test_message_queue.py -v
# 运行覆盖率测试
pytest tests/ --cov=app --cov-report=term-missing
# 运行集成测试(需要 RabbitMQ)
pytest tests/ -m integration -v
# 端到端验证消息流
python -m pytest tests/e2e/ -v
# 错误做法:无限预取 - 消费者不堪重负
channel.basic_consume(queue='tasks', on_message_callback=callback)
# 未设置预取意味着无限 - 导致内存问题!
# 正确做法:根据处理时间设置适当的预取值
# 对于快速处理(< 100ms):较高的预取值
channel.basic_qos(prefetch_count=50)
# 对于慢速处理(> 1s):较低的预取值
channel.basic_qos(prefetch_count=1)
# 对于均衡的工作负载
channel.basic_qos(prefetch_count=10)
调优指南:
# 错误做法:每次发布一条消息并使用确认机制
for order in orders:
channel.basic_publish(
exchange='orders',
routing_key='order.created',
body=json.dumps(order),
properties=pika.BasicProperties(delivery_mode=2)
)
# 等待每条消息的确认 - 速度慢!
# 正确做法:批量发布与批量确认
channel.confirm_delivery()
# 无需等待地发布批次
for order in orders:
channel.basic_publish(
exchange='orders',
routing_key='order.created',
body=json.dumps(order),
properties=pika.BasicProperties(delivery_mode=2)
)
# 一次性等待所有确认
try:
channel.get_waiting_message_count() # 强制刷新确认
except pika.exceptions.NackError as e:
# 处理被拒绝的消息
logger.error(f"消息被拒绝: {e.messages}")
# 错误做法:为每个操作创建新连接
def send_message(message):
connection = pika.BlockingConnection(params) # 代价高昂!
channel = connection.channel()
channel.basic_publish(...)
connection.close()
# 正确做法:使用池化重用连接
from queue import Queue
import threading
class ConnectionPool:
    """Fixed-size pool of blocking RabbitMQ connections for publishers.

    Connections are opened eagerly at construction; a connection found
    dead on return is replaced so the pool keeps its full size.
    """

    def __init__(self, params, size=10):
        self.pool = Queue(maxsize=size)
        self.params = params
        # Open all connections up front so publish() never pays setup cost.
        for _ in range(size):
            conn = pika.BlockingConnection(params)
            self.pool.put(conn)

    def get_connection(self):
        """Borrow a connection; blocks until one is free."""
        return self.pool.get()

    def return_connection(self, conn):
        """Give a connection back, replacing it if it has died."""
        if conn.is_open:
            self.pool.put(conn)
        else:
            # Replace the dead connection to keep the pool at full size.
            self.pool.put(pika.BlockingConnection(self.params))

    def publish(self, exchange, routing_key, body):
        """Publish one persistent message using a pooled connection."""
        conn = self.get_connection()
        try:
            channel = conn.channel()
            try:
                channel.basic_publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    body=body,
                    properties=pika.BasicProperties(delivery_mode=2),
                )
            finally:
                # BUGFIX: close the per-publish channel; the original left
                # it open, leaking one channel per publish on the pooled
                # connection until the broker's channel_max was hit.
                channel.close()
        finally:
            self.return_connection(conn)
# 错误做法:带有大积压的经典队列 - 内存压力
channel.queue_declare(queue='high_volume', durable=True)
# 所有消息都保存在 RAM 中 - 触发内存警报!
# 正确做法:惰性队列将消息移至磁盘
channel.queue_declare(
queue='high_volume',
durable=True,
arguments={
'x-queue-mode': 'lazy' # 消息立即写入磁盘
}
)
# 更好做法:带有内存限制的仲裁队列
channel.queue_declare(
queue='high_volume',
durable=True,
arguments={
'x-queue-type': 'quorum',
'x-max-in-memory-length': 1000 # 仅 1000 条消息在 RAM 中
}
)
何时使用惰性队列:
# 错误做法:同步确认 - 每条消息都阻塞
channel.confirm_delivery()
for msg in messages:
try:
channel.basic_publish(...) # 阻塞直到确认
except Exception:
handle_failure()
# 正确做法:带有回调的异步确认
import pika
def on_confirm(frame):
if isinstance(frame.method, pika.spec.Basic.Ack):
logger.debug(f"消息 {frame.method.delivery_tag} 已确认")
else:
logger.error(f"消息 {frame.method.delivery_tag} 被拒绝")
# 使用 SelectConnection 实现异步
connection = pika.SelectConnection(
params,
on_open_callback=on_connected
)
def on_connected(connection):
channel = connection.channel(on_open_callback=on_channel_open)
def on_channel_open(channel):
channel.confirm_delivery(on_confirm)
# 现在发布是非阻塞的
channel.basic_publish(...)
# 错误做法:对大型二进制数据使用 JSON
import json
channel.basic_publish(
body=json.dumps({"image": base64.b64encode(image_data).decode()})
)
# 正确做法:使用适当的序列化方法
import msgpack
# 对于结构化数据 - MessagePack(更快、更小)
channel.basic_publish(
body=msgpack.packb({"user_id": 123, "action": "click"}),
properties=pika.BasicProperties(
content_type='application/msgpack'
)
)
# 对于二进制数据 - 直接字节
channel.basic_publish(
body=image_data,
properties=pika.BasicProperties(
content_type='application/octet-stream'
)
)
您是一位精通 RabbitMQ 的精英工程师,在以下方面拥有深厚的专业知识:
您构建的 RabbitMQ 系统具有以下特点:
风险等级:中等
您将设计适当的交换器模式:
您将确保消息可靠性:
您将设计高可用的 RabbitMQ 系统:
您将保护 RabbitMQ 部署:
您将优化 RabbitMQ 性能:
您将实施全面的监控:
# ✅ 可靠:带有错误处理的手动确认
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# 声明持久队列
channel.queue_declare(queue='tasks', durable=True)
# 设置预取计数以限制未确认消息
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
try:
print(f"正在处理: {body}")
# 处理任务(模拟)
process_task(body)
# 仅在成功时确认
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"错误: {e}")
# 在瞬时错误时重新入队,或发送到死信交换器
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False # 发送到死信交换器而非重新入队
)
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=False # 关键:手动确认
)
channel.start_consuming()
关键点:
- `durable=True` 确保队列在代理重启后仍然存在
- `auto_ack=False` 防止消费者崩溃时消息丢失
- `prefetch_count=1` 确保公平分配
- `basic_nack(requeue=False)` 在失败时发送到死信交换器

# ✅ 可靠:确保消息被代理确认
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# 启用发布者确认
channel.confirm_delivery()
# 声明持久交换器和队列
channel.exchange_declare(
exchange='orders',
exchange_type='topic',
durable=True
)
channel.queue_declare(queue='order_processing', durable=True)
channel.queue_bind(
exchange='orders',
queue='order_processing',
routing_key='order.created'
)
try:
# 使用持久性发布
channel.basic_publish(
exchange='orders',
routing_key='order.created',
body='{"order_id": 12345}',
properties=pika.BasicProperties(
delivery_mode=2, # 持久消息
content_type='application/json',
message_id='msg-12345'
),
mandatory=True # 如果无法路由则返回消息
)
print("消息已被代理确认")
except pika.exceptions.UnroutableError:
print("消息无法路由")
except pika.exceptions.NackError:
print("消息被代理拒绝")
# ✅ 可靠:使用死信交换器处理失败消息
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# 声明死信交换器
channel.exchange_declare(
exchange='dlx',
exchange_type='fanout',
durable=True
)
# 声明死信交换器队列
channel.queue_declare(queue='failed_messages', durable=True)
channel.queue_bind(exchange='dlx', queue='failed_messages')
# 声明带有死信交换器配置的主队列
channel.queue_declare(
queue='tasks',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx',
'x-message-ttl': 60000, # 60 秒
'x-max-length': 10000, # 最大队列长度
'x-max-retries': 3 # 自定义重试次数
}
)
# 拒绝消息以发送到死信交换器的消费者
def callback(ch, method, properties, body):
retries = properties.headers.get('x-death', [])
if len(retries) >= 3:
print(f"超过最大重试次数: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
return
try:
process_message(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"处理失败,发送到死信交换器: {e}")
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False # 发送到死信交换器
)
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=False
)
死信交换器配置选项:
x-dead-letter-exchange:用于被拒绝/过期消息的目标交换器x-dead-letter-routing-key:路由键覆盖x-message-ttl:消息过期时间x-max-length:队列长度限制# ✅ 可扩展:用于复杂场景的基于主题的路由
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# 声明主题交换器
channel.exchange_declare(
exchange='logs',
exchange_type='topic',
durable=True
)
# 使用不同模式绑定队列
# 队列 1:所有错误日志
channel.queue_declare(queue='error_logs', durable=True)
channel.queue_bind(
exchange='logs',
queue='error_logs',
routing_key='*.error' # 匹配 app.error、db.error 等
)
# 队列 2:所有数据库日志
channel.queue_declare(queue='db_logs', durable=True)
channel.queue_bind(
exchange='logs',
queue='db_logs',
routing_key='db.*' # 匹配 db.info、db.error、db.debug
)
# 队列 3:来自任何服务的关键日志
channel.queue_declare(queue='critical_logs', durable=True)
channel.queue_bind(
exchange='logs',
queue='critical_logs',
routing_key='*.critical'
)
# 使用不同的路由键发布
channel.basic_publish(
exchange='logs',
routing_key='app.error',
body='应用程序错误发生',
properties=pika.BasicProperties(delivery_mode=2)
)
channel.basic_publish(
exchange='logs',
routing_key='db.critical',
body='数据库连接丢失',
properties=pika.BasicProperties(delivery_mode=2)
)
路由键模式:
- `*` 精确匹配一个单词
- `#` 匹配零个或多个单词
- `user.*.created` 匹配 user.account.created
- `user.#` 匹配 user.created、user.account.updated

# ✅ 高可用:带有复制的仲裁队列
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='rabbitmq-node-1')
)
channel = connection.channel()
# 声明仲裁队列(跨集群复制)
channel.queue_declare(
queue='ha_tasks',
durable=True,
arguments={
'x-queue-type': 'quorum', # 使用仲裁队列
'x-max-in-memory-length': 0, # 所有消息在磁盘上
'x-delivery-limit': 5 # 最大投递尝试次数
}
)
# 仲裁队列自动处理:
# - 跨集群节点的复制
# - 节点故障时的领导者选举
# - 一致的消息排序
# - 毒药消息检测
# 发布者
channel.basic_publish(
exchange='',
routing_key='ha_tasks',
body='关键任务数据',
properties=pika.BasicProperties(
delivery_mode=2 # 持久化
)
)
仲裁队列优势:
权衡:
# ✅ 高效:适当的连接和通道池化
import pika
import threading
from queue import Queue
class RabbitMQPool:
    """Fixed-size pool of blocking connections with per-publish channels."""

    def __init__(self, host, pool_size=10):
        self.host = host
        self.pool_size = pool_size
        self.connections = Queue(maxsize=pool_size)
        self._lock = threading.Lock()
        # Keep the parameters so dead connections can be replaced later.
        self._params = pika.ConnectionParameters(
            host=host,
            heartbeat=600,
            blocked_connection_timeout=300,
            connection_attempts=3,
            retry_delay=2,
        )
        # Open the whole pool eagerly.
        for _ in range(pool_size):
            self.connections.put(pika.BlockingConnection(self._params))

    def get_channel(self):
        """Borrow a connection and open a fresh channel on it."""
        conn = self.connections.get()
        channel = conn.channel()
        return conn, channel

    def return_connection(self, conn):
        """Return a connection to the pool, replacing it if it has died.

        BUGFIX: the original put the connection back unconditionally, so a
        closed connection would poison the pool for every later borrower.
        """
        if conn.is_open:
            self.connections.put(conn)
        else:
            self.connections.put(pika.BlockingConnection(self._params))

    def publish(self, exchange, routing_key, body):
        """Publish one persistent message with automatic channel cleanup."""
        conn, channel = self.get_channel()
        try:
            channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=body,
                properties=pika.BasicProperties(delivery_mode=2),
            )
        finally:
            channel.close()
            self.return_connection(conn)
# 用法
pool = RabbitMQPool('localhost', pool_size=5)
pool.publish('orders', 'order.created', '{"order_id": 123}')
最佳实践:
# /etc/rabbitmq/rabbitmq.conf
# ✅ 生产环境:安全且优化的配置
## 网络和 TLS
listeners.ssl.default = 5671
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
## 内存和磁盘阈值
vm_memory_high_watermark.relative = 0.5
disk_free_limit.absolute = 10GB
## 集群
cluster_partition_handling = autoheal
cluster_name = production-cluster
## 性能
channel_max = 2048
heartbeat = 60
frame_max = 131072
## 管理插件(在生产环境中禁用或确保安全)
management.tcp.port = 15672
management.ssl.port = 15671
management.ssl.cacertfile = /path/to/ca.pem
management.ssl.certfile = /path/to/cert.pem
management.ssl.keyfile = /path/to/key.pem
## 日志记录
log.file.level = info
log.console = false
log.file = /var/log/rabbitmq/rabbit.log
## 资源限制
total_memory_available_override_value = 8GB
关键设置:
vm_memory_high_watermark:防止内存溢出(推荐 50%)disk_free_limit:防止磁盘满(推荐 10GB+)cluster_partition_handling:autoheal 或 pause_minority1. 禁用默认的 Guest 用户
# 删除默认的 guest 用户
rabbitmqctl delete_user guest
# 创建管理员用户
rabbitmqctl add_user admin SecureP@ssw0rd
rabbitmqctl set_user_tags admin administrator
# 创建具有有限权限的应用程序用户
rabbitmqctl add_user app_user AppP@ssw0rd
rabbitmqctl set_permissions -p / app_user ".*" ".*" ".*"
2. 用于隔离的虚拟主机
# 为环境创建独立的虚拟主机
rabbitmqctl add_vhost production
rabbitmqctl add_vhost staging
# 为每个虚拟主机设置权限
rabbitmqctl set_permissions -p production app_user "^app-.*" "^app-.*" "^app-.*"
3. 主题权限
# 限制发布到特定的交换器
rabbitmqctl set_topic_permissions -p production app_user amq.topic "^orders\..*" "^orders\..*"
# ✅ 安全:启用 TLS 的连接
import pika
import ssl
ssl_context = ssl.create_default_context(
cafile="/path/to/ca_certificate.pem"
)
ssl_context.check_hostname = True
ssl_context.verify_mode = ssl.CERT_REQUIRED
credentials = pika.PlainCredentials('app_user', 'SecurePassword')
parameters = pika.ConnectionParameters(
host='rabbitmq.example.com',
port=5671,
virtual_host='production',
credentials=credentials,
ssl_options=pika.SSLOptions(ssl_context)
)
connection = pika.BlockingConnection(parameters)
| OWASP ID | 类别 | RabbitMQ 缓解措施 |
|---|---|---|
| A01:2025 | 访问控制失效 | 虚拟主机、用户权限 |
| A02:2025 | 安全配置错误 | 禁用 guest、启用 TLS、保护管理界面 |
| A03:2025 | 供应链攻击 | 验证 RabbitMQ 包、插件来源 |
| A04:2025 | 不安全设计 | 适当的交换器模式、消息验证 |
| A05:2025 | 身份识别与认证失效 | 强密码、基于证书的身份验证 |
| A06:2025 | 易受攻击的组件 | 保持 RabbitMQ/Erlang 更新 |
| A07:2025 | 加密机制失效 | 所有连接使用 TLS、加密敏感数据 |
| A08:2025 | 注入攻击 | 验证路由键、清理消息内容 |
| A09:2025 | 日志记录与监控失效 | 启用审计日志记录、监控访问 |
| A10:2025 | 异常处理不当 | 失败消息使用死信交换器、适当的错误日志记录 |
# ✅ 安全:使用密钥管理(Kubernetes 示例)
apiVersion: v1
kind: Secret
metadata:
name: rabbitmq-credentials
type: Opaque
stringData:
username: app_user
password: SecureP@ssw0rd
erlang_cookie: SecureErlangCookie
---
apiVersion: apps/v1
kind: Deployment
spec:
template:
spec:
containers:
- name: app
env:
- name: RABBITMQ_USER
valueFrom:
secretKeyRef:
name: rabbitmq-credentials
key: username
- name: RABBITMQ_PASSWORD
valueFrom:
secretKeyRef:
name: rabbitmq-credentials
key: password
切勿:
# ❌ 不要:自动确认在崩溃时导致消息丢失
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=True # 危险!
)
# ✅ 做:手动确认
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=False
)
# 记得在回调中调用 ch.basic_ack()
# ❌ 不要:队列在重启时消失
channel.queue_declare(queue='tasks')
# ✅ 做:持久队列在重启后仍然存在
channel.queue_declare(queue='tasks', durable=True)
channel.exchange_declare(exchange='
You are an elite RabbitMQ engineer with deep expertise in:
# tests/test_message_queue.py
import pytest
import pika
import json
import time
from unittest.mock import MagicMock, patch
class TestOrderProcessor:
    """Test order message processing with RabbitMQ.

    Unit tests drive OrderConsumer through a mocked channel; integration
    tests are marked ``integration`` (matching the documented
    ``pytest -m integration`` invocation) and are skipped automatically
    when no broker is reachable on localhost.
    """

    @pytest.fixture
    def mock_channel(self):
        """Create mock channel for unit tests."""
        channel = MagicMock()
        channel.basic_qos = MagicMock()
        channel.basic_consume = MagicMock()
        channel.basic_ack = MagicMock()
        channel.basic_nack = MagicMock()
        return channel

    @pytest.fixture
    def rabbitmq_connection(self):
        """Create real connection for integration tests.

        Skips the test (instead of erroring) when RabbitMQ is unavailable.
        """
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host='localhost',
                    connection_attempts=3,
                    retry_delay=1,
                )
            )
        except pika.exceptions.AMQPConnectionError:
            pytest.skip("RabbitMQ not available")
        # yield outside the try so exceptions raised by the test body are
        # not misreported as a connection failure / skip.
        yield connection
        connection.close()

    def test_message_acknowledged_on_success(self, mock_channel):
        """Successful processing must ack exactly once and never nack."""
        from app.consumers import OrderConsumer

        consumer = OrderConsumer(mock_channel)
        message = json.dumps({"order_id": 123, "status": "pending"})
        # Fake delivery metadata carrying the delivery tag.
        method = MagicMock()
        method.delivery_tag = 1

        consumer.process_message(mock_channel, method, None, message.encode())

        mock_channel.basic_ack.assert_called_once_with(delivery_tag=1)
        mock_channel.basic_nack.assert_not_called()

    def test_message_rejected_to_dlx_on_failure(self, mock_channel):
        """Unparseable messages must be nacked without requeue (routed to DLX)."""
        from app.consumers import OrderConsumer

        consumer = OrderConsumer(mock_channel)
        invalid_message = b"invalid json"
        method = MagicMock()
        method.delivery_tag = 2

        consumer.process_message(mock_channel, method, None, invalid_message)

        # requeue=False sends the message to the dead-letter exchange.
        mock_channel.basic_nack.assert_called_once_with(
            delivery_tag=2,
            requeue=False,
        )

    def test_prefetch_count_configured(self, mock_channel):
        """setup() must apply the configured prefetch count via basic_qos."""
        from app.consumers import OrderConsumer

        consumer = OrderConsumer(mock_channel, prefetch_count=10)
        consumer.setup()
        mock_channel.basic_qos.assert_called_once_with(prefetch_count=10)

    @pytest.mark.integration
    def test_publisher_confirms_enabled(self, rabbitmq_connection):
        """Integration test: publishing with confirms enabled must not raise."""
        channel = rabbitmq_connection.channel()
        channel.confirm_delivery()
        channel.queue_declare(queue='test_confirms', durable=True)
        try:
            channel.basic_publish(
                exchange='',
                routing_key='test_confirms',
                body=b'test message',
                properties=pika.BasicProperties(delivery_mode=2),
            )
        finally:
            # Clean up the test queue even when the publish/confirm fails.
            channel.queue_delete(queue='test_confirms')

    @pytest.mark.integration
    def test_dlx_receives_rejected_messages(self, rabbitmq_connection):
        """Integration test: a nacked (requeue=False) message must reach the DLX queue."""
        channel = rabbitmq_connection.channel()
        # Dead-letter exchange and the queue bound to it.
        channel.exchange_declare(exchange='test_dlx', exchange_type='fanout')
        channel.queue_declare(queue='test_dead_letters')
        channel.queue_bind(exchange='test_dlx', queue='test_dead_letters')
        # Main queue wired to the DLX.
        channel.queue_declare(
            queue='test_main',
            arguments={'x-dead-letter-exchange': 'test_dlx'},
        )
        try:
            channel.basic_publish(
                exchange='',
                routing_key='test_main',
                body=b'will be rejected',
            )
            method, props, body = channel.basic_get('test_main')
            # Fail loudly if the message never arrived instead of silently
            # skipping the nack and asserting against an empty DLX queue.
            assert method is not None, "message never reached test_main"
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            # Give the broker a moment to dead-letter the message.
            time.sleep(0.1)
            method, props, body = channel.basic_get('test_dead_letters')
            assert body == b'will be rejected'
        finally:
            channel.queue_delete(queue='test_main')
            channel.queue_delete(queue='test_dead_letters')
            channel.exchange_delete(exchange='test_dlx')
# app/consumers.py
import json
import logging

logger = logging.getLogger(__name__)


class OrderConsumer:
    """Consumer for order messages with explicit acknowledgment handling.

    Successful deliveries are acked; failures are nacked without requeue
    so the broker dead-letters them.
    """

    def __init__(self, channel, prefetch_count=1):
        # Channel is injected so unit tests can substitute a test double.
        self.channel = channel
        self.prefetch_count = prefetch_count

    def setup(self):
        """Configure the channel: cap in-flight messages via QoS prefetch."""
        self.channel.basic_qos(prefetch_count=self.prefetch_count)

    def process_message(self, ch, method, properties, body):
        """Handle a single delivery, acking or nacking based on the outcome."""
        tag = method.delivery_tag
        try:
            order = json.loads(body)
            self._handle_order(order)
            ch.basic_ack(delivery_tag=tag)
            logger.info(f"Processed order: {order.get('order_id')}")
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON: {e}")
            # Malformed payloads can never succeed; hand them to the DLX.
            ch.basic_nack(delivery_tag=tag, requeue=False)
        except Exception as e:
            logger.error(f"Processing failed: {e}")
            ch.basic_nack(delivery_tag=tag, requeue=False)

    def _handle_order(self, order):
        """Business logic for order processing (stub)."""
        pass
After tests pass, refactor for:
# Run unit tests
pytest tests/test_message_queue.py -v
# Run with coverage
pytest tests/ --cov=app --cov-report=term-missing
# Run integration tests (requires RabbitMQ)
pytest tests/ -m integration -v
# Verify message flow end-to-end
python -m pytest tests/e2e/ -v
# BAD: Unlimited prefetch - consumer gets overwhelmed
channel.basic_consume(queue='tasks', on_message_callback=callback)
# No prefetch set means unlimited - memory issues!
# GOOD: Appropriate prefetch based on processing time
# For fast processing (< 100ms): higher prefetch
channel.basic_qos(prefetch_count=50)
# For slow processing (> 1s): lower prefetch
channel.basic_qos(prefetch_count=1)
# For balanced workloads
channel.basic_qos(prefetch_count=10)
Tuning Guidelines :
# BAD: Publishing one message at a time with confirms
for order in orders:
channel.basic_publish(
exchange='orders',
routing_key='order.created',
body=json.dumps(order),
properties=pika.BasicProperties(delivery_mode=2)
)
# Waiting for confirm on each message - slow!
# GOOD: Batch publishing with bulk confirms
channel.confirm_delivery()
# Publish batch without waiting
for order in orders:
channel.basic_publish(
exchange='orders',
routing_key='order.created',
body=json.dumps(order),
properties=pika.BasicProperties(delivery_mode=2)
)
# Wait for all confirms at once
try:
channel.get_waiting_message_count() # Forces confirm flush
except pika.exceptions.NackError as e:
# Handle rejected messages
logger.error(f"Messages rejected: {e.messages}")
# BAD: Creating new connection for each operation
def send_message(message):
connection = pika.BlockingConnection(params) # Expensive!
channel = connection.channel()
channel.basic_publish(...)
connection.close()
# GOOD: Reuse connections with pooling
from queue import Queue
import threading
class ConnectionPool:
    """Fixed-size pool of blocking RabbitMQ connections for publishers.

    Connections are opened eagerly at construction; a connection found
    dead on return is replaced so the pool keeps its full size.
    """

    def __init__(self, params, size=10):
        self.pool = Queue(maxsize=size)
        self.params = params
        # Open all connections up front so publish() never pays setup cost.
        for _ in range(size):
            conn = pika.BlockingConnection(params)
            self.pool.put(conn)

    def get_connection(self):
        """Borrow a connection; blocks until one is free."""
        return self.pool.get()

    def return_connection(self, conn):
        """Give a connection back, replacing it if it has died."""
        if conn.is_open:
            self.pool.put(conn)
        else:
            # Replace dead connection so the pool keeps its full size.
            self.pool.put(pika.BlockingConnection(self.params))

    def publish(self, exchange, routing_key, body):
        """Publish one persistent message using a pooled connection."""
        conn = self.get_connection()
        try:
            channel = conn.channel()
            try:
                channel.basic_publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    body=body,
                    properties=pika.BasicProperties(delivery_mode=2),
                )
            finally:
                # BUGFIX: close the per-publish channel; the original left
                # it open, leaking one channel per publish on the pooled
                # connection until the broker's channel_max was hit.
                channel.close()
        finally:
            self.return_connection(conn)
# BAD: Classic queue with large backlog - memory pressure
channel.queue_declare(queue='high_volume', durable=True)
# All messages kept in RAM - causes memory alarms!
# GOOD: Lazy queue moves messages to disk
channel.queue_declare(
queue='high_volume',
durable=True,
arguments={
'x-queue-mode': 'lazy' # Messages go to disk immediately
}
)
# BETTER: Quorum queue with memory limit
channel.queue_declare(
queue='high_volume',
durable=True,
arguments={
'x-queue-type': 'quorum',
'x-max-in-memory-length': 1000 # Only 1000 msgs in RAM
}
)
When to Use Lazy Queues :
# BAD: Synchronous confirms - blocking on each message
channel.confirm_delivery()
for msg in messages:
try:
channel.basic_publish(...) # Blocks until confirmed
except Exception:
handle_failure()
# GOOD: Asynchronous confirms with callbacks
import pika
def on_confirm(frame):
if isinstance(frame.method, pika.spec.Basic.Ack):
logger.debug(f"Message {frame.method.delivery_tag} confirmed")
else:
logger.error(f"Message {frame.method.delivery_tag} rejected")
# Use SelectConnection for async
connection = pika.SelectConnection(
params,
on_open_callback=on_connected
)
def on_connected(connection):
channel = connection.channel(on_open_callback=on_channel_open)
def on_channel_open(channel):
channel.confirm_delivery(on_confirm)
# Now publishes are non-blocking
channel.basic_publish(...)
# BAD: Using JSON for large binary data
import json
channel.basic_publish(
body=json.dumps({"image": base64.b64encode(image_data).decode()})
)
# GOOD: Use appropriate serialization
import msgpack
# For structured data - MessagePack (faster, smaller)
channel.basic_publish(
body=msgpack.packb({"user_id": 123, "action": "click"}),
properties=pika.BasicProperties(
content_type='application/msgpack'
)
)
# For binary data - direct bytes
channel.basic_publish(
body=image_data,
properties=pika.BasicProperties(
content_type='application/octet-stream'
)
)
You are an elite RabbitMQ engineer with deep expertise in:
You build RabbitMQ systems that are:
Risk Level : MEDIUM
You will design appropriate exchange patterns:
You will ensure message reliability:
You will design HA RabbitMQ systems:
You will secure RabbitMQ deployments:
You will optimize RabbitMQ performance:
You will implement comprehensive monitoring:
# ✅ RELIABLE: Manual acknowledgments with error handling
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# Declare durable queue
channel.queue_declare(queue='tasks', durable=True)
# Set prefetch count to limit unacked messages
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
try:
print(f"Processing: {body}")
# Process task (simulated)
process_task(body)
# Acknowledge only on success
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Error: {e}")
# Requeue on transient errors, or send to DLX
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False # Send to DLX instead of requeue
)
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=False # CRITICAL: Manual ack
)
channel.start_consuming()
Key Points :
- `durable=True` ensures the queue survives broker restart
- `auto_ack=False` prevents message loss on consumer crash
- `prefetch_count=1` ensures fair distribution
- `basic_nack(requeue=False)` sends to DLX on failure

# ✅ RELIABLE: Ensure messages are confirmed by broker
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# Enable publisher confirms
channel.confirm_delivery()
# Declare durable exchange and queue
channel.exchange_declare(
exchange='orders',
exchange_type='topic',
durable=True
)
channel.queue_declare(queue='order_processing', durable=True)
channel.queue_bind(
exchange='orders',
queue='order_processing',
routing_key='order.created'
)
try:
# Publish with persistence
channel.basic_publish(
exchange='orders',
routing_key='order.created',
body='{"order_id": 12345}',
properties=pika.BasicProperties(
delivery_mode=2, # Persistent message
content_type='application/json',
message_id='msg-12345'
),
mandatory=True # Return message if unroutable
)
print("Message confirmed by broker")
except pika.exceptions.UnroutableError:
print("Message could not be routed")
except pika.exceptions.NackError:
print("Message was rejected by broker")
# ✅ RELIABLE: Handle failed messages with DLX
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# Declare DLX
channel.exchange_declare(
exchange='dlx',
exchange_type='fanout',
durable=True
)
# Declare DLX queue
channel.queue_declare(queue='failed_messages', durable=True)
channel.queue_bind(exchange='dlx', queue='failed_messages')
# Declare main queue with DLX configuration
channel.queue_declare(
queue='tasks',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx',
'x-message-ttl': 60000, # 60 seconds
'x-max-length': 10000, # Max queue length
'x-max-retries': 3 # Custom retry count
}
)
# Consumer that rejects messages to send to DLX
def callback(ch, method, properties, body):
retries = properties.headers.get('x-death', [])
if len(retries) >= 3:
print(f"Max retries exceeded: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
return
try:
process_message(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Processing failed, sending to DLX: {e}")
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False # Send to DLX
)
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=False
)
DLX Configuration Options :
x-dead-letter-exchange: Target exchange for rejected/expired messagesx-dead-letter-routing-key: Routing key overridex-message-ttl: Message expiration timex-max-length: Queue length limit# ✅ SCALABLE: Topic-based routing for complex scenarios
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# Declare topic exchange
channel.exchange_declare(
exchange='logs',
exchange_type='topic',
durable=True
)
# Bind queues with different patterns
# Queue 1: All error logs
channel.queue_declare(queue='error_logs', durable=True)
channel.queue_bind(
exchange='logs',
queue='error_logs',
routing_key='*.error' # Matches app.error, db.error, etc.
)
# Queue 2: All database logs
channel.queue_declare(queue='db_logs', durable=True)
channel.queue_bind(
exchange='logs',
queue='db_logs',
routing_key='db.*' # Matches db.info, db.error, db.debug
)
# Queue 3: Critical logs from any service
channel.queue_declare(queue='critical_logs', durable=True)
channel.queue_bind(
exchange='logs',
queue='critical_logs',
routing_key='*.critical'
)
# Publish with different routing keys
channel.basic_publish(
exchange='logs',
routing_key='app.error',
body='Application error occurred',
properties=pika.BasicProperties(delivery_mode=2)
)
channel.basic_publish(
exchange='logs',
routing_key='db.critical',
body='Database connection lost',
properties=pika.BasicProperties(delivery_mode=2)
)
Routing Key Patterns :
- `*` matches exactly one word
- `#` matches zero or more words
- `user.*.created` matches user.account.created
- `user.#` matches user.created, user.account.updated

# ✅ HA: Quorum queues with replication
# ✅ HA: quorum-queue (Raft-replicated) declaration and publish demo.
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='rabbitmq-node-1')
)
channel = connection.channel()

# Declare quorum queue (replicated across cluster).
# Queue x-arguments are fixed at declaration time and cannot be changed later.
channel.queue_declare(
    queue='ha_tasks',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',  # Use quorum queue
        'x-max-in-memory-length': 0,  # All messages on disk
        'x-delivery-limit': 5  # Max delivery attempts (poison-message guard)
    }
)

# Quorum queues automatically handle:
# - Replication across cluster nodes
# - Leader election on node failure
# - Consistent message ordering
# - Poison message detection

# Publisher (default exchange '' routes directly by queue name)
channel.basic_publish(
    exchange='',
    routing_key='ha_tasks',
    body='Critical task data',
    properties=pika.BasicProperties(
        delivery_mode=2  # Persistent
    )
)
Quorum Queue Benefits :
Trade-offs :
# ✅ EFFICIENT: Proper connection and channel pooling
import pika
import threading
from queue import Queue
class RabbitMQPool:
    """Fixed-size pool of blocking pika connections for publishing.

    Connections are pre-opened and held in a `Queue`; `get_channel` blocks
    until one is free. Channels are cheap and short-lived: one is opened per
    publish and closed afterwards, while the underlying connection is reused.
    """

    def __init__(self, host, pool_size=10):
        """Eagerly open `pool_size` connections to `host` so connection
        failures surface at startup rather than on first publish."""
        self.host = host
        self.pool_size = pool_size
        self.connections = Queue(maxsize=pool_size)
        self._lock = threading.Lock()  # NOTE(review): currently unused; kept for API compatibility

        # Initialize connection pool
        for _ in range(pool_size):
            conn = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=host,
                    heartbeat=600,  # keep idle pooled connections alive
                    blocked_connection_timeout=300,  # fail fast if broker blocks publishers
                    connection_attempts=3,
                    retry_delay=2
                )
            )
            self.connections.put(conn)

    def get_channel(self):
        """Borrow a connection (blocking until one is free) and open a fresh
        channel on it.

        Returns (connection, channel); the caller must hand the connection
        back via `return_connection`.
        """
        conn = self.connections.get()
        channel = conn.channel()
        return conn, channel

    def return_connection(self, conn):
        """Return a borrowed connection to the pool."""
        self.connections.put(conn)

    def publish(self, exchange, routing_key, body):
        """Publish one persistent message, managing channel/connection lifecycle."""
        conn, channel = self.get_channel()
        try:
            channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=body,
                properties=pika.BasicProperties(delivery_mode=2)
            )
        finally:
            # BUGFIX: previously a raising channel.close() skipped
            # return_connection(), permanently shrinking the pool. Close
            # best-effort, then always return the connection.
            try:
                channel.close()
            except Exception:
                pass
            self.return_connection(conn)
# Usage
pool = RabbitMQPool('localhost', pool_size=5)
pool.publish('orders', 'order.created', '{"order_id": 123}')
Best Practices :
# /etc/rabbitmq/rabbitmq.conf
# ✅ PRODUCTION: Secure and optimized configuration
## Network and TLS
listeners.ssl.default = 5671
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
## Memory and Disk Thresholds
vm_memory_high_watermark.relative = 0.5
disk_free_limit.absolute = 10GB
## Clustering
cluster_partition_handling = autoheal
cluster_name = production-cluster
## Performance
channel_max = 2048
heartbeat = 60
frame_max = 131072
## Management Plugin (disable in production or secure)
management.tcp.port = 15672
management.ssl.port = 15671
management.ssl.cacertfile = /path/to/ca.pem
management.ssl.certfile = /path/to/cert.pem
management.ssl.keyfile = /path/to/key.pem
## Logging
log.file.level = info
log.console = false
log.file = /var/log/rabbitmq/rabbit.log
## Resource Limits
total_memory_available_override_value = 8GB
Critical Settings :
- vm_memory_high_watermark: prevent OOM (50% recommended)
- disk_free_limit: prevent disk full (10GB+ recommended)
- cluster_partition_handling: autoheal or pause_minority

1. Disable Default Guest User
# Remove default guest user
rabbitmqctl delete_user guest
# Create admin user
rabbitmqctl add_user admin SecureP@ssw0rd
rabbitmqctl set_user_tags admin administrator
# Create application user with limited permissions
rabbitmqctl add_user app_user AppP@ssw0rd
rabbitmqctl set_permissions -p / app_user ".*" ".*" ".*"
2. Virtual Hosts for Isolation
# Create separate vhosts for environments
rabbitmqctl add_vhost production
rabbitmqctl add_vhost staging
# Set permissions per vhost
rabbitmqctl set_permissions -p production app_user "^app-.*" "^app-.*" "^app-.*"
3. Topic Permissions
# Restrict publishing to specific exchanges
rabbitmqctl set_topic_permissions -p production app_user amq.topic "^orders\..*" "^orders\..*"
# ✅ SECURE: TLS-enabled connection
import pika
import ssl

# Default client-side context trusting only the given CA bundle.
ssl_context = ssl.create_default_context(
    cafile="/path/to/ca_certificate.pem"
)
ssl_context.check_hostname = True  # reject certs whose CN/SAN doesn't match the host
ssl_context.verify_mode = ssl.CERT_REQUIRED  # server MUST present a valid certificate

credentials = pika.PlainCredentials('app_user', 'SecurePassword')
parameters = pika.ConnectionParameters(
    host='rabbitmq.example.com',
    port=5671,  # AMQPS (TLS) port, not plain 5672
    virtual_host='production',
    credentials=credentials,
    ssl_options=pika.SSLOptions(ssl_context)
)
connection = pika.BlockingConnection(parameters)
| OWASP ID | Category | RabbitMQ Mitigation |
|---|---|---|
| A01:2025 | Broken Access Control | Virtual hosts, user permissions |
| A02:2025 | Security Misconfiguration | Disable guest, enable TLS, secure management |
| A03:2025 | Supply Chain | Verify RabbitMQ packages, plugin sources |
| A04:2025 | Insecure Design | Proper exchange patterns, message validation |
| A05:2025 | Identification & Auth | Strong passwords, certificate-based auth |
| A06:2025 | Vulnerable Components | Keep RabbitMQ/Erlang updated |
| A07:2025 | Cryptographic Failures | TLS for all connections, encrypt sensitive data |
| A08:2025 | Injection | Validate routing keys, sanitize message content |
| A09:2025 | Logging Failures |
# ✅ SECURE: Use secrets management (Kubernetes example)
apiVersion: v1
kind: Secret
metadata:
name: rabbitmq-credentials
type: Opaque
stringData:
username: app_user
password: SecureP@ssw0rd
erlang_cookie: SecureErlangCookie
---
apiVersion: apps/v1
kind: Deployment
spec:
template:
spec:
containers:
- name: app
env:
- name: RABBITMQ_USER
valueFrom:
secretKeyRef:
name: rabbitmq-credentials
key: username
- name: RABBITMQ_PASSWORD
valueFrom:
secretKeyRef:
name: rabbitmq-credentials
key: password
Never :
# ❌ DON'T: Auto-ack causes message loss on crash
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=True # DANGEROUS!
)
# ✅ DO: Manual acknowledgments
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=False
)
# Remember to call ch.basic_ack() in callback
# ❌ DON'T: Queues disappear on restart
channel.queue_declare(queue='tasks')
# ✅ DO: Durable queues survive restarts
channel.queue_declare(queue='tasks', durable=True)
channel.exchange_declare(exchange='orders', durable=True)
# ❌ DON'T: Consumer gets all messages at once
# (No prefetch limit set)
# ✅ DO: Limit unacknowledged messages
channel.basic_qos(prefetch_count=10)
# ❌ DON'T: Failed messages get requeued infinitely
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
# ✅ DO: Configure DLX for failed messages
channel.queue_declare(
queue='tasks',
arguments={'x-dead-letter-exchange': 'dlx'}
)
# ❌ DON'T: Classic mirrored queues (deprecated)
channel.queue_declare(
queue='tasks',
arguments={'x-ha-policy': 'all'}
)
# ✅ DO: Use quorum queues for HA
channel.queue_declare(
queue='tasks',
arguments={'x-queue-type': 'quorum'}
)
# ❌ DON'T: No connection recovery
connection = pika.BlockingConnection(params)
# ✅ DO: Implement retry logic
def create_connection():
    """Open a pika BlockingConnection with exponential-backoff retry.

    Makes up to 5 attempts, sleeping 2s/4s/8s/16s between attempts
    (previously it also slept ~32s after the final failure, delaying the
    error for no benefit). Raises Exception("Failed to connect") chained to
    the last underlying error if every attempt fails.

    NOTE(review): relies on a module-level `params`
    (pika.ConnectionParameters) defined elsewhere in the file.
    """
    last_err = None
    for attempt in range(1, 6):
        try:
            return pika.BlockingConnection(params)
        except Exception as e:  # broad catch kept to match the surrounding examples
            last_err = e
            if attempt < 5:
                # Back off before the next attempt only; no sleep after the last one.
                time.sleep(2 ** attempt)
    raise Exception("Failed to connect") from last_err
# ❌ DON'T: Ignore queue buildup
# ✅ DO: Monitor and alert on queue depth
# Prometheus query:
# rabbitmq_queue_messages{queue="tasks"} > 10000
# Set max queue length:
channel.queue_declare(
queue='tasks',
arguments={'x-max-length': 50000}
)
- auto_ack=True in production

# tests/test_publisher.py
import pytest
from unittest.mock import MagicMock, patch
import pika
class TestMessagePublisher:
    """Unit tests for message publishing (no live broker; pika is patched)."""

    @pytest.fixture
    def mock_connection(self):
        """Patch pika.BlockingConnection; yield (patched class, connection, channel)."""
        with patch('pika.BlockingConnection') as mock:
            connection = MagicMock()
            channel = MagicMock()
            # connection.channel() returns our mock channel so the publisher's
            # calls can be inspected afterwards.
            connection.channel.return_value = channel
            mock.return_value = connection
            yield mock, connection, channel

    def test_publish_with_confirms(self, mock_connection):
        """Publisher must enable publisher confirms and publish exactly once."""
        _, connection, channel = mock_connection
        from app.publisher import OrderPublisher
        publisher = OrderPublisher()
        publisher.publish({"order_id": 123})
        channel.confirm_delivery.assert_called_once()
        channel.basic_publish.assert_called_once()

    def test_publish_sets_persistence(self, mock_connection):
        """Published messages must be marked persistent (delivery_mode=2)."""
        _, connection, channel = mock_connection
        from app.publisher import OrderPublisher
        publisher = OrderPublisher()
        publisher.publish({"order_id": 123})
        call_args = channel.basic_publish.call_args
        # .kwargs needs Python 3.8+; the [1] fallback covers older mock versions.
        props = call_args.kwargs.get('properties') or call_args[1].get('properties')
        assert props.delivery_mode == 2  # Persistent

    def test_connection_error_handling(self, mock_connection):
        """AMQP connection failures must surface to callers as ConnectionError."""
        mock_cls, connection, channel = mock_connection
        # Make the (patched) connection constructor fail.
        mock_cls.side_effect = pika.exceptions.AMQPConnectionError()
        from app.publisher import OrderPublisher
        with pytest.raises(ConnectionError):
            publisher = OrderPublisher()
# tests/integration/test_message_flow.py
import pytest
import pika
import json
import time
@pytest.fixture(scope="module")
def rabbitmq():
    """Module-scoped channel against a local broker; skips the module when
    RabbitMQ is unreachable.

    Declares a durable topic exchange/queue pair bound on 'test.#' for the
    integration tests and deletes both after the module finishes.
    """
    try:
        params = pika.ConnectionParameters(
            host='localhost',
            connection_attempts=3,
            retry_delay=1
        )
        connection = pika.BlockingConnection(params)
        channel = connection.channel()
        # Setup test infrastructure
        channel.exchange_declare(exchange='test_exchange', exchange_type='topic', durable=True)
        channel.queue_declare(queue='test_queue', durable=True)
        channel.queue_bind(exchange='test_exchange', queue='test_queue', routing_key='test.#')
        yield channel
        # Cleanup. NOTE(review): if setup fails after connecting, the
        # connection leaks — a try/finally around the yield would be safer.
        channel.queue_delete(queue='test_queue')
        channel.exchange_delete(exchange='test_exchange')
        connection.close()
    except pika.exceptions.AMQPConnectionError:
        pytest.skip("RabbitMQ not available")
class TestMessageFlow:
    """Integration tests for complete message flows (require a live broker)."""

    def test_publish_and_consume(self, rabbitmq):
        """End-to-end: publish through the topic exchange, pull it back, ack it."""
        channel = rabbitmq
        test_message = {"test_id": 123, "data": "test"}
        # Publish (persistent; routed to test_queue via the 'test.#' binding)
        channel.basic_publish(
            exchange='test_exchange',
            routing_key='test.message',
            body=json.dumps(test_message),
            properties=pika.BasicProperties(delivery_mode=2)
        )
        # Consume via synchronous basic_get (pull, not a push consumer)
        method, props, body = channel.basic_get('test_queue')
        assert method is not None
        received = json.loads(body)
        assert received['test_id'] == 123
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def test_message_persistence(self, rabbitmq):
        """Message should survive a broker restart (manual-only scenario)."""
        # This test requires manual broker restart
        # Mark as slow/manual test
        pytest.skip("Requires manual broker restart")

    def test_consumer_prefetch(self, rabbitmq):
        """Prefetch limits unacked deliveries (caveat: basic_get ignores QoS)."""
        channel = rabbitmq
        channel.basic_qos(prefetch_count=2)
        # Publish 5 messages directly to the queue via the default exchange
        for i in range(5):
            channel.basic_publish(
                exchange='',
                routing_key='test_queue',
                body=f'msg-{i}'.encode()
            )
        # Consumer should only get 2 at a time
        received = []
        for _ in range(2):
            method, _, body = channel.basic_get('test_queue')
            if method:
                received.append(body)
            # Don't ack yet
        # Third get should work since basic_get doesn't respect prefetch
        # But basic_consume would respect it
        assert len(received) == 2
        # Cleanup - ack remaining messages (basic_get returns (None, None, None)
        # when the queue is empty)
        while True:
            method, _, _ = channel.basic_get('test_queue')
            if not method:
                break
            channel.basic_ack(delivery_tag=method.delivery_tag)
# tests/performance/test_throughput.py
import pytest
import pika
import time
import statistics
@pytest.fixture
def perf_channel():
    """Confirm-mode channel on a durable 'perf_test' queue; deleted on teardown."""
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='perf_test', durable=True)
    # Publisher confirms: basic_publish blocks until the broker acks,
    # so the throughput numbers below include confirm round-trips.
    channel.confirm_delivery()
    yield channel
    channel.queue_delete(queue='perf_test')
    connection.close()
class TestThroughput:
    """Performance benchmarks for RabbitMQ operations (require a live broker)."""

    def test_publish_throughput(self, perf_channel):
        """Benchmark: publish 10,000 persistent 1KB messages; assert > 1000 msg/s."""
        message_count = 10000
        message = b'x' * 1024  # 1KB message
        start = time.time()
        for _ in range(message_count):
            perf_channel.basic_publish(
                exchange='',
                routing_key='perf_test',
                body=message,
                properties=pika.BasicProperties(delivery_mode=2)
            )
        elapsed = time.time() - start
        rate = message_count / elapsed
        print(f"\nPublish rate: {rate:.0f} msg/s")
        assert rate > 1000, f"Publish rate {rate} below threshold"

    def test_consume_latency(self, perf_channel):
        """Benchmark: round-trip latency over 100 publish/basic_get pairs."""
        latencies = []
        for _ in range(100):
            # Publish with timestamp as the body so latency = receive - body
            send_time = time.time()
            perf_channel.basic_publish(
                exchange='',
                routing_key='perf_test',
                body=str(send_time).encode()
            )
            # Consume immediately
            method, _, body = perf_channel.basic_get('perf_test')
            receive_time = time.time()
            if method:
                latency = (receive_time - float(body)) * 1000  # ms
                latencies.append(latency)
                perf_channel.basic_ack(delivery_tag=method.delivery_tag)
        # NOTE(review): if every basic_get returned None, `latencies` is empty
        # and statistics.mean raises StatisticsError; quantiles needs >= 2 points.
        avg_latency = statistics.mean(latencies)
        p99_latency = statistics.quantiles(latencies, n=100)[98]
        print(f"\nAvg latency: {avg_latency:.2f}ms, P99: {p99_latency:.2f}ms")
        assert avg_latency < 10, f"Average latency {avg_latency}ms too high"
# conftest.py
import pytest
def pytest_configure(config):
    """Register this project's custom pytest markers on *config*."""
    marker_specs = (
        "integration: integration tests requiring RabbitMQ",
        "slow: slow tests",
        "performance: performance benchmark tests",
    )
    for spec in marker_specs:
        config.addinivalue_line("markers", spec)
# pytest.ini
# [pytest]
# markers =
# integration: integration tests requiring RabbitMQ
# slow: slow running tests
# performance: performance benchmarks
# testpaths = tests
# addopts = -v --tb=short
# Run all tests
pytest tests/ -v
# Run only unit tests (fast, no RabbitMQ needed)
pytest tests/ -v -m "not integration"
# Run integration tests
pytest tests/ -v -m integration
# Run performance benchmarks
pytest tests/performance/ -v -m performance
# Run with coverage
pytest tests/ --cov=app --cov-report=html
# Run specific test file
pytest tests/test_message_queue.py -v
You are a RabbitMQ expert focused on:
Key Principles :
RabbitMQ is the backbone of distributed systems. Design it for reliability, secure it properly, and monitor it continuously.
Weekly Installs
223
Repository
GitHub Stars
29
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust Hub: Fail · Socket: Pass · Snyk: Fail
Installed on
gemini-cli189
opencode189
codex188
github-copilot181
cursor163
amp160
React 组合模式指南:Vercel 组件架构最佳实践,提升代码可维护性
107,800 周安装
| Enable audit logging, monitor access |
| A10:2025 | Exception Handling | DLX for failed messages, proper error logging |