Introduction à la diffusion en continu Spark

Pour pouvoir utiliser la diffusion en continu Spark avec le service de flux de données, vous devez la configurer.

Apache Spark unifie le traitement par lots, le traitement de flux et l'apprentissage automatique dans une seule API. Le service de flux de données exécute les applications Spark dans un temps d'exécution Apache Spark standard. Lorsque vous exécutez une application de diffusion en continu, le service de flux de données n'utilise pas un environnement d'exécution différent, mais il exécute l'application Spark différemment :
Différences entre les exécutions avec et sans diffusion en continu
Différence Exécution sans diffusion en continu Exécution avec diffusion en continu
Authentification Utilise un jeton OBO (au nom de) l'utilisateur demandeur. Les jetons OBO expirent au bout de 24 heures, ce qui n'est pas adapté aux tâches de longue durée. Permet d'accéder à Oracle Cloud Infrastructure Object Storage à l'aide de jetons de session liés au principal de ressource de l'exécution. Cette approche convient aux tâches de longue durée.
Politique de redémarrage Échoue si le code de sortie d'exécution Spark est différent de zéro. Redémarre jusqu'à dix fois si le code de sortie d'exécution Spark est différent de zéro.
Politique d'application de correctifs Aucune politique d'application de correctifs, car les tâches sont censées durer moins de 24 heures. Correctifs mensuels automatiques.
  1. Créer une application de diffusion en continu Spark.
    Lorsque l'application est exécutée, elle utilise l'authentification à l'aide du principal de ressource, l'application automatique de correctifs et le redémarrage automatique.
  2. Configuration d'une politique pour la diffusion en continu Spark
    Comme vos applications de diffusion en continu Spark utilisent des jetons de session de principal de ressource pour s'authentifier auprès des ressources Oracle Cloud Infrastructure, vous devez créer des politiques IAM autorisant vos applications avant qu'elles puissent accéder à ces ressources. Les exécutions de flux de données sont démarrées sur demande afin que vous ne puissiez pas utiliser l'OCID d'exécution dans votre politique IAM, car il n'est pas affecté avant le démarrage de l'exécution. À la place, connectez les ressources de l'exécution à une ressource permanente et référencez-la dans votre politique GIA. Les deux méthodes les plus courantes sont les suivantes :
    ID application parent
    Connectez l'exécution de flux de données à l'application de flux de données qui l'a créée et placez l'ID application de flux de données dans la politique IAM. Pour définir les autorisations d'une application particulière, créez un groupe dynamique correspondant à toutes les exécutions démarrées à partir de l'application et autorisez le groupe dynamique à accéder aux ressources IAM. Chaque exécution comprend un marqueur qui l'associe à son application parent. Vous pouvez utiliser ce marqueur dans une règle de correspondance de groupe dynamique.
    Note

    Ce marqueur ne peut pas être utilisé dans une politique "any-user" IAM, vous devez créer un groupe dynamique.
    Par exemple, pour une application de flux de données portant l'ID ocid1.dataflowapplication.oc1.iad.A, créez un groupe dynamique :
    ALL {resource.type='dataflowrun', tag.oci-dataflow.application-id.value='ocid1.dataflowapplication.oc1.iad.A'}
    Avec les politiques suivantes :
    allow dynamic-group <dynamic_group_name> to manage objects in tenancy where all {
     target.bucket.name='<bucket_name>'
    }
    allow dynamic-group <dynamic_group_name> to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     target.streampool.id='<streampool_id>'
    }
    ID compartiment cible

    Connectez l'exécution de flux de données au compartiment où les exécutions sont créées et placez l'ID compartiment dans la politique IAM. Cette approche est moins spécifique, car toute application Spark exécutée dans le compartiment peut accéder à ces ressources. Si vous prévoyez d'utiliser spark-submit au moyen de l'interface CLI, vous devez utiliser cette approche car l'ID application et l'ID exécution sont disponibles sur demande.

    Par exemple, pour une exécution portant l'ID ocid1.dataflowrun.oc1.iad.R2 dans un compartiment portant l'ID ocid1.tenancy.oc1.C, créez les politiques suivantes :
    allow any-user to manage objects in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.bucket.name='<bucket_name>'
    }
    allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.streampool.id='<streampool_id>'
    }

