О нас Руководства Проекты Контакты
Админка
пожалуйста подождите

Введение

Apache Kafka — это распределённая платформа для потоковой передачи событий, предназначенная для высокопроизводительных, отказоустойчивых конвейеров данных. В отличие от традиционных очередей сообщений Kafka хранит сообщения в течение настраиваемых периодов, что позволяет выполнять повторное воспроизведение (replay) и поддерживать нескольких потребителей. В этом руководстве рассматривается практическая реализация Kafka для событийно-ориентированных архитектур.

Ключевые концепции

Архитектура Kafka

  • Topics: именованные ленты сообщений
  • Partitions: подразделы topic для параллелизма
  • Producers: публикуют сообщения в topics
  • Consumers: читают сообщения из topics
  • Consumer Groups: координируют потребление для масштабирования
  • Brokers: серверы Kafka в кластере
  • Zookeeper/KRaft: координация кластера

Ключевые отличия от традиционных очередей

FeatureKafkaTraditional Queue
Message Retentionпо времени (настраиваемо)до потребления
Multiple Consumersда (consumer groups)обычно нет
Replayданет
Orderingна уровне partitionна уровне очереди
Throughputочень высокийумеренный

Настройка

Установка через Docker

# docker-compose.yml
version: '3.8'
services:
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

Создание topics

# Использование команды kafka-topics
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic orders \
--partitions 6 \
--replication-factor 1
# Список topics
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--list
# Описание topic
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--describe \
--topic orders

PHP с php-rdkafka

Установка

# Установка librdkafka
sudo apt install librdkafka-dev
# Установка расширения PHP
pecl install rdkafka
# Добавить в php.ini
echo "extension=rdkafka.so" >> /etc/php/8.2/cli/php.ini

Producer

<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('client.id', 'order-service');
// Callback отчёта о доставке
$conf->setDrMsgCb(function ($kafka, $message) {
if ($message->err) {
echo "Delivery failed: " . rd_kafka_err2str($message->err) . "\n";
} else {
echo "Message delivered to partition {$message->partition}\n";
}
});
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('orders');
// Отправить сообщение
$order = [
'order_id' => uniqid(),
'user_id' => 123,
'items' => [
['product_id' => 1, 'quantity' => 2],
],
'total' => 199.99,
'created_at' => date('c'),
];
$topic->produce(
RD_KAFKA_PARTITION_UA, // Автоматическое назначение partitions
0, // Флаги
json_encode($order), // Payload сообщения
$order['order_id'] // Key (для partitioning)
);
// Ожидать доставку
$producer->flush(10000);

Consumer

<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('group.id', 'order-processor');
$conf->set('auto.offset.reset', 'earliest');
$conf->set('enable.auto.commit', 'false');
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['orders']);
echo "Waiting for messages...\n";
while (true) {
$message = $consumer->consume(10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$order = json_decode($message->payload, true);
try {
processOrder($order);
// Коммит offset после успешной обработки
$consumer->commit($message);
echo "Processed order: {$order['order_id']}\n";
} catch (Exception $e) {
echo "Error processing order: {$e->getMessage()}\n";
// Не делать commit — будет повтор при перезапуске
}
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
// Конец partition, ожидание новых сообщений
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
// Нет сообщений в пределах timeout
break;
default:
echo "Error: {$message->errstr()}\n";
break;
}
}

Node.js с KafkaJS

Producer

const { Kafka, Partitioners } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['localhost:9092'],
});
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
});
async function publishOrder(order) {
await producer.connect();
await producer.send({
topic: 'orders',
messages: [
{
key: order.orderId,
value: JSON.stringify(order),
headers: {
'correlation-id': order.correlationId,
'source': 'web-app',
},
},
],
});
console.log('Order published:', order.orderId);
}
// Пакетная публикация
async function publishOrders(orders) {
await producer.connect();
await producer.sendBatch({
topicMessages: [
{
topic: 'orders',
messages: orders.map(order => ({
key: order.orderId,
value: JSON.stringify(order),
})),
},
],
});
}

Consumer

const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-processor',
brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'order-processor-group' });
async function startConsumer() {
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const order = JSON.parse(message.value.toString());
console.log({
partition,
offset: message.offset,
key: message.key?.toString(),
orderId: order.orderId,
});
await processOrder(order);
},
});
}
// С пакетной обработкой
async function startBatchConsumer() {
await consumer.connect();
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
for (const message of batch.messages) {
const order = JSON.parse(message.value.toString());
await processOrder(order);
resolveOffset(message.offset);
await heartbeat();
}
},
});
}
startConsumer().catch(console.error);

Паттерны событий

Event Sourcing

// Публикация domain events
async function placeOrder(orderData) {
const orderId = generateId();
// Публикация события OrderPlaced
await producer.send({
topic: 'order-events',
messages: [{
key: orderId,
value: JSON.stringify({
type: 'OrderPlaced',
orderId,
data: orderData,
timestamp: Date.now(),
}),
}],
});
return orderId;
}
async function cancelOrder(orderId, reason) {
await producer.send({
topic: 'order-events',
messages: [{
key: orderId,
value: JSON.stringify({
type: 'OrderCancelled',
orderId,
reason,
timestamp: Date.now(),
}),
}],
});
}
// Потребление и восстановление состояния
async function rebuildOrderState(orderId) {
const events = await getEventsForOrder(orderId);
let state = { status: 'unknown' };
for (const event of events) {
switch (event.type) {
case 'OrderPlaced':
state = { ...event.data, status: 'placed' };
break;
case 'OrderPaid':
state.status = 'paid';
break;
case 'OrderShipped':
state.status = 'shipped';
break;
case 'OrderCancelled':
state.status = 'cancelled';
break;
}
}
return state;
}

