Erste Schritte mit Spark Streaming

Bevor Sie Spark-Streaming mit Data Flow verwenden können, müssen Sie es einrichten.

Apache Spark vereint Batchverarbeitung, Streamverarbeitung und maschinelles Lernen in einer API. Data Flow führt Spark-Anwendungen in einer Apache Spark-Standardlaufzeit aus. Wenn Sie eine Streaminganwendung ausführen, verwendet Data Flow keine andere Laufzeit, sondern führt die Spark-Anwendung anders aus:
Unterschiede zwischen Streaming- und Nicht-Streamingausführungen
Unterschied Nicht-Streamingausführung Streamingausführung
Authentifizierung Verwendet ein On-Behalf-Of-(OBO-)Token des anfordernden Benutzers. OBO-Token laufen nach 24 Stunden ab, sodass diese nicht für Jobs mit langer Ausführungszeit geeignet sind. Ruft Oracle Cloud Infrastructure Object Storage mit Sessiontoken auf, die mit dem Resource Principal der Ausführung verknüpft sind. Eignet sich für Jobs mit langer Ausführungszeit.
Neustart-Policy Ist nicht erfolgreich, wenn der Exitcode der Spark-Laufzeit ungleich Null ist. Startet bis zu zehn Mal, wenn der Exitcode der Spark-Laufzeit ungleich Null ist.
Patch-Policy Keine Patching-Policy, da Jobs voraussichtlich weniger als 24 Stunden dauern. Automatische monatliche Patches.
  1. Erstellen Sie eine Spark-Streaminganwendung.
    Beim Ausführen der Anwendung werden die Resource-Principal-Authentifizierung, das automatische Patching und der automatische Neustart verwendet.
  2. Policy für Spark-Streaming einrichten
    Da Ihre Spark-Streaminganwendungen zur Authentifizierung bei Oracle Cloud Infrastructure-Ressourcen die Resource- Principal-Sessiontokens verwenden, müssen Sie IAM-Policys zur Autorisierung Ihrer Anwendungen erstellen, bevor sie auf diese Ressourcen zugreifen können. Datenflussausführungen werden bei Bedarf gestartet, sodass Sie die Ausführungs-OCID in Ihrer IAM-Policy nicht verwenden können, da sie erst nach dem Start der Ausführung zugewiesen wird. Verbinden Sie stattdessen die Ressourcen der Ausführung mit einer permanenten Ressource, und referenzieren Sie sie in Ihrer IAM-Policy. Die beiden häufigsten Vorgehensweisen hierfür sind:
    Übergeordnete Anwendungs-ID
    Melden Sie sich bei der Data Flow-Anwendung an, die sie erstellt hat, und fügen Sie die ID der Data Flow-Anwendung in die IAM-Policy ein. Um Berechtigungen für eine bestimmte Anwendung festzulegen, erstellen Sie eine dynamische Gruppe, die allen von der Anwendung gestarteten Ausführungen entspricht, und autorisieren Sie die dynamische Gruppe für den Zugriff auf IAM-Ressourcen. Jede Ausführung enthält ein Tag, das sie mit der übergeordneten Anwendung verknüpft. Sie können dieses Tag in einer Übereinstimmungsregel für dynamische Gruppen verwenden.
    Hinweis

    Dieses Tag kann nicht in einer IAM-Policy für "any-user" verwendet werden. Sie müssen eine dynamische Gruppe erstellen.
    Beispiel: Wenn Ihre Datenflussanwendung die ID ocid1.dataflowapplication.oc1.iad.A hat, erstellen Sie eine dynamische Gruppe:
    ALL {resource.type='dataflowrun', tag.oci-dataflow.application-id.value='ocid1.dataflowapplication.oc1.iad.A'}
    mit den folgenden Policys:
    allow dynamic-group <dynamic_group_name> to manage objects in tenancy where all {
     target.bucket.name='<bucket_name>'
    }
    allow dynamic-group <dynamic_group_name> to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     target.streampool.id='<streampool_id>'
    }
    Ziel-Compartment-ID

    Melden Sie sich bei der Data Flow-Ausführung beim Compartment an, in dem Ausführungen erstellt werden, und fügen Sie die Compartment-ID in die IAM-Policy ein. Dieser Ansatz ist weniger spezifisch, weil alle im Compartment ausgeführten Spark-Anwendungen Zugriff auf diese Ressourcen erhalten. Wenn Sie spark-submit über die CLI verwenden möchten, müssen Sie diesen Ansatz verwenden, da sowohl die Anwendungs-ID als auch die Ausführungs-ID auf Anforderung verfügbar sind.

    Beispiel: Wenn Sie eine Ausführung mit der ID ocid1.dataflowrun.oc1.iad.R2 in einem Compartment mit der ID ocid1.tenancy.oc1.C verwenden, haben Sie die folgenden Policys:
    allow any-user to manage objects in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.bucket.name='<bucket_name>'
    }
    allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.streampool.id='<streampool_id>'
    }