Connexion à Oracle Cloud Infrastructure Streaming

Voyez comment vous connecter à Oracle Cloud Infrastructure Streaming.

Configurer la diffusion en continu :
  • Configurez le service de diffusion en continu Oracle et créez un flux.
    Note

    Les limites suivantes s'appliquent au service de diffusion en continu Oracle :
    • Les messages d'un flux sont conservés pendant au moins 24 heures, et au plus sept jours.
    • Tous les messages d'un flux sont supprimés à la fin de la période de conservation, qu'ils aient été lus ou non.
    • La période de conservation d'un flux ne peut pas être modifiée après la création du flux.
    • Une location présente une limite par défaut de zéro ou cinq partitions selon votre licence. Si vous avez besoin de partitions supplémentaires, vous pouvez demander une augmentation de limite de service.
    • Le nombre de partitions pour un flux ne peut pas être modifié après la création du flux.
    • Un flux peut prendre en charge jusqu'à 50 groupes de consommateurs lisant le flux.
    • Le taux total d'écriture de données de chaque partition est de 1 Mo par seconde. Le nombre de demandes PUT n'est pas limité, à condition que la limite d'écriture de données ne soit pas dépassée.
    • Le nombre de demandes GET par seconde par groupe de consommateurs pour chaque partition est de cinq. Comme un flux peut prendre en charge jusqu'à 50 groupes de consommateurs, et qu'une seule partition dans un flux peut être lue par un seul consommateur d'un groupe de consommateurs, une partition peut prendre en charge jusqu'à 250 demandes GET par seconde.
    • Les producteurs peuvent publier un message de 1 Mo au maximum dans un flux.
    • La taille d'une demande ne peut pas dépasser 1 Mo. La taille d'une demande est la somme de ses clés et messages après leur décodage à partir de Base64.
  • Ajoutez des politiques de diffusion en continu au service de flux de données.
Connexion à Kafka à l'aide de Java ou de Python. Authentifiez-vous de l'une des deux façons suivantes :
  • Utilisez un mot de passe brut ou un jeton d'authentification. Cette méthode convient aux tests rapides inter-environnements. Par exemple, le prototypage d'applications de diffusion en continu structurée Spark, pour une exécution locale dans le service de flux de données sur le service de diffusion en continu Oracle.
    Note

    Le codage permanent, ou exposition, du mot de passe dans les arguments de l'application n'est pas considéré comme sécurisé. N'utilisez donc pas cette méthode pour les exécutions de production.
  • L'authentification avec le principal de ressource est plus sécurisée que le mot de passe brut ou le jeton d'authentification. Il s'agit d'un moyen plus flexible de vous authentifier auprès du service de diffusion en continu Oracle. Configurez des politiques de diffusion en continu pour utiliser l'authentification avec le principal de ressource.

