Conceitos Básicos do Streaming do Spark

Para poder usar o Spark Streaming com o serviço Data Flow, configure-o.

O Apache Spark unifica o Processamento em Batch, Processamento de Streams e o Aprendizado de Máquina em uma API. O serviço Data Flow executa aplicativos Spark em um runtime padrão do Apache Spark. Quando você executa um Aplicativo de streaming, o serviço Data Flow não usa outro runtime; em vez disso, ele executa o aplicativo Spark de outra forma:
Diferenças entre execuções de streaming e não streaming
Qual é Diferente Execução de Não Streaming Execução do Streaming
Autenticação Usa um token OBO (On-Behalf-Of) do usuário solicitante. Os tokens OBO expiram após 24 horas, portanto, isso não é adequado para jobs de longa execução. Acessa o Oracle Cloud Infrastructure Object Storage usando tokens de sessão vinculados ao Controlador de Recursos da Execução. É adequado para jobs de longa duração.
Reiniciar Política Falha se o código de saída de runtime do Spark não for zero. Reinicia até dez vezes se o código de saída de runtime do Spark não for zero.
Política de Patch Nenhuma política de aplicação de patches, pois espera-se que os jobs durem menos de 24 horas. Patches mensais automáticos.
  1. Crie um Aplicativo Spark Streaming.
    Quando o aplicativo é executado, ele usa autenticação do Controlador de Recursos, aplicação automática de patches e reinicialização automática.
  2. Configurando uma Política para o Serviço Spark Streaming
    Como seus Aplicativos Spark Streaming usam os tokens de sessão do Resource Principal para autenticação nos recursos do Oracle Cloud Infrastructure, você deve criar políticas do serviço IAM que autorizem seus aplicativos para que eles possam acessar esses recursos. As Execuções do Serviço Data Flow são iniciadas sob demanda; portanto, você não pode usar o OCID de Execução em sua política do serviço IAM, porque ele não será alocado até que a Execução seja iniciada. Em vez disso, conecte os recursos da Execução com um recurso permanente e faça referência a ele na sua política do serviço IAM. As duas formas mais comuns de fazer isso são:
    ID do Aplicativo Principal
    Conecte a Execução do serviço Data Flow ao Aplicativo do serviço Data Flow que o criou e coloque o ID do Aplicativo do serviço Data Flow na Política do serviço IAM. Para definir permissões para um Aplicativo específico, crie um grupo dinâmico que corresponda a todas as Execuções iniciadas do Aplicativo e autorize o Grupo Dinâmico a acessar recursos do serviço IAM. Cada Execução inclui uma tag que a associa ao Aplicativo principal. Você pode usar essa tag em uma regra de correspondência de Grupo Dinâmico.
    Observação

    Esta tag não pode ser usada em uma política "any-user" do serviço IAM. Crie um Grupo Dinâmico.
    Por exemplo, se você tiver um Aplicativo do serviço Data Flow com o ID ocid1.dataflowapplication.oc1.iad.A, crie um grupo dinâmico:
    ALL {resource.type='dataflowrun', tag.oci-dataflow.application-id.value='ocid1.dataflowapplication.oc1.iad.A'}
    com as seguintes políticas:
    allow dynamic-group <dynamic_group_name> to manage objects in tenancy where all {
     target.bucket.name='<bucket_name>'
    }
    allow dynamic-group <dynamic_group_name> to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     target.streampool.id='<streampool_id>'
    }
    ID do Compartimento de Destino

    Conecte a Execução do Serviço Data Flow com o Compartimento no qual as Execuções são criadas e coloque o ID do Compartimento na Política do Serviço IAM. Essa abordagem é menos específica porque qualquer aplicativo Spark executado no Compartimento obtém acesso a esses recursos. Se você pretende usar o script spark-submit via CLI, use essa abordagem porque o ID do Aplicativo e o ID da Execução são sob demanda.

    Por exemplo, se você tiver uma Execução com o ID ocid1.dataflowrun.oc1.iad.R2 em um compartimento com o ID ocid1.tenancy.oc1.C, terá as seguintes políticas:
    allow any-user to manage objects in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.bucket.name='<bucket_name>'
    }
    allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.streampool.id='<streampool_id>'
    }

