О нас Руководства Проекты Контакты
Админка
пожалуйста подождите

Введение

Очереди сообщений обеспечивают асинхронное взаимодействие между сервисами, разделяя производителей и потребителей. RabbitMQ — надёжный, функционально насыщенный брокер сообщений, поддерживающий несколько шаблонов обмена сообщениями. В этом руководстве рассматривается практическая реализация RabbitMQ для построения масштабируемых и устойчивых систем.

Основные понятия

Модель AMQP

  • Producer: отправляет сообщения в exchanges
  • Exchange: маршрутизирует сообщения в queues по правилам
  • Queue: хранит сообщения до их потребления
  • Consumer: получает и обрабатывает сообщения
  • Binding: связывает exchanges с queues

Типы Exchange

ТипМаршрутизация
DirectТочное совпадение routing key
FanoutРассылка во все привязанные queues
TopicСопоставление по шаблону для routing key
HeadersСопоставление по заголовкам сообщения

Настройка

Установка через Docker

# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # AMQP
- "15672:15672" # Интерфейс управления
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret
volumes:
- rabbitmq_data:/var/lib/rabbitmq
volumes:
rabbitmq_data:

PHP с php-amqplib

composer require php-amqplib/php-amqplib

Node.js с amqplib

npm install amqplib

Базовый обмен сообщениями

PHP Producer

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
// Объявить очередь
$channel->queue_declare('task_queue', false, true, false, false);
// Создать сообщение
$data = json_encode(['task' => 'process_order', 'order_id' => 123]);
$msg = new AMQPMessage($data, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
]);
// Опубликовать
$channel->basic_publish($msg, '', 'task_queue');
echo "Sent: $data\n";
$channel->close();
$connection->close();

PHP Consumer

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
// Справедливое распределение — не выдавать более одного сообщения за раз
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
$data = json_decode($msg->body, true);
echo "Received: " . $msg->body . "\n";
try {
// Обработать сообщение
processTask($data);
// Подтвердить успешную обработку
$msg->ack();
} catch (Exception $e) {
// Отклонить и повторно поставить в очередь при ошибке
$msg->nack(true);
}
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
echo "Waiting for messages...\n";
while ($channel->is_consuming()) {
$channel->wait();
}

Node.js Producer

const amqp = require('amqplib');
async function sendMessage(queue, message) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true,
contentType: 'application/json',
});
console.log('Sent:', message);
await channel.close();
await connection.close();
}
sendMessage('task_queue', { task: 'process_order', order_id: 123 });

Node.js Consumer

const amqp = require('amqplib');
async function startConsumer(queue) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
// Справедливое распределение
channel.prefetch(1);
console.log('Waiting for messages...');
channel.consume(queue, async (msg) => {
const data = JSON.parse(msg.content.toString());
console.log('Received:', data);
try {
await processTask(data);
channel.ack(msg);
} catch (error) {
console.error('Processing failed:', error);
channel.nack(msg, false, true); // Повторно поставить в очередь
}
});
}
startConsumer('task_queue');

Шаблон Publish/Subscribe

Fanout Exchange

// Publisher
$channel->exchange_declare('notifications', 'fanout', false, true, false);
$msg = new AMQPMessage(json_encode(['event' => 'user_created', 'user_id' => 123]));
$channel->basic_publish($msg, 'notifications', '');
// Subscriber 1: уведомления по email
$channel->queue_declare('email_notifications', false, true, false, false);
$channel->queue_bind('email_notifications', 'notifications');
// Subscriber 2: уведомления по SMS
$channel->queue_declare('sms_notifications', false, true, false, false);
$channel->queue_bind('sms_notifications', 'notifications');

Маршрутизация на основе Topic

Topic Exchange

// Объявить exchange
$channel->exchange_declare('logs', 'topic', false, true, false);
// Publisher — отправка с routing key
$channel->basic_publish($msg, 'logs', 'order.created');
$channel->basic_publish($msg, 'logs', 'order.payment.failed');
$channel->basic_publish($msg, 'logs', 'user.login');
// Subscriber — все события заказов
$channel->queue_declare('order_events', false, true, false, false);
$channel->queue_bind('order_events', 'logs', 'order.*');
// Subscriber — все сбои платежей
$channel->queue_declare('payment_failures', false, true, false, false);
$channel->queue_bind('payment_failures', 'logs', '*.payment.failed');
// Subscriber — все события
$channel->queue_declare('all_events', false, true, false, false);
$channel->queue_bind('all_events', 'logs', '#');

Шаблон Request/Reply

