Introduzione allo streaming Spark

Prima di poter utilizzare lo streaming Spark con Data Flow, è necessario impostarlo.

Apache Spark unifica l'elaborazione in batch, l'elaborazione in streaming e l'apprendimento automatico in un'unica API. Data Flow esegue le applicazioni Spark all'interno di un runtime Apache Spark standard. Quando esegui un'applicazione di streaming, Data Flow non utilizza un runtime diverso, ma esegue l'applicazione Spark in un modo diverso:
Differenze tra esecuzioni di streaming e non di streaming
Elemento differente Esecuzione non in streaming Esecuzione streaming
Autenticazione Utilizza un token OBO (On-Behalf-Of) dell'utente richiedente. I token OBO scadono dopo 24 ore, pertanto non sono adatti per lavori con tempi di esecuzione lunghi. Accedi a Oracle Cloud Infrastructure Object Storage utilizzando i token di sessione legati al principal delle risorse dell'esecuzione. È adatto per lavori di lunga durata.
Riavvia criterio Non riesce se il codice di uscita del runtime Spark è diverso da zero. Riavvia fino a dieci volte se il codice di uscita del runtime Spark è diverso da zero.
Criterio patch Nessun criterio di applicazione delle patch poiché si prevede che i job durino meno di 24 ore. Patch mensili automatiche.
  1. Crea un'applicazione di streaming Spark.
    Quando viene eseguita, l'applicazione utilizza l'autenticazione del principal delle risorse, l'applicazione di patch automatiche e il riavvio automatico.
  2. Impostazione di un criterio per lo streaming Spark
    Poiché le tue applicazioni di streaming Spark utilizzano i token di sessione del principal delle risorse per eseguire l'autenticazione alle risorse di Oracle Cloud Infrastructure, è necessario creare criteri IAM che autorizzano le tue applicazioni prima di poter accedere a queste risorse. Le esecuzioni dei flussi di dati vengono avviate su richiesta in modo da non poter utilizzare l'OCID di esecuzione nel criterio IAM perché non viene allocato fino all'avvio dell'esecuzione. Collegare invece le risorse dell'esecuzione a una risorsa permanente e farvi riferimento nel criterio IAM. I due modi più comuni per farlo sono:
    ID applicazione padre
    Connettere l'esecuzione del flusso di dati all'applicazione del flusso di dati che l'ha creata e inserire l'ID applicazione del flusso di dati nel criterio IAM. Per impostare le autorizzazioni per una determinata applicazione, creare un gruppo dinamico che corrisponda a tutte le esecuzioni avviate dall'applicazione e autorizzare il gruppo dinamico ad accedere alle risorse IAM. Ogni esecuzione include una tag che la associa all'applicazione padre. È possibile utilizzare questa tag in una regola di corrispondenza del gruppo dinamico.
    Nota

    Questa tag non può essere utilizzata in un criterio IAM "any-user", è necessario creare un gruppo dinamico.
    Ad esempio, se si dispone di un'applicazione di flusso dati con ID ocid1.dataflowapplication.oc1.iad.A, si crea un gruppo dinamico:
    ALL {resource.type='dataflowrun', tag.oci-dataflow.application-id.value='ocid1.dataflowapplication.oc1.iad.A'}
    con i seguenti criteri:
    allow dynamic-group <dynamic_group_name> to manage objects in tenancy where all {
     target.bucket.name='<bucket_name>'
    }
    allow dynamic-group <dynamic_group_name> to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     target.streampool.id='<streampool_id>'
    }
    ID compartimento di destinazione

    Connettere l'esecuzione del flusso di dati al compartimento in cui vengono create le esecuzioni e inserire l'ID compartimento nel criterio IAM. Questo approccio è meno specifico, poiché qualsiasi applicazione Spark eseguita nel compartimento ottiene l'accesso a queste risorse. Se prevedi di utilizzare spark-submit tramite CLI, devi utilizzare questo approccio poiché sia l'ID applicazione che l'ID esecuzione sono su richiesta.

    Ad esempio, se si dispone di un'esecuzione con ID ocid1.dataflowrun.oc1.iad.R2 in un compartimento con ID ocid1.tenancy.oc1.C, si avrebbero i criteri riportati di seguito.
    allow any-user to manage objects in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.bucket.name='<bucket_name>'
    }
    allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.streampool.id='<streampool_id>'
    }

