1
1
Fork 0

Initial commit

This commit is contained in:
Gleb Goncharov 2023-02-11 23:12:26 +04:00
commit 34c114508e
36 changed files with 5832 additions and 0 deletions

13
.env.example Normal file
View File

@ -0,0 +1,13 @@
GO_VERSION=1.20
ZK_VERSION=3.8
KAFKA_VERSION=3.3
KAFKA_EXPORTER_VERSION=latest
JMX_JAVAAGENT_VERSION=0.17.2
REDPANDA_CONSOLE_VERSION=2.1.1
GRAFANA_VERSION=latest
PROMETHEUS_VERSION=latest
JMX_JAVAAGENT_PORT=5556
GRAFANA_PORT=3000
PROMETHEUS_PORT=9090
REDPANDA_CONSOLE_PORT=8080

20
.gitignore vendored Normal file
View File

@ -0,0 +1,20 @@
# VSCode
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace
# Local History for Visual Studio Code
.history/
# Mac/OSX
.DS_Store
# Environments
.env
# Go binaries
examples/consumer/consumer
examples/consumer/producer

46
Dockerfile.app Normal file
View File

@ -0,0 +1,46 @@
################################################################################
ARG GO_VERSION="1.20"
ARG BUILD_TARGET="consumer"
ARG BUILD_TYPE="debug"
################################################################################
FROM docker.io/library/golang:${GO_VERSION} AS base-consumer
WORKDIR /usr/src
COPY examples/consumer/go.mod examples/consumer/go.sum ./
RUN go mod download && go mod verify
COPY examples/consumer/ .
RUN go build -v -o /usr/local/bin/app
################################################################################
FROM docker.io/library/golang:${GO_VERSION} AS base-producer
WORKDIR /usr/src
COPY examples/producer/go.mod examples/producer/go.sum ./
RUN go mod download && go mod verify
COPY examples/producer/ .
RUN go build -v -o /usr/local/bin/app
################################################################################
FROM base-${BUILD_TARGET} AS base
FROM docker.io/library/debian:11 AS app-release
COPY --from=base /usr/local/bin/app /usr/local/bin/app
FROM base AS app-debug
RUN set -eux; \
DEBIAN_FRONTEND=noninteractive apt-get update; \
DEBIAN_FRONTEND=noninteractive apt-get install -y -q tmux vim telnet; \
rm -rf /var/cache/apt;
################################################################################
FROM app-${BUILD_TYPE} AS app
WORKDIR /
CMD ["app"]

10
Dockerfile.kafka Normal file
View File

@ -0,0 +1,10 @@
ARG KAFKA_VERSION="3.3"
FROM docker.io/bitnami/kafka:${KAFKA_VERSION}
ARG JMX_JAVAAGENT_VERSION="0.17.2"
RUN set -eux; \
curl \
https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/${JMX_JAVAAGENT_VERSION}/jmx_prometheus_javaagent-${JMX_JAVAAGENT_VERSION}.jar \
-o /opt/bitnami/kafka/libs/jmx_prometheus_javaagent-${JMX_JAVAAGENT_VERSION}.jar && \
chmod 664 /opt/bitnami/kafka/libs/jmx_prometheus_javaagent-${JMX_JAVAAGENT_VERSION}.jar

7
LICENSE.md Normal file
View File

@ -0,0 +1,7 @@
Copyright 2033 Gleb Goncharov
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

56
README.md Normal file
View File

