Sobre nós Guias Projetos Contactos
Админка
please wait

Introdução

As filas de mensagens permitem comunicação assíncrona entre serviços, desacoplando produtores de consumidores. O RabbitMQ é um message broker fiável e rico em funcionalidades que suporta vários padrões de messaging. Este guia aborda a implementação prática do RabbitMQ para construir sistemas escaláveis e resilientes.

Conceitos Fundamentais

Modelo AMQP

  • Producer: Envia mensagens para exchanges
  • Exchange: Encaminha mensagens para queues com base em regras
  • Queue: Armazena mensagens até serem consumidas
  • Consumer: Recebe e processa mensagens
  • Binding: Liga exchanges a queues

Tipos de Exchange

TipoEncaminhamento
DirectCorrespondência exata da routing key
FanoutDifusão para todas as queues associadas
TopicCorrespondência por padrão na routing key
HeadersCorrespondência com base nos headers da mensagem

Configuração

Instalação com Docker

# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # AMQP
- "15672:15672" # IU de gestão
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret
volumes:
- rabbitmq_data:/var/lib/rabbitmq
volumes:
rabbitmq_data:

PHP com php-amqplib

composer require php-amqplib/php-amqplib

Node.js com amqplib

npm install amqplib

Messaging Básico

PHP Producer

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
// Declarar queue
$channel->queue_declare('task_queue', false, true, false, false);
// Criar mensagem
$data = json_encode(['task' => 'process_order', 'order_id' => 123]);
$msg = new AMQPMessage($data, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
]);
// Publicar
$channel->basic_publish($msg, '', 'task_queue');
echo "Sent: $data\n";
$channel->close();
$connection->close();

PHP Consumer

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
// Fair dispatch — não atribuir mais do que uma mensagem de cada vez
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
$data = json_decode($msg->body, true);
echo "Received: " . $msg->body . "\n";
try {
// Processar a mensagem
processTask($data);
// Confirmar sucesso
$msg->ack();
} catch (Exception $e) {
// Rejeitar e reenfileirar em caso de falha
$msg->nack(true);
}
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
echo "Waiting for messages...\n";
while ($channel->is_consuming()) {
$channel->wait();
}

Node.js Producer

const amqp = require('amqplib');
async function sendMessage(queue, message) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true,
contentType: 'application/json',
});
console.log('Sent:', message);
await channel.close();
await connection.close();
}
sendMessage('task_queue', { task: 'process_order', order_id: 123 });

Node.js Consumer

const amqp = require('amqplib');
async function startConsumer(queue) {
const connection = await amqp.connect('amqp://admin:secret@localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
// Fair dispatch
channel.prefetch(1);
console.log('Waiting for messages...');
channel.consume(queue, async (msg) => {
const data = JSON.parse(msg.content.toString());
console.log('Received:', data);
try {
await processTask(data);
channel.ack(msg);
} catch (error) {
console.error('Processing failed:', error);
channel.nack(msg, false, true); // Reenfileirar
}
});
}
startConsumer('task_queue');

Padrão Publish/Subscribe

Fanout Exchange

// Publisher
$channel->exchange_declare('notifications', 'fanout', false, true, false);
$msg = new AMQPMessage(json_encode(['event' => 'user_created', 'user_id' => 123]));
$channel->basic_publish($msg, 'notifications', '');
// Subscriber 1: Notificações por e-mail
$channel->queue_declare('email_notifications', false, true, false, false);
$channel->queue_bind('email_notifications', 'notifications');
// Subscriber 2: Notificações por SMS
$channel->queue_declare('sms_notifications', false, true, false, false);
$channel->queue_bind('sms_notifications', 'notifications');

Encaminhamento Baseado em Tópicos

Topic Exchange

// Declarar exchange
$channel->exchange_declare('logs', 'topic', false, true, false);
// Publisher — enviar com routing key
$channel->basic_publish($msg, 'logs', 'order.created');
$channel->basic_publish($msg, 'logs', 'order.payment.failed');
$channel->basic_publish($msg, 'logs', 'user.login');
// Subscriber — todos os eventos de encomendas
$channel->queue_declare('order_events', false, true, false, false);
$channel->queue_bind('order_events', 'logs', 'order.*');
// Subscriber — todas as falhas de pagamento
$channel->queue_declare('payment_failures', false, true, false, false);
$channel->queue_bind('payment_failures', 'logs', '*.payment.failed');
// Subscriber — todos os eventos
$channel->queue_declare('all_events', false, true, false, false);
$channel->queue_bind('all_events', 'logs', '#');

Padrão Request/Reply

// Client (requerente)
class RpcClient {
private $connection;
private $channel;
private $callbackQueue;
private $response;
private $correlationId;
public function __construct() {
$this->connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'secret');
$this->channel = $this->connection->channel();
// Criar callback queue exclusiva
list($this->callbackQueue,,) = $this->channel->queue_declare(
'', false, false, true, false
);
$this->channel->basic_consume(
$this->callbackQueue, '', false, true, false, false,
function ($msg) {
if ($msg->get('correlation_id') === $this->correlationId) {
$this->response = $msg->body;
}
}
);
}
public function call($data) {
$this->response = null;
$this->correlationId = uniqid();
$msg = new AMQPMessage(json_encode($data), [
'correlation_id' => $this->correlationId,
'reply_to' => $this->callbackQueue,
]);
$this->channel->basic_publish($msg, '', 'rpc_queue');
// Aguardar resposta
while (!$this->response) {
$this->channel->wait();
}
return json_decode($this->response, true);
}
}
// Server (respondente)
$channel->queue_declare('rpc_queue', false, true, false, false);
$callback = function ($msg) use ($channel) {
$data = json_decode($msg->body, true);
// Processar pedido
$result = processRequest($data);
// Enviar resposta
$response = new AMQPMessage(json_encode($result), [
'correlation_id' => $msg->get('correlation_id'),
]);
$channel->basic_publish($response, '', $msg->get('reply_to'));
$msg->ack();
};
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

