Введение
Apache Kafka — это распределённая платформа для потоковой передачи событий, предназначенная для высокопроизводительных, отказоустойчивых конвейеров данных. В отличие от традиционных очередей сообщений Kafka хранит сообщения в течение настраиваемых периодов, что позволяет выполнять повторное воспроизведение (replay) и поддерживать нескольких потребителей. В этом руководстве рассматривается практическая реализация Kafka для событийно-ориентированных архитектур.
Ключевые концепции
Архитектура Kafka
- Topics: именованные ленты сообщений
- Partitions: подразделы topic для параллелизма
- Producers: публикуют сообщения в topics
- Consumers: читают сообщения из topics
- Consumer Groups: координируют потребление для масштабирования
- Brokers: серверы Kafka в кластере
- Zookeeper/KRaft: координация кластера
Ключевые отличия от традиционных очередей
| Feature | Kafka | Traditional Queue |
|---|
| Message Retention | по времени (настраиваемо) | до потребления |
| Multiple Consumers | да (consumer groups) | обычно нет |
| Replay | да | нет |
| Ordering | на уровне partition | на уровне очереди |
| Throughput | очень высокий | умеренный |
Настройка
Установка через Docker
# docker-compose.yml
version: '3.8'
services:
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
Создание topics
# Использование команды kafka-topics
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic orders \
--partitions 6 \
--replication-factor 1
# Список topics
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--list
# Описание topic
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--describe \
--topic orders
PHP с php-rdkafka
Установка
# Установка librdkafka
sudo apt install librdkafka-dev
# Установка расширения PHP
pecl install rdkafka
# Добавить в php.ini
echo "extension=rdkafka.so" >> /etc/php/8.2/cli/php.ini
Producer
<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('client.id', 'order-service');
// Callback отчёта о доставке
$conf->setDrMsgCb(function ($kafka, $message) {
if ($message->err) {
echo "Delivery failed: " . rd_kafka_err2str($message->err) . "\n";
} else {
echo "Message delivered to partition {$message->partition}\n";
}
});
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('orders');
// Отправить сообщение
$order = [
'order_id' => uniqid(),
'user_id' => 123,
'items' => [
['product_id' => 1, 'quantity' => 2],
],
'total' => 199.99,
'created_at' => date('c'),
];
$topic->produce(
RD_KAFKA_PARTITION_UA, // Автоматическое назначение partitions
0, // Флаги
json_encode($order), // Payload сообщения
$order['order_id'] // Key (для partitioning)
);
// Ожидать доставку
$producer->flush(10000);
Consumer
<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('group.id', 'order-processor');
$conf->set('auto.offset.reset', 'earliest');
$conf->set('enable.auto.commit', 'false');
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['orders']);
echo "Waiting for messages...\n";
while (true) {
$message = $consumer->consume(10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$order = json_decode($message->payload, true);
try {
processOrder($order);
// Коммит offset после успешной обработки
$consumer->commit($message);
echo "Processed order: {$order['order_id']}\n";
} catch (Exception $e) {
echo "Error processing order: {$e->getMessage()}\n";
// Не делать commit — будет повтор при перезапуске
}
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
// Конец partition, ожидание новых сообщений
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
// Нет сообщений в пределах timeout
break;
default:
echo "Error: {$message->errstr()}\n";
break;
}
}
Node.js с KafkaJS
Producer
const { Kafka, Partitioners } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['localhost:9092'],
});
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
});
async function publishOrder(order) {
await producer.connect();
await producer.send({
topic: 'orders',
messages: [
{
key: order.orderId,
value: JSON.stringify(order),
headers: {
'correlation-id': order.correlationId,
'source': 'web-app',
},
},
],
});
console.log('Order published:', order.orderId);
}
// Пакетная публикация
async function publishOrders(orders) {
await producer.connect();
await producer.sendBatch({
topicMessages: [
{
topic: 'orders',
messages: orders.map(order => ({
key: order.orderId,
value: JSON.stringify(order),
})),
},
],
});
}
Consumer
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-processor',
brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'order-processor-group' });
async function startConsumer() {
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const order = JSON.parse(message.value.toString());
console.log({
partition,
offset: message.offset,
key: message.key?.toString(),
orderId: order.orderId,
});
await processOrder(order);
},
});
}
// С пакетной обработкой
async function startBatchConsumer() {
await consumer.connect();
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
for (const message of batch.messages) {
const order = JSON.parse(message.value.toString());
await processOrder(order);
resolveOffset(message.offset);
await heartbeat();
}
},
});
}
startConsumer().catch(console.error);
Паттерны событий
Event Sourcing
// Публикация domain events
async function placeOrder(orderData) {
const orderId = generateId();
// Публикация события OrderPlaced
await producer.send({
topic: 'order-events',
messages: [{
key: orderId,
value: JSON.stringify({
type: 'OrderPlaced',
orderId,
data: orderData,
timestamp: Date.now(),
}),
}],
});
return orderId;
}
async function cancelOrder(orderId, reason) {
await producer.send({
topic: 'order-events',
messages: [{
key: orderId,
value: JSON.stringify({
type: 'OrderCancelled',
orderId,
reason,
timestamp: Date.now(),
}),
}],
});
}
// Потребление и восстановление состояния
async function rebuildOrderState(orderId) {
const events = await getEventsForOrder(orderId);
let state = { status: 'unknown' };
for (const event of events) {
switch (event.type) {
case 'OrderPlaced':
state = { ...event.data, status: 'placed' };
break;
case 'OrderPaid':
state.status = 'paid';
break;
case 'OrderShipped':
state.status = 'shipped';
break;
case 'OrderCancelled':
state.status = 'cancelled';
break;
}
}
return state;
}
Паттерн CQRS
// Обработчик command публикует events
class OrderCommandHandler {
async handle(command) {
switch (command.type) {
case 'PlaceOrder':
return this.handlePlaceOrder(command);
case 'ShipOrder':
return this.handleShipOrder(command);
}
}
async handlePlaceOrder(command) {
// Валидация
// Публикация event
await this.publish('order-events', {
type: 'OrderPlaced',
...command.data,
});
}
}
// Сторона запросов потребляет events и обновляет read model
class OrderProjection {
async processEvent(event) {
switch (event.type) {
case 'OrderPlaced':
await db.orders.insert({
id: event.orderId,
status: 'placed',
...event.data,
});
break;
case 'OrderShipped':
await db.orders.update(
{ id: event.orderId },
{ status: 'shipped' }
);
break;
}
}
}
Обработка ошибок
Dead Letter Topic
const DLT_TOPIC = 'orders-dlt';
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const order = JSON.parse(message.value.toString());
await processOrder(order);
} catch (error) {
console.error('Processing failed:', error);
// Отправить в dead letter topic
await producer.send({
topic: DLT_TOPIC,
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
'error': error.message,
'original-topic': topic,
'original-partition': partition.toString(),
},
}],
});
}
},
});
Retry с Backoff
async function processWithRetry(order, maxRetries = 3) {
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await processOrder(order);
return;
} catch (error) {
lastError = error;
const delay = Math.pow(2, attempt) * 1000;
await sleep(delay);
}
}
throw lastError;
}
Мониторинг
Consumer Lag
# Проверить lag consumer group
docker exec -it kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group order-processor-group
Метрики с KafkaJS
const consumer = kafka.consumer({
groupId: 'my-group',
});
consumer.on('consumer.heartbeat', ({ timestamp, groupId }) => {
console.log('Heartbeat:', { timestamp, groupId });
});
consumer.on('consumer.commit_offsets', ({ groupId, topics }) => {
console.log('Committed:', { groupId, topics });
});
consumer.on('consumer.group_join', ({ duration, groupId }) => {
console.log('Joined group:', { duration, groupId });
});
Event-Driven Architecture: мышление Senior-инженера
Event vs. Command
Понимание различий критически важно:
- Command:
ProcessPayment(orderId) — ожидает ответ. Императивно. - Event:
OrderPlaced(orderId) — факт, который произошёл. В прошедшем времени. Можно игнорировать.
Events развязывают сервисы. Если Payment Service недоступен, Order Service не падает — он просто публикует event и продолжает работу.
Паттерн Outbox (гарантированная доставка)
Проблема: вы сохраняете Order в DB, но Kafka падает до того, как вы публикуете Event. Данные становятся несогласованными.
Решение: 1. Начните DB Transaction. 2. Сохраните Order. 3. Сохраните запись Outbox (event_type, payload, status='PENDING') в той же DB transaction. 4. Выполните commit. 5. Отдельный процесс-poller читает таблицу Outbox и публикует в Kafka. 6. Пометьте запись Outbox как SENT.
Это гарантирует, что если заказ сохранён, событие в итоге будет опубликовано.
Идемпотентность: обработка дубликатов
Факт: Kafka гарантирует доставку «At Least Once». Вы обязательно получите одно и то же сообщение дважды — рано или поздно.
Решение: отслеживайте обработанные Message IDs.
async function handleEvent(event) {
// Проверить, не обработано ли уже
const exists = await db.processedEvents.exists(event.id);
if (exists) return; // Уже выполнено
// Логика обработки
await processOrder(event.data);
// Пометить как обработанное
await db.processedEvents.add(event.id);
}
Kafka vs. RabbitMQ: когда выбирать что
| Factor | Kafka | RabbitMQ |
|---|
| Model | Pull-based (Log) | Push-based (Queue) |
| Best for | stream processing, event sourcing | task queues, complex routing |
| Message retention | по времени (replay) | до потребления |
| Throughput | масштабирование до огромных объёмов | умеренный |
Правило Senior: используйте RabbitMQ для задач «сделай эту работу». Используйте Kafka для событий «это произошло», когда нужен replay или несколько consumers.
Лучшие практики
- Выбирайте количество partitions исходя из потребностей параллелизма consumers
- Используйте осмысленные keys для упорядочивания связанных сообщений
- Делайте consumers идемпотентными — сообщения могут доставляться более одного раза
- Мониторьте consumer lag для выявления проблем обработки
- Задавайте подходящий retention исходя из требований к replay
- Используйте consumer groups для масштабирования и отказоустойчивости
- Реализуйте dead letter topics для сообщений, обработка которых завершилась ошибкой
- Используйте паттерн Outbox для гарантированной доставки
- Проектируйте events как факты — прошедшее время, неизменяемость
Заключение
Kafka позволяет строить масштабируемые, отказоустойчивые событийно-ориентированные системы. Senior-инженеры понимают, что Event-Driven Architecture вводит «Eventual Consistency» — пользователь может оформить заказ, но он появится в истории не сразу, а через 2 секунды. Ваша задача — управлять компромиссами между Availability (EDA) и Consistency (Monolith). Используйте корректное разбиение на partitions для параллелизма, consumer groups для масштабирования и реализуйте правильную обработку ошибок с dead letter topics.