Verbindung zu Oracle Cloud Infrastructure Streaming herstellen

Erfahren Sie, wie Sie eine Verbindung zu Oracle Cloud Infrastructure Streaming herstellen.

Richten Sie Streaming ein:
  • Richten Sie Oracle Streaming Service ein, und erstellen Sie einen Stream.
    Hinweis

    Für Oracle Streaming Service gelten folgende Einschränkungen:
    • Nachrichten in einem Stream werden mindestens 24 Stunden und höchstens sieben Tage lang aufbewahrt.
    • Alle Nachrichten in einem Stream werden nach Ablauf des Aufbewahrungszeitraums gelöscht, unabhängig davon, ob sie gelesen wurden oder nicht.
    • Der Aufbewahrungszeitraum eines Streams kann nicht geändert werden, nachdem der Stream erstellt wurde.
    • Ein Mandant hat je nach Lizenz ein Standardlimit von null oder fünf Partitionen. Wenn Sie mehr Partitionen benötigen, können Sie eine Erhöhung des Servicelimits beantragen.
    • Die Anzahl der Partitionen für einen Stream kann nach dem Erstellen des Streams nicht mehr geändert werden.
    • Ein einzelner Stream kann bis zu 50 Consumer-Gruppen unterstützen, die Daten aus dem Stream lesen.
    • Jede Partition verfügt über eine Datenschreibrate von insgesamt 1 MB pro Sekunde. Die Anzahl der PUT-Anforderungen ist unbegrenzt, sofern das Datenschreiblimit nicht überschritten wird.
    • Jede Partition verfügt über fünf GET-Anforderungen pro Sekunde pro Consumer-Gruppe. Da ein einzelner Stream bis zu 50 Consumer-Gruppen unterstützen kann und eine einzelne Partition in einem Stream nur von einem Consumer in einer Consumer-Gruppe gelesen werden kann, kann eine Partition bis zu 250 GET-Anforderungen pro Sekunde unterstützen.
    • Producers können eine Nachricht von maximal 1 MB in einem Stream veröffentlichen.
    • Eine Anforderung darf nicht größer als 1 MB sein. Die Größe einer Anforderung entspricht der Summe der zugehörigen Schlüssel und Nachrichten nach der Decodierung aus Base64.
  • Fügen Sie Streaming-Policys zu Data Flow hinzu.
Stellen Sie mit Java oder Python eine Verbindung zu Kafka her. Es gibt zwei Möglichkeiten zur Authentifizierung:
  • Verwenden Sie ein Klartextkennwort oder ein Authentifizierungstoken. Diese Methode eignet sich für das rasche Testen verschiedener Umgebungen. Beispiel: Prototyping von strukturierten Spark-Streaminganwendungen, in denen Sie Oracle Streaming Service lokal und in Data Flow ausführen können.
    Hinweis

    Die Hartcodierung oder das Anzeigen des Kennworts in Anwendungsargumenten wird nicht als sicher betrachtet. Verwenden Sie diese Methode daher nicht für Produktionsausführungen.
  • Die Resource-Principal-Authentifizierung ist sicherer als ein Klartextkennwort oder ein Authentifizierungstoken. Die Authentifizierung mit Oracle Streaming Service ist flexibler. Richten Sie Streaming-Policys ein, um die Resource-Principal-Authentifizierung zu verwenden.

