实现k8s自定义controller

发布时间 2023-12-11 08:51:21作者: 王景迁

创建crd

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: fruits.crd.io
spec:
  group: crd.io
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                location:
                  type: string
  scope: Cluster
  names:
    plural: fruits
    singular: fruit
    kind: Fruit
k create -f crd-fruits.yaml

编写controller

cd $GOPATH/src
mkdir crd_controller
cd crd_controller

go mod init crd_controller
# 创建go.mod
module crd_controller

go 1.19

require (
	k8s.io/api v0.28.2
	k8s.io/apimachinery v0.28.2
	k8s.io/client-go v0.28.2
	k8s.io/klog/v2 v2.100.1
	sigs.k8s.io/structured-merge-diff/v4 v4.3.0
)

require (
	github.com/davecgh/go-spew v1.1.1 // indirect
	github.com/emicklei/go-restful/v3 v3.9.0 // indirect
	github.com/evanphx/json-patch v4.12.0+incompatible // indirect
	github.com/go-logr/logr v1.2.4 // indirect
	github.com/go-openapi/jsonpointer v0.19.6 // indirect
	github.com/go-openapi/jsonreference v0.20.2 // indirect
	github.com/go-openapi/swag v0.22.3 // indirect
	github.com/gogo/protobuf v1.3.2 // indirect
	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
	github.com/golang/protobuf v1.5.3 // indirect
	github.com/google/gnostic-models v0.6.8 // indirect
	github.com/google/go-cmp v0.5.9 // indirect
	github.com/google/gofuzz v1.2.0 // indirect
	github.com/google/uuid v1.3.0 // indirect
	github.com/josharian/intern v1.0.0 // indirect
	github.com/json-iterator/go v1.1.12 // indirect
	github.com/mailru/easyjson v0.7.7 // indirect
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
	github.com/modern-go/reflect2 v1.0.2 // indirect
	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
	github.com/pkg/errors v0.9.1 // indirect
	golang.org/x/net v0.13.0 // indirect
	golang.org/x/oauth2 v0.8.0 // indirect
	golang.org/x/sys v0.10.0 // indirect
	golang.org/x/term v0.10.0 // indirect
	golang.org/x/text v0.11.0 // indirect
	golang.org/x/time v0.3.0 // indirect
	google.golang.org/appengine v1.6.7 // indirect
	google.golang.org/protobuf v1.31.0 // indirect
	gopkg.in/inf.v0 v0.9.1 // indirect
	gopkg.in/yaml.v2 v2.4.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
	k8s.io/kube-openapi v0.0.0-20230905202853-d090da108d2f // indirect
	k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
	sigs.k8s.io/yaml v1.3.0 // indirect
)

// k8s.io/api => k8s.io/api v0.0.0-20230915221828-1cac0b1ef7e3
replace k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230915221524-64708d3e9048

go mod tidy

mkdir -p pkg/apis/crd
cd pkg/apis/crd

# 创建register.go
package crd

const (
	GroupName = "crd.io"
	Version   = "v1"
)

mkdir v1
cd v1
# 创建doc.go
// +k8s:deepcopy-gen=package

// +groupName=crd.io
package v1

# 创建types.go
package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +genclient:nonNamespaced

type Fruit struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              StudentSpec `json:"spec"`
}

type FruitSpec struct {
	Location string `json:"location"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// FruitList is a list of Fruit resources
type FruitList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata"`

	Items []Fruit `json:"items"`
}

# 创建register.go
package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"

	"crd_controller/pkg/apis/crd"
)

var SchemeGroupVersion = schema.GroupVersion{
	Group:   crd.GroupName,
	Version: crd.Version,
}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(
		SchemeGroupVersion,
		&Fruit{},
		&FruitList{},
	)

	// register the type in the scheme
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}

# 自动生成代码
mkdir $GOPATH/src/k8s.io
cd $GOPATH/src/k8s.io
git clone https://github.com/kubernetes/code-generator
cd code-generator
touch boilerplate.go.txt
go env -w GO111MODULE=off
./generate-groups.sh all crd_controller/pkg/client crd_controller/pkg/apis crd:v1 --go-header-file=boilerplate.go.txt

go env -w GO111MODULE=on
# controller list/watch
cd $GOPATH/src/crd_controller/pkg
mkdir controller
cd controller
# 创建fruits.go
package controller