Conexão com o Oracle Cloud Infrastructure Streaming

Saiba como estabelecer conexão com o Oracle Cloud Infrastructure Streaming.

Configure o streaming:
  • Configure o Serviço Streaming da Oracle e crie um stream.
    Observação

    O Serviço Streaming da Oracle tem os limites a seguir:
    • As mensagens em um stream são mantidas por no mínimo 24 horas e no máximo sete dias.
    • Todas as mensagens em um stream são excluídas após a expiração do período de retenção, sejam elas lidas ou não.
    • O período de retenção de um stream não pode ser alterado após a criação do stream.
    • Uma tenancy tem um limite padrão de zero ou cinco partições, dependendo da sua licença. Se você precisar de mais partições, poderá solicitar um aumento do limite de serviço.
    • O número de partições de um stream não pode ser alterado após a criação do stream.
    • Um único stream pode suportar até 50 grupos de consumidores lendo o stream.
    • Cada partição tem um total de gravação de dados de 1 MB por segundo. Não há limite para o número de solicitações PUT, desde que o limite de gravação de dados não seja excedido.
    • Cada partição tem cinco solicitações GET por segundo por grupo de consumidores. Como um único stream pode suportar até 50 grupos de consumidores, e uma única partição em um stream pode ser lida por apenas um consumidor em um grupo de consumidores, uma partição pode suportar até 250 solicitações GET por segundo.
    • Os produtores podem publicar uma mensagem de no máximo 1 MB em um stream.
    • Uma solicitação não pode ter mais de 1 MB. O tamanho de uma solicitação é a soma de suas chaves e mensagens após elas terem sido decodificadas do Base64.
  • Adicione políticas de streaming ao serviço Data Flow.
Conecte-se com o Kafka usando Java ou Python. Autentique-se de uma das duas seguintes maneiras:
  • Use uma senha simples ou um token de autenticação. Esse método é adequado para testes rápidos entre ambientes. Por exemplo, o protótipo de aplicativos de streaming estruturados Spark, no qual você deseja executar localmente e no serviço Data Flow no Serviço Streaming da Oracle.
    Observação

    A codificação ou a exposição da senha nos argumentos do aplicativo não é considerada segura, portanto, não use esse método para execuções de produção.
  • A autenticação do controlador de recursos é mais segura do que a senha simples ou o token de autenticação. É uma maneira mais flexível de fazer a autenticação com o Serviço Streaming da Oracle. Configure políticas de streaming para usar a autenticação do controlador de recursos.

