Sobre nós Guias Projetos Contactos
Админка
please wait

Introdução

O Apache Kafka é uma plataforma distribuída de event streaming concebida para pipelines de dados de elevado débito e tolerantes a falhas. Ao contrário das filas de mensagens tradicionais, o Kafka retém mensagens por períodos configuráveis, permitindo replay e múltiplos consumidores. Este guia aborda a implementação prática do Kafka para arquiteturas orientadas a eventos.

Conceitos Fundamentais

Arquitetura do Kafka

  • Topics: Feeds nomeados de mensagens
  • Partitions: Subdivisões de um topic para paralelismo
  • Producers: Publicam mensagens em topics
  • Consumers: Leem mensagens de topics
  • Consumer Groups: Coordenam o consumo para escalabilidade
  • Brokers: Servidores Kafka num cluster
  • Zookeeper/KRaft: Coordenação do cluster

Principais Diferenças face a Filas Tradicionais

FuncionalidadeKafkaFila Tradicional
Retenção de mensagensBaseada no tempo (configurável)Até ser consumida
Múltiplos consumidoresSim (consumer groups)Normalmente não
ReplaySimNão
OrdenaçãoPor partitionPor fila
DébitoMuito elevadoModerado

Configuração

Instalação com Docker

# docker-compose.yml
version: '3.8'
services:
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

Criar Topics

# Utilizar o comando kafka-topics
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic orders \
--partitions 6 \
--replication-factor 1
# Listar topics
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--list
# Descrever topic
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--describe \
--topic orders

PHP com php-rdkafka

Instalação

# Instalar librdkafka
sudo apt install librdkafka-dev
# Instalar a extensão PHP
pecl install rdkafka
# Adicionar ao php.ini
echo "extension=rdkafka.so" >> /etc/php/8.2/cli/php.ini

Producer

<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('client.id', 'order-service');
// Callback do relatório de entrega
$conf->setDrMsgCb(function ($kafka, $message) {
if ($message->err) {
echo "Delivery failed: " . rd_kafka_err2str($message->err) . "\n";
} else {
echo "Message delivered to partition {$message->partition}\n";
}
});
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('orders');
// Produzir mensagem
$order = [
'order_id' => uniqid(),
'user_id' => 123,
'items' => [
['product_id' => 1, 'quantity' => 2],
],
'total' => 199.99,
'created_at' => date('c'),
];
$topic->produce(
RD_KAFKA_PARTITION_UA, // Atribuição automática de partitions
0, // Flags
json_encode($order), // Payload da mensagem
$order['order_id'] // Key (para partitioning)
);
// Aguardar pela entrega
$producer->flush(10000);

Consumer

<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('group.id', 'order-processor');
$conf->set('auto.offset.reset', 'earliest');
$conf->set('enable.auto.commit', 'false');
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['orders']);
echo "Waiting for messages...\n";
while (true) {
$message = $consumer->consume(10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$order = json_decode($message->payload, true);
try {
processOrder($order);
// Fazer commit do offset após processamento bem-sucedido
$consumer->commit($message);
echo "Processed order: {$order['order_id']}\n";
} catch (Exception $e) {
echo "Error processing order: {$e->getMessage()}\n";
// Não fazer commit — irá tentar novamente no reinício
}
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
// Fim da partition; a aguardar mais mensagens
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
// Sem mensagens dentro do timeout
break;
default:
echo "Error: {$message->errstr()}\n";
break;
}
}

Node.js com KafkaJS

Producer

const { Kafka, Partitioners } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['localhost:9092'],
});
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
});
async function publishOrder(order) {
await producer.connect();
await producer.send({
topic: 'orders',
messages: [
{
key: order.orderId,
value: JSON.stringify(order),
headers: {
'correlation-id': order.correlationId,
'source': 'web-app',
},
},
],
});
console.log('Order published:', order.orderId);
}
// Publicação em lote
async function publishOrders(orders) {
await producer.connect();
await producer.sendBatch({
topicMessages: [
{
topic: 'orders',
messages: orders.map(order => ({
key: order.orderId,
value: JSON.stringify(order),
})),
},
],
});
}

Consumer

const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-processor',
brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'order-processor-group' });
async function startConsumer() {
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const order = JSON.parse(message.value.toString());
console.log({
partition,
offset: message.offset,
key: message.key?.toString(),
orderId: order.orderId,
});
await processOrder(order);
},
});
}
// Com processamento em lote
async function startBatchConsumer() {
await consumer.connect();
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
for (const message of batch.messages) {
const order = JSON.parse(message.value.toString());
await processOrder(order);
resolveOffset(message.offset);
await heartbeat();
}
},
});
}
startConsumer().catch(console.error);

Padrões de Eventos

Event Sourcing

// Publicar eventos de domínio
async function placeOrder(orderData) {
const orderId = generateId();
// Publicar o evento OrderPlaced
await producer.send({
topic: 'order-events',
messages: [{
key: orderId,
value: JSON.stringify({
type: 'OrderPlaced',
orderId,
data: orderData,
timestamp: Date.now(),
}),
}],
});
return orderId;
}
async function cancelOrder(orderId, reason) {
await producer.send({
topic: 'order-events',
messages: [{
key: orderId,
value: JSON.stringify({
type: 'OrderCancelled',
orderId,
reason,
timestamp: Date.now(),
}),
}],
});
}
// Consumir e reconstruir o estado
async function rebuildOrderState(orderId) {
const events = await getEventsForOrder(orderId);
let state = { status: 'unknown' };
for (const event of events) {
switch (event.type) {
case 'OrderPlaced':
state = { ...event.data, status: 'placed' };
break;
case 'OrderPaid':
state.status = 'paid';
break;
case 'OrderShipped':
state.status = 'shipped';
break;
case 'OrderCancelled':
state.status = 'cancelled';
break;
}
}
return state;
}

