Introduction à Spark Streaming

Pour pouvoir utiliser Spark Streaming avec Data Flow, vous devez procéder à sa configuration.

Apache Spark regroupe le traitement en batch, le traitement de flux de données et l'apprentissage automatique dans une seule API. Data Flow exécute les applications Spark dans une exécution Apache Spark standard. Lorsque vous exécutez une application de streaming, Data Flow n'utilise pas une autre exécution, il exécute plutôt l'application Spark d'une autre manière :
Différences entre les exécutions de transmission en continu et les exécutions autres que de transmission en continu
Différence Exécution autre que de transmission en continu Exécution de transmission en continu
Authentification Utilise un jeton OBO (On-Behalf-Of) de l'utilisateur à l'origine de la demande. Les jetons OBO expirent au bout de 24 heures. Ils ne conviennent donc pas aux travaux à longue durée d'exécution. Accède à Oracle Cloud Infrastructure Object Storage grâce à des tokens de session liés au principal de ressource de l'exécution. Ils conviennent aux travaux à longue durée d'exécution.
Stratégie de redémarrage Echoue si le code de sortie de l'exécution Spark est différent de zéro. Redémarre jusqu'à dix fois si le code de sortie de l'exécution Spark est différent de zéro.
Stratégie de patch Aucune stratégie d'application de patches car les travaux sont censés durer moins de 24 heures. Patches mensuels automatiques.
  1. Créez une application Spark Streaming.
    Lorsque l'application est exécutée, elle utilise l'authentification par principal de ressource, l'application automatique de patches et le redémarrage automatique.
  2. Configuration d'une stratégie pour Spark Streaming
    Vos applications Spark Streaming utilisant les jetons de session de principal de ressource pour l'authentification auprès des ressources Oracle Cloud Infrastructure, vous devez créer des stratégies IAM qui autorisent vos applications à accéder à ces ressources. Les exécutions Data Flow sont démarrées à la demande. Vous ne pouvez donc pas utiliser l'OCID d'exécution dans votre stratégie IAM, car il n'est alloué qu'au démarrage de l'exécution. Associez plutôt les ressources de l'exécution à une ressource permanente et faites référence à cette dernière dans la stratégie IAM. Les deux méthodes les plus courantes sont les suivantes :
    ID d'application parent
    Associez l'exécution Data Flow à l'application Data Flow qui l'a créée, puis indiquez l'ID de l'application Data Flow dans la stratégie IAM. Afin de définir des droits d'accès pour une application spécifique, créez un groupe dynamique correspondant à toutes les exécutions démarrées à partir de l'application et autorisez le groupe dynamique à accéder aux ressources IAM. Chaque exécution comprend une balise l'associant à son application parent. Vous pouvez utiliser cette balise dans une règle de mise en correspondance de groupe dynamique.
    Remarque

    Cette balise ne peut pas être utilisée dans une stratégie IAM "any-user". Vous devez créer un groupe dynamique.
    Par exemple, si vous disposez d'une application Data Flow avec l'ID ocid1.dataflowapplication.oc1.iad.A, vous créez un groupe dynamique comme suit :
    ALL {resource.type='dataflowrun', tag.oci-dataflow.application-id.value='ocid1.dataflowapplication.oc1.iad.A'}
    avec les stratégies suivantes :
    allow dynamic-group <dynamic_group_name> to manage objects in tenancy where all {
     target.bucket.name='<bucket_name>'
    }
    allow dynamic-group <dynamic_group_name> to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     target.streampool.id='<streampool_id>'
    }
    ID de compartiment cible

    Associez l'exécution Data Flow à celui dans lequel les exécutions sont créées, puis indiquez l'ID du compartiment dans la stratégie IAM. Cette approche est moins spécifique car toute application Spark exécutée dans le compartiment obtient l'accès à ces ressources. Si vous prévoyez d'utiliser spark-submit via l'interface de ligne de commande, vous devez utiliser cette approche car l'ID d'application et l'ID d'exécution sont obtenus à la demande.

    Par exemple, si vous disposez d'une exécution avec l'ID ocid1.dataflowrun.oc1.iad.R2 dans un compartiment portant l'ID ocid1.tenancy.oc1.C, vous disposez des stratégies suivantes :
    allow any-user to manage objects in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.bucket.name='<bucket_name>'
    }
    allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.streampool.id='<streampool_id>'
    }