Eine Java-Beispielanwendung und eine Python-Beispielanwendung sind verfügbar.

  1. Suchen Sie den Streampool, den Sie zum Herstellen einer Verbindung zu Kafka verwenden möchten.
    1. Wählen Sie Home aus.
    2. Wählen Sie Streaming aus.
    3. Wählen Sie Streampools aus.
    4. Wählen Sie den gewünschten Streampool aus, um die zugehörigen Details anzuzeigen.
    5. Wählen Sie Kafka-Verbindungseinstellungen aus.
    6. Kopieren Sie die folgenden Informationen:
      • Streampool-OCID
      • Bootstrap-Server
      • Verbindungszeichenfolge
      • Sicherheitsprotokoll. Beispiel: SASL_SSL
      • Sicherheitsmechanismus. Beispiel: PLAIN
      Hinweis

      Wenn das Kennwort in der Verbindungszeichenfolge auf AUTH_TOKEN gesetzt ist, erstellen Sie ein Authentifizierungstoken oder verwenden Sie ein vorhandenes Token (password="<auth_token>") für den im Benutzernamen angegebenen Benutzer (username="<tenancy>/<username>/<stream_pool_id>":
      1. Wählen Sie Identität aus.
      2. Wählen Sie Benutzer.
      3. Zeigen Sie für Ihren Benutzer die Benutzerdetails an.
      4. Erstellen Sie ein Authentifizierungstoken, oder verwenden Sie ein vorhandenes.
  2. Spark ist standardmäßig nicht an Kafka-Integrations-Librarys gebunden. Sie müssen es daher als Teil der Spark-Anwendungsabhängigkeiten hinzufügen.
    • Verknüpfen Sie Ihre Anwendung für Java- oder Scala-Anwendungen mit SBT- oder Maven-Projektdefinitionen mit diesem Artefakt:
      groupId = org.apache.spark
      artifactId = spark-sql-kafka-0-10_2.12
      version = 3.0.2
      Hinweis

      Um die Headerfunktionalität verwenden zu können, müssen Sie mindestens die Kafka-Clientversion 0.11.0.0 verwenden.
    • Fügen Sie für Python-Anwendungen beim Deployment der Anwendung die Kafka-Integrations-Librarys und -abhängigkeiten hinzu.
    • Wenn Sie die Resource- Principal-Authentifizierung von Data Flow verwenden, benötigen Sie das folgende Artefakt:
      groupId = com.oracle.oci.sdk
      artifactId = oci-java-sdk-addons-sasl
      version = 1.36.1
  3. Konfigurieren Sie das System.
    Das Verhalten von Kafka-Verbindungen wird von der Systemkonfiguration gesteuert, z.B. Server, Authentifizierung, Thema, Gruppen usw. Die Konfiguration ist leistungsstark, wobei eine einzige Wertänderung große Auswirkungen auf das gesamte System hat.
    Allgemeine Konfiguration
    subscribe = <Kafka_topic_to_consume>
    kafka.max.partition.fetch.bytes = <Fetch_rate_limit>
    startingOffsets = <Start_point_of_the_first_run_of_the_application> 
    failOnDataLoss = <Failure_streaming_application>
    Weitere Informationen zum Abrufratenlimit finden Sie unter Limits für Streamingressourcen.
    Hinweis

    Spätere Neustarts werden ab dem letzten Checkpoint fortgesetzt, nicht von der Stelle, die in startingOffsets angegeben ist. Weitere Optionen finden Sie im Dokumentation für die Integration von strukturiertem Streaming und Kafka (Kafka-Brokerversion 0.10.0 oder höher).

    failOnDataLoss gibt die zu verwendende Streaminganwendung an, wenn die Daten nicht abgerufen werden können, weil sie aus Oracle Streaming entfernt wurden.

    Erweiterte Konfiguration

    Weitere Informationen finden Sie in der Dokumentation für die Integration von Spark Streaming und Kafka.

    Beispielkonfigurationen
    Klartextkennwort:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = PLAIN
    kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy_name>/<username>/<streampool_ocid>" password="<example-password>";
    Resource Principal:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = OCI-RSA-SHA256
    kafka.sasl.jaas.config = com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:<streampool_ocid>";
  4. Stellen Sie eine Verbindung zu Kafka her.
    Beispielverbindungen.
    Java mit Resource Principal für Oracle Cloud Infrastructure-Streaming
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config", "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:<streampool_ocid>\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
    Java mit einem Klartextkennwort
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancy_name>/<username>/<streampool_ocid>" password=\"<example-password> \";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
    Python
    spark = (
        SparkSession.builder.config("failOnDataLoss", "false")
        .appName("kafka_streaming_aggregate")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
     
    # Configure settings we need to read Kafka.
    if args.ocid is not None:
        jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
        args.auth_type = "OCI-RSA-SHA256"
    else:
        jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
     
    # For the raw source stream.
    raw_kafka_options = {
        "kafka.sasl.jaas.config": jaas_template.format(
            username=args.stream_username, password=args.stream_password, ocid=args.ocid
        ),
        "kafka.sasl.mechanism": args.auth_type,
        "kafka.security.protocol": args.encryption,
        "kafka.bootstrap.servers": "{}:{}".format(
            args.bootstrap_server, args.bootstrap_port
        ),
        "group.id": args.raw_stream,
        "subscribe": args.raw_stream,
    }
     
    # The actual reader.
    raw = spark.readStream.format("kafka").options(**raw_kafka_options).load()
    Hinweis

    Um Python mit Resource Principal für das Oracle Cloud Infrastructure-Streaming zu verwenden, müssen Sie archive.zip verwenden. Weitere Informationen finden Sie im Abschnitt zur Spark-Submit-Funktionalität in Data Flow.
Java-Beispielanwendung

Dies ist eine Java-Beispielanwendung für Data Flow.

package example;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class StructuredKafkaWordCount {

  public static void main(String[] args) throws Exception {

    Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
    log.info("Started StructuredKafkaWordCount");

    Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
      log.error("Exception uncaught: ", e);
    });

    //Uncomment following line to enable debug log level.
    //Logger.getRootLogger().setLevel(Level.DEBUG);

    if (args.length < 4) {
      printUsage();
    }

    String bootstrapServers = args[0];
    String topics = args[1];
    String checkpointLocation = args[2];
    String type = args[3];
    String outputLocation = null;

    switch (type) {
      case "console":
        System.err.println("Using console output sink");
        break;

      case "csv":
        if (args.length < 5) {
          printUsage();
        }
        outputLocation = args[4];
        System.err.println("Using csv output sink, output location = " + outputLocation);
        break;

      default:
        printUsage();
    }

    SparkSession spark;

    SparkConf conf = new SparkConf();
    if (conf.contains("spark.master")) {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    } else {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .master("local[*]")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    }

    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config",
            "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
        .selectExpr("CAST(value AS STRING)");

    // Split the lines into timestamp and words
    StructType wordsSchema = StructType$.MODULE$.apply(
        new StructField[]{
            StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
            StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
        }
    );
    ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
    final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

    Dataset<Row> words = lines
        .flatMap(
            (FlatMapFunction<Row, Row>) row -> {
              // parse Kafka record in format: "timestamp(iso8601) text"
              String text = row.getString(0);
              String timestampString = text.substring(0, 25);
              String message = text.substring(26);
              Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());

              return Arrays.asList(message.split(" ")).stream()
                  .map(word -> RowFactory.create(timestamp, word)).iterator();
            }
            , encoder);

    // Time window aggregation
    Dataset<Row> wordCounts = words
        .withWatermark("timestamp", "1 minutes")
        .groupBy(
            functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
            functions.col("value")
        )
        .count()
        .selectExpr("CAST(window.start AS timestamp) AS START_TIME",
            "CAST(window.end AS timestamp) AS END_TIME",
            "value AS WORD", "CAST(count AS long) AS COUNT");

    wordCounts.printSchema();

    // Reducing to a single partition
    wordCounts = wordCounts.coalesce(1);

    // Start streaming query
    StreamingQuery query = null;
    switch (type) {
      case "console":
        query = outputToConsole(wordCounts, checkpointLocation);
        break;
      case "csv":
        query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
        break;
      default:
        System.err.println("Unknown type " + type);
        System.exit(1);
    }

    query.awaitTermination();
  }

  private static void printUsage() {
    System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        "<subscribe-topics> <checkpoint-location> <type> ...");
    System.err.println("<type>: console");
    System.err.println("<type>: csv <output-location>");
    System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
    System.exit(1);
  }

  private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
      throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("console")
        .outputMode("complete")
        .option("checkpointLocation", checkpointLocation)
        .start();
  }

  private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
      String outputLocation) throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("csv")
        .outputMode("append")
        .option("checkpointLocation", checkpointLocation)
        .trigger(Trigger.ProcessingTime("1 minutes"))
        .option("path", outputLocation)
        .start();
  }
}
Python-Beispielanwendung

