背景 之前大部分时间都在用client-go进行开发,也用过informer做一些有意思的事情,但是很多细节方面的东西没有注意,这次算是对client-go原理进行一个整理。
架构设计 client-go类型 client-go支持四种类型客户端对象来和api server进行交互:
RESTClient
ClientSet
DynamicClient
DiscoveryClient
其他三种都是基于RESTClient实现,所以RESTClient可以算是他们的父类。
RESTClient RESTClient是最基础客户端,对HTTP Request进行封装,实现RESTful API。
1 2 3 4 5 6 7 8 9 10 11 type Interface interface { GetRateLimiter() flowcontrol.RateLimiter Verb(verb string ) *Request Post() *Request Put() *Request Patch(pt types.PatchType) *Request Get() *Request Delete() *Request APIVersion() schema.GroupVersion }
ClientSet ClientSet在RESTClient基础上添加了Resouce和Version的管理方法,每一个Resouce都是一个客户端,ClientSet就是这些客户端的集合,通过函数来暴露这些Resource和Version,需要注意的是ClienSet只支持K8S内置资源,如果需要对CRD资源进行访问会特别麻烦,还需要通过client-gen重新生成ClientSet。意义不大。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 type Interface interface { Discovery() discovery.DiscoveryInterface AdmissionregistrationV1() admissionregistrationv1.AdmissionregistrationV1Interface AdmissionregistrationV1alpha1() admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Interface AdmissionregistrationV1beta1() admissionregistrationv1beta1.AdmissionregistrationV1beta1Interface InternalV1alpha1() internalv1alpha1.InternalV1alpha1Interface AppsV1() appsv1.AppsV1Interface AppsV1beta1() appsv1beta1.AppsV1beta1Interface AppsV1beta2() appsv1beta2.AppsV1beta2Interface AuthenticationV1() authenticationv1.AuthenticationV1Interface AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface AuthorizationV1() authorizationv1.AuthorizationV1Interface AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface AutoscalingV1() autoscalingv1.AutoscalingV1Interface AutoscalingV2() autoscalingv2.AutoscalingV2Interface AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface BatchV1() batchv1.BatchV1Interface BatchV1beta1() batchv1beta1.BatchV1beta1Interface CertificatesV1() certificatesv1.CertificatesV1Interface CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface CoordinationV1() coordinationv1.CoordinationV1Interface CoreV1() corev1.CoreV1Interface DiscoveryV1() discoveryv1.DiscoveryV1Interface DiscoveryV1beta1() discoveryv1beta1.DiscoveryV1beta1Interface EventsV1() eventsv1.EventsV1Interface EventsV1beta1() eventsv1beta1.EventsV1beta1Interface ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface FlowcontrolV1() flowcontrolv1.FlowcontrolV1Interface FlowcontrolV1beta1() flowcontrolv1beta1.FlowcontrolV1beta1Interface FlowcontrolV1beta2() flowcontrolv1beta2.FlowcontrolV1beta2Interface FlowcontrolV1beta3() flowcontrolv1beta3.FlowcontrolV1beta3Interface NetworkingV1() networkingv1.NetworkingV1Interface NetworkingV1alpha1() networkingv1alpha1.NetworkingV1alpha1Interface NetworkingV1beta1() networkingv1beta1.NetworkingV1beta1Interface NodeV1() nodev1.NodeV1Interface NodeV1alpha1() nodev1alpha1.NodeV1alpha1Interface NodeV1beta1() nodev1beta1.NodeV1beta1Interface PolicyV1() policyv1.PolicyV1Interface PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface RbacV1() rbacv1.RbacV1Interface RbacV1beta1() rbacv1beta1.RbacV1beta1Interface RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface ResourceV1alpha2() resourcev1alpha2.ResourceV1alpha2Interface SchedulingV1alpha1() schedulingv1alpha1.SchedulingV1alpha1Interface SchedulingV1beta1() schedulingv1beta1.SchedulingV1beta1Interface SchedulingV1() schedulingv1.SchedulingV1Interface StorageV1beta1() storagev1beta1.StorageV1beta1Interface StorageV1() storagev1.StorageV1Interface StorageV1alpha1() storagev1alpha1.StorageV1alpha1Interface }
通过接口可以发现都是内置资源。
DynamicClient DynamicClient是一个加强版ClientSet,它除了内置资源外,还可以对CRD资源进行控制。也就是说可以对所有K8S资源对象进行操作。
之所以DynamicClient能够访问到自定义资源(CRD),是因为它内部实现了Unstructured,用来处理非结构化数据结构,既无法提前预知的数据结构。
DynamicClient使用类似于interface{}断言转换的过程,将所有Resouce转换为Unstructured结构类型,所以它并不是类型安全的,在访问CRD自定义资源的时候就需要注意,以免出现像操作指针出现问题导致程序崩溃。
DiscoveryClient 用于发现kube-apiserver所支持的资源组、资源版本、资源信息(即Group、Versions、Resources)。主要用于发现K8S API Server所支持的资源组,资源版本,资源信息等等。类似于kubectl api-resource的效果。DiscoveryClient将获取到的资源同步到本地缓存中,每过10分钟和API-Server进行同步更新,因为这些资源变化不大,所以10分钟是一个可以接收的氛围。
KubeConfig kubeconfig是用来管理访问api-server的配置信息,同时也支持访问多kube-apiserver的配置管理,支持不同环境下管理不同kube-apiserver集群配置。kubeconfig中存储了集群、用户、命名空间和身份验证等信息,一个kubeconfig配置文件组成部分如下:
cluster:表示集群信息,比如说kube-apiserver服务地址以及集群证书信息等等。
users: 定义访问集群用户的客户端凭据,例如client-certificate、client-key、token及username/password等。
contexts: 定义Kubernetes集群用户信息和命名空间等,用于将请求发送到指定的集群,也就是指定namespace或者全部namespace的访问权限。
K8S组件之间使用的是HTTP协议进行通信,通过Informer机制来保证消息在各个组件之间的实时性,可靠性,顺序性等等。
Reflector
DeltaFIFO
Indexer
Reflector Reflector用来监听(watch)资源变化,如果监听到自愿发生变化之后就会触发对应事件,比如Added,Updated,Deleted等等,之后将资源对象存放在本地缓存DeltaFIFO中。
DeltaFIFO DeltaFIFO是一个本地缓存队列,具有基本队列操作方法,比如Add、Update、Delete、List、Pop、Close等等,同时保存资源对象操作类型,比如Added(添加)操作类型、Updated(更新)操作类型、Deleted(删除)操作类型、Sync(同步)操作类型等。
Indexer Indexer也是一个本地缓存,它是一个自带索引功能的本地缓存。它会从DeltaFIFO中去消费,并且它的数据需要和etcd数据保持完全一致,这样就可以减轻API-Server和etcd的负担,client-go只需要从indexer中消费即可。
从DelataFIFO队列中消费之后,会推送到Workqueue或者其他队列中。
每个资源都已经实现了informer机制,每一个informer上都实现了infomer和lister方法:
1 2 3 4 type PodInformer interface { Informer() cache.SharedIndexInformer Lister() cache.GenericLister }
通过client-go去使用informer的话,如果是同一个资源就会被实例化多次,并且每个informer都使用一个Reflector会造成太多一样的ListAndWatch,会导致过多的序列化和反序列化操作,导致API-Server负载过高。
Shared Informer可以使同一类资源Informer共享一个Reflector,这样可以节约很多资源。通过map数据结构实现共享的Informer机制。
1 2 3 4 5 6 7 type SharedIndexInformerOptions struct { ResyncPeriod time.Duration Indexers Indexers ObjectDescription string }type Indexers map [string ]IndexFunc
ListerWatcher机制 ListerWatcher主要是Reflector需要, List表示K8S资源需要定期去获取最新状态, 而watch则是对应监控资源变化的, 只要实现了List和Watch方法的对象就可以成为ListerWatcher。Watch通过HTTP协议和API Server建立长连接。
ThreadSafeMap ThreadSafeMap是一个并发安全的存储,具有增、删、改、查操作方法;是一个内存存储,并不会写入到本地磁盘中,每次的增、删、改、查操作都会加锁,以保证数据的一致性。
WorkQueue workQueue和普通队列相比较,实现会复杂一点,主要功能在于标记和去重,并且支持以下特性:
有序: 按照添加顺序处理元素。
去重: 相同元素在同一时间不会被重复处理。
并发性: 多生产者和多消费者。
标记机制: 标记一个元素是否被处理,也允许元素在处理时重新排队。
通知机制: ShutDown方法通过信号量通知队列不再接收新的元素,并通知metric goroutine退出。
延迟: 支持延迟队列,延迟一段时间后再将元素存入队列。
限速: 元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。
Metric: 用于Prometheus监控。
workQueue支持三种队列,并且提供三种接口,来面对不同使用场景:
Interface: FIFO队列接口,先进先出队列,并支持去重机制。
DelayingInterface: 基于Interface接口封装,实现延迟功能。
RateLimitingInterface: 基于DelayingInterface接口封装,支持元素存入队列时进行速率限制。
限速算法
实战-获取event K8S event记录中集群上各种事件,这些事件可以很好的帮助运维和开发人员了解到,但是event只保留最近2个小时的,如果集群重启了或者这些事件丢失了,就不好复盘原因。所以我们可以简单地用一个event informer来获取event,然后做一些持久化存储。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package mainimport ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "time" )func main () { config, err := clientcmd.BuildConfigFromFlags("" , "" ) if err != nil { panic (err) } client, err := kubernetes.NewForConfig(config) if err != nil { panic (err) } stopCh := make (chan struct {}) defer close (stopCh) sharedInformer := informers.NewSharedInformerFactory(client, time.Minute) informer := sharedInformer.Core().V1().Events().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func (obj interface {}) { }, UpdateFunc: func (oldObj, newObj interface {}) { }, DeleteFunc: func (obj interface {}) { }, }) informer.Run(stopCh) }
这是一个大概模板,首先我们获取集群的kubeconfig,之后去创建clientSet,informer通过clientSet来和api-Server通信,因为Informer是一个持久运行的goroutine。所以需要使用channel来通知它提前退出。
NewSharedInformerFactory函数实例化SharedInformer对象,其中两个参数分别是clientSet和resync,resync是用来周期性执行List操作,将所有的资源存放在Informer Store中,如果该参数为0,则禁用resync功能。
代码中事件对象,AddFunc,UpdateFunc,DeleteFunc都是回调方法,也对应方法名。