Introduction
Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant data pipelines. Unlike traditional message queues, Kafka retains messages for configurable periods, enabling replay and multiple consumers. This guide covers practical Kafka implementation for event-driven architectures.
Core Concepts
Kafka Architecture
- Topics: Named feeds of messages
- Partitions: Topic subdivisions for parallelism
- Producers: Publish messages to topics
- Consumers: Read messages from topics
- Consumer Groups: Coordinate consumption for scaling
- Brokers: Kafka servers in a cluster
- ZooKeeper/KRaft: Cluster coordination (KRaft replaces ZooKeeper in newer Kafka versions)
Key Differences from Traditional Queues
| Feature | Kafka | Traditional Queue |
|---|---|---|
| Message Retention | Time-based (configurable) | Until consumed |
| Multiple Consumers | Yes (consumer groups) | Usually no |
| Replay | Yes | No |
| Ordering | Per partition | Per queue |
| Throughput | Very high | Moderate |
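The "per partition" ordering guarantee follows from how messages are assigned to partitions: a message's key is hashed, and the hash picks the partition, so all messages with the same key land on the same partition in order. The sketch below illustrates the idea with a simple string hash; real Kafka clients use a murmur2 hash, so the actual partition numbers will differ.

```javascript
// Sketch of key-based partitioning: same key -> same partition, always.
// This is why Kafka can order messages per partition but not per topic.
// (Simplified stand-in hash; real clients use murmur2.)
function hashKey(key) {
  let h = 0;
  for (const ch of key) {
    h = (h * 31 + ch.charCodeAt(0)) | 0;
  }
  return Math.abs(h);
}

function partitionFor(key, numPartitions) {
  return hashKey(key) % numPartitions;
}

// Every event for order-42 maps to the same partition,
// so its events are consumed in the order they were produced.
console.log(partitionFor('order-42', 6) === partitionFor('order-42', 6)); // true
```

This is also why choosing keys matters: key by order ID and each order's events stay ordered; key by nothing and messages are spread round-robin with no cross-message ordering.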
Setup
Docker Installation
# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:9092
      # Containers (like kafka-ui) reach the broker at kafka:29092;
      # host clients use localhost:9092.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
Create Topics
# Use the kafka-topics command.
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic orders \
--partitions 6 \
--replication-factor 1
# List topics.
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--list
# Describe the topic.
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--describe \
--topic orders
PHP with php-rdkafka
Installation
# Install librdkafka.
sudo apt install librdkafka-dev
# Install the PHP extension.
pecl install rdkafka
# Add to php.ini.
echo "extension=rdkafka.so" >> /etc/php/8.2/cli/php.ini
Producer
<?php

$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('client.id', 'order-service');

// Delivery report callback.
$conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
        echo "Delivery failed: " . rd_kafka_err2str($message->err) . "\n";
    } else {
        echo "Message delivered to partition {$message->partition}\n";
    }
});

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('orders');

// Produce a message.
$order = [
    'order_id' => uniqid(),
    'user_id' => 123,
    'items' => [
        ['product_id' => 1, 'quantity' => 2],
    ],
    'total' => 199.99,
    'created_at' => date('c'),
];

$topic->produce(
    RD_KAFKA_PARTITION_UA, // Automatic partition assignment (based on the key).
    0,                     // Flags.
    json_encode($order),   // Message payload.
    $order['order_id']     // Key (for partitioning).
);

// Wait for delivery.
$producer->flush(10000);
Consumer
<?php

$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('group.id', 'order-processor');
$conf->set('auto.offset.reset', 'earliest');
$conf->set('enable.auto.commit', 'false');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['orders']);

echo "Waiting for messages...\n";

while (true) {
    $message = $consumer->consume(10000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            $order = json_decode($message->payload, true);
            try {
                processOrder($order);
                // Commit the offset after successful processing.
                $consumer->commit($message);
                echo "Processed order: {$order['order_id']}\n";
            } catch (Exception $e) {
                echo "Error processing order: {$e->getMessage()}\n";
                // Don't commit; will retry on restart.
            }
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            // End of partition; waiting for more messages.
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            // No messages within the timeout.
            break;
        default:
            echo "Error: {$message->errstr()}\n";
            break;
    }
}
Node.js with KafkaJS
Producer
const { Kafka, Partitioners } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092'],
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
});

async function publishOrder(order) {
  await producer.connect();
  await producer.send({
    topic: 'orders',
    messages: [
      {
        key: order.orderId,
        value: JSON.stringify(order),
        headers: {
          'correlation-id': order.correlationId,
          'source': 'web-app',
        },
      },
    ],
  });
  console.log('Order published:', order.orderId);
}

// Batch publishing.
async function publishOrders(orders) {
  await producer.connect();
  await producer.sendBatch({
    topicMessages: [
      {
        topic: 'orders',
        messages: orders.map(order => ({
          key: order.orderId,
          value: JSON.stringify(order),
        })),
      },
    ],
  });
}
Consumer
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-processor',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'order-processor-group' });

async function startConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders', fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const order = JSON.parse(message.value.toString());
      console.log({
        partition,
        offset: message.offset,
        key: message.key?.toString(),
        orderId: order.orderId,
      });
      await processOrder(order);
    },
  });
}

// With batch processing.
async function startBatchConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders' });

  await consumer.run({
    eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
      for (const message of batch.messages) {
        const order = JSON.parse(message.value.toString());
        await processOrder(order);
        resolveOffset(message.offset);
        await heartbeat();
      }
    },
  });
}

startConsumer().catch(console.error);
Event Patterns
Event Sourcing
// Publish domain events.
async function placeOrder(orderData) {
  const orderId = generateId();

  // Publish the OrderPlaced event.
  await producer.send({
    topic: 'order-events',
    messages: [{
      key: orderId,
      value: JSON.stringify({
        type: 'OrderPlaced',
        orderId,
        data: orderData,
        timestamp: Date.now(),
      }),
    }],
  });

  return orderId;
}

async function cancelOrder(orderId, reason) {
  await producer.send({
    topic: 'order-events',
    messages: [{
      key: orderId,
      value: JSON.stringify({
        type: 'OrderCancelled',
        orderId,
        reason,
        timestamp: Date.now(),
      }),
    }],
  });
}

// Consume and rebuild state.
async function rebuildOrderState(orderId) {
  const events = await getEventsForOrder(orderId);
  let state = { status: 'unknown' };

  for (const event of events) {
    switch (event.type) {
      case 'OrderPlaced':
        state = { ...event.data, status: 'placed' };
        break;
      case 'OrderPaid':
        state.status = 'paid';
        break;
      case 'OrderShipped':
        state.status = 'shipped';
        break;
      case 'OrderCancelled':
        state.status = 'cancelled';
        break;
    }
  }

  return state;
}
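The core of rebuildOrderState is a fold over the ordered event history: state is never stored, only derived. Extracted as a pure function (with getEventsForOrder assumed to load events from Kafka or an event store), the fold can be exercised entirely in memory:

```javascript
// Pure version of the fold inside rebuildOrderState: current state is
// derived entirely from the ordered event history.
function applyEvent(state, event) {
  switch (event.type) {
    case 'OrderPlaced':
      return { ...event.data, status: 'placed' };
    case 'OrderPaid':
      return { ...state, status: 'paid' };
    case 'OrderShipped':
      return { ...state, status: 'shipped' };
    case 'OrderCancelled':
      return { ...state, status: 'cancelled' };
    default:
      return state; // Unknown event types are ignored.
  }
}

const events = [
  { type: 'OrderPlaced', data: { orderId: 'a1', total: 199.99 } },
  { type: 'OrderPaid' },
  { type: 'OrderShipped' },
];

const state = events.reduce(applyEvent, { status: 'unknown' });
console.log(state.status); // 'shipped'
```

Keeping the fold pure makes it trivial to unit-test and to replay against the full topic when the read model changes.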
CQRS Pattern
// The command handler publishes events.
class OrderCommandHandler {
  async handle(command) {
    switch (command.type) {
      case 'PlaceOrder':
        return this.handlePlaceOrder(command);
      case 'ShipOrder':
        return this.handleShipOrder(command);
    }
  }

  async handlePlaceOrder(command) {
    // Validate.
    // Publish the event.
    await this.publish('order-events', {
      type: 'OrderPlaced',
      ...command.data,
    });
  }
}

// The query side consumes events and updates the read model.
class OrderProjection {
  async processEvent(event) {
    switch (event.type) {
      case 'OrderPlaced':
        await db.orders.insert({
          id: event.orderId,
          status: 'placed',
          ...event.data,
        });
        break;
      case 'OrderShipped':
        await db.orders.update(
          { id: event.orderId },
          { status: 'shipped' }
        );
        break;
    }
  }
}
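The projection above assumes a db.orders table. The same shape can be sketched with an in-memory Map standing in for the read model, which is also a handy way to test projections without a database:

```javascript
// Sketch of the projection with an in-memory read model standing in
// for the db.orders table (a hypothetical store, not a real library).
class InMemoryOrderProjection {
  constructor() {
    this.orders = new Map(); // read model: orderId -> row
  }

  processEvent(event) {
    switch (event.type) {
      case 'OrderPlaced':
        this.orders.set(event.orderId, {
          id: event.orderId,
          status: 'placed',
          ...event.data,
        });
        break;
      case 'OrderShipped': {
        const row = this.orders.get(event.orderId);
        if (row) row.status = 'shipped';
        break;
      }
    }
  }
}

const projection = new InMemoryOrderProjection();
projection.processEvent({ type: 'OrderPlaced', orderId: 'a1', data: { total: 50 } });
projection.processEvent({ type: 'OrderShipped', orderId: 'a1' });
console.log(projection.orders.get('a1').status); // 'shipped'
```

Because the read model is derived from events, it can be dropped and rebuilt at any time by replaying the topic from offset zero.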
Error Handling
Dead Letter Topic
const DLT_TOPIC = 'orders-dlt';

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      const order = JSON.parse(message.value.toString());
      await processOrder(order);
    } catch (error) {
      console.error('Processing failed:', error);

      // Send to the dead letter topic.
      await producer.send({
        topic: DLT_TOPIC,
        messages: [{
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'error': error.message,
            'original-topic': topic,
            'original-partition': partition.toString(),
          },
        }],
      });
    }
  },
});
Retry with Backoff
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function processWithRetry(order, maxRetries = 3) {
  let lastError;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      await processOrder(order);
      return;
    } catch (error) {
      lastError = error;
      const delay = Math.pow(2, attempt) * 1000; // 2s, 4s, 8s...
      await sleep(delay);
    }
  }

  throw lastError;
}
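Two common refinements to this schedule are a cap (so deep retries don't sleep for minutes) and random jitter (so a fleet of consumers that all failed at once doesn't retry in lockstep). Factored out as a pure function:

```javascript
// Exponential backoff with a cap and optional "full jitter":
// without jitter, attempt n waits base * 2^n, up to capMs;
// with jitter, the wait is a random value in [0, that bound).
function backoffDelay(attempt, { baseMs = 1000, capMs = 30000, jitter = false } = {}) {
  const bound = Math.min(capMs, Math.pow(2, attempt) * baseMs);
  return jitter ? Math.random() * bound : bound;
}

console.log(backoffDelay(1));  // 2000
console.log(backoffDelay(2));  // 4000
console.log(backoffDelay(10)); // 30000 (capped)
```

Dropping `backoffDelay(attempt, { jitter: true })` into processWithRetry in place of the inline calculation spreads retries out over time instead of synchronizing them.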
Monitoring
Consumer Lag
# Check consumer group lag.
docker exec -it kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group order-processor-group
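The lag the CLI reports is simply, per partition, the latest (end) offset minus the group's last committed offset. With KafkaJS the raw numbers can be fetched via the admin client (admin.fetchTopicOffsets and admin.fetchOffsets; the exact response shapes are simplified below), and the arithmetic is a pure function:

```javascript
// Lag per partition = end offset - committed offset.
// endOffsets / committedOffsets mimic (simplified) KafkaJS admin
// responses; offsets arrive as strings, hence Number().
function computeLag(endOffsets, committedOffsets) {
  return endOffsets.map(({ partition, offset }) => {
    const committed = committedOffsets.find((c) => c.partition === partition);
    return {
      partition,
      lag: Number(offset) - Number(committed ? committed.offset : 0),
    };
  });
}

const lag = computeLag(
  [{ partition: 0, offset: '120' }, { partition: 1, offset: '80' }],
  [{ partition: 0, offset: '100' }, { partition: 1, offset: '80' }],
);
console.log(lag); // partition 0 lags by 20 messages, partition 1 by 0
```

Growing lag on one partition usually means a hot key or a slow consumer; growing lag on all partitions means processing throughput is below produce throughput.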
Metrics with KafkaJS
const consumer = kafka.consumer({
  groupId: 'my-group',
});

// KafkaJS instrumentation events wrap their data in a payload property.
consumer.on(consumer.events.HEARTBEAT, ({ timestamp, payload }) => {
  console.log('Heartbeat:', { timestamp, groupId: payload.groupId });
});

consumer.on(consumer.events.COMMIT_OFFSETS, ({ payload }) => {
  console.log('Committed:', { groupId: payload.groupId, topics: payload.topics });
});

consumer.on(consumer.events.GROUP_JOIN, ({ payload }) => {
  console.log('Joined group:', { duration: payload.duration, groupId: payload.groupId });
});
Event-Driven Architecture: The Senior Mindset
Event vs. Command
Understanding the difference is crucial:
- Command: ProcessPayment(orderId). Expects a response. Imperative.
- Event: OrderPlaced(orderId). A fact that happened. Past tense. Ignorable.
Events decouple services. If the Payment Service is down, the Order Service doesn't fail—it just publishes an event and moves on.
The Outbox Pattern (Guaranteed Delivery)
Problem: You save an Order to the DB, but Kafka crashes before you publish the event. The data is now inconsistent.
Solution:
1. Start a DB transaction.
2. Save the Order.
3. Save an Outbox record (event_type, payload, status='PENDING') in the same DB transaction.
4. Commit.
5. A separate poller process reads the Outbox table and publishes to Kafka.
6. Mark the Outbox record as SENT.
This guarantees that if the order is saved, the event will eventually be published.
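The pattern can be sketched end to end with in-memory objects standing in for the database and a plain array standing in for the Kafka topic (placeOrderTx and pollOutbox are illustrative names, not library APIs):

```javascript
// Minimal outbox sketch. db stands in for relational tables; published
// stands in for the Kafka topic the poller produces to.
const db = { orders: [], outbox: [] };
const published = [];

function placeOrderTx(order) {
  // Steps 1-4: the order row and the outbox row are written together.
  // In a real DB both inserts share one transaction, so either both
  // exist or neither does.
  db.orders.push(order);
  db.outbox.push({ eventType: 'OrderPlaced', payload: order, status: 'PENDING' });
}

function pollOutbox() {
  // Steps 5-6: a separate poller drains PENDING rows into Kafka.
  for (const row of db.outbox) {
    if (row.status !== 'PENDING') continue;
    published.push({ type: row.eventType, ...row.payload }); // producer.send(...) in reality
    row.status = 'SENT';
  }
}

placeOrderTx({ orderId: 'a1', total: 10 });
pollOutbox();
console.log(published.length);      // 1
console.log(db.outbox[0].status);   // 'SENT'
```

Note the failure mode this leaves: if the poller crashes between publishing and marking SENT, the event is published twice, which is exactly why the idempotency section below matters.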
Idempotency: Handling Duplicates
Fact: Kafka's standard delivery guarantee is "at least once". You will eventually receive the same message twice.
Solution: Track processed message IDs.
async function handleEvent(event) {
  // Check if already processed.
  const exists = await db.processedEvents.exists(event.id);
  if (exists) return; // Already done.

  // Processing logic.
  await processOrder(event.data);

  // Mark as processed.
  await db.processedEvents.add(event.id);
}
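The dedupe logic is easy to verify in isolation with a Set standing in for the processedEvents table (handleEventOnce is an illustrative name):

```javascript
// Runnable sketch of the dedupe above: a Set stands in for the
// processedEvents table, a counter for the real processing work.
const processed = new Set();
let processCount = 0;

function handleEventOnce(event) {
  if (processed.has(event.id)) return; // duplicate delivery, skip
  processCount += 1;                   // processOrder(event.data) in reality
  processed.add(event.id);
}

const event = { id: 'evt-1', data: {} };
handleEventOnce(event);
handleEventOnce(event); // redelivered by Kafka
console.log(processCount); // 1
```

In production the existence check and the "mark processed" write should happen in the same transaction as the side effect, otherwise a crash between them reintroduces the duplicate.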
Kafka vs. RabbitMQ: When to Choose Which
| Factor | Kafka | RabbitMQ |
|---|---|---|
| Model | Pull-based (Log) | Push-based (Queue) |
| Best for | Stream processing, event sourcing | Task queues, complex routing |
| Message retention | Time-based (replay) | Until consumed |
| Throughput | Massive scale | Moderate |
Senior Rule: Use RabbitMQ for "do this job" tasks. Use Kafka for "this happened" events where you need replay or multiple consumers.
Best Practices
- Choose partition count based on consumer parallelism needs
- Use meaningful keys for related message ordering
- Make consumers idempotent - messages may be delivered more than once
- Monitor consumer lag to detect processing issues
- Set appropriate retention based on replay requirements
- Use consumer groups for scaling and fault tolerance
- Implement dead letter topics for failed messages
- Use the Outbox Pattern for guaranteed delivery
- Design events as facts - past tense, immutable
Conclusion
Kafka enables building scalable, fault-tolerant event-driven systems. Senior engineers understand that event-driven architecture introduces eventual consistency: a user might place an order that doesn't show up in their order history for a second or two. Your job is to manage the tradeoff between availability (EDA) and immediate consistency (monolith). Use appropriate partitioning for parallelism, consumer groups for scaling, and dead letter topics for proper error handling.