client-go组件使用与原理

背景

之前大部分时间都在用client-go进行开发,也用过informer做一些有意思的事情,但是很多细节方面的东西没有注意,这次算是对client-go原理进行一个整理。

架构设计

client-go类型

client-go支持四种类型客户端对象来和api server进行交互:

  • RESTClient
  • ClientSet
  • DynamicClient
  • DiscoveryClient

其他三种都是基于RESTClient实现,所以RESTClient可以算是他们的父类。

image-20240115094702534

RESTClient

RESTClient是最基础客户端,对HTTP Request进行封装,实现RESTful API。

1
2
3
4
5
6
7
8
9
10
11
// Interface captures the set of operations for generically interacting with Kubernetes REST apis.
type Interface interface {
GetRateLimiter() flowcontrol.RateLimiter
Verb(verb string) *Request
Post() *Request
Put() *Request
Patch(pt types.PatchType) *Request
Get() *Request
Delete() *Request
APIVersion() schema.GroupVersion
}

ClientSet

ClientSet在RESTClient基础上添加了Resouce和Version的管理方法,每一个Resouce都是一个客户端,ClientSet就是这些客户端的集合,通过函数来暴露这些Resource和Version,需要注意的是ClienSet只支持K8S内置资源,如果需要对CRD资源进行访问会特别麻烦,还需要通过client-gen重新生成ClientSet。意义不大。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
type Interface interface {
Discovery() discovery.DiscoveryInterface
AdmissionregistrationV1() admissionregistrationv1.AdmissionregistrationV1Interface
AdmissionregistrationV1alpha1() admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Interface
AdmissionregistrationV1beta1() admissionregistrationv1beta1.AdmissionregistrationV1beta1Interface
InternalV1alpha1() internalv1alpha1.InternalV1alpha1Interface
AppsV1() appsv1.AppsV1Interface
AppsV1beta1() appsv1beta1.AppsV1beta1Interface
AppsV1beta2() appsv1beta2.AppsV1beta2Interface
AuthenticationV1() authenticationv1.AuthenticationV1Interface
AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface
AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
AuthorizationV1() authorizationv1.AuthorizationV1Interface
AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
AutoscalingV1() autoscalingv1.AutoscalingV1Interface
AutoscalingV2() autoscalingv2.AutoscalingV2Interface
AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
BatchV1() batchv1.BatchV1Interface
BatchV1beta1() batchv1beta1.BatchV1beta1Interface
CertificatesV1() certificatesv1.CertificatesV1Interface
CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface
CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
CoordinationV1() coordinationv1.CoordinationV1Interface
CoreV1() corev1.CoreV1Interface
DiscoveryV1() discoveryv1.DiscoveryV1Interface
DiscoveryV1beta1() discoveryv1beta1.DiscoveryV1beta1Interface
EventsV1() eventsv1.EventsV1Interface
EventsV1beta1() eventsv1beta1.EventsV1beta1Interface
ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface
FlowcontrolV1() flowcontrolv1.FlowcontrolV1Interface
FlowcontrolV1beta1() flowcontrolv1beta1.FlowcontrolV1beta1Interface
FlowcontrolV1beta2() flowcontrolv1beta2.FlowcontrolV1beta2Interface
FlowcontrolV1beta3() flowcontrolv1beta3.FlowcontrolV1beta3Interface
NetworkingV1() networkingv1.NetworkingV1Interface
NetworkingV1alpha1() networkingv1alpha1.NetworkingV1alpha1Interface
NetworkingV1beta1() networkingv1beta1.NetworkingV1beta1Interface
NodeV1() nodev1.NodeV1Interface
NodeV1alpha1() nodev1alpha1.NodeV1alpha1Interface
NodeV1beta1() nodev1beta1.NodeV1beta1Interface
PolicyV1() policyv1.PolicyV1Interface
PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface
RbacV1() rbacv1.RbacV1Interface
RbacV1beta1() rbacv1beta1.RbacV1beta1Interface
RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface
ResourceV1alpha2() resourcev1alpha2.ResourceV1alpha2Interface
SchedulingV1alpha1() schedulingv1alpha1.SchedulingV1alpha1Interface
SchedulingV1beta1() schedulingv1beta1.SchedulingV1beta1Interface
SchedulingV1() schedulingv1.SchedulingV1Interface
StorageV1beta1() storagev1beta1.StorageV1beta1Interface
StorageV1() storagev1.StorageV1Interface
StorageV1alpha1() storagev1alpha1.StorageV1alpha1Interface
}

通过接口可以发现都是内置资源。

DynamicClient

DynamicClient是一个加强版ClientSet,它除了内置资源外,还可以对CRD资源进行控制。也就是说可以对所有K8S资源对象进行操作。

之所以DynamicClient能够访问到自定义资源(CRD),是因为它内部实现了Unstructured,用来处理非结构化数据结构,既无法提前预知的数据结构。

DynamicClient使用类似于interface{}断言转换的过程,将所有Resouce转换为Unstructured结构类型,所以它并不是类型安全的,在访问CRD自定义资源的时候就需要注意,以免出现像操作指针出现问题导致程序崩溃。

DiscoveryClient

用于发现kube-apiserver所支持的资源组、资源版本、资源信息(即Group、Versions、Resources)。主要用于发现K8S API Server所支持的资源组,资源版本,资源信息等等。类似于kubectl api-resource的效果。DiscoveryClient将获取到的资源同步到本地缓存中,每过10分钟和API-Server进行同步更新,因为这些资源变化不大,所以10分钟是一个可以接收的氛围。

image-20240115110928458

KubeConfig