Dead Letter Queue

Tratar mensagens com falhas:

// Queue principal com dead letter exchange
$args = new AMQPTable([
'x-dead-letter-exchange' => 'dlx',
'x-dead-letter-routing-key' => 'failed',
'x-message-ttl' => 60000, // Opcional: expirar após 60 s
]);
$channel->queue_declare('main_queue', false, true, false, false, $args);
// Dead letter exchange e queue
$channel->exchange_declare('dlx', 'direct', false, true, false);
$channel->queue_declare('failed_messages', false, true, false, false);
$channel->queue_bind('failed_messages', 'dlx', 'failed');
// Consumer que rejeita mensagens
$callback = function ($msg) {
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
// A mensagem vai para a dead letter queue
$msg->nack(false, false); // Não reenfileirar
}
};

Retry com Exponential Backoff

// Criar queues com atraso
$delays = [1000, 5000, 30000]; // 1 s, 5 s, 30 s
foreach ($delays as $index => $delay) {
$args = new AMQPTable([
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => 'main_queue',
'x-message-ttl' => $delay,
]);
$channel->queue_declare("retry_queue_$index", false, true, false, false, $args);
}
// Consumer com lógica de retry
$callback = function ($msg) use ($channel) {
$retryCount = $msg->has('application_headers')
? ($msg->get('application_headers')['x-retry-count'] ?? 0)
: 0;
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
$msg->ack();
if ($retryCount < 3) {
// Retry com atraso
$retryMsg = new AMQPMessage($msg->body, [
'application_headers' => new AMQPTable([
'x-retry-count' => $retryCount + 1,
]),
]);
$channel->basic_publish($retryMsg, '', "retry_queue_$retryCount");
} else {
// Enviar para a dead letter queue
$channel->basic_publish(new AMQPMessage($msg->body), '', 'failed_messages');
}
}
};

Gestão de Ligações

class RabbitMQConnection {
private static $connection = null;
private static $channel = null;
public static function getChannel() {
if (self::$channel === null || !self::$channel->is_open()) {
self::$connection = new AMQPStreamConnection(
getenv('RABBITMQ_HOST'),
getenv('RABBITMQ_PORT'),
getenv('RABBITMQ_USER'),
getenv('RABBITMQ_PASS'),
'/',
false,
'AMQPLAIN',
null,
'en_US',
3.0, // Timeout de ligação
10.0, // Timeout de leitura/escrita
null,
true, // Keepalive
60 // Heartbeat
);
self::$channel = self::$connection->channel();
}
return self::$channel;
}
public static function close() {
if (self::$channel !== null) {
self::$channel->close();
}
if (self::$connection !== null) {
self::$connection->close();
}
}
}
// Registar handler de encerramento
register_shutdown_function([RabbitMQConnection::class, 'close']);

Monitorização

Management API

# Listar queues
curl -u admin:secret http://localhost:15672/api/queues
# Detalhes da queue
curl -u admin:secret http://localhost:15672/api/queues/%2F/task_queue
# Publicar mensagem (testes)
curl -u admin:secret -X POST \
-H "content-type:application/json" \
-d '{"properties":{},"routing_key":"task_queue","payload":"test","payload_encoding":"string"}' \
http://localhost:15672/api/exchanges/%2F/amq.default/publish

Boas Práticas

  1. Use queues duráveis e mensagens persistentes para dados importantes
  2. Implemente dead letter queues para mensagens com falhas
  3. Use prefetch para controlar a carga do consumer
  4. Confirme (acknowledge) as mensagens apenas após processamento bem-sucedido
  5. Trate falhas de ligação com lógica de reconexão
  6. Monitorize a profundidade da queue para detetar problemas de processamento
  7. Use routing keys significativas para topic exchanges

Conclusão

O RabbitMQ fornece filas de mensagens fiáveis para construir sistemas distribuídos. Use direct exchanges para messaging ponto-a-ponto, fanout para difusão e topics para encaminhamento flexível. Implemente dead letter queues e lógica de retry para resiliência. Os padrões neste guia ajudam a construir sistemas de messaging escaláveis e tolerantes a falhas.

 
 
 
Языки
Темы
Copyright © 1999 — 2026
ZK Interactive