Custom Python processors
NiFi 2.0 added support for custom processors written in Python. The Stackable images already contain the required tooling, such as a supported Python version.
General configuration
spec:
  nodes:
    configOverrides:
      nifi.properties:
        # The command used to launch Python.
        # This property must be set to enable Python-based processors.
        nifi.python.command: python3
        # The directory that NiFi should look in to find custom Python-based
        # processors.
        nifi.python.extensions.source.directory.custom: /nifi-python-extensions
        # The directory that contains the Python framework for communicating
        # between the Python and Java processes.
        nifi.python.framework.source.directory: /stackable/nifi/python/framework/
        # The working directory where NiFi should store artifacts.
        # This property defaults to ./work/python, but if you want to mount an
        # emptyDir for the working directory, another directory has to be
        # set to avoid ownership conflicts with ./work/nar.
        nifi.python.working.directory: /nifi-python-working-directory
Getting Python scripts into NiFi
NiFi should hot-reload the Python scripts. You might need to refresh your browser window to see the new processor.
1. Mount as ConfigMap
The easiest way is to define a ConfigMap and mount it as follows. This way, the Python processors are stored and versioned alongside your NifiCluster definition.
apiVersion: v1
kind: ConfigMap
metadata:
  name: nifi-python-extensions
data:
  CreateFlowFileProcessor.py: |
    from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult

    class CreateFlowFile(FlowFileSource):
        class Java:
            implements = ['org.apache.nifi.python.processor.FlowFileSource']

        class ProcessorDetails:
            version = '0.0.1-SNAPSHOT'
            description = '''A Python processor that creates FlowFiles.'''

        def __init__(self, **kwargs):
            pass

        def create(self, context):
            return FlowFileSourceResult(
                relationship = 'success',
                attributes = {'greeting': 'hello'},
                contents = 'Hello World!'
            )
The Python script is taken from the official NiFi Python Developer's Guide.
You can add multiple Python scripts to the ConfigMap.
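If you keep several processor scripts in a local folder, you can generate the ConfigMap instead of pasting each script by hand. The following Python sketch shows one way to do it, assuming the scripts are *.py files in a single directory; `load_scripts` and `build_configmap` are hypothetical helpers, not part of NiFi or the Stackable operator. Each script is byte-compiled first, so syntax errors surface locally instead of only in the NiFi logs.

```python
from pathlib import Path


def load_scripts(script_dir: str) -> dict:
    """Read every *.py file in script_dir into {file name: source}."""
    return {
        path.name: path.read_text(encoding="utf-8")
        for path in sorted(Path(script_dir).glob("*.py"))
    }


def build_configmap(scripts: dict, name: str = "nifi-python-extensions") -> dict:
    """Return a ConfigMap manifest with one data key per script."""
    for file_name, source in scripts.items():
        # compile() only parses and byte-compiles the source; the script is
        # not executed, so nifiapi does not need to be installed locally.
        # A broken script raises SyntaxError here.
        compile(source, file_name, "exec")
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": name},
        "data": dict(scripts),
    }
```

For example, `json.dumps(build_configmap(load_scripts("processors")), indent=2)` (with `processors` as a hypothetical local folder) produces a manifest that `kubectl apply -f` accepts, because kubectl reads JSON as well as YAML. Without the syntax check, `kubectl create configmap nifi-python-extensions --from-file=<dir>` gives a similar result.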
Afterwards, mount the ConfigMap into /nifi-python-extensions, together with an emptyDir volume for the Python working directory configured in nifi.python.working.directory:
spec:
  nodes:
    podOverrides:
      spec:
        containers:
          - name: nifi
            volumeMounts:
              - name: nifi-python-extensions
                mountPath: /nifi-python-extensions
              - name: nifi-python-working-directory
                mountPath: /nifi-python-working-directory
        volumes:
          - name: nifi-python-extensions
            configMap:
              name: nifi-python-extensions
          - name: nifi-python-working-directory
            emptyDir: {}
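Kubernetes rejects a Pod whose volumeMounts refer to a volume that is not defined under volumes, so a typo in one of the names above stops the NiFi Pod from being created. As a sketch, the hypothetical `undefined_mounts` helper below checks a Pod spec for such mismatches; it assumes the podOverrides `spec` has already been loaded into a Python dict (for example with a YAML parser such as PyYAML's `yaml.safe_load`, not shown).

```python
def undefined_mounts(pod_spec: dict) -> list:
    """Return "container: mount" entries whose mount name has no matching volume."""
    # Names of all volumes declared at Pod level
    defined = {volume["name"] for volume in pod_spec.get("volumes", [])}
    missing = []
    for container in pod_spec.get("containers", []):
        for mount in container.get("volumeMounts", []):
            if mount["name"] not in defined:
                missing.append(f"{container['name']}: {mount['name']}")
    return missing


# The podOverrides spec from the example above, written as a Python dict
POD_SPEC = {
    "containers": [
        {
            "name": "nifi",
            "volumeMounts": [
                {"name": "nifi-python-extensions", "mountPath": "/nifi-python-extensions"},
                {"name": "nifi-python-working-directory", "mountPath": "/nifi-python-working-directory"},
            ],
        }
    ],
    "volumes": [
        {"name": "nifi-python-extensions", "configMap": {"name": "nifi-python-extensions"}},
        {"name": "nifi-python-working-directory", "emptyDir": {}},
    ],
}

print(undefined_mounts(POD_SPEC))  # → []
```

The same check applies unchanged to the git-sync variant, where two containers mount the shared volume.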
2. Use git-sync
As an alternative, you can use git-sync to keep your Python processors up to date.
Add a git-sync sidecar container using podOverrides that syncs the repository into a volume shared between the nifi and git-sync containers.
The following snippet can serve as a starting point (the Git repository contains a processors folder with the Python scripts inside).
spec:
  nodes:
    podOverrides:
      spec:
        containers:
          - name: nifi
            volumeMounts:
              - name: nifi-python-extensions
                mountPath: /nifi-python-extensions
              - name: nifi-python-working-directory
                mountPath: /nifi-python-working-directory
          - name: git-sync
            image: registry.k8s.io/git-sync/git-sync:v4.2.3
            args:
              - --repo=https://github.com/stackabletech/nifi-talk
              - --root=/nifi-python-extensions
              - --period=10s
            volumeMounts:
              - name: nifi-python-extensions
                mountPath: /nifi-python-extensions
        volumes:
          - name: nifi-python-extensions
            emptyDir: {}
          - name: nifi-python-working-directory
            emptyDir: {}
Afterwards, update the source directory property you set previously so that it points to the folder inside the Git repository that contains your processors:
spec:
  nodes:
    configOverrides:
      nifi.properties:
        # Replace the property from the previous step
        # Format is /nifi-python-extensions/<git-repo-name>/<git-folder>/
        nifi.python.extensions.source.directory.custom: >
          /nifi-python-extensions/nifi-talk/processors/
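The value follows the format given in the comment above and can be derived from the git-sync arguments. A minimal sketch, assuming (as the format comment does) that git-sync exposes the checkout under the repository name, i.e. the last path component of --repo, and that no --link override is set; `custom_source_directory` is a hypothetical helper.

```python
import posixpath
from urllib.parse import urlparse


def custom_source_directory(root: str, repo_url: str, folder: str) -> str:
    """Build /<root>/<git-repo-name>/<git-folder>/ from the git-sync settings."""
    # Assumption: the repository name is the last path component of --repo.
    repo_name = posixpath.basename(urlparse(repo_url).path.rstrip("/"))
    # NiFi property value with a trailing slash, matching the example above
    return posixpath.join(root, repo_name, folder.strip("/")) + "/"


print(custom_source_directory(
    "/nifi-python-extensions",                      # --root
    "https://github.com/stackabletech/nifi-talk",   # --repo
    "processors",                                   # folder inside the repository
))  # → /nifi-python-extensions/nifi-talk/processors/
```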
3. Use PersistentVolume
You can also mount a PVC under /nifi-python-extensions using podOverrides and shell into the NiFi Pod to make changes.
However, mounting a ConfigMap (option 1) or using git-sync (option 2) is recommended.
Check that processors have been loaded
NiFi logs every Python processor it discovers. You can search for these log lines to check whether your processors have been loaded:
$ kubectl logs nifi-2-0-0-node-default-0 -c nifi \
| grep 'Discovered.*Python Processor'
… INFO [main] … Discovered Python Processor PythonZgrepProcessor
… INFO [main] … Discovered Python Processor TransformOpenskyStates
… INFO [main] … Discovered Python Processor UpdateAttributeFileLookup
… INFO [main] … Discovered or updated 3 Python Processors in 64 millis
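To automate this check, for example in a test pipeline, the log lines above can be parsed. The sketch below assumes the `Discovered Python Processor <name>` wording from the output above, which may differ between NiFi versions; `fetch_nifi_logs`, `discovered_processors` and `missing_processors` are hypothetical helpers, and `fetch_nifi_logs` simply runs the same `kubectl logs` command as before.

```python
import re
import subprocess

# Matches lines such as "Discovered Python Processor TransformOpenskyStates",
# but not the "Discovered or updated 3 Python Processors" summary line.
DISCOVERED = re.compile(r"Discovered Python Processor (\S+)")


def fetch_nifi_logs(pod: str) -> str:
    """Equivalent to: kubectl logs <pod> -c nifi"""
    result = subprocess.run(
        ["kubectl", "logs", pod, "-c", "nifi"],
        capture_output=True, text=True, check=True,
    )
    return result.stdout


def discovered_processors(log_text: str) -> set:
    """Return the names of all Python processors mentioned in the logs."""
    return set(DISCOVERED.findall(log_text))


def missing_processors(log_text: str, expected) -> list:
    """Return the expected processor names not found in the logs, sorted."""
    return sorted(set(expected) - discovered_processors(log_text))
```

For example, `missing_processors(fetch_nifi_logs("nifi-2-0-0-node-default-0"), ["CreateFlowFile"])` should return an empty list once the processor from the ConfigMap example has been discovered, assuming NiFi logs it under its class name.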