Apache Kafka — распределённый программный брокер сообщений с открытым исходным кодом, разрабатываемый в рамках фонда Apache на языках Java и Scala. Цель проекта — создание горизонтально масштабируемой платформы для обработки потоковых данных в реальном времени с высокой пропускной способностью и низкой задержкой.
Тип: Распределённая платформа потоковой передачи событий (Distributed Event Streaming Platform).
Архитектура: Использует архитектуру топиков (topics), где сообщения сохраняются в логах для длительного хранения, и потребители могут читать данные в своём темпе. Kafka ориентирован на высокопроизводительные, распределённые системы.
Использование: Подходит для работы с большими объемами данных и обработки событий в реальном времени (например, логирование, аналитика, события IoT). Используется для долгосрочного хранения данных и их обработки.
Плюсы:
- Поддержка горизонтального масштабирования.
- Высокая производительность для обработки большого количества данных.
- Долговременное хранение сообщений.
- Консистентность данных и возможности реиграции событий.
Минусы:
- Сложность конфигурации и управления.
- Не подходит для краткосрочных или транзакционных сообщений.
Далее будет рассмотрен пример использования системы для организации общения между микросервисами через Kafka на Python.
Чтобы запустить Kafka и создать топик my_topic
, следуйте инструкциям ниже. Мы будем использовать Apache Kafka и ZooKeeper (ZooKeeper — это сервер координации, который требуется для работы Kafka).
Шаги:
1. Скачать и установить Kafka
Скачать Kafka:
- Перейдите на страницу загрузки Kafka и выберите нужную версию (например, Kafka 3.0).
- Или воспользуйтесь командой для загрузки последней версии Kafka:
wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
Распаковать Kafka:
tar -xzf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0
2. Запуск ZooKeeper и Kafka
Kafka использует ZooKeeper для координации, поэтому сначала нужно запустить ZooKeeper, а затем сам Kafka.
Запустить ZooKeeper:
Kafka поставляется с встроенным ZooKeeper. Запустите его с помощью следующей команды:
bin/zookeeper-server-start.sh config/zookeeper.properties
Это запустит ZooKeeper на порту 2181 (по умолчанию).
Запустить Kafka:
В другом терминале запустите Kafka-брокер:
bin/kafka-server-start.sh config/server.properties
Это запустит Kafka на порту 9092 (по умолчанию).
3. Создать топик my_topic
После запуска ZooKeeper и Kafka можно создать топик с именем my_topic
. Используйте следующую команду для создания топика:
bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Здесь:
--bootstrap-server localhost:9092
: Адрес брокера Kafka.--partitions 1
: Количество разделов для топика.--replication-factor 1
: Число реплик (можно установить в 1 для локального запуска).
Убедитесь, что топик создан:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
В списке должен появиться my_topic
.
4. Тестирование
Запустить producer для отправки сообщений:
- В другом терминале выполните команду, чтобы отправлять сообщения в
my_topic
:
bin/kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092
Введите несколько сообщений вручную и нажмите Enter.
Запустить consumer для получения сообщений:
- В другом терминале выполните команду для чтения сообщений:
bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092
Вы увидите сообщения, отправленные продюсером.
5. Запуск Python-кода
Теперь, когда Kafka запущен и топик создан, можно использовать игрушечный пример кода на Python, который будет чуть ниже, чтобы продемонстрировать отправку и получение сообщений через Kafka.
6. Остановка Kafka и ZooKeeper
После тестирования можно остановить Kafka и ZooKeeper:
- Остановите Kafka:
bin/kafka-server-stop.sh
- Остановите ZooKeeper:
bin/zookeeper-server-stop.sh
Таким образом, вы настроите и запустите Kafka локально, создадите топик и сможете тестировать общение микросервисов через Python и Kafka.
Демонстрация общения микросервисов с помощью Kafka
Для организации общения между микросервисами через Kafka на Python, можно использовать библиотеку confluent-kafka
, которая обеспечивает удобный интерфейс для взаимодействия с Apache Kafka.
Шаги:
- Установить Kafka и библиотеку
confluent-kafka
pip install confluent-kafka
2. Настроить брокер Kafka (локально или в Docker), если он еще не запущен.
3. Написать два простых микросервиса: один будет производителем (producer), а другой — потребителем (consumer).
Пример кода:
1. Producer (отправка сообщений)
from confluent_kafka import Producer
# Конфигурация для подключения к Kafka
producer_config = {
'bootstrap.servers': 'localhost:9092' # Адрес брокера Kafka
}
# Создаем объект продюсера
producer = Producer(producer_config)
# Функция, вызываемая при подтверждении отправки
def delivery_report(err, msg):
if err:
print(f'Ошибка отправки сообщения: {err}')
else:
print(f'Сообщение отправлено в {msg.topic()} [{msg.partition()}]')
# Отправляем несколько сообщений
for i in range(10):
message = f'Сообщение номер {i}'
producer.produce('my_topic', message.encode('utf-8'), callback=delivery_report)
# Ожидаем завершения всех сообщений
producer.flush()
2. Consumer (получение сообщений)
from confluent_kafka import Consumer, KafkaError
# Конфигурация для подключения к Kafka
consumer_config = {
'bootstrap.servers': 'localhost:9092', # Адрес брокера Kafka
'group.id': 'my_group', # Группа потребителей
'auto.offset.reset': 'earliest' # Начинать чтение с самого начала, если нет сохраненного смещения
}
# Создаем объект консюмера
consumer = Consumer(consumer_config)
# Подписываемся на топик
consumer.subscribe(['my_topic'])
# Чтение сообщений
try:
while True:
msg = consumer.poll(timeout=1.0) # Ожидаем получения сообщения
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(f'Ошибка: {msg.error()}')
break
print(f'Получено сообщение: {msg.value().decode("utf-8")}')
except KeyboardInterrupt:
pass
finally:
# Закрываем консюмер
consumer.close()
Пояснение:
- Producer:
- Отправляет сообщения в топик
my_topic
. - Использует метод
produce
для отправки сообщений с асинхронной обработкой. - После отправки всех сообщений, вызывается
flush
, чтобы дождаться завершения всех запросов.
- Отправляет сообщения в топик
- Consumer:
- Подписывается на топик
my_topic
и прослушивает его. - Читает сообщения в бесконечном цикле, используя метод
poll
. - Сообщения выводятся на экран.
- Подписывается на топик
Запуск:
- Убедитесь, что Kafka запущен, и топик
my_topic
создан. - Запустите producer для отправки сообщений.
- Запустите consumer для получения сообщений.
Таким образом, микросервисы могут общаться через Kafka, отправляя и получая сообщения в различных топиках.