Introdução
As filas de mensagens permitem comunicação assíncrona entre serviços, desacoplando produtores de consumidores. O RabbitMQ é um message broker fiável e rico em funcionalidades que suporta vários padrões de messaging. Este guia aborda a implementação prática do RabbitMQ para construir sistemas escaláveis e resilientes.
Conceitos Fundamentais
Modelo AMQP
- Producer: Envia mensagens para exchanges
- Exchange: Encaminha mensagens para queues com base em regras
- Queue: Armazena mensagens até serem consumidas
- Consumer: Recebe e processa mensagens
- Binding: Liga exchanges a queues
Tipos de Exchange
| Tipo | Encaminhamento |
|---|
| Direct | Correspondência exata da routing key |
| Fanout | Difusão para todas as queues associadas |
| Topic | Correspondência por padrão na routing key |
| Headers | Correspondência com base nos headers da mensagem |
Configuração
Instalação com Docker
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # AMQP
- "15672:15672" # IU de gestão
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret
volumes:
- rabbitmq_data:/var/lib/rabbitmq
volumes:
rabbitmq_data:
PHP com php-amqplib
composer require php-amqplib/php-amqplib
Node.js com amqplib
npm install amqplib
Messaging Básico
PHP Producer
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
// Declarar queue
$channel->queue_declare('task_queue', false, true, false, false);
// Criar mensagem
$data = json_encode(['task' => 'process_order', 'order_id' => 123]);
$msg = new AMQPMessage($data, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
]);
// Publicar
$channel->basic_publish($msg, '', 'task_queue');
echo "Sent: $data\n";
$channel->close();
$connection->close();
PHP Consumer
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
// Fair dispatch — não atribuir mais do que uma mensagem de cada vez
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
$data = json_decode($msg->body, true);
echo "Received: " . $msg->body . "\n";
try {
// Processar a mensagem
processTask($data);
// Confirmar sucesso
$msg->ack();
} catch (Exception $e) {
// Rejeitar e reenfileirar em caso de falha
$msg->nack(true);
}
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
echo "Waiting for messages...\n";
while ($channel->is_consuming()) {
$channel->wait();
}
Node.js Producer
const amqp = require('amqplib');
async function sendMessage(queue, message) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true,
contentType: 'application/json',
});
console.log('Sent:', message);
await channel.close();
await connection.close();
}
sendMessage('task_queue', { task: 'process_order', order_id: 123 });
Node.js Consumer
const amqp = require('amqplib');
async function startConsumer(queue) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
// Fair dispatch
channel.prefetch(1);
console.log('Waiting for messages...');
channel.consume(queue, async (msg) => {
const data = JSON.parse(msg.content.toString());
console.log('Received:', data);
try {
await processTask(data);
channel.ack(msg);
} catch (error) {
console.error('Processing failed:', error);
channel.nack(msg, false, true); // Reenfileirar
}
});
}
startConsumer('task_queue');
Padrão Publish/Subscribe
Fanout Exchange
// Publisher
$channel->exchange_declare('notifications', 'fanout', false, true, false);
$msg = new AMQPMessage(json_encode(['event' => 'user_created', 'user_id' => 123]));
$channel->basic_publish($msg, 'notifications', '');
// Subscriber 1: Notificações por e-mail
$channel->queue_declare('email_notifications', false, true, false, false);
$channel->queue_bind('email_notifications', 'notifications');
// Subscriber 2: Notificações por SMS
$channel->queue_declare('sms_notifications', false, true, false, false);
$channel->queue_bind('sms_notifications', 'notifications');
Encaminhamento Baseado em Tópicos
Topic Exchange
// Declarar exchange
$channel->exchange_declare('logs', 'topic', false, true, false);
// Publisher — enviar com routing key
$channel->basic_publish($msg, 'logs', 'order.created');
$channel->basic_publish($msg, 'logs', 'order.payment.failed');
$channel->basic_publish($msg, 'logs', 'user.login');
// Subscriber — todos os eventos de encomendas
$channel->queue_declare('order_events', false, true, false, false);
$channel->queue_bind('order_events', 'logs', 'order.*');
// Subscriber — todas as falhas de pagamento
$channel->queue_declare('payment_failures', false, true, false, false);
$channel->queue_bind('payment_failures', 'logs', '*.payment.failed');
// Subscriber — todos os eventos
$channel->queue_declare('all_events', false, true, false, false);
$channel->queue_bind('all_events', 'logs', '#');
Padrão Request/Reply
// Client (requerente)
class RpcClient {
private $connection;
private $channel;
private $callbackQueue;
private $response;
private $correlationId;
public function __construct() {
$this->connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$this->channel = $this->connection->channel();
// Criar callback queue exclusiva
list($this->callbackQueue,,) = $this->channel->queue_declare(
'', false, false, true, false
);
$this->channel->basic_consume(
$this->callbackQueue, '', false, true, false, false,
function ($msg) {
if ($msg->get('correlation_id') === $this->correlationId) {
$this->response = $msg->body;
}
}
);
}
public function call($data) {
$this->response = null;
$this->correlationId = uniqid();
$msg = new AMQPMessage(json_encode($data), [
'correlation_id' => $this->correlationId,
'reply_to' => $this->callbackQueue,
]);
$this->channel->basic_publish($msg, '', 'rpc_queue');
// Aguardar resposta
while (!$this->response) {
$this->channel->wait();
}
return json_decode($this->response, true);
}
}
// Server (respondente)
$channel->queue_declare('rpc_queue', false, true, false, false);
$callback = function ($msg) use ($channel) {
$data = json_decode($msg->body, true);
// Processar pedido
$result = processRequest($data);
// Enviar resposta
$response = new AMQPMessage(json_encode($result), [
'correlation_id' => $msg->get('correlation_id'),
]);
$channel->basic_publish($response, '', $msg->get('reply_to'));
$msg->ack();
};
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
Dead Letter Queue
Tratar mensagens com falhas:
// Queue principal com dead letter exchange
$args = new AMQPTable([
'x-dead-letter-exchange' => 'dlx',
'x-dead-letter-routing-key' => 'failed',
'x-message-ttl' => 60000, // Opcional: expirar após 60 s
]);
$channel->queue_declare('main_queue', false, true, false, false, $args);
// Dead letter exchange e queue
$channel->exchange_declare('dlx', 'direct', false, true, false);
$channel->queue_declare('failed_messages', false, true, false, false);
$channel->queue_bind('failed_messages', 'dlx', 'failed');
// Consumer que rejeita mensagens
$callback = function ($msg) {
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
// A mensagem vai para a dead letter queue
$msg->nack(false, false); // Não reenfileirar
}
};
Retry com Exponential Backoff
// Criar queues com atraso
$delays = [1000, 5000, 30000]; // 1 s, 5 s, 30 s
foreach ($delays as $index => $delay) {
$args = new AMQPTable([
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => 'main_queue',
'x-message-ttl' => $delay,
]);
$channel->queue_declare("retry_queue_$index", false, true, false, false, $args);
}
// Consumer com lógica de retry
$callback = function ($msg) use ($channel) {
$retryCount = $msg->has('application_headers')
? ($msg->get('application_headers')['x-retry-count'] ?? 0)
: 0;
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
$msg->ack();
if ($retryCount < 3) {
// Retry com atraso
$retryMsg = new AMQPMessage($msg->body, [
'application_headers' => new AMQPTable([
'x-retry-count' => $retryCount + 1,
]),
]);
$channel->basic_publish($retryMsg, '', "retry_queue_$retryCount");
} else {
// Enviar para a dead letter queue
$channel->basic_publish(new AMQPMessage($msg->body), '', 'failed_messages');
}
}
};
Gestão de Ligações
class RabbitMQConnection {
private static $connection = null;
private static $channel = null;
public static function getChannel() {
if (self::$channel === null || !self::$channel->is_open()) {
self::$connection = new AMQPStreamConnection(
getenv('RABBITMQ_HOST'),
getenv('RABBITMQ_PORT'),
getenv('RABBITMQ_USER'),
getenv('RABBITMQ_PASS'),
'/',
false,
'AMQPLAIN',
null,
'en_US',
3.0, // Timeout de ligação
10.0, // Timeout de leitura/escrita
null,
true, // Keepalive
60 // Heartbeat
);
self::$channel = self::$connection->channel();
}
return self::$channel;
}
public static function close() {
if (self::$channel !== null) {
self::$channel->close();
}
if (self::$connection !== null) {
self::$connection->close();
}
}
}
// Registar handler de encerramento
register_shutdown_function([RabbitMQConnection::class, 'close']);
Monitorização
Management API
# Listar queues
curl -u admin:secret http://localhost:15672/api/queues
# Detalhes da queue
curl -u admin:secret http://localhost:15672/api/queues/%2F/task_queue
# Publicar mensagem (testes)
curl -u admin:secret -X POST \
-H "content-type:application/json" \
-d '{"properties":{},"routing_key":"task_queue","payload":"test","payload_encoding":"string"}' \
http://localhost:15672/api/exchanges/%2F/amq.default/publish
Boas Práticas
- Use queues duráveis e mensagens persistentes para dados importantes
- Implemente dead letter queues para mensagens com falhas
- Use prefetch para controlar a carga do consumer
- Confirme (acknowledge) as mensagens apenas após processamento bem-sucedido
- Trate falhas de ligação com lógica de reconexão
- Monitorize a profundidade da queue para detetar problemas de processamento
- Use routing keys significativas para topic exchanges
Conclusão
O RabbitMQ fornece filas de mensagens fiáveis para construir sistemas distribuídos. Use direct exchanges para messaging ponto-a-ponto, fanout para difusão e topics para encaminhamento flexível. Implemente dead letter queues e lógica de retry para resiliência. Os padrões neste guia ajudam a construir sistemas de messaging escaláveis e tolerantes a falhas.