目录
介绍
我们可以使用code-generator 以及controller-tools来进行代码自动生成,通过代码自动生成可以帮我们自动生成 CRD 资源对象,以及客户端访问的 ClientSet、Informer、Lister 等工具包,接下来我们就来了解下如何编写一个自定义的控制器。
CRD定义
首先初始化项目:
$ mkdir operator-crd && cd operator-crd $ go mod init operator-crd $ mkdir -p pkg/apis/example.com/v1
在该文件夹下新建doc.go
文件,内容如下所示:
// +k8s:deepcopy-gen=package // +groupName=example.com package v1
根据 CRD 的规范定义,这里我们定义的 group 为example.com
,版本为v1
,在顶部添加了一个代码自动生成的deepcopy-gen
的 tag,为整个包中的类型生成深拷贝方法。
然后就是非常重要的资源对象的结构体定义,新建types.go
文件,types.go内容可以使用type-scaffpld
自动生成,具体文件内容如下:
package v1 import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" // BarSpec defines the desired state of Bar type BarSpec struct { // INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster DeploymentName string `json:"deploymentName"` Image string `json:"image"` Replicas *int32 `json:"replicas"` } // BarStatus defines the observed state of Bar. // It should always be reconstructable from the state of the cluster and/or outside world. type BarStatus struct { // INSERT ADDITIONAL STATUS FIELDS -- observed state of cluster } // 下面这个一定不能少,少了的话不能生成 lister 和 informer // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // Bar is the Schema for the bars API // +k8s:openapi-gen=true type Bar struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec BarSpec `json:"spec,omitempty"` Status BarStatus `json:"status,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // BarList contains a list of Bar type BarList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` Items []Bar `json:"items"` }
然后可以参考系统内置的资源对象,还需要提供 AddToScheme 与 Resource 两个变量供 client 注册,新建 register.go 文件,内容如下所示:
package v1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" ) // SchemeGroupVersion 注册自己的自定义资源 var SchemeGroupVersion = schema.GroupVersion{Group: "example.com", Version: "v1"} // Kind takes an unqualified kind and returns back a Group qualified GroupKind func Kind(kind string) schema.GroupKind { return SchemeGroupVersion.WithKind(kind).GroupKind() } // Resource takes an unqualified resource and returns a Group qualified GroupResource func Resource(resource string) schema.GroupResource { return SchemeGroupVersion.WithResource(resource).GroupResource() } var ( // SchemeBuilder initializes a scheme builder SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) // AddToScheme is a global function that registers this API group & version to a scheme AddToScheme = SchemeBuilder.AddToScheme ) // Adds the list of known types to Scheme. func addKnownTypes(scheme *runtime.Scheme) error { // 添加 Bar 与 BarList这两个资源到 scheme scheme.AddKnownTypes(SchemeGroupVersion, &Bar{}, &BarList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil }
使用controller-gen
生成crd:
$ controller-gen crd paths=./... output:crd:dir=crd
生成example.com_bars.yaml文件如下所示:
--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: controller-gen.kubebuilder.io/version: (devel) creationTimestamp: null name: bars.example.com spec: group: example.com names: kind: Bar listKind: BarList plural: bars singular: bar scope: Namespaced versions: - name: v1 schema: openAPIV3Schema: description: Bar is the Schema for the bars API properties: apiVersion: description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' type: string kind: description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' type: string metadata: type: object spec: description: BarSpec defines the desired state of Bar properties: deploymentName: description: INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster type: string image: type: string replicas: format: int32 type: integer required: - deploymentName - image - replicas type: object status: description: BarStatus defines the observed state of Bar. It should always be reconstructable from the state of the cluster and/or outside world. type: object type: object served: true storage: true
最终项目结构如下所示:
$ tree . ├── crd │ └── example.com_bars.yaml ├── go.mod ├── go.sum └── pkg └── apis └── example.com └── v1 ├── doc.go ├── register.go └── types.go 5 directories, 6 files
生成客户端相关代码
上面我们准备好资源的 API 资源类型后,就可以使用开始生成 CRD 资源的客户端使用的相关代码了。
首先创建生成代码的脚本,下面这些脚本均来源于sample-controller提供的示例:
$ mkdir hack && cd hack
在该目录下面新建 tools.go 文件,添加 code-generator 依赖,因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包。文件内容如下所示:
// +build tools // 建立 tools.go 来依赖 code-generator // 因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包. package tools import _ "k8s.io/code-generator"
然后新建 update-codegen.sh 脚本,用来配置代码生成的脚本:
#!/usr/bin/env bash set -o errexit set -o nounset set -o pipefail SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/.. CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)} bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \ operator-crd/pkg/client operator-crd/pkg/apis example.com:v1 \ --output-base "${SCRIPT_ROOT}"/../ \ --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt # To use your own boilerplate text append: # --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt
同样还有 verify-codegen.sh 脚本,用来校验生成的代码是否是最新的:
#!/usr/bin/env bash set -o errexit set -o nounset set -o pipefail SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/.. DIFFROOT="${SCRIPT_ROOT}/pkg" TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg" _tmp="${SCRIPT_ROOT}/_tmp" cleanup() { rm -rf "${_tmp}" } trap "cleanup" EXIT SIGINT cleanup mkdir -p "${TMP_DIFFROOT}" cp -a "${DIFFROOT}"/* "${TMP_DIFFROOT}" "${SCRIPT_ROOT}/hack/update-codegen.sh" echo "diffing ${DIFFROOT} against freshly generated codegen" ret=0 diff -Naupr "${DIFFROOT}" "${TMP_DIFFROOT}" || ret=$? cp -a "${TMP_DIFFROOT}"/* "${DIFFROOT}" if [[ $ret -eq 0 ]] then echo "${DIFFROOT} up to date." else echo "${DIFFROOT} is out of date. Please run hack/update-codegen.sh" exit 1 fi
还有一个为生成的代码文件添加头部内容的 boilerplate.go.txt 文件,内容如下所示,其实就是为每个生成的代码文件头部添加上下面的开源协议声明:
/* Copyright The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */
接下来我们就可以来执行代码生成的脚本了,首先将依赖包放置到 vendor 目录中去:
$ go mod vendor
然后执行脚本生成代码:
$ chmod +x ./hack/update-codegen.sh $./hack/update-codegen.sh Generating deepcopy funcs Generating clientset for example.com:v1 at operator-crd/pkg/client/clientset Generating listers for example.com:v1 at operator-crd/pkg/client/listers Generating informers for example.com:v1 at operator-crd/pkg/client/informers
代码生成后,整个项目的 pkg 包变成了下面的样子:
$ tree pkg pkg ├── apis │ └── example.com │ └── v1 │ ├── doc.go │ ├── register.go │ ├── types.go │ └── zz_generated.deepcopy.go └── client ├── clientset │ └── versioned │ ├── clientset.go │ ├── doc.go │ ├── fake │ │ ├── clientset_generated.go │ │ ├── doc.go │ │ └── register.go │ ├── scheme │ │ ├── doc.go │ │ └── register.go │ └── typed │ └── example.com │ └── v1 │ ├── bar.go │ ├── doc.go │ ├── example.com_client.go │ ├── fake │ │ ├── doc.go │ │ ├── fake_bar.go │ │ └── fake_example.com_client.go │ └── generated_expansion.go ├── informers │ └── externalversions │ ├── example.com │ │ ├── interface.go │ │ └── v1 │ │ ├── bar.go │ │ └── interface.go │ ├── factory.go │ ├── generic.go │ └── internalinterfaces │ └── factory_interfaces.go └── listers └── example.com └── v1 ├── bar.go └── expansion_generated.go 20 directories, 26 files
仔细观察可以发现pkg/apis/example.com/v1目录下面多了一个zz_generated.deepcopy.go文件,在pkg/client文件夹下生成了 clientset和 informers 和 listers 三个目录,有了这几个自动生成的客户端相关操作包,我们就可以去访问 CRD 资源了,可以和使用内置的资源对象一样去对 Bar 进行 List 和 Watch 操作了。
编写控制器
首先要先获取访问资源对象的 ClientSet,在项目根目录下面新建 main.go 文件。
package main import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" clientset "operator-crd/pkg/client/clientset/versioned" "operator-crd/pkg/client/informers/externalversions" "time" "os" "os/signal" "syscall" ) var ( onlyOneSignalHandler = make(chan struct{}) shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} ) // 注册 SIGTERM 和 SIGINT 信号 // 返回一个 stop channel, 该通道在捕获到第一个信号时被关闭 // 如果捕获到第二个信号,程序直接退出 func setupSignalHandler() (stopCh <-chan struct{}) { // 当调用两次的时候 panics close(onlyOneSignalHandler) stop := make(chan struct{}) c := make(chan os.Signal, 2) // Notify 函数让 signal 包将输入信号转发到c // 如果没有列出要传递的信号,会将所有输入信号传递到 c; 否则只会传递列出的输入信号 signal.Notify(c, shutdownSignals...) go func() { <-c close(stop) <-c os.Exit(1) // 第二个信号直接退出 }() return stop } func main() { stopCh := setupSignalHandler() // 获取config config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { klog.Fatalln(err) } // 通过config构建clientSet // 这里的clientSet 是 Bar 的 clientSet, err := clientset.NewForConfig(config) if err != nil { klog.Fatalln(err) } // informerFactory 工厂类, 这里注入我们通过代码生成的 client // client 主要用于和 API Server 进行通信,实现 ListAndWatch factory := externalversions.NewSharedInformerFactory(clientSet, time.Second*30) // 实例化自定义控制器 controller := NewController(factory.Example().V1().Bars()) // 启动 informer,开始list 和 watch go factory.Start(stopCh) // 启动控制器 if err = controller.Run(2, stopCh); err != nil { klog.Fatalf("Error running controller: %s", err.Error()) } }
首先初始化一个用于访问 Bar 资源的 ClientSet 对象,然后同样新建一个 Bar 的 InformerFactory 实例,通过这个工厂实例可以去启动 Informer 开始对 Bar 的 List 和 Watch 操作,然后同样我们要自己去封装一个自定义的控制器,在这个控制器里面去实现一个控制循环,不断对 Bar 的状态进行调谐。
在项目根目录下新建controller.go文件,内容如下所示:
package main import ( "fmt" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" v1 "operator-crd/pkg/apis/example.com/v1" "time" informers "operator-crd/pkg/client/informers/externalversions/example.com/v1" ) type Controller struct { informer informers.BarInformer workqueue workqueue.RateLimitingInterface } func NewController(informer informers.BarInformer) *Controller { controller := &Controller{ informer: informer, // WorkQueue 的实现,负责同步 Informer 和控制循环之间的数据 workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "bar"), } klog.Info("Setting up Bar event handlers") // informer 注册了三个 Handler(AddFunc、UpdateFunc 和 DeleteFunc) // 分别对应 API 对象的“添加”“更新”和“删除”事件。 // 而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中 informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.addBar, UpdateFunc: controller.updateBar, DeleteFunc: controller.deleteBar, }) return controller } func (c *Controller) Run(thread int, stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer c.workqueue.ShuttingDown() // 记录开始日志 klog.Info("Starting Bar control loop") klog.Info("Waiting for informer caches to sync") // 等待缓存同步数据 if ok := cache.WaitForCacheSync(stopCh, c.informer.Informer().HasSynced); !ok { return fmt.Errorf("failed to wati for caches to sync") } klog.Info("Starting workers") for i := 0; i < thread; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } klog.Info("Started workers") <-stopCh klog.Info("Shutting down workers") return nil } // runWorker 是一个不断运行的方法,并且一直会调用 c.processNextWorkItem 从 workqueue读取消息 func (c *Controller) runWorker() { for c.processNExtWorkItem() { } } // 从workqueue读取和读取消息 func (c *Controller) processNExtWorkItem() bool { // 获取 item item, shutdown := c.workqueue.Get() if shutdown { return false } if err := func(item interface{}) error { // 标记以及处理 defer c.workqueue.Done(item) var key string var ok bool if key, ok = item.(string); !ok { // 判读key的类型不是字符串,则直接丢弃 c.workqueue.Forget(item) runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item)) return nil } if err := c.syncHandler(key); err != nil { return fmt.Errorf("error syncing '%s':%s", item, err.Error()) } c.workqueue.Forget(item) return nil }(item); err != nil { runtime.HandleError(err) return false } return true } // 尝试从 Informer 维护的缓存中拿到了它所对应的 Bar 对象 func (c *Controller) syncHandler(key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { runtime.HandleError(fmt.Errorf("invalid respirce key:%s", key)) return err } bar, err := c.informer.Lister().Bars(namespace).Get(name) if err != nil { if errors.IsNotFound(err) { // 说明是在删除事件中添加进来的 return nil } runtime.HandleError(fmt.Errorf("failed to get bar by: %s/%s", namespace, name)) return err } fmt.Printf("[BarCRD] try to process bar:%#v ...", bar) // 可以根据bar来做其他的事。 // todo return nil } func (c *Controller) addBar(item interface{}) { var key string var err error if key, err = cache.MetaNamespaceKeyFunc(item); err != nil { runtime.HandleError(err) return } c.workqueue.AddRateLimited(key) } func (c *Controller) deleteBar(item interface{}) { var key string var err error if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(item); err != nil { runtime.HandleError(err) return } fmt.Println("delete crd") c.workqueue.AddRateLimited(key) } func (c *Controller) updateBar(old, new interface{}) { oldItem := old.(*v1.Bar) newItem := new.(*v1.Bar) // 比较两个资源版本,如果相同,则不处理 if oldItem.ResourceVersion == newItem.ResourceVersion { return } c.workqueue.AddRateLimited(new) }
我们这里自定义的控制器只封装了一个 Informer 和一个限速队列,我们当然也可以在里面添加一个用于访问本地缓存的 Indexer,但实际上 Informer 中已经包含了 Lister,对于 List 和 Get 操作都会去通过 Indexer 从本地缓存中获取数据,所以只用一个 Informer 也是完全可行的。
同样在 Informer 中注册了3个事件处理器,将监听的事件获取到后送入 workqueue 队列,然后通过控制器的控制循环不断从队列中消费数据,根据获取的 key 来获取数据判断对象是需要删除还是需要进行其他业务处理,这里我们同样也只是打印出了对应的操作日志,对于实际的项目则进行相应的业务逻辑处理即可。
到这里一个完整的自定义 API 对象和它所对应的自定义控制器就编写完毕了。
测试
接下来我们直接运行我们的main函数:
I0512 16:51:33.922138 39032 controller.go:29] Setting up Bar event handlers I0512 16:51:33.922255 39032 controller.go:47] Starting Bar control loop I0512 16:51:33.922258 39032 controller.go:48] Waiting for informer caches to sync I0512 16:51:34.023108 39032 controller.go:55] Starting workers I0512 16:51:34.023153 39032 controller.go:60] Started workers
现在我们创建一个Bar资源对象:
# bar.yaml apiVersion: example.com/v1 kind: Bar metadata: name: bar-demo namespace: default spec: image: "nginx:1.17.1" deploymentName: example-bar replicas: 2
直接创建上面的对象,注意观察控制器的日志:
I0512 16:51:33.922138 39032 controller.go:29] Setting up Bar event handlers I0512 16:51:33.922255 39032 controller.go:47] Starting Bar control loop I0512 16:51:33.922258 39032 controller.go:48] Waiting for informer caches to sync I0512 16:51:34.023108 39032 controller.go:55] Starting workers I0512 16:51:34.023153 39032 controller.go:60] Started workers [BarCRD] try to process bar:"bar-demo" ...
可以看到,我们上面创建 bar.yaml 的操作,触发了 EventHandler 的添加事件,从而被放进了工作队列。然后控制器的控制循环从队列里拿到这个对象,并且打印出了正在处理这个 bar 对象的日志信息。
同样我们删除这个资源的时候,也会有对应的提示。
这就是开发自定义 CRD 控制器的基本流程,当然我们还可以在事件处理的业务逻辑中去记录一些 Events 信息,这样我们就可以通过 Event 去了解我们资源的状态了,更多关于CRD生成自定义控制器的资料请关注其它相关文章!