About us Guides Projects Contacts
Админка
please wait

Introduction

Message queues enable asynchronous communication between services, decoupling producers from consumers. RabbitMQ is a reliable, feature-rich message broker that supports multiple messaging patterns. This guide covers practical RabbitMQ implementation for building scalable, resilient systems.

Core Concepts

AMQP Model

  • Producer: Sends messages to exchanges
  • Exchange: Routes messages to queues based on rules
  • Queue: Stores messages until consumed
  • Consumer: Receives and processes messages
  • Binding: Links exchanges to queues

Exchange Types

TypeRouting
DirectExact routing key match
FanoutBroadcast to all bound queues
TopicPattern matching on routing key
HeadersMatch on message headers

Setup

Docker Installation

# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # AMQP
- "15672:15672" # Management UI
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret
volumes:
- rabbitmq_data:/var/lib/rabbitmq
volumes:
rabbitmq_data:

PHP with php-amqplib

composer require php-amqplib/php-amqplib

Node.js with amqplib

npm install amqplib

Basic Messaging

PHP Producer

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
// Declare queue
$channel->queue_declare('task_queue', false, true, false, false);
// Create message
$data = json_encode(['task' => 'process_order', 'order_id' => 123]);
$msg = new AMQPMessage($data, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
]);
// Publish
$channel->basic_publish($msg, '', 'task_queue');
echo "Sent: $data\n";
$channel->close();
$connection->close();

PHP Consumer

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
// Fair dispatch: don't give more than one message at a time
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
$data = json_decode($msg->body, true);
echo "Received: " . $msg->body . "\n";
try {
// Process the message
processTask($data);
// Acknowledge success
$msg->ack();
} catch (Exception $e) {
// Reject and requeue on failure
$msg->nack(true);
}
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
echo "Waiting for messages...\n";
while ($channel->is_consuming()) {
$channel->wait();
}

Node.js Producer

const amqp = require('amqplib');
async function sendMessage(queue, message) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true,
contentType: 'application/json',
});
console.log('Sent:', message);
await channel.close();
await connection.close();
}
sendMessage('task_queue', { task: 'process_order', order_id: 123 });

Node.js Consumer

const amqp = require('amqplib');
async function startConsumer(queue) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
// Fair dispatch
channel.prefetch(1);
console.log('Waiting for messages...');
channel.consume(queue, async (msg) => {
const data = JSON.parse(msg.content.toString());
console.log('Received:', data);
try {
await processTask(data);
channel.ack(msg);
} catch (error) {
console.error('Processing failed:', error);
channel.nack(msg, false, true); // Requeue
}
});
}
startConsumer('task_queue');

Publish/Subscribe Pattern

Fanout Exchange

// Publisher
$channel->exchange_declare('notifications', 'fanout', false, true, false);
$msg = new AMQPMessage(json_encode(['event' => 'user_created', 'user_id' => 123]));
$channel->basic_publish($msg, 'notifications', '');
// Subscriber 1: Email notifications
$channel->queue_declare('email_notifications', false, true, false, false);
$channel->queue_bind('email_notifications', 'notifications');
// Subscriber 2: SMS notifications
$channel->queue_declare('sms_notifications', false, true, false, false);
$channel->queue_bind('sms_notifications', 'notifications');

Topic-Based Routing

Topic Exchange

// Declare exchange
$channel->exchange_declare('logs', 'topic', false, true, false);
// Publisher: send with routing key
$channel->basic_publish($msg, 'logs', 'order.created');
$channel->basic_publish($msg, 'logs', 'order.payment.failed');
$channel->basic_publish($msg, 'logs', 'user.login');
// Subscriber: all order events
$channel->queue_declare('order_events', false, true, false, false);
$channel->queue_bind('order_events', 'logs', 'order.*');
// Subscriber: all payment failures
$channel->queue_declare('payment_failures', false, true, false, false);
$channel->queue_bind('payment_failures', 'logs', '*.payment.failed');
// Subscriber: all events
$channel->queue_declare('all_events', false, true, false, false);
$channel->queue_bind('all_events', 'logs', '#');

Request/Reply Pattern

// Client (requester)
class RpcClient {
private $connection;
private $channel;
private $callbackQueue;
private $response;
private $correlationId;
public function __construct() {
$this->connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$this->channel = $this->connection->channel();
// Create an exclusive callback queue
list($this->callbackQueue,,) = $this->channel->queue_declare(
'', false, false, true, false
);
$this->channel->basic_consume(
$this->callbackQueue, '', false, true, false, false,
function ($msg) {
if ($msg->get('correlation_id') === $this->correlationId) {
$this->response = $msg->body;
}
}
);
}
public function call($data) {
$this->response = null;
$this->correlationId = uniqid();
$msg = new AMQPMessage(json_encode($data), [
'correlation_id' => $this->correlationId,
'reply_to' => $this->callbackQueue,
]);
$this->channel->basic_publish($msg, '', 'rpc_queue');
// Wait for response
while (!$this->response) {
$this->channel->wait();
}
return json_decode($this->response, true);
}
}
// Server (responder)
$channel->queue_declare('rpc_queue', false, true, false, false);
$callback = function ($msg) use ($channel) {
$data = json_decode($msg->body, true);
// Process request
$result = processRequest($data);
// Send response
$response = new AMQPMessage(json_encode($result), [
'correlation_id' => $msg->get('correlation_id'),
]);
$channel->basic_publish($response, '', $msg->get('reply_to'));
$msg->ack();
};
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

Dead Letter Queue

Handle failed messages:

// Main queue with dead letter exchange
$args = new AMQPTable([
'x-dead-letter-exchange' => 'dlx',
'x-dead-letter-routing-key' => 'failed',
'x-message-ttl' => 60000, // Optional: expire after 60 seconds
]);
$channel->queue_declare('main_queue', false, true, false, false, $args);
// Dead letter exchange and queue
$channel->exchange_declare('dlx', 'direct', false, true, false);
$channel->queue_declare('failed_messages', false, true, false, false);
$channel->queue_bind('failed_messages', 'dlx', 'failed');
// Consumer that rejects messages
$callback = function ($msg) {
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
// Message goes to dead letter queue
$msg->nack(false, false); // Don't requeue
}
};

Retry with Exponential Backoff

// Create delayed queues
$delays = [1000, 5000, 30000]; // 1s, 5s, 30s
foreach ($delays as $index => $delay) {
$args = new AMQPTable([
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => 'main_queue',
'x-message-ttl' => $delay,
]);
$channel->queue_declare("retry_queue_$index", false, true, false, false, $args);
}
// Consumer with retry logic
$callback = function ($msg) use ($channel) {
$retryCount = $msg->has('application_headers')
? ($msg->get('application_headers')['x-retry-count'] ?? 0)
: 0;
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
$msg->ack();
if ($retryCount < 3) {
// Retry with delay
$retryMsg = new AMQPMessage($msg->body, [
'application_headers' => new AMQPTable([
'x-retry-count' => $retryCount + 1,
]),
]);
$channel->basic_publish($retryMsg, '', "retry_queue_$retryCount");
} else {
// Send to dead letter queue
$channel->basic_publish(new AMQPMessage($msg->body), '', 'failed_messages');
}
}
};

Connection Management

class RabbitMQConnection {
private static $connection = null;
private static $channel = null;
public static function getChannel() {
if (self::$channel === null || !self::$channel->is_open()) {
self::$connection = new AMQPStreamConnection(
getenv('RABBITMQ_HOST'),
getenv('RABBITMQ_PORT'),
getenv('RABBITMQ_USER'),
getenv('RABBITMQ_PASS'),
'/',
false,
'AMQPLAIN',
null,
'en_US',
3.0, // Connection timeout
10.0, // Read/write timeout
null,
true, // Keepalive
60 // Heartbeat
);
self::$channel = self::$connection->channel();
}
return self::$channel;
}
public static function close() {
if (self::$channel !== null) {
self::$channel->close();
}
if (self::$connection !== null) {
self::$connection->close();
}
}
}
// Register shutdown handler
register_shutdown_function([RabbitMQConnection::class, 'close']);

Monitoring

Management API

# List queues
curl -u admin:secret http://localhost:15672/api/queues
# Queue details
curl -u admin:secret http://localhost:15672/api/queues/%2F/task_queue
# Publish message (testing)
curl -u admin:secret -X POST \
-H "content-type:application/json" \
-d '{"properties":{},"routing_key":"task_queue","payload":"test","payload_encoding":"string"}' \
http://localhost:15672/api/exchanges/%2F/amq.default/publish

Best Practices

  1. Use durable queues and persistent messages for important data
  2. Implement dead letter queues for failed messages
  3. Use prefetch to control consumer load
  4. Acknowledge messages only after successful processing
  5. Handle connection failures with reconnection logic
  6. Monitor queue depth to detect processing issues
  7. Use meaningful routing keys for topic exchanges

Conclusion

RabbitMQ provides reliable message queuing for building distributed systems. Use direct exchanges for point-to-point messaging, fanout for broadcasting, and topics for flexible routing. Implement dead letter queues and retry logic for resilience. The patterns in this guide help you build scalable, fault-tolerant messaging systems.

 
 
 
Языки
Темы
Copyright © 1999 — 2026
ZK Interactive