Implementing log aggregation

For a conceptual overview of logging aggregation, consult the logging concept page.



The Stackable Data Platform (SDP) does *not* implement product logging where none exists, but rather provides a way for the user to interact with whatever logging framework is already in place, and to ensure that the log output can be consolidated and preserved in an output sink (e.g. Opensearch). Products use various logging libraries such as Log4j, Log4j2, Logback, Python logging etc. and SDP logging aggregation enables:

  • the extension or overriding of the default product logging configuration (so that e.g. the log level can be changed): this assumes knowledge of the product logging library

  • the capture and parsing of stdout and stderr logging statements

  • the consolidation of both of the above into a standard logging format which will be written to the aggregation sink

Implementing logging for a given operator and product will require code:

  • in a logging module specific to the product

  • in the operator-rs framework library

  • in the product controller

These will be described in the next section.

Implementation details

Logging module

Most products will have a logging module (e.g. that contains two functions:


pub async fn resolve_vector_aggregator_address(
    client: &Client,
    ... // other parameters dependent on calling context
) -> Result<Option<String>> {

This function will typically be called early in the reconcile step to determine whether a Vector ConfigMap has been mounted and, if so, under which address the Vector process is running.

extend_config_map (or similar)

pub fn extend_config_map<C, K>(
    role_group: &RoleGroupRef<K>,
    vector_aggregator_address: Option<&str>,
    logging: &Logging<C>,
    ... // other parameters dependent on calling context
    cm_builder: &mut ConfigMapBuilder,
) -> Result<()> {

This function will typically be called from the product controller at the point at which the RoleGroup ConfigMap is being defined. It issues internal calls to the framework to conditionally retrieve:

  • a default logging configuration for the given product logging library e.g. product_logging::framework::create_log4j2_config

  • a Vector configuration called from product_logging::framework::create_vector_config

For the first of these, the framework provides pre-configured configurations for several familiar libraries. These can be overridden by providing a specific logging configuration in a ConfigMap directly in the custom resource. The second one is set by a boolean flag. Both these are shown in the snippet below:

        enableVectorAgent: true # deploy a vector container with the log agent and mount the vector.toml config
              configMap: spark-history-log-config # containing e.g. a log4j configuration

If a specific logging configuration is not yet available in the framework this can be added directly in For instance, a very simple local Java Logging configuration could be defined like this:

fn create_java_logging_config(
) -> String {
handlers=java.util.logging.FileHandler, java.util.logging.ConsoleHandler
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

        root_log_level = "INFO",
        console_log_level = "INFO"

Framework (operator-rs)

As mentioned above, the framework provides pre-defined logging configurations for standard logging libraries and Vector itself. It also provides a way of capturing standard shell output and writing it to two files (container.stdout.log and container.stderr.log). This is done by calling the function capture_shell_output.


The information for this file is retrieved by calling product_logging::framework::create_vector_config, which returns the file contents. Each [sources.*] section declares a logging type or source e.g.

type = "file"
include = ["{STACKABLE_LOG_DIR}/*/*.stdout.log"]

This files_stdout source can be referenced in a [transforms.*] section, several of which can be chained together. A simple example shown below takes JUL logging output, sets the log level, extracts the container and file names and adds a prefix to each message.

type = "file"
include = ["{STACKABLE_LOG_DIR}/*/*.logger"]

inputs = ["files_jul"]
type = "remap"
source = '''
.logger = "ROOT"
.level = "INFO"

inputs = ["processed_files_jul"]
type = "remap"
source = '''
. |= parse_regex!(.file, r'^{STACKABLE_LOG_DIR}/(?P<container>.*?)/(?P<file>.*?)$')

inputs = ["parsed_logs_jul"]
type = "remap"
source = '''
.message = "Java Logging: " + string!(.message)

Product Controller

How do all these parts fit together? Let’s look at where they are applied in the product controller.

  • The vector aggregator address is retrieved early on in the reconcile function, where a client object is available:

let vector_aggregator_address = resolve_vector_aggregator_address(&cluster, client)
  • It is then passed through to functions where config maps are created at role-group level, and where extend_config_map (or extend_role_group_config_map in the example below) is called:

    &mut cm_builder,
.context(InvalidLoggingConfigSnafu {
    cm_name: rolegroup.object_name(),

This can be done at multiple places, as is the case for the spark-k8s-operator, where config maps are defined for the driver and executor pod-templates, as well as for the spark-submit Job.

  • If shell capture is required, this is done for each relevant container in the role-group StatefulSet. capture_shell_output returns a command that should normally be the first component of a container’s command arguments:

if let Some(ContainerLogConfig {
    choice: Some(ContainerLogConfigChoice::Automatic(log_config)),
}) = merged_config.logging.containers.get(&Container::Connector)