Spark Streaming

Learn about Spark streaming in Data Flow.

Streaming applications require continuous execution over long periods, often beyond 24 hours and sometimes for weeks or even months. After an unexpected failure, a streaming application must restart from the point of failure without producing incorrect computational results. Data Flow relies on Spark structured streaming checkpointing to record the processed offsets, which can be stored in your Object Storage bucket.
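For example, a minimal PySpark sketch (assuming the oci://<bucket_name>@<namespace>/ Object Storage URI scheme and placeholder values) might direct a streaming query's checkpoint data to a bucket as follows; the rate source stands in for a real stream such as the Kafka sources shown later.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CheckpointSketch").getOrCreate()

# Placeholder source; a real application would read from Kafka as shown later.
df = spark.readStream.format("rate").load()

query = (
    df.writeStream
    .format("console")
    # Checkpoint to an Object Storage bucket so the run can resume after a failure.
    .option("checkpointLocation", "oci://<bucket_name>@<namespace>/app/checkpoint")
    .start()
)
query.awaitTermination()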

To allow for regular Oracle Cloud Infrastructure maintenance, Data Flow implements a graceful shutdown of the Spark clusters for Spark structured streaming. When maintenance is complete, a new Spark cluster with the updated software is created, and a new run appears in the list. The status of the previous Run indicates that it is stopped for maintenance.

Data Flow provides access to the Spark UI and the Spark History Server, a suite of web user interfaces (UIs) that you can use to monitor the events, status, and resource consumption of your Spark cluster. Importantly, it lets you explore logical and physical execution plans. For streaming, it provides insight into processing progress, for example, input/output rates, offsets, durations, and statistical distributions. The Spark UI shows information about currently running jobs, and the History Server shows finished jobs.

Batch runs allow multiple concurrent runs of the same code with mostly the same arguments, but running multiple instances of a streaming application corrupts the checkpoint data, so Data Flow allows only one run per streaming application. To avoid unintentional corruption of the streaming application, you must stop the run before you can edit the application. When the edit is complete, you can restart the streaming application. To help you identify batch and streaming applications, each application has an Application Type with a value of Batch or Streaming.

If the run stops with an error, Data Flow makes up to 10 attempts to restart it, waiting three minutes between attempts. If the tenth attempt fails, no more attempts are made and the run is stopped.

Getting Started with Spark Streaming

Before you can use Spark streaming with Data Flow, you must set it up.

Apache Spark unifies Batch Processing, Stream Processing, and Machine Learning in one API. Data Flow runs Spark applications within a standard Apache Spark runtime. When you run a streaming Application, Data Flow does not use a different runtime; instead, it runs the Spark application in a different way:
Differences between streaming and non-streaming runs:
  • Authentication: A non-streaming run uses an On-Behalf-Of (OBO) token of the requesting user. OBO tokens expire after 24 hours, so this is not suitable for long-running jobs. A streaming run accesses Oracle Cloud Infrastructure Object Storage using session tokens tied to the Run's Resource Principal, which is suitable for long-running jobs.
  • Restart Policy: A non-streaming run fails if the Spark runtime exit code is non-zero. A streaming run restarts up to ten times if the Spark runtime exit code is non-zero.
  • Patch Policy: Non-streaming runs have no patching policy, because jobs are expected to last fewer than 24 hours. Streaming runs receive automatic monthly patches.
  1. Create a Spark Streaming Application.
    When the application is run, it uses Resource Principal authentication, auto-patching, and auto-restart.
  2. Create Resource Principal IAM policies.
    Because your Spark Streaming Application uses Resource Principal session tokens to authenticate to Oracle Cloud Infrastructure resources, you must create IAM policies authorizing your application before it can access those resources. Data Flow Runs are launched on demand, so you cannot use the Run OCID in your IAM policy because it is not allocated until the Run starts. Instead, connect the Run to a permanent resource and reference that resource in your IAM policy. The two most common ways of doing this are:
    Parent Application ID
    Connect the Data Flow Run to the Data Flow Application that created it, and put the Data Flow Application ID in the IAM Policy. If you want to set permissions for a particular Application, create a dynamic group that matches all Runs launched from the Application, and authorize the Dynamic Group to access IAM resources. Each Run includes a tag associating it with its parent Application. You can use this tag in a Dynamic Group matching rule.
    Note

    This tag cannot be used in an IAM "any-user" policy; you must create a Dynamic Group.
    For example, if you have a Data Flow Application with ID of ocid1.dataflowapplication.oc1.iad.A, then you create a dynamic group:
    ALL {resource.type='dataflowrun', tag.oci-dataflow.application-id.value='ocid1.dataflowapplication.oc1.iad.A'}
    with the following policies:
    allow dynamic-group <dynamic_group_name> to manage objects in tenancy where all {
     target.bucket.name='<bucket_name>'
    }
    allow dynamic-group <dynamic_group_name> to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     target.streampool.id='<streampool_id>'
    }
    Target Compartment ID

    Connect the Data Flow Run to the Compartment where Runs are created, and put the Compartment ID in the IAM Policy. This approach is less specific, since any Spark application run in the Compartment gets access to these resources. If you plan to use spark-submit via the CLI, you must use this approach, because the Application ID and Run ID are created on demand and are not known in advance.

    For example, if you have a Run with ID ocid1.dataflowrun.oc1.iad.R2 in a compartment with the ID ocid1.tenancy.oc1.C, then you would have the following policies:
    allow any-user to manage objects in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.bucket.name='<bucket_name>'
    }
    allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.streampool.id='<streampool_id>'
    }

Connecting to Oracle Cloud Infrastructure Streaming

Learn how to connect to Oracle Cloud Infrastructure Streaming.

Set up streaming:
  • Set up Oracle Streaming Service and create a stream.
    Note

    Oracle Streaming Service has the following limits:
    • Messages in a stream are retained for no less than 24 hours, and no more than seven days.
    • All messages in a stream are deleted after the retention period has expired, whether they have been read or not.
    • The retention period of a stream cannot be changed after the stream has been created.
    • A tenancy has a default limit of zero or five partitions, depending on your license. If you require more partitions, you can request a service limit increase.
    • The number of partitions for a stream cannot be changed after creation of the stream.
    • A single stream can support up to 50 consumer groups reading from it.
    • Each partition has a total data write of 1 MB per second. There is no limit to the number of PUT requests, provided the data write limit is not exceeded.
    • Each partition has five GET requests per second per consumer group. As a single stream can support up to 50 consumer groups, and a single partition in a stream can be read by only one consumer in a consumer group, a partition can support up to 250 GET requests per second.
    • Producers can publish a message of no more than 1 MB to a stream.
    • A request can be no bigger than 1 MB. A request's size is the sum of its keys and messages after they have been decoded from Base64.
  • Apply Data Flow Resource Principal Policies to any Oracle Cloud Infrastructure resources your streaming applications need to access.
Connect to Kafka using either Java or Python, and authenticate in one of two ways:
  • Use a plain password or auth token. This method is suitable for quick testing across environments, for example, prototyping a Spark structured streaming application that you want to run both locally and on Data Flow against Oracle Streaming Service (see the local prototyping sketch after the connection steps that follow).
    Note

    Hardcoding, or exposing, the password in application arguments is not considered secure, so do not use this method for production runs.
  • Use resource principal authentication. It is more secure than a plain password or auth token, and a more flexible way to authenticate with Oracle Streaming Service. To use resource principal authentication, set up streaming policies.

A Java sample application and a Python sample application are available.

  1. Find the stream pool you want to use to connect to Kafka.
    1. Click Home.
    2. Click Streaming.
    3. Click Stream Pools.
    4. Click the stream pool you want to use to see its details.
    5. Click Kafka Connection Settings.
    6. Copy the following information:
      • Stream pool OCID
      • Bootstrap server
      • Connection string
      • Security protocol, for example, SASL_SSL
      • Security mechanism, for example, PLAIN
      Note

      If the password in the connection string is set to AUTH_TOKEN, create an auth token or use an existing one (password="<auth_token>") for the user specified in username (username="<tenancy>/<username>/<stream_pool_id>"):
      1. Click Identity.
      2. Click Users.
      3. For your user, display the user details.
      4. Create an auth token, or use an existing one.
  2. Spark does not bundle the Kafka integration libraries by default, so you must add them to your Spark application's dependencies.
    • For Java or Scala applications using SBT or Maven project definitions, link your application with this artifact:
      groupId = org.apache.spark
      artifactId = spark-sql-kafka-0-10_2.12
      version = 3.0.2
      Note

      To use the headers functionality, your Kafka client version must be at least 0.11.0.0.
    • For Python applications, add the Kafka integration libraries and dependencies when deploying your application.
    • If you use Data Flow Resource Principal authentication, you need this artifact:
      groupId = com.oracle.oci.sdk
      artifactId = oci-java-sdk-addons-sasl
      version = 1.36.1
  3. Configure the system.
    Kafka connection behavior is controlled through configuration, for example, the servers, authentication, topic, consumer groups, and so on. Configuration is simple but powerful: changing a single value can have a big effect on the whole system.
    Common Configuration
    subscribe = <Kafka_topic_to_consume>
    kafka.max.partition.fetch.bytes = <Fetch_rate_limit>
    startingOffsets = <Start_point_of_the_first_run_of_the_application> 
    failOnDataLoss = <Whether_the_application_fails_on_data_loss>
    For more information on the fetch rate limit, see Limits on Streaming Resources.
    Note

    Subsequent restarts continue from the last checkpoint, not the place specified in startingOffsets. For other options, see Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher).

    failOnDataLoss specifies whether the streaming application fails when data cannot be fetched because it has been removed from Oracle Streaming.

    Advanced Configuration

    See the Spark Streaming Kafka Integration Guide.

    Example Configurations
    Plain password:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = PLAIN
    kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy_name>/<username>/<streampool_ocid>" password="<password>";
    Resource principal:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = OCI-RSA-SHA256
    kafka.sasl.jaas.config = com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:<streampool_ocid>";
  4. Connect to Kafka.
    Example connections.
    Java with resource principal for Oracle Cloud Infrastructure streaming
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config", "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:<streampool_ocid>\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
    Java with a plain password
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancy_name>/<username>/<streampool_ocid>" password=\"<password> \";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load();
    Python
    spark = (
        SparkSession.builder.config("failOnDataLoss", "false")
        .appName("kafka_streaming_aggregate")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
     
    # Configure settings we need to read Kafka.
    if args.ocid is not None:
        jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
        args.auth_type = "OCI-RSA-SHA256"
    else:
        jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
     
    # For the raw source stream.
    raw_kafka_options = {
        "kafka.sasl.jaas.config": jaas_template.format(
            username=args.stream_username, password=args.stream_password, ocid=args.ocid
        ),
        "kafka.sasl.mechanism": args.auth_type,
        "kafka.security.protocol": args.encryption,
        "kafka.bootstrap.servers": "{}:{}".format(
            args.bootstrap_server, args.bootstrap_port
        ),
        "group.id": args.raw_stream,
        "subscribe": args.raw_stream,
    }
     
    # The actual reader.
    raw = spark.readStream.format("kafka").options(**raw_kafka_options).load()
    Note

    To use Python with resource principal for Oracle Cloud Infrastructure streaming, you must use archive.zip. More information is available in the section on Spark-Submit Functionality in Data Flow.
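For the local prototyping mentioned earlier, a minimal PySpark sketch using a plain auth token might look like the following. This is a sketch only: the bootstrap server, port, topic, and credentials are placeholders, spark.jars.packages is used here just to fetch the Kafka integration library for a local session, and on Data Flow the same dependencies should instead be supplied as described in the previous steps.

from pyspark.sql import SparkSession

# Local prototyping only: plain auth token, with the Kafka integration library
# fetched through spark.jars.packages when the session starts.
spark = (
    SparkSession.builder
    .appName("LocalKafkaPrototype")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2")
    .getOrCreate()
)

jaas = (
    'org.apache.kafka.common.security.plain.PlainLoginModule required '
    'username="<tenancy_name>/<username>/<streampool_ocid>" password="<auth_token>";'
)

lines = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<bootstrap_server_name>:<port_number>")
    .option("subscribe", "<topic_name>")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", jaas)
    .option("startingOffsets", "latest")
    .load()
    .selectExpr("CAST(value AS STRING)")
)

query = lines.writeStream.format("console").start()
query.awaitTermination()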
Sample Java Application

This is a sample Java application for Data Flow.

package example;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class StructuredKafkaWordCount {

  public static void main(String[] args) throws Exception {

    Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
    log.info("Started StructuredKafkaWordCount");

    Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
      log.error("Exception uncaught: ", e);
    });

    //Uncomment following line to enable debug log level.
    //Logger.getRootLogger().setLevel(Level.DEBUG);

    if (args.length < 4) {
      printUsage();
    }

    String bootstrapServers = args[0];
    String topics = args[1];
    String checkpointLocation = args[2];
    String type = args[3];
    String outputLocation = null;

    switch (type) {
      case "console":
        System.err.println("Using console output sink");
        break;

      case "csv":
        if (args.length < 5) {
          printUsage();
        }
        outputLocation = args[4];
        System.err.println("Using csv output sink, output location = " + outputLocation);
        break;

      default:
        printUsage();
    }

    SparkSession spark;

    SparkConf conf = new SparkConf();
    if (conf.contains("spark.master")) {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    } else {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .master("local[*]")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    }

    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config",
            "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
        .selectExpr("CAST(value AS STRING)");

    // Split the lines into timestamp and words
    StructType wordsSchema = StructType$.MODULE$.apply(
        new StructField[]{
            StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
            StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
        }
    );
    ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
    final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

    Dataset<Row> words = lines
        .flatMap(
            (FlatMapFunction<Row, Row>) row -> {
              // parse Kafka record in format: "timestamp(iso8601) text"
              String text = row.getString(0);
              String timestampString = text.substring(0, 25);
              String message = text.substring(26);
              Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());

              return Arrays.asList(message.split(" ")).stream()
                  .map(word -> RowFactory.create(timestamp, word)).iterator();
            }
            , encoder);

    // Time window aggregation
    Dataset<Row> wordCounts = words
        .withWatermark("timestamp", "1 minutes")
        .groupBy(
            functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
            functions.col("value")
        )
        .count()
        .selectExpr("CAST(window.start AS timestamp) AS START_TIME",
            "CAST(window.end AS timestamp) AS END_TIME",
            "value AS WORD", "CAST(count AS long) AS COUNT");

    wordCounts.printSchema();

    // Reducing to a single partition
    wordCounts = wordCounts.coalesce(1);

    // Start streaming query
    StreamingQuery query = null;
    switch (type) {
      case "console":
        query = outputToConsole(wordCounts, checkpointLocation);
        break;
      case "csv":
        query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
        break;
      default:
        System.err.println("Unknown type " + type);
        System.exit(1);
    }

    query.awaitTermination();
  }

  private static void printUsage() {
    System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        "<subscribe-topics> <checkpoint-location> <type> ...");
    System.err.println("<type>: console");
    System.err.println("<type>: csv <output-location>");
    System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
    System.exit(1);
  }

  private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
      throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("console")
        .outputMode("complete")
        .option("checkpointLocation", checkpointLocation)
        .start();
  }

  private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
      String outputLocation) throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("csv")
        .outputMode("append")
        .option("checkpointLocation", checkpointLocation)
        .trigger(Trigger.ProcessingTime("1 minutes"))
        .option("path", outputLocation)
        .start();
  }
}
Sample Python Application

This is a sample Python application for Data Flow.

#!/usr/bin/env python3
 
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
  substring, to_timestamp, explode, split, length
 
import argparse
import os
 
 
def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--auth-type', default='PLAIN')
  parser.add_argument('--bootstrap-port', default='9092')
  parser.add_argument('--bootstrap-server')
  parser.add_argument('--checkpoint-location')
  parser.add_argument('--encryption', default='SASL_SSL')
  parser.add_argument('--ocid')
  parser.add_argument('--output-location')
  parser.add_argument('--output-mode', default='file')
  parser.add_argument('--stream-password')
  parser.add_argument('--raw-stream')
  parser.add_argument('--stream-username')
  args = parser.parse_args()
 
  if args.bootstrap_server is None:
    args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
  if args.raw_stream is None:
    args.raw_stream = os.environ.get('RAW_STREAM')
  if args.stream_username is None:
    args.stream_username = os.environ.get('STREAM_USERNAME')
  if args.stream_password is None:
    args.stream_password = os.environ.get('STREAM_PASSWORD')
 
  assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
  assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
  assert args.output_location is not None, "Output location (--output-location) must be set"
  assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
 
  spark = (
    SparkSession.builder
      .appName('StructuredKafkaWordCount')
      .config('failOnDataLoss', 'true')
      .config('spark.sql.streaming.minBatchesToRetain', '10')
      .config('spark.sql.shuffle.partitions', '1')
      .config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
      .getOrCreate()
  )
 
  # Uncomment following line to enable debug log level.
  # spark.sparkContext.setLogLevel('DEBUG')
 
  # Configure Kafka connection settings.
  if args.ocid is not None:
    jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
    args.auth_type = 'OCI-RSA-SHA256'
  else:
    jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
 
  raw_kafka_options = {
    'kafka.sasl.jaas.config': jaas_template.format(
      username=args.stream_username, password=args.stream_password,
      ocid=args.ocid
    ),
    'kafka.sasl.mechanism': args.auth_type,
    'kafka.security.protocol': args.encryption,
    'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
                                              args.bootstrap_port),
    'subscribe': args.raw_stream,
    'kafka.max.partition.fetch.bytes': 1024 * 1024,
    'startingOffsets': 'latest'
  }
 
  # Reading raw Kafka stream.
  raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
 
  # Cast raw lines to a string.
  lines = raw.selectExpr('CAST(value AS STRING)')
 
  # Split value column into timestamp and words columns.
  parsedLines = (
    lines.select(
      to_timestamp(substring('value', 1, 25))
        .alias('timestamp'),
      lines.value.substr(lit(26), length('value') - 25)
        .alias('words'))
  )
 
  # Split words into array and explode single record into multiple.
  words = (
    parsedLines.select(
      col('timestamp'),
      explode(split('words', ' ')).alias('word')
    )
  )
 
  # Do time window aggregation
  wordCounts = (
    words
      .withWatermark('timestamp', '1 minutes')
      .groupBy('word', window('timestamp', '1 minute'))
      .count()
      .selectExpr('CAST(window.start AS timestamp) AS START_TIME',
                  'CAST(window.end AS timestamp) AS END_TIME',
                  'word AS WORD',
                  'CAST(count AS long) AS COUNT')
  )
 
  # Reduce partitions to a single.
  wordCounts = wordCounts.coalesce(1)
 
  wordCounts.printSchema()
 
  # Output it to the chosen channel.
  if args.output_mode == 'console':
    print("Writing aggregates to console")
    query = (
      wordCounts.writeStream
        .option('checkpointLocation', args.checkpoint_location)
        .outputMode('complete')
        .format('console')
        .option('truncate', False)
        .start()
    )
  else:
    print("Writing aggregates to Object Storage")
    query = (
      wordCounts.writeStream
        .format('csv')
        .outputMode('append')
        .trigger(processingTime='1 minutes')
        .option('checkpointLocation', args.checkpoint_location)
        .option('path', args.output_location)
        .start()
    )
 
  query.awaitTermination()
 
 
main()

Best Practices for Building Java Spark Streaming Applications

Tips and best practices for configuring Java build projects in Data Flow.

Maven is used for the following examples to show the various configuration possibilities.

Use a consistent version of Spark across your libraries and Data Flow
The easiest way to do this is to define a common variable and reuse it. In the following example, the common variable is called spark.version.
<properties>
 ...
 <spark.version>3.0.2</spark.version>
 ...
</properties>
<dependencies>
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.12</artifactId>
     <version>${spark.version}</version>
     <scope>provided</scope>
 </dependency>
Note

Make sure the Scala version in artifactId is suitable for the version of Spark. In the example, artifactId is set to spark-core_2.12 for Scala 2.12.
Include Spark binaries that are already part of the standard Spark bundle with the provided scope, to avoid any code duplication.
Expanding the previous example:
<properties>
 ...
 <spark.version>3.0.2</spark.version>
 ...
</properties>
<dependencies>
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.12</artifactId>
     <version>${spark.version}</version>
     <scope>provided</scope>
 </dependency>
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql_2.12</artifactId>
     <version>${spark.version}</version>
     <scope>provided</scope>
 </dependency>
...
Note

More information on dependency scope can be found in the Maven documentation.
Package Spark binaries that are not part of the standard bundle with your Spark application.
This example includes spark-sql-kafka-0-10_2.12 in artifactId.
<properties>
 ...
 <spark.version>3.0.2</spark.version>
 ...
</properties>
<dependencies>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
...
The Oracle Cloud Infrastructure SDK might be compiled against a different version of third-party libraries, which might result in run-time failures. To avoid such failures, package the Oracle Cloud Infrastructure SDK and third-party libraries with your Spark application and relocate any newer third-party libraries into a shaded namespace.
For example:
<dependencies>
    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-addons-sasl</artifactId>
        <optional>false</optional>
        <version>1.36.1</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.11.1</version>
    </dependency>
</dependencies>
...
<build>
 <plugins>
 ...
<plugin>
   <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-shade-plugin</artifactId>
   <version>3.2.4</version>
   <configuration>
       <!-- The final uber jar file name will not have a version component. -->
       <finalName>${project.artifactId}</finalName>
       <createDependencyReducedPom>false</createDependencyReducedPom>
       <transformers>
           <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
           <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
       </transformers>
       <relocations>
           <relocation>
               <pattern>com.google.</pattern>
               <shadedPattern>com.shaded.google.</shadedPattern>
           </relocation>
           <relocation>
               <pattern>com.oracle.bmc.</pattern>
               <shadedPattern>com.shaded.oracle.bmc.</shadedPattern>
           </relocation>
       </relocations>
       <!-- exclude signed Manifests -->
       <filters>
           <filter>
               <artifact>*:*</artifact>
               <excludes>
                   <exclude>META-INF/*.SF</exclude>
                   <exclude>META-INF/*.DSA</exclude>
                   <exclude>META-INF/*.RSA</exclude>
               </excludes>
           </filter>
       </filters>
   </configuration>
   <executions>
       <execution>
           <phase>package</phase>
           <goals>
               <goal>shade</goal>
           </goals>
       </execution>
   </executions>
</plugin>
...
Sample POM.xml File for Java Spark Streaming Application

A sample POM.xml for use when building a Java-based Spark streaming application for use with Data Flow.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>StructuredKafkaWordCount</artifactId>
    <version>1.0.0-SNAPSHOT</version>
 
    <properties>
      <spark.version>3.2.1</spark.version>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-addons-sasl</artifactId>
            <optional>false</optional>
            <version>1.36.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <skipAssembly>false</skipAssembly>
                </configuration>
            </plugin>
 
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <configuration>
                    <!-- The final uber jar file name will not have a version component. -->
                    <finalName>${project.artifactId}</finalName>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                    </transformers>
                    <relocations>
                        <relocation>
                            <pattern>com.google.</pattern>
                            <shadedPattern>com.shaded.google.</shadedPattern>
                        </relocation>
                        <relocation>
                            <pattern>com.oracle.bmc.</pattern>
                            <shadedPattern>com.shaded.oracle.bmc.</shadedPattern>
                        </relocation>
                    </relocations>
                    <!-- exclude signed Manifests -->
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
 
        </plugins>
    </build>
</project>

Best Practices for Building Python Spark Streaming Applications

Tips and best practices for configuring Python build projects in Data Flow.

Python Spark applications are more complex than Java applications because the dependencies are required for both the JVM and Python runtimes at the same time, and each runtime has its own project and package management systems. To assist with packaging dependencies from the different runtimes, Data Flow has a dependency packager tool. The tool packages all dependencies into a single archive that you upload to Object Storage; Data Flow then provides the dependencies from that archive to the Spark application.

Storing the archive in Oracle Cloud Infrastructure Object Storage ensures its availability and reproducibility (an artifact repository is dynamic, so resolving dependencies again can produce a different dependency tree), and it avoids repeatedly downloading the same dependencies from external sources.

More information on how to set up and use the dependency manager is available in the section on Spark-Submit Functionality in Data Flow.

When building archive.zip for your application, list the required Java libraries in packages.txt, and the dependency packager packages them together with their dependencies.
For example, to include spark-sql-kafka-0-10_2.12, add it in packages.txt:
org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1
Run this command:
docker run --pull always --rm -v $(pwd):/opt/dataflow -it phx.ocir.io/oracle/dataflow/dependency-packager:latest
Resulting in an archive.zip file:
  adding: java/ (stored 0%)
  adding: java/org.lz4_lz4-java-1.7.1.jar (deflated 3%)
  adding: java/org.slf4j_slf4j-api-1.7.30.jar (deflated 11%)
  adding: java/org.xerial.snappy_snappy-java-1.1.8.2.jar (deflated 1%)
  adding: java/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.0.2.jar (deflated 8%)
  adding: java/org.apache.spark_spark-sql-kafka-0-10_2.12-3.0.2.jar (deflated 5%)
  adding: java/com.github.luben_zstd-jni-1.4.4-3.jar (deflated 1%)
  adding: java/org.spark-project.spark_unused-1.0.0.jar (deflated 42%)
  adding: java/org.apache.kafka_kafka-clients-2.4.1.jar (deflated 8%)
  adding: java/org.apache.commons_commons-pool2-2.6.2.jar (deflated 9%)
  adding: version.txt (deflated 59%)
archive.zip is generated!
It might be necessary to shade some of the Java libraries.
If you use Spark 2.4.4 or Spark 3.0.2, you might have to shade your libraries. Create a separate Maven project to build a fat JAR that handles all the Java dependencies and other tweaks, such as shading, in one place, and include it as a custom JAR using the dependency packager. An example is oci-java-sdk-addons-sasl: the Oracle Cloud Infrastructure SDK is compiled against later versions of some third-party libraries, so runtime failures might otherwise occur.

An example Maven project:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
 
  <groupId>com.example</groupId>
  <artifactId>SaslFat</artifactId>
  <version>1.0-SNAPSHOT</version>
 
  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <spark.version>3.0.2</spark.version>
  </properties>
 
  <dependencies>
    <dependency>
      <groupId>com.oracle.oci.sdk</groupId>
      <artifactId>oci-java-sdk-addons-sasl</artifactId>
      <optional>false</optional>
      <version>1.36.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.11.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>
 
  <build>
    <plugins>
 
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <configuration>
          <createDependencyReducedPom>false</createDependencyReducedPom>
          <transformers>
            <transformer
              implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer
              implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
          </transformers>
          <relocations>
            <relocation>
              <pattern>com.google.</pattern>
              <shadedPattern>com.shaded.google.</shadedPattern>
            </relocation>
            <relocation>
              <pattern>com.oracle.bmc.</pattern>
              <shadedPattern>com.shaded.oracle.bmc.</shadedPattern>
              <excludes>
                <exclude>com.oracle.bmc.auth.sasl.*</exclude>
              </excludes>
            </relocation>
          </relocations>
          <!-- exclude signed Manifests -->
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
          <artifactSet>
            <excludes>
              <exclude>${project.groupId}:${project.artifactId}</exclude>
            </excludes>
          </artifactSet>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
 
    </plugins>
  </build>
</project>
Place SaslFat-1.0-SNAPSHOT.jar in the working directory of the dependency packager and run the command:
docker run --pull always --rm -v $(pwd):/opt/dataflow -it phx.ocir.io/oracle/dataflow/dependency-packager:latest
SaslFat-1.0-SNAPSHOT.jar is packaged into archive.zip as a Java dependency:
  adding: java/ (stored 0%)
  adding: java/SaslFat-1.0-SNAPSHOT.jar (deflated 8%)
  adding: version.txt (deflated 59%)
archive.zip is generated!
Alternatively, you can manually create such an archive.zip that contains the java folder with SaslFat-1.0-SNAPSHOT.jar in it.
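For example, a minimal Python sketch (one possible approach, assuming SaslFat-1.0-SNAPSHOT.jar is in the current directory) that builds such an archive by hand:

import zipfile

# Place the fat JAR under a top-level java/ folder, mirroring the layout
# produced by the dependency packager.
with zipfile.ZipFile("archive.zip", "w", zipfile.ZIP_DEFLATED) as archive:
    archive.write("SaslFat-1.0-SNAPSHOT.jar", arcname="java/SaslFat-1.0-SNAPSHOT.jar")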

Planned Maintenance

Data Flow is regularly maintained in the last week of every month.

To provide the best and most secure environment for Spark applications, Data Flow has regular maintenance in the last week of every month (adjusted for public holidays). Customers are notified two weeks in advance about the upcoming infrastructure maintenance schedule. The service automatically stops in-progress streaming runs and starts a new streaming run on the updated compute resources.

Data Flow relies on Spark structured streaming checkpointing to record the processed offsets, which can be stored in your Object Storage bucket. When the new compute resources are created, the streaming application resumes from the previous checkpoint.