Usage

Examples

The following examples have the following spec fields in common:

  • version: the current version is "1.0"

  • sparkImage: the docker image that will be used by job, driver and executor pods. This can be provided by the user.

  • mode: only cluster is currently supported

  • mainApplicationFile: the artifact (Java, Scala or Python) that forms the basis of the Spark job.

  • args: these are the arguments passed directly to the application. In the examples below it is e.g. the input path for part of the public New York taxi dataset.

  • sparkConf: these list spark configuration settings that are passed directly to spark-submit and which are best defined explicitly by the user. Since the SparkApplication "knows" that there is an external dependency (the s3 bucket where the data and/or the application is located) and how that dependency should be treated (i.e. what type of credential checks are required, if any), it is better to have these things declared together.

  • volumes: refers to any volumes needed by the SparkApplication, in this case an underlying PersistentVoulmeClaim.

  • driver: driver-specific settings, including any volume mounts.

  • executor: executor-specific settings, including any volume mounts.

Job-specific settings are annotated below.

Pyspark: externally located artifact and dataset

---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: example-sparkapp-external-dependencies
  namespace: default
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.3.0-stackable0.3.0
  mode: cluster
  mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny_tlc_report.py (1)
  args:
    - "--input 's3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'" (2)
  deps:
    requirements:
      - tabulate==0.8.9  (3)
  sparkConf:  (4)
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
    "spark.driver.extraClassPath": "/dependencies/jars/*"
    "spark.executor.extraClassPath": "/dependencies/jars/*"
  volumes:
    - name: job-deps  (5)
      persistentVolumeClaim:
        claimName: pvc-ksv
  driver:
    volumeMounts:
      - name: job-deps
        mountPath: /dependencies  (6)
  executor:
    instances: 3
    volumeMounts:
      - name: job-deps
        mountPath: /dependencies  (6)
1 Job python artifact (external)
2 Job argument (external)
3 List of python job requirements: these will be installed in the pods via pip
4 Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources (in this case, in s3)
5 the name of the volume mount backed by a PersistentVolumeClaim that must be pre-existing
6 the path on the volume mount: this is referenced in the sparkConf section where the extra class path is defined for the driver and executors

Pyspark: externally located dataset, artifact available via PVC/volume mount

---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: example-sparkapp-image
  namespace: default
spec:
  version: "1.0"
  image: docker.stackable.tech/stackable/ny-tlc-report:0.1.0 (1)
  sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.3.0-stackable0.3.0
  mode: cluster
  mainApplicationFile: local:///stackable/spark/jobs/ny_tlc_report.py (2)
  args:
    - "--input 's3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'" (3)
  deps:
    requirements:
      - tabulate==0.8.9 (4)
  sparkConf: (5)
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
  job:
    resources:
      cpu:
        min: "1"
        max: "1"
      memory:
        limit: "1Gi"
  driver:
    resources:
      cpu:
        min: "1"
        max: "1500m"
      memory:
        limit: "1Gi"
  executor:
    instances: 3
    resources:
      cpu:
        min: "1"
        max: "4"
      memory:
        limit: "2Gi"
1 Job image: this contains the job artifact that will be retrieved from the volume mount backed by the PVC
2 Job python artifact (local)
3 Job argument (external)
4 List of python job requirements: these will be installed in the pods via pip
5 Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources (in this case, in an S3 store)

JVM (Scala): externally located artifact and dataset

---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: example-sparkapp-pvc
  namespace: default
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.3.0-stackable0.3.0
  mode: cluster
  mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny-tlc-report-1.0-SNAPSHOT.jar (1)
  mainClass: org.example.App (2)
  args:
    - "'s3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'"
  sparkConf: (3)
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
    "spark.driver.extraClassPath": "/dependencies/jars/*"
    "spark.executor.extraClassPath": "/dependencies/jars/*"
  volumes:
    - name: job-deps (4)
      persistentVolumeClaim:
        claimName: pvc-ksv
  driver:
    volumeMounts:
      - name: job-deps
        mountPath: /dependencies (5)
  executor:
    instances: 3
    volumeMounts:
      - name: job-deps
        mountPath: /dependencies (5)
1 Job artifact located on S3.
2 Job main class
3 Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources (in this case, in an S3 store, accessed without credentials)
4 the name of the volume mount backed by a PersistentVolumeClaim that must be pre-existing
5 the path on the volume mount: this is referenced in the sparkConf section where the extra class path is defined for the driver and executors