Connessione a Oracle Cloud Infrastructure Streaming

Scopri come connetterti a Oracle Cloud Infrastructure Streaming.

Configura streaming:
  • Impostare Oracle Streaming Service e creare un flusso.
    Nota

    Oracle Streaming Service ha i limiti seguenti:
    • I messaggi in un flusso vengono conservati per non meno di 24 ore e non più di sette giorni.
    • Tutti i messaggi in un flusso vengono eliminati dopo la scadenza del periodo di conservazione, indipendentemente dal fatto che siano stati letti o meno.
    • Il periodo di conservazione di un vapore non può essere modificato dopo la creazione del flusso.
    • Una tenancy ha un limite predefinito pari a zero o cinque partizioni, a seconda della licenza in uso. Se sono necessarie più partizioni, è possibile richiedere un aumento del limite del servizio.
    • Il numero di partizioni per un flusso non può essere modificato dopo la creazione del flusso.
    • Un singolo flusso può supportare fino a 50 gruppi di consumatori che leggono da esso.
    • Ogni partizione ha una scrittura totale di dati di 1 MB al secondo. Non è previsto alcun limite al numero di richieste PUT, a condizione che il limite di scrittura dei dati non venga superato.
    • Ogni partizione dispone di cinque richieste GET al secondo per gruppo di consumer. Poiché un singolo flusso può supportare fino a 50 gruppi di consumer e una singola partizione in un flusso può essere letta da un solo consumer in un gruppo di consumer, una partizione può supportare fino a 250 richieste GET al secondo.
    • I produttori possono pubblicare un messaggio non superiore a 1 MB in un flusso.
    • Una richiesta non può superare 1 MB. La dimensione di una richiesta è la somma delle chiavi e dei messaggi dopo che sono stati decodificati da Base64.
  • Aggiungere criteri di streaming a Flusso dati.
Connettersi a Kafka utilizzando Java o Python. Autenticare in uno dei due modi indicati di seguito.
  • Utilizzare una password o un token di autenticazione non codificati. Questo metodo è adatto per test rapidi in ambiente incrociato. Ad esempio, la creazione di prototipi di applicazioni di streaming strutturate Spark, in cui si desidera eseguire localmente e su Data Flow su Oracle Streaming Service.
    Nota

    L'hardcoding o l'esposizione della password negli argomenti dell'applicazione non sono considerati sicuri, pertanto non utilizzare questo metodo per i cicli di produzione.
  • L'autenticazione del principal risorsa è più sicura della password semplice o del token di autenticazione. È un modo più flessibile per eseguire l'autenticazione con il servizio di streaming Oracle. Impostare i criteri di streaming per utilizzare l'autenticazione del principal risorsa.

