Sunday, May 15, 2022

[Kafka] A First Look at Kafka using Golang

Preface

This post is a record of my first exploration of Kafka using Golang, written up after working through the pitfalls I ran into along the way.


Kafka Basics

There are already plenty of articles about this online, so I will not repeat the basics here. Some references:

Getting Started with Kafka

Kafka Introduction + Golang Implementation

Kafka (Go) Tutorial (5) --- Basic Usage of the Producer/Consumer API

Kafka Theory: Consumer Group & Coordinator


Starting the Kafka Environment

Bring the environment up with Docker Compose using the YAML file below:

$ docker-compose -f dc-kafka.yaml up -d

P.S.: You need to set the IP and port number that Kafka advertises to external clients here (I habitually use 9091):
PLAINTEXT_HOST://<<Server IP>>:9091

dc-kafka.yaml

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.3.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9091:9091"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_BOOTSTRAP_SERVERS: 'broker:29091'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29091,PLAINTEXT_HOST://<<Server IP>>:9091
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  control-center:
    image: confluentinc/cp-enterprise-control-center:5.3.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29091'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
      CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

Open the Kafka configuration Web UI (Confluent Control Center) at <<Server IP>>:9021


P.S.: I first created a Topic named packet-log with 12 partitions; this packet-log Topic is also the one used by the programs below.

Kafka example using Golang

The code below is adapted from the source code in this article: Go 操作kafka包sarama (working with Kafka in Go via the sarama package).

sarama is a pure Go client library for working with Apache Kafka (version 0.8 and later).
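
As mentioned in the environment section, I pre-created the packet-log Topic with 12 partitions through the Web UI. If you prefer to create it from Go instead, sarama's ClusterAdmin can do the same thing. The following is only a minimal sketch under my assumptions (broker reachable at localhost:9091; adjust the address to your own <<Server IP>>):

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_3_0_0 // the topic-admin API needs a recent protocol version

    // Assumption: the broker from dc-kafka.yaml is reachable on localhost:9091.
    admin, err := sarama.NewClusterAdmin([]string{"localhost:9091"}, config)
    if err != nil {
        log.Fatalln(err)
    }
    defer admin.Close()

    // Create packet-log with 12 partitions, matching the topic used by the
    // producer and consumer programs below. ReplicationFactor is 1 because
    // the compose file starts a single broker.
    err = admin.CreateTopic("packet-log", &sarama.TopicDetail{
        NumPartitions:     12,
        ReplicationFactor: 1,
    }, false)
    if err != nil {
        log.Fatalln(err)
    }
    log.Println("topic packet-log created")
}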


Kafka Producer

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "time"

    "github.com/Shopify/sarama"
)

// Asynchronous producer: successes and errors are drained by separate goroutines
func AsyncProducer() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    producer, err := sarama.NewAsyncProducer([]string{"localhost:9091"}, config)
    if err != nil {
        panic(err)
    }

    // Trap SIGINT to trigger a graceful shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    var (
        wg                                  sync.WaitGroup
        enqueued, successes, producerErrors int
    )

    wg.Add(1)
    go func() {
        defer wg.Done()
        for range producer.Successes() {
            successes++
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        for err := range producer.Errors() {
            log.Println(err)
            producerErrors++
        }
    }()

ProducerLoop:
    for {
        message := &sarama.ProducerMessage{Topic: "packet-log",
            Value: sarama.StringEncoder(fmt.Sprintf("testing AsyncProducer %d", enqueued))}
        time.Sleep(1 * time.Second)
        select {
        case producer.Input() <- message:
            enqueued++

        case <-signals:
            producer.AsyncClose() // Trigger a shutdown of the producer.
            break ProducerLoop
        }
    }
    wg.Wait()

    log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
}

// Asynchronous producer: input, errors, and shutdown handled in a single select loop
func AsyncProducerSelect() {
    producer, err := sarama.NewAsyncProducer([]string{"localhost:9091"}, nil)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := producer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    var enqueued, producerErrors int
ProducerLoop:
    for {
        select {
        case producer.Input() <- &sarama.ProducerMessage{Topic: "packet-log", Key: nil,
            Value: sarama.StringEncoder(fmt.Sprintf("testing AsyncProducerSelect %d", enqueued))}:
            enqueued++
            time.Sleep(1 * time.Second)
        case err := <-producer.Errors():
            log.Println("Failed to produce message", err)
            producerErrors++
        case <-signals:
            break ProducerLoop
        }
    }
    log.Printf("Enqueued: %d; errors: %d\n", enqueued, producerErrors)
}

// Synchronous producer mode
func SaramaProducer() {
    producer, err := sarama.NewSyncProducer([]string{"localhost:9091"}, nil)
    if err != nil {
        log.Fatalln(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    msg := &sarama.ProducerMessage{Topic: "packet-log", Value: sarama.StringEncoder("testing SaramaProducer")}
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Printf("FAILED to send message: %s\n", err)
    } else {
        log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
    }

}

func main() {
    // Producer examples: uncomment the one you want to run
    //AsyncProducer()
    //SaramaProducer()
    AsyncProducerSelect()
}


Kafka ConsumerGroup

P.S.: This example uses a ConsumerGroup (multiple Consumers in the same Group; each message is guaranteed to be received by only one Consumer in the group).

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
)

type consumerGroupHandler struct {
}

func (consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
    return nil
}
func (consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", msg.Value, msg.Timestamp, msg.Topic)
        session.MarkMessage(msg, "")
    }
    return nil
}

// Consumer group
func SaramaConsumerGroup() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true // needed so the group.Errors() goroutine below actually receives errors
    config.Version = sarama.V0_10_2_0                     // specify appropriate version
    config.Consumer.Offsets.Initial = sarama.OffsetOldest // where to start consuming when the group has no committed offset

    group, err := sarama.NewConsumerGroup([]string{"140.113.207.9:9091"}, "my-group", config)
    if err != nil {
        panic(err)
    }
    defer func() { _ = group.Close() }()

    // Track errors
    go func() {
        for err := range group.Errors() {
            fmt.Println("ERROR", err)
        }
    }()
    fmt.Println("Consumed start")
    // Iterate over consumer sessions.
    ctx := context.Background()
    for {
        topics := []string{"packet-log"}
        handler := consumerGroupHandler{}

        // `Consume` should be called inside an infinite loop, when a
        // server-side rebalance happens, the consumer session will need to be
        // recreated to get the new claims
        err := group.Consume(ctx, topics, handler)
        if err != nil {
            panic(err)
        }
    }
}

// Plain consumer: reads a single partition directly, without a consumer group
func SaramaConsumer() {
    consumer, err := sarama.NewConsumer([]string{"140.113.207.9:9091"}, sarama.NewConfig())
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := consumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    partitionConsumer, err := consumer.ConsumePartition("packet-log", 0, sarama.OffsetNewest)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    consumed := 0
ConsumerLoop:
    for {
        select {
        case msg := <-partitionConsumer.Messages():
            log.Printf("Consumed message offset %d\n", msg.Offset)
            consumed++
        case <-signals:
            break ConsumerLoop
        }
    }

    log.Printf("Consumed: %d\n", consumed)
}

func main() {
    // Consumer examples: uncomment the one you want to run
    SaramaConsumerGroup()
    //SaramaConsumer()
}


Test:

Start one Producer and three Consumers (all in the same Group).

Result:

Each message is received by exactly one of the three Consumers, demonstrating the ConsumerGroup behavior.

Consumed start
2022/05/15 02:39:28 Message claimed: value = testing AsyncProducerSelect 3, timestamp = 2022-05-15 02:39:28.104 +0000 UTC, topic = packet-log
2022/05/15 02:39:31 Message claimed: value = testing AsyncProducerSelect 6, timestamp = 2022-05-15 02:39:31.105 +0000 UTC, topic = packet-log
2022/05/15 02:39:32 Message claimed: value = testing AsyncProducerSelect 7, timestamp = 2022-05-15 02:39:32.106 +0000 UTC, topic = packet-log
2022/05/15 02:39:34 Message claimed: value = testing AsyncProducerSelect 9, timestamp = 2022-05-15 02:39:34.108 +0000 UTC, topic = packet-log

Consumed start
2022/05/15 02:39:25 Message claimed: value = testing AsyncProducerSelect 0, timestamp = 2022-05-15 02:39:25.102 +0000 UTC, topic = packet-log
2022/05/15 02:39:30 Message claimed: value = testing AsyncProducerSelect 5, timestamp = 2022-05-15 02:39:30.105 +0000 UTC, topic = packet-log
2022/05/15 02:39:33 Message claimed: value = testing AsyncProducerSelect 8, timestamp = 2022-05-15 02:39:33.107 +0000 UTC, topic = packet-log

Consumed start
2022/05/15 02:39:26 Message claimed: value = testing AsyncProducerSelect 1, timestamp = 2022-05-15 02:39:26.103 +0000 UTC, topic = packet-log
2022/05/15 02:39:27 Message claimed: value = testing AsyncProducerSelect 2, timestamp = 2022-05-15 02:39:27.103 +0000 UTC, topic = packet-log
2022/05/15 02:39:29 Message claimed: value = testing AsyncProducerSelect 4, timestamp = 2022-05-15 02:39:29.104 +0000 UTC, topic = packet-log

Takeaways

Kafka is a very powerful message queueing & streaming platform; this first look only scratches the surface of what Kafka can do, by exercising it through small programs.


Open question: the number of Partitions has to be at least the number of Consumers in the Group for every Consumer to receive messages. Is there another way to achieve this?
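
My current understanding (not verified beyond these experiments): within a consumer group, each partition is assigned to at most one consumer, so the partition count is effectively the upper bound on how many group members can actively receive messages; extra consumers simply sit idle. The usual workaround seems to be to grow the topic's partition count when more consumers are needed. A minimal sketch using sarama's ClusterAdmin, under the same assumptions as the earlier admin example (the target count of 16 is just a hypothetical number):

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_3_0_0

    // Assumption: same broker address as the producer examples above.
    admin, err := sarama.NewClusterAdmin([]string{"localhost:9091"}, config)
    if err != nil {
        log.Fatalln(err)
    }
    defer admin.Close()

    // Grow packet-log to 16 partitions (hypothetical target) so up to 16
    // group members can each be assigned a partition. Kafka only allows
    // increasing the partition count, never decreasing it.
    if err := admin.CreatePartitions("packet-log", 16, nil, false); err != nil {
        log.Fatalln(err)
    }
    log.Println("partition count of packet-log increased")
}

Note that adding partitions changes which partition a given message key hashes to, which matters if per-key ordering is important.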

