Kafka. Быстрый старт.

Apache Kafka — распределённый программный брокер сообщений с открытым исходным кодом, разрабатываемый в рамках фонда Apache на языках Java и Scala. Цель проекта — создание горизонтально масштабируемой платформы для обработки потоковых данных в реальном времени с высокой пропускной способностью и низкой задержкой.

Тип: Распределённая платформа потоковой передачи событий (Distributed Event Streaming Platform).

Архитектура: Использует архитектуру топиков (topics), где сообщения сохраняются в логах для длительного хранения, и потребители могут читать данные в своём темпе. Kafka ориентирован на высокопроизводительные, распределённые системы.

Использование: Подходит для работы с большими объемами данных и обработки событий в реальном времени (например, логирование, аналитика, события IoT). Используется для долгосрочного хранения данных и их обработки.

Плюсы:

  • Поддержка горизонтального масштабирования.
  • Высокая производительность для обработки большого количества данных.
  • Долговременное хранение сообщений.
  • Консистентность данных и возможности реиграции событий.

Минусы:

  • Сложность конфигурации и управления.
  • Не подходит для краткосрочных или транзакционных сообщений.

Далее будет рассмотрен пример использования системы для организации общения между микросервисами через Kafka на Python.

Чтобы запустить Kafka и создать топик my_topic, следуйте инструкциям ниже. Мы будем использовать Apache Kafka и ZooKeeper (ZooKeeper — это сервер координации, который требуется для работы Kafka).

Шаги:

1. Скачать и установить Kafka

Скачать Kafka:

  • Перейдите на страницу загрузки Kafka и выберите нужную версию (например, Kafka 3.0).
  • Или воспользуйтесь командой для загрузки последней версии Kafka:
wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz

Распаковать Kafka:

tar -xzf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0

2. Запуск ZooKeeper и Kafka

Kafka использует ZooKeeper для координации, поэтому сначала нужно запустить ZooKeeper, а затем сам Kafka.

Запустить ZooKeeper:

Kafka поставляется с встроенным ZooKeeper. Запустите его с помощью следующей команды:

bin/zookeeper-server-start.sh config/zookeeper.properties

Это запустит ZooKeeper на порту 2181 (по умолчанию).

Запустить Kafka:

В другом терминале запустите Kafka-брокер:

bin/kafka-server-start.sh config/server.properties

Это запустит Kafka на порту 9092 (по умолчанию).

3. Создать топик my_topic

После запуска ZooKeeper и Kafka можно создать топик с именем my_topic. Используйте следующую команду для создания топика:

bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Здесь:

  • --bootstrap-server localhost:9092: Адрес брокера Kafka.
  • --partitions 1: Количество разделов для топика.
  • --replication-factor 1: Число реплик (можно установить в 1 для локального запуска).

Убедитесь, что топик создан:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

В списке должен появиться my_topic.

4. Тестирование

Запустить producer для отправки сообщений:

  • В другом терминале выполните команду, чтобы отправлять сообщения в my_topic:
bin/kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092

Введите несколько сообщений вручную и нажмите Enter.

Запустить consumer для получения сообщений:

  • В другом терминале выполните команду для чтения сообщений:
bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092

Вы увидите сообщения, отправленные продюсером.

5. Запуск Python-кода

Теперь, когда Kafka запущен и топик создан, можно использовать игрушечный пример кода на Python, который будет чуть ниже, чтобы продемонстрировать отправку и получение сообщений через Kafka.

6. Остановка Kafka и ZooKeeper

После тестирования можно остановить Kafka и ZooKeeper:

  • Остановите Kafka:
bin/kafka-server-stop.sh
  • Остановите ZooKeeper:
bin/zookeeper-server-stop.sh

Таким образом, вы настроите и запустите Kafka локально, создадите топик и сможете тестировать общение микросервисов через Python и Kafka.

Демонстрация общения микросервисов с помощью Kafka

Для организации общения между микросервисами через Kafka на Python, можно использовать библиотеку confluent-kafka, которая обеспечивает удобный интерфейс для взаимодействия с Apache Kafka.

Шаги:

  1. Установить Kafka и библиотеку confluent-kafka
pip install confluent-kafka

2. Настроить брокер Kafka (локально или в Docker), если он еще не запущен.

3. Написать два простых микросервиса: один будет производителем (producer), а другой — потребителем (consumer).

Пример кода:

1. Producer (отправка сообщений)

from confluent_kafka import Producer

# Конфигурация для подключения к Kafka
producer_config = {
    'bootstrap.servers': 'localhost:9092'  # Адрес брокера Kafka
}

# Создаем объект продюсера
producer = Producer(producer_config)

# Функция, вызываемая при подтверждении отправки
def delivery_report(err, msg):
    if err:
        print(f'Ошибка отправки сообщения: {err}')
    else:
        print(f'Сообщение отправлено в {msg.topic()} [{msg.partition()}]')

# Отправляем несколько сообщений
for i in range(10):
    message = f'Сообщение номер {i}'
    producer.produce('my_topic', message.encode('utf-8'), callback=delivery_report)

# Ожидаем завершения всех сообщений
producer.flush()

2. Consumer (получение сообщений)

from confluent_kafka import Consumer, KafkaError

# Конфигурация для подключения к Kafka
consumer_config = {
    'bootstrap.servers': 'localhost:9092',  # Адрес брокера Kafka
    'group.id': 'my_group',                # Группа потребителей
    'auto.offset.reset': 'earliest'         # Начинать чтение с самого начала, если нет сохраненного смещения
}

# Создаем объект консюмера
consumer = Consumer(consumer_config)

# Подписываемся на топик
consumer.subscribe(['my_topic'])

# Чтение сообщений
try:
    while True:
        msg = consumer.poll(timeout=1.0)  # Ожидаем получения сообщения
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f'Ошибка: {msg.error()}')
                break

        print(f'Получено сообщение: {msg.value().decode("utf-8")}')

except KeyboardInterrupt:
    pass
finally:
    # Закрываем консюмер
    consumer.close()

Пояснение:

  1. Producer:
    • Отправляет сообщения в топик my_topic.
    • Использует метод produce для отправки сообщений с асинхронной обработкой.
    • После отправки всех сообщений, вызывается flush, чтобы дождаться завершения всех запросов.
  2. Consumer:
    • Подписывается на топик my_topic и прослушивает его.
    • Читает сообщения в бесконечном цикле, используя метод poll.
    • Сообщения выводятся на экран.

Запуск:

  1. Убедитесь, что Kafka запущен, и топик my_topic создан.
  2. Запустите producer для отправки сообщений.
  3. Запустите consumer для получения сообщений.

Таким образом, микросервисы могут общаться через Kafka, отправляя и получая сообщения в различных топиках.