Introduction
Message queues enable asynchronous communication between services, decoupling producers from consumers. RabbitMQ is a reliable, feature-rich message broker that supports multiple messaging patterns. This guide covers practical RabbitMQ implementation for building scalable, resilient systems.
Core Concepts
AMQP Model
- Producer: Sends messages to exchanges
- Exchange: Routes messages to queues based on rules
- Queue: Stores messages until consumed
- Consumer: Receives and processes messages
- Binding: Links exchanges to queues
Exchange Types
| Type | Routing |
|---|
| Direct | Exact routing key match |
| Fanout | Broadcast to all bound queues |
| Topic | Pattern matching on routing key |
| Headers | Match on message headers |
Setup
Docker Installation
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # AMQP
- "15672:15672" # Management UI
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret
volumes:
- rabbitmq_data:/var/lib/rabbitmq
volumes:
rabbitmq_data:
PHP with php-amqplib
composer require php-amqplib/php-amqplib
Node.js with amqplib
npm install amqplib
Basic Messaging
PHP Producer
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
// Declare queue
$channel->queue_declare('task_queue', false, true, false, false);
// Create message
$data = json_encode(['task' => 'process_order', 'order_id' => 123]);
$msg = new AMQPMessage($data, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
]);
// Publish
$channel->basic_publish($msg, '', 'task_queue');
echo "Sent: $data\n";
$channel->close();
$connection->close();
PHP Consumer
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
// Fair dispatch: don't give more than one message at a time
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
$data = json_decode($msg->body, true);
echo "Received: " . $msg->body . "\n";
try {
// Process the message
processTask($data);
// Acknowledge success
$msg->ack();
} catch (Exception $e) {
// Reject and requeue on failure
$msg->nack(true);
}
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
echo "Waiting for messages...\n";
while ($channel->is_consuming()) {
$channel->wait();
}
Node.js Producer
const amqp = require('amqplib');
async function sendMessage(queue, message) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true,
contentType: 'application/json',
});
console.log('Sent:', message);
await channel.close();
await connection.close();
}
sendMessage('task_queue', { task: 'process_order', order_id: 123 });
Node.js Consumer
const amqp = require('amqplib');
async function startConsumer(queue) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
// Fair dispatch
channel.prefetch(1);
console.log('Waiting for messages...');
channel.consume(queue, async (msg) => {
const data = JSON.parse(msg.content.toString());
console.log('Received:', data);
try {
await processTask(data);
channel.ack(msg);
} catch (error) {
console.error('Processing failed:', error);
channel.nack(msg, false, true); // Requeue
}
});
}
startConsumer('task_queue');
Publish/Subscribe Pattern
Fanout Exchange
// Publisher
$channel->exchange_declare('notifications', 'fanout', false, true, false);
$msg = new AMQPMessage(json_encode(['event' => 'user_created', 'user_id' => 123]));
$channel->basic_publish($msg, 'notifications', '');
// Subscriber 1: Email notifications
$channel->queue_declare('email_notifications', false, true, false, false);
$channel->queue_bind('email_notifications', 'notifications');
// Subscriber 2: SMS notifications
$channel->queue_declare('sms_notifications', false, true, false, false);
$channel->queue_bind('sms_notifications', 'notifications');
Topic-Based Routing
Topic Exchange
// Declare exchange
$channel->exchange_declare('logs', 'topic', false, true, false);
// Publisher: send with routing key
$channel->basic_publish($msg, 'logs', 'order.created');
$channel->basic_publish($msg, 'logs', 'order.payment.failed');
$channel->basic_publish($msg, 'logs', 'user.login');
// Subscriber: all order events
$channel->queue_declare('order_events', false, true, false, false);
$channel->queue_bind('order_events', 'logs', 'order.*');
// Subscriber: all payment failures
$channel->queue_declare('payment_failures', false, true, false, false);
$channel->queue_bind('payment_failures', 'logs', '*.payment.failed');
// Subscriber: all events
$channel->queue_declare('all_events', false, true, false, false);
$channel->queue_bind('all_events', 'logs', '#');
Request/Reply Pattern
// Client (requester)
class RpcClient {
private $connection;
private $channel;
private $callbackQueue;
private $response;
private $correlationId;
public function __construct() {
$this->connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$this->channel = $this->connection->channel();
// Create an exclusive callback queue
list($this->callbackQueue,,) = $this->channel->queue_declare(
'', false, false, true, false
);
$this->channel->basic_consume(
$this->callbackQueue, '', false, true, false, false,
function ($msg) {
if ($msg->get('correlation_id') === $this->correlationId) {
$this->response = $msg->body;
}
}
);
}
public function call($data) {
$this->response = null;
$this->correlationId = uniqid();
$msg = new AMQPMessage(json_encode($data), [
'correlation_id' => $this->correlationId,
'reply_to' => $this->callbackQueue,
]);
$this->channel->basic_publish($msg, '', 'rpc_queue');
// Wait for response
while (!$this->response) {
$this->channel->wait();
}
return json_decode($this->response, true);
}
}
// Server (responder)
$channel->queue_declare('rpc_queue', false, true, false, false);
$callback = function ($msg) use ($channel) {
$data = json_decode($msg->body, true);
// Process request
$result = processRequest($data);
// Send response
$response = new AMQPMessage(json_encode($result), [
'correlation_id' => $msg->get('correlation_id'),
]);
$channel->basic_publish($response, '', $msg->get('reply_to'));
$msg->ack();
};
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
Dead Letter Queue
Handle failed messages:
// Main queue with dead letter exchange
$args = new AMQPTable([
'x-dead-letter-exchange' => 'dlx',
'x-dead-letter-routing-key' => 'failed',
'x-message-ttl' => 60000, // Optional: expire after 60 seconds
]);
$channel->queue_declare('main_queue', false, true, false, false, $args);
// Dead letter exchange and queue
$channel->exchange_declare('dlx', 'direct', false, true, false);
$channel->queue_declare('failed_messages', false, true, false, false);
$channel->queue_bind('failed_messages', 'dlx', 'failed');
// Consumer that rejects messages
$callback = function ($msg) {
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
// Message goes to dead letter queue
$msg->nack(false, false); // Don't requeue
}
};
Retry with Exponential Backoff
// Create delayed queues
$delays = [1000, 5000, 30000]; // 1s, 5s, 30s
foreach ($delays as $index => $delay) {
$args = new AMQPTable([
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => 'main_queue',
'x-message-ttl' => $delay,
]);
$channel->queue_declare("retry_queue_$index", false, true, false, false, $args);
}
// Consumer with retry logic
$callback = function ($msg) use ($channel) {
$retryCount = $msg->has('application_headers')
? ($msg->get('application_headers')['x-retry-count'] ?? 0)
: 0;
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
$msg->ack();
if ($retryCount < 3) {
// Retry with delay
$retryMsg = new AMQPMessage($msg->body, [
'application_headers' => new AMQPTable([
'x-retry-count' => $retryCount + 1,
]),
]);
$channel->basic_publish($retryMsg, '', "retry_queue_$retryCount");
} else {
// Send to dead letter queue
$channel->basic_publish(new AMQPMessage($msg->body), '', 'failed_messages');
}
}
};
Connection Management
class RabbitMQConnection {
private static $connection = null;
private static $channel = null;
public static function getChannel() {
if (self::$channel === null || !self::$channel->is_open()) {
self::$connection = new AMQPStreamConnection(
getenv('RABBITMQ_HOST'),
getenv('RABBITMQ_PORT'),
getenv('RABBITMQ_USER'),
getenv('RABBITMQ_PASS'),
'/',
false,
'AMQPLAIN',
null,
'en_US',
3.0, // Connection timeout
10.0, // Read/write timeout
null,
true, // Keepalive
60 // Heartbeat
);
self::$channel = self::$connection->channel();
}
return self::$channel;
}
public static function close() {
if (self::$channel !== null) {
self::$channel->close();
}
if (self::$connection !== null) {
self::$connection->close();
}
}
}
// Register shutdown handler
register_shutdown_function([RabbitMQConnection::class, 'close']);
Monitoring
Management API
# List queues
curl -u admin:secret http://localhost:15672/api/queues
# Queue details
curl -u admin:secret http://localhost:15672/api/queues/%2F/task_queue
# Publish message (testing)
curl -u admin:secret -X POST \
-H "content-type:application/json" \
-d '{"properties":{},"routing_key":"task_queue","payload":"test","payload_encoding":"string"}' \
http://localhost:15672/api/exchanges/%2F/amq.default/publish
Best Practices
- Use durable queues and persistent messages for important data
- Implement dead letter queues for failed messages
- Use prefetch to control consumer load
- Acknowledge messages only after successful processing
- Handle connection failures with reconnection logic
- Monitor queue depth to detect processing issues
- Use meaningful routing keys for topic exchanges
Conclusion
RabbitMQ provides reliable message queuing for building distributed systems. Use direct exchanges for point-to-point messaging, fanout for broadcasting, and topics for flexible routing. Implement dead letter queues and retry logic for resilience. The patterns in this guide help you build scalable, fault-tolerant messaging systems.