Паттерн CQRS

// Обработчик command публикует events
class OrderCommandHandler {
async handle(command) {
switch (command.type) {
case 'PlaceOrder':
return this.handlePlaceOrder(command);
case 'ShipOrder':
return this.handleShipOrder(command);
}
}
async handlePlaceOrder(command) {
// Валидация
// Публикация event
await this.publish('order-events', {
type: 'OrderPlaced',
...command.data,
});
}
}
// Сторона запросов потребляет events и обновляет read model
class OrderProjection {
async processEvent(event) {
switch (event.type) {
case 'OrderPlaced':
await db.orders.insert({
id: event.orderId,
status: 'placed',
...event.data,
});
break;
case 'OrderShipped':
await db.orders.update(
{ id: event.orderId },
{ status: 'shipped' }
);
break;
}
}
}

Обработка ошибок

Dead Letter Topic

const DLT_TOPIC = 'orders-dlt';
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const order = JSON.parse(message.value.toString());
await processOrder(order);
} catch (error) {
console.error('Processing failed:', error);
// Отправить в dead letter topic
await producer.send({
topic: DLT_TOPIC,
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
'error': error.message,
'original-topic': topic,
'original-partition': partition.toString(),
},
}],
});
}
},
});

Retry с Backoff

async function processWithRetry(order, maxRetries = 3) {
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await processOrder(order);
return;
} catch (error) {
lastError = error;
const delay = Math.pow(2, attempt) * 1000;
await sleep(delay);
}
}
throw lastError;
}

Мониторинг

Consumer Lag

# Проверить lag consumer group
docker exec -it kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group order-processor-group

Метрики с KafkaJS

const consumer = kafka.consumer({
groupId: 'my-group',
});
consumer.on('consumer.heartbeat', ({ timestamp, groupId }) => {
console.log('Heartbeat:', { timestamp, groupId });
});
consumer.on('consumer.commit_offsets', ({ groupId, topics }) => {
console.log('Committed:', { groupId, topics });
});
consumer.on('consumer.group_join', ({ duration, groupId }) => {
console.log('Joined group:', { duration, groupId });
});

Event-Driven Architecture: мышление Senior-инженера

Event vs. Command

Понимание различий критически важно:

  • Command: ProcessPayment(orderId) — ожидает ответ. Императивно.
  • Event: OrderPlaced(orderId) — факт, который произошёл. В прошедшем времени. Можно игнорировать.

Events развязывают сервисы. Если Payment Service недоступен, Order Service не падает — он просто публикует event и продолжает работу.

Паттерн Outbox (гарантированная доставка)

Проблема: вы сохраняете Order в DB, но Kafka падает до того, как вы публикуете Event. Данные становятся несогласованными.

Решение: 1. Начните DB Transaction. 2. Сохраните Order. 3. Сохраните запись Outbox (event_type, payload, status='PENDING') в той же DB transaction. 4. Выполните commit. 5. Отдельный процесс-poller читает таблицу Outbox и публикует в Kafka. 6. Пометьте запись Outbox как SENT.

Это гарантирует, что если заказ сохранён, событие в итоге будет опубликовано.

Идемпотентность: обработка дубликатов

Факт: Kafka гарантирует доставку «At Least Once». Вы обязательно получите одно и то же сообщение дважды — рано или поздно.

Решение: отслеживайте обработанные Message IDs.

async function handleEvent(event) {
// Проверить, не обработано ли уже
const exists = await db.processedEvents.exists(event.id);
if (exists) return; // Уже выполнено
// Логика обработки
await processOrder(event.data);
// Пометить как обработанное
await db.processedEvents.add(event.id);
}

Kafka vs. RabbitMQ: когда выбирать что

FactorKafkaRabbitMQ
ModelPull-based (Log)Push-based (Queue)
Best forstream processing, event sourcingtask queues, complex routing
Message retentionпо времени (replay)до потребления
Throughputмасштабирование до огромных объёмовумеренный

Правило Senior: используйте RabbitMQ для задач «сделай эту работу». Используйте Kafka для событий «это произошло», когда нужен replay или несколько consumers.

Лучшие практики

  1. Выбирайте количество partitions исходя из потребностей параллелизма consumers
  2. Используйте осмысленные keys для упорядочивания связанных сообщений
  3. Делайте consumers идемпотентными — сообщения могут доставляться более одного раза
  4. Мониторьте consumer lag для выявления проблем обработки
  5. Задавайте подходящий retention исходя из требований к replay
  6. Используйте consumer groups для масштабирования и отказоустойчивости
  7. Реализуйте dead letter topics для сообщений, обработка которых завершилась ошибкой
  8. Используйте паттерн Outbox для гарантированной доставки
  9. Проектируйте events как факты — прошедшее время, неизменяемость

Заключение

Kafka позволяет строить масштабируемые, отказоустойчивые событийно-ориентированные системы. Senior-инженеры понимают, что Event-Driven Architecture вводит «Eventual Consistency» — пользователь может оформить заказ, но он появится в истории не сразу, а через 2 секунды. Ваша задача — управлять компромиссами между Availability (EDA) и Consistency (Monolith). Используйте корректное разбиение на partitions для параллелизма, consumer groups для масштабирования и реализуйте правильную обработку ошибок с dead letter topics.

 
 
 
Языки
Темы
Copyright © 1999 — 2026
Зетка Интерактив