Usage

If you are not installing the operator using Helm, then the CRD for this operator must be created manually after installation:

kubectl apply -f /etc/stackable/kafka-operator/crd/kafkacluster.crd.yaml
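
You can verify that the CRD has been created; its name follows the standard <plural>.<group> convention:

kubectl get crd kafkaclusters.kafka.stackable.tech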

To create an Apache Kafka cluster named simple-kafka, assuming that you already have a ZooKeeper cluster named simple-zk, first create a ZookeeperZnode for Kafka to use and then the KafkaCluster itself:

---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
  name: simple-kafka-znode
spec:
  clusterRef:
    name: simple-zk
    namespace: default
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    roleGroups:
      default:
        replicas: 1
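
Once applied, the operator spins up the broker Pods. You can watch them come up; the label value below assumes the recommended app.kubernetes.io labels that Stackable operators set on their Pods:

kubectl get pods -l app.kubernetes.io/instance=simple-kafka --watch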

If you wish to integrate with Open Policy Agent and already have an OPA cluster, you can add an opa field pointing to the OPA cluster discovery ConfigMap and the required package. The package is optional and defaults to the metadata.name field (here, simple-kafka):

---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    authorization:
      opa:
        configMapName: simple-opa
        package: kafka
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    roleGroups:
      default:
        replicas: 1
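
Before applying this, you can confirm that the referenced OPA discovery ConfigMap exists; the name simple-opa matches the example above:

kubectl get configmap simple-opa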

You can tune the OPA authorizer cache by overriding its properties:

---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    authorization:
      opa:
        configMapName: simple-opa
        package: kafka
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    configOverrides:
      server.properties:
        opa.authorizer.cache.initial.capacity: "100"
        opa.authorizer.cache.maximum.size: "100"
        opa.authorizer.cache.expire.after.seconds: "10"
    roleGroups:
      default:
        replicas: 1
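
To check that the overrides were rendered into the broker configuration, you can inspect server.properties inside a broker Pod. This is a sketch: the Pod name follows the <cluster>-<role>-<rolegroup> convention, while the container name and config path are assumptions that may differ in your setup:

kubectl exec simple-kafka-broker-default-0 -c kafka -- grep "opa.authorizer" /stackable/config/server.properties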

A full list of these settings and their respective defaults can be found in the OPA authorizer documentation.

Monitoring

The managed Kafka instances are automatically configured to export Prometheus metrics. See Monitoring for more details.
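
For a quick look at the exported metrics you can port-forward to a broker Service and query the metrics endpoint. This is a sketch: the Service name follows the <cluster>-<role>-<rolegroup> convention, and the port name metrics and local port 9606 are assumptions, so check your Service if they differ:

# forward the named metrics port of the broker Service to localhost
kubectl port-forward svc/simple-kafka-broker-default 9606:metrics
# in a second shell, fetch the Prometheus metrics
curl http://localhost:9606/metrics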

Provide log4j.properties

By default, the log4j.properties shipped with the Kafka package is used. However, you can provide your own log4j.properties via the custom resource:

---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
    log4j: |-
      log4j.rootLogger=INFO, stdout, kafkaAppender

      log4j.appender.stdout=org.apache.log4j.ConsoleAppender
      log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
      log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

      log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
      log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
      log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
      log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
      log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
  brokers:
    roleGroups:
      default:
        replicas: 3
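
The output of the stdout appender can then be inspected directly via kubectl. The Pod name follows the <cluster>-<role>-<rolegroup> convention; the container name kafka is an assumption:

kubectl logs simple-kafka-broker-default-0 -c kafka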

Encryption

The internal and client communication can be encrypted using TLS. This requires the Secret Operator to be present in order to provide certificates. The certificates used can be changed via a top-level config:

---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
    tls:
      serverSecretClass: tls (1)
      internalSecretClass: kafka-internal-tls (2)
  brokers:
    roleGroups:
      default:
        replicas: 3
1 The spec.clusterConfig.tls.serverSecretClass refers to the client-to-server encryption. Defaults to the tls SecretClass. Can be deactivated by setting serverSecretClass to null.
2 The spec.clusterConfig.tls.internalSecretClass refers to the broker-to-broker internal encryption. If not set explicitly, it defaults to tls. May be disabled by setting internalSecretClass to null.
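
For example, to deactivate client-to-server encryption as described in (1), set serverSecretClass to null; a minimal sketch:

---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
    tls:
      serverSecretClass: null
  brokers:
    roleGroups:
      default:
        replicas: 3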

The tls SecretClass is deployed together with the Secret Operator and looks like this:

---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
  name: tls
spec:
  backend:
    autoTls:
      ca:
        secret:
          name: secret-provisioner-tls-ca
          namespace: default
        autoGenerate: true

You can create your own SecretClasses and reference them in spec.clusterConfig.tls.serverSecretClass or spec.clusterConfig.tls.internalSecretClass to use different certificates.

Authentication

The internal broker-to-broker communication is authenticated via TLS. In order to enforce TLS authentication for client-to-server communication as well, you can set a reference to an AuthenticationClass, a custom resource provided by the Commons Operator:

---
apiVersion: authentication.stackable.tech/v1alpha1
kind: AuthenticationClass
metadata:
  name: kafka-client-tls (2)
spec:
  provider:
    tls:
      clientCertSecretClass: kafka-client-auth-secret (3)
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
  name: kafka-client-auth-secret (4)
spec:
  backend:
    autoTls:
      ca:
        secret:
          name: secret-provisioner-tls-kafka-client-ca
          namespace: default
        autoGenerate: true
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    authentication:
      - authenticationClass: kafka-client-tls (1)
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    roleGroups:
      default:
        replicas: 3
1 The clusterConfig.authentication.authenticationClass can be set to use TLS for authentication. This is optional.
2 The referenced AuthenticationClass, which in turn references a SecretClass that provides the certificates.
3 The reference to a SecretClass.
4 The SecretClass that is referenced by the AuthenticationClass in order to provide certificates.
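
Clients then have to present a certificate issued by the same SecretClass in order to connect. A minimal sketch of how a client Pod could obtain such a certificate via a Secret Operator ephemeral volume; the image and mount path are placeholders, while the annotation and storage class follow the Secret Operator conventions:

---
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
spec:
  containers:
    - name: client
      image: docker.stackable.tech/stackable/kafka:3.3.1-stackable0.3.0 # placeholder client image
      command: ["sleep", "infinity"]
      volumeMounts:
        - name: tls # certificate and key are mounted here
          mountPath: /stackable/tls
  volumes:
    - name: tls
      ephemeral:
        volumeClaimTemplate:
          metadata:
            annotations:
              secrets.stackable.tech/class: kafka-client-auth-secret # the SecretClass from above
          spec:
            storageClassName: secrets.stackable.tech
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: "1"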

Configuration & Environment Overrides

The cluster definition also supports overriding configuration properties and environment variables, either per role or per role group, where the more specific override (role group) has precedence over the less specific one (role).

Overriding certain properties which are set by the operator (such as the ports) can interfere with the operator and lead to problems.

Configuration Properties

For a role or role group, at the same level as config, you can specify configOverrides for server.properties. For example, if you want to set auto.create.topics.enable to disable automatic topic creation, it can be configured in the KafkaCluster resource like so:

brokers:
  roleGroups:
    default:
      configOverrides:
        server.properties:
          auto.create.topics.enable: "false"
      replicas: 1

Just as for the config, it is possible to specify this at role level as well:

brokers:
  configOverrides:
    server.properties:
      auto.create.topics.enable: "false"
  roleGroups:
    default:
      replicas: 1
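
If the same property is set on both levels, the role group value takes precedence; in the following sketch the default group ends up with automatic topic creation enabled despite the role-level override:

brokers:
  configOverrides:
    server.properties:
      auto.create.topics.enable: "false"
  roleGroups:
    default:
      configOverrides:
        server.properties:
          auto.create.topics.enable: "true"
      replicas: 1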

All override property values must be strings.

For a full list of configuration options we refer to the Apache Kafka Configuration Reference.

Environment Variables

In a similar fashion, environment variables can be (over)written. For example per role group:

brokers:
  roleGroups:
    default:
      envOverrides:
        MY_ENV_VAR: "MY_VALUE"
      replicas: 1

or per role:

brokers:
  envOverrides:
    MY_ENV_VAR: "MY_VALUE"
  roleGroups:
    default:
      replicas: 1

Storage for data volumes

You can mount volumes where data is stored by specifying PersistentVolumeClaims for each individual role group:

brokers:
  roleGroups:
    default:
      config:
        resources:
          storage:
            logDirs:
              capacity: 2Gi

In the above example, all Kafka brokers in the default group will store data (the directory set by the log.dirs property) on a 2Gi volume.

By default, if nothing is configured in the custom resource for a certain role group, each Pod will have a 1Gi local volume mounted for the data location.
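
The operator creates one PersistentVolumeClaim per broker Pod. You can list them via the recommended app.kubernetes.io labels that Stackable operators set:

kubectl get pvc -l app.kubernetes.io/instance=simple-kafka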

Resource Requests

Stackable operators handle resource requests in a slightly different manner than Kubernetes. Resource requests are defined on role or role group level. See Roles and role groups for details on these concepts. On a role level this means that e.g. all workers will use the same resource requests and limits. This can be further specified on role group level (which takes precedence over the role level) to apply different resources.

This is an example of how to specify CPU and memory resources using the Stackable custom resources:

---
apiVersion: example.stackable.tech/v1alpha1
kind: ExampleCluster
metadata:
  name: example
spec:
  workers: # role-level
    config:
      resources:
        cpu:
          min: 300m
          max: 600m
        memory:
          limit: 3Gi
    roleGroups: # role-group-level
      resources-from-role: # role-group 1
        replicas: 1
      resources-from-role-group: # role-group 2
        replicas: 1
        config:
          resources:
            cpu:
              min: 400m
              max: 800m
            memory:
              limit: 4Gi

In this case, the role group resources-from-role will inherit the resources specified on the role level, resulting in a maximum of 3Gi memory and 600m CPU resources.

The role group resources-from-role-group has a maximum of 4Gi memory and 800m CPU resources (overriding the role-level CPU resources).

For Java products, the actual heap memory used is lower than the specified memory limit, because other processes in the container require memory to run as well. Currently, 80% of the specified memory limit is passed to the JVM; a 4Gi limit, for example, yields roughly 3.2Gi of heap.

For memory, only a limit can be specified, which is set as both memory request and limit on the container. This guarantees the container the full amount of memory during Kubernetes scheduling.

If no resource requests are configured explicitly, the Kafka operator uses the following defaults:

brokers:
  roleGroups:
    default:
      config:
        resources:
          memory:
            limit: '2Gi'
          cpu:
            min: '500m'
            max: '4'
          storage:
            logDirs:
              capacity: 1Gi