Тест на вакансию

Работа с Kafka в Docker-контейнере при помощи Python

9 февраля 2026 г.
26

Docker Compose файл

Напишем следующий docker-compose.yml:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    depends_on:
      - kafka
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
Далее можно сделать запуск контейнера:
docker-compose up -d

Архитектура системы

Zookeeper

zookeeper:
  image: confluentinc/cp-zookeeper:7.3.0

Предназначен для координации и управления кластером Kafka. Расположен на 2181  порту (стандартный порт Zookeeper).

Функции:
  • Хранит метаданные (какие брокеры живы)
  • Выбирает контроллер (controller) брокера
  • Управляет конфигурацией топиков
  • Отслеживает членство consumer групп

Kafka Broker

kafka:
  image: confluentinc/cp-kafka:7.3.0
  depends_on:
    - zookeeper

Это основной брокер сообщений (порты: 9092 и 29092). Запускается после Zookeeper.
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Это настройки репликации. Для разработки подходит =1.
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2
Для продакшена нужно ставить =3.
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
Эта настройка убирает задержку при перебалансировке consumer групп.

Kafka UI

kafka-ui:
  image: provectuslabs/kafka-ui:latest

Это веб-интерфейс для управления и мониторинга Kafka (порт: 8080).

Функции:
  • Просмотр топиков и сообщений
  • Мониторинг consumer групп
  • Управление топиками
  • Просмотр метрик

Python скрипты

Producer

Сделаем простой генератор тестовых данных для Kafka в формате JSON, который можно использовать для отладки и тестирования.
from confluent_kafka import Producer
from datetime import datetime
import json
import uuid

p = Producer({'bootstrap.servers': 'localhost:29092'})

def delivery_report(err, msg):
    if err is not None:
        print(f'Ошибка доставки: {err}')
    else:
        print(f'Сообщение доставлено в {msg.topic()} [{msg.partition()}]')

for i in range(10):
    now = datetime.now()
    guid = uuid.uuid4()
   
    json_data = {
        "event_id": str(guid),
        "event_type": "user_action",
        "user_id": (i+1),
        "timestamp": str(now),
        "metadata": {
            "browser": "Chrome",
            "os": "Windows 10"
        }
    }
    p.produce('json-topic', json.dumps(json_data).encode('utf-8'), callback=delivery_report)
    p.flush()

Consumer

Теперь создадим Consumer для чтения JSON сообщений из Kafka.
from confluent_kafka import Consumer
import json

c = Consumer({
    'bootstrap.servers': 'localhost:29092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['json-topic'])

while True:
    msg = c.poll(1.0)
   
    if msg is None:
        continue
    if msg.error():
        print(f"Ошибка: {msg.error()}")
        continue
    try:
        json_data = json.loads(msg.value().decode('utf-8'))
        print(f"Получен JSON: {json_data}")
    except json.JSONDecodeError as e:
        print(f"Ошибка декодирования JSON: {e}")
c.close()

Итого

Предложенная конфигурация docker-compose.yml позволяет запустить полнофункциональное окружение Kafka для разработки и тестирования одной командой docker-compose up -d. Не требуется сложной установки.

Использование двух портов (9092 для связи между контейнерами и 29092 для подключения с вашего компьютера) упрощает разработку приложений как внутри, так и вне Docker-сети.

Примеры на Python с использованием библиотеки confluent_kafka демонстрируют базовые, но рабочие шаблоны для создания Producer (отправителя) и Consumer (получателя).

 
Поделиться: