Spark Connect

Support for Apache Spark Connect is considered experimental and is subject to change in future releases. Spark Connect is a young technology, and important questions remain open, mostly around security and multi-tenancy.

Apache Spark Connect is a remote procedure call (RPC) server that allows clients to run Spark applications on a remote cluster. Clients can connect to the Spark Connect server using a variety of programming languages, editors and IDEs without needing to install Spark locally.

The Stackable Spark operator can set up Spark Connect servers backed by Kubernetes as a distributed execution engine.
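
From the client side, connecting might look like the sketch below. It assumes the PySpark client is installed with the connect extra, that the server listens on Spark Connect's default port 15002, and that the server is reachable under the hypothetical DNS name shown. The helper names connect_url and open_session are illustrative and not part of the operator or of PySpark.

```python
def connect_url(host: str, port: int = 15002) -> str:
    """Build a Spark Connect URL; 15002 is Spark Connect's default port."""
    return f"sc://{host}:{port}"


def open_session(url: str):
    """Open a remote session. Needs `pip install "pyspark[connect]"` on the client."""
    # Imported lazily so the URL helper above works without PySpark installed.
    from pyspark.sql import SparkSession

    return SparkSession.builder.remote(url).getOrCreate()


# Hypothetical in-cluster DNS name (an assumption); substitute the Service
# that exposes your Spark Connect server.
URL = connect_url("spark-connect-server.default.svc.cluster.local")

# Usage, given a reachable server:
#   spark = open_session(URL)
#   spark.range(5).show()
```

No local Spark installation is involved here: the client only sends the query plan to the server, which runs it on the cluster.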

Deployment

The example below demonstrates how to set up a Spark Connect server and apply some customizations.

---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect (1)
spec:
  image:
    productVersion: "3.5.8" (2)
    pullPolicy: IfNotPresent
  args:
    - "--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.11.0" (3)
  server:
    podOverrides:
      spec:
        containers:
          - name: spark
            env:
              - name: DEMO_GREETING (4)
                value: "Hello"
    jvmArgumentOverrides:
      add:
        - -Dmy.custom.jvm.arg=customValue (5)
    config:
      logging:
        enableVectorAgent: false
        containers:
          spark:
            custom:
              configMap: spark-connect-log-config (6)
    configOverrides:
      spark-defaults.conf:
        spark.driver.cores: "3" (7)
  executor:
    configOverrides:
      spark-defaults.conf:
        spark.executor.memoryOverhead: "1m" (8)
        spark.executor.instances: "3"
    config:
      logging:
        enableVectorAgent: false
        containers:
          spark:
            custom:
              configMap: spark-connect-log-config
1 The name of the Spark Connect server.
2 Version of the Spark Connect server.
3 Additional package to install when starting the Spark Connect server and executors.
4 Environment variable to be created via podOverrides. Alternatively, the environment variable can be set in the spec.server.envOverrides section.
5 Additional argument to be passed to the Spark Connect JVM settings. Do not use this to tweak heap settings. Use spec.server.jvmOptions instead.
6 A custom log4j configuration file to be used by the Spark Connect server. The config map must have an entry called log4j.properties.
7 Customize the driver properties in the server role. The number of cores here is not related to Kubernetes cores!
8 Customize spark.executor.* and spark.kubernetes.executor.* in the executor role.

Monitoring

The operator creates a dedicated Kubernetes service for collecting metrics from Spark Connect instances with Prometheus. The service name follows the convention <stacklet name>-server-metrics. This service exposes Prometheus metrics at the following endpoints:

  • <service name>:4040/metrics/prometheus for driver instances.

  • <service name>:4040/metrics/executors/prometheus for executor instances.

To customize the metrics configuration, use spec.server.configOverrides like this:

spec:
  server:
    configOverrides:
      metrics.properties:
        applications.sink.prometheusServlet.path: "/metrics/applications/prometheus"

The example above adds a new endpoint for application metrics.
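
To check the two default endpoints by hand from inside the cluster, a small script along these lines may help. It is a sketch that assumes plain HTTP and a stacklet named spark-connect; metrics_urls and fetch_metrics are hypothetical helper names.

```python
from urllib.request import urlopen

METRICS_PORT = 4040


def metrics_urls(stacklet: str) -> dict[str, str]:
    """Derive the metrics URLs from the <stacklet name>-server-metrics convention."""
    service = f"{stacklet}-server-metrics"
    base = f"http://{service}:{METRICS_PORT}"  # assumption: no TLS on this port
    return {
        "driver": f"{base}/metrics/prometheus",
        "executors": f"{base}/metrics/executors/prometheus",
    }


def fetch_metrics(url: str, timeout: float = 5.0) -> list[str]:
    """Return the sample lines of a Prometheus text response, skipping comments."""
    with urlopen(url, timeout=timeout) as response:
        text = response.read().decode("utf-8")
    return [line for line in text.splitlines() if line and not line.startswith("#")]


# Usage from a pod in the same namespace:
#   for name, url in metrics_urls("spark-connect").items():
#       print(name, len(fetch_metrics(url)), "samples")
```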

Connect to S3

There are multiple ways to connect a Spark Connect server to S3:

  1. Configure an S3 connection using the s3connections.stackable.tech resource. Use this method when you are sure all clients are equally allowed to access all buckets on the given connection.

  2. Configure a list of S3 buckets using the s3buckets.stackable.tech resources. Use this when you want to restrict access to specific buckets but don’t need to set up different permissions for different clients.

  3. A combination of both.

For more details on how the Stackable Data Platform manages S3 resources see the S3 resources page.

In the simplest case, you can just set up an S3 connection and the Spark Connect server will automatically make it available to clients. The example below demonstrates how to set up a Spark Connect server with an S3 connection.

---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect
spec:
  image:
    productVersion: 4.1.1
  connectors:
    s3connection:
      reference: s3-connection
...

In a more complex use case, clients can read data from two buckets on AWS, transform it and write it to a local corporate S3 instance. The example below demonstrates how to set up a Spark Connect server with two S3 buckets and a connection.

---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect
spec:
  image:
    productVersion: 4.1.1
  connectors:
    s3buckets:
      - reference: aws-cust-1-ingest
      - reference: aws-cust-2-ingest
    s3connection:
      reference: corporate-s3-connection
...
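
A client job for this scenario could be sketched as follows. The bucket names, the events/ prefix, and the Parquet format are assumptions made for illustration; the actual bucket names come from the referenced S3 bucket resources. The helpers s3a_path and consolidate are hypothetical, and spark is a session already connected to the server.

```python
def s3a_path(bucket: str, key: str) -> str:
    """Build an s3a:// URI as understood by Hadoop's S3A file system."""
    return f"s3a://{bucket}/{key.lstrip('/')}"


def consolidate(spark, sources: list[str], target: str) -> None:
    """Read Parquet from every source path, union the rows, append to target."""
    frames = [spark.read.parquet(path) for path in sources]
    combined = frames[0]
    for frame in frames[1:]:
        # unionByName matches columns by name rather than by position.
        combined = combined.unionByName(frame)
    combined.write.mode("append").parquet(target)


# Hypothetical bucket names (assumptions, not taken from the manifests above).
SOURCES = [s3a_path("cust-1-ingest", "events/"), s3a_path("cust-2-ingest", "events/")]
TARGET = s3a_path("corporate-lake", "/events/")

# Usage with a connected session:
#   consolidate(spark, SOURCES, TARGET)
```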

Kerberos

Kerberos support for Spark Connect is not a first-class feature of the operator. The setup described here is a manual configuration that uses podOverrides, configOverrides, envOverrides and an init container.

A Spark Connect server can authenticate to a Kerberos-secured service, such as an Apache Hive metastore or Apache Hadoop HDFS. There is, however, an important caveat: the Spark Connect server runs in Spark’s client deploy mode. Spark only performs the automatic keytab login (UserGroupInformation.loginUserFromKeytab) for YARN, local, Mesos and the Kubernetes cluster-mode driver, but not for the Kubernetes client mode that the Connect server uses. As a consequence, setting spark.kerberos.keytab and spark.kerberos.principal alone does not obtain a Kerberos ticket (TGT), and the SASL/GSSAPI handshake to the metastore fails with GSS initiate failed / Failed to find any Kerberos tgt.

The workaround is to obtain the ticket yourself with an init container that runs kinit before the Spark Connect server JVM starts. The init container reads the keytab and writes a Kerberos credential cache to an emptyDir volume that is shared with the spark container; the server JVM then picks the ticket up from that cache. An init container is required because nothing in the client-mode server process performs the login automatically, so the ticket must exist in the credential cache before the JVM opens the metastore connection.

The relevant parts of the SparkConnectServer are shown below.

spec:
  server:
    envOverrides:
      # The shared credential cache populated by the init container. Hadoop's
      # UserGroupInformation reads KRB5CCNAME when Kerberos auth is enabled.
      KRB5CCNAME: /stackable/krb5/ccache
    jvmArgumentOverrides:
      add:
        # The JVM does NOT read the KRB5_CONFIG environment variable (that is an
        # MIT C-library variable). It reads this system property (or /etc/krb5.conf).
        - -Djava.security.krb5.conf=/stackable/kerberos/krb5.conf
        # The Spark Connect execute thread does not carry the ticket in its
        # Subject, so JGSS must fall back to the ambient credential cache.
        - -Djavax.security.auth.useSubjectCredsOnly=false
    configOverrides:
      spark-defaults.conf:
        spark.hadoop.hadoop.security.authentication: "kerberos"
        spark.hadoop.hive.metastore.uris: "thrift://hive-metastore:9083"
        spark.hadoop.hive.metastore.sasl.enabled: "true"
        # Use the literal metastore principal, not _HOST (which would resolve to
        # the connection host instead of the metastore's service principal).
        spark.hadoop.hive.metastore.kerberos.principal: "hive/hive.example.svc.cluster.local@EXAMPLE.COM"
    podOverrides:
      spec:
        initContainers:
          - name: kinit
            image: oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev
            command: ["/bin/bash", "-euo", "pipefail", "-c"]
            args:
              - kinit -kt /stackable/kerberos/keytab spark-connect/spark-connect.example.svc.cluster.local@EXAMPLE.COM
            env:
              - name: KRB5_CONFIG
                value: /stackable/kerberos/krb5.conf
              - name: KRB5CCNAME
                value: /stackable/krb5/ccache
            volumeMounts:
              - name: kerberos
                mountPath: /stackable/kerberos
              - name: krb5-ccache
                mountPath: /stackable/krb5
        containers:
          - name: spark
            volumeMounts:
              - name: kerberos
                mountPath: /stackable/kerberos
              - name: krb5-ccache
                mountPath: /stackable/krb5
        volumes:
          - name: krb5-ccache
            emptyDir: {}
          - name: kerberos
            ephemeral:
              volumeClaimTemplate:
                metadata:
                  annotations:
                    secrets.stackable.tech/class: kerberos
                    secrets.stackable.tech/scope: service=spark-connect
                    secrets.stackable.tech/kerberos.service.names: spark-connect
                spec:
                  storageClassName: secrets.stackable.tech
                  accessModes: ["ReadWriteOnce"]
                  resources:
                    requests:
                      storage: "1"

The keytab and krb5.conf are only required on the server, which is the Spark driver and the only component that talks to the metastore. Executors that merely read and write data files on (non-kerberized) S3 do not need Kerberos credentials.
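
After deploying, a quick client-side smoke test can tell whether the server obtained a ticket. The sketch below runs a metastore-backed statement and looks for the two error strings mentioned earlier. It assumes the server-side error text is propagated to the client, which may not hold for every client version; is_missing_ticket_error and check_metastore are hypothetical helper names.

```python
KERBEROS_FAILURE_MARKERS = (
    "GSS initiate failed",
    "Failed to find any Kerberos tgt",
)


def is_missing_ticket_error(message: str) -> bool:
    """True if the message carries one of the known missing-TGT error strings."""
    return any(marker in message for marker in KERBEROS_FAILURE_MARKERS)


def check_metastore(spark) -> str:
    """Run a metastore-backed statement and classify the outcome."""
    try:
        spark.sql("SHOW DATABASES").collect()
    except Exception as error:  # broad on purpose: the exception type varies by client
        if is_missing_ticket_error(str(error)):
            return "no-ticket"
        raise
    return "ok"


# Usage with a connected session:
#   print(check_metastore(spark))
```

A result of no-ticket suggests that the init container did not populate the credential cache or that the server JVM is not reading it.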

Spark History Server

Integration with the Spark History Server is not supported yet. The Connect server appears to ignore the spark.eventLog properties and also prevents clients from setting them programmatically.

Notable Omissions

The following features are not yet supported by the Stackable Spark operator:

  • Authorization and authentication. Currently, anyone with access to the Spark Connect service can run jobs.

  • Volumes and volume mounts can be added only with pod overrides.

  • Job dependencies must be provisioned as custom images or via --packages or --jars arguments.

Known Issues

  • Distributed operations on Apache Iceberg tables fail on the executors with Spark 3.5.x. PySpark calls that trigger a distributed job over an Iceberg table — for example DataFrame.collect() or DataFrame.count() — fail with java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3. Driver-only operations still work: DDL (CREATE/DROP), INSERT of small data, DataFrame.show() of a small result, SHOW TABLES and DESCRIBE. The cause is upstream in Spark Connect: executor tasks run under a per-session class loader (which fetches session classes from the driver’s artifact server) that cannot deserialize the Iceberg scan closures. It is independent of how the Iceberg runtime is provisioned — it reproduces with --packages, with the jar placed on the system class path (/stackable/spark/jars), with spark.addArtifact(), and with combinations of these. This is fixed in Spark 4 (SPARK-51537 / apache/spark#50475), where distributed reads and writes of Iceberg tables work; it is not backported to the 3.5.x line (see the still-open SPARK-46032).
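
Client code that has to run against both Spark lines can guard distributed Iceberg operations with a version check. This is a minimal sketch based only on the statement above that the fix is present in Spark 4 and absent from 3.5.x; iceberg_distributed_ops_supported is a hypothetical helper name.

```python
def iceberg_distributed_ops_supported(spark_version: str) -> bool:
    """True for Spark 4 and later, where the executor class-loading fix is present."""
    major = int(spark_version.split(".", 1)[0])
    return major >= 4


# Usage with a connected session (spark.version reports the server version):
#   if iceberg_distributed_ops_supported(spark.version):
#       rows = spark.table("catalog.db.events").collect()  # hypothetical table name
#   else:
#       spark.table("catalog.db.events").show(20)  # small driver-side result only
```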