Preface
This post records the results of my first exploration of Kafka using Golang, after working through the pitfalls I ran into along the way.
Kafka Basics
There are already plenty of articles covering the fundamentals, so I won't repeat them here:
Kafka (Go) Tutorial (5) --- Basic Usage of the Producer/Consumer API
Kafka Theory: Consumer Group & Coordinator
Starting the Kafka Environment
Bring up the environment with the Docker Compose YAML file below:
$ docker-compose -f dc-kafka.yaml up -d
P.S.: You need to set the IP and port number that Kafka advertises (I habitually use 9091):
PLAINTEXT_HOST://<<Server IP>>:9091
dc-kafka.yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:5.3.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9091:9091"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29091,PLAINTEXT_HOST://<<Server IP>>:9091
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  control-center:
    image: confluentinc/cp-enterprise-control-center:5.3.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29091'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
      CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
Open the Kafka configuration web UI at <<Server IP>>:9021.
P.S.: I first created a topic named packet-log with 12 partitions; this same topic is used by the programs below.
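The topic can also be created from the command line instead of the web UI. A sketch using the kafka-topics tool shipped inside the broker container (assuming the container name broker and internal listener broker:29091 from the compose file above):

```shell
# Create the packet-log topic with 12 partitions on the single-broker cluster.
docker exec broker kafka-topics --create \
  --bootstrap-server broker:29091 \
  --replication-factor 1 \
  --partitions 12 \
  --topic packet-log
```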
Kafka example using Golang
I adapted the source code from this article: Working with Kafka in Go using the sarama package.
sarama is a pure Go client library for working with Apache Kafka (version 0.8 and later).
Kafka Producer
Kafka ConsumerGroup
P.S.: I am using the ConsumerGroup example here (multiple consumers in the same group, where each message is guaranteed to be delivered to exactly one consumer).
Test:
Start one producer and three consumers (all in the same group).
Result:
Each of the three consumers receives a distinct subset of the messages, demonstrating the consumer-group behavior:
Consumed start
2022/05/15 02:39:28 Message claimed: value = testing AsyncProducerSelect 3, timestamp = 2022-05-15 02:39:28.104 +0000 UTC, topic = packet-log
2022/05/15 02:39:31 Message claimed: value = testing AsyncProducerSelect 6, timestamp = 2022-05-15 02:39:31.105 +0000 UTC, topic = packet-log
2022/05/15 02:39:32 Message claimed: value = testing AsyncProducerSelect 7, timestamp = 2022-05-15 02:39:32.106 +0000 UTC, topic = packet-log
2022/05/15 02:39:34 Message claimed: value = testing AsyncProducerSelect 9, timestamp = 2022-05-15 02:39:34.108 +0000 UTC, topic = packet-log
Consumed start
2022/05/15 02:39:25 Message claimed: value = testing AsyncProducerSelect 0, timestamp = 2022-05-15 02:39:25.102 +0000 UTC, topic = packet-log
2022/05/15 02:39:30 Message claimed: value = testing AsyncProducerSelect 5, timestamp = 2022-05-15 02:39:30.105 +0000 UTC, topic = packet-log
2022/05/15 02:39:33 Message claimed: value = testing AsyncProducerSelect 8, timestamp = 2022-05-15 02:39:33.107 +0000 UTC, topic = packet-log
Consumed start
2022/05/15 02:39:26 Message claimed: value = testing AsyncProducerSelect 1, timestamp = 2022-05-15 02:39:26.103 +0000 UTC, topic = packet-log
2022/05/15 02:39:27 Message claimed: value = testing AsyncProducerSelect 2, timestamp = 2022-05-15 02:39:27.103 +0000 UTC, topic = packet-log
2022/05/15 02:39:29 Message claimed: value = testing AsyncProducerSelect 4, timestamp = 2022-05-15 02:39:29.104 +0000 UTC, topic = packet-log
Thoughts
Kafka is a very powerful message-queueing and streaming platform; this first exploration only scratches the surface of Kafka's capabilities through small test programs.
Open question: it seems the number of partitions must be at least the number of consumers in the group for every consumer to receive messages. Is there another way to achieve this?