Connexion à Oracle Cloud Infrastructure Streaming

Découvrez comment vous connecter à Oracle Cloud Infrastructure Streaming.

Configurez la transmission en continu :
  • Configurez le service Oracle Streaming et créez un flux de données.
    Remarque

    Le service Oracle Streaming présente les limites suivantes :
    • Les messages d'un flux de données sont conservés pour une durée comprise entre 24 heures et sept jours.
    • Tous les messages d'un flux de données sont supprimés après l'expiration de la période de conservation, qu'ils aient été lus ou non.
    • La période de conservation d'un flux de données ne peut pas être modifiée une fois celui-ci créé.
    • Une location a une limite par défaut de zéro ou cinq partitions en fonction de votre licence. Si vous avez besoin de partitions supplémentaires, vous pouvez demander une augmentation de limite de service.
    • Le nombre de partitions d'un flux de données ne peut pas être modifié une fois celui-ci créé.
    • Un seul flux de données peut prendre en charge jusqu'à 50 groupes de destinataires qui le lisent.
    • Chaque partition dispose d'un taux d'écriture de données total de 1 Mo par seconde. Le nombre de demandes PUT est illimité, à condition que la limite d'écriture de données ne soit pas dépassée.
    • Chaque partition dispose de cinq demandes GET par seconde et par groupe de destinataires. Etant donné qu'un flux de données peut prendre en charge jusqu'à 50 groupes de destinataires et qu'une seule partition de flux peut être lue par seulement un destinataire de groupe de destinataires, une partition peut prendre en charge jusqu'à 250 demandes GET par seconde.
    • Les émetteurs ne peuvent pas publier de message de plus de 1 Mo dans un flux de données.
    • Une demande ne peut pas dépasser 1 Mo. La taille d'une demande correspond à la somme de ses clés et messages après leur décodage à partir de Base64.
  • Ajoutez des stratégies de flux de données à Data Flow.
Connectez-vous à Kafka à l'aide de Java ou de Python. Authentifiez-vous de l'une des deux manières suivantes :
  • Utilisez un mot de passe en clair ou un jeton d'authentification. Cette méthode convient aux tests rapides inter-environnements. Par exemple, prototypage d'une application de transmission en continu structurée Spark, où vous voulez exécuter localement et dans Data Flow sur le service Oracle Streaming.
    Remarque

    Le codage dur ou l'exposition du mot de passe dans les arguments d'application n'est pas considéré comme sécurisé. N'utilisez donc pas cette méthode pour les exécutions de production.
  • L'authentification par principal de ressource est plus sécurisée que l'utilisation d'un mot de passe en clair ou d'un jeton d'authentification. Il s'agit d'un moyen plus flexible pour s'authentifier auprès du service Oracle Streaming. Configurez des stratégies de transmission en continu pour utiliser l'authentification par principal de ressource.

