1
1
Fork 0
java-interview/kafka.md

1421 lines
96 KiB
Markdown
Raw Permalink Normal View History

[Вопросы для собеседования](README.md)
# Apache Kafka
* [Что такое Apache Kafka?](#что-такое-apache-kafka)
* [Основные компоненты Kafka](#основные-компоненты-kafka)
**Архитектура компонентов**
* Topic
* [Архитектура топика](#архитектура-топика)
* [Настройки топика Kafka](#настройки-топика-kafka)
* Broker
* [Архитектура брокера](#архитектура-брокера)
* [Настройки брокера Kafka](#настройки-брокера-kafka)
* Producer
* [Архитектура продюсера](#архитектура-продюсера)
* [Настройки продюсера](#настройки-продюсера)
* [Пример конфигурации Kafka Producer](#пример-конфигурации-kafka-producer)
* Consumer
* [Архитектура консюмера](#архитектура-консюмера)
* [Настройки консюмера](#настройки-консюмера)
* [Пример конфигурации Kafka Consumer](#пример-конфигурации-kafka-consumer)
**Kafka API**
* [Основные API Kafka](#основные-api-kafka)
* [Какова роль Producer API?](#какова-роль-producer-api)
* [Какова роль Consumer API?](#какова-роль-consumer-api)
* [Какова роль Connector API?](#какова-роль-connector-api)
* [Какова роль Streams API?](#какова-роль-streams-api)
* [Какова роль Transactions API?](#какова-роль-transactions-api)
* [Какова роль Quota API?](#какова-роль-quota-api)
* [Какова роль AdminClient API?](#какова-роль-AdminClient-api)
**Kafka Consumer**
* [Для чего нужен координатор группы?](#для-чего-нужен-координатор-группы)
* [Для чего нужен Consumer heartbeat thread?](#для-чего-нужен-consumer-heartbeat-thread)
* [Как Kafka обрабатывает сообщения?](#как-kafka-обрабатывает-сообщения)
* [Как Kafka обрабатывает задержку консюмера?](#как-kafka-обрабатывает-задержку-консюмера)
* [Для чего нужны методы subscribe() и poll()?](#для-чего-нужны-методы-subscribe-и-poll)
* [Для чего нужен метод position()?](#для-чего-нужен-метод-position)
* [Для чего нужны методы commitSync() и commitAsync()?](#для-чего-нужны-методы-commitsync-и-commitasync)
**Другие вопросы**
* [Для чего нужен идемпотентный продюсер?](#для-чего-нужен-идемпотентный-продюсер)
* [Для чего нужен интерфейс Partitioner?](#для-чего-нужен-интерфейс-partitioner)
* [Для чего нужен Broker log cleaner thread?](#для-чего-нужен-broker-log-cleaner-thread)
* [Для чего нужен Kafka Mirror Maker?](#для-чего-нужен-kafka-mirror-maker)
* [Для чего нужна Schema Registry?](#для-чего-нужна-schema-registry)
* [Для чего нужен Streams DSL?](#для-чего-нужен-streams-dsl)
* [Как Kafka обеспечивает версионирование сообщений?](#как-kafka-обеспечивает-версионирование-сообщений)
* [Как потребители получают сообщения от брокера?](#как-потребители-получают-сообщения-от-брокера)
**Сравнение с другими компонентами и системами**
* [В чем разница между Kafka Consumer и Kafka Stream?](#в-чем-разница-между-kafka-consumer-и-kafka-stream)
* [В чем разница между Kafka Streams и Apache Flink?](#в-чем-разница-между-kafka-streams-и-apache-flink)
* [В чем разница между Kafka и Flume?](#в-чем-разница-между-kafka-и-flume)
* [В чем разница между Kafka и RabbitMQ?](#в-чем-разница-между-kafka-и-rabbitmq)
## Что такое Apache Kafka?
Это распределённая система с открытым исходным кодом, разработанная для высокоскоростной передачи больших объёмов данных
с минимальной задержкой.
### Преимущества
* Персистентность данных
* Высокая производительность
* Независимость пайплайнов обработки
* Возможность просмотреть историю записей заново
* Гибкость в использовании
### Когда использовать
* λ-архитектура или k-архитектура
* Стриминг больших данных
* Много клиентов (producer и consumer)
* Требуется кратное масштабирование
### Чего в Kafka нет из коробки
* Это не брокер сообщений
* Отложенные сообщения
* DLQ
* AMQP / MQTT
* TTL на сообщение
* Очереди с приоритетами
[к оглавлению](#apache-kafka)
## Основные компоненты Kafka
* **Producer (Производитель)** — приложение, которое публикует сообщения в топики Kafka
* **Consumer (Потребитель)** — приложение, которое подписывается на топики и читает сообщения
* **Broker (Брокер)** — сервер Kafka, который принимает, хранит и распределяет сообщения. В кластере Kafka может быть несколько брокеров
* **Topic (Топик)** — логическое разделение, по которому организуются данные. Производители отправляют сообщения в топики, а потребители читают из них
* **Partition (Раздел)** — каждый топик разделён на партиции для параллельной обработки. Сообщения в партициях упорядочены
* **Zookeeper** — сервис, используемый Kafka для управления состоянием кластера и координации брокеров.
Однако в новых версиях Kafka отказывается от Zookeeper в пользу собственного механизма метаданных KRaft (Kafka Raft).
Это новая внутренняя архитектура метаданных Kafka, которая устраняет зависимость от Zookeeper. Она основана на Raft-консенсусе,
позволяя Kafka брокерам самостоятельно управлять метаданными и координировать взаимодействие между собой.
[к оглавлению](#apache-kafka)
## Архитектура топика
* **Топик разбит на партиции** — сообщения в топике распределяются по партициям для более эффективной параллельной обработки и хранения
* **Партиции хранятся на диске** — Kafka сохраняет данные на диск, что позволяет долговременно хранить сообщения
* **Партиции делятся на сегменты** — сегмент представляет собой обычный файл на диске, сегменты делятся на пассивные и активный.
Запись происходит в активный сегмент
* **Данные удаляются либо по времени, либо по размеру**. Удаление происходит посегментно, с самого старого сегмента
* **retention.bytes** - по максимальному размеру
* **retention.ms** - по времени
* **Сообщение можно быстро найти по его Offset** — каждому сообщению в партиции присваивается уникальный смещающий индекс (offset), по которому можно легко найти сообщение
[к оглавлению](#apache-kafka)
## Настройки топика Kafka
### Репликация
* `replication.factor`
* **Описание**: Количество реплик для каждой партиции топика
* **Пример**: `replication.factor=3`
* `min.insync.replicas`
* **Описание**: Минимальное количество синхронизированных реплик
* **Пример**: `min.insync.replicas=2`
### Хранение данных
* `retention.ms`
* **Описание**: Время хранения сообщений в топике в миллисекундах
* **Пример**: `retention.ms=604800000` (7 дней)
* `retention.bytes`
* **Описание**: Максимальный объём данных в топике, после чего старые сообщения удаляются
* **Пример**: `retention.bytes=10737418240` (10 GB)
* `segment.bytes`
* **Описание**: Размер сегмента логов топика
* **Пример**: `segment.bytes=1073741824` (1 GB)
### Политики очистки
* `cleanup.policy`
* **Описание**: Как Kafka обрабатывает старые сообщения
* **Значения**: `delete`, `compact`
* **Пример**: `cleanup.policy=delete`
### Партиции
* `num.partitions`
* **Описание**: Количество партиций в топике
* **Пример**: `num.partitions=3`
[к оглавлению](#apache-kafka)
## Архитектура брокера
* **У каждой партиции свой лидер** — в Kafka для каждой партиции в топике назначается лидер-брокер, который отвечает
за запись и чтение данных
* **Сообщения пишутся в лидера** — производители отправляют сообщения напрямую в брокер-лидер партиции
* **Данные реплицируются между брокерами** — для обеспечения отказоустойчивости Kafka реплицирует данные партиций на
другие брокеры, которые становятся репликами
* **Автоматический фейловер лидера** — в случае сбоя брокера-лидера Kafka автоматически назначает новый лидер из числа
реплик, обеспечивая бесшовную работу системы
[к оглавлению](#apache-kafka)
## Настройки брокера Kafka
### Репликация и консистентность
* `min.insync.replicas`
* **Описание**: Минимальное количество синхронизированных реплик для подтверждения записи
* **Пример**: `min.insync.replicas=2`
* `unclean.leader.election.enable`
* **Описание**: Разрешает выбор лидера из неактуальных реплик, если нет синхронизированных реплик
* **Пример**: `unclean.leader.election.enable=false`
### Логирование и хранение данных
* `log.dirs`
* **Описание**: Директория на диске, где хранятся логи партиций
* **Пример**: `log.dirs=/var/lib/kafka/logs`
* `log.retention.hours`
* **Описание**: Максимальное время хранения данных в логах
* **Пример**: `log.retention.hours=168` (7 дней)
* `log.segment.bytes`
* **Описание**: Максимальный размер сегмента лога, после чего создаётся новый
* **Пример**: `log.segment.bytes=1073741824` (1 GB)
### Производительность и задержки
* `num.network.threads`
* **Описание**: Количество потоков для обработки сетевых запросов
* **Пример**: `num.network.threads=3`
* `num.io.threads`
* **Описание**: Количество потоков для ввода-вывода
* **Пример**: `num.io.threads=8`
* `socket.send.buffer.bytes`
* **Описание**: Размер буфера для отправки данных по сети
* **Пример**: `socket.send.buffer.bytes=102400`
### Управление сообщениями
* `message.max.bytes`
* **Описание**: Максимальный размер сообщения, которое брокер может принять
* **Пример**: `message.max.bytes=1048576` (1 MB)
* `replica.fetch.max.bytes`
* **Описание**: Максимальный размер данных для запроса реплики
* **Пример**: `replica.fetch.max.bytes=1048576` (1 MB)
### Безопасность
* `ssl.keystore.location`
* **Описание**: Путь к хранилищу ключей SSL
* **Пример**: `ssl.keystore.location=/var/private/ssl/kafka.keystore.jks`
* `ssl.truststore.location`
* **Описание**: Путь к хранилищу доверенных сертификатов
* **Пример**: `ssl.truststore.location=/var/private/ssl/kafka.truststore.jks`
[к оглавлению](#apache-kafka)
## Архитектура продюсера
* **Создание сообщения (Record)**: Продюсер формирует сообщение, содержащее ключ (необязательный), значение и метаданные,
такие как время отправки. Сообщение отправляется в топик (Topic), который состоит из одной или нескольких партиций
* **Выбор партиции**: Если ключ сообщения указан, Kafka использует его для хеширования и определения, в какую партицию
записать сообщение (сообщения с одинаковым ключом попадают в одну и ту же партицию). Если ключа нет, Kafka распределяет
сообщения по партициям с помощью round-robin или по другим правилам
* **Отправка сообщений в буфер (Batching)**: Для повышения производительности продюсер Kafka не отправляет каждое сообщение
по отдельности, а группирует несколько сообщений в пакеты (batching), прежде чем отправить их брокеру. Это снижает
сетевые задержки и нагрузку на брокера
* **Сжатие (Compression)**: Для уменьшения объёма передаваемых данных продюсер может сжимать сообщения с использованием
таких алгоритмов, как GZIP, Snappy или LZ4. Сжатие снижает нагрузку на сеть и хранение, но добавляет небольшие накладные
расходы на процессор
* **Асинхронная отправка**: Продюсер отправляет пакеты сообщений асинхронно. Это означает, что сообщения записываются в
буфер памяти и отправляются брокеру, не ожидая завершения предыдущих операций. Это повышает пропускную способность
* **Подтверждения (Acknowledgments)**: Kafka позволяет настраивать уровень подтверждений от брокеров
* **Ретрай и идемпотентность**: Если отправка сообщения не удалась, продюсер может повторить попытку отправки (ретрай).
Также можно включить идемпотентный режим продюсера, что предотвращает повторную отправку одного и того же сообщения в
случае сбоя, обеспечивая отправку уникального сообщения один раз
* **Error handling**: Продюсер обрабатывает ошибки при отправке сообщений. В зависимости от настроек продюсер может
попытаться переотправить сообщение или сообщить о проблеме через callback
### Резюме
* Продюсер выбирает партицию для сообщения
* Продюсер выбирает уровень гарантии доставки
* В продюсере можно тюнить производительность
[к оглавлению](#apache-kafka)
## Настройки продюсера
### Bootstrap-серверы (`bootstrap.servers`)
* **Описание**: Указывает адреса брокеров Kafka, к которым продюсер должен подключаться для отправки сообщений
* **Пример**: `bootstrap.servers: localhost:9092,localhost:9093`
* **Зачем это нужно**: Kafka продюсер использует эти брокеры для получения метаданных о кластере (например, информация о топиках и партициях). Эти брокеры служат точками входа в кластер Kafka.
### Сериализация ключа и значения
Продюсер должен преобразовывать (сериализовать) данные в байтовый формат перед отправкой в Kafka
* **Ключевая настройка для сериализации ключа:**
* `key.serializer`
* Пример: `key.serializer: org.apache.kafka.common.serialization.StringSerializer`
* **Ключевая настройка для сериализации значения:**
* `value.serializer`
* Пример: `value.serializer: org.apache.kafka.common.serialization.StringSerializer`
**Варианты сериализаторов:**
* `StringSerializer` для строк
* `ByteArraySerializer` для массива байтов
* `LongSerializer` для чисел
* Также можно реализовать свои собственные сериализаторы
### Отправка сообщений в буфер
Продюсер Kafka отправляет сообщения асинхронно, и для этого используется буферизация сообщений
* **batch.size**: Размер одного пакета (batch), который продюсер отправляет брокеру
* **Описание**: Определяет количество байтов сообщений, которые могут быть буферизованы в одном пакете перед отправкой брокеру
* **Пример**: `"batch.size": 16384` (16 KB)
* **Зачем это нужно**: Большие пакеты могут повысить производительность, но могут увеличить задержки
* **linger.ms**: Максимальное время ожидания перед отправкой пакета
* **Описание**: Продюсер может немного подождать, пока буфер накопит сообщения, чтобы отправить больше данных за один раз
* **Пример**: `linger.ms: 5` (время ожидания 5 мс)
* **Зачем это нужно**: Позволяет продюсеру собирать больше сообщений в пакете перед отправкой, что может улучшить эффективность использования сети
* **buffer.memory**: Размер выделенной памяти для буферизации сообщений
* **Описание**: Общий объем памяти, который продюсер может использовать для хранения сообщений, ожидающих отправки
* **Пример**: `buffer.memory: 33554432` (32 MB)
* **Зачем это нужно**: Если буфер заполняется, продюсер приостанавливает отправку сообщений, пока буфер не освободится
### Сжатие сообщений
Продюсер может сжимать сообщения для уменьшения объема передаваемых данных
* **compression.type**
* **Описание**: Указывает тип сжатия для сообщений
* **Пример**: `compression.type: gzip` (варианты: none, gzip, snappy, lz4, zstd)
* **Зачем это нужно**: Сжатие уменьшает объем данных, передаваемых по сети, что может снизить нагрузку на сеть и хранилище,
особенно при больших объемах сообщений. Однако это может потребовать дополнительных ресурсов на сжатие/разжатие
### Распределение сообщений по партициям (партицирование)
* **partitioner.class**
* **Описание**: определяет логику, по которой продюсер выбирает партицию для каждого сообщения
* **Примеры**:
* **если настройка не задана**, по умолчанию используется `DefaultPartitioner` , который может распределять сообщения по партициям
равномерно или на основе ключа сообщения
* `partitioner.class: o.a.k.clients.producer.RoundRobinPartitioner` использует метод Round Robin для распределения сообщений
* `partitioner.class: o.a.k.clients.producer.UniformStickyPartitioner` равномерно отправляет сообщения, привязываясь
к партиции на короткий промежуток времени, чтобы уменьшить нагрузку на брокеры
### Подтверждения (acks)
Настройка определяет, как много брокеров должны подтвердить получение сообщения перед тем, как продюсер будет считать его
успешно отправленным
* **acks**
* **Описание**: Определяет количество подтверждений от брокеров
* **Значения**:
* `0`: Продюсер не ждёт подтверждений (самая быстрая отправка, но высокий риск потери сообщений)
* `1`: Продюсер ждёт подтверждения от лидера партиции
* `all` (или `-1`): Продюсер ждёт подтверждений от всех реплик (наибольшая надежность, но увеличенные задержки)
* **Пример**: `acks: all`
* **Зачем это нужно**: Позволяет выбрать баланс между скоростью и надежностью отправки данных.
### Дополнительные важные настройки
* **Количество повторных попыток (retries):**
* **Описание**: Определяет, сколько раз продюсер должен попытаться отправить сообщение при неудаче
* **Пример**: `retries: 3`
* **Зачем это нужно**: Если произошёл временный сбой, продюсер может попытаться повторить отправку сообщений, что
увеличивает шанс доставки
* **Идемпотентность продюсера (enable.idempotence):**
* **Описание**: Включение идемпотентного режима, что предотвращает дублирование сообщений при сбоях
* **Пример**: `enable.idempotence: true`
* **Зачем это нужно**: Гарантирует, что каждое сообщение будет доставлено ровно один раз
* **Максимальный размер сообщения (max.request.size):**
* **Описание**: Максимальный размер сообщения, которое продюсер может отправить брокеру
* **Пример**: `max.request.size: 1048576` (1 MB)
* **Зачем это нужно**: Ограничивает размер сообщений, которые могут быть отправлены, чтобы избежать перегрузки сети и брокеров.
* **Таймаут ожидания подтверждений (request.timeout.ms):**
* **Описание**: Максимальное время ожидания подтверждения от брокера
* **Пример**: `request.timeout.ms: 30000` (30 секунд)
* **Зачем это нужно**: Помогает избежать бесконечного ожидания ответа от брокера в случае его сбоя
[к оглавлению](#apache-kafka)
## Пример конфигурации Kafka Producer
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaStringArrayProducer {
public static void main(String[] args) {
// Настройки Kafka Producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Создание Kafka Producer
KafkaProducer<String, String[]> producer = new KafkaProducer<>(props);
String key = "user123";
String[] value = {"message1", "message2", "message3"};
// Создание записи и добавление заголовков
ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", key, value);
record.headers().add("traceId", "someTraceId");
// Отправка сообщения в Kafka
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.out.println("Ошибка при отправке сообщения: " + exception.getMessage());
} else {
System.out.println("Сообщение отправлено в топик " + metadata.topic() + " с партицией " + metadata.partition());
}
});
producer.close();
}
}
```
```properties
acks=all
retries=3
compression.type=gzip
```
### С использованием Spring Kafka
```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.config.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.producer.Producer;
import org.springframework.kafka.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServer());
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getProducerId());
props.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.example.configuration.kafka.KafkaProducerLoggingInterceptor"
);
if ("SASL_SSL".equals(kafkaProperties.getSecurityProtocol())) {
props.put("ssl.truststore.location", kafkaProperties.getSslTrustStoreLocation());
props.put("ssl.truststore.password", kafkaProperties.getSslTrustStorePassword());
props.put("ssl.truststore.type", kafkaProperties.getSslTrustStoreType());
props.put("ssl.keystore.type", kafkaProperties.getSslKeyStoreType());
props.put("sasl.mechanism", kafkaProperties.getSaslMechanism());
props.put("security.protocol", kafkaProperties.getSecurityProtocol());
props.put("sasl.jaas.config", kafkaProperties.getJaasConfigCompiled());
}
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
var stringSerializerKey = new StringSerializer();
stringSerializerKey.configure(Map.of("key.serializer.encoding", "UTF-8"), true);
stringSerializerKey.configure(Map.of("serializer.encoding", "UTF-8"), true);
var stringSerializerValue = new StringSerializer();
stringSerializerValue.configure(Map.of("value.serializer.encoding", "UTF-8"), false);
stringSerializerValue.configure(Map.of("serializer.encoding", "UTF-8"), false);
return new DefaultKafkaProducerFactory<>(producerConfigs(), stringSerializerKey, stringSerializerValue);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
```
```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message, String key, String topic) {
try {
log.info("Sending message {}", data);
kafkaTemplate.send(topic, key, message);
log.info("Successfully send message {}", data);
} catch (Exception ex) {
log.error("Failed send message to {} topic by key {}", key, topic);
throw ex;
}
}
}
```
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private KafkaProducerService kafkaProducerService;
@PostMapping("/send")
public String sendMessage(@RequestParam String message, @RequestParam String key, @RequestParam String topic) {
kafkaProducerService.sendMessage(message, key, topic);
return "Message sent to Kafka!";
}
}
```
### С использованием Spring Cloud Stream
```yaml
spring:
cloud:
stream:
bindings:
output:
destination: my_topic
kafka:
binder:
brokers: localhost:9092
```
```java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
@EnableBinding(Source.class) // Подключение к каналу сообщений
public class KafkaStreamProducer {
private final Source source;
public KafkaStreamProducer(Source source) {
this.source = source;
}
public void sendMessage(String message) {
Message<String> msg = MessageBuilder.withPayload(message).build();
source.output().send(msg); // Отправка сообщения в Kafka
}
}
```
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/kafka-stream")
public class KafkaStreamController {
@Autowired
private KafkaStreamProducer kafkaStreamProducer;
@PostMapping("/send")
public String sendMessage(@RequestParam String message) {
kafkaStreamProducer.sendMessage(message);
return "Message sent to Kafka via Spring Cloud Stream!";
}
}
```
[к оглавлению](#apache-kafka)
## Архитектура консюмера
Потребители используют **Kafka Consumer API** для взаимодействия с брокерами Kafka. Они получают сообщения и обрабатывают
их согласно своей логике. Потребители могут быть объединены в группы **Consumer Groups**.
### Резюме
* "Smart" консюмер
* Консюмер опрашивает кафку
* Консюмер отвечает за гарантию обработки
* Автоматические фейловер в консюмер-группе
* Независимая обработка разными консюмер-группе
### Компоненты
#### Consumer Group
Kafka использует концепцию Consumer Groups, что позволяет нескольким потребителям работать вместе, чтобы параллельно
обрабатывать данные из топиков. Каждый потребитель в группе обрабатывает только часть данных из топика, обеспечивая масштабируемость и балансировку нагрузки.
* Все сообщения из одного Kafka Topic делятся между всеми потребителями в группе
* Если в группе несколько потребителей, Kafka гарантирует, что каждая партиция топика будет обрабатываться только одним потребителем
* В случае если один из потребителей выходит из строя, его партиции автоматически перераспределяются между оставшимися активными потребителями
#### Offset (Смещение)
Потребитель отслеживает offset каждой партиции, чтобы понимать, с какого сообщения продолжать чтение. Смещение — это
уникальный идентификатор каждого сообщения в партиции.
Потребители могут хранить offset в Kafka или вне её (например, в базе данных или файловой системе). Если потребитель
отключается, он может возобновить обработку с того места, где остановился, прочитав сохранённый offset.
#### Poll (Опрос)
Потребители используют метод poll() для опроса Kafka на наличие новых сообщений. Это асинхронный процесс, и Kafka будет
отправлять потребителю доступные сообщения по мере их поступления.
* Потребитель может указывать тайм-аут, после которого метод poll() вернёт пустой результат, если сообщений нет.
* Потребитель должен обрабатывать сообщения, а затем снова опрашивать Kafka для получения новых данных.
### Процесс работы
1. **Инициализация**: Потребитель подключается к Kafka-брокерам и присоединяется к consumer group. Он получает информацию о партиции топика, который будет читать.
2. **Подписка на топик**: Потребитель подписывается на определённые топики с помощью метода `subscribe()`.
3. **Опрос**: Потребитель вызывает метод `poll()` для получения новых сообщений. Если в очереди есть сообщения, они передаются потребителю для обработки.
4. **Обработка сообщений**: Потребитель обрабатывает сообщения, извлекая полезную информацию из каждого.
5. **Подтверждение обработки**: После обработки сообщения потребитель подтверждает обработку с помощью `commit()`.
Это обновляет **offset**, позволяя потребителю продолжить чтение с места, на котором остановился.
6. **Обработка ошибок**: В случае ошибки потребитель может решить, как повторить обработку сообщения
(например, с использованием механизма повторных попыток).
7. **Завершение работы**: Когда потребитель завершает обработку, он выходит из consumer group и может закрыть соединение с Kafka.
[к оглавлению](#apache-kafka)
## Настройки консюмера
* **bootstrap.servers** — список брокеров, к которым будет подключаться потребитель
* **group.id** — идентификатор группы потребителей
* **auto.offset.reset** — настройка поведения при отсутствии offset (`earliest` для чтения с самого начала или `latest` для чтения с конца)
* **enable.auto.commit** — указывает, должен ли потребитель автоматически коммитить offset. Если `false`, потребитель должен делать это вручную
* **auto.commit.interval.ms** — определяет интервал времени между автоматическими коммитами offset сообщений, если включена автоматическая фиксация
* **max.poll.records** — максимальное количество сообщений, которые потребитель будет получать за один вызов `poll()`
* **session.timeout.ms** — максимальное время без общения с Kafka перед тем, как потребитель считается недоступным
* **client.rack** — используется для указания серверной стойки или дата-центра. Это особенно важно в случае, если у вас
есть распределённая инфраструктура Kafka с несколькими стойками или дата-центрами, где сообщения могут быть реплицированы
между разными физическими местоположениями (например, несколькими дата-центрами).
### Что такое Rack в контексте Kafka?
**Rack** — это метка, которая идентифицирует физическое местоположение брокеров Kafka. В Kafka можно задать rack для каждого брокера
с помощью параметра `broker.rack`, чтобы управлять репликацией данных, предпочтительно размещая реплики на разных физических машинах или в разных дата-центрах.
**Преимущества использования client.rack**
* **Снижение задержек**: Kafka будет предпочитать, чтобы данные попадали в тот же rack, где находится клиент, что уменьшает время отклика
* **Повышенная отказоустойчивость**: С правильной настройкой client.rack и broker.rack можно улучшить отказоустойчивость
за счет размещения реплик в разных физически удаленных местах
* **Лучшее использование ресурсов**: Правильное распределение нагрузки по rack помогает избежать перегрузки одного физического местоположения
[к оглавлению](#apache-kafka)
## Пример конфигурации Kafka Consumer
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
public class KafkaConsumerExample {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String groupId = "my-consumer-group";
String topic = "my-topic";
// Настройки Consumer
Map<String, Object> consumerConfigs = new HashMap<>();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Создание Consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs);
// Подписка на тему
consumer.subscribe(Collections.singletonList(topic));
try {
// Чтение сообщений из Kafka
while (true) {
var records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> System.out.println("Received message: " + record.value()));
}
} finally {
consumer.close();
}
}
}
```
**At least once**
Чтобы гарантировать обработку сообщений хотя бы один раз, нужно коммитить после обработки.
```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
public class KafkaConsumerAtLeastOnce {
public static void main(String[] args) {
try {
// Чтение сообщений
while (true) {
var records = consumer.poll(Duration.ofSeconds(1)); // Ожидание 1 секунду для получения сообщений
process(records);
consumer.commitAsync(); // Commit после обработки
}
} finally {
consumer.close(); // Закрытие consumer
}
}
}
```
**At most once**
Чтобы гарантировать обработку сообщений не более одного раза, нужно коммитить до обработки или включить авто-подтверждение смещений
`enable.auto.commit=true`.
```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
public class KafkaConsumerAtLeastOnce {
public static void main(String[] args) {
try {
// Чтение сообщений
while (true) {
var records = consumer.poll(Duration.ofSeconds(1)); // Ожидание 1 секунду для получения сообщений
consumer.commitAsync(); // Commit перед обработкой
process(records);
}
} finally {
consumer.close(); // Закрытие consumer
}
}
}
```
### С использованием Spring Kafka
```java
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServer());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumerGroupId());
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configs;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentMessageListenerContainerFactory<String, String> factory = new ConcurrentMessageListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
```
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my_topic", groupId = "group_id")
public void listen(@Payload String message,
@Header("traceId") String traceId,
@Header("correlationId") String correlationId) {
System.out.println("Received message: " + message);
System.out.println("Trace ID: " + traceId);
System.out.println("Correlation ID: " + correlationId);
}
}
```
**At least once**
```yaml
spring:
kafka:
consumer:
enable-auto-commit: false # Отключение авто-commit
auto-offset-reset: earliest # Начинать чтение с самого начала (если нет смещения)
group-id: my-consumer-group
max-poll-records: 500 # Максимальное количество сообщений для обработки за один раз
listener:
ack-mode: manual # Ручное подтверждение
```
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.config.DefaultMessageListenerContainer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
@EnableKafka
public class AtLeastOnceConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
public void listen(String message, Acknowledgment acknowledgment) {
System.out.println("Received message: " + message);
// Обработка сообщения
// Подтверждение смещения вручную после успешной обработки
acknowledgment.acknowledge();
}
}
```
**At most once**
```yaml
spring:
kafka:
consumer:
enable-auto-commit: true # Включение авто-commit
group-id: my-consumer-group
auto-offset-reset: earliest # Начинать чтение с самого начала
max-poll-records: 100 # Максимальное количество сообщений для обработки за один раз
```
```java
import org.springframework.kafka.annotation.KafkaListener;
public class AtMostOnceConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
public void listen(String message) {
System.out.println("Received message: " + message);
// Обработка сообщения...
// Смещение будет автоматически зафиксировано после получения сообщения
}
}
```
### С использованием Spring Cloud Stream
```yaml
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: my-consumer-group
content-type: application/json
kafka:
binder:
brokers: localhost:9092
auto-create-topics: false
```
```java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Service
@EnableBinding(KafkaProcessor.class) // Указывает на интерфейс, с которым связывается этот сервис
public class KafkaConsumerService {
// Метод будет слушать сообщения из указанного канала
@StreamListener("input")
public void handle(@Payload String message) {
System.out.println("Received message: " + message);
}
}
```
```java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface KafkaProcessor {
@Input("input") // Имя канала, которое мы используем в application.yml
SubscribableChannel input();
}
```
**At least once**
```yaml
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: my-consumer-group
content-type: application/json
consumer:
ackMode: manual # Ручное подтверждение
maxAttempts: 3 # Максимальное количество попыток
```
```java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class) // Sink - это интерфейс, предоставляющий Binding для входных сообщений
public class AtLeastOnceConsumer {
@StreamListener(Sink.INPUT)
public void handleMessage(Message<String> message, @Header(name = "kafka_offset") String offset) {
// Обработка сообщения
System.out.println("Received message: " + message.getPayload());
// После успешной обработки подтверждаем сообщение
// Spring Cloud Stream автоматически подтвердит сообщение после завершения метода
// благодаря ackMode=manual и настроенному acknowledgment
}
}
```
**At most once**
```yaml
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: my-consumer-group
content-type: application/json
consumer:
ackMode: batch # Автоматическое подтверждение после пакета сообщений
```
```java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class AtMostOnceConsumer {
@StreamListener(Sink.INPUT)
public void handleMessage(Message<String> message) {
// Обработка сообщения
System.out.println("Received message: " + message.getPayload());
// Смещение будет автоматически зафиксировано после получения сообщения
}
}
```
**Mostly Once**
Это гибридный режим, который стремится быть чем-то средним между At least once и At most once. Он предполагает, что сообщения
будут доставлены обычно один раз, но иногда, в случае сбоев, может быть обработано больше одного раза. Для реализации
такого режима в Spring Cloud Stream потребуется дополнительная логика, например, фильтрация дублированных сообщений или
использование уникальных идентификаторов сообщений.
В рамках Spring Cloud Stream, можно обработать Mostly Once с использованием уникальных идентификаторов сообщений или
кеширования состояния, чтобы отфильтровать повторно обработанные сообщения.
```yaml
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: my-consumer-group
content-type: application/json
consumer:
ackMode: manual # Ручное подтверждение
maxAttempts: 3 # Максимальное количество попыток
```
```java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
@Component
@EnableBinding(Sink.class)
public class MostlyOnceConsumer {
private Set<String> processedMessageIds = new HashSet<>();
@StreamListener(Sink.INPUT)
public void handleMessage(Message<String> message, @Header("messageId") String messageId) {
if (processedMessageIds.contains(messageId)) {
System.out.println("Duplicate message: " + messageId);
return; // Пропускаем дублированное сообщение
}
// Обработка сообщения
System.out.println("Received message: " + message.getPayload());
// Добавляем идентификатор в обработанные
processedMessageIds.add(messageId);
// После успешной обработки подтверждаем сообщение вручную
// Spring Cloud Stream подтвердит сообщение после выполнения метода
}
}
```
[к оглавлению](#apache-kafka)
## Основные API Kafka
* Producer API
* Consumer API
* Streams API
* Connector API
[к оглавлению](#apache-kafka)
## Какова роль Producer API?
Используется для публикации потока сообщений в топики Kafka. Он управляет партицированием сообщений, сжатием и балансировкой
нагрузки между несколькими брокерами. Продюсер также отвечает за повторные неудачные попытки публикации и может быть
настроен на различные уровни гарантий доставки.
[к оглавлению](#apache-kafka)
## Какова роль Consumer API?
Обеспечивает механизм для потребления сообщений топиков. Оно позволяет приложениям и микросервисам читать данные,
поступающие в Kafka, и обрабатывать их для дальнейшего использования, будь то хранение, анализ или реактивная обработка.
[к оглавлению](#apache-kafka)
## Какова роль Connector API?
Connector API в Apache Kafka является частью Kafka Connect, которая представляет собой инфраструктуру для интеграции
внешних систем с Kafka. Connector API играет ключевую роль в упрощении процесса подключения различных источников данных
и систем-приемников к Kafka, предоставляя возможность автоматического перемещения данных между ними.
[к оглавлению](#apache-kafka)
## Какова роль Streams API?
Это компонент Apache Kafka, предназначенный для создания приложений и микросервисов, которые обрабатывают потоки данных
в реальном времени. Его основная роль заключается в том, чтобы позволить разработчикам легко обрабатывать и анализировать
данные, поступающие в виде непрерывных потоков из топиков. Kafka Streams API предоставляет высокоуровневый интерфейс для
выполнения таких операций, как фильтрация, агрегация, объединение данных и вычисление оконных функций.
[к оглавлению](#apache-kafka)
## Какова роль Transactions API?
Kafka Transactions API позволяет выполнять атомарные обновления для нескольких топиков. Он включает exactly-once
гарантию для приложений, которые читают данные из одного топика и пишут в другой. Это особенно полезно для приложений потоковой
обработки, которым необходимо гарантировать, что каждое входное событие влияет на выходные данные ровно один раз, даже в случае сбоев.
[к оглавлению](#apache-kafka)
## Какова роль Quota API?
Quota API позволяет настраивать квоты для каждого клиента для ограничения скорости создания или потребления данных, чтобы
один клиент не потреблял слишком много ресурсов брокера. Это помогает обеспечить справедливое распределение ресурсов и
предотвратить сценарии отказа в обслуживании.
[к оглавлению](#apache-kafka)
## Какова роль AdminClient API?
AdminClient API предоставляет операции для управления топиками, брокерами, конфигурацией и другими объектами Kafka.
Его можно использовать для создания, удаления и описания топиков, управления списками ACL, получения информации о кластере и
программного выполнения других административных задач.
[к оглавлению](#apache-kafka)
## Kafka Consumer
## Для чего нужен координатор группы?
Координатор группы отвечает за управление группами потребителей. Он управляет членством в группах потребителей, назначает
партиции потребителям внутри группы и управляет фиксацией смещения. Когда потребитель присоединяется к группе или покидает ее,
координатор группы запускает перебалансировку для переназначения партиций среди оставшихся потребителей.
[к оглавлению](#apache-kafka)
## Для чего нужен Consumer heartbeat thread?
Consumer Heartbeat Thread отвечает за отправку периодических сигналов брокеру Kafka (в частности, координатору группы).
Эти сигналы указывают на то, что потребитель жив и все еще является частью группы потребителей. Если потребитель не отправляет
данные сигналы в течение настроенного периода, он считается неживым, и координатор группы инициирует перебалансировку
для переназначения его партиций другим потребителям в группе.
[к оглавлению](#apache-kafka)
## Как Kafka обрабатывает сообщения?
Kafka поддерживает два основных способа обработки сообщений:
* **Queue**: каждое сообщение обрабатывается одним потребителем в группе потребителей. Это достигается за счет наличия
в группе нескольких потребителей, каждый из которых считывает данные из отдельных партиций.
* **Publish-Subscribe**: все сообщения обрабатываются всеми потребителями. Это достигается за счет того, что каждый
потребитель находится в своей собственной группе потребителей, что позволяет всем потребителям читать все сообщения.
[к оглавлению](#apache-kafka)
## Как Kafka обрабатывает задержку консюмера?
Задержка (лаг) консюмера в Kafka относится к разнице между оффсетом последнего созданного сообщения и оффсетом последнего
полученного сообщения. Kafka предоставляет инструменты и API для мониторинга задержек консюмеров такие, как инструмент
командной строки Kafka Consumer Groups и API AdminClient. Высокая задержка консюмеров может указывать на проблемы с
производительностью или недостаточную пропускную способность консюмеров. Kafka не обрабатывает задержки автоматически,
но предоставляет информацию, необходимую приложениям для принятия решений о масштабировании или оптимизации производительности.
[к оглавлению](#apache-kafka)
## Для чего нужны методы subscribe() и poll()?
Метод subscribe() используется для подписки на один или несколько топиков. Фактически он не извлекает никаких данных.
Метод poll(), с другой стороны, используется для извлечения данных из топиков. Он возвращает записи, которые были опубликованы
с момента последнего запроса топиков и партиций. Метод poll() обычно вызывается в цикле для непрерывного получения данных.
[к оглавлению](#apache-kafka)
## Для чего нужен метод position()?
Метод position() возвращает смещение следующей записи, которая будет извлечена для данной партиции. Это полезно для
отслеживания хода получения данных и может использоваться в сочетании с методом committed(), чтобы определить насколько
сильно потребитель отстал от своего последнего комита оффсета. Эта информация может быть ценной для мониторинга и
управления показателями потребителей.
[к оглавлению](#apache-kafka)
## Для чего нужны методы commitSync() и commitAsync()?
Эти методы используются для фиксации смещений:
* **commitSync()**: синхронно фиксирует последнее смещение, возвращенное poll(). Он будет повторять попытку до тех пор,
пока не завершится успешно или не столкнется с непроверяемой ошибкой.
* **commitAsync()**: асинхронно фиксирует смещения. Он не повторяет попытку при сбое, что делает его более быстрым,
но менее надежным, чем commitSync(). Выбор между этими методами зависит от баланса между производительностью и надежностью,
требуемого приложением.
[к оглавлению](#apache-kafka)
## Другие вопросы
## Для чего нужен идемпотентный продюсер?
Идемпотентный продюсер гарантирует exactly-once гарантию доставки, предотвращая дублирование записей в Kafka в случае
повторных попыток отправки сообщений. Это важно для поддержания целостности данных и правильности их обработки в системе,
особенно в распределенных системах, где могут возникать ошибки связи или сбои.
[к оглавлению](#apache-kafka)
## Для чего нужен интерфейс Partitioner?
Интерфейс Partitioner в Producer API определяет в какую партицию топика будет отправлено сообщение. Partitioner по-умолчанию
использует хэш ключа (если он присутствует) для выбора партиции, гарантируя, что сообщения с одним и тем же ключом всегда
отправляются в одну и ту же партицию. Могут быть реализованы пользовательские Partitioner для управления распределением
сообщений по партициям на основе определенной бизнес-логики или характеристик данных.
[к оглавлению](#apache-kafka)
## Для чего нужен Broker log cleaner thread?
Поток очистки журнала в Kafka отвечает за выполнение сжатия журнала. Сжатие журнала - это механизм, при котором Kafka
удаляет избыточные записи, сохраняя только последнее значение для каждого ключа. Это полезно в тех случаях, когда требуется
только последнее обновление для данного ключа, например, для обслуживания changelog или состояния БД. Программа очистки журналов
периодически запускается для сжатия соответствующих партиций.
[к оглавлению](#apache-kafka)
## Для чего нужен Kafka Mirror Maker?
Это инструмент, позволяющий реплицировать данные между кластерами Kafka, потенциально находящихся в разных дата-центрах.
Он работает, потребляя данные из одного кластера и передавая в другой. Можно использовать для создания резервной копии данных,
объединения данных из нескольких дата-центров в единое хранилище или для переноса данных между кластерами.
[к оглавлению](#apache-kafka)
## Для чего нужна Schema Registry?
Kafka Schema Registry предоставляет RESTful интерфейс для хранения и извлечения схем Avro. Schema Registry используется
совместно с Kafka для обеспечения совместимости схем данных между производителями и потребителями. Это особенно полезно
при разработке моделей данных с течением времени, сохраняя обратную и прямую совместимость.
[к оглавлению](#apache-kafka)
## Для чего нужен Streams DSL?
Kafka Streams DSL предоставляет высокоуровневый API для операций потоковой обработки. Он позволяет разработчикам описывать
сложную логику обработки, такую как фильтрация, преобразование, агрегирование и объединение потоков данных. DSL абстрагирует
многие низкоуровневые детали потоковой обработки, упрощая создание и обслуживание приложений потоковой обработки.
[к оглавлению](#apache-kafka)
## Как Kafka обеспечивает версионирование сообщений?
Сама по себе Kafka не обеспечивает версионирование сообщений напрямую, но предоставляет механизмы, позволяющие реализовывать
управление версиями. Одним из распространенных подходов является включение поля версии в схему сообщения. Для более сложных задач
управления версиями используются реестры схем (например, Confluent Schema Registry), которые могут управлять изменением схемы и совместимостью.
[к оглавлению](#apache-kafka)
## Как потребители получают сообщения от брокера?
Kafka использует pull-модель для извлечения сообщений. Потребители запрашивают сообщения у брокеров, а не брокеры
отправляют сообщения потребителям. Это позволяет потребителям контролировать скорость, с которой они получают сообщения.
Потребители отправляют запросы на получение данных от брокера, указывая топик, партицию и начальное смещение для каждой партиции.
Брокер отвечает сообщениями с объемом до указанного максимального предела в байтах.
[к оглавлению](#apache-kafka)
## В чем разница между Kafka Streams и Apache Flink?
Kafka Streams и Apache Flink — это два мощных инструмента для обработки потоков данных в режиме реального времени, но
они различаются по архитектуре, возможностям и сценариям применения.
### Сравнение Kafka Streams и Apache Flink
| **Критерий** | **Kafka Streams** | **Apache Flink** |
|------------------------|---------------------------------------------|-----------------------------------------|
| **Архитектура** | Встроенная библиотека, работающая внутри приложения. Зависит от Kafka. | Независимая распределенная система потоковой обработки данных с возможностью интеграции с различными источниками и приемниками данных. |
| **Обработка данных** | Обрабатывает потоки событий непосредственно из Kafka. Подходит для обработки событий и транзакционных данных с минимальной задержкой. | Поддерживает как потоковую (streaming), так и пакетную (batch) обработку данных. Специализируется на сложной обработке событий с гибкими возможностями управления состоянием. |
| **Зависимость от Kafka**| Построена исключительно вокруг Kafka. Требует Kafka для получения и отправки данных. | Работает с широким спектром источников данных (Kafka, HDFS, базы данных и т. д.). Kafka — лишь один из многих источников. |
| **Установка** | Легко интегрируется в существующее Java/Scala-приложение как библиотека. Не требует развертывания кластеров. | Требует отдельного кластера для выполнения, что подходит для высокопроизводительных распределенных систем. |
| **Управление состоянием** | Встроенное состояние с использованием RocksDB, также поддержка репликации состояния. | Имеет развитую систему управления состоянием, поддерживает сложные функции восстановления состояния и обработки данных. |
| **Гарантия доставки** | Поддерживает "at-least-once" и "exactly-once" семантику, когда Kafka настроена соответствующим образом. | Имеет гибкие гарантии доставки: поддержка "exactly-once", "at-least-once" и "at-most-once". |
| **Масштабируемость** | Масштабируется автоматически вместе с Kafka-партициями. Каждая инстанция потребителя Kafka обрабатывает свою партицию. | Поддерживает масштабирование на уровне задач (task), с более гибкой моделью масштабирования и управления ресурсами. |
| **Обработка событий** | Подходит для обработки событий с низкой задержкой и транзакционными требованиями. | Специализируется на сложной обработке событий, таких как windowing, агрегирование и работа с изменяющимся состоянием. Поддерживает сложные аналитические операции. |
| **Инструменты и API** | Легковесная библиотека с простыми API для работы с потоками данных. Основные операции — фильтрация, маппинг, объединение потоков, windowing. | Продвинутая система с богатыми API для сложных вычислений, поддерживающая потоковую и пакетную обработку, обработку событий и контроль сложных бизнес-процессов. |
| **Требования к ресурсам**| Менее ресурсоемка, так как не требует отдельного кластера. Работает в рамках JVM-приложения. | Требует более высоких вычислительных ресурсов, так как выполняется на отдельном кластере и поддерживает высокую степень параллелизма. |
### Когда выбрать Kafka Streams
- Если вы уже используете Kafka и вам нужна легковесная библиотека для обработки данных непосредственно внутри вашего приложения.
- Для сценариев с низкой задержкой, где данные приходят из Kafka и должны быть быстро обработаны с минимальными накладными расходами.
- Если вам нужно встроить обработку потоков данных в существующую Java/Scala программу без необходимости развертывания отдельных кластеров.
### Когда выбрать Apache Flink
- Если вы работаете с потоковой и пакетной обработкой данных, где источники и приемники могут быть не только Kafka, но и другие системы (например, HDFS, базы данных).
- Для сложных задач обработки событий, требующих управления состоянием, временных окон, аналитики и восстановления после сбоев.
- Если ваш проект требует высокой производительности, гибкости, точных гарантий доставки и распределенной обработки в кластере.
### Заключение
- **Kafka Streams** — это идеальный выбор, если ваша инфраструктура уже основана на Kafka, и вам нужна быстрая и легковесная обработка потоков данных.
- **Apache Flink** — это мощный инструмент для сложных аналитических задач, потоковой обработки данных в режиме реального
времени с поддержкой сложных схем обработки, который предоставляет больше возможностей для работы с разнообразными источниками данных.
[к оглавлению](#apache-kafka)
## В чем разница между Kafka Consumer и Kafka Stream?
**Kafka Consumer** - это клиент, который читает данные из топика и производит некоторую обработку. Обычно используется для
простых сценариев получения данных. **Kafka Stream**, с другой стороны, более подвинутый клиент, который может потреблять,
обрабатывать и класть данные обратно в Kafka. Он предоставляет DSL для сложных операций потоковой обработки, таких как
фильтрация, преобразование, агрегирование и объединение потоков.
[к оглавлению](#apache-kafka)
## В чем разница между Kafka и Flume?
**Apache Kafka** и **Apache Flume** — это два популярных инструмента для обработки и передачи данных, однако они имеют
разные цели и архитектуры. Вот основные различия между ними:
### 1. **Назначение и использование**
- **Kafka**: Это распределенная платформа для потоковой передачи данных, которая обеспечивает высокую пропускную способность
и низкую задержку для обработки больших объемов данных. Kafka используется для создания стриминговых приложений и обработки
данных в реальном времени. Она может быть использована для передачи логов, событий, метрик и других данных, требующих
высокой доступности и масштабируемости.
- **Flume**: Это распределенная система для сбора, агрегации и передачи логов и событий. Flume обычно используется для
доставки логов с серверов в HDFS, HBase или другие системы хранения данных. Его основное предназначение — это сбор данных
из различных источников (например, лог-файлов) и передача их в системы хранения или аналитики.
### 2. **Архитектура**
- **Kafka**: В Kafka данные отправляются в топики и партиции, которые могут быть независимо прочитаны несколькими потребителями.
Kafka ориентирована на высокую пропускную способность и масштабируемость. Это решает задачу обработки потоковых данных и
событий в реальном времени.
- **Flume**: Flume состоит из **источников (sources)**, **каналов (channels)** и **приемников (sinks)**. Источник получает
данные, канал их буферизует, а приемник отправляет их в конечную систему. Flume использует систему "event-based" и часто
применяется для сбора логов.
### 3. **Хранение данных**
- **Kafka**: Kafka сохраняет сообщения на диске в течение длительного времени (по умолчанию — до 7 дней) в топиках.
Потребители могут читать данные в любой момент времени, и Kafka поддерживает концепцию **сохранения и ретрансляции данных**.
- **Flume**: Flume не имеет встроенного механизма долговременного хранения. Он просто передает данные в назначенные места
хранения (например, HDFS). Данные в Flume не сохраняются долго, и если система хранения не доступна, они теряются.
### 4. **Производительность**
- **Kafka**: Kafka предназначен для работы с высокими объемами данных. Он поддерживает масштабируемость как по производителям,
так и по потребителям, и может обрабатывать миллионы сообщений в секунду с минимальной задержкой.
- **Flume**: Flume может быть менее масштабируемым по сравнению с Kafka и больше ориентирован на сбор логов и событий
с различных источников. Хотя Flume тоже может обрабатывать большие объемы данных, он не предназначен для работы с
такими большими потоками, как Kafka.
### 5. **Использование и кейсы**
- **Kafka**: Используется для стриминга данных, аналитики в реальном времени, интеграции различных систем, работы с
большими данными и построения событийных приложений.
- **Flume**: Используется для сбора, агрегации и передачи логов и событий в системы хранения, такие как HDFS, HBase,
или внешние системы. Это идеальный выбор для организации потоков логирования и мониторинга.
### 6. **Поддержка и интеграция**
- **Kafka**: Kafka поддерживает широкий спектр интеграций и может быть использован с различными системами для построения
распределенных приложений и аналитических решений.
- **Flume**: Flume ориентирован на интеграцию с Hadoop-экосистемой, и основное его использование — это интеграция с HDFS,
HBase и другими хранилищами данных в этой экосистеме.
### 7. **Потребительская модель**
- **Kafka**: Kafka поддерживает много потребителей, которые могут читать из одного и того же топика независимо, а также
возможность **повторного прочтения данных**.
- **Flume**: Flume имеет фиксированную схему доставки данных и не поддерживает такую гибкость, как Kafka в части потребителей и обработки.
### 8. **Гарантии доставки**
- **Kafka**: Kafka поддерживает **гарантии доставки** с различными уровнями подтверждения (acknowledgment), а также может
обеспечивать **доставку сообщений точно один раз** (exactly-once semantics).
- **Flume**: Flume обеспечивает базовые гарантии доставки, но они менее строгие, чем у Kafka, и больше ориентированы на
устойчивость к сбоям, а не на гарантированную доставку.
[к оглавлению](#apache-kafka)
## В чем разница между Kafka и RabbitMQ?
**RabbitMQ** и **Apache Kafka** — это две популярные системы обмена сообщениями, каждая из которых имеет свои особенности
и используется для разных типов приложений. Вот основные различия между ними:
### 1. **Архитектура**
- **RabbitMQ** использует **очереди сообщений**. Сообщения отправляются в очередь, и один потребитель извлекает сообщение
из очереди для обработки.
- **Apache Kafka** использует **топики и партиции**. Сообщения отправляются в топики, которые могут быть разделены на
партиции, и несколько потребителей могут читать эти сообщения в любом порядке. Kafka ориентирован на большие потоки данных и масштабируемость.
### 2. **Модель доставки сообщений**
- **RabbitMQ**: Сообщения передаются в очереди, и каждый потребитель получает одно сообщение. Сообщения могут быть
подтверждены (acknowledged) или отклонены (rejected). RabbitMQ гарантирует, что сообщение будет доставлено хотя бы одному потребителю.
- **Kafka**: Сообщения сохраняются в топиках на длительный срок, и потребители могут читать их в любом порядке. Kafka
гарантирует доставку сообщений всем потребителям, если они подписаны на топик, и может позволить многократное чтение старых сообщений.
### 3. **Гарантии доставки**
- **RabbitMQ**: Предоставляет подтверждения доставки и может повторно отправить сообщение, если потребитель не подтвердил
его получение. Можно настроить разные уровни надежности (например, за счет использования подтверждений или транзакций).
- **Kafka**: Сообщения сохраняются на диске, что позволяет потребителям считывать их в любое время. Kafka гарантирует
доставку сообщений при определенной конфигурации репликации и сохранения.
### 4. **Производительность и масштабируемость**
- **RabbitMQ**: Лучше подходит для небольших и средних систем, где требуется высокая надежность и гарантированная доставка.
Он поддерживает **горизонтальное масштабирование**, но требует дополнительных усилий для настройки и управления.
- **Kafka**: Отличается высокой **производительностью** и возможностью обработки больших объемов данных. Kafka легко
масштабируется за счет **партиционирования** и репликации данных.
### 5. **Потребительская модель**
- **RabbitMQ**: Один потребитель получает одно сообщение. Если потребитель не успевает обработать сообщение, оно может быть повторно отправлено.
- **Kafka**: Потребители могут читать сообщения независимо друг от друга. Kafka сохраняет все сообщения в топиках,
и потребители могут читать их в любое время. Kafka также поддерживает концепцию **групп потребителей**, где каждый
потребитель группы обрабатывает разные партиции.
### 6. **Использование и кейсы**
- **RabbitMQ**: Идеален для обработки запросов и ответов, распределенных приложений, микросервисов с гарантией доставки,
бизнес-процессов с очередями задач.
- **Kafka**: Используется для обработки потоков данных, интеграции с большими данными, записи журналов, мониторинга,
обработки событий в реальном времени и сохранения больших объемов данных для последующего анализа.
### 7. **Производители и потребители**
- **RabbitMQ**: Один производитель отправляет сообщения в очередь, и несколько потребителей могут обрабатывать эти сообщения.
- **Kafka**: Множество производителей могут отправлять сообщения в топики, и несколько потребителей могут читать их
одновременно, поддерживая масштабируемость.
### 8. **Сообщения и хранение**
- **RabbitMQ**: Сообщения удаляются из очереди после их обработки потребителем. Хранение сообщений обычно краткосрочное.
- **Kafka**: Сообщения сохраняются на диске в топиках до тех пор, пока не истечет срок хранения (по конфигурации).
Это позволяет повторно читать данные.
[к оглавлению](#apache-kafka)