Tuesday, February 1, 2022

[Kubernetes] How to use Informer to avoid frequently getting Pod and Service List via clientset.CoreV1()?


[Scenario]

While developing Kubernetes-related systems in the past, I ran into the following situation:

The system had to keep polling the Kubernetes API for the latest Pod and Service lists, for example every 2 seconds. Polling is an inefficient approach, because most of the time the results have very likely not changed at all.

[Solution]

Use an Informer to solve this problem.

client-go is the Kubernetes client library extracted from the Kubernetes codebase, and Informer is one of its core packages, already used by many Kubernetes components. An Informer is essentially a client with a local cache and an index mechanism on which you can register EventHandlers; the local cache is called the Store and the index is called the Indexer. The Informer was abstracted out as a caching layer to relieve the pressure of interacting with the apiserver: a client's "read" and "watch" operations on apiserver data all go through the local Informer instead. An Informer instance's Lister() method looks up data directly from the cache in local memory.

- Main functions of an Informer:

    - Sync data into the local cache

    - Trigger the pre-registered ResourceEventHandler according to the corresponding event type
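
To make the Lister() idea above concrete, here is a minimal sketch of reading Pods straight from the Informer's local cache instead of calling the apiserver. This is my own illustrative snippet, not code from the original project; it assumes an already-constructed clientset, the imports "fmt", "time", "k8s.io/apimachinery/pkg/labels", "k8s.io/client-go/informers", and k8scache "k8s.io/client-go/tools/cache", and a 30-second resync period chosen only for the example.

// Minimal sketch: serve reads from the informer's in-memory cache via its Lister.
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
podInformer := factory.Core().V1().Pods()

stopCh := make(chan struct{})
defer close(stopCh)

// Start the informers and wait for the initial LIST+WATCH to fill the cache.
factory.Start(stopCh)
k8scache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced)

// This List is answered from local memory, not from the apiserver.
pods, err := podInformer.Lister().List(labels.Everything())
if err != nil {
    panic(err)
}
fmt.Println("pods currently in the local cache:", len(pods))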

With the Informers provided by client-go, we can watch for changes to the resources we care about, including ADD, UPDATE, and DELETE events. So instead of querying the Kubernetes API for the Pod & Service lists every second, the approach becomes:

query the Kubernetes API only when a Pod or Service resource actually changes.

The complete Informer implementation is as follows:

package informer

import (
    "fmt"

    "k8s.io/client-go/informers"
    coreinformers "k8s.io/client-go/informers/core/v1"
    k8scache "k8s.io/client-go/tools/cache"
)

// PodSvcController watches Pods and Services and records whether any of them
// have been added, updated, or deleted since the flags were last cleared.
type PodSvcController struct {
    informerFactory informers.SharedInformerFactory
    podInformer     coreinformers.PodInformer
    svcInformer     coreinformers.ServiceInformer
    // ChangePod and ChangeSvc are set to true by the event handlers; the
    // consumer clears them after refreshing its Pod/Service lists.
    ChangePod bool
    ChangeSvc bool
}

// Run starts shared informers and waits for the shared informer cache to
// synchronize.
func (c *PodSvcController) Run(stopCh chan struct{}) error {
    // Starts all the shared informers that have been created by the factory so
    // far.
    c.informerFactory.Start(stopCh)
    // Wait for the initial synchronization of the local caches.
    if !k8scache.WaitForCacheSync(stopCh,
        c.podInformer.Informer().HasSynced,
        c.svcInformer.Informer().HasSynced) {
        return fmt.Errorf("failed to sync")
    }
    return nil
}

func (c *PodSvcController) podAdd(obj interface{}) {
    //pod := obj.(*corev1.Pod)
    c.ChangePod = true
    //fmt.Println("POD ADD: ", pod.Namespace, pod.Name)
}

func (c *PodSvcController) podDelete(obj interface{}) {
    //pod := obj.(*corev1.Pod)
    c.ChangePod = true
    //fmt.Println("POD DELETE: ", pod.Namespace, pod.Name)
}

func (c *PodSvcController) podUpdate(old, new interface{}) {
    //oldPod := old.(*corev1.Pod)
    //newPod := new.(*corev1.Pod)
    c.ChangePod = true
    //fmt.Println("POD UPDATED: ",
    //  oldPod.Namespace, oldPod.Name, newPod.Status.Phase)
}

func (c *PodSvcController) svcAdd(obj interface{}) {
    //svc := obj.(*corev1.Service)
    c.ChangeSvc = true
    //fmt.Println("SVC ADD: ", svc.Namespace, svc.Name)
}

func (c *PodSvcController) svcDelete(obj interface{}) {
    //svc := obj.(*corev1.Service)
    c.ChangeSvc = true
    //fmt.Println("SVC DELETE: ", svc.Namespace, svc.Name)
}

func (c *PodSvcController) svcUpdate(old, new interface{}) {
    //oldSvc := old.(*corev1.Service)
    //newSvc := new.(*corev1.Service)
    c.ChangeSvc = true
    //fmt.Println("POD UPDATED: ",
    //  oldSvc.Namespace, oldSvc.Name, newSvc.Status)
}

// NewPodSvcController creates a PodSvcController and registers its Pod and
// Service event handlers.
func NewPodSvcController(informerFactory informers.SharedInformerFactory) *PodSvcController {
    podInformer := informerFactory.Core().V1().Pods()
    svcInformer := informerFactory.Core().V1().Services()

    c := &PodSvcController{
        informerFactory: informerFactory,
        podInformer:     podInformer,
        svcInformer:     svcInformer,
        ChangePod:       false,
        ChangeSvc:       false,
    }

    podInformer.Informer().AddEventHandler(
        // Register handlers for Pod Add, Update, and Delete events.
        k8scache.ResourceEventHandlerFuncs{
            // Called on creation
            AddFunc: c.podAdd,
            // Called on resource update and every resyncPeriod on existing resources.
            UpdateFunc: c.podUpdate,
            // Called on resource deletion.
            DeleteFunc: c.podDelete,
        },
    )
    svcInformer.Informer().AddEventHandler(
        // Register handlers for Service Add, Update, and Delete events.
        k8scache.ResourceEventHandlerFuncs{
            // Called on creation
            AddFunc: c.svcAdd,
            // Called on resource update and every resyncPeriod on existing resources.
            UpdateFunc: c.svcUpdate,
            // Called on resource deletion.
            DeleteFunc: c.svcDelete,
        },
    )
    return c
}

An example of using the Informer:

    // Connect to the API server
    clientset, err := k8sutil.NewClientset(kubeconfig)
    if err != nil {
        panic(err)
    }

    // Here we use Informers to receive the Pods/Svcs change notices
    factory := informers.NewSharedInformerFactory(clientset, time.Minute*2)
    controller := informer.NewPodSvcController(factory)
    stop := make(chan struct{})
    defer close(stop)
    err = controller.Run(stop)
    if err != nil {
        failure <- err.Error()
    }

    // This goroutine is triggered by a timer and consumes the events queued so far.
    ticker := time.NewTicker(time.Second)
    done := make(chan bool)
    go func() {
        var pods *v1.PodList = nil
        var svcs *v1.ServiceList = nil
        for {
            select {
            case <-done:
                return
            case <-ticker.C:
                eventCount := len(queue)
                if eventCount == 0 {
                    continue
                }
                if controller.ChangePod || pods == nil {
                    pods, err = clientset.CoreV1().Pods("").List(metav1.ListOptions{})
                    if err != nil {
                        failure <- err.Error()
                    }
                    //fmt.Println("Get new pods")
                    controller.ChangePod = false
                }
                if controller.ChangeSvc || svcs == nil {
                    svcs, err = clientset.CoreV1().Services("").List(metav1.ListOptions{})
                    if err != nil {
                        failure <- err.Error()
                    }
                    //fmt.Println("Get new svcs")
                    controller.ChangeSvc = false
                }
                // Consume that amount of events from the queue and use the same cache of
                // pods and services with them. We might not consume all the events,
                // that's ok, we'll get them at the next tick.
                //batch := make([]string, eventCount)
                for i := 0; i < eventCount; i++ {
                    f(<-queue, pods, svcs)
                }
            }
        }
    }()
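
k8sutil.NewClientset(kubeconfig) above is a helper from the author's own project. For reference, a roughly equivalent, purely illustrative way to build the clientset with the standard client-go packages clientcmd and kubernetes would be:

    // Illustrative only: build a clientset from a kubeconfig path without k8sutil.
    // Assumed imports: k8s.io/client-go/kubernetes, k8s.io/client-go/tools/clientcmd.
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }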