Sono disponibili un'applicazione di esempio Java e un'applicazione di esempio Python.

  1. Trovare il pool di flussi che si desidera utilizzare per connettersi a Kafka.
    1. Selezionare Home.
    2. Selezionare Streaming.
    3. Seleziona pool di streaming.
    4. Selezionare il pool di flussi che si desidera utilizzare per visualizzarne i dettagli.
    5. Selezionare Impostazioni connessione Kafka.
    6. Copiare le seguenti informazioni:
      • OCID pool di flussi
      • Server bootstrap
      • Stringa di connessione
      • Protocollo di sicurezza, ad esempio SASL_SSL
      • Meccanismo di sicurezza, ad esempio PLAIN
      Nota

      Se la password nella stringa di connessione è impostata su AUTH_TOKEN, creare un token di autenticazione o utilizzare un token esistente (password="<auth_token>") per l'utente specificato nel nome utente (username="<tenancy>/<username>/<stream_pool_id>":
      1. Seleziona identità.
      2. Selezionare Utenti.
      3. Visualizzare i dettagli utente per l'utente.
      4. Creare un token di autenticazione o utilizzarne uno esistente.
  2. Spark non si associa alle librerie di integrazione Kafka per impostazione predefinita, pertanto è necessario aggiungerlo come parte delle dipendenze dell'applicazione Spark.
    • Per le applicazioni Java o Scala che utilizzano definizioni di progetto SBT o Maven, collegare l'applicazione con questo artifact:
      groupId = org.apache.spark
      artifactId = spark-sql-kafka-0-10_2.12
      version = 3.0.2
      Nota

      Per utilizzare la funzionalità Intestazioni, la versione del client Kafka deve essere almeno 0.11.0.0.
    • Per le applicazioni Python, aggiungere le librerie di integrazione e le dipendenze Kafka durante la distribuzione dell'applicazione.
    • Se si utilizza l'autenticazione del principal risorsa di Data Flow, è necessario eseguire le operazioni riportate di seguito.
      groupId = com.oracle.oci.sdk
      artifactId = oci-java-sdk-addons-sasl
      version = 1.36.1
  3. Configurare il sistema.
    Il funzionamento delle connessioni Kafka viene controllato mediante la configurazione del sistema, ad esempio server, autenticazione, argomento, gruppi e così via. La configurazione è potente con un singolo cambiamento di valore che ha un grande effetto sull'intero sistema.
    Configurazione comune
    subscribe = <Kafka_topic_to_consume>
    kafka.max.partition.fetch.bytes = <Fetch_rate_limit>
    startingOffsets = <Start_point_of_the_first_run_of_the_application> 
    failOnDataLoss = <Failure_streaming_application>
    Per ulteriori informazioni sul limite di frequenza di recupero, vedere Limiti sulle risorse di streaming.
    Nota

    I riavvii successivi continuano dall'ultimo checkpoint, non dal luogo specificato in startingOffsets. Per altre opzioni, vedere Structured Streaming + Kafka Integration Guide (Kafka broker versione 0.10.0 o successiva).

    failOnDataLoss specifica l'applicazione di streaming da utilizzare quando non è possibile recuperare i dati perché sono stati rimossi da Oracle Streaming.

    Configurazione avanzata

    Vedere Spark Streaming Kafka Integration Guide.

    Configurazioni di esempio
    Password semplice:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = PLAIN
    kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy_name>/<username>/<streampool_ocid>" password="<example-password>";
    Principal risorsa:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = OCI-RSA-SHA256
    kafka.sasl.jaas.config = com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:<streampool_ocid>";
  4. Connettersi a Kafka.
    Connessioni di esempio.
    Java con principal delle risorse per lo streaming di Oracle Cloud Infrastructure
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config", "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:<streampool_ocid>\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
    Java con una password semplice
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancy_name>/<username>/<streampool_ocid>" password=\"<example-password> \";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
    Python
    spark = (
        SparkSession.builder.config("failOnDataLoss", "false")
        .appName("kafka_streaming_aggregate")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
     
    # Configure settings we need to read Kafka.
    if args.ocid is not None:
        jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
        args.auth_type = "OCI-RSA-SHA256"
    else:
        jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
     
    # For the raw source stream.
    raw_kafka_options = {
        "kafka.sasl.jaas.config": jaas_template.format(
            username=args.stream_username, password=args.stream_password, ocid=args.ocid
        ),
        "kafka.sasl.mechanism": args.auth_type,
        "kafka.security.protocol": args.encryption,
        "kafka.bootstrap.servers": "{}:{}".format(
            args.bootstrap_server, args.bootstrap_port
        ),
        "group.id": args.raw_stream,
        "subscribe": args.raw_stream,
    }
     
    # The actual reader.
    raw = spark.readStream.format("kafka").options(**raw_kafka_options).load()
    Nota

    Per utilizzare Python con il principal delle risorse per lo streaming di Oracle Cloud Infrastructure, è necessario utilizzare archive.zip. Per ulteriori informazioni, vedere la sezione sulla funzionalità Spark-Submit in Data Flow.
Applicazione Java di esempio

Questa è un'applicazione Java di esempio per Data Flow.

package example;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class StructuredKafkaWordCount {

  public static void main(String[] args) throws Exception {

    Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
    log.info("Started StructuredKafkaWordCount");

    Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
      log.error("Exception uncaught: ", e);
    });

    //Uncomment following line to enable debug log level.
    //Logger.getRootLogger().setLevel(Level.DEBUG);

    if (args.length < 4) {
      printUsage();
    }

    String bootstrapServers = args[0];
    String topics = args[1];
    String checkpointLocation = args[2];
    String type = args[3];
    String outputLocation = null;

    switch (type) {
      case "console":
        System.err.println("Using console output sink");
        break;

      case "csv":
        if (args.length < 5) {
          printUsage();
        }
        outputLocation = args[4];
        System.err.println("Using csv output sink, output location = " + outputLocation);
        break;

      default:
        printUsage();
    }

    SparkSession spark;

    SparkConf conf = new SparkConf();
    if (conf.contains("spark.master")) {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    } else {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .master("local[*]")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    }

    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config",
            "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
        .selectExpr("CAST(value AS STRING)");

    // Split the lines into timestamp and words
    StructType wordsSchema = StructType$.MODULE$.apply(
        new StructField[]{
            StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
            StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
        }
    );
    ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
    final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

    Dataset<Row> words = lines
        .flatMap(
            (FlatMapFunction<Row, Row>) row -> {
              // parse Kafka record in format: "timestamp(iso8601) text"
              String text = row.getString(0);
              String timestampString = text.substring(0, 25);
              String message = text.substring(26);
              Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());

              return Arrays.asList(message.split(" ")).stream()
                  .map(word -> RowFactory.create(timestamp, word)).iterator();
            }
            , encoder);

    // Time window aggregation
    Dataset<Row> wordCounts = words
        .withWatermark("timestamp", "1 minutes")
        .groupBy(
            functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
            functions.col("value")
        )
        .count()
        .selectExpr("CAST(window.start AS timestamp) AS START_TIME",
            "CAST(window.end AS timestamp) AS END_TIME",
            "value AS WORD", "CAST(count AS long) AS COUNT");

    wordCounts.printSchema();

    // Reducing to a single partition
    wordCounts = wordCounts.coalesce(1);

    // Start streaming query
    StreamingQuery query = null;
    switch (type) {
      case "console":
        query = outputToConsole(wordCounts, checkpointLocation);
        break;
      case "csv":
        query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
        break;
      default:
        System.err.println("Unknown type " + type);
        System.exit(1);
    }

    query.awaitTermination();
  }

  private static void printUsage() {
    System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        "<subscribe-topics> <checkpoint-location> <type> ...");
    System.err.println("<type>: console");
    System.err.println("<type>: csv <output-location>");
    System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
    System.exit(1);
  }

  private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
      throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("console")
        .outputMode("complete")
        .option("checkpointLocation", checkpointLocation)
        .start();
  }

  private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
      String outputLocation) throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("csv")
        .outputMode("append")
        .option("checkpointLocation", checkpointLocation)
        .trigger(Trigger.ProcessingTime("1 minutes"))
        .option("path", outputLocation)
        .start();
  }
}
Applicazione Python di esempio