Dies ist eine Python-Beispielanwendung für Data Flow.

#!/usr/bin/env python3
 
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
  substring, to_timestamp, explode, split, length
 
import argparse
import os
 
 
def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--auth-type', default='PLAIN')
  parser.add_argument('--bootstrap-port', default='9092')
  parser.add_argument('--bootstrap-server')
  parser.add_argument('--checkpoint-location')
  parser.add_argument('--encryption', default='SASL_SSL')
  parser.add_argument('--ocid')
  parser.add_argument('--output-location')
  parser.add_argument('--output-mode', default='file')
  parser.add_argument('--stream-password')
  parser.add_argument('--raw-stream')
  parser.add_argument('--stream-username')
  args = parser.parse_args()
 
  if args.bootstrap_server is None:
    args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
  if args.raw_stream is None:
    args.raw_stream = os.environ.get('RAW_STREAM')
  if args.stream_username is None:
    args.stream_username = os.environ.get('STREAM_USERNAME')
  if args.stream_password is None:
    args.stream_password = os.environ.get('STREAM_PASSWORD')
 
  assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
  assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
  assert args.output_location is not None, "Output location (--output-location) must be set"
  assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
 
  spark = (
    SparkSession.builder
      .appName('StructuredKafkaWordCount')
      .config('failOnDataLoss', 'true')
      .config('spark.sql.streaming.minBatchesToRetain', '10')
      .config('spark.sql.shuffle.partitions', '1')
      .config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
      .getOrCreate()
  )
 
  # Uncomment following line to enable debug log level.
  # spark.sparkContext.setLogLevel('DEBUG')
 
  # Configure Kafka connection settings.
  if args.ocid is not None:
    jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
    args.auth_type = 'OCI-RSA-SHA256'
  else:
    jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
 
  raw_kafka_options = {
    'kafka.sasl.jaas.config': jaas_template.format(
      username=args.stream_username, password=args.stream_password,
      ocid=args.ocid
    ),
    'kafka.sasl.mechanism': args.auth_type,
    'kafka.security.protocol': args.encryption,
    'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
                                              args.bootstrap_port),
    'subscribe': args.raw_stream,
    'kafka.max.partition.fetch.bytes': 1024 * 1024,
    'startingOffsets': 'latest'
  }
 
  # Reading raw Kafka stream.
  raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
 
  # Cast raw lines to a string.
  lines = raw.selectExpr('CAST(value AS STRING)')
 
  # Split value column into timestamp and words columns.
  parsedLines = (
    lines.select(
      to_timestamp(substring('value', 1, 25))
        .alias('timestamp'),
      lines.value.substr(lit(26), length('value') - 25)
        .alias('words'))
  )
 
  # Split words into array and explode single record into multiple.
  words = (
    parsedLines.select(
      col('timestamp'),
      explode(split('words', ' ')).alias('word')
    )
  )
 
  # Do time window aggregation
  wordCounts = (
    words
      .withWatermark('timestamp', '1 minutes')
      .groupBy('word', window('timestamp', '1 minute'))
      .count()
      .selectExpr('CAST(window.start AS timestamp) AS START_TIME',
                  'CAST(window.end AS timestamp) AS END_TIME',
                  'word AS WORD',
                  'CAST(count AS long) AS COUNT')
  )
 
  # Reduce partitions to a single.
  wordCounts = wordCounts.coalesce(1)
 
  wordCounts.printSchema()
 
  # Output it to the chosen channel.
  if args.output_mode == 'console':
    print("Writing aggregates to console")
    query = (
      wordCounts.writeStream
        .option('checkpointLocation', args.checkpoint_location)
        .outputMode('complete')
        .format('console')
        .option('truncate', False)
        .start()
    )
  else:
    print("Writing aggregates to Object Storage")
    query = (
      wordCounts.writeStream
        .format('csv')
        .outputMode('append')
        .trigger(processingTime='1 minutes')
        .option('checkpointLocation', args.checkpoint_location)
        .option('path', args.output_location)
        .start()
    )
 
  query.awaitTermination()
 
 
