Spark Connect
| Support for Apache Spark Connect is considered experimental and is subject to change in future releases. Spark Connect is a young technology, and important questions remain open, mostly around security and multi-tenancy. |
Apache Spark Connect is a remote procedure call (RPC) server that allows clients to run Spark applications on a remote cluster. Clients can connect to the Spark Connect server using a variety of programming languages, editors and IDEs without needing to install Spark locally.
The Stackable Spark operator can set up Spark Connect servers backed by Kubernetes as a distributed execution engine.
Deployment
The example below demonstrates how to set up a Spark Connect server and apply some customizations.
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect # (1)
spec:
  image:
    productVersion: "3.5.8" # (2)
    pullPolicy: IfNotPresent
  args:
    - "--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.11.0" # (3)
  server:
    podOverrides:
      spec:
        containers:
          - name: spark
            env:
              - name: DEMO_GREETING # (4)
                value: "Hello"
    jvmArgumentOverrides:
      add:
        - -Dmy.custom.jvm.arg=customValue # (5)
    config:
      logging:
        enableVectorAgent: false
        containers:
          spark:
            custom:
              configMap: spark-connect-log-config # (6)
    configOverrides:
      spark-defaults.conf:
        spark.driver.cores: "3" # (7)
  executor:
    configOverrides:
      spark-defaults.conf:
        spark.executor.memoryOverhead: "1m" # (8)
        spark.executor.instances: "3"
    config:
      logging:
        enableVectorAgent: false
        containers:
          spark:
            custom:
              configMap: spark-connect-log-config
| 1 | The name of the Spark Connect server. |
| 2 | Version of the Spark Connect server. |
| 3 | Additional package to install when starting the Spark Connect server and executors. |
| 4 | Environment variable to be created via podOverrides. Alternatively, the environment variable can be set in the spec.server.envOverrides section. |
| 5 | Additional argument to be passed to the Spark Connect JVM settings. Do not use this to tweak heap settings. Use spec.server.jvmOptions instead. |
| 6 | A custom log4j configuration file to be used by the Spark Connect server. The config map must have an entry called log4j.properties. |
| 7 | Customize the driver properties in the server role. The number of cores here is not related to Kubernetes cores! |
| 8 | Customize spark.executor.* and spark.kubernetes.executor.* in the executor role. |
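Callout 6 refers to a ConfigMap that the example does not show. A minimal sketch of what it might look like follows. The name and the log4j.properties key come from the example above; the logger settings themselves are illustrative and assume the file is parsed with Log4j 2 properties syntax, which Spark 3.3 and later use. Verify them against your Spark version before relying on them.

```yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-connect-log-config
data:
  # The key must be log4j.properties (see callout 6).
  log4j.properties: |
    # Illustrative settings only (assumption: Log4j 2 properties syntax).
    rootLogger.level = info
    rootLogger.appenderRef.stdout.ref = console
    appender.console.type = Console
    appender.console.name = console
    appender.console.target = SYSTEM_ERR
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```

The ConfigMap has to live in the same namespace as the SparkConnectServer so that the pods can mount it.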
Monitoring
The operator creates a dedicated Kubernetes service that exposes Spark Connect metrics for collection by Prometheus.
The service name follows the convention <stacklet name>-server-metrics.
This service exposes Prometheus metrics at the following endpoints:
- <service name>:4040/metrics/prometheus for driver instances.
- <service name>:4040/metrics/executors/prometheus for executor instances.
To customize the metrics configuration, use spec.server.configOverrides like this:
spec:
  server:
    configOverrides:
      metrics.properties:
        applications.sink.prometheusServlet.path: "/metrics/applications/prometheus"
The example above adds a new endpoint for application metrics.
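To illustrate how the metrics service might be scraped, here is a sketch of a Prometheus scrape configuration. It assumes a stacklet named spark-connect (so the service is spark-connect-server-metrics by the naming convention above) and a Prometheus instance that can resolve that service name, for example because it runs in the same namespace. The job names are made up for this example.

```yaml
scrape_configs:
  # Driver metrics of the Spark Connect server.
  - job_name: spark-connect-driver        # hypothetical job name
    metrics_path: /metrics/prometheus
    static_configs:
      - targets: ["spark-connect-server-metrics:4040"]
  # Executor metrics, served by the same service on a different path.
  - job_name: spark-connect-executors     # hypothetical job name
    metrics_path: /metrics/executors/prometheus
    static_configs:
      - targets: ["spark-connect-server-metrics:4040"]
```

If the application metrics endpoint has been enabled through configOverrides, a third job with the corresponding metrics_path could be added in the same way.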
Connect to S3
There are multiple ways to connect a Spark Connect server to S3:
- Configure an S3 connection using the s3connections.stackable.tech resource. Use this method when you are sure all clients are equally allowed to access all buckets on the given connection.
- Configure a list of S3 buckets using s3buckets.stackable.tech resources. Use this when you want to restrict access to specific buckets but don't need to set up different permissions for different clients.
- A combination of both.
For more details on how the Stackable Data Platform manages S3 resources see the S3 resources page.
In the simplest case, you set up an S3 connection and the Spark Connect server automatically makes it available to clients. The example below demonstrates how to set up a Spark Connect server with an S3 connection.
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect
spec:
  image:
    productVersion: 4.1.1
  connectors:
    s3connection:
      reference: s3-connection
...
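The s3-connection reference above points to an S3Connection resource that has to exist in the same namespace. A minimal sketch of such a resource is shown here. The host, port and secret class name are placeholders invented for this example, and the field names are believed to match the s3.stackable.tech/v1alpha1 schema; the S3 resources page remains the authoritative definition.

```yaml
---
apiVersion: s3.stackable.tech/v1alpha1
kind: S3Connection
metadata:
  name: s3-connection               # must match the reference in the SparkConnectServer
spec:
  host: minio.example.com           # hypothetical endpoint
  port: 9000                        # hypothetical port
  accessStyle: Path
  credentials:
    secretClass: s3-credentials     # hypothetical SecretClass holding the access keys
```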
In a more complex use case, clients read data from two buckets on AWS, transform it, and write the results to a local corporate S3 instance. The example below demonstrates how to set up a Spark Connect server with two S3 buckets and a connection.
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect
spec:
  image:
    productVersion: 4.1.1
  connectors:
    s3buckets:
      - reference: aws-cust-1-ingest
      - reference: aws-cust-2-ingest
    s3connection:
      reference: corporate-s3-connection
...
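Each entry under s3buckets refers to an S3Bucket resource by name. As a sketch, one of the two buckets could be declared as follows. The bucket name and the connection it points to are placeholders invented for this example, and the field names should be checked against the S3 resources page.

```yaml
---
apiVersion: s3.stackable.tech/v1alpha1
kind: S3Bucket
metadata:
  name: aws-cust-1-ingest            # must match the reference in the SparkConnectServer
spec:
  bucketName: cust-1-ingest          # hypothetical name of the bucket on AWS
  connection:
    reference: aws-s3-connection     # hypothetical S3Connection for the AWS endpoint
```

The second bucket, aws-cust-2-ingest, would be declared the same way with its own bucket name.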
Kerberos
| Kerberos support for Spark Connect is not a first-class feature of the operator. The setup described here is a manual configuration that uses podOverrides, configOverrides, envOverrides and an init container. |
A Spark Connect server can authenticate to a Kerberos-secured service, such as an Apache Hive metastore or Apache Hadoop HDFS.
There is, however, an important caveat: the Spark Connect server runs in Spark’s client deploy mode.
Spark only performs the automatic keytab login (UserGroupInformation.loginUserFromKeytab) for YARN, local, Mesos and the Kubernetes cluster-mode driver — not for the Kubernetes client mode that the Connect server uses.
As a consequence, setting spark.kerberos.keytab and spark.kerberos.principal alone does not obtain a Kerberos ticket (TGT), and the SASL/GSSAPI handshake to the metastore fails with GSS initiate failed / Failed to find any Kerberos tgt.
The workaround is to obtain the ticket yourself with an init container that runs kinit before the Spark Connect server JVM starts.
The init container reads the keytab and writes a Kerberos credential cache to an emptyDir volume that is shared with the spark container; the server JVM then picks the ticket up from that cache.
An init container is required because nothing in the client-mode server process performs the login automatically, so the ticket must exist in the credential cache before the JVM opens the metastore connection.
The relevant parts of the SparkConnectServer are shown below.
spec:
  server:
    envOverrides:
      # The shared credential cache populated by the init container. Hadoop's
      # UserGroupInformation reads KRB5CCNAME when Kerberos auth is enabled.
      KRB5CCNAME: /stackable/krb5/ccache
    jvmArgumentOverrides:
      add:
        # The JVM does NOT read the KRB5_CONFIG environment variable (that is an
        # MIT C-library variable). It reads this system property (or /etc/krb5.conf).
        - -Djava.security.krb5.conf=/stackable/kerberos/krb5.conf
        # The Spark Connect execute thread does not carry the ticket in its
        # Subject, so JGSS must fall back to the ambient credential cache.
        - -Djavax.security.auth.useSubjectCredsOnly=false
    configOverrides:
      spark-defaults.conf:
        spark.hadoop.hadoop.security.authentication: "kerberos"
        spark.hadoop.hive.metastore.uris: "thrift://hive-metastore:9083"
        spark.hadoop.hive.metastore.sasl.enabled: "true"
        # Use the literal metastore principal, not _HOST (which would resolve to
        # the connection host instead of the metastore's service principal).
        spark.hadoop.hive.metastore.kerberos.principal: "hive/hive.example.svc.cluster.local@EXAMPLE.COM"
    podOverrides:
      spec:
        initContainers:
          - name: kinit
            image: oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev
            command: ["/bin/bash", "-euo", "pipefail", "-c"]
            args:
              - kinit -kt /stackable/kerberos/keytab spark-connect/spark-connect.example.svc.cluster.local@EXAMPLE.COM
            env:
              - name: KRB5_CONFIG
                value: /stackable/kerberos/krb5.conf
              - name: KRB5CCNAME
                value: /stackable/krb5/ccache
            volumeMounts:
              - name: kerberos
                mountPath: /stackable/kerberos
              - name: krb5-ccache
                mountPath: /stackable/krb5
        containers:
          - name: spark
            volumeMounts:
              - name: kerberos
                mountPath: /stackable/kerberos
              - name: krb5-ccache
                mountPath: /stackable/krb5
        volumes:
          - name: krb5-ccache
            emptyDir: {}
          - name: kerberos
            ephemeral:
              volumeClaimTemplate:
                metadata:
                  annotations:
                    secrets.stackable.tech/class: kerberos
                    secrets.stackable.tech/scope: service=spark-connect
                    secrets.stackable.tech/kerberos.service.names: spark-connect
                spec:
                  storageClassName: secrets.stackable.tech
                  accessModes: ["ReadWriteOnce"]
                  resources:
                    requests:
                      storage: "1"
The keytab and krb5.conf are only required on the server, which is the Spark driver and the only component that talks to the metastore.
Executors that merely read and write data files on (non-kerberized) S3 do not need Kerberos credentials.
Spark History Server
Unfortunately, integration with the Spark History Server is not supported yet.
The Connect server seems to ignore the spark.eventLog properties while also prohibiting clients from setting them programmatically.
Notable Omissions
The following features are not yet supported by the Stackable Spark operator:
- Authorization and authentication. Currently, anyone with access to the Spark Connect service can run jobs.
- Volumes and volume mounts can be added only with pod overrides.
- Job dependencies must be provisioned as custom images or via --packages or --jars arguments.
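As an illustration of the pod-override route for volumes, the following sketch adds a scratch volume to the server pod and mounts it into the spark container. It follows the same podOverrides pattern as the Kerberos example on this page; the volume name and mount path are hypothetical.

```yaml
spec:
  server:
    podOverrides:
      spec:
        containers:
          - name: spark
            volumeMounts:
              - name: extra-data                  # hypothetical volume name
                mountPath: /stackable/extra-data  # hypothetical mount path
        volumes:
          - name: extra-data
            emptyDir: {}
```

Pod overrides are merged into the generated pod template, so only the fields to be added need to be listed.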
Known Issues
- Distributed operations on Apache Iceberg tables fail on the executors with Spark 3.5.x.
  PySpark calls that trigger a distributed job over an Iceberg table, for example DataFrame.collect() or DataFrame.count(), fail with java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3.
  Driver-only operations still work: DDL (CREATE/DROP), INSERT of small data, DataFrame.show() of a small result, SHOW TABLES and DESCRIBE.
  The cause is upstream in Spark Connect: executor tasks run under a per-session class loader (which fetches session classes from the driver's artifact server) that cannot deserialize the Iceberg scan closures.
  It is independent of how the Iceberg runtime is provisioned: it reproduces with --packages, with the jar placed on the system class path (/stackable/spark/jars), with spark.addArtifact(), and with combinations of these.
  This is fixed in Spark 4 (SPARK-51537 / apache/spark#50475), where distributed reads and writes of Iceberg tables work; it is not backported to the 3.5.x line (see the still-open SPARK-46032).