Extending Kubernetes — Part 1 — Custom Operator

Extending Kubernetes

  1. Kubectl plugins:- A kubectl plugin is simply an executable file whose name starts with kubectl-.
  2. API Access Extensions:- These extend the different stages of request handling in the API server. We can use them to implement custom authentication, automatic sidecar injection, etc.
  3. Custom Resources:- Kubernetes has many built-in resource types such as Pods, Services, and Deployments. We can also define our own resource types. Custom resources are commonly combined with custom controllers.
  4. Scheduler Extensions:- The Kubernetes scheduler decides which node each pod is deployed to. We can extend it by writing a custom scheduler extension that implements our own algorithm.
  5. Custom Controllers:- Custom controllers are used along with custom resources; this combination is called the Operator pattern.
  6. Network Plugins:- These are plugins that extend pod networking.
  7. Storage Plugins:- These are plugins that add new types of storage.
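
To make the first item concrete, here is a minimal sketch of a kubectl plugin written in Go. The plugin name kubectl-hello and the greeting helper are hypothetical and chosen only for illustration; the only real requirement is that the compiled executable's name starts with kubectl- and that it is on the PATH.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// greeting builds the plugin's output from the arguments that kubectl
// passes through. The function name and messages are illustrative only.
func greeting(args []string) string {
	if len(args) == 0 {
		return "Hello from a kubectl plugin"
	}
	return "Hello, " + strings.Join(args, " ")
}

func main() {
	// Everything typed after the plugin name arrives as ordinary arguments.
	fmt.Println(greeting(os.Args[1:]))
}
```

Assuming the binary is built with go build -o kubectl-hello . and copied to a directory on the PATH, it should be callable as kubectl hello, and kubectl plugin list should show it.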

Custom Resource Definition

Custom Controller

Operator Pattern

Reconciliation Loop

Writing an Operator

Pulsar Consumer Operator

kubebuilder init --domain pulsarconsumer.krvarma.com
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
	"flag"
	"os"

	"k8s.io/apimachinery/pkg/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	// +kubebuilder:scaffold:imports
)

var (
	scheme   = runtime.NewScheme()
	setupLog = ctrl.Log.WithName("setup")
)

func init() {
	_ = clientgoscheme.AddToScheme(scheme)

	// +kubebuilder:scaffold:scheme
}

func main() {
	var metricsAddr string
	var enableLeaderElection bool
	flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
	flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
		"Enable leader election for controller manager. "+
			"Enabling this will ensure there is only one active controller manager.")
	flag.Parse()

	ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:             scheme,
		MetricsBindAddress: metricsAddr,
		Port:               9443,
		LeaderElection:     enableLeaderElection,
		LeaderElectionID:   "ba84c7bc.pulsarconsumer.krvarma.com",
	})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	// +kubebuilder:scaffold:builder

	setupLog.Info("starting manager")
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}
kubebuilder create api --group pulsar --version v1 --kind PulsarConsumer
  • ServerAddress — Address of the Pulsar server
  • Topic — Name of the topic
  • Subscription — Name of the subscription
  • Replicas — Number of replicas
  1. Query the named object: The Reconcile function receives a parameter of type Request, which contains the namespaced name of the object to reconcile. We use the Get method to fetch the PulsarConsumer with that name.
  2. Retrieve the object: Once we have the PulsarConsumer with the specified name, we check whether the corresponding object already exists in the system.
  3. Create if it is not present: If the client returns a not-found error, no object with the specified name exists in the system, so we create it and update the status. If the error is anything else, we return gracefully.
  4. Check current state: If there is no error, the specified object is present in the system. We compare its current state with the desired state to see whether they are equal. This part is a little tricky. Many blogs suggest using reflect.DeepEqual, but that does not work here: the deployment controller and other Kubernetes components add default fields to the Spec object, so reflect.DeepEqual always returns false. After going through different blogs and kubebuilder issues, I found this particular issue. One of the comments suggests using equality.Semantic.DeepDerivative instead, since it compares only the fields that are set (non-zero) in the desired structure. The proposed solution worked without any issues.
  5. Update the state if necessary: If the current and desired states differ, we update the object and then update the status. If they are the same, we do nothing.
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// PulsarConsumerSpec defines the desired state of PulsarConsumer
type PulsarConsumerSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// +kubebuilder:validation:Required
	// Address of the pulsar server.
	ServerAddress string `json:"serverAddress,omitempty"`

	// +kubebuilder:validation:Required
	// Name of the topic to listen to.
	Topic string `json:"topic,omitempty"`

	// +kubebuilder:validation:Required
	// Name of the subscription.
	SubscriptionName string `json:"subscriptionName,omitempty"`

	// +kubebuilder:validation:Required
	// Number of replicas.
	Replicas *int32 `json:"replicas,omitempty"`
}

// PulsarConsumerStatus defines the observed state of PulsarConsumer
type PulsarConsumerStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// Server Address
	Server string `json:"server"`
	// Name of the pulsar topic
	Topic string `json:"topic"`
	// Name of the subscription
	Subscription string `json:"subscription"`
	// Number of replicas
	Replicas *int32 `json:"replicas"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:JSONPath=".status.server",name="Server",type="string"
// +kubebuilder:printcolumn:JSONPath=".status.topic",name="Topic",type="string"
// +kubebuilder:printcolumn:JSONPath=".status.subscription",name="Subscription",type="string"
// +kubebuilder:printcolumn:JSONPath=".status.replicas",name="Replicas",type="integer"

// PulsarConsumer is the Schema for the pulsarconsumers API
type PulsarConsumer struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   PulsarConsumerSpec   `json:"spec,omitempty"`
	Status PulsarConsumerStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// PulsarConsumerList contains a list of PulsarConsumer
type PulsarConsumerList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []PulsarConsumer `json:"items"`
}

func init() {
	SchemeBuilder.Register(&PulsarConsumer{}, &PulsarConsumerList{})
}
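
The note about json tags in the listing above can be checked with a small standalone sketch. The consumerSpec type below is a local copy of the spec fields made only for this example, so it runs without any Kubernetes dependencies; the subscription name "my-subscription" is an assumed value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// consumerSpec mirrors the fields and json tags of PulsarConsumerSpec.
// It is a local copy for illustration, not the generated API type.
type consumerSpec struct {
	ServerAddress    string `json:"serverAddress,omitempty"`
	Topic            string `json:"topic,omitempty"`
	SubscriptionName string `json:"subscriptionName,omitempty"`
	Replicas         *int32 `json:"replicas,omitempty"`
}

// encode returns the JSON form of the spec, which is the shape the
// "spec" section of a resource takes when serialized.
func encode(s consumerSpec) string {
	b, err := json.Marshal(s)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	replicas := int32(2)
	fmt.Println(encode(consumerSpec{
		ServerAddress:    "pulsar://localhost:6650",
		Topic:            "my-topic",
		SubscriptionName: "my-subscription",
		Replicas:         &replicas,
	}))
	// → {"serverAddress":"pulsar://localhost:6650","topic":"my-topic","subscriptionName":"my-subscription","replicas":2}

	// With omitempty, unset fields are dropped from the output.
	fmt.Println(encode(consumerSpec{Topic: "my-topic"}))
	// → {"topic":"my-topic"}
}
```

The tag names are what appear as keys under spec in the resource YAML, which is why a field without a json tag would not be serialized under the expected name.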

Idempotency of the Operator

Marker Comments

// +kubebuilder:validation:Required
// +kubebuilder:rbac:groups=pulsar.pulsarconsumer.krvarma.com,resources=pulsarconsumers,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=pulsar.pulsarconsumer.krvarma.com,resources=pulsarconsumers/status,verbs=get;update;patch

Building and deploying the Operator

  1. make run:- Run the Operator against the default Kubernetes cluster
  2. make install:- Install the CRD into the cluster
  3. make uninstall:- Uninstall the CRD from the cluster
  4. make deploy:- Deploy the Operator into the cluster
  5. make manifests:- Generate the YAML manifests
  6. make generate:- Generate source code
  7. make docker-build:- Build the Docker image
  8. make docker-push:- Push the Docker image to the specified registry
make install
make run
kubectl apply -f config/samples/pulsar_v1_pulsarconsumer.yaml

Deploying the Operator

make docker-build docker-push IMG=<registry>/<user>/pulsar-operator
make deploy IMG=<registry>/<user>/pulsar-operator
kubectl get pods -n pulsarconsumercrd-system
env PULSAR_SERVER=pulsar://localhost:6650 env PULSAR_MESSAGE="Sample Message" env PULSAR_TOPIC="my-topic" pulsar-producer

Code

Further reading

  1. Official Documentation
  2. Helm Operators using Operator-SDK
  3. RedHat documentation

Krishnaraj Varma

A Software Architect from Kerala, India, Open Source, Cloud Native enthusiast. Likes Golang, Rust, C/C++, Kubernetes, Kafka, etc.