A Kubernetes operator is a set of custom controllers that we deploy in the cluster implemented with CustomResourceDefinitions (CRDs). They listen for changes in the custom resources owned by them and perform specific actions like creating, modifying, and deleting Kubernetes resources.
If you want to read more about custom controllers and CustomResourceDefinition, please read this Kubernetes documentation.
Why would you make your own Kubernetes operator based system in the first place? It is because you are creating a platform. A platform is a software that we build other software on. The things we build on top of a platform matter; that is where real business value is. This means platforms should be our best tested software, and if we make Kubernetes operator based platforms, how do we test them? We will see in a bit.
You can refer to this post about Prometheus Operator to know more about use cases for Kubernetes operators.
There are three basic forms of testing when it comes to software:
Because of the challenges we saw in End to End and Unit tests, Integration tests have become more important. How we write our integration tests depends on how we built our controllers. We will use EnvTest with Ginkgo and Gomega to test our CRD based operators.
EnvTest - A Go library that helps write integration tests for your controllers by setting up and starting an instance of etcd and the Kubernetes API Server, without kubelet, controller-manager or other components.
Ginkgo is a testing framework for Go designed to help you write expressive tests. It is best paired with the Gomega matcher library. When combined, Ginkgo and Gomega provide a rich and expressive DSL (Domain-specific Language) for writing tests.
Gomega is a matcher/assertion library. It is best paired with the Ginkgo BDD test framework.
We will use a sample operator based on the Operator SDK framework and will write integration test cases for it.Testing Kubernetes controllers is a big subject, and the boilerplate testing files generated for you by Kubebuilder/Operator SDK are fairly minimal. Creating the operator from scratch is out of the scope of this post. However, you can refer to the guide to build operators using Go.
This will be the hierarchy of the files in our setup:
└── controller/
├── memcached_controller.go
├── suite_test.go
└── memcached_controller_test.go
Sample Memcached CR:
apiVersion: cache.infracloud.io/v1alpha1
kind: Memcached
metadata:
name: memcached-sample
spec:
size:2
Here is the reconciler, which we will be testing. This reconciler creates a Memcached deployment if it doesn’t exist, ensures that the deployment size is the same as specified by the Memcached Custom Resource (CR) spec. And then updates the Memcached CR status using the status writer with the name of the pods.
func (r *MemcachedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrllog.FromContext(ctx)
// Fetch the Memcached instance
memcached := &cachev1alpha1.Memcached{}
err := r.Get(ctx, req.NamespacedName, memcached)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
// Return and don't requeue
log.Info("Memcached resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
log.Error(err, "Failed to get Memcached")
return ctrl.Result{}, err
}
// Check if the deployment already exists, if not create a new one
found := &appsv1.Deployment{}
err = r.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.deploymentForMemcached(memcached)
log.Info("Creating a new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
return ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return ctrl.Result{Requeue: true}, nil
} else if err != nil {
log.Error(err, "Failed to get Deployment")
return ctrl.Result{}, err
}
// Ensure the deployment size is the same as the spec
size := memcached.Spec.Size
if *found.Spec.Replicas != size {
found.Spec.Replicas = &size
err = r.Update(ctx, found)
if err != nil {
log.Error(err, "Failed to update Deployment", "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
return ctrl.Result{}, err
}
// Ask to requeue after 1 minute in order to give enough time for the
// pods be created on the cluster side and the operand be able
// to do the next update step accurately.
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
// Update the Memcached status with the pod names
// List the pods for this memcached's deployment
podList := &corev1.PodList{}
listOpts := []client.ListOption{
client.InNamespace(memcached.Namespace),
client.MatchingLabels(labelsForMemcached(memcached.Name)),
}
if err = r.List(ctx, podList, listOpts...); err != nil {
log.Error(err, "Failed to list pods", "Memcached.Namespace", memcached.Namespace, "Memcached.Name", memcached.Name)
return ctrl.Result{}, err
}
podNames := getPodNames(podList.Items)
// Update status.Nodes if needed
if !reflect.DeepEqual(podNames, memcached.Status.Nodes) {
memcached.Status.Nodes = podNames
err := r.Status().Update(ctx, memcached)
if err != nil {
log.Error(err, "Failed to update Memcached status")
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
A few simple integration test cases could be:
Operator SDK does the boilerplate setup and teardown of testEnv for you, in the ginkgo test suite that it generates under the /controllers
directory
Ginkgo supports suite-level setup and cleanup through two specialized suite setup nodes:Â
BeforeSuite: Ginkgo will run our BeforeSuite closure at the beginning of the run phase - i.e., after the spec tree has been constructed but before any specs (test cases) have run. Following are the tasks that will be performed in the BeforeSuite. These will be executed only once.
AfterSuite: AfterSuite closure will run after all the tests to tear down the setup and stop the testEnv Kubernetes server.
var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
ctx, cancel = context.WithCancel(context.TODO())
By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
ErrorIfCRDPathMissing: true,
CRDInstallOptions: envtest.CRDInstallOptions{
MaxTime: 60 * time.Second,
},
}
cfg, err := testEnv.Start()
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())
err = cachev1alpha1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
})
Expect(err).ToNot(HaveOccurred())
err = (&MemcachedReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())
//+kubebuilder:scaffold:scheme
go func() {
defer GinkgoRecover()
err = k8sManager.Start(ctx)
Expect(err).ToNot(HaveOccurred(), "failed to run manager")
}()
})
var _ = AfterSuite(func() {
cancel()
By("tearing down the test environment")
err := testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})
This is the part of suite_test.go file.
Now let’s start writing the integration tests
Ginkgo makes it easy to write expressive specs that describe the behavior of your code in an organized manner. Ginkgo suites are hierarchical collections of specs composed of container nodes, setup nodes, and subject nodes organized into a spec tree.
var _ = Describe("MemcachedController", func() {
Context("testing memcache controller", func() {
var memcached *cachev1alpha1.Memcached
BeforeEach(func() {
memcached = &cachev1alpha1.Memcached{
ObjectMeta: metav1.ObjectMeta{
Name: "test-memcache",
Namespace: "default",
},
Spec: cachev1alpha1.MemcachedSpec{
Size: 2,
},
}
})
// Integration tests using It blocks are written here.
})
)}
When a custom resource Memcached is created in a cluster, the controller should create a corresponding deployment.
It("should create deployment", func() {
Expect(k8sClient.Create(ctx, memcached)).To(BeNil())
createdDeploy := &appsv1.Deployment{}
deployKey := types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}
Eventually(func() bool {
err := k8sClient.Get(ctx, deployKey, createdDeploy)
return err == nil
}, time.Second*10, time.Millisecond*250).Should(BeTrue())
})
nil
. Once we create a CR, the controller running should create a deployment.Get
call to the testEnv Kubernetes API Server to get the deployment in the Eventually block (it is like a retry block with a timeout) and expect it to get the deployment in that period; otherwise, it will fail.Deployment created should have same number of replicas as defined by user in custom resource spec.
It("verify replicas for deployment", func() {
createdDeploy := &appsv1.Deployment{}
deployKey := types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}
Eventually(func() bool {
err := k8sClient.Get(ctx, deployKey, createdDeploy)
return err == nil
}, time.Second*10, time.Millisecond*250).Should(BeTrue())
Expect(createdDeploy.Spec.Replicas).To(Equal(&memcached.Spec.Size))
})
Memcached.Spec.Size
using Expect block.In a similar way, using Gomega’s assertion functions test case 3 is written in which once a user updates a custom resource (here number of replicas), the corresponding deployment should be updated.
It("should update deployment, once memcached size is changed", func() {
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace},
memcached)).Should(Succeed())
// update size to 3
memcached.Spec.Size = 3
Expect(k8sClient.Update(ctx, memcached)).Should(Succeed())
Eventually(func() bool {
k8sClient.Get(ctx,
types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace},
memcached)
return memcached.Spec.Size == 3
}, time.Second*10, time.Millisecond*250).Should(BeTrue())
createdDeploy := &appsv1.Deployment{}
deployKey := types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}
Eventually(func() bool {
err := k8sClient.Get(ctx, deployKey, createdDeploy)
return err == nil
}, time.Second*10, time.Millisecond*250).Should(BeTrue())
Expect(createdDeploy.Spec.Replicas).To(Equal(&memcached.Spec.Size))
})
After writing all this code, you can run go test ./...
(or you can use ginkgo cli) in your controllers/
directory to run the test cases.
You can check out the whole code base in our GitHub repository k8s-operator-with-tests.
--until-it-fails
to identify flaky tests.That’s all for this post. This blog post is co-written by Yachika and Rahul. If you are working with Kubernetes Operators or plan to use it and need some assistance, feel free to reach out to Yachika and Rahul Sawra. We’re always excited to hear thoughts and feedback from our readers!
Looking for help with Kubernetes adoption or Day 2 operations? do check out how we’re helping startups & enterprises with our Kubernetes consulting services and capabilities.