// Client (запрашивающая сторона)
class RpcClient {
private $connection;
private $channel;
private $callbackQueue;
private $response;
private $correlationId;
public function __construct() {
$this->connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$this->channel = $this->connection->channel();
// Создать эксклюзивную callback queue
list($this->callbackQueue,,) = $this->channel->queue_declare(
'', false, false, true, false
);
$this->channel->basic_consume(
$this->callbackQueue, '', false, true, false, false,
function ($msg) {
if ($msg->get('correlation_id') === $this->correlationId) {
$this->response = $msg->body;
}
}
);
}
public function call($data) {
$this->response = null;
$this->correlationId = uniqid();
$msg = new AMQPMessage(json_encode($data), [
'correlation_id' => $this->correlationId,
'reply_to' => $this->callbackQueue,
]);
$this->channel->basic_publish($msg, '', 'rpc_queue');
// Ожидать ответ
while (!$this->response) {
$this->channel->wait();
}
return json_decode($this->response, true);
}
}
// Server (отвечающая сторона)
$channel->queue_declare('rpc_queue', false, true, false, false);
$callback = function ($msg) use ($channel) {
$data = json_decode($msg->body, true);
// Обработать запрос
$result = processRequest($data);
// Отправить ответ
$response = new AMQPMessage(json_encode($result), [
'correlation_id' => $msg->get('correlation_id'),
]);
$channel->basic_publish($response, '', $msg->get('reply_to'));
$msg->ack();
};
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

Dead Letter Queue

Обработка неуспешных сообщений:

// Основная очередь с dead letter exchange
$args = new AMQPTable([
'x-dead-letter-exchange' => 'dlx',
'x-dead-letter-routing-key' => 'failed',
'x-message-ttl' => 60000, // Опционально: истечь через 60 с
]);
$channel->queue_declare('main_queue', false, true, false, false, $args);
// Dead letter exchange и очередь
$channel->exchange_declare('dlx', 'direct', false, true, false);
$channel->queue_declare('failed_messages', false, true, false, false);
$channel->queue_bind('failed_messages', 'dlx', 'failed');
// Consumer, который отклоняет сообщения
$callback = function ($msg) {
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
// Сообщение попадает в dead letter queue
$msg->nack(false, false); // Не выполнять повторную постановку в очередь
}
};

Повторные попытки с экспоненциальной задержкой

// Создать отложенные очереди
$delays = [1000, 5000, 30000]; // 1 с, 5 с, 30 с
foreach ($delays as $index => $delay) {
$args = new AMQPTable([
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => 'main_queue',
'x-message-ttl' => $delay,
]);
$channel->queue_declare("retry_queue_$index", false, true, false, false, $args);
}
// Consumer с логикой повторных попыток
$callback = function ($msg) use ($channel) {
$retryCount = $msg->has('application_headers')
? ($msg->get('application_headers')['x-retry-count'] ?? 0)
: 0;
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
$msg->ack();
if ($retryCount < 3) {
// Повторить с задержкой
$retryMsg = new AMQPMessage($msg->body, [
'application_headers' => new AMQPTable([
'x-retry-count' => $retryCount + 1,
]),
]);
$channel->basic_publish($retryMsg, '', "retry_queue_$retryCount");
} else {
// Отправить в dead letter queue
$channel->basic_publish(new AMQPMessage($msg->body), '', 'failed_messages');
}
}
};

Управление соединениями

class RabbitMQConnection {
private static $connection = null;
private static $channel = null;
public static function getChannel() {
if (self::$channel === null || !self::$channel->is_open()) {
self::$connection = new AMQPStreamConnection(
getenv('RABBITMQ_HOST'),
getenv('RABBITMQ_PORT'),
getenv('RABBITMQ_USER'),
getenv('RABBITMQ_PASS'),
'/',
false,
'AMQPLAIN',
null,
'en_US',
3.0, // Тайм-аут соединения
10.0, // Тайм-аут чтения/записи
null,
true, // Keepalive
60 // Heartbeat
);
self::$channel = self::$connection->channel();
}
return self::$channel;
}
public static function close() {
if (self::$channel !== null) {
self::$channel->close();
}
if (self::$connection !== null) {
self::$connection->close();
}
}
}
// Зарегистрировать обработчик завершения
register_shutdown_function([RabbitMQConnection::class, 'close']);

Мониторинг

Management API

# Список очередей
curl -u admin:secret http://localhost:15672/api/queues
# Детали очереди
curl -u admin:secret http://localhost:15672/api/queues/%2F/task_queue
# Опубликовать сообщение (тестирование)
curl -u admin:secret -X POST \
-H "content-type:application/json" \
-d '{"properties":{},"routing_key":"task_queue","payload":"test","payload_encoding":"string"}' \
http://localhost:15672/api/exchanges/%2F/amq.default/publish

Лучшие практики

  1. Используйте durable queues и persistent messages для важных данных
  2. Реализуйте dead letter queues для неуспешных сообщений
  3. Используйте prefetch для управления нагрузкой на consumer
  4. Подтверждайте сообщения только после успешной обработки
  5. Обрабатывайте сбои соединения с помощью логики переподключения
  6. Отслеживайте глубину очереди для выявления проблем обработки
  7. Используйте осмысленные routing keys для topic exchanges

Заключение

RabbitMQ обеспечивает надёжную постановку сообщений в очередь для построения распределённых систем. Используйте direct exchanges для обмена «точка-точка», fanout — для рассылки, а topics — для гибкой маршрутизации. Для устойчивости реализуйте dead letter queues и логику повторных попыток. Шаблоны из этого руководства помогут вам создавать масштабируемые и отказоустойчивые системы обмена сообщениями.

 
 
 
Языки
Темы
Copyright © 1999 — 2026
Зетка Интерактив