Exposing HTTP processors
You can use the ListenHTTP
processor to start a HTTP webserver and listen for incoming connections.
In this case POST
requests are being send to NiFi, which acts as a webhook to receive data here.
You should also be able to serve GET
requests by using the HandleHttpRequest
processor, however this is currently not demonstrated in this guide.
1. Create ListenHTTP
processor
Let’s start by creating a ListenHTTP
processor:
-
Set
Base Path
to an empty string -
Set
Listening Port
to8042
. -
Set
Record Reader
to anJsonTreeReader
andRecord Writer
to anJsonRecordSetWriter
instance. This guide assumes that JSON documents are being posted to the listener. For other document formats, change the record reader and writer accordingly.
You should end up with something similar to
2. Expose ListenHTTP
processor
Afterwards you need to expose the processor to the outside world.
For that to work, first create a Service object as follows.
In this guide, the NifiCluster is called simple-nifi
. The name of the Nifi cluster must match the value of the app.kubernetes.io/instance
as shown below.
apiVersion: v1
kind: Service
metadata:
name: simple-nifi-listen-http # Update according to NifiCluster name
spec:
type: ClusterIP
selector:
app.kubernetes.io/component: node
app.kubernetes.io/instance: simple-nifi # Update according to NifiCluster name
app.kubernetes.io/name: nifi
ports:
- name: http
port: 8042
protocol: TCP
targetPort: 8042
In case you don’t have an ingress controller, you can set the Service type to LoadBalancer
or NodePort
instead and should be ready to go.
If you are using an ingress controller, an Ingress could look something like
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: simple-nifi-listen-http # Update according to NifiCluster name
spec:
rules:
- host: simple-nifi-listen-http.my.corp # Update to your host
http:
paths:
- pathType: Prefix
path: /
backend:
service:
name: simple-nifi-listen-http # Update to your Service name
port:
number: 8042
3. Route
The next step is to handle different kind of messages coming in, based on the HTTP path.
First, create a RouteOnAttribute
processor and connect it to the success
output of the ListenHTTP
processor as shown below.
Start the ListenHTTP
processor.
The ListenHTTP
processor should now generate a FlowFile for every incoming HTTP request.
You can test this by calling curl --verbose --data '{"hello":"NiFi"}' https://simple-nifi-listen-http.my.corp
, you should get a HTTP/2 200
response.
If you get a 503 Service Temporarily Unavailable , this probably means your Ingress controller was not able to
reach your ListenHTTP processor. Check that the processor is running and configured correctly.
|
This should result in one FlowFile being queued, as shown in the picture above and below.
In the RouteOnAttribute
processor, add a field called /webhook/foo
with the value of ${"restlistener.request.uri":equals('/webhook/foo')}
.
You can replace /webhook/foo
with whatever URL your HTTP service should be reachable.
This guide also added /webhook/bar
and /webhook/baz
in a similar way.
The RouteOnAttribute
processor now has one unmatched
output as well as one for every field you defined.
You can connect a processor for every HTTP path you routed on.
The result should look something like below and should allow you to consume many different HTTP POST
requests.