Un exemple d'application Java et un exemple d'application Python sont disponibles.

  1. Recherchez le groupe de flux à utiliser pour vous connecter à Kafka.
    1. Sélectionnez Accueil.
    2. Sélectionnez Flux.
    3. Sélectionner des groupes de flux.
    4. Sélectionnez le groupe de flux voulu pour en voir les détails.
    5. Sélectionnez Paramètres de connexion Kafka.
    6. Copiez les informations suivantes :
      • OCID du groupe de flux
      • Serveur d'amorçage
      • Chaîne de connexion
      • Protocole de sécurité, par exemple SASL_SSL
      • Mécanisme de sécurité, par exemple PLAIN
      Note

      Si le mot de passe dans la chaîne de connexion est réglé à AUTH_TOKEN, créez un jeton d'authentification ou utilisez un jeton existant (password="<auth_token>") pour le nom d'utilisateur spécifié (username="<tenancy>/<username>/<stream_pool_id>") :
      1. Sélectionnez Identité.
      2. Sélectionnez Utilisateurs.
      3. Affichez les détails de votre utilisateur.
      4. Créez un jeton d'authentification ou utilisez un jeton existant.
  2. Spark n'est pas lié aux bibliothèques d'intégration Kafka par défaut. Vous devez donc l'ajouter dans les dépendances d'application Spark.
    • Pour les applications Java ou Scala utilisant des définitions de projet SBT ou Maven, liez votre application à cet artefact :
      groupId = org.apache.spark
      artifactId = spark-sql-kafka-0-10_2.12
      version = 3.0.2
      Note

      Pour que la fonctionnalité d'en-tête puisse être utilisée, la version du client Kafka doit être au moins 0.11.0.0.
    • Pour les applications Python, ajoutez les bibliothèques d'intégration et les dépendances Kafka lors du déploiement de votre application.
    • Si vous utilisez l'authentification avec le principal de ressource pour le service de flux de données :
      groupId = com.oracle.oci.sdk
      artifactId = oci-java-sdk-addons-sasl
      version = 1.36.1
  3. Configurez le système.
    Le comportement des connexions Kafka est contrôlé par la configuration du système, par exemple, les serveurs, l'authentification, le sujet, les groupes, etc. La configuration est puissante; une seule modification de valeur a un effet important sur l'ensemble du système.
    Configuration commune
    subscribe = <Kafka_topic_to_consume>
    kafka.max.partition.fetch.bytes = <Fetch_rate_limit>
    startingOffsets = <Start_point_of_the_first_run_of_the_application> 
    failOnDataLoss = <Failure_streaming_application>
    Pour plus d'informations sur la limite de taux d'extraction, voir Limites pour les ressources de diffusion en continu.
    Note

    Les redémarrages ultérieurs se poursuivent à partir du dernier point de vérification, et non à partir de l'emplacement spécifié dans startingOffsets. Pour d'autres options, voir le guide d'intégration de la diffusion en continu structurée avec Kafka (courtier Kafka version 0.10.0 ou ultérieure).

    failOnDataLoss indique l'application de flux à utiliser lorsqu'il est impossible d'extraire les données, car elles ont été supprimées du service de flux Oracle.

    Configuration avancée

    Voir le guide d'intégration de la diffusion en continu Spark avec Kafka.

    Exemples de configuration
    Mot de passe brut :
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = PLAIN
    kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy_name>/<username>/<streampool_ocid>" password="<example-password>";
    Principal de ressource :
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = OCI-RSA-SHA256
    kafka.sasl.jaas.config = com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:<streampool_ocid>";
  4. Connectez-vous à Kafka.
    Exemples de connexion.
    Java avec principal de ressource pour le service de diffusion en continu pour Oracle Cloud Infrastructure
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config", "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:<streampool_ocid>\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
    Java avec mot de passe brut
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancy_name>/<username>/<streampool_ocid>" password=\"<example-password> \";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
    Python
    spark = (
        SparkSession.builder.config("failOnDataLoss", "false")
        .appName("kafka_streaming_aggregate")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
     
    # Configure settings we need to read Kafka.
    if args.ocid is not None:
        jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
        args.auth_type = "OCI-RSA-SHA256"
    else:
        jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
     
    # For the raw source stream.
    raw_kafka_options = {
        "kafka.sasl.jaas.config": jaas_template.format(
            username=args.stream_username, password=args.stream_password, ocid=args.ocid
        ),
        "kafka.sasl.mechanism": args.auth_type,
        "kafka.security.protocol": args.encryption,
        "kafka.bootstrap.servers": "{}:{}".format(
            args.bootstrap_server, args.bootstrap_port
        ),
        "group.id": args.raw_stream,
        "subscribe": args.raw_stream,
    }
     
    # The actual reader.
    raw = spark.readStream.format("kafka").options(**raw_kafka_options).load()
    Note

    Pour utiliser Python avec un principal de ressource pour le service de diffusion en continu pour Oracle Cloud Infrastructure, vous devez utiliser archive.zip. Pour plus d'informations, voir la section Fonctionnalité spark-submit dans le service de flux de données.
Exemple d'application Java

Voici un exemple d'application Java pour le service de flux de données.

package example;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class StructuredKafkaWordCount {

  public static void main(String[] args) throws Exception {

    Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
    log.info("Started StructuredKafkaWordCount");

    Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
      log.error("Exception uncaught: ", e);
    });

    //Uncomment following line to enable debug log level.
    //Logger.getRootLogger().setLevel(Level.DEBUG);

    if (args.length < 4) {
      printUsage();
    }

    String bootstrapServers = args[0];
    String topics = args[1];
    String checkpointLocation = args[2];
    String type = args[3];
    String outputLocation = null;

    switch (type) {
      case "console":
        System.err.println("Using console output sink");
        break;

      case "csv":
        if (args.length < 5) {
          printUsage();
        }
        outputLocation = args[4];
        System.err.println("Using csv output sink, output location = " + outputLocation);
        break;

      default:
        printUsage();
    }

    SparkSession spark;

    SparkConf conf = new SparkConf();
    if (conf.contains("spark.master")) {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    } else {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .master("local[*]")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    }

    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config",
            "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
        .selectExpr("CAST(value AS STRING)");

    // Split the lines into timestamp and words
    StructType wordsSchema = StructType$.MODULE$.apply(
        new StructField[]{
            StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
            StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
        }
    );
    ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
    final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

    Dataset<Row> words = lines
        .flatMap(
            (FlatMapFunction<Row, Row>) row -> {
              // parse Kafka record in format: "timestamp(iso8601) text"
              String text = row.getString(0);
              String timestampString = text.substring(0, 25);
              String message = text.substring(26);
              Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());

              return Arrays.asList(message.split(" ")).stream()
                  .map(word -> RowFactory.create(timestamp, word)).iterator();
            }
            , encoder);

    // Time window aggregation
    Dataset<Row> wordCounts = words
        .withWatermark("timestamp", "1 minutes")
        .groupBy(
            functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
            functions.col("value")
        )
        .count()
        .selectExpr("CAST(window.start AS timestamp) AS START_TIME",
            "CAST(window.end AS timestamp) AS END_TIME",
            "value AS WORD", "CAST(count AS long) AS COUNT");

    wordCounts.printSchema();

    // Reducing to a single partition
    wordCounts = wordCounts.coalesce(1);

    // Start streaming query
    StreamingQuery query = null;
    switch (type) {
      case "console":
        query = outputToConsole(wordCounts, checkpointLocation);
        break;
      case "csv":
        query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
        break;
      default:
        System.err.println("Unknown type " + type);
        System.exit(1);
    }

    query.awaitTermination();
  }

  private static void printUsage() {
    System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        "<subscribe-topics> <checkpoint-location> <type> ...");
    System.err.println("<type>: console");
    System.err.println("<type>: csv <output-location>");
    System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
    System.exit(1);
  }

  private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
      throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("console")
        .outputMode("complete")
        .option("checkpointLocation", checkpointLocation)
        .start();
  }

  private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
      String outputLocation) throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("csv")
        .outputMode("append")
        .option("checkpointLocation", checkpointLocation)
        .trigger(Trigger.ProcessingTime("1 minutes"))
        .option("path", outputLocation)
        .start();
  }
}
Exemple d'application Python