Si tratta di un'applicazione Python di esempio per Data Flow.

#!/usr/bin/env python3
 
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
  substring, to_timestamp, explode, split, length
 
import argparse
import os
 
 
def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--auth-type', default='PLAIN')
  parser.add_argument('--bootstrap-port', default='9092')
  parser.add_argument('--bootstrap-server')
  parser.add_argument('--checkpoint-location')
  parser.add_argument('--encryption', default='SASL_SSL')
  parser.add_argument('--ocid')
  parser.add_argument('--output-location')
  parser.add_argument('--output-mode', default='file')
  parser.add_argument('--stream-password')
  parser.add_argument('--raw-stream')
  parser.add_argument('--stream-username')
  args = parser.parse_args()
 
  if args.bootstrap_server is None:
    args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
  if args.raw_stream is None:
    args.raw_stream = os.environ.get('RAW_STREAM')
  if args.stream_username is None:
    args.stream_username = os.environ.get('STREAM_USERNAME')
  if args.stream_password is None:
    args.stream_password = os.environ.get('STREAM_PASSWORD')
 
  assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
  assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
  assert args.output_location is not None, "Output location (--output-location) must be set"
  assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
 
  spark = (
    SparkSession.builder
      .appName('StructuredKafkaWordCount')
      .config('failOnDataLoss', 'true')
      .config('spark.sql.streaming.minBatchesToRetain', '10')
      .config('spark.sql.shuffle.partitions', '1')
      .config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
      .getOrCreate()
  )
 
  # Uncomment following line to enable debug log level.
  # spark.sparkContext.setLogLevel('DEBUG')
 
  # Configure Kafka connection settings.
  if args.ocid is not None:
    jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
    args.auth_type = 'OCI-RSA-SHA256'
  else:
    jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
 
  raw_kafka_options = {
    'kafka.sasl.jaas.config': jaas_template.format(
      username=args.stream_username, password=args.stream_password,
      ocid=args.ocid
    ),
    'kafka.sasl.mechanism': args.auth_type,
    'kafka.security.protocol': args.encryption,
    'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
                                              args.bootstrap_port),
    'subscribe': args.raw_stream,
    'kafka.max.partition.fetch.bytes': 1024 * 1024,
    'startingOffsets': 'latest'
  }
 
  # Reading raw Kafka stream.
  raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
 
  # Cast raw lines to a string.
  lines = raw.selectExpr('CAST(value AS STRING)')
 
  # Split value column into timestamp and words columns.
  parsedLines = (
    lines.select(
      to_timestamp(substring('value', 1, 25))
        .alias('timestamp'),
      lines.value.substr(lit(26), length('value') - 25)
        .alias('words'))
  )
 
  # Split words into array and explode single record into multiple.
  words = (
    parsedLines.select(
      col('timestamp'),
      explode(split('words', ' ')).alias('word')
    )
  )
 
  # Do time window aggregation
  wordCounts = (
    words
      .withWatermark('timestamp', '1 minutes')
      .groupBy('word', window('timestamp', '1 minute'))
      .count()
      .selectExpr('CAST(window.start AS timestamp) AS START_TIME',
                  'CAST(window.end AS timestamp) AS END_TIME',
                  'word AS WORD',
                  'CAST(count AS long) AS COUNT')
  )
 
  # Reduce partitions to a single.
  wordCounts = wordCounts.coalesce(1)
 
  wordCounts.printSchema()
 
  # Output it to the chosen channel.
  if args.output_mode == 'console':
    print("Writing aggregates to console")
    query = (
      wordCounts.writeStream
        .option('checkpointLocation', args.checkpoint_location)
        .outputMode('complete')
        .format('console')
        .option('truncate', False)
        .start()
    )
  else:
    print("Writing aggregates to Object Storage")
    query = (
      wordCounts.writeStream
        .format('csv')
        .outputMode('append')
        .trigger(processingTime='1 minutes')
        .option('checkpointLocation', args.checkpoint_location)
        .option('path', args.output_location)
        .start()
    )
 
  query.awaitTermination()
 
 
main()

Connessione a un'origine di streaming in una subnet privata

Attenersi alla procedura riportata di seguito per connettersi a un'origine di streaming in una subnet privata.

Copiare l'FDQN di origine streaming per consentire il traffico tra le VNIC all'interno della subnet privata utilizzata per creare l'endpoint privato del flusso di dati. Se l'origine di streaming si trova in una subnet diversa dall'endpoint privato del flusso di dati, consenti traffico tra la subnet di streaming e la subnet dell'endpoint privato del flusso di dati.

  1. Creare un pool di streaming con un endpoint privato.
    Per ulteriori informazioni, vedere la documentazione di Streaming.
  2. Visualizzare i dettagli del pool di flussi e copiare il valore per FDQN.
  3. Modificare l'endpoint privato e sostituire il valore delle zone DNS da controllare con il valore dell'FDQN del pool di flussi copiato nel passo precedente.
  4. Collegare l'endpoint privato all'applicazione di streaming.
  5. Eseguire l'applicazione.