Un exemple d'application Java et un exemple d'application Python sont disponibles.

  1. Recherchez le pool de flux de données à utiliser pour la connexion à Kafka.
    1. Sélectionnez Accueil.
    2. Sélectionnez Transmission en continu.
    3. Sélectionnez Pools de flux de données.
    4. Sélectionnez le pool de flux de données que vous voulez utiliser pour afficher ses détails.
    5. Sélectionnez Paramètres de connexion Kafka.
    6. Copiez les informations suivantes :
      • OCID de pool de flux de données
      • Serveur de démarrage
      • Chaîne de connexion :
      • Protocole de sécurité, par exemple : SASL_SSL
      • Mécanisme de sécurité, par exemple : PLAIN
      Remarque

      Si le mot de passe dans la chaîne de connexion est défini sur AUTH_TOKEN, créez un jeton d'authentification ou utilisez un jeton d'authentification existant (password="<auth_token>") pour l'utilisateur indiqué dans le nom utilisateur (username="<tenancy>/<username>/<stream_pool_id>" :
      1. Sélectionnez Identité.
      2. Sélectionnez Utilisateurs.
      3. Affichez les détails de votre utilisateur.
      4. Créez un jeton d'authentification ou utilisez un jeton existant.
  2. Par défaut, Spark n'est pas lié aux bibliothèques d'intégration Kafka. Vous devez donc les ajouter dans le cadre des dépendances d'application Spark.
    • Pour les applications Java ou Scala utilisant des définitions de projet SBT ou Maven, liez l'application à l'artefact suivant :
      groupId = org.apache.spark
      artifactId = spark-sql-kafka-0-10_2.12
      version = 3.0.2
      Remarque

      Pour utiliser la fonctionnalité d'en-têtes, la version du client Kafka doit être au moins 0.11.0.0.
    • Pour les applications Python, ajoutez les dépendances et les bibliothèques d'intégration Kafka lors du déploiement de votre application.
    • Si vous utilisez l'authentification par principal de ressource Data Flow, vous avez besoin de l'artefact suivant :
      groupId = com.oracle.oci.sdk
      artifactId = oci-java-sdk-addons-sasl
      version = 1.36.1
  3. Configurez le système.
    Pour contrôler le comportement des connexions Kafka, configurez le système, par exemple les serveurs, l'authentification, le sujet, les groupes, etc. La configuration est puissante : la modification d'une seule valeur a une incidence importante sur l'intégralité du système.
    Configuration commune
    subscribe = <Kafka_topic_to_consume>
    kafka.max.partition.fetch.bytes = <Fetch_rate_limit>
    startingOffsets = <Start_point_of_the_first_run_of_the_application> 
    failOnDataLoss = <Failure_streaming_application>
    Pour plus d'informations sur la limite de taux d'extraction, reportez-vous à Limites relatives aux ressources Streaming.
    Remarque

    Les redémarrage ultérieurs se poursuivent à partir du dernier point de reprise, et non à partir de l'emplacement indiqué dans startingOffsets. Pour connaître les autres options, reportez-vous au guide d'intégration Structured Streaming + Kafka (version de broker Kafka 0.10.0 ou supérieure).

    failOnDataLoss indique l'application de transmission en continu à utiliser lorsque les données ne peuvent pas être extraites car elles ont été enlevées d'Oracle Streaming.

    Configuration avancée

    Reportez-vous au guide d'intégration Spark Streaming et Kafka.

    Exemples de configuration
    Mot de passe en clair :
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = PLAIN
    kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy_name>/<username>/<streampool_ocid>" password="<example-password>";
    Principal de ressource :
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = OCI-RSA-SHA256
    kafka.sasl.jaas.config = com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:<streampool_ocid>";
  4. Connectez-vous à Kafka.
    Exemples de connexions.
    Java avec principal de ressource pour la diffusion en continu Oracle Cloud Infrastructure
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config", "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:<streampool_ocid>\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
    Java avec un mot de passe en clair
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancy_name>/<username>/<streampool_ocid>" password=\"<example-password> \";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
    Python
    spark = (
        SparkSession.builder.config("failOnDataLoss", "false")
        .appName("kafka_streaming_aggregate")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
     
    # Configure settings we need to read Kafka.
    if args.ocid is not None:
        jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
        args.auth_type = "OCI-RSA-SHA256"
    else:
        jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
     
    # For the raw source stream.
    raw_kafka_options = {
        "kafka.sasl.jaas.config": jaas_template.format(
            username=args.stream_username, password=args.stream_password, ocid=args.ocid
        ),
        "kafka.sasl.mechanism": args.auth_type,
        "kafka.security.protocol": args.encryption,
        "kafka.bootstrap.servers": "{}:{}".format(
            args.bootstrap_server, args.bootstrap_port
        ),
        "group.id": args.raw_stream,
        "subscribe": args.raw_stream,
    }
     
    # The actual reader.
    raw = spark.readStream.format("kafka").options(**raw_kafka_options).load()
    Remarque

    Afin d'utiliser Python avec le principal de ressource pour la transmission en continu Oracle Cloud Infrastructure, vous devez utiliser archive.zip. Pour plus d'informations, reportez-vous à Fonctionnalité spark-submit dans Data Flow.
Exemple d'application Java

Exemple d'application Java pour Data Flow.

package example;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class StructuredKafkaWordCount {

  public static void main(String[] args) throws Exception {

    Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
    log.info("Started StructuredKafkaWordCount");

    Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
      log.error("Exception uncaught: ", e);
    });

    //Uncomment following line to enable debug log level.
    //Logger.getRootLogger().setLevel(Level.DEBUG);

    if (args.length < 4) {
      printUsage();
    }

    String bootstrapServers = args[0];
    String topics = args[1];
    String checkpointLocation = args[2];
    String type = args[3];
    String outputLocation = null;

    switch (type) {
      case "console":
        System.err.println("Using console output sink");
        break;

      case "csv":
        if (args.length < 5) {
          printUsage();
        }
        outputLocation = args[4];
        System.err.println("Using csv output sink, output location = " + outputLocation);
        break;

      default:
        printUsage();
    }

    SparkSession spark;

    SparkConf conf = new SparkConf();
    if (conf.contains("spark.master")) {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    } else {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .master("local[*]")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    }

    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config",
            "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
        .selectExpr("CAST(value AS STRING)");

    // Split the lines into timestamp and words
    StructType wordsSchema = StructType$.MODULE$.apply(
        new StructField[]{
            StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
            StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
        }
    );
    ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
    final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

    Dataset<Row> words = lines
        .flatMap(
            (FlatMapFunction<Row, Row>) row -> {
              // parse Kafka record in format: "timestamp(iso8601) text"
              String text = row.getString(0);
              String timestampString = text.substring(0, 25);
              String message = text.substring(26);
              Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());

              return Arrays.asList(message.split(" ")).stream()
                  .map(word -> RowFactory.create(timestamp, word)).iterator();
            }
            , encoder);

    // Time window aggregation
    Dataset<Row> wordCounts = words
        .withWatermark("timestamp", "1 minutes")
        .groupBy(
            functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
            functions.col("value")
        )
        .count()
        .selectExpr("CAST(window.start AS timestamp) AS START_TIME",
            "CAST(window.end AS timestamp) AS END_TIME",
            "value AS WORD", "CAST(count AS long) AS COUNT");

    wordCounts.printSchema();

    // Reducing to a single partition
    wordCounts = wordCounts.coalesce(1);

    // Start streaming query
    StreamingQuery query = null;
    switch (type) {
      case "console":
        query = outputToConsole(wordCounts, checkpointLocation);
        break;
      case "csv":
        query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
        break;
      default:
        System.err.println("Unknown type " + type);
        System.exit(1);
    }

    query.awaitTermination();
  }

  private static void printUsage() {
    System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        "<subscribe-topics> <checkpoint-location> <type> ...");
    System.err.println("<type>: console");
    System.err.println("<type>: csv <output-location>");
    System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
    System.exit(1);
  }

  private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
      throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("console")
        .outputMode("complete")
        .option("checkpointLocation", checkpointLocation)
        .start();
  }

  private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
      String outputLocation) throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("csv")
        .outputMode("append")
        .option("checkpointLocation", checkpointLocation)
        .trigger(Trigger.ProcessingTime("1 minutes"))
        .option("path", outputLocation)
        .start();
  }
}
Exemple d'application Python

