Introduzione allo streaming Spark
Prima di poter utilizzare lo streaming Spark con Data Flow, è necessario impostarlo.
Elemento differente | Esecuzione non in streaming | Esecuzione streaming |
---|---|---|
Autenticazione | Utilizza un token OBO (On-Behalf-Of) dell'utente richiedente. I token OBO scadono dopo 24 ore, pertanto non sono adatti per lavori con tempi di esecuzione lunghi. | Accedi a Oracle Cloud Infrastructure Object Storage utilizzando i token di sessione legati al principal delle risorse dell'esecuzione. È adatto per lavori di lunga durata. |
Riavvia criterio | Non riesce se il codice di uscita del runtime Spark è diverso da zero. | Riavvia fino a dieci volte se il codice di uscita del runtime Spark è diverso da zero. |
Criterio patch | Nessun criterio di applicazione delle patch poiché si prevede che i job durino meno di 24 ore. | Patch mensili automatiche. |
Connessione a Oracle Cloud Infrastructure Streaming
Scopri come connetterti a Oracle Cloud Infrastructure Streaming.
- Impostare Oracle Streaming Service e creare un flusso.Nota
Oracle Streaming Service ha i limiti seguenti:- I messaggi in un flusso vengono conservati per non meno di 24 ore e non più di sette giorni.
- Tutti i messaggi in un flusso vengono eliminati dopo la scadenza del periodo di conservazione, indipendentemente dal fatto che siano stati letti o meno.
- Il periodo di conservazione di un vapore non può essere modificato dopo la creazione del flusso.
- Una tenancy ha un limite predefinito pari a zero o cinque partizioni, a seconda della licenza in uso. Se sono necessarie più partizioni, è possibile richiedere un aumento del limite del servizio.
- Il numero di partizioni per un flusso non può essere modificato dopo la creazione del flusso.
- Un singolo flusso può supportare fino a 50 gruppi di consumatori che leggono da esso.
- Ogni partizione ha una scrittura totale di dati di 1 MB al secondo. Non è previsto alcun limite al numero di richieste PUT, a condizione che il limite di scrittura dei dati non venga superato.
- Ogni partizione dispone di cinque richieste GET al secondo per gruppo di consumer. Poiché un singolo flusso può supportare fino a 50 gruppi di consumer e una singola partizione in un flusso può essere letta da un solo consumer in un gruppo di consumer, una partizione può supportare fino a 250 richieste GET al secondo.
- I produttori possono pubblicare un messaggio non superiore a 1 MB in un flusso.
- Una richiesta non può superare 1 MB. La dimensione di una richiesta è la somma delle chiavi e dei messaggi dopo che sono stati decodificati da Base64.
- Aggiungere criteri di streaming a Flusso dati.
- Utilizzare una password o un token di autenticazione non codificati. Questo metodo è adatto per test rapidi in ambiente incrociato. Ad esempio, la creazione di prototipi di applicazioni di streaming strutturate Spark, in cui si desidera eseguire localmente e su Data Flow su Oracle Streaming Service. Nota
L'hardcoding o l'esposizione della password negli argomenti dell'applicazione non sono considerati sicuri, pertanto non utilizzare questo metodo per i cicli di produzione. - L'autenticazione del principal risorsa è più sicura della password semplice o del token di autenticazione. È un modo più flessibile per eseguire l'autenticazione con il servizio di streaming Oracle. Impostare i criteri di streaming per utilizzare l'autenticazione del principal risorsa.
Sono disponibili un'applicazione di esempio Java e un'applicazione di esempio Python.
Questa è un'applicazione Java di esempio per Data Flow.
package example;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
public class StructuredKafkaWordCount {
public static void main(String[] args) throws Exception {
Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
log.info("Started StructuredKafkaWordCount");
Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
log.error("Exception uncaught: ", e);
});
//Uncomment following line to enable debug log level.
//Logger.getRootLogger().setLevel(Level.DEBUG);
if (args.length < 4) {
printUsage();
}
String bootstrapServers = args[0];
String topics = args[1];
String checkpointLocation = args[2];
String type = args[3];
String outputLocation = null;
switch (type) {
case "console":
System.err.println("Using console output sink");
break;
case "csv":
if (args.length < 5) {
printUsage();
}
outputLocation = args[4];
System.err.println("Using csv output sink, output location = " + outputLocation);
break;
default:
printUsage();
}
SparkSession spark;
SparkConf conf = new SparkConf();
if (conf.contains("spark.master")) {
spark = SparkSession.builder()
.appName("StructuredKafkaWordCount")
.config("spark.sql.streaming.minBatchesToRetain", "10")
.config("spark.sql.shuffle.partitions", "1")
.config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
.getOrCreate();
} else {
spark = SparkSession.builder()
.appName("StructuredKafkaWordCount")
.master("local[*]")
.config("spark.sql.streaming.minBatchesToRetain", "10")
.config("spark.sql.shuffle.partitions", "1")
.config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
.getOrCreate();
}
// Create DataFrame representing the stream of input lines from Kafka
Dataset<Row> lines = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
.option("kafka.sasl.jaas.config",
"com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
.option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING)");
// Split the lines into timestamp and words
StructType wordsSchema = StructType$.MODULE$.apply(
new StructField[]{
StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
}
);
ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
Dataset<Row> words = lines
.flatMap(
(FlatMapFunction<Row, Row>) row -> {
// parse Kafka record in format: "timestamp(iso8601) text"
String text = row.getString(0);
String timestampString = text.substring(0, 25);
String message = text.substring(26);
Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());
return Arrays.asList(message.split(" ")).stream()
.map(word -> RowFactory.create(timestamp, word)).iterator();
}
, encoder);
// Time window aggregation
Dataset<Row> wordCounts = words
.withWatermark("timestamp", "1 minutes")
.groupBy(
functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
functions.col("value")
)
.count()
.selectExpr("CAST(window.start AS timestamp) AS START_TIME",
"CAST(window.end AS timestamp) AS END_TIME",
"value AS WORD", "CAST(count AS long) AS COUNT");
wordCounts.printSchema();
// Reducing to a single partition
wordCounts = wordCounts.coalesce(1);
// Start streaming query
StreamingQuery query = null;
switch (type) {
case "console":
query = outputToConsole(wordCounts, checkpointLocation);
break;
case "csv":
query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
break;
default:
System.err.println("Unknown type " + type);
System.exit(1);
}
query.awaitTermination();
}
private static void printUsage() {
System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
"<subscribe-topics> <checkpoint-location> <type> ...");
System.err.println("<type>: console");
System.err.println("<type>: csv <output-location>");
System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
System.exit(1);
}
private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
throws TimeoutException {
return wordCounts
.writeStream()
.format("console")
.outputMode("complete")
.option("checkpointLocation", checkpointLocation)
.start();
}
private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
String outputLocation) throws TimeoutException {
return wordCounts
.writeStream()
.format("csv")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.trigger(Trigger.ProcessingTime("1 minutes"))
.option("path", outputLocation)
.start();
}
}
Si tratta di un'applicazione Python di esempio per Data Flow.
#!/usr/bin/env python3
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
substring, to_timestamp, explode, split, length
import argparse
import os
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--auth-type', default='PLAIN')
parser.add_argument('--bootstrap-port', default='9092')
parser.add_argument('--bootstrap-server')
parser.add_argument('--checkpoint-location')
parser.add_argument('--encryption', default='SASL_SSL')
parser.add_argument('--ocid')
parser.add_argument('--output-location')
parser.add_argument('--output-mode', default='file')
parser.add_argument('--stream-password')
parser.add_argument('--raw-stream')
parser.add_argument('--stream-username')
args = parser.parse_args()
if args.bootstrap_server is None:
args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
if args.raw_stream is None:
args.raw_stream = os.environ.get('RAW_STREAM')
if args.stream_username is None:
args.stream_username = os.environ.get('STREAM_USERNAME')
if args.stream_password is None:
args.stream_password = os.environ.get('STREAM_PASSWORD')
assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
assert args.output_location is not None, "Output location (--output-location) must be set"
assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
spark = (
SparkSession.builder
.appName('StructuredKafkaWordCount')
.config('failOnDataLoss', 'true')
.config('spark.sql.streaming.minBatchesToRetain', '10')
.config('spark.sql.shuffle.partitions', '1')
.config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
.getOrCreate()
)
# Uncomment following line to enable debug log level.
# spark.sparkContext.setLogLevel('DEBUG')
# Configure Kafka connection settings.
if args.ocid is not None:
jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
args.auth_type = 'OCI-RSA-SHA256'
else:
jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
raw_kafka_options = {
'kafka.sasl.jaas.config': jaas_template.format(
username=args.stream_username, password=args.stream_password,
ocid=args.ocid
),
'kafka.sasl.mechanism': args.auth_type,
'kafka.security.protocol': args.encryption,
'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
args.bootstrap_port),
'subscribe': args.raw_stream,
'kafka.max.partition.fetch.bytes': 1024 * 1024,
'startingOffsets': 'latest'
}
# Reading raw Kafka stream.
raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
# Cast raw lines to a string.
lines = raw.selectExpr('CAST(value AS STRING)')
# Split value column into timestamp and words columns.
parsedLines = (
lines.select(
to_timestamp(substring('value', 1, 25))
.alias('timestamp'),
lines.value.substr(lit(26), length('value') - 25)
.alias('words'))
)
# Split words into array and explode single record into multiple.
words = (
parsedLines.select(
col('timestamp'),
explode(split('words', ' ')).alias('word')
)
)
# Do time window aggregation
wordCounts = (
words
.withWatermark('timestamp', '1 minutes')
.groupBy('word', window('timestamp', '1 minute'))
.count()
.selectExpr('CAST(window.start AS timestamp) AS START_TIME',
'CAST(window.end AS timestamp) AS END_TIME',
'word AS WORD',
'CAST(count AS long) AS COUNT')
)
# Reduce partitions to a single.
wordCounts = wordCounts.coalesce(1)
wordCounts.printSchema()
# Output it to the chosen channel.
if args.output_mode == 'console':
print("Writing aggregates to console")
query = (
wordCounts.writeStream
.option('checkpointLocation', args.checkpoint_location)
.outputMode('complete')
.format('console')
.option('truncate', False)
.start()
)
else:
print("Writing aggregates to Object Storage")
query = (
wordCounts.writeStream
.format('csv')
.outputMode('append')
.trigger(processingTime='1 minutes')
.option('checkpointLocation', args.checkpoint_location)
.option('path', args.output_location)
.start()
)
query.awaitTermination()
main()
Connessione a un'origine di streaming in una subnet privata
Attenersi alla procedura riportata di seguito per connettersi a un'origine di streaming in una subnet privata.
-
Creare un endpoint privato se non ne esiste già uno.
-
Impostare Oracle Streaming Service e creare un flusso come descritto nella sezione Connessione a Oracle Cloud Infrastructure Streaming
Copiare l'FDQN di origine streaming per consentire il traffico tra le VNIC all'interno della subnet privata utilizzata per creare l'endpoint privato del flusso di dati. Se l'origine di streaming si trova in una subnet diversa dall'endpoint privato del flusso di dati, consenti traffico tra la subnet di streaming e la subnet dell'endpoint privato del flusso di dati.