Введение
Очереди сообщений обеспечивают асинхронное взаимодействие между сервисами, разделяя производителей и потребителей. RabbitMQ — надёжный, функционально насыщенный брокер сообщений, поддерживающий несколько шаблонов обмена сообщениями. В этом руководстве рассматривается практическая реализация RabbitMQ для построения масштабируемых и устойчивых систем.
Основные понятия
Модель AMQP
- Producer: отправляет сообщения в exchanges
- Exchange: маршрутизирует сообщения в queues по правилам
- Queue: хранит сообщения до их потребления
- Consumer: получает и обрабатывает сообщения
- Binding: связывает exchanges с queues
Типы Exchange
| Тип | Маршрутизация |
|---|
| Direct | Точное совпадение routing key |
| Fanout | Рассылка во все привязанные queues |
| Topic | Сопоставление по шаблону для routing key |
| Headers | Сопоставление по заголовкам сообщения |
Настройка
Установка через Docker
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # AMQP
- "15672:15672" # Интерфейс управления
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret
volumes:
- rabbitmq_data:/var/lib/rabbitmq
volumes:
rabbitmq_data:
PHP с php-amqplib
composer require php-amqplib/php-amqplib
Node.js с amqplib
npm install amqplib
Базовый обмен сообщениями
PHP Producer
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
// Объявить очередь
$channel->queue_declare('task_queue', false, true, false, false);
// Создать сообщение
$data = json_encode(['task' => 'process_order', 'order_id' => 123]);
$msg = new AMQPMessage($data, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
]);
// Опубликовать
$channel->basic_publish($msg, '', 'task_queue');
echo "Sent: $data\n";
$channel->close();
$connection->close();
PHP Consumer
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
// Справедливое распределение — не выдавать более одного сообщения за раз
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
$data = json_decode($msg->body, true);
echo "Received: " . $msg->body . "\n";
try {
// Обработать сообщение
processTask($data);
// Подтвердить успешную обработку
$msg->ack();
} catch (Exception $e) {
// Отклонить и повторно поставить в очередь при ошибке
$msg->nack(true);
}
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
echo "Waiting for messages...\n";
while ($channel->is_consuming()) {
$channel->wait();
}
Node.js Producer
const amqp = require('amqplib');
async function sendMessage(queue, message) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true,
contentType: 'application/json',
});
console.log('Sent:', message);
await channel.close();
await connection.close();
}
sendMessage('task_queue', { task: 'process_order', order_id: 123 });
Node.js Consumer
const amqp = require('amqplib');
async function startConsumer(queue) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
// Справедливое распределение
channel.prefetch(1);
console.log('Waiting for messages...');
channel.consume(queue, async (msg) => {
const data = JSON.parse(msg.content.toString());
console.log('Received:', data);
try {
await processTask(data);
channel.ack(msg);
} catch (error) {
console.error('Processing failed:', error);
channel.nack(msg, false, true); // Повторно поставить в очередь
}
});
}
startConsumer('task_queue');
Шаблон Publish/Subscribe
Fanout Exchange
// Publisher
$channel->exchange_declare('notifications', 'fanout', false, true, false);
$msg = new AMQPMessage(json_encode(['event' => 'user_created', 'user_id' => 123]));
$channel->basic_publish($msg, 'notifications', '');
// Subscriber 1: уведомления по email
$channel->queue_declare('email_notifications', false, true, false, false);
$channel->queue_bind('email_notifications', 'notifications');
// Subscriber 2: уведомления по SMS
$channel->queue_declare('sms_notifications', false, true, false, false);
$channel->queue_bind('sms_notifications', 'notifications');
Маршрутизация на основе Topic
Topic Exchange
// Объявить exchange
$channel->exchange_declare('logs', 'topic', false, true, false);
// Publisher — отправка с routing key
$channel->basic_publish($msg, 'logs', 'order.created');
$channel->basic_publish($msg, 'logs', 'order.payment.failed');
$channel->basic_publish($msg, 'logs', 'user.login');
// Subscriber — все события заказов
$channel->queue_declare('order_events', false, true, false, false);
$channel->queue_bind('order_events', 'logs', 'order.*');
// Subscriber — все сбои платежей
$channel->queue_declare('payment_failures', false, true, false, false);
$channel->queue_bind('payment_failures', 'logs', '*.payment.failed');
// Subscriber — все события
$channel->queue_declare('all_events', false, true, false, false);
$channel->queue_bind('all_events', 'logs', '#');
Шаблон Request/Reply
// Client (запрашивающая сторона)
class RpcClient {
private $connection;
private $channel;
private $callbackQueue;
private $response;
private $correlationId;
public function __construct() {
$this->connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$this->channel = $this->connection->channel();
// Создать эксклюзивную callback queue
list($this->callbackQueue,,) = $this->channel->queue_declare(
'', false, false, true, false
);
$this->channel->basic_consume(
$this->callbackQueue, '', false, true, false, false,
function ($msg) {
if ($msg->get('correlation_id') === $this->correlationId) {
$this->response = $msg->body;
}
}
);
}
public function call($data) {
$this->response = null;
$this->correlationId = uniqid();
$msg = new AMQPMessage(json_encode($data), [
'correlation_id' => $this->correlationId,
'reply_to' => $this->callbackQueue,
]);
$this->channel->basic_publish($msg, '', 'rpc_queue');
// Ожидать ответ
while (!$this->response) {
$this->channel->wait();
}
return json_decode($this->response, true);
}
}
// Server (отвечающая сторона)
$channel->queue_declare('rpc_queue', false, true, false, false);
$callback = function ($msg) use ($channel) {
$data = json_decode($msg->body, true);
// Обработать запрос
$result = processRequest($data);
// Отправить ответ
$response = new AMQPMessage(json_encode($result), [
'correlation_id' => $msg->get('correlation_id'),
]);
$channel->basic_publish($response, '', $msg->get('reply_to'));
$msg->ack();
};
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
Dead Letter Queue
Обработка неуспешных сообщений:
// Основная очередь с dead letter exchange
$args = new AMQPTable([
'x-dead-letter-exchange' => 'dlx',
'x-dead-letter-routing-key' => 'failed',
'x-message-ttl' => 60000, // Опционально: истечь через 60 с
]);
$channel->queue_declare('main_queue', false, true, false, false, $args);
// Dead letter exchange и очередь
$channel->exchange_declare('dlx', 'direct', false, true, false);
$channel->queue_declare('failed_messages', false, true, false, false);
$channel->queue_bind('failed_messages', 'dlx', 'failed');
// Consumer, который отклоняет сообщения
$callback = function ($msg) {
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
// Сообщение попадает в dead letter queue
$msg->nack(false, false); // Не выполнять повторную постановку в очередь
}
};
Повторные попытки с экспоненциальной задержкой
// Создать отложенные очереди
$delays = [1000, 5000, 30000]; // 1 с, 5 с, 30 с
foreach ($delays as $index => $delay) {
$args = new AMQPTable([
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => 'main_queue',
'x-message-ttl' => $delay,
]);
$channel->queue_declare("retry_queue_$index", false, true, false, false, $args);
}
// Consumer с логикой повторных попыток
$callback = function ($msg) use ($channel) {
$retryCount = $msg->has('application_headers')
? ($msg->get('application_headers')['x-retry-count'] ?? 0)
: 0;
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
$msg->ack();
if ($retryCount < 3) {
// Повторить с задержкой
$retryMsg = new AMQPMessage($msg->body, [
'application_headers' => new AMQPTable([
'x-retry-count' => $retryCount + 1,
]),
]);
$channel->basic_publish($retryMsg, '', "retry_queue_$retryCount");
} else {
// Отправить в dead letter queue
$channel->basic_publish(new AMQPMessage($msg->body), '', 'failed_messages');
}
}
};
Управление соединениями
class RabbitMQConnection {
private static $connection = null;
private static $channel = null;
public static function getChannel() {
if (self::$channel === null || !self::$channel->is_open()) {
self::$connection = new AMQPStreamConnection(
getenv('RABBITMQ_HOST'),
getenv('RABBITMQ_PORT'),
getenv('RABBITMQ_USER'),
getenv('RABBITMQ_PASS'),
'/',
false,
'AMQPLAIN',
null,
'en_US',
3.0, // Тайм-аут соединения
10.0, // Тайм-аут чтения/записи
null,
true, // Keepalive
60 // Heartbeat
);
self::$channel = self::$connection->channel();
}
return self::$channel;
}
public static function close() {
if (self::$channel !== null) {
self::$channel->close();
}
if (self::$connection !== null) {
self::$connection->close();
}
}
}
// Зарегистрировать обработчик завершения
register_shutdown_function([RabbitMQConnection::class, 'close']);
Мониторинг
Management API
# Список очередей
curl -u admin:secret http://localhost:15672/api/queues
# Детали очереди
curl -u admin:secret http://localhost:15672/api/queues/%2F/task_queue
# Опубликовать сообщение (тестирование)
curl -u admin:secret -X POST \
-H "content-type:application/json" \
-d '{"properties":{},"routing_key":"task_queue","payload":"test","payload_encoding":"string"}' \
http://localhost:15672/api/exchanges/%2F/amq.default/publish
Лучшие практики
- Используйте durable queues и persistent messages для важных данных
- Реализуйте dead letter queues для неуспешных сообщений
- Используйте prefetch для управления нагрузкой на consumer
- Подтверждайте сообщения только после успешной обработки
- Обрабатывайте сбои соединения с помощью логики переподключения
- Отслеживайте глубину очереди для выявления проблем обработки
- Используйте осмысленные routing keys для topic exchanges
Заключение
RabbitMQ обеспечивает надёжную постановку сообщений в очередь для построения распределённых систем. Используйте direct exchanges для обмена «точка-точка», fanout — для рассылки, а topics — для гибкой маршрутизации. Для устойчивости реализуйте dead letter queues и логику повторных попыток. Шаблоны из этого руководства помогут вам создавать масштабируемые и отказоустойчивые системы обмена сообщениями.