Voici un exemple d'application Python pour le service de flux de données.

#!/usr/bin/env python3
 
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
  substring, to_timestamp, explode, split, length
 
import argparse
import os
 
 
def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--auth-type', default='PLAIN')
  parser.add_argument('--bootstrap-port', default='9092')
  parser.add_argument('--bootstrap-server')
  parser.add_argument('--checkpoint-location')
  parser.add_argument('--encryption', default='SASL_SSL')
  parser.add_argument('--ocid')
  parser.add_argument('--output-location')
  parser.add_argument('--output-mode', default='file')
  parser.add_argument('--stream-password')
  parser.add_argument('--raw-stream')
  parser.add_argument('--stream-username')
  args = parser.parse_args()
 
  if args.bootstrap_server is None:
    args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
  if args.raw_stream is None:
    args.raw_stream = os.environ.get('RAW_STREAM')
  if args.stream_username is None:
    args.stream_username = os.environ.get('STREAM_USERNAME')
  if args.stream_password is None:
    args.stream_password = os.environ.get('STREAM_PASSWORD')
 
  assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
  assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
  assert args.output_location is not None, "Output location (--output-location) must be set"
  assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
 
  spark = (
    SparkSession.builder
      .appName('StructuredKafkaWordCount')
      .config('failOnDataLoss', 'true')
      .config('spark.sql.streaming.minBatchesToRetain', '10')
      .config('spark.sql.shuffle.partitions', '1')
      .config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
      .getOrCreate()
  )
 
  # Uncomment following line to enable debug log level.
  # spark.sparkContext.setLogLevel('DEBUG')
 
  # Configure Kafka connection settings.
  if args.ocid is not None:
    jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
    args.auth_type = 'OCI-RSA-SHA256'
  else:
    jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
 
  raw_kafka_options = {
    'kafka.sasl.jaas.config': jaas_template.format(
      username=args.stream_username, password=args.stream_password,
      ocid=args.ocid
    ),
    'kafka.sasl.mechanism': args.auth_type,
    'kafka.security.protocol': args.encryption,
    'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
                                              args.bootstrap_port),
    'subscribe': args.raw_stream,
    'kafka.max.partition.fetch.bytes': 1024 * 1024,
    'startingOffsets': 'latest'
  }
 
  # Reading raw Kafka stream.
  raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
 
  # Cast raw lines to a string.
  lines = raw.selectExpr('CAST(value AS STRING)')
 
  # Split value column into timestamp and words columns.
  parsedLines = (
    lines.select(
      to_timestamp(substring('value', 1, 25))
        .alias('timestamp'),
      lines.value.substr(lit(26), length('value') - 25)
        .alias('words'))
  )
 
  # Split words into array and explode single record into multiple.
  words = (
    parsedLines.select(
      col('timestamp'),
      explode(split('words', ' ')).alias('word')
    )
  )
 
  # Do time window aggregation
  wordCounts = (
    words
      .withWatermark('timestamp', '1 minutes')
      .groupBy('word', window('timestamp', '1 minute'))
      .count()
      .selectExpr('CAST(window.start AS timestamp) AS START_TIME',
                  'CAST(window.end AS timestamp) AS END_TIME',
                  'word AS WORD',
                  'CAST(count AS long) AS COUNT')
  )
 
  # Reduce partitions to a single.
  wordCounts = wordCounts.coalesce(1)
 
  wordCounts.printSchema()
 
  # Output it to the chosen channel.
  if args.output_mode == 'console':
    print("Writing aggregates to console")
    query = (
      wordCounts.writeStream
        .option('checkpointLocation', args.checkpoint_location)
        .outputMode('complete')
        .format('console')
        .option('truncate', False)
        .start()
    )
  else:
    print("Writing aggregates to Object Storage")
    query = (
      wordCounts.writeStream
        .format('csv')
        .outputMode('append')
        .trigger(processingTime='1 minutes')
        .option('checkpointLocation', args.checkpoint_location)
        .option('path', args.output_location)
        .start()
    )
 
  query.awaitTermination()
 
 
main()

Connexion à une source de diffusion en continu dans un sous-réseau privé

Suivez ces étapes pour vous connecter à une source de diffusion en continu dans un sous-réseau privé.

Copiez le nom de domaine complet de la source de diffusion en continu pour autoriser le trafic entre les cartes vNIC du sous-réseau privé utilisé pour créer le point d'extrémité privé du service de flux de données. Si la source de diffusion en continu se trouve dans un sous-réseau différent du point d'extrémité privé du service de flux de données, autorisez le trafic entre le sous-réseau de diffusion en continu et le sous-réseau de point d'extrémité privé du service de flux de données.

  1. Créez un groupe de diffusion en continu avec un point d'extrémité privé.
    Pour plus d'informations, voir la documentation sur le service de diffusion en continu.
  2. Consultez les détails du groupe de flux et copiez la valeur du nom de domaine complet.
  3. Modifiez le point d'extrémité privé et remplacez la valeur des zones DNS à contrôler par la valeur du nom de domaine complet du groupe de flux que vous avez copié à l'étape précédente.
  4. Attachez le point d'extrémité privé à l'application de diffusion en continu.
  5. Exécutez l'application.