$ kubectl apply -f artifacts/examples/example-foo.yaml
foo.samplecontroller.k8s.io/example-foo created
$ kubectl get po,rs,deploy,foo
NAME                               READY   STATUS    RESTARTS   AGE
pod/example-foo-5b8c9679d8-xjhdf   1/1     Running   0          67s

NAME                                           DESIRED   CURRENT   READY   AGE
replicaset.extensions/example-foo-5b8c9679d8   1         1         1       67s

NAME                                READY   UP-TO-DATE   AVAILABLE   AGE
deployment.extensions/example-foo   1/1     1            1           67s

NAME                                      AGE
foo.samplecontroller.k8s.io/example-foo   67s
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
const (
    PhasePending = "PENDING"
    PhaseRunning = "RUNNING"
    PhaseDone    = "DONE"
)

// AtSpec defines the desired state of At
type AtSpec struct {
    // Schedule is the desired time the command is supposed to be executed.
    // Note: the format used here is UTC time https://www.utctime.net
    Schedule string `json:"schedule,omitempty"`
    // Command is the desired command (executed in a Bash shell) to be
    // executed.
    Command string `json:"command,omitempty"`
}

// AtStatus defines the observed state of At
type AtStatus struct {
    // Phase represents the state of the schedule: until the command is
    // executed it is PENDING, afterwards it is DONE.
    Phase string `json:"phase,omitempty"`
}
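The +genclient and deepcopy-gen markers shown above sit on the top-level At type, which this listing omits. For orientation, a minimal sketch of that type, assuming the conventional TypeMeta/ObjectMeta layout around the AtSpec and AtStatus defined here, could look like this:

// At runs a command at a given schedule (sketch only; the generated file
// in the repository is authoritative).
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type At struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   AtSpec   `json:"spec,omitempty"`
    Status AtStatus `json:"status,omitempty"`
}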
if when, err := c.syncHandler(key); err != nil {
    c.workqueue.AddRateLimited(key)
    return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
} else if when != time.Duration(0) {
    c.workqueue.AddAfter(key, when)
} else {
    // Finally, if no error occurs we Forget this item so it does not
    // get queued again until another change happens.
    c.workqueue.Forget(obj)
}
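This requeue logic lives inside the controller's processNextWorkItem method, within a func(obj interface{}) closure; that is why Forget is called on obj while syncHandler receives the extracted key. A sketch of the surrounding method, assuming the standard sample-controller pattern, looks like this:

// processNextWorkItem pops one item off the work queue and hands its key
// to syncHandler (sketch, following the sample-controller pattern).
func (c *Controller) processNextWorkItem() bool {
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    err := func(obj interface{}) error {
        // Always call Done so the workqueue knows processing of obj finished.
        defer c.workqueue.Done(obj)
        key, ok := obj.(string)
        if !ok {
            // Invalid item: drop it instead of retrying forever.
            c.workqueue.Forget(obj)
            utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
            return nil
        }
        // The requeue logic shown above goes here: call c.syncHandler(key),
        // then AddRateLimited, AddAfter, or Forget depending on the outcome.
        ...
        return nil
    }(obj)
    if err != nil {
        utilruntime.HandleError(err)
    }
    return true
}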
// syncHandler compares the actual state with the desired state and attempts
// to converge the two. It then updates the Status block of the At resource
// with the current status of the resource. It returns how long to wait
// until the schedule is due.
func (c *Controller) syncHandler(key string) (time.Duration, error) {
...
}
...
// If no phase set, default to pending (the initial phase):
if instance.Status.Phase == "" {
    instance.Status.Phase = cnatv1alpha1.PhasePending
}

// Now let's make the main case distinction: implementing
// the state diagram PENDING -> RUNNING -> DONE
switch instance.Status.Phase {
case cnatv1alpha1.PhasePending:
    klog.Infof("instance %s: phase=PENDING", key)
    // As long as we haven't executed the command yet, we need
    // to check if it's already time to act:
    klog.Infof("instance %s: checking schedule %q", key, instance.Spec.Schedule)
    // Check if it's already time to execute the command with a
    // tolerance of 2 seconds:
    d, err := timeUntilSchedule(instance.Spec.Schedule)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("schedule parsing failed: %v", err))
        // Error reading the schedule - requeue the request:
        return time.Duration(0), err
    }
    klog.Infof("instance %s: schedule parsing done: diff=%v", key, d)
    if d > 0 {
        // Not yet time to execute the command, wait until the
        // scheduled time
        return d, nil
    }

    klog.Infof(
        "instance %s: it's time! Ready to execute: %s", key,
        instance.Spec.Command,
    )
    instance.Status.Phase = cnatv1alpha1.PhaseRunning
case cnatv1alpha1.PhaseRunning:
    klog.Infof("instance %s: Phase: RUNNING", key)
    pod := newPodForCR(instance)

    // Set At instance as the owner and controller
    owner := metav1.NewControllerRef(
        instance, cnatv1alpha1.SchemeGroupVersion.WithKind("At"),
    )
    pod.ObjectMeta.OwnerReferences = append(pod.ObjectMeta.OwnerReferences, *owner)

    // Try to see if the pod already exists and if not
    // (which we expect) then create a one-shot pod as per spec:
    found, err := c.kubeClientset.CoreV1().Pods(pod.Namespace).
        Get(pod.Name, metav1.GetOptions{})
    if err != nil && errors.IsNotFound(err) {
        found, err = c.kubeClientset.CoreV1().Pods(pod.Namespace).Create(pod)
        if err != nil {
            return time.Duration(0), err
        }
        klog.Infof("instance %s: pod launched: name=%s", key, pod.Name)
    } else if err != nil {
        // requeue with error
        return time.Duration(0), err
    } else if found.Status.Phase == corev1.PodFailed ||
        found.Status.Phase == corev1.PodSucceeded {
        klog.Infof(
            "instance %s: container terminated: reason=%q message=%q",
            key, found.Status.Reason, found.Status.Message,
        )
        instance.Status.Phase = cnatv1alpha1.PhaseDone
    } else {
        // Don't requeue because it will happen automatically
        // when the pod status changes.
        return time.Duration(0), nil
    }
case cnatv1alpha1.PhaseDone:
    klog.Infof("instance %s: phase: DONE", key)
    return time.Duration(0), nil
default:
    klog.Infof("instance %s: NOP", key)
    return time.Duration(0), nil
}

// Update the At instance, setting the status to the respective phase:
_, err = c.cnatClientset.CnatV1alpha1().Ats(instance.Namespace).
    UpdateStatus(instance)
if err != nil {
    return time.Duration(0), err
}

// Don't requeue. We will be reconciled again when either the pod or
// the CR changes.
return time.Duration(0), nil
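The sync handler delegates the schedule check to timeUntilSchedule, which the listing does not show. A minimal sketch, assuming the schedule string uses the UTC layout seen in the examples (for instance 2019-04-12T10:12:00Z), could look like this:

// timeUntilSchedule parses the schedule string and returns the remaining
// time until that point; a negative duration means the schedule is overdue.
// (Sketch; assumes the layout used in the example manifests.)
func timeUntilSchedule(schedule string) (time.Duration, error) {
    now := time.Now().UTC()
    layout := "2006-01-02T15:04:05Z"
    s, err := time.Parse(layout, schedule)
    if err != nil {
        return time.Duration(0), err
    }
    return s.Sub(now), nil
}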
In addition, to set up the informers and the controller, we implement the NewController() method:
// NewController returns a new cnat controller
func NewController(
    kubeClientset kubernetes.Interface,
    cnatClientset clientset.Interface,
    atInformer informers.AtInformer,
    podInformer corev1informer.PodInformer) *Controller {

    // Create event broadcaster
    // Add cnat-controller types to the default Kubernetes Scheme so Events
    // can be logged for cnat-controller types.
    utilruntime.Must(cnatscheme.AddToScheme(scheme.Scheme))
    klog.V(4).Info("Creating event broadcaster")
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
        Interface: kubeClientset.CoreV1().Events(""),
    })
    source := corev1.EventSource{Component: controllerAgentName}
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, source)

    rateLimiter := workqueue.DefaultControllerRateLimiter()
    controller := &Controller{
        kubeClientset: kubeClientset,
        cnatClientset: cnatClientset,
        atLister:      atInformer.Lister(),
        atsSynced:     atInformer.Informer().HasSynced,
        podLister:     podInformer.Lister(),
        podsSynced:    podInformer.Informer().HasSynced,
        workqueue:     workqueue.NewNamedRateLimitingQueue(rateLimiter, "Ats"),
        recorder:      recorder,
    }

    klog.Info("Setting up event handlers")

    // Set up an event handler for when At resources change
    atInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueAt,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueAt(new)
        },
    })

    // Set up an event handler for when Pod resources change
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueuePod,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueuePod(new)
        },
    })

    return controller
}
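The event handlers registered here call enqueueAt and enqueuePod, which are not part of the listing. The At variant is the usual key-based enqueue; a sketch, assuming the standard MetaNamespaceKeyFunc approach, is shown below. enqueuePod would additionally resolve the Pod's owning At object before enqueueing that key.

// enqueueAt takes an At resource and converts it into a namespace/name key,
// which it then puts onto the work queue (sketch).
func (c *Controller) enqueueAt(obj interface{}) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}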
$ dep version
dep:
version : v0.5.1
build date : 2019-03-11
git hash : faa6189
go version : go1.12
go compiler : gc
platform : darwin/amd64
features : ImportDuringSolve=false
$ kustomize version
Version: {KustomizeVersion:v2.0.3 GitCommit:a6f65144121d1955266b0cd836ce954c04122dc8
BuildDate:2019-03-18T22:15:21+00:00 GoOs:darwin GoArch:amd64}
$ kubebuilder version
Version: version.Version{
KubeBuilderVersion:"1.0.8",
KubernetesVendor:"1.13.1",
GitCommit:"1adf50ed107f5042d7472ba5ab50d5e1d357169d",
BuildDate:"2019-01-25T23:14:29Z", GoOs:"unknown", GoArch:"unknown"
}
$ kubebuilder init \
--domain programming-kubernetes.info \
--license apache2 \
--owner "Programming Kubernetes authors"
Run `dep ensure` to fetch dependencies (Recommended) [y/n]?
y
dep ensure
Running make...
make
go generate ./pkg/... ./cmd/...
go fmt ./pkg/... ./cmd/...
go vet ./pkg/... ./cmd/...
go run vendor/sigs.k8s.io/controller-tools/cmd/controller-gen/main.go all
CRD manifests generated under 'config/crds'
RBAC manifests generated under 'config/rbac'
go test ./pkg/... ./cmd/... -coverprofile cover.out
? github.com/mhausenblas/cnat-kubebuilder/pkg/apis [no test files]
? github.com/mhausenblas/cnat-kubebuilder/pkg/controller [no test files]
? github.com/mhausenblas/cnat-kubebuilder/pkg/webhook [no test files]
? github.com/mhausenblas/cnat-kubebuilder/cmd/manager [no test files]
go build -o bin/manager github.com/mhausenblas/cnat-kubebuilder/cmd/manager
$ kubebuilder create api \
--group cnat \
--version v1alpha1 \
--kind At
Create Resource under pkg/apis [y/n]?
y
Create Controller under pkg/controller [y/n]?
y
Writing scaffold for you to edit...
pkg/apis/cnat/v1alpha1/at_types.go
pkg/apis/cnat/v1alpha1/at_types_test.go
pkg/controller/at/at_controller.go
pkg/controller/at/at_controller_test.go
Running make...
go generate ./pkg/... ./cmd/...
go fmt ./pkg/... ./cmd/...
go vet ./pkg/... ./cmd/...
go run vendor/sigs.k8s.io/controller-tools/cmd/controller-gen/main.go all
CRD manifests generated under 'config/crds'
RBAC manifests generated under 'config/rbac'
go test ./pkg/... ./cmd/... -coverprofile cover.out
? github.com/mhausenblas/cnat-kubebuilder/pkg/apis [no test files]
? github.com/mhausenblas/cnat-kubebuilder/pkg/apis/cnat [no test files]
ok github.com/mhausenblas/cnat-kubebuilder/pkg/apis/cnat/v1alpha1 9.011s
? github.com/mhausenblas/cnat-kubebuilder/pkg/controller [no test files]
ok github.com/mhausenblas/cnat-kubebuilder/pkg/controller/at 8.740s
? github.com/mhausenblas/cnat-kubebuilder/pkg/webhook [no test files]
? github.com/mhausenblas/cnat-kubebuilder/cmd/manager [no test files]
go build -o bin/manager github.com/mhausenblas/cnat-kubebuilder/cmd/manager
$ make install
go run vendor/sigs.k8s.io/controller-tools/cmd/controller-gen/main.go all
CRD manifests generated under 'config/crds'
RBAC manifests generated under 'config/rbac'
kubectl apply -f config/crds
customresourcedefinition.apiextensions.k8s.io/ats.cnat.programming-kubernetes.info created
Launch the operator locally:
$ make run
go generate ./pkg/... ./cmd/...
go fmt ./pkg/... ./cmd/...
go vet ./pkg/... ./cmd/...
go run ./cmd/manager/main.go
{"level":"info","ts":1559152740.0550249,"logger":"entrypoint",
"msg":"setting up client for manager"}
{"level":"info","ts":1559152740.057556,"logger":"entrypoint",
"msg":"setting up manager"}
{"level":"info","ts":1559152740.1396701,"logger":"entrypoint",
"msg":"Registering Components."}
{"level":"info","ts":1559152740.1397,"logger":"entrypoint",
"msg":"setting up scheme"}
{"level":"info","ts":1559152740.139773,"logger":"entrypoint",
"msg":"Setting up controller"}
{"level":"info","ts":1559152740.139831,"logger":"kubebuilder.controller",
"msg":"Starting EventSource","controller":"at-controller",
"source":"kind source: /, Kind="}
{"level":"info","ts":1559152740.139929,"logger":"kubebuilder.controller",
"msg":"Starting EventSource","controller":"at-controller",
"source":"kind source: /, Kind="}
{"level":"info","ts":1559152740.139971,"logger":"entrypoint",
"msg":"setting up webhooks"}
{"level":"info","ts":1559152740.13998,"logger":"entrypoint",
"msg":"Starting the Cmd."}
{"level":"info","ts":1559152740.244628,"logger":"kubebuilder.controller",
"msg":"Starting Controller","controller":"at-controller"}
{"level":"info","ts":1559152740.344791,"logger":"kubebuilder.controller",
"msg":"Starting workers","controller":"at-controller","worker count":1}
Leaving that terminal session running, open a new one, then install the CRD and create the sample custom resource, as follows:
$ kubectl apply -f config/crds/cnat_v1alpha1_at.yaml
customresourcedefinition.apiextensions.k8s.io/ats.cnat.programming-kubernetes.info
configured
$ kubectl get crds
NAME CREATED AT
ats.cnat.programming-kubernetes.info 2019-05-29T17:54:51Z
$ kubectl apply -f config/samples/cnat_v1alpha1_at.yaml
at.cnat.programming-kubernetes.info/at-sample created
const (
    PhasePending = "PENDING"
    PhaseRunning = "RUNNING"
    PhaseDone    = "DONE"
)

// AtSpec defines the desired state of At
type AtSpec struct {
    // Schedule is the desired time the command is supposed to be executed.
    // Note: the format used here is UTC time https://www.utctime.net
    Schedule string `json:"schedule,omitempty"`
    // Command is the desired command (executed in a Bash shell) to be executed.
    Command string `json:"command,omitempty"`
}

// AtStatus defines the observed state of At
type AtStatus struct {
    // Phase represents the state of the schedule: until the command is executed
    // it is PENDING, afterwards it is DONE.
    Phase string `json:"phase,omitempty"`
}
func (r *ReconcileAt) Reconcile(req reconcile.Request) (reconcile.Result, error) {
    reqLogger := log.WithValues("namespace", req.Namespace, "at", req.Name)
    reqLogger.Info("=== Reconciling At")

    // Fetch the At instance
    instance := &cnatv1alpha1.At{}
    err := r.Get(context.TODO(), req.NamespacedName, instance)
    if err != nil {
        if errors.IsNotFound(err) {
            // Request object not found, could have been deleted after
            // the reconcile request; return and don't requeue:
            return reconcile.Result{}, nil
        }
        // Error reading the object; requeue the request:
        return reconcile.Result{}, err
    }

    // If no phase set, default to pending (the initial phase):
    if instance.Status.Phase == "" {
        instance.Status.Phase = cnatv1alpha1.PhasePending
    }

    // Now let's make the main case distinction: implementing
    // the state diagram PENDING -> RUNNING -> DONE
    switch instance.Status.Phase {
    case cnatv1alpha1.PhasePending:
        reqLogger.Info("Phase: PENDING")
        // As long as we haven't executed the command yet, we need to check if
        // it's already time to act:
        reqLogger.Info("Checking schedule", "Target", instance.Spec.Schedule)
        // Check if it's already time to execute the command with a tolerance
        // of 2 seconds:
        d, err := timeUntilSchedule(instance.Spec.Schedule)
        if err != nil {
            reqLogger.Error(err, "Schedule parsing failure")
            // Error reading the schedule. Wait until it is fixed.
            return reconcile.Result{}, err
        }
        reqLogger.Info("Schedule parsing done", "Result", fmt.Sprintf("%v", d))
        if d > 0 {
            // Not yet time to execute the command, wait until the scheduled time
            return reconcile.Result{RequeueAfter: d}, nil
        }
        reqLogger.Info("It's time!", "Ready to execute", instance.Spec.Command)
        instance.Status.Phase = cnatv1alpha1.PhaseRunning
    case cnatv1alpha1.PhaseRunning:
        reqLogger.Info("Phase: RUNNING")
        pod := newPodForCR(instance)

        // Set At instance as the owner and controller
        err := controllerutil.SetControllerReference(instance, pod, r.scheme)
        if err != nil {
            // requeue with error
            return reconcile.Result{}, err
        }
        found := &corev1.Pod{}
        nsName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
        err = r.Get(context.TODO(), nsName, found)
        // Try to see if the pod already exists and if not
        // (which we expect) then create a one-shot pod as per spec:
        if err != nil && errors.IsNotFound(err) {
            err = r.Create(context.TODO(), pod)
            if err != nil {
                // requeue with error
                return reconcile.Result{}, err
            }
            reqLogger.Info("Pod launched", "name", pod.Name)
        } else if err != nil {
            // requeue with error
            return reconcile.Result{}, err
        } else if found.Status.Phase == corev1.PodFailed ||
            found.Status.Phase == corev1.PodSucceeded {
            reqLogger.Info("Container terminated", "reason",
                found.Status.Reason, "message", found.Status.Message)
            instance.Status.Phase = cnatv1alpha1.PhaseDone
        } else {
            // Don't requeue because it will happen automatically when the
            // pod status changes.
            return reconcile.Result{}, nil
        }
    case cnatv1alpha1.PhaseDone:
        reqLogger.Info("Phase: DONE")
        return reconcile.Result{}, nil
    default:
        reqLogger.Info("NOP")
        return reconcile.Result{}, nil
    }

    // Update the At instance, setting the status to the respective phase:
    err = r.Status().Update(context.TODO(), instance)
    if err != nil {
        return reconcile.Result{}, err
    }

    // Don't requeue. We will be reconciled again when either the pod
    // or the CR changes.
    return reconcile.Result{}, nil
}
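Both the client-go and the Kubebuilder reconcilers build the one-shot pod via newPodForCR, which the listings omit. A possible implementation, assuming a busybox image and a command that can simply be split on spaces, is sketched here:

// newPodForCR returns a one-shot pod in the CR's namespace that runs the
// command from the At spec and is not restarted after it completes.
// (Sketch; assumes the corev1, metav1, strings, and cnatv1alpha1 imports
// used elsewhere in the controller.)
func newPodForCR(cr *cnatv1alpha1.At) *corev1.Pod {
    labels := map[string]string{
        "app": cr.Name,
    }
    return &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      cr.Name + "-pod",
            Namespace: cr.Namespace,
            Labels:    labels,
        },
        Spec: corev1.PodSpec{
            Containers: []corev1.Container{
                {
                    Name:    "busybox",
                    Image:   "busybox",
                    Command: strings.Split(cr.Spec.Command, " "),
                },
            },
            RestartPolicy: corev1.RestartPolicyOnFailure,
        },
    }
}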
$ make run
...
{"level":"info","ts":1555063897.488535,"logger":"controller",
"msg":"=== Reconciling At","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063897.488621,"logger":"controller",
"msg":"Phase: PENDING","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063897.4886441,"logger":"controller",
"msg":"Checking schedule","namespace":"cnat","at":"example-at",
"Target":"2019-04-12T10:12:00Z"}
{"level":"info","ts":1555063897.488703,"logger":"controller",
"msg":"Schedule parsing done","namespace":"cnat","at":"example-at",
"Result":"2019-04-12 10:12:00 +0000 UTC with a diff of 22.511336s"}
{"level":"info","ts":1555063907.489264,"logger":"controller",
"msg":"=== Reconciling At","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063907.489402,"logger":"controller",
"msg":"Phase: PENDING","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063907.489428,"logger":"controller",
"msg":"Checking schedule","namespace":"cnat","at":"example-at",
"Target":"2019-04-12T10:12:00Z"}
{"level":"info","ts":1555063907.489486,"logger":"controller",
"msg":"Schedule parsing done","namespace":"cnat","at":"example-at",
"Result":"2019-04-12 10:12:00 +0000 UTC with a diff of 12.510551s"}
{"level":"info","ts":1555063917.490178,"logger":"controller",
"msg":"=== Reconciling At","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063917.4902349,"logger":"controller",
"msg":"Phase: PENDING","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063917.490247,"logger":"controller",
"msg":"Checking schedule","namespace":"cnat","at":"example-at",
"Target":"2019-04-12T10:12:00Z"}
{"level":"info","ts":1555063917.490278,"logger":"controller",
"msg":"Schedule parsing done","namespace":"cnat","at":"example-at",
"Result":"2019-04-12 10:12:00 +0000 UTC with a diff of 2.509743s"}
{"level":"info","ts":1555063927.492718,"logger":"controller",
"msg":"=== Reconciling At","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063927.49283,"logger":"controller",
"msg":"Phase: PENDING","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063927.492857,"logger":"controller",
"msg":"Checking schedule","namespace":"cnat","at":"example-at",
"Target":"2019-04-12T10:12:00Z"}
{"level":"info","ts":1555063927.492915,"logger":"controller",
"msg":"Schedule parsing done","namespace":"cnat","at":"example-at",
"Result":"2019-04-12 10:12:00 +0000 UTC with a diff of -7.492877s"}
{"level":"info","ts":1555063927.4929411,"logger":"controller",
"msg":"It's time!","namespace":"cnat","at":
"example-at","Ready to execute":"echo YAY"}
{"level":"info","ts":1555063927.626236,"logger":"controller",
"msg":"=== Reconciling At","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063927.626303,"logger":"controller",
"msg":"Phase: RUNNING","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063928.07445,"logger":"controller",
"msg":"Pod launched","namespace":"cnat","at":"example-at",
"name":"example-at-pod"}
{"level":"info","ts":1555063928.199562,"logger":"controller",
"msg":"=== Reconciling At","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063928.199645,"logger":"controller",
"msg":"Phase: DONE","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063937.631733,"logger":"controller",
"msg":"=== Reconciling At","namespace":"cnat","at":"example-at"}
{"level":"info","ts":1555063937.631783,"logger":"controller",
"msg":"Phase: DONE","namespace":"cnat","at":"example-at"}
...
To verify that our custom controller has done its job, run:
$ kubectl get at,pods
NAME                                             AGE
at.cnat.programming-kubernetes.info/example-at   11m

NAME                 READY   STATUS      RESTARTS   AGE
pod/example-at-pod   0/1     Completed   0          38s
$ dep version
dep:
version : v0.5.1
build date : 2019-03-11
git hash : faa6189
go version : go1.12
go compiler : gc
platform : darwin/amd64
features : ImportDuringSolve=false
$ operator-sdk --version
operator-sdk version v0.6.0
Bootstrapping
Now create the cnat operator as follows:
$ operator-sdk new cnat-operator && cd cnat-operator
// AtSpec defines the desired state of At
// +k8s:openapi-gen=true
type AtSpec struct {
    // Schedule is the desired time the command is supposed to be executed.
    // Note: the format used here is UTC time https://www.utctime.net
    Schedule string `json:"schedule,omitempty"`
    // Command is the desired command (executed in a Bash shell) to be executed.
    Command string `json:"command,omitempty"`
}

// AtStatus defines the observed state of At
// +k8s:openapi-gen=true
type AtStatus struct {
    // Phase represents the state of the schedule: until the command is executed
    // it is PENDING, afterwards it is DONE.
    Phase string `json:"phase,omitempty"`
}
$ kubectl get at,pods
NAME                                             AGE
at.cnat.programming-kubernetes.info/example-at   23m

NAME                 READY   STATUS      RESTARTS   AGE
pod/example-at-pod   0/1     Completed   0          46s
$ kubectl logs example-at-pod
YAY
At the time of writing, operator development is still expected to remain the most popular and widely used approach. Within the Kubernetes ecosystem, several special interest groups (SIGs) deal with CRs and controllers. The most important is SIG API Machinery, which owns CRs and controllers and is responsible for the Kubebuilder project. The Operator SDK has been working to align with the Kubebuilder API, so you will see a lot of overlap between the two.