Um aplicativo de amostra Java e um aplicativo de amostra Python estão disponíveis.

  1. Localize o pool de streams que você deseja usar para estabelecer conexão com o Kafka.
    1. Selecione Home.
    2. Selecione Transmissão.
    3. SelecionarPools de Streams.
    4. Selecione o pool de streams que deseja usar para ver seus detalhes.
    5. Selecione Definições de Conexão do Kafka.
    6. Copie as seguintes informações:
      • OCID do pool de streams
      • Servidor de bootstrap
      • String de conexão
      • Protocolo de segurança, por exemplo, SASL_SSL
      • Mecanismo de segurança, por exemplo, PLAIN
      Observação

      Se a senha na string de conexão for definida como AUTH_TOKEN, crie um token de autenticação ou use um existente (password="<auth_token>") para o usuário especificado no nome de usuário (username="<tenancy>/<username>/<stream_pool_id>":
      1. Selecione Identidade.
      2. Selecionar Usuários
      3. Para o seu usuário, exiba os detalhes do usuário.
      4. Crie um token de autenticação ou use um existente.
  2. O Spark não se vincula às bibliotecas de integração do Kafka por padrão; portanto, você deve adicioná-lo como parte das dependências do aplicativo Spark.
    • Para aplicativos Java ou Scala usando definições de projeto SBT ou Maven, vincule seu aplicativo a este artefato:
      groupId = org.apache.spark
      artifactId = spark-sql-kafka-0-10_2.12
      version = 3.0.2
      Observação

      Para usar a funcionalidade de cabeçalhos, a versão do seu cliente Kafka deve ser pelo menos 0.11.0.0.
    • Para aplicativos Python, adicione as bibliotecas de integração e dependências do Kafka ao implantar seu aplicativo.
    • Se você usar a autenticação do Controlador de Recursos do serviço Data Flow, precisará deste artefato:
      groupId = com.oracle.oci.sdk
      artifactId = oci-java-sdk-addons-sasl
      version = 1.36.1
  3. Configure o sistema.
    O comportamento das conexões do Kafka é controlado pela configuração do sistema, por exemplo, servidores, autenticação, tópico, grupos e assim por diante. A configuração é poderosa, com uma única mudança de valor tendo um grande efeito em todo o sistema.
    Configuração Comum
    subscribe = <Kafka_topic_to_consume>
    kafka.max.partition.fetch.bytes = <Fetch_rate_limit>
    startingOffsets = <Start_point_of_the_first_run_of_the_application> 
    failOnDataLoss = <Failure_streaming_application>
    Para obter mais informações sobre o limite de taxa de extração, consulte Limites de Recursos do Serviço Streaming.
    Observação

    As reinicializações posteriores continuam do último checkpoint, e não do local especificado em startingOffsets. Para outras opções, consulte Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher).

    failOnDataLoss especifica o aplicativo de streaming a ser usado quando os dados não puderem ser extraídos porque foram removidos do Oracle Streaming.

    Configuração Avançada

    Consulte o Spark Streaming Kafka Integration Guide.

    Configurações de Exemplo
    Senha simples:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = PLAIN
    kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy_name>/<username>/<streampool_ocid>" password="<example-password>";
    Controlador de recursos:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = OCI-RSA-SHA256
    kafka.sasl.jaas.config = com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:<streampool_ocid>";
  4. Estabeleça conexão com o Kafka.
    Exemplo de conexões.
    Java com controlador de recursos para streaming do Oracle Cloud Infrastructure
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config", "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:<streampool_ocid>\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
    Java com uma senha simples
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancy_name>/<username>/<streampool_ocid>" password=\"<example-password> \";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
    Python
    spark = (
        SparkSession.builder.config("failOnDataLoss", "false")
        .appName("kafka_streaming_aggregate")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
     
    # Configure settings we need to read Kafka.
    if args.ocid is not None:
        jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
        args.auth_type = "OCI-RSA-SHA256"
    else:
        jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
     
    # For the raw source stream.
    raw_kafka_options = {
        "kafka.sasl.jaas.config": jaas_template.format(
            username=args.stream_username, password=args.stream_password, ocid=args.ocid
        ),
        "kafka.sasl.mechanism": args.auth_type,
        "kafka.security.protocol": args.encryption,
        "kafka.bootstrap.servers": "{}:{}".format(
            args.bootstrap_server, args.bootstrap_port
        ),
        "group.id": args.raw_stream,
        "subscribe": args.raw_stream,
    }
     
    # The actual reader.
    raw = spark.readStream.format("kafka").options(**raw_kafka_options).load()
    Observação

    Para usar o Python com o controlador de recursos para streaming do Oracle Cloud Infrastructure, use archive.zip. Mais informações estão disponíveis na seção em Funcionalidade Spark-Submit no Serviço Data Flow.
Amostra de Aplicativo Java

Este é um aplicativo Java de amostra para o serviço Data Flow.

package example;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class StructuredKafkaWordCount {

  public static void main(String[] args) throws Exception {

    Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
    log.info("Started StructuredKafkaWordCount");

    Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
      log.error("Exception uncaught: ", e);
    });

    //Uncomment following line to enable debug log level.
    //Logger.getRootLogger().setLevel(Level.DEBUG);

    if (args.length < 4) {
      printUsage();
    }

    String bootstrapServers = args[0];
    String topics = args[1];
    String checkpointLocation = args[2];
    String type = args[3];
    String outputLocation = null;

    switch (type) {
      case "console":
        System.err.println("Using console output sink");
        break;

      case "csv":
        if (args.length < 5) {
          printUsage();
        }
        outputLocation = args[4];
        System.err.println("Using csv output sink, output location = " + outputLocation);
        break;

      default:
        printUsage();
    }

    SparkSession spark;

    SparkConf conf = new SparkConf();
    if (conf.contains("spark.master")) {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    } else {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .master("local[*]")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    }

    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config",
            "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
        .selectExpr("CAST(value AS STRING)");

    // Split the lines into timestamp and words
    StructType wordsSchema = StructType$.MODULE$.apply(
        new StructField[]{
            StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
            StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
        }
    );
    ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
    final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

    Dataset<Row> words = lines
        .flatMap(
            (FlatMapFunction<Row, Row>) row -> {
              // parse Kafka record in format: "timestamp(iso8601) text"
              String text = row.getString(0);
              String timestampString = text.substring(0, 25);
              String message = text.substring(26);
              Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());

              return Arrays.asList(message.split(" ")).stream()
                  .map(word -> RowFactory.create(timestamp, word)).iterator();
            }
            , encoder);

    // Time window aggregation
    Dataset<Row> wordCounts = words
        .withWatermark("timestamp", "1 minutes")
        .groupBy(
            functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
            functions.col("value")
        )
        .count()
        .selectExpr("CAST(window.start AS timestamp) AS START_TIME",
            "CAST(window.end AS timestamp) AS END_TIME",
            "value AS WORD", "CAST(count AS long) AS COUNT");

    wordCounts.printSchema();

    // Reducing to a single partition
    wordCounts = wordCounts.coalesce(1);

    // Start streaming query
    StreamingQuery query = null;
    switch (type) {
      case "console":
        query = outputToConsole(wordCounts, checkpointLocation);
        break;
      case "csv":
        query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
        break;
      default:
        System.err.println("Unknown type " + type);
        System.exit(1);
    }

    query.awaitTermination();
  }

  private static void printUsage() {
    System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        "<subscribe-topics> <checkpoint-location> <type> ...");
    System.err.println("<type>: console");
    System.err.println("<type>: csv <output-location>");
    System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
    System.exit(1);
  }

  private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
      throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("console")
        .outputMode("complete")
        .option("checkpointLocation", checkpointLocation)
        .start();
  }

  private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
      String outputLocation) throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("csv")
        .outputMode("append")
        .option("checkpointLocation", checkpointLocation)
        .trigger(Trigger.ProcessingTime("1 minutes"))
        .option("path", outputLocation)
        .start();
  }
}
Exemplo de Aplicativo Python