@ -0,0 +1,56 @@
# Интенсив «Kafka за 90 минут»
[![Sponsored by SberMarket Tech](images/sbermarket-tech-logo.svg)](https://sbermarket.ru)
Интенсив «Kafka за 90 минут» состоит из двух частей: теории и практики. Теория поможет составить ментальную модель Kafka, а практика — попробовать инструмент в действии и получить набор готовых конфигураций для применения их в своих лабораторных и тестовых средах на работе.
## Теория
### Содержание
* Расскажем о сценариях использования Kafka.
* Узнаем, что такое консумер, продюсер и брокер.
* Разберём, как связаны топики, партиции и сегменты.
* Поговорим о формате сообщений в Kafka.
* Расскажем о лидере партиций, репликации данных и партицировании.
* Поговорим о гарантиях доставки сообщений и идемпотентности.
* Выясним, что такое консумер-группа и ребалансировка консумеров в ней.
**Длительность**: 45 минут
### Материал
- [Презентация](docs/theory/slides.pdf)
- [Расшифровка](docs/theory/speaker-notes.md)
## Практика
### Содержание
* Склонируем репозиторий с конфигурацией Docker Compose.
* Подберём конфигурации топиков и создадим их.
* Настроим и запустим продюсер.
* Настроим и запустим консумер.
* Изменим оффсет для консумер-группы.
* Посмотрим на основные показатели в Grafana.
**Длительность**: 30 минут
### Материал
- [Перед началом](docs/guide/001-intro.md)
- [Запуск кластера](docs/guide/002-getting-started.md)
- [Создание топика](docs/guide/003-topics-and-partitions.md)
- [Работа продюсера](docs/guide/004-producers.md)
- [Работа консумера](docs/guide/005-consumers.md)
- [Наблюдаемость за Kafka](docs/guide/006-observability.md)
## Бонус-трек
### Материалы
* [Kafka: настройка клиента](docs/cheatsheet/000-kafka-client-setup.md)
* [Kafka: чтение и запись в топик](docs/cheatsheet/001-kafka-consume-or-produce.md)
* [Kafka: управление топиками и партициями](docs/cheatsheet/002-kafka-topics-and-partitions.md)
* [Kafka: управление консумер-группами](docs/cheatsheet/003-kafka-consumer-groups.md)
* [Kafka: управление доступами к топикам](docs/cheatsheet/004-kafka-acl.md)

189
docker-compose.yml Normal file
View File

@ -0,0 +1,189 @@
version: '3.9'
################################################################################
x-app: &app
build: &app-build
context: .
dockerfile: Dockerfile.app
profiles:
- app
command:
- /bin/sh
- -c
- "while : ; do sleep 60 ; done"
restart: unless-stopped
x-consumer: &consumer
<<: *app
command:
- app
environment:
KAFKA_TOPIC: "example"
KAFKA_BROKERS: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
KAFKA_CONSUMER_GROUP: "example-consumer-group"
build:
<<: *app-build
args:
BUILD_TARGET: "consumer"
x-producer: &producer
<<: *app
command:
- app
environment:
KAFKA_TOPIC: "example"
KAFKA_BROKERS: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
build:
<<: *app-build
args:
BUILD_TARGET: "producer"
x-kafka: &kafka
build: &kafka-build
context: .
dockerfile: Dockerfile.kafka
args:
JMX_JAVAAGENT_VERSION: "${JMX_JAVAAGENT_VERSION}"
environment: &kafka-env
KAFKA_CFG_ZOOKEEPER_CONNECT: "zookeeper:2181"
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_OPTS: >-
-javaagent:/opt/bitnami/kafka/libs/jmx_prometheus_javaagent-${JMX_JAVAAGENT_VERSION}.jar=${JMX_JAVAAGENT_PORT}:/etc/kafka/prometheus/kafka.yml
volumes:
- ./jmx-exporter/kafka.yml:/etc/kafka/prometheus/kafka.yml:ro
depends_on:
- zookeeper
x-zookeeper: &zookeeper
image: docker.io/bitnami/zookeeper:${ZK_VERSION}
volumes:
- "zookeeper:/bitnami"
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
x-redpanda-console: &redpanda-console
image: docker.redpanda.com/vectorized/console:v${REDPANDA_CONSOLE_VERSION}
entrypoint: /bin/sh
command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
environment:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"]
schemaRegistry:
enabled: false
urls: ["http://redpanda-0:8081"]
ports:
- ${REDPANDA_CONSOLE_PORT}:8080
depends_on:
- kafka-1
- kafka-2
- kafka-3
x-kafka-exporter: &kafka-exporter
image: docker.io/bitnami/kafka-exporter:${KAFKA_EXPORTER_VERSION}
restart: unless-stopped
command: ["--kafka.server=kafka-1:9092"]
# profiles:
# - metrics
x-prometheus: &prometheus
image: quay.io/prometheus/prometheus:${PROMETHEUS_VERSION}
volumes:
- ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- ${PROMETHEUS_PORT}:9090
# profiles:
# - metrics
x-grafana: &grafana
image: docker.io/grafana/grafana:${GRAFANA_VERSION}
user: root
restart: unless-stopped
ports:
- ${GRAFANA_PORT}:3000
volumes:
- grafana:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning
- ./grafana/dashboards:/var/lib/grafana/dashboards
# profiles:
# - metrics
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
################################################################################
services:
# Sample consumer
consumer-1:
<<: *consumer
container_name: consumer-1
consumer-2:
<<: *consumer
container_name: consumer-2
consumer-3:
<<: *consumer
container_name: consumer-3
# Sample producer
producer:
<<: *producer
container_name: producer-1
# Zookeeper quorum
zookeeper:
<<: *zookeeper
container_name: zookeeper
# Redpanda Console UI
ui:
<<: *redpanda-console
container_name: ui
# Kafka: broker #1
kafka-1:
<<: *kafka
container_name: kafka-1
environment:
<<: *kafka-env
KAFKA_BROKER_ID: 1
# Kafka: broker #2
kafka-2:
<<: *kafka
container_name: kafka-2
environment:
<<: *kafka-env
KAFKA_BROKER_ID: 2
# Kafka: broker #3
kafka-3:
<<: *kafka
container_name: kafka-3
environment:
<<: *kafka-env
KAFKA_BROKER_ID: 3
# Grafana
grafana:
<<: *grafana
container_name: grafana
# Prometheus
prometheus:
<<: *prometheus
container_name: prometheus
# Prometheus
kafka-exporter:
<<: *kafka-exporter
container_name: kafka-exporter
################################################################################
volumes:
zookeeper:
driver: local
grafana:
driver: local

View File

@ -0,0 +1,38 @@
# Kafka: настройка клиентов
## Kafka CLI
Создайте конфигурационный файл `$HOME/config.properties`. Для SASL-аутентификации с механизмом SCRAM поверх открытого текста.
```
bootstrap.servers=broker1:9094,broker2:9094,broker3:9094
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="keepinsecret";
```
## kafkactl
Создайте конфигурационный файл `$HOME/.config/kafkactl/config.yml`.
```yaml
contexts:
dev:
brokers:
- broker-1:9092
- broker-2:9092
- broker-3:9092
kafkaversion: 2.7.2
producer:
maxmessagebytes: 1000000
partitioner: hash
requiredacks: WaitForAll
requesttimeout: 10s
sasl:
enabled: true
mechanism: scram-sha512
username: admin
password: keepinsecret
current-context: dev
```

View File

@ -0,0 +1,62 @@
# Kafka: чтение и запись
## Прочитать топик Kafka
### Redpanda Console
Для просмотра сообщений в топике Kafka, необходимо найти его в секции Topics, а далее во вкладке Messages отобразить список сообщений. При нажатии на значок "плюс" вы можете также просмотреть структуру отдельного сообщения: key, headers и value.
### Kafka CLI
Подключитесь к брокеру Kafka и введите команду, чтобы прочитать топик с самого начала.
```
bin/kafka-console-consumer.sh --consumer.config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--topic "topic" \
--from-beginning
```
Показать в выводе время, ключ и значение.
```
bin/kafka-console-consumer.sh --consumer.config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--topic "topic" \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.timestamp=true \
--property print.key=true \
--property print.value=true
```
Прочитать сообщения в составе консумер-группы consumer-group.
```
bin/kafka-console-consumer.sh --consumer.config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--topic "topic" \
--group "consumer-group" \
--to-latest
```
## Записать в топик Kafka
### Kafka CLI
Подключитесь к брокеру Kafka и введите команду, чтобы записать данные в топик из stdin.
```
bin/kafka-console-producer.sh --producer.config $HOME/producer.properties \
--broker-list $(hostname):9094 \
--topic "$(hostname):9094"
```
Записать данные в формате ключ-значение с разделителем-двоеточием.
```
bin/kafka-console-producer.sh --producer.config $HOME/producer.properties \
--broker-list $(hostname):9094 \
--topic "$(hostname):9094" \
--property parse.key=true \
--property key.separator=:
```

View File

@ -0,0 +1,242 @@
# Kafka: управление топиками и партициями
## Создать топик Kafka
### Kafka CLI
Подключитесь к брокеру Kafka и выполните команду для создания топика.
```
bin/kafka-topics.sh --command-config $HOME/config.properties \
--topic "topic-name" \
--create \
--zookeeper localhost:2181/kafka \
--replication-factor 2 \
--partitions 10
```
## Изменить настройки топика Kafka
### Kafka CLI
Подключитесь к брокеру Kafka и примените конфигурацию для топика. В примере указание политики устаревания по времени (48 часов).
```
bin/kafka-topics.sh --command-config $HOME/config.properties \
--topic "topic-name" \
--alter \
--zookeeper localhost:2181/kafka \
--config retention.ms=$((48 * 60 * 60 * 1000))
```
Определить новое число партиций, например, с целью масштабирования до 4 штук.
```
bin/kafka-topics.sh --command-config $HOME/config.properties \
--topic "topic-name" \
--alter \
--zookeeper localhost:2181/kafka \
--partitions 4
```
Определить при этом брокеры для размещения партиций.
```
bin/kafka-topics.sh --command-config $HOME/config.properties \
--topic "topic-name" \
--alter \
--zookeeper localhost:2181/kafka \
--replica-assignment 0:1:2,0:1:2,0:1:2,2:1:0
--partitions 4
```
## Распечатать список топиков Kafka
### Kafka CLI
Подключитесь к брокеру Kafka и выполните следующую команду, чтобы увидеть список топиков со назначенной ему конфигурацией.
```
bin/kafka-topics.sh --command-config $HOME/config.properties \
--zookeeper localhost:2181/kafka \
--describe \
--topics-with-overrides
```
Без системных топиков (например, `__consumer_offsets`).
```
bin/kafka-topics.sh --command-config $HOME/config.properties \
--zookeeper localhost:2181/kafka \
--describe \
--topics-with-overrides \
--exclude-internal
```
Показать список топиков и его нереплицированные партиции.
```
bin/kafka-topics.sh --command-config $HOME/config.properties \
--zookeeper localhost:2181/kafka \
--describe \
--under-replicated-partitions
```
Показать список топиков и список партиций без активных лидеров.
```
bin/kafka-topics.sh --command-config $HOME/config.properties \
--zookeeper localhost:2181/kafka \
--describe \
--unavailable-partitions
```
## Посмотреть конфигурацию топика Kafka
### Kafka CLI
Подключитесь к брокеру Kafka и выполните следующую команду, чтобы увидеть конфигурацию топика.
```
bin/kafka-configs.sh --command-config $HOME/config.properties \
--zookeeper localhost:2181/kafka \
--describe \
--entity-type topics \
--entity-name "topic-name"
```
## Очистить топик Kafka
### Kafka CLI
В настоящий момент не существует команды, что удаляла бы сообщение в Kafka-топике. Для этого необходимо установить низкий retention, а после — вернуть значение на место. Подключитесь к брокеру Kafka и укажите retention размером в минуту (60000ms).
```
bin/kafka-configs.sh --command-config $HOME/config.properties \
--zookeeper localhost:2181/kafka
--alter \
--entity-type topics \
--entity-name "topic-name" \
--add-config retention.ms=60000
```
Подождите минуту, чтобы Kafka-брокеры успели удалить связанные сегменты данных, а после верните значение retention назад. Например, для 3 суток.
```
bin/kafka-configs.sh --command-config $HOME/config.properties \
--zookeeper localhost:2181/kafka
--alter \
--entity-type topics \
--entity-name "topic-name" \
--add-config retention.ms=$((72 * 60 * 60 * 1000))
```
## Удалить топик Kafka
Перед удалением необходимо убедиться, что в топике нет записи и нет чтения. Для этого в Redpanda UI во вкладке Consumers должно быть пусто, а во вкладке Messages не должно появляться новых сообщений.
### Kafka CLI
Подключитесь к брокеру, остановите консумер-группы и удалите топик.
```
bin/kafka-topics.sh --command-config $HOME/config.properties \
--topic "topic-name" \
--delete \
--bootstrap-server $(hostname):9094
```
## Распечатать оффсеты партиций топика Kafka
### Kafka CLI
Подключитесь к брокеру и выполните команду, чтобы увидеть список партиций в топике с оффсетами в формате `topic-name:partition-id:offset`.
```
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--command-config $HOME/config.properties \
--broker-list $(hostname):9094 \
--topic "topic-name"
```
Показать самый крайний оффсет (latest).
```
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--command-config $HOME/config.properties \
--broker-list $(hostname):9094 \
--topic "topic-name" \
--time -1
```
Показать самый ранний оффсет (earliest).
```
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--command-config $HOME/config.properties \
--broker-list $(hostname):9094 \
--topic "topic-name" \
--time -2
```
Посмотреть оффсет партиции топика Kafka.
```
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--command-config $HOME/config.properties \
--broker-list $(hostname):9094 \
--topic "topic-name" \
--partitions "partition-id1, partition-id2"
```
## Увеличить фактор репликации топика Kafka
### Kafka CLI
Подготовьте план переназначения партиции в формате JSON. В примере мы используем фактор репликации 3.
```
/tmp/reassignment.json
{
"version": 1,
"partitions": [
{ "topic": "topic-name", "partition": 0, "replicas": [0, 1, 2] },
{ "topic": "topic-name", "partition": 1, "replicas": [0, 1, 2] },
{ "topic": "topic-name", "partition": 2, "replicas": [0, 1, 2] }
]
}
```
Скопируйте конфигурацию на брокер и выполните следующую команду:
```
bin/kafka-reassign-partitions.sh --bootstrap-server $(hostname):9094 \
--command-config $HOME/config.properties \
--reassignment-json-file /tmp/reassignment.json \
--execute
```
## Перенести партиции топика Kafka на другой брокер
### Kafka CLI
Подготовьте план переназначения партиции в формате JSON. В поле replicas укажите желаемые идентификаторы брокеров.
```
/tmp/reassignment.json
{
"version": 1,
"partitions": [
{ "topic": "topic-name", "partition": 0, "replicas": [0, 1] },
{ "topic": "topic-name", "partition": 1, "replicas": [1, 2] },
{ "topic": "topic-name", "partition": 2, "replicas": [0, 2] }
]
}
```
Скопируйте конфигурацию на брокер и выполните следующую команду.
```
bin/kafka-reassign-partitions.sh --bootstrap-server $(hostname):9094 \
--command-config $HOME/config.properties \
--reassignment-json-file /tmp/reassignment.json \
--execute
```

View File

@ -0,0 +1,135 @@
# Kafka: управление консумер-группами
## Создать новую консумер-группу Kafka
Консумер-группы не требуется создавать явно. Соответствующая группа будет создана автоматически при верной настройке консумера на работу с группами.
## Распечатать список консумер-групп Kafka
### Kafka CLI
Подключитесь к брокеру Kafka и выполните команду, чтобы вывести на экран список консумер-групп.
```
bin/kafka-consumer-groups.sh --command-config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--list
```
## Посмотреть конфигурацию консумер-группы Kafka
### Kafka CLI
Подключитесь к брокеру Kafka и выполните команду, чтобы вывести на экран конфигурацию выбранной консумер-группы и её состояния.
```
bin/kafka-consumer-groups.sh --command-config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--group "group-name" \
--state \
--describe
```
## Изменить или удалить консумер-группы Kafka
Любое изменение состава или конфигурации консумер-группы проводится с полным выходом участников из неё. Обратите внимание, что в момент остановки и запуска приложения данные не будут прочитаны.
Для изменения:
- Остановите приложение.
- Измените консумер-группу.
- Запустите приложение.
### Kafka CLI
Переместить оффсет для консумер-группы. Например, на 1 января 2022 года на 00:00 MSK.
```
bin/kafka-consumer-groups.sh --command-config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--topic "topic-name" \
--group "group-name" \
--reset-offsets \
--to-datetime 2022-01-01T00:00:00.000+0300 \
--execute
```
Переместить оффсет всех партиций на пять записей вперёд.
```
bin/kafka-consumer-groups.sh --command-config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--topic "topic-name:0" \
--group "group-name" \
--reset-offsets \
--shift-to 5 \
--execute
```
Переместить оффсет партиции №0 на самый ранний (начать читать с начала).
```
bin/kafka-consumer-groups.sh --command-config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--topic "topic-name:0" \
--group "group-name" \
--reset-offsets \
--to-earliest \
--execute
```
Переместить оффсет для партиций №3 и №4 на самый крайний (пропустить все сообщения и начать с конца).
```
bin/kafka-consumer-groups.sh --command-config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--topic "topic-name:3,4" \
--group "group-name" \
--reset-offsets \
--to-latest \
--execute
```
Распечатать текущую позицию оффсетов консумер-группы. Предварительно необходимо остановить приложение-консумер.
```
bin/kafka-consumer-groups.sh --command-config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--topic "topic-name" \
--group "group-name" \
--reset-offsets \
--to-current \
--export \
--execute
```
Восстановить оффсеты консумер-группы из CSV-файла offsets.csv.
```
bin/kafka-consumer-groups.sh --command-config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--topic "topic-name" \
--group "group-name" \
--reset-offsets \
--from-file offsets.csv \
--execute
```
Удалить оффсет консумер-группы
```
bin/kafka-consumer-groups.sh --command-config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--topic "topic-name" \
--group "group-name" \
--delete-offsets
```
Удалить консумер-группу.
```
bin/kafka-consumer-groups.sh --command-config $HOME/config.properties \
--bootstrap-server $(hostname):9094 \
--group "group-name" \
--delete
```

View File

@ -0,0 +1,121 @@
# Kafka: управление доступами к топикам
## Создать нового пользователя
### Kafka CLI
Подключитесь к брокеру и выполните команду. Укажите в параметре password случайно сгенерированный пароль.
```
bin/kafka-configs.sh --alter \
--entity-type users \
--entity-name "john-doe" \
--bootstrap-server $(hostname):9094 \
--command-config /etc/kafka/config.properties \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=keepinsecret],SCRAM-SHA-512=[password=keepinsecret]'
```
## Распечатать список пользователей
### Kafka CLI
Подключитесь к брокеру и выполните команду.
```
bin/kafka-configs.sh --describe \
--entity-type users \
--bootstrap-server $(hostname):9094 \
--command-config /etc/kafka/config.properties
```
## Удалить пользователя
### Kafka CLI
Подключитесь к брокеру и выполните команду для выдачи прав на чтение топика консумеру.
```
bin/kafka-configs.sh --delete \
--entity-type users \
--entity-name "john-doe" \
--bootstrap-server $(hostname):9094 \
--command-config /etc/kafka/config.properties
```
## Выдать права консумер-группе
### Kafka CLI
Подключитесь к брокеру и выполните команду для выдачи прав на чтение топика консумером.
```
bin/kafka-acls.sh --add \
--bootstrap-server $(hostname):9094 \
--allow-principal User:producer-user \
--topic "topic-name" \
--operation Write \
--command-config /etc/kafka/config.properties
```
## Выдать права продюсеру
### Kafka CLI
Подключитесь к брокеру и выполните команду для выдачи прав на запись в топик продюсеру.
```
bin/kafka-acls.sh --add \
--bootstrap-server $(hostname):9094 \
--allow-principal User:producer-user \
--topic "topic-name" \
--operation Write \
--command-config /etc/kafka/config.properties
```
## Распечатать список прав
### Kafka CLI
Подключитесь к брокеру и выполните команду.
```
bin/kafka-acls.sh --list \
--bootstrap-server $(hostname):9094 \
--command-config /etc/kafka/config.properties
```
## Забрать права у продюсера
### Kafka CLI
Подключитесь к брокеру и выполните команду, чтобы отобрать права.
```
bin/kafka-acls.sh --remove \
--bootstrap-server $(hostname):9094 \
--allow-principal User:producer-user \
--topic "topic-name" \
--operation Write \
--command-config /etc/kafka/config.properties
```
## Забрать права у консумера
### Kafka CLI
Подключитесь к брокеру и выполните команду, чтобы отобрать права.
```
bin/kafka-acls.sh --remove \
--bootstrap-server $(hostname):9094 \
--allow-principal User:consumer-user \
--topic "topic-name" \
--operation Read \
--command-config /etc/kafka/config.properties
```
## Удаление пользователя
### Kafka CLI
Чтобы удалить пользователя, достаточно забрать все права со всех топиков, которые ему назначены.

11
docs/guide/001-intro.md Normal file
View File

@ -0,0 +1,11 @@
# Подготовка
Для работы вам необходимо:
- [Docker](https://www.docker.com/products/docker-desktop/) _(20.10.22 или старше)_
- [Docker Compose](https://docs.docker.com/compose/) _(v2.15.1 или старше)_
- Доступ в Интернет.
Мы тестировали курс только на Linux и Mac OS, но, скорее всего, команды без проблем заработают и под Windows.
✅ Готово. Переходите к [запуску кластера](./002-getting-started.md).

View File

@ -0,0 +1,68 @@
# Начало работы
## Запуск кластера
Скопируйте конфигурационный файл:
```bash
cp .env.example .env
```
Запустите кластер:
```bash
docker-compose up -d
```
Если вы всё выполнили верно, то в списке запущенных контейнеров вы увидите Kafka, Zookeeper и Redpanda Console UI.
```bash
docker-compose ps
```
```text
NAME IMAGE COMMAND SERVICE CREATED STATUS PORTS
grafana docker.io/grafana/grafana:latest "/run.sh" grafana 9 seconds ago Up 7 seconds 0.0.0.0:3000->3000/tcp
kafka-1 kafka-workshop-kafka-1 "/opt/bitnami/script…" kafka-1 8 seconds ago Up 7 seconds 9092/tcp
kafka-2 kafka-workshop-kafka-2 "/opt/bitnami/script…" kafka-2 8 seconds ago Up 7 seconds 9092/tcp
kafka-3 kafka-workshop-kafka-3 "/opt/bitnami/script…" kafka-3 8 seconds ago Up 7 seconds 9092/tcp
kafka-exporter docker.io/bitnami/kafka-exporter:latest "kafka_exporter --ka…" kafka-exporter 9 seconds ago Up 2 seconds 9308/tcp
prometheus quay.io/prometheus/prometheus:latest "/bin/prometheus --c…" prometheus 9 seconds ago Up 7 seconds 0.0.0.0:9090->9090/tcp
ui docker.redpanda.com/vectorized/console:v2.1.1 "/bin/sh -c 'echo \"$…" ui 8 seconds ago Up 6 seconds 0.0.0.0:8080->8080/tcp
zookeeper docker.io/bitnami/zookeeper:3.8 "/opt/bitnami/script…" zookeeper 9 seconds ago Up 7 seconds 2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
```
## Состав кластера
Стенд Kafka предназначен для локальных экспериментов при изучении Apache Kafka и состоит из следующих групп программ:
| **Название** | **Контейнеры** | **Описание** |
| ----------------- | ---------------- | ---------------- |
| **Кластер** |
| Kafka | `kafka-1`, `kafka-2`, `kafka-3` | Основной кластер |
| Zookeeper | `zookeeper` | Координатор и менеджер кворума |
| Redpanda Console | `ui` | UI для управления Kafka |
| **Приложения** |
| Консумер | `consumer-1` | Пример программы на Go для чтения данных из Kafka |
| Продюсер | `producer` | Пример программы на Go для чтения данных из Kafka |
| **Наблюдаемость** |
| Kafka Exporter | `kafka-exporter` | Экспортер метрик Kafka в формате PromQL |
| Prometheus | `prometheus` | Сервер метрик в формате PromQL |
| Grafana | `grafana` | Сервер визуализации метрик |
## Интерфейсы
Кластер представляет следующие публичные интерфейсы:
- [Prometheus UI](http://localhost:9090/) (`http://localhost:9090`)
- [Веб-интерфейс UI (Redpanda Console)](http://localhost:8080/) (`http://localhost:8080`)
- [Grafana UI](http://localhost:3000/) (`http://localhost:3000`)
## Доступ
Для демонстрации возможностей мы не используем авторизацию, за исключением Grafana.
* Имя пользователя: `admin`
* Пароль: `admin`
✅ Готово. Переходите к [созданию топика](./003-topics-and-partitions.md).

View File

@ -0,0 +1,47 @@
# Создание топика
Откройте веб-интерфейс [Redpanda Console](http://localhost:8080/topcs) и перейдите на вкладку "Topics". Создайте топик "example".
- Имя топика: `example`
- Число партиций: `6`
- Фактор репликации: `3`
- Минимальное число ISR: `2`
- Политика устаревания: `10` минут
![Redpanda Console: Create topic](./assets/003-create-topic-popup.png)
Перейдите по ссылке на [страницу уже созданного Kafka-топика](http://localhost:8080/topics/example#messages).
![Redpanda Console: Open topic profile page](./assets/003-open-topic-profile-page.png)
Запишите в топик ваше первое сообщение (Actions — Publish messages).
![Redpanda Console: Publish first message](./assets/003-publish-first-message.png)
Если вы всё сделали верно, то в топике появится первое сообщение.
![Redpanda Console: View first message](./assets/003-view-first-message.png)
Усложним пример: запустим консумер и продюсер, что пишут и читают топик `example`. В `docker-compose.yml` мы заранее подготовили примеры программ продюсера и консумера. Запустите приложения:
```bash
docker-compose --profile app up -d
```
Если вы всё сделали верно, то в списке контейнеров вы увидите программу-консумер и продюсер:
```
NAME IMAGE COMMAND SERVICE CREATED STATUS PORTS
consumer-1 kafka-workshop-consumer-1 "app" consumer-1 5 seconds ago Up 4 seconds
grafana docker.io/grafana/grafana:latest "/run.sh" grafana 6 minutes ago Up 6 minutes 0.0.0.0:3000->3000/tcp
kafka-1 kafka-workshop-kafka-1 "/opt/bitnami/script…" kafka-1 6 minutes ago Up 6 minutes 9092/tcp
kafka-2 kafka-workshop-kafka-2 "/opt/bitnami/script…" kafka-2 6 minutes ago Up 6 minutes 9092/tcp
kafka-3 kafka-workshop-kafka-3 "/opt/bitnami/script…" kafka-3 6 minutes ago Up 6 minutes 9092/tcp
kafka-exporter docker.io/bitnami/kafka-exporter:latest "kafka_exporter --ka…" kafka-exporter 6 minutes ago Up 6 minutes 9308/tcp
producer-1 kafka-workshop-producer "app" producer 5 seconds ago Up 4 seconds
prometheus quay.io/prometheus/prometheus:latest "/bin/prometheus --c…" prometheus 6 minutes ago Up 6 minutes 0.0.0.0:9090->9090/tcp
ui docker.redpanda.com/vectorized/console:v2.1.1 "/bin/sh -c 'echo \"$…" ui 6 minutes ago Up 6 minutes 0.0.0.0:8080->8080/tcp
zookeeper docker.io/bitnami/zookeeper:3.8 "/opt/bitnami/script…" zookeeper 6 minutes ago Up 6 minutes 2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
```
✅ Готово. Переходите к [работе с продюсерами](./004-producers.md).

View File

@ -0,0 +1,60 @@
# Работа продюсера
Мы заранее подготовили пример программы-продюсера на Go. Для простоты, представьте себе приложение, что собирает координаты сборщиков и курьеров СберМаркет (кандидатов), а затем отправляет их в один топик `example` для будущих потребителей. Продюсер записывает сообщения в формате JSON, передавая идентификатор кандидата (`id`), широту (`lat`) и долготу (`lon`).
```json
{
"id": 1,
"lat": -17.000132,
"lon": 28.587008
}
```
Посмотрите пример записываемых событий:
```
docker-compose --profile app logs -f producer
```
```
producer-1 | 2023/02/25 18:51:36 message written at topic example: 6 = {"id":6,"lat":-56.998822,"lon":-120.432007}
producer-1 | 2023/02/25 18:51:38 message written at topic example: 1 = {"id":1,"lat":-46.10566,"lon":-21.857902}
producer-1 | 2023/02/25 18:51:40 message written at topic example: 3 = {"id":3,"lat":73.333584,"lon":-113.570201}
producer-1 | 2023/02/25 18:51:42 message written at topic example: 1 = {"id":1,"lat":40.295088,"lon":-78.754235}
producer-1 | 2023/02/25 18:51:44 message written at topic example: 5 = {"id":5,"lat":-16.964839,"lon":40.875567}
producer-1 | 2023/02/25 18:51:46 message written at topic example: 5 = {"id":5,"lat":37.125313,"lon":-99.247528}
producer-1 | 2023/02/25 18:51:48 message written at topic example: 4 = {"id":4,"lat":43.214251,"lon":116.305726}
producer-1 | 2023/02/25 18:51:50 message written at topic example: 1 = {"id":1,"lat":55.444785,"lon":170.116819}
producer-1 | 2023/02/25 18:51:52 message written at topic example: 3 = {"id":3,"lat":83.039649,"lon":-136.749644}
producer-1 | 2023/02/25 18:51:54 message written at topic example: 3 = {"id":3,"lat":-17.167125,"lon":157.868446}
producer-1 | 2023/02/25 18:51:56 message written at topic example: 4 = {"id":4,"lat":81.208194,"lon":121.136919}
```
Откройте [исходный код продюсера](../../examples/producer/main.go). Обратите внимание на настройки `kafka.Writer`:
```go
w := &kafka.Writer{
Addr: kafka.TCP(addrs...),
Topic: topic,
Balancer: &kafka.Hash{},
}
```
Опция `Balancer` определяет стратегию балансировки событий между партициями. В нашем примере мы используем хеш-функцию от передаваемого ключа (`key`). В качестве ключа используем идентификатор кандидата, чтобы события одного и того же пользователя сохранялись в верном порядке.
```go
payload := kafka.Message{
Key: []byte(strconv.Itoa(candidate_id)),
Value: []byte(msg),
}
err = w.WriteMessages(context.Background(), payload)
if err != nil {
log.Fatal("failed to write messages:", err)
}
```
Откройте [веб-интерфейс Redpanda Console](http://localhost:8080/topics/example#messages) и посмотрите содержимое любого сообщения. Убедитесь, что поля ключ и тело заполнены так, как ожидается.
✅ Готово. Переходите к [работе с консумерами](./005-consumers.md).

View File

@ -0,0 +1,3 @@
# Работа с консумерами
TBD

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 70 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 73 KiB

BIN
docs/theory/slides.pdf Normal file

Binary file not shown.

View File

@ -0,0 +1,90 @@
# Теория. Ментальная модель Kafka.
## Предисловие
В СберМаркете мы широко используем Kafka в качестве шины данных для микросервисов и знаем непонаслышке, что работать с Kafka нужно уметь. Воркшоп состоит из двух частей: теоретическая и практическая. В теоретической части мы:
- Обсудим сценарии использования
- Узнаем что такое консумер, продюсер и брокер
- Поймём как связаны топики, партиции и сегменты
- Посмотрим на формат сообщения Kafka
- Поймём зачем нужен лидер партиции и как реплицируются данные
- Зачем нужно партицирование
- Узнаем какие есть гарантии доставки сообщений
- Затронем идемпотентность обработки событий
- Определим что такое консумер-группа
- Посмотрим на ребалансировку консумеров
## Предпосылки к использованию
Долгое время инженеры разрабатывали программы, оперирующие объектами реального мира, сохраняя состояние о них в базы данных. Будь то, например, пользователи, заказы или товары. Представление о вещах мира как об объектах широко распространено среди ИТ-разработки (будь то парадигма ООП), однако сейчас больше компаний и технических команд всё чаще предпочитают думать не о самих объектах, а событиях, которые они порождают — то бишь, изменениях объектов во времени.
Популярность событийно-ориентированного подхода вызвана стремлением компаний снизить связность сервисов друг с другом (что крайне важно при микросервисной трансформации) и улучшить устойчивость приложений к сбоям за счёт изоляции поставщиков данных и их потребителей.
События, проходящие в системах, как и объекты, также можно хранить в традиционных реляционных базах данных, однако это достаточно громоздко и неэффективно. Вместо этого мы используем структуру под названием лог.
## Устройство Apache Kafka
Лог — это упорядоченный поток событий во времени. Некоторое событие происходит и попадает всегда в конец лога, оставаясь неизменным.
Apache Kafka — это система по управлению такими логами и платформа, призванная соединить поставщиков данных и их потребителей, предоставив возможность получать упорядоченный поток событий в реальном времени.
### Продюсеры
Чтобы записать события в кластер Kafka, есть продюсеры. Продюсер — это приложение, которое вы разрабатываете. Программа-продюсер записывает сообщение в Kafka, а Kafka сохраняет события, возвращает подтверждение (acknowledgement) о записи, продюсер получает его и начинает следующую запись. И так далее.
### Брокеры
Сам же кластер Kafka состоит из брокеров. Представьте себе дата-центр и серверы в нём. В первом приближении думайте о Kafka-брокере как о компьютере: это процесс в операционной системе с доступом к своему локальному диску. Все брокеры соединены друг с другом сетью, действуя сообща как единый Kafka-кластер. Когда продюсеры пишут события в Kafka-кластер, они работают с брокерами в нём.
В облачной среде Kafka-кластер не обязательно работает на выделенных серверах, а может быть виртуальными машинами или контейнерами в Kubernetes. Главное — каждый кластер Kafka состоит из брокеров.
### Консумеры
События, записанные продюсерами на локальные диски брокеров, могут быть прочитаны консумерами. Консумер — это также приложение, которое вы разрабатываете. В этом случае по-прежнему кластер Kafka — это нечто, обслуживаемое инфраструктурой, но что делаете вы как пользователь — пишете продюсер и консумер. Программа-консумер подписывается на события (поллит) и получает данные в ответ. И так по кругу.
Консумером может быть программа, подбирающая кандидата на основе координат партнёра, или при появлении заказа — инициирующая новую сборку. При этом консумер также может быть и продюсером, создавая новые события для размещения в Kafka для других сервисов.
Итого основы Kafka: продюсер, брокер и консумер.
## Архитектура Kafka
Итак, давайте посмотрим на архитектуру Kafka внимательнее. Слева есть продюсеры, в середине брокеры, а справа — консумеры. Kafka же представляет собой группу брокеров, связанных с Zookeeper-кворумом. Kafka использует Zookeeper для достижения консенсуса состояния в распределённой системе: есть несколько вещей, с которыми должен быть «согласен» каждый брокер и Zookeeper помогает достичь этого «согласия» внутри кластера.
> _Начиная с Kafka 3.4 необходимость в использовании Zookeeper отпала: для арбитража используется собственный протокол KRaft, решающий те же задачи, но на уровне брокеров, однако для простоты мы пока остановимся на традиционной схеме_.
Так вот, Zookeeper представляет собой выделенный кластер серверов для образования кворума и поддержки внутренних процессов Kafka. Благодаря Zookeeper, кластером Kafka можно управлять: добавлять пользователей, топики и задавать им настройки.
Zookeeper также помогает при обнаружении сбоя в мастере, провести выборы нового и сохранить работоспособность кластера Kafka. И, что немаловажно, Zookeeper хранит в себе все авторизационные данные и ограничения (Access Control Lists) при работе консумеров и продюсеров с брокерами.
Подводя промежуточный итог, кластер Kafka позволяет изолировать консумеры и продюсеры друг от друга. Продюсер ничего не знает о консумерах при записи данных в брокер, а консумер — ничего не знает о продюсере данных.
Скажем, если консумер станет работать медленнее продюсера, то это никак не влияет на запись этих событий. То же и с добавлением, удалением или даже сбоем: изменения консумеров не оказывают никакого влияния на продюсеры.
## Устройство брокеров
Теперь поговорим отдельно о брокерах. Наверняка вы несколько раз слышали про какие-то топики, теперь коротко о том, что это такое. События в Kafka-брокерах образуют топики.
### Топики
Топик — это логическое представление категорий сообщений в группы. Например, события по статусам заказов, координат партнёров, маршрутных листов и так далее.
Ключевое слово здесь — **логическое**. Мы создаём топики для событий общей группы и стараемся не смешивать их друг с другом. Например, координаты партнёров не должны быть в том же топике, что и статусы заказов, а обновлённые статусы по заказам — не быть вперемешку с обновлением регистраций пользователей.
О топике стоит думать как о логе: вы пишете событие в конец, не разрушая при этом старые события. Один продюсер может писать в один или несколько топиков, в один топик могут писать один или более продюсеров, как и много консумеров могут читать из одного топика, как и один консумер может читать сразу несколько топиков.
Теоретически, нет никаких ограничений на число этих топиков, но практически это ограничено числом партиций, о которых мы поговорим позднее.
### Партиции и сегменты
Топиков в кластере Kafka может быть много и нет ограничений на их создание. Однако рано или поздно, компьютер, выполняя операции на процессоре и вводе-выводе, упирается в свой предел. К сожалению, мы не можем увеличивать мощность и производительность компьютеров бесконечно, а потому топик следует делить на части.
В Kafka эти части называются партиции. Каждый Kafka-топик состоит из одной или более партиций, каждая из которых может быть размещена на разных брокерах. Это то, благодаря чему Kafka может масштабироваться: пользователь может создать топик, разделить его на партиции и разместить каждую из них на своём брокере.
Формально партиция — это и есть строго упорядоченный лог сообщений. Каждое сообщение в партиции добавлено в конец без возможности изменить его в будущем и как-то повлиять на уже записанные сообщения. При этом сам топик вцелом не имеет никакого порядка, но порядок сообщений всегда есть в одной из его партиций.
Партиции же представлены физически на дисках в виде сегментов — отдельных файлов, что могут быть созданы, отротированы или удалены в соответствии с настройкой устаревания данных в них. Обычно, если вы не администрируете кластер Kafka, вам не приходится много думать о сегментах партиций, но вы должны помнить о партициях, как о модели хранения данных в топиках Kafka.

5
examples/consumer/go.mod Normal file
View File

@ -0,0 +1,5 @@
module github.com/gongled/kafka-workshop/examples/consumer
go 1.16
require github.com/segmentio/kafka-go v0.4.38

39
examples/consumer/go.sum Normal file
View File

@ -0,0 +1,39 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.38 h1:iQdOBbUSdfuYlFpvjuALgj7N6DrdPA0HfB4AhREOdtg=
github.com/segmentio/kafka-go v0.4.38/go.mod h1:ikyuGon/60MN/vXFgykf7Zm8P5Be49gJU6vezwjnnhU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4=
github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60 h1:8NSylCMxLW4JvserAndSgFL7aPli6A68yf0bYFTcWCM=
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

64
examples/consumer/main.go Normal file
View File

@ -0,0 +1,64 @@
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"strings"
"github.com/segmentio/kafka-go"
)
var (
brokers = ""
group = ""
topic = ""
)
func init() {
flag.StringVar(&brokers, "brokers", os.Getenv("KAFKA_BROKERS"), "Kafka bootstrap brokers to connect to, as a comma separated list")
flag.StringVar(&group, "group", os.Getenv("KAFKA_CONSUMER_GROUP"), "Kafka consumer group definition")
flag.StringVar(&topic, "topic", os.Getenv("KAFKA_TOPIC"), "Kafka topic to be consumed")
flag.Parse()
if len(brokers) == 0 {
panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
}
if len(topic) == 0 {
panic("no topic given to be consumed, please set the -topic flag")
}
if len(group) == 0 {
panic("no Kafka consumer group defined, please set the -group flag")
}
}
func main() {
// make a new reader that consumes from topic-A
addrs := strings.Split(brokers, ",")
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: addrs,
GroupID: group,
Topic: topic,
MinBytes: 10e2, // 1KB
MaxBytes: 10e6, // 10MB
})
log.Print("Starting consumer program...")
log.Print(fmt.Sprintf("Brokers (%s), topic (%s), consumer group (%s)", brokers, topic, group))
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
log.Print(fmt.Sprintf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)))
}
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
}

8
examples/producer/go.mod Normal file
View File

@ -0,0 +1,8 @@
module github.com/gongled/kafka-workshop/examples/producer
go 1.16
require (
github.com/brianvoe/gofakeit/v6 v6.20.1 // indirect
github.com/segmentio/kafka-go v0.4.38
)

41
examples/producer/go.sum Normal file
View File

@ -0,0 +1,41 @@
github.com/brianvoe/gofakeit/v6 v6.20.1 h1:8ihJ60OvPnPJ2W6wZR7M+TTeaZ9bml0z6oy4gvyJ/ek=
github.com/brianvoe/gofakeit/v6 v6.20.1/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.38 h1:iQdOBbUSdfuYlFpvjuALgj7N6DrdPA0HfB4AhREOdtg=
github.com/segmentio/kafka-go v0.4.38/go.mod h1:ikyuGon/60MN/vXFgykf7Zm8P5Be49gJU6vezwjnnhU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4=
github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60 h1:8NSylCMxLW4JvserAndSgFL7aPli6A68yf0bYFTcWCM=
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

89
examples/producer/main.go Normal file
View File

@ -0,0 +1,89 @@
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
"github.com/brianvoe/gofakeit/v6"
"github.com/segmentio/kafka-go"
)
var (
brokers = ""
topic = ""
)
type CandidateLocation struct {
ID int `json:"id"`
Lat float64 `json:"lat"`
Lon float64 `json:"lon"`
}
func init() {
flag.StringVar(&brokers, "brokers", os.Getenv("KAFKA_BROKERS"), "Kafka bootstrap brokers to connect to, as a comma separated list")
flag.StringVar(&topic, "topic", os.Getenv("KAFKA_TOPIC"), "Kafka topic to be produced")
flag.Parse()
if len(brokers) == 0 {
panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
}
if len(topic) == 0 {
panic("no topic given to be consumed, please set the -topic flag")
}
}
func main() {
// make a writer that produces to topic-A, using the round-robin distribution
addrs := strings.Split(brokers, ",")
w := &kafka.Writer{
Addr: kafka.TCP(addrs...),
Topic: topic,
Balancer: &kafka.Hash{},
}
log.Print("Starting producer program...")
log.Print(fmt.Sprintf("Brokers (%s), topic (%s)", brokers, topic))
for {
candidate_id := gofakeit.Number(1, 6)
location := CandidateLocation{
ID: candidate_id,
Lat: gofakeit.Latitude(),
Lon: gofakeit.Longitude(),
}
msg, err := json.Marshal(location)
if err != nil {
panic(err)
}
payload := kafka.Message{
Key: []byte(strconv.Itoa(candidate_id)),
Value: []byte(msg),
}
err = w.WriteMessages(context.Background(), payload)
if err != nil {
log.Fatal("failed to write messages:", err)
}
log.Print(fmt.Sprintf("message written at topic %s: %s = %s\n", w.Topic, string(payload.Key), string(payload.Value)))
time.Sleep(1 * time.Second)
}
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,8 @@
apiVersion: 1
providers:
- name: Default
folder: Kafka
type: file
options:
path: /var/lib/grafana/dashboards

View File

@ -0,0 +1,10 @@
# config file version
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
orgId: 1
url: http://prometheus:9090
editable: false

View File

@ -0,0 +1,16 @@
<svg width="280" height="115" viewBox="0 0 280 115" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M0 0L222.701 0C254.346 0 280 25.8544 280 57.5V57.5C280 89.1456 254.346 115 222.701 115H0L0 0Z" fill="url(#paint0_linear_145_5463)"/>
<path d="M235.604 79.1616H226.224V54.9493C226.224 49.1768 223.899 46.6113 218.367 46.5311H218.126C211.151 46.5311 207.864 49.9785 207.864 56.8734V79.1616H198.484V19.0317H207.864V47.2527C208.987 42.3621 214.198 38.3535 221.814 38.3535C230.473 38.4336 235.604 43.0035 235.604 52.3837V79.1616Z" fill="black"/>
<path d="M88.9881 39.075V32.1801L98.3684 25.4456V39.075H112.719V47.3328H98.3684V64.7304C98.3684 70.102 99.8917 71.7055 104.381 71.7055C107.749 71.7055 111.116 70.583 113.441 69.1399V77.2374C111.677 78.6805 106.787 79.9633 102.778 79.9633C93.7985 79.9633 88.9881 76.5158 88.9881 67.4563V47.3328H81.6924V39.075H88.9881Z" fill="black"/>
<path fill-rule="evenodd" clip-rule="evenodd" d="M151.789 65.5321V75.1529C149.865 77.3977 144.333 80.0435 137.037 79.9633C123.808 80.0435 115.951 72.7477 115.951 58.9579C115.951 45.6492 124.289 38.1931 136.395 38.1931C148.021 38.2733 154.996 45.4087 154.996 57.9959V61.8442H125.251C125.973 68.4985 130.062 71.8658 137.358 71.8658C144.733 71.8658 149.624 69.0597 151.789 65.5321ZM145.615 55.1898C145.375 49.738 141.847 46.3708 136.155 46.3708C130.142 46.3708 126.374 49.4173 125.412 55.1898H145.615Z" fill="black"/>
<path d="M192.721 75.1529V65.5321C190.716 69.0597 186.307 71.8658 179.653 71.8658C172.116 71.8658 167.627 68.0175 167.627 59.7597V58.5571C167.627 50.4596 171.796 46.3708 179.492 46.3708C185.345 46.4509 189.594 49.3372 192.32 53.5863V41.8811C189.674 39.8767 184.864 38.2733 179.492 38.1931C166.745 38.2733 158.086 45.1682 158.086 58.9579C158.086 72.9882 166.584 80.0435 179.412 80.0435C185.906 80.0435 190.957 77.3977 192.721 75.1529Z" fill="black"/>
<path d="M0.332031 36.1178L3.73473 35.7273C7.24361 35.3247 10.4967 37.6171 11.2978 41.057L17.9521 69.6313H61.5329L66.9277 47.8174H58.5319L55.1527 61.481H24.4224L21.1288 47.3379H58.4448L60.1789 39.1877H19.2308C17.4823 31.7264 10.4212 26.7563 2.80555 27.6302L0.332031 27.9141V36.1178Z" fill="black"/>
<path d="M26.2211 87.1304C29.928 87.1304 32.933 84.1254 32.933 80.4185C32.933 76.7115 29.928 73.7065 26.2211 73.7065C22.5142 73.7065 19.5091 76.7115 19.5091 80.4185C19.5091 84.1254 22.5142 87.1304 26.2211 87.1304Z" fill="black"/>
<path d="M58.3427 80.4185C58.3427 84.1254 55.3376 87.1304 51.6307 87.1304C47.9238 87.1304 44.9187 84.1254 44.9187 80.4185C44.9187 76.7115 47.9238 73.7065 51.6307 73.7065C55.3376 73.7065 58.3427 76.7115 58.3427 80.4185Z" fill="black"/>
<defs>
<linearGradient id="paint0_linear_145_5463" x1="280" y1="57.7258" x2="1.88205e-07" y2="57.7257" gradientUnits="userSpaceOnUse">
<stop stop-color="#31C2A7"/>
<stop offset="0.830729" stop-color="#31C2A7" stop-opacity="0"/>
</linearGradient>
</defs>
</svg>

After

Width:  |  Height:  |  Size: 2.8 KiB

117
jmx-exporter/kafka.yml Normal file
View File

@ -0,0 +1,117 @@
lowercaseOutputName: true
rules:
# Special cases and very specific rules
- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
name: kafka_server_$1_$2
type: GAUGE
labels:
clientId: "$3"
topic: "$4"
partition: "$5"
- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
name: kafka_server_$1_$2
type: GAUGE
labels:
clientId: "$3"
broker: "$4:$5"
- pattern : kafka.coordinator.(\w+)<type=(.+), name=(.+)><>Value
name: kafka_coordinator_$1_$2_$3
type: GAUGE
# Generic per-second counters with 0-2 key/value pairs
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_total
type: COUNTER
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
name: kafka_$1_$2_$3_total
type: COUNTER
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
name: kafka_$1_$2_$3_total
type: COUNTER
- pattern: kafka.server<type=(.+), client-id=(.+)><>([a-z-]+)
name: kafka_server_quota_$3
type: GAUGE
labels:
resource: "$1"
clientId: "$2"
- pattern: kafka.server<type=(.+), user=(.+), client-id=(.+)><>([a-z-]+)
name: kafka_server_quota_$4
type: GAUGE
labels:
resource: "$1"
user: "$2"
clientId: "$3"
# Generic gauges with 0-2 key/value pairs
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE
# Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
#
# Note that these are missing the '_sum' metric!
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
"$6": "$7"
quantile: "0.$8"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
quantile: "0.$6"
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
quantile: "0.$4"
# Generic gauges for MeanRate Percent
# Ex) kafka.server<type=KafkaRequestHandlerPool, name=RequestHandlerAvgIdlePercent><>MeanRate
- pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
name: kafka_$1_$2_$3_percent
type: GAUGE
- pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
name: kafka_$1_$2_$3_percent
type: GAUGE
- pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
name: kafka_$1_$2_$3_percent
type: GAUGE
labels:
"$4": "$5"

14
prometheus/prometheus.yml Normal file
View File

@ -0,0 +1,14 @@
global:
scrape_interval: 5s
evaluation_interval: 5s
scrape_configs:
- job_name: 'kafka-exporter'
metrics_path: /metrics
static_configs:
- targets: [ 'kafka-exporter:9308' ]
- job_name: 'kafka'
metrics_path: /metrics
static_configs:
- targets: [ 'kafka-1:5556', 'kafka-2:5556', 'kafka-3:5556' ]