Padrão CQRS

// O handler do comando publica eventos
class OrderCommandHandler {
async handle(command) {
switch (command.type) {
case 'PlaceOrder':
return this.handlePlaceOrder(command);
case 'ShipOrder':
return this.handleShipOrder(command);
}
}
async handlePlaceOrder(command) {
// Validar
// Publicar evento
await this.publish('order-events', {
type: 'OrderPlaced',
...command.data,
});
}
}
// O lado de query consome eventos e atualiza o read model
class OrderProjection {
async processEvent(event) {
switch (event.type) {
case 'OrderPlaced':
await db.orders.insert({
id: event.orderId,
status: 'placed',
...event.data,
});
break;
case 'OrderShipped':
await db.orders.update(
{ id: event.orderId },
{ status: 'shipped' }
);
break;
}
}
}

Tratamento de Erros

Dead Letter Topic

const DLT_TOPIC = 'orders-dlt';
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const order = JSON.parse(message.value.toString());
await processOrder(order);
} catch (error) {
console.error('Processing failed:', error);
// Enviar para dead letter topic
await producer.send({
topic: DLT_TOPIC,
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
'error': error.message,
'original-topic': topic,
'original-partition': partition.toString(),
},
}],
});
}
},
});

Retry com Backoff

async function processWithRetry(order, maxRetries = 3) {
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await processOrder(order);
return;
} catch (error) {
lastError = error;
const delay = Math.pow(2, attempt) * 1000;
await sleep(delay);
}
}
throw lastError;
}

Monitorização

Consumer Lag

# Verificar o lag do consumer group
docker exec -it kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group order-processor-group

Métricas com KafkaJS

const consumer = kafka.consumer({
groupId: 'my-group',
});
consumer.on('consumer.heartbeat', ({ timestamp, groupId }) => {
console.log('Heartbeat:', { timestamp, groupId });
});
consumer.on('consumer.commit_offsets', ({ groupId, topics }) => {
console.log('Committed:', { groupId, topics });
});
consumer.on('consumer.group_join', ({ duration, groupId }) => {
console.log('Joined group:', { duration, groupId });
});

Arquitetura Orientada a Eventos: A Mentalidade de um Sénior

Evento vs. Comando

Compreender a diferença é crucial:

  • Comando: ProcessPayment(orderId) - Espera uma resposta. Imperativo.
  • Evento: OrderPlaced(orderId) - Um facto que aconteceu. No passado. Ignorável.

Os eventos desacoplam serviços. Se o Payment Service estiver em baixo, o Order Service não falha — limita-se a publicar um evento e segue em frente.

O Padrão Outbox (Entrega Garantida)

Problema: Guarda um Order na BD, mas o Kafka falha antes de publicar o Event. Os dados ficam inconsistentes.

Solução: 1. Iniciar uma transação na BD. 2. Guardar o Order. 3. Guardar um registo Outbox (event_type, payload, status='PENDING') na mesma transação da BD. 4. Commit. 5. Um processo de polling separado lê a tabela Outbox e publica no Kafka. 6. Marcar o registo Outbox como SENT.

Isto garante que, se a encomenda for guardada, o evento acabará por ser publicado.

Idempotência: Lidar com Duplicados

Facto: O Kafka garante entrega «At Least Once». Vai receber a mesma mensagem duas vezes, mais cedo ou mais tarde.

Solução: Acompanhar os IDs das mensagens processadas.

async function handleEvent(event) {
// Verificar se já foi processado
const exists = await db.processedEvents.exists(event.id);
if (exists) return; // Já concluído
// Lógica de processamento
await processOrder(event.data);
// Marcar como processado
await db.processedEvents.add(event.id);
}

Kafka vs. RabbitMQ: Quando Escolher Qual

FatorKafkaRabbitMQ
ModeloPull-based (Log)Push-based (Queue)
Melhor paraStream processing, event sourcingFilas de tarefas, routing complexo
Retenção de mensagensBaseada no tempo (replay)Até ser consumida
DébitoEscala massivaModerado

Regra Sénior: Use RabbitMQ para tarefas «faça este trabalho». Use Kafka para eventos «isto aconteceu» quando precisa de replay ou de múltiplos consumidores.

Boas Práticas

  1. Escolha o número de partitions com base nas necessidades de paralelismo dos consumers
  2. Use keys significativas para ordenação de mensagens relacionadas
  3. Torne os consumers idempotentes - as mensagens podem ser entregues mais do que uma vez
  4. Monitorize o consumer lag para detetar problemas de processamento
  5. Defina uma retenção adequada com base nos requisitos de replay
  6. Use consumer groups para escalabilidade e tolerância a falhas
  7. Implemente dead letter topics para mensagens com falha
  8. Use o Padrão Outbox para entrega garantida
  9. Desenhe eventos como factos - no passado, imutáveis

Conclusão

O Kafka permite construir sistemas orientados a eventos escaláveis e tolerantes a falhas. Engenheiros sénior compreendem que a Arquitetura Orientada a Eventos introduz «Eventual Consistency» — o utilizador pode fazer uma encomenda, mas esta pode não aparecer no histórico durante 2 segundos. O seu trabalho é gerir os trade-offs entre Availability (EDA) e Consistency (Monolith). Use partitioning adequado para paralelismo, consumer groups para escalabilidade e implemente um tratamento de erros apropriado com dead letter topics.

 
 
 
Языки
Темы
Copyright © 1999 — 2026
ZK Interactive