Este é um aplicativo Python de amostra para o serviço Data Flow.

#!/usr/bin/env python3
 
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
  substring, to_timestamp, explode, split, length
 
import argparse
import os
 
 
def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--auth-type', default='PLAIN')
  parser.add_argument('--bootstrap-port', default='9092')
  parser.add_argument('--bootstrap-server')
  parser.add_argument('--checkpoint-location')
  parser.add_argument('--encryption', default='SASL_SSL')
  parser.add_argument('--ocid')
  parser.add_argument('--output-location')
  parser.add_argument('--output-mode', default='file')
  parser.add_argument('--stream-password')
  parser.add_argument('--raw-stream')
  parser.add_argument('--stream-username')
  args = parser.parse_args()
 
  if args.bootstrap_server is None:
    args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
  if args.raw_stream is None:
    args.raw_stream = os.environ.get('RAW_STREAM')
  if args.stream_username is None:
    args.stream_username = os.environ.get('STREAM_USERNAME')
  if args.stream_password is None:
    args.stream_password = os.environ.get('STREAM_PASSWORD')
 
  assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
  assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
  assert args.output_location is not None, "Output location (--output-location) must be set"
  assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
 
  spark = (
    SparkSession.builder
      .appName('StructuredKafkaWordCount')
      .config('failOnDataLoss', 'true')
      .config('spark.sql.streaming.minBatchesToRetain', '10')
      .config('spark.sql.shuffle.partitions', '1')
      .config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
      .getOrCreate()
  )
 
  # Uncomment following line to enable debug log level.
  # spark.sparkContext.setLogLevel('DEBUG')
 
  # Configure Kafka connection settings.
  if args.ocid is not None:
    jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
    args.auth_type = 'OCI-RSA-SHA256'
  else:
    jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
 
  raw_kafka_options = {
    'kafka.sasl.jaas.config': jaas_template.format(
      username=args.stream_username, password=args.stream_password,
      ocid=args.ocid
    ),
    'kafka.sasl.mechanism': args.auth_type,
    'kafka.security.protocol': args.encryption,
    'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
                                              args.bootstrap_port),
    'subscribe': args.raw_stream,
    'kafka.max.partition.fetch.bytes': 1024 * 1024,
    'startingOffsets': 'latest'
  }
 
  # Reading raw Kafka stream.
  raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
 
  # Cast raw lines to a string.
  lines = raw.selectExpr('CAST(value AS STRING)')
 
  # Split value column into timestamp and words columns.
  parsedLines = (
    lines.select(
      to_timestamp(substring('value', 1, 25))
        .alias('timestamp'),
      lines.value.substr(lit(26), length('value') - 25)
        .alias('words'))
  )
 
  # Split words into array and explode single record into multiple.
  words = (
    parsedLines.select(
      col('timestamp'),
      explode(split('words', ' ')).alias('word')
    )
  )
 
  # Do time window aggregation
  wordCounts = (
    words
      .withWatermark('timestamp', '1 minutes')
      .groupBy('word', window('timestamp', '1 minute'))
      .count()
      .selectExpr('CAST(window.start AS timestamp) AS START_TIME',
                  'CAST(window.end AS timestamp) AS END_TIME',
                  'word AS WORD',
                  'CAST(count AS long) AS COUNT')
  )
 
  # Reduce partitions to a single.
  wordCounts = wordCounts.coalesce(1)
 
  wordCounts.printSchema()
 
  # Output it to the chosen channel.
  if args.output_mode == 'console':
    print("Writing aggregates to console")
    query = (
      wordCounts.writeStream
        .option('checkpointLocation', args.checkpoint_location)
        .outputMode('complete')
        .format('console')
        .option('truncate', False)
        .start()
    )
  else:
    print("Writing aggregates to Object Storage")
    query = (
      wordCounts.writeStream
        .format('csv')
        .outputMode('append')
        .trigger(processingTime='1 minutes')
        .option('checkpointLocation', args.checkpoint_location)
        .option('path', args.output_location)
        .start()
    )
 
  query.awaitTermination()
 
 
main()

Estabelecendo Conexão com uma Origem de Streaming em uma Sub-rede Privada

Siga estas etapas para estabelecer conexão com uma origem de streaming em uma sub-rede privada.

Copie o FDQN da origem de streaming para permitir o tráfego entre VNICs dentro da sub-rede privada usada para criar o Ponto Final Privado do Serviço Data Flow. Se a origem de streaming estiver em uma sub-rede diferente do Ponto Final Privado do Serviço Data Flow, permita o tráfego entre a sub-rede do serviço Streaming e a sub-rede do Ponto Final Privado do Serviço Data Flow.

  1. Crie um pool de streaming com um ponto final privado.
    Consulte a documentação do Streaming para obter mais informações.
  2. Exiba os detalhes do pool de streams e copie o valor do FDQN.
  3. Edite o ponto final privado e substitua o valor de zonas DNS a serem controladas pelo valor do FDQN do pool de streams que você copiou na etapa anterior.
  4. Anexe o ponto final privado ao Aplicativo de streaming.
  5. Executar a Aplicação.