Add consumer chapter
|
@ -43,7 +43,7 @@
|
|||
- [Создание топика](docs/guide/003-topics-and-partitions.md)
|
||||
- [Работа продюсера](docs/guide/004-producers.md)
|
||||
- [Работа консумера](docs/guide/005-consumers.md)
|
||||
- [Наблюдаемость за Kafka](docs/guide/006-observability.md)
|
||||
- [Наблюдаемость Kafka](docs/guide/006-observability.md)
|
||||
|
||||
## Бонус-трек
|
||||
|
||||
|
|
|
@ -1,3 +1,138 @@
|
|||
# Работа с консумерами
|
||||
|
||||
TBD
|
||||
## Запуск консумеров
|
||||
|
||||
Мы также подготовили консумер на Go: программа читает топик и распечатывает сообщение на экран. В тестовом кластере сейчас работает сразу три консумера — все они представляют собой один и тот же экземпляр запущенной программы.
|
||||
|
||||
Посмотрите пример прочитанных событий:
|
||||
|
||||
```bash
|
||||
docker-compose --profile app logs -f consumer-1 consumer-2 consumer-3
|
||||
```
|
||||
|
||||
```
|
||||
consumer-3 | 2023/02/26 09:27:17 message at topic/partition/offset example/2/2597: 5 = {"id":5,"lat":70.305836,"lon":-82.353797}
|
||||
consumer-2 | 2023/02/26 09:27:17 message at topic/partition/offset example/5/1446: 6 = {"id":6,"lat":-68.561725,"lon":41.288281}
|
||||
consumer-2 | 2023/02/26 09:27:17 message at topic/partition/offset example/5/1447: 6 = {"id":6,"lat":-18.154029,"lon":-62.287865}
|
||||
consumer-1 | 2023/02/26 09:27:17 message at topic/partition/offset example/4/1421: 1 = {"id":1,"lat":-50.646267,"lon":10.077705}
|
||||
consumer-1 | 2023/02/26 09:27:17 message at topic/partition/offset example/4/1422: 1 = {"id":1,"lat":-47.339896,"lon":-33.38711}
|
||||
```
|
||||
|
||||
Обратите внимание, что каждый консумер читает только события, связанные соответствующим ключом. Это возможно благодаря партицированию по хешу со стороны продюсера.
|
||||
|
||||
Откройте исходный [код консумера](../../examples/consumer/main.go) и посмотрите описание `kafka.Reader`. Программа подключается к брокерам, поллит данные в соответствии с выбранной минимальной величиной байт для одной пачки данных (`MinBytes`) и максимальным размером сообщения (`MaxBytes`).
|
||||
|
||||
```go
|
||||
r := kafka.NewReader(kafka.ReaderConfig{
|
||||
Brokers: addrs,
|
||||
GroupID: group,
|
||||
Topic: topic,
|
||||
MinBytes: 10e2, // 1KB
|
||||
MaxBytes: 10e6, // 10MB
|
||||
})
|
||||
```
|
||||
|
||||
Само чтение же представляет собой бесконечный цикл:
|
||||
- Чтение пачки данных
|
||||
- Обработка
|
||||
- Коммит оффсета
|
||||
|
||||
В представленном примере чтение данных и коммит оффсета совмещены в функции `ReadMessage` и выполняются неявно (implicit).
|
||||
|
||||
```go
|
||||
for {
|
||||
m, err := r.ReadMessage(context.Background())
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
log.Print(fmt.Sprintf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)))
|
||||
}
|
||||
```
|
||||
|
||||
Однако обе эти операции могут быть выполнены явно (explicit). Например, сначала вы считываете порцию данных, обрабатываете её, а уже в самом конце — коммитите оффсет, перемещаясь к следующей порции записей. Например:
|
||||
|
||||
```go
|
||||
ctx := context.Background()
|
||||
for {
|
||||
m, err := r.FetchMessage(ctx)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
log.Print(fmt.Sprintf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)))
|
||||
if err := r.CommitMessages(ctx, m); err != nil {
|
||||
log.Fatal("failed to commit messages:", err)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Для простоты оба примера иллюстрируют синхронную работу консумера, но для улучшения производительности операции могут быть выполнены и асинхронно.
|
||||
|
||||
## Управление консумер-группой
|
||||
|
||||
Посмотрим на консумер-группу детальнее. Откройте веб-интерфейс [Redpanda Console](http://localhost:8080/groups/example-consumer-group), перейдите на вкладку "Consumer groups" и откройте профиль консумер-группы `example-consumer-group`.
|
||||
|
||||
![Redpanda Console: Open consumer group profile page](./assets/005-open-consumer-group-profile-page.png)
|
||||
|
||||
На странице видно, что в группу `example-consumer-group` входят три консумера, каждый из которых определяется уникальным именем (Assigned member) и адресом узла (Host). Важно, что каждый консумер в группе читает одну или несколько партиций, однако одна партиция может быть прочитана только одним консумером в текущей группе.
|
||||
|
||||
Каждый консумер в группе характеризуется текущим оффсетом (Group Offset) относительно конечного оффсета партиции (Log End Offset). Расстояние между конечным оффсетом партиции и текущим оффсетом группы является лагом (Lag).
|
||||
|
||||
Вы можете управлять положением оффсета консумер-группы на уровне брокера. Например, если вам требуется прочитать сообщения за пределами текущего оффсета, вы можете переместить его на позицию в любое другое место.
|
||||
|
||||
Однако для этого важно, чтобы консумер-группа была **пуста**: то есть, в ней не было **ни одного** участника.
|
||||
|
||||
Остановите имеющиеся консумеры.
|
||||
|
||||
```bash
|
||||
docker-compose --profile app stop consumer-1 consumer-2 consumer-3
|
||||
```
|
||||
|
||||
Обновите страницу [Redpanda Console](http://localhost:8080/groups/example-consumer-group).
|
||||
|
||||
![Redpanda Console: Empty consumer group](./assets/005-empty-consumer-group.png)
|
||||
|
||||
Консумер-группа пуста (Empty). Теперь вы можете отредактировать её (Edit group). Переключите оффсет для всей группы, «отмотав» его на 5 минут назад.
|
||||
|
||||
![Redpanda Console: Edit consumer group](./assets/005-edit-consumer-group.png)
|
||||
|
||||
Подтвердите изменение.
|
||||
|
||||
![Redpanda Console: Edit consumer group](./assets/005-edit-consumer-group-confirmation.png)
|
||||
|
||||
Готово. Обратите внимание на значения лага партиций.
|
||||
|
||||
![Redpanda Console: Consumer group with lag](./assets/005-open-consumer-group-with-lag-profile-page.png)
|
||||
|
||||
Теперь, когда мы запустим консумеры, они вновь вернутся к чтению с обозначенных позиций. Запустите остановленные консумеры:
|
||||
|
||||
```bash
|
||||
docker-compose --profile app start consumer-1 consumer-2 consumer-3
|
||||
```
|
||||
|
||||
Посмотрите на лог консумер-групп и убедитесь, что сообщения были прочитаны повторно.
|
||||
|
||||
```bash
|
||||
docker-compose --profile app logs -f consumer-1 consumer-2 consumer-3
|
||||
```
|
||||
|
||||
## Ребалансировка консумер-группы
|
||||
|
||||
Любая смена композиции в консумер-группе (например, добавление или выход участника из группы, а также изменение числа партиций) приводит к её ребалансировке — процессу перераспределения партиций между «живыми» участниками. Сейчас в консумер-группе три консумера, каждый из которых читает две партиции.
|
||||
|
||||
Остановите один из работающих консумеров, перейдите на страницу консумер-группы в [Redpanda Console](http://localhost:8080/groups/example-consumer-group).
|
||||
|
||||
```bash
|
||||
docker-compose --profile app stop consumer-1
|
||||
```
|
||||
|
||||
Сначала никаких изменений не будет, а после, на короткое время, группа перейдёт в состояние `PreparingRebalance`.
|
||||
|
||||
![Redpanda Console: Consumer group is in preparing rebalance state](./assets/005-consumer-group-preparing-rebalance.png)
|
||||
|
||||
В этот момент все консумеры прекращают чтение и дожидаются синхронизации всех имеющихся участников в группе. После окончания ребалансировки, партиции перераспределяются между участниками и консумеры продолжают чтение с сохранённых позиций.
|
||||
|
||||
![Redpanda Console: Consumer group after rebalance](./assets/005-consumer-group-after-rebalance.png)
|
||||
|
||||
Как можно заметить, теперь партиции распределились между двумя консумерами: по три на каждого участника группы.
|
||||
|
||||
✅ Готово. Теперь переходите к [изучению метрик Kafka]().
|
|
@ -0,0 +1 @@
|
|||
# Наблюдаемость Kafka
|
After Width: | Height: | Size: 396 KiB |
After Width: | Height: | Size: 279 KiB |
After Width: | Height: | Size: 137 KiB |
After Width: | Height: | Size: 152 KiB |
After Width: | Height: | Size: 278 KiB |
After Width: | Height: | Size: 392 KiB |
After Width: | Height: | Size: 281 KiB |