Custom Python processors

In NiFi 2.0, support for custom processors written in Python was added. The Stackable images already contain the required tooling, such as - obviously - a supported Python version.

General configuration

spec:
  nodes:
    configOverrides:
      nifi.properties:
        # The command used to launch Python.
        # This property must be set to enable Python-based processors.
        nifi.python.command: python3
        # The directory that NiFi should look in to find custom Python-based
        # Processors.
        nifi.python.extensions.source.directory.custom: /nifi-python-extensions
        # The directory that contains the Python framework for communicating
        # between the Python and Java processes.
        nifi.python.framework.source.directory: /stackable/nifi/python/framework/
        # The working directory where NiFi should store artifacts
        # This property defaults to ./work/python but if you want to mount an
        # emptyDir for the working directory then another directory has to be
        # set to avoid ownership conflicts with ./work/nar.
        nifi.python.working.directory: /nifi-python-working-directory

Getting Python scripts into NiFi

NiFi should hot-reload the Python scripts. You might need to refresh your browser window to see the new processor.

1. Mount as ConfigMap

The easiest way is defining a ConfigMap and mounting it as follows. This way, the Python processors are stored and versioned alongside your NifiCluster itself.

apiVersion: v1
kind: ConfigMap
metadata:
  name: nifi-python-extensions
data:
  CreateFlowFileProcessor.py: |
    from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult

    class CreateFlowFile(FlowFileSource):
        class Java:
            implements = ['org.apache.nifi.python.processor.FlowFileSource']

        class ProcessorDetails:
            version = '0.0.1-SNAPSHOT'
            description = '''A Python processor that creates FlowFiles.'''

        def __init__(self, **kwargs):
            pass

        def create(self, context):
            return FlowFileSourceResult(
                relationship = 'success',
                attributes = {'greeting': 'hello'},
                contents = 'Hello World!'
            )

The Python script is taken from the offical NiFi Python developer guide.

You can add multiple Python scripts in the ConfigMap. Afterwards we need to mount the Python scripts into /nifi-python-extensions:

spec:
  nodes:
    podOverrides:
      spec:
        containers:
          - name: nifi
            volumeMounts:
              - name: nifi-python-extensions
                mountPath: /nifi-python-extensions
              - name: nifi-python-working-directory
                mountPath: /nifi-python-working-directory
        volumes:
          - name: nifi-python-extensions
            configMap:
              name: nifi-python-extensions
          - name: nifi-python-working-directory
            emptyDir: {}

2. Use git-sync

As an alternative you can use git-sync to keep your Python processors up to date. You need to add a sidecar using podOverrides that syncs into a shared volume between the nifi and git-sync container.

The following snippet can serve as a starting point (the Git repo has the folder processors with the Python scripts inside).

spec:
  nodes:
    podOverrides:
      spec:
        containers:
          - name: nifi
            volumeMounts:
              - name: nifi-python-extensions
                mountPath: /nifi-python-extensions
              - name: nifi-python-working-directory
                mountPath: /nifi-python-working-directory
          - name: git-sync
            image: registry.k8s.io/git-sync/git-sync:v4.2.3
            args:
              - --repo=https://github.com/stackabletech/nifi-talk
              - --root=/nifi-python-extensions
              - --period=10s
            volumeMounts:
              - name: nifi-python-extensions
                mountPath: /nifi-python-extensions
        volumes:
          - name: nifi-python-extensions
            emptyDir: {}
          - name: nifi-python-working-directory
            emptyDir: {}

Afterwards you need to update your source directory (the one you added previously) accordingly, to point into the Git subfolder you have.

spec:
  nodes:
    configOverrides:
      nifi.properties:
        # Replace the property from the previous step
        # Format is /nifi-python-extensions/<git-repo-name>/<git-folder>/
        nifi.python.extensions.source.directory.custom: >
          /nifi-python-extensions/nifi-talk/processors/

3. Use PersistentVolume

You can also mount a PVC below /nifi-python-extensions using podOverrides and shell into the NiFi Pod to make changes. However, the 1. Mount as ConfigMap or 2. Use git-sync approach is recommended.

Check processors have been loaded

NiFi logs every Python processor it found. You can use that to check if the processors have been loaded.

$ kubectl logs nifi-2-0-0-node-default-0 -c nifi \
      | grep 'Discovered.*Python Processor'
… INFO [main] …  Discovered Python Processor PythonZgrepProcessor
… INFO [main] …  Discovered Python Processor TransformOpenskyStates
… INFO [main] …  Discovered Python Processor UpdateAttributeFileLookup
… INFO [main] …  Discovered or updated 3 Python Processors in 64 millis