JVM (Scala): externally located artifact accessed with credentials

---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: example-sparkapp-s3-private
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.3.0-stackable0.3.0
  mode: cluster
  mainApplicationFile: s3a://my-bucket/spark-examples_2.12-3.3.0.jar (1)
  mainClass: org.apache.spark.examples.SparkPi (2)
  s3connection: (3)
    inline:
      host: test-minio
      port: 9000
      accessStyle: Path
      credentials: (4)
        secretClass: s3-credentials-class
  sparkConf: (5)
    spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" (6)
    spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
    spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
  executor:
    instances: 3
1 Job python artifact (located in an S3 store)
2 Artifact class
3 S3 section, specifying the existing secret and S3 end-point (in this case, MinIO)
4 Credentials referencing a secretClass (not shown in is example)
5 Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources…​
6 …​in this case, in an S3 store, accessed with the credentials defined in the secret

JVM (Scala): externally located artifact accessed with job arguments provided via configuration map

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cm-job-arguments (1)
data:
  job-args.txt: |
    s3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv (2)
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: ny-tlc-report-configmap
  namespace: default
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.3.0-stackable0.3.0
  mode: cluster
  mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny-tlc-report-1.1.0.jar (3)
  mainClass: tech.stackable.demo.spark.NYTLCReport
  volumes:
    - name: cm-job-arguments
      configMap:
        name: cm-job-arguments (4)
  args:
    - "--input /arguments/job-args.txt" (5)
  sparkConf:
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
  driver:
    volumeMounts:
      - name: cm-job-arguments (6)
        mountPath: /arguments  (7)
  executor:
    instances: 3
    volumeMounts:
      - name: cm-job-arguments (6)
        mountPath: /arguments (7)
1 Name of the configuration map
2 Argument required by the job
3 Job scala artifact that requires an input argument
4 The volume backed by the configuration map
5 The expected job argument, accessed via the mounted configuration map file
6 The name of the volume backed by the configuration map that will be mounted to the driver/executor
7 The mount location of the volume (this will contain a file /arguments/job-args.txt)

S3 bucket specification

You can specify S3 connection details directly inside the SparkApplication specification or by referring to an external S3Bucket custom resource.

To specify S3 connection details directly as part of the SparkApplication resource you add an inline connection configuration as shown below.

s3connection: (1)
  inline:
    host: test-minio (2)
    port: 9000 (3)
    accessStyle: Path
    credentials:
      secretClass: s3-credentials-class  (4)
1 Entry point for the S3 connection configuration.
2 Connection host.
3 Optional connection port.
4 Name of the Secret object expected to contain the following keys: ACCESS_KEY_ID and SECRET_ACCESS_KEY

It is also possible to configure the connection details as a separate Kubernetes resource and only refer to that object from the SparkApplication like this:

s3connection:
  reference: s3-connection-resource (1)
1 Name of the connection resource with connection details.

The resource named s3-connection-resource is then defined as shown below:

---
apiVersion: s3.stackable.tech/v1alpha1
kind: S3Connection
metadata:
  name: s3-connection-resource
spec:
  host: test-minio
  port: 9000
  accessStyle: Path
  credentials:
    secretClass: minio-credentials-class

This has the advantage that one connection configuration can be shared across SparkApplications and reduces the cost of updating these details.

Resource Requests

Stackable operators handle resource requests in a sligtly different manner than Kubernetes. Resource requests are defined on role or group level. See Roles and role groups for details on these concepts. On a role level this means that e.g. all workers will use the same resource requests and limits. This can be further specified on role group level (which takes priority to the role level) to apply different resources.

This is an example on how to specify CPU and memory resources using the Stackable Custom Resources:

---
apiVersion: example.stackable.tech/v1alpha1
kind: ExampleCluster
metadata:
  name: example
spec:
  workers: # role-level
    config:
      resources:
        cpu:
          min: 300m
          max: 600m
        memory:
          limit: 3Gi
    roleGroups: # role-group-level
      resources-from-role: # role-group 1
        replicas: 1
      resources-from-role-group: # role-group 2
        replicas: 1
        config:
          resources:
            cpu:
              min: 400m
              max: 800m
            memory:
              limit: 4Gi

