
Introduction

Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant data pipelines. Unlike traditional message queues, Kafka retains messages for configurable periods, enabling replay and multiple consumers. This guide covers practical Kafka implementation for event-driven architectures.

Core Concepts

Kafka Architecture

  • Topics: Named feeds of messages
  • Partitions: Topic subdivisions for parallelism
  • Producers: Publish messages to topics
  • Consumers: Read messages from topics
  • Consumer Groups: Coordinate consumption for scaling
  • Brokers: Kafka servers in a cluster
  • ZooKeeper/KRaft: Cluster coordination
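To make partitions concrete: producers hash a message key to pick a partition, which is what gives per-key ordering. A minimal sketch of the idea (the `pickPartition` helper is hypothetical; real clients such as librdkafka use murmur2 hashing, but the principle is identical):

```javascript
// Hypothetical partitioner sketch -- real clients use murmur2 hashing,
// but the invariant is the same: same key, same partition.
function pickPartition(key, partitionCount) {
  let hash = 0;
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0; // simple string hash
  }
  return hash % partitionCount;
}

// All events for one order land on one partition, so they stay ordered.
const p1 = pickPartition('order-42', 6);
const p2 = pickPartition('order-42', 6);
console.log(p1 === p2); // true: the key deterministically selects the partition
```

This is why messages with no key are spread round-robin and lose any ordering guarantee, while keyed messages are ordered per key.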

Key Differences from Traditional Queues

| Feature            | Kafka                     | Traditional Queue |
|--------------------|---------------------------|-------------------|
| Message Retention  | Time-based (configurable) | Until consumed    |
| Multiple Consumers | Yes (consumer groups)     | Usually no        |
| Replay             | Yes                       | No                |
| Ordering           | Per partition             | Per queue         |
| Throughput         | Very high                 | Moderate          |
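The retention and replay rows are the crux of the table. A toy in-memory model (illustrative only, not a client API) captures the difference: a Kafka-style log retains messages and tracks a per-group offset, so a brand-new consumer group can replay everything from the start, whereas a queue deletes each message once consumed.

```javascript
// Toy log: messages are retained; each consumer group keeps its own offset.
class Log {
  constructor() { this.messages = []; this.offsets = new Map(); }
  append(msg) { this.messages.push(msg); }
  poll(groupId) {
    const offset = this.offsets.get(groupId) ?? 0;   // new groups start at 0
    this.offsets.set(groupId, this.messages.length); // "commit" to the end
    return this.messages.slice(offset);              // replay from the offset
  }
}

const log = new Log();
log.append('OrderPlaced');
log.append('OrderPaid');

console.log(log.poll('billing'));   // ['OrderPlaced', 'OrderPaid']
console.log(log.poll('analytics')); // ['OrderPlaced', 'OrderPaid'] -- same data again
```

A traditional queue has no equivalent of the second `poll`: once `billing` consumed the messages, `analytics` would see nothing.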

Setup

Docker Installation

# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:9092
      # Clients inside the Docker network (like kafka-ui) use kafka:29092;
      # clients on the host use localhost:9092.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    depends_on:
      - kafka
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

Create Topics

# Use the kafka-topics command.
docker exec -it kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --create \
  --topic orders \
  --partitions 6 \
  --replication-factor 1

# List topics.
docker exec -it kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --list

# Describe the topic.
docker exec -it kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic orders

PHP with php-rdkafka

Installation

# Install librdkafka.
sudo apt install librdkafka-dev

# Install the PHP extension (PECL needs the PHP development headers).
sudo apt install php-pear php8.2-dev
sudo pecl install rdkafka

# Enable it (tee -a, since php.ini is root-owned).
echo "extension=rdkafka.so" | sudo tee -a /etc/php/8.2/cli/php.ini

Producer

<?php

$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('client.id', 'order-service');

// Delivery report callback.
$conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
        echo "Delivery failed: " . rd_kafka_err2str($message->err) . "\n";
    } else {
        echo "Message delivered to partition {$message->partition}\n";
    }
});

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('orders');

// Produce a message.
$order = [
    'order_id' => uniqid(),
    'user_id' => 123,
    'items' => [
        ['product_id' => 1, 'quantity' => 2],
    ],
    'total' => 199.99,
    'created_at' => date('c'),
];

$topic->produce(
    RD_KAFKA_PARTITION_UA, // Automatic partition assignment.
    0,                     // Flags.
    json_encode($order),   // Message payload.
    $order['order_id']     // Key (for partitioning).
);

// Wait for delivery.
$producer->flush(10000);

Consumer

<?php

$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('group.id', 'order-processor');
$conf->set('auto.offset.reset', 'earliest');
$conf->set('enable.auto.commit', 'false');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['orders']);

echo "Waiting for messages...\n";

while (true) {
    $message = $consumer->consume(10000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            $order = json_decode($message->payload, true);
            try {
                processOrder($order);
                // Commit the offset after successful processing.
                $consumer->commit($message);
                echo "Processed order: {$order['order_id']}\n";
            } catch (Exception $e) {
                echo "Error processing order: {$e->getMessage()}\n";
                // Don't commit; will retry on restart.
            }
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            // End of partition; waiting for more messages.
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            // No messages within the timeout.
            break;
        default:
            echo "Error: {$message->errstr()}\n";
            break;
    }
}

Node.js with KafkaJS

Producer

const { Kafka, Partitioners } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092'],
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
});

async function publishOrder(order) {
  await producer.connect();
  await producer.send({
    topic: 'orders',
    messages: [
      {
        key: order.orderId,
        value: JSON.stringify(order),
        headers: {
          'correlation-id': order.correlationId,
          'source': 'web-app',
        },
      },
    ],
  });
  console.log('Order published:', order.orderId);
}

// Batch publishing.
async function publishOrders(orders) {
  await producer.connect();
  await producer.sendBatch({
    topicMessages: [
      {
        topic: 'orders',
        messages: orders.map(order => ({
          key: order.orderId,
          value: JSON.stringify(order),
        })),
      },
    ],
  });
}

Consumer

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-processor',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'order-processor-group' });

async function startConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders', fromBeginning: false });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const order = JSON.parse(message.value.toString());
      console.log({
        partition,
        offset: message.offset,
        key: message.key?.toString(),
        orderId: order.orderId,
      });
      await processOrder(order);
    },
  });
}

// With batch processing.
async function startBatchConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders' });
  await consumer.run({
    eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
      for (const message of batch.messages) {
        const order = JSON.parse(message.value.toString());
        await processOrder(order);
        resolveOffset(message.offset);
        await heartbeat();
      }
    },
  });
}

startConsumer().catch(console.error);

Event Patterns

Event Sourcing

// Publish domain events.
async function placeOrder(orderData) {
  const orderId = generateId();
  // Publish the OrderPlaced event.
  await producer.send({
    topic: 'order-events',
    messages: [{
      key: orderId,
      value: JSON.stringify({
        type: 'OrderPlaced',
        orderId,
        data: orderData,
        timestamp: Date.now(),
      }),
    }],
  });
  return orderId;
}

async function cancelOrder(orderId, reason) {
  await producer.send({
    topic: 'order-events',
    messages: [{
      key: orderId,
      value: JSON.stringify({
        type: 'OrderCancelled',
        orderId,
        reason,
        timestamp: Date.now(),
      }),
    }],
  });
}

// Consume and rebuild state.
async function rebuildOrderState(orderId) {
  const events = await getEventsForOrder(orderId);
  let state = { status: 'unknown' };
  for (const event of events) {
    switch (event.type) {
      case 'OrderPlaced':
        state = { ...event.data, status: 'placed' };
        break;
      case 'OrderPaid':
        state.status = 'paid';
        break;
      case 'OrderShipped':
        state.status = 'shipped';
        break;
      case 'OrderCancelled':
        state.status = 'cancelled';
        break;
    }
  }
  return state;
}

CQRS Pattern

// The command handler publishes events.
class OrderCommandHandler {
  async handle(command) {
    switch (command.type) {
      case 'PlaceOrder':
        return this.handlePlaceOrder(command);
      case 'ShipOrder':
        return this.handleShipOrder(command);
    }
  }

  async handlePlaceOrder(command) {
    // Validate.
    // Publish the event.
    await this.publish('order-events', {
      type: 'OrderPlaced',
      ...command.data,
    });
  }
}

// The query side consumes events and updates the read model.
class OrderProjection {
  async processEvent(event) {
    switch (event.type) {
      case 'OrderPlaced':
        await db.orders.insert({
          id: event.orderId,
          status: 'placed',
          ...event.data,
        });
        break;
      case 'OrderShipped':
        await db.orders.update(
          { id: event.orderId },
          { status: 'shipped' }
        );
        break;
    }
  }
}

Error Handling

Dead Letter Topic

const DLT_TOPIC = 'orders-dlt';

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      const order = JSON.parse(message.value.toString());
      await processOrder(order);
    } catch (error) {
      console.error('Processing failed:', error);
      // Send to the dead letter topic.
      await producer.send({
        topic: DLT_TOPIC,
        messages: [{
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'error': error.message,
            'original-topic': topic,
            'original-partition': partition.toString(),
          },
        }],
      });
    }
  },
});

Retry with Backoff

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

async function processWithRetry(order, maxRetries = 3) {
  let lastError;
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      await processOrder(order);
      return;
    } catch (error) {
      lastError = error;
      // Exponential backoff: 2s, 4s, 8s.
      const delay = Math.pow(2, attempt) * 1000;
      await sleep(delay);
    }
  }
  throw lastError;
}

Monitoring

Consumer Lag

# Check consumer group lag.
docker exec -it kafka kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group order-processor-group

Metrics with KafkaJS

const consumer = kafka.consumer({
  groupId: 'my-group',
});

// KafkaJS instrumentation events: details arrive on the event's payload.
consumer.on(consumer.events.HEARTBEAT, ({ timestamp, payload }) => {
  console.log('Heartbeat:', { timestamp, groupId: payload.groupId });
});

consumer.on(consumer.events.COMMIT_OFFSETS, ({ payload }) => {
  console.log('Committed:', { groupId: payload.groupId, topics: payload.topics });
});

consumer.on(consumer.events.GROUP_JOIN, ({ payload }) => {
  console.log('Joined group:', { duration: payload.duration, groupId: payload.groupId });
});

Event-Driven Architecture: The Senior Mindset

Event vs. Command

Understanding the difference is crucial:

  • Command: ProcessPayment(orderId) - Expects a response. Imperative.
  • Event: OrderPlaced(orderId) - A fact that happened. Past tense. Ignorable.

Events decouple services. If the Payment Service is down, the Order Service doesn't fail—it just publishes an event and moves on.
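The distinction shows up directly in the message payloads. A sketch of the two shapes (field names here are illustrative, not a fixed schema):

```javascript
// A command: imperative, addressed to one handler, expects an outcome.
const command = {
  type: 'ProcessPayment',      // verb, present tense
  orderId: 'ord-1',
  replyTo: 'payment-results',  // the sender waits for an answer
};

// An event: a fact, past tense, broadcast to whoever cares.
const event = {
  type: 'OrderPlaced',         // past tense -- it already happened
  orderId: 'ord-1',
  timestamp: Date.now(),       // when the fact occurred
};
```

Note what the event lacks: no addressee and no reply channel. The producer neither knows nor cares how many consumers exist.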

The Outbox Pattern (Guaranteed Delivery)

Problem: You save an Order to the DB, but Kafka crashes before you publish the event. The data is now inconsistent.

Solution:

  1. Start a DB transaction.
  2. Save the Order.
  3. Save an Outbox record (event_type, payload, status='PENDING') in the same DB transaction.
  4. Commit.
  5. A separate poller process reads the Outbox table and publishes to Kafka.
  6. Mark the Outbox record as SENT.

This guarantees that if the order is saved, the event will eventually be published.
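A minimal sketch of the pattern, using in-memory arrays as stand-ins for the outbox table and the Kafka producer (the real version would wrap step 1 in a DB transaction and call `producer.send` in the poller):

```javascript
// In-memory stand-ins for the DB outbox table and the published messages.
const outbox = [];
const published = [];

// Steps 1-4: save the order and the outbox record atomically.
function placeOrder(order) {
  // In a real system, db.orders.insert(order) and this outbox insert
  // commit together in one DB transaction.
  outbox.push({ event_type: 'OrderPlaced', payload: order, status: 'PENDING' });
}

// Steps 5-6: a separate poller publishes PENDING records, then marks them SENT.
function pollOutbox() {
  for (const record of outbox) {
    if (record.status !== 'PENDING') continue;
    published.push(record.payload); // stands in for producer.send(...)
    record.status = 'SENT';         // only after the publish succeeded
  }
}

placeOrder({ orderId: 'ord-1' });
pollOutbox();
pollOutbox(); // second run publishes nothing new -- the record is already SENT
console.log(published.length); // 1
```

If the poller crashes between publishing and marking SENT, the record is re-published on the next run, which is exactly why consumers must be idempotent (next section).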

Idempotency: Handling Duplicates

Fact: Kafka's default delivery guarantee is at-least-once. Sooner or later, you will receive the same message twice.

Solution: Track processed message IDs.

async function handleEvent(event) {
  // Check if already processed.
  const exists = await db.processedEvents.exists(event.id);
  if (exists) return; // Already done.

  // Processing logic.
  await processOrder(event.data);

  // Mark as processed.
  await db.processedEvents.add(event.id);
}

Kafka vs. RabbitMQ: When to Choose Which

| Factor            | Kafka                             | RabbitMQ                     |
|-------------------|-----------------------------------|------------------------------|
| Model             | Pull-based (log)                  | Push-based (queue)           |
| Best for          | Stream processing, event sourcing | Task queues, complex routing |
| Message retention | Time-based (replay)               | Until consumed               |
| Throughput        | Massive scale                     | Moderate                     |

Senior Rule: Use RabbitMQ for "do this job" tasks. Use Kafka for "this happened" events where you need replay or multiple consumers.

Best Practices

  1. Choose partition count based on consumer parallelism needs
  2. Use meaningful keys for related message ordering
  3. Make consumers idempotent - messages may be delivered more than once
  4. Monitor consumer lag to detect processing issues
  5. Set appropriate retention based on replay requirements
  6. Use consumer groups for scaling and fault tolerance
  7. Implement dead letter topics for failed messages
  8. Use the Outbox Pattern for guaranteed delivery
  9. Design events as facts - past tense, immutable

Conclusion

Kafka enables building scalable, fault-tolerant event-driven systems. Senior engineers understand that Event-Driven Architecture introduces "Eventual Consistency"—the user might place an order, but it won't appear in their history for 2 seconds. Your job is to manage the tradeoffs between Availability (EDA) and Consistency (Monolith). Use appropriate partitioning for parallelism, consumer groups for scaling, and implement proper error handling with dead letter topics.

 
 
 
Copyright © 1999 — 2026
ZK Interactive