import (
	"fmt"
	"time"

	crdv1 "crd_controller/pkg/apis/crd/v1"
	clientset "crd_controller/pkg/client/clientset/versioned"
	informers "crd_controller/pkg/client/informers/externalversions/crd/v1"
	listers "crd_controller/pkg/client/listers/crd/v1"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	klog "k8s.io/klog/v2"
)

type Controller struct {
	kubeclientset  kubernetes.Interface
	fruitclientset clientset.Interface

	fruitsLister listers.FruitLister
	fruitsSynced cache.InformerSynced

	workqueue workqueue.RateLimitingInterface
}

func NewController(
	kubeclientset kubernetes.Interface,
	fruitclientset clientset.Interface,
	fruitInformer informers.FruitInformer) *Controller {
	controller := &Controller{
		kubeclientset:  kubeclientset,
		fruitclientset: fruitclientset,
		fruitsLister:   fruitInformer.Lister(),
		fruitsSynced:   fruitInformer.Informer().HasSynced,
		workqueue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Fruits"),
	}

	fruitInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueFruitForAddOrUpdate,
		UpdateFunc: func(old, new interface{}) {
			oldFruit := old.(*crdv1.Fruit)
			newFruit := new.(*crdv1.Fruit)
			if oldFruit.ResourceVersion == newFruit.ResourceVersion {
				return
			}
			controller.enqueueFruitForAddOrUpdate(new)
		},
		DeleteFunc: controller.enqueueFruitForDelete,
	})

	return controller
}

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer runtime.HandleCrash()
	defer c.workqueue.ShutDown()

	if ok := cache.WaitForCacheSync(stopCh, c.fruitsSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
	return nil
}

func (c *Controller) enqueueFruitForAddOrUpdate(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		klog.Errorf("enqueue add/update fruit error, err is %v", err)
		return
	}
	c.workqueue.AddRateLimited(key)
}

func (c *Controller) enqueueFruitForDelete(obj interface{}) {
	var key string
	var err error
	key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		klog.Errorf("enqueue delete fruit error, err is %v", err)
		return
	}
	c.workqueue.AddRateLimited(key)
}

func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}

func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()

	if shutdown {
		return false
	}

	err := func(obj interface{}) error {
		defer c.workqueue.Done(obj)
		var key string
		var ok bool

		if key, ok = obj.(string); !ok {
			c.workqueue.Forget(obj)
			klog.Errorf("key is not string")
			return nil
		}
		if err := c.syncHandler(key); err != nil {
			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
		}
		c.workqueue.Forget(obj)
		klog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		klog.Errorf("err is %v", err)
		return true
	}

	return true
}

func (c *Controller) syncHandler(key string) error {
	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return nil
	}

	fruit, err := c.fruitsLister.Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.Infof("handle delete fruit %s", name)
			return nil
		}
		return err
	}

	klog.Infof("sync handle %s/%s for update or delete", name, fruit.Spec.Location)

	return nil
}

cd $GOPATH/src/crd_controller/pkg
# 创建signal.go
package signals

import (
	"os"
	"os/signal"
	"syscall"
)

func SetupSignalHandler() (stopCh <-chan struct{}) {
	stop := make(chan struct{})
	c := make(chan os.Signal, 2)
	signal.Notify(c, []os.Signal{os.Interrupt, syscall.SIGTERM}...)
	go func() {
		<-c
		close(c)
		close(stop)
	}()

	return stop
}

mkdir $GOPATH/src/crd_controller/cmd
cd $GOPATH/src/crd_controller/cmd
# 创建main.go文件
package main

import (
	"crd_controller/pkg"
	clientset "crd_controller/pkg/client/clientset/versioned"
	informers "crd_controller/pkg/client/informers/externalversions"
	"crd_controller/pkg/controller"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	klog "k8s.io/klog/v2"
)

func main() {
	stopCh := signals.SetupSignalHandler()

	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	fruitClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building example clientset: %s", err.Error())
	}

	fruitInformerFactory := informers.NewSharedInformerFactory(fruitClient, 0)

	controller := controller.NewController(kubeClient, fruitClient,
		fruitInformerFactory.Crd().V1().Fruits())

	go fruitInformerFactory.Start(stopCh)

	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}
}

 验证

# cr-fruit.yaml
apiVersion: crd.io/v1
kind: Fruit
metadata:
  name: apple
spec:
  location: "beijing"

创建->修改->删除