In this case, the role group resources-from-role will inherit the resources specified on the role level. Resulting in a maximum of 3Gi memory and 600m CPU resources.

The role group resources-from-role-group has maximum of 4Gi memory and 800m CPU resources (which overrides the role CPU resources).

For Java products the actual used Heap memory is lower than the specified memory limit due to other processes in the Container requiring memory to run as well. Currently, 80% of the specified memory limits is passed to the JVM.

For memory only a limit can be specified, which will be set as memory request and limit in the Container. This is to always guarantee a Container the full amount memory during Kubernetes scheduling.

If no resources are configured explicitly, the operator uses the following defaults:

job:
  resources:
    cpu:
      min: '500m'
      max: "1"
    memory:
      limit: '1Gi'
driver:
  resources:
    cpu:
      min: '1'
      max: "2"
    memory:
      limit: '2Gi'
executor:
  resources:
    cpu:
      min: '1'
      max: "4"
    memory:
      limit: '4Gi'
The default values are most likely not sufficient to run a proper cluster in production. Please adapt according to your requirements. For more details regarding Kubernetes CPU limits see: Assign CPU Resources to Containers and Pods.

Spark allocates a default amount of non-heap memory based on the type of job (JVM or non-JVM). This is taken into account when defining memory settings based exclusively on the resource limits, so that the "declared" value is the actual total value (i.e. including memory overhead). This may result in minor deviations from the stated resource value due to rounding differences.

It is possible to define Spark resources either directly by setting configuration properties listed under sparkConf, or by using resource limits. If both are used, then sparkConf properties take precedence. It is recommended for the sake of clarity to use either one or the other.

CRD argument coverage

Below are listed the CRD fields that can be defined by the user:

CRD field Remarks

apiVersion

spark.stackable.tech/v1alpha1

kind

SparkApplication

metadata.name

Job name

spec.version

"1.0"

spec.mode

cluster or client. Currently only cluster is supported

spec.image

User-supplied image containing spark-job dependencies that will be copied to the specified volume mount

spec.sparkImage

Spark image which will be deployed to driver and executor pods, which must contain spark environment needed by the job e.g. docker.stackable.tech/stackable/spark-k8s:3.3.0-stackable0.3.0

spec.sparkImagePullPolicy

Optional Enum (one of Always, IfNotPresent or Never) that determines the pull policy of the spark job image

spec.sparkImagePullSecrets

An optional list of references to secrets in the same namespace to use for pulling any of the images used by a SparkApplication resource. Each reference has a single property (name) that must contain a reference to a valid secret

spec.mainApplicationFile

The actual application file that will be called by spark-submit

spec.mainClass

The main class i.e. entry point for JVM artifacts

spec.args

Arguments passed directly to the job artifact

spec.s3connection

S3 connection specification. See the S3 bucket specification for more details.

spec.sparkConf

A map of key/value strings that will be passed directly to spark-submit

spec.deps.requirements

A list of python packages that will be installed via pip

spec.deps.packages

A list of packages that is passed directly to spark-submit

spec.deps.excludePackages

A list of excluded packages that is passed directly to spark-submit

spec.deps.repositories

A list of repositories that is passed directly to spark-submit

spec.volumes

A list of volumes

spec.volumes.name

The volume name

spec.volumes.persistentVolumeClaim.claimName

The persistent volume claim backing the volume

spec.job.resources

Resources specification for the initiating Job

spec.driver.resources

Resources specification for the driver Pod

spec.driver.volumeMounts

A list of mounted volumes for the driver

spec.driver.volumeMounts.name

Name of mount

spec.driver.volumeMounts.mountPath

Volume mount path

spec.driver.nodeSelector

A dictionary of labels to use for node selection when scheduling the driver N.B. this assumes there are no implicit node dependencies (e.g. PVC, VolumeMount) defined elsewhere.

spec.executor.resources

Resources specification for the executor Pods

spec.executor.instances

Number of executor instances launched for this job

spec.executor.volumeMounts

A list of mounted volumes for each executor

spec.executor.volumeMounts.name

Name of mount

spec.executor.volumeMounts.mountPath

Volume mount path

spec.executor.nodeSelector

A dictionary of labels to use for node selection when scheduling the executors N.B. this assumes there are no implicit node dependencies (e.g. PVC, VolumeMount) defined elsewhere.