kubeconfig是用来管理访问api-server的配置信息,同时也支持访问多kube-apiserver的配置管理,支持不同环境下管理不同kube-apiserver集群配置。kubeconfig中存储了集群、用户、命名空间和身份验证等信息,一个kubeconfig配置文件组成部分如下:

  • cluster:表示集群信息,比如说kube-apiserver服务地址以及集群证书信息等等。
  • users: 定义访问集群用户的客户端凭据,例如client-certificate、client-key、token及username/password等。
  • contexts: 定义Kubernetes集群用户信息和命名空间等,用于将请求发送到指定的集群,也就是指定namespace或者全部namespace的访问权限。

Informer机制

K8S组件之间使用的是HTTP协议进行通信,通过Informer机制来保证消息在各个组件之间的实时性,可靠性,顺序性等等。

img

informer组件

  • Reflector
  • DeltaFIFO
  • Indexer

Reflector

Reflector用来监听(watch)资源变化,如果监听到自愿发生变化之后就会触发对应事件,比如Added,Updated,Deleted等等,之后将资源对象存放在本地缓存DeltaFIFO中。

image-20240115112429687

DeltaFIFO

DeltaFIFO是一个本地缓存队列,具有基本队列操作方法,比如Add、Update、Delete、List、Pop、Close等等,同时保存资源对象操作类型,比如Added(添加)操作类型、Updated(更新)操作类型、Deleted(删除)操作类型、Sync(同步)操作类型等。

Indexer

Indexer也是一个本地缓存,它是一个自带索引功能的本地缓存。它会从DeltaFIFO中去消费,并且它的数据需要和etcd数据保持完全一致,这样就可以减轻API-Server和etcd的负担,client-go只需要从indexer中消费即可。

image-20240115143109998

从DelataFIFO队列中消费之后,会推送到Workqueue或者其他队列中。

资源informer

每个资源都已经实现了informer机制,每一个informer上都实现了infomer和lister方法:

1
2
3
4
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() cache.GenericLister
}

Shared Informer机制

通过client-go去使用informer的话,如果是同一个资源就会被实例化多次,并且每个informer都使用一个Reflector会造成太多一样的ListAndWatch,会导致过多的序列化和反序列化操作,导致API-Server负载过高。

Shared Informer可以使同一类资源Informer共享一个Reflector,这样可以节约很多资源。通过map数据结构实现共享的Informer机制。

1
2
3
4
5
6
7
type SharedIndexInformerOptions struct {
ResyncPeriod time.Duration
Indexers Indexers
ObjectDescription string
}

type Indexers map[string]IndexFunc

ListerWatcher机制

ListerWatcher主要是Reflector需要, List表示K8S资源需要定期去获取最新状态, 而watch则是对应监控资源变化的, 只要实现了List和Watch方法的对象就可以成为ListerWatcher。Watch通过HTTP协议和API Server建立长连接。

ThreadSafeMap

ThreadSafeMap是一个并发安全的存储,具有增、删、改、查操作方法;是一个内存存储,并不会写入到本地磁盘中,每次的增、删、改、查操作都会加锁,以保证数据的一致性。

WorkQueue

workQueue和普通队列相比较,实现会复杂一点,主要功能在于标记和去重,并且支持以下特性:

  1. 有序: 按照添加顺序处理元素。
  2. 去重: 相同元素在同一时间不会被重复处理。
  3. 并发性: 多生产者和多消费者。
  4. 标记机制: 标记一个元素是否被处理,也允许元素在处理时重新排队。
  5. 通知机制: ShutDown方法通过信号量通知队列不再接收新的元素,并通知metric goroutine退出。
  6. 延迟: 支持延迟队列,延迟一段时间后再将元素存入队列。
  7. 限速: 元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。
  8. Metric: 用于Prometheus监控。

workQueue支持三种队列,并且提供三种接口,来面对不同使用场景:

  1. Interface: FIFO队列接口,先进先出队列,并支持去重机制。
  2. DelayingInterface: 基于Interface接口封装,实现延迟功能。
  3. RateLimitingInterface: 基于DelayingInterface接口封装,支持元素存入队列时进行速率限制。

限速算法

  • 令牌桶算法
  • 排队指数算法
  • 计数器算法
  • 混合模式

实战-获取event

K8S event记录中集群上各种事件,这些事件可以很好的帮助运维和开发人员了解到,但是event只保留最近2个小时的,如果集群重启了或者这些事件丢失了,就不好复盘原因。所以我们可以简单地用一个event informer来获取event,然后做一些持久化存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"time"
)

func main() {
config, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
panic(err)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
stopCh := make(chan struct{})
defer close(stopCh)

sharedInformer := informers.NewSharedInformerFactory(client, time.Minute)
informer := sharedInformer.Core().V1().Events().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
//事件被创建
},
UpdateFunc: func(oldObj, newObj interface{}) {
//事件被更新
},
DeleteFunc: func(obj interface{}) {
//事件被删除
},
})
informer.Run(stopCh)
}

这是一个大概模板,首先我们获取集群的kubeconfig,之后去创建clientSet,informer通过clientSet来和api-Server通信,因为Informer是一个持久运行的goroutine。所以需要使用channel来通知它提前退出。

NewSharedInformerFactory函数实例化SharedInformer对象,其中两个参数分别是clientSet和resync,resync是用来周期性执行List操作,将所有的资源存放在Informer Store中,如果该参数为0,则禁用resync功能。

代码中事件对象,AddFunc,UpdateFunc,DeleteFunc都是回调方法,也对应方法名。


client-go组件使用与原理
http://example.com/2024/01/12/client-go组件使用与原理/
Author
John Doe
Posted on
January 12, 2024
Licensed under