main()

Verbindung zu einer Streamingquelle in einem privaten Subnetz herstellen

Führen Sie diese Schritte aus, um eine Verbindung zu einer Streamingquelle in einem privaten Subnetz herzustellen.

Kopieren Sie den FDQN der Streamingquelle, um Traffic zwischen VNICs innerhalb des privaten Subnetzes zuzulassen, das zum Erstellen des privaten Data Flow-Endpunkts verwendet wird. Wenn sich die Streamingquelle in einem anderen Subnetz als der private Data Flow-Endpunkt befindet, lassen Sie Traffic zwischen dem Streaming-Subnetz und dem privaten Data Flow-Endpunktsubnetz zu.

  1. Erstellen Sie einen Streamingpool mit einem privaten Endpunkt.
    Weitere Informationen finden Sie in der Streaming-Dokumentation.
  2. Zeigen Sie die Streampooldetails an, und kopieren Sie den Wert für FDQN.
  3. Bearbeiten Sie den privaten Endpunkt, und ersetzen Sie den Wert von zu steuernden DNS-Zonen durch den Wert des Streampool-FDQN, den Sie im vorherigen Schritt kopiert haben.
  4. Hängen Sie den privaten Endpunkt an die Streaminganwendung an.
  5. Führen Sie die Anwendung aus.