Exemple d'application Python pour Data Flow.

#!/usr/bin/env python3
 
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
  substring, to_timestamp, explode, split, length
 
import argparse
import os
 
 
def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--auth-type', default='PLAIN')
  parser.add_argument('--bootstrap-port', default='9092')
  parser.add_argument('--bootstrap-server')
  parser.add_argument('--checkpoint-location')
  parser.add_argument('--encryption', default='SASL_SSL')
  parser.add_argument('--ocid')
  parser.add_argument('--output-location')
  parser.add_argument('--output-mode', default='file')
  parser.add_argument('--stream-password')
  parser.add_argument('--raw-stream')
  parser.add_argument('--stream-username')
  args = parser.parse_args()
 
  if args.bootstrap_server is None:
    args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
  if args.raw_stream is None:
    args.raw_stream = os.environ.get('RAW_STREAM')
  if args.stream_username is None:
    args.stream_username = os.environ.get('STREAM_USERNAME')
  if args.stream_password is None:
    args.stream_password = os.environ.get('STREAM_PASSWORD')
 
  assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
  assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
  assert args.output_location is not None, "Output location (--output-location) must be set"
  assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
 
  spark = (
    SparkSession.builder
      .appName('StructuredKafkaWordCount')
      .config('failOnDataLoss', 'true')
      .config('spark.sql.streaming.minBatchesToRetain', '10')
      .config('spark.sql.shuffle.partitions', '1')
      .config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
      .getOrCreate()
  )
 
  # Uncomment following line to enable debug log level.
  # spark.sparkContext.setLogLevel('DEBUG')
 
  # Configure Kafka connection settings.
  if args.ocid is not None:
    jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
    args.auth_type = 'OCI-RSA-SHA256'
  else:
    jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
 
  raw_kafka_options = {
    'kafka.sasl.jaas.config': jaas_template.format(
      username=args.stream_username, password=args.stream_password,
      ocid=args.ocid
    ),
    'kafka.sasl.mechanism': args.auth_type,
    'kafka.security.protocol': args.encryption,
    'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
                                              args.bootstrap_port),
    'subscribe': args.raw_stream,
    'kafka.max.partition.fetch.bytes': 1024 * 1024,
    'startingOffsets': 'latest'
  }
 
  # Reading raw Kafka stream.
  raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
 
  # Cast raw lines to a string.
  lines = raw.selectExpr('CAST(value AS STRING)')
 
  # Split value column into timestamp and words columns.
  parsedLines = (
    lines.select(
      to_timestamp(substring('value', 1, 25))
        .alias('timestamp'),
      lines.value.substr(lit(26), length('value') - 25)
        .alias('words'))
  )
 
  # Split words into array and explode single record into multiple.
  words = (
    parsedLines.select(
      col('timestamp'),
      explode(split('words', ' ')).alias('word')
    )
  )
 
  # Do time window aggregation
  wordCounts = (
    words
      .withWatermark('timestamp', '1 minutes')
      .groupBy('word', window('timestamp', '1 minute'))
      .count()
      .selectExpr('CAST(window.start AS timestamp) AS START_TIME',
                  'CAST(window.end AS timestamp) AS END_TIME',
                  'word AS WORD',
                  'CAST(count AS long) AS COUNT')
  )
 
  # Reduce partitions to a single.
  wordCounts = wordCounts.coalesce(1)
 
  wordCounts.printSchema()
 
  # Output it to the chosen channel.
  if args.output_mode == 'console':
    print("Writing aggregates to console")
    query = (
      wordCounts.writeStream
        .option('checkpointLocation', args.checkpoint_location)
        .outputMode('complete')
        .format('console')
        .option('truncate', False)
        .start()
    )
  else:
    print("Writing aggregates to Object Storage")
    query = (
      wordCounts.writeStream
        .format('csv')
        .outputMode('append')
        .trigger(processingTime='1 minutes')
        .option('checkpointLocation', args.checkpoint_location)
        .option('path', args.output_location)
        .start()
    )
 
  query.awaitTermination()
 
 
main()

Connexion à une source de transmission en continu dans un sous-réseau privé

Suivez ces étapes pour vous connecter à une source de transmission en continu dans un sous-réseau privé.

Copiez le nom de domaine qualifié complet de source de transmission en continu pour autoriser le trafic entre les cartes d'interface réseau virtuelles au sein du sous-réseau privé utilisé pour créer l'adresse privée Data Flow. Si la source de transmission en continu se trouve dans un sous-réseau différent de l'adresse privée Data Flow, autorisez le trafic entre le sous-réseau Streaming et le sous-réseau de l'adresse privée Data Flow.

  1. Créez un pool de transmission en continu avec une adresse privée.
    Pour plus d'informations, reportez-vous à la documentation Streaming.
  2. Visualisez les détails du pool de flux de données et copiez la valeur de FDQN.
  3. Modifiez l'adresse privée et remplacez la valeur des zones DNS à contrôler par la valeur du FDQN de pool de flux de données que vous avez copié à l'étape précédente.
  4. Attachez l'adresse privée à l'application de transmission en continu.
  5. Exécutez l'application.