Erste Schritte mit Spark Streaming
Bevor Sie Spark-Streaming mit Data Flow verwenden können, müssen Sie es einrichten.
Unterschied | Nicht-Streamingausführung | Streamingausführung |
---|---|---|
Authentifizierung | Verwendet ein On-Behalf-Of-(OBO-)Token des anfordernden Benutzers. OBO-Token laufen nach 24 Stunden ab, sodass diese nicht für Jobs mit langer Ausführungszeit geeignet sind. | Ruft Oracle Cloud Infrastructure Object Storage mit Sessiontoken auf, die mit dem Resource Principal der Ausführung verknüpft sind. Eignet sich für Jobs mit langer Ausführungszeit. |
Neustart-Policy | Ist nicht erfolgreich, wenn der Exitcode der Spark-Laufzeit ungleich Null ist. | Startet bis zu zehn Mal, wenn der Exitcode der Spark-Laufzeit ungleich Null ist. |
Patch-Policy | Keine Patching-Policy, da Jobs voraussichtlich weniger als 24 Stunden dauern. | Automatische monatliche Patches. |
Verbindung zu Oracle Cloud Infrastructure Streaming herstellen
Erfahren Sie, wie Sie eine Verbindung zu Oracle Cloud Infrastructure Streaming herstellen.
- Richten Sie Oracle Streaming Service ein, und erstellen Sie einen Stream.Hinweis
Für Oracle Streaming Service gelten folgende Einschränkungen:- Nachrichten in einem Stream werden mindestens 24 Stunden und höchstens sieben Tage lang aufbewahrt.
- Alle Nachrichten in einem Stream werden nach Ablauf des Aufbewahrungszeitraums gelöscht, unabhängig davon, ob sie gelesen wurden oder nicht.
- Der Aufbewahrungszeitraum eines Streams kann nicht geändert werden, nachdem der Stream erstellt wurde.
- Ein Mandant hat je nach Lizenz ein Standardlimit von null oder fünf Partitionen. Wenn Sie mehr Partitionen benötigen, können Sie eine Erhöhung des Servicelimits beantragen.
- Die Anzahl der Partitionen für einen Stream kann nach dem Erstellen des Streams nicht mehr geändert werden.
- Ein einzelner Stream kann bis zu 50 Consumer-Gruppen unterstützen, die Daten aus dem Stream lesen.
- Jede Partition verfügt über eine Datenschreibrate von insgesamt 1 MB pro Sekunde. Die Anzahl der PUT-Anforderungen ist unbegrenzt, sofern das Datenschreiblimit nicht überschritten wird.
- Jede Partition verfügt über fünf GET-Anforderungen pro Sekunde pro Consumer-Gruppe. Da ein einzelner Stream bis zu 50 Consumer-Gruppen unterstützen kann und eine einzelne Partition in einem Stream nur von einem Consumer in einer Consumer-Gruppe gelesen werden kann, kann eine Partition bis zu 250 GET-Anforderungen pro Sekunde unterstützen.
- Producers können eine Nachricht von maximal 1 MB in einem Stream veröffentlichen.
- Eine Anforderung darf nicht größer als 1 MB sein. Die Größe einer Anforderung entspricht der Summe der zugehörigen Schlüssel und Nachrichten nach der Decodierung aus Base64.
- Fügen Sie Streaming-Policys zu Data Flow hinzu.
- Verwenden Sie ein Klartextkennwort oder ein Authentifizierungstoken. Diese Methode eignet sich für das rasche Testen verschiedener Umgebungen. Beispiel: Prototyping von strukturierten Spark-Streaminganwendungen, in denen Sie Oracle Streaming Service lokal und in Data Flow ausführen können. Hinweis
Die Hartcodierung oder das Anzeigen des Kennworts in Anwendungsargumenten wird nicht als sicher betrachtet. Verwenden Sie diese Methode daher nicht für Produktionsausführungen. - Die Resource-Principal-Authentifizierung ist sicherer als ein Klartextkennwort oder ein Authentifizierungstoken. Die Authentifizierung mit Oracle Streaming Service ist flexibler. Richten Sie Streaming-Policys ein, um die Resource-Principal-Authentifizierung zu verwenden.
Eine Java-Beispielanwendung und eine Python-Beispielanwendung sind verfügbar.
Dies ist eine Java-Beispielanwendung für Data Flow.
package example;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
public class StructuredKafkaWordCount {
public static void main(String[] args) throws Exception {
Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
log.info("Started StructuredKafkaWordCount");
Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
log.error("Exception uncaught: ", e);
});
//Uncomment following line to enable debug log level.
//Logger.getRootLogger().setLevel(Level.DEBUG);
if (args.length < 4) {
printUsage();
}
String bootstrapServers = args[0];
String topics = args[1];
String checkpointLocation = args[2];
String type = args[3];
String outputLocation = null;
switch (type) {
case "console":
System.err.println("Using console output sink");
break;
case "csv":
if (args.length < 5) {
printUsage();
}
outputLocation = args[4];
System.err.println("Using csv output sink, output location = " + outputLocation);
break;
default:
printUsage();
}
SparkSession spark;
SparkConf conf = new SparkConf();
if (conf.contains("spark.master")) {
spark = SparkSession.builder()
.appName("StructuredKafkaWordCount")
.config("spark.sql.streaming.minBatchesToRetain", "10")
.config("spark.sql.shuffle.partitions", "1")
.config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
.getOrCreate();
} else {
spark = SparkSession.builder()
.appName("StructuredKafkaWordCount")
.master("local[*]")
.config("spark.sql.streaming.minBatchesToRetain", "10")
.config("spark.sql.shuffle.partitions", "1")
.config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
.getOrCreate();
}
// Create DataFrame representing the stream of input lines from Kafka
Dataset<Row> lines = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
.option("kafka.sasl.jaas.config",
"com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
.option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING)");
// Split the lines into timestamp and words
StructType wordsSchema = StructType$.MODULE$.apply(
new StructField[]{
StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
}
);
ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
Dataset<Row> words = lines
.flatMap(
(FlatMapFunction<Row, Row>) row -> {
// parse Kafka record in format: "timestamp(iso8601) text"
String text = row.getString(0);
String timestampString = text.substring(0, 25);
String message = text.substring(26);
Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());
return Arrays.asList(message.split(" ")).stream()
.map(word -> RowFactory.create(timestamp, word)).iterator();
}
, encoder);
// Time window aggregation
Dataset<Row> wordCounts = words
.withWatermark("timestamp", "1 minutes")
.groupBy(
functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
functions.col("value")
)
.count()
.selectExpr("CAST(window.start AS timestamp) AS START_TIME",
"CAST(window.end AS timestamp) AS END_TIME",
"value AS WORD", "CAST(count AS long) AS COUNT");
wordCounts.printSchema();
// Reducing to a single partition
wordCounts = wordCounts.coalesce(1);
// Start streaming query
StreamingQuery query = null;
switch (type) {
case "console":
query = outputToConsole(wordCounts, checkpointLocation);
break;
case "csv":
query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
break;
default:
System.err.println("Unknown type " + type);
System.exit(1);
}
query.awaitTermination();
}
private static void printUsage() {
System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
"<subscribe-topics> <checkpoint-location> <type> ...");
System.err.println("<type>: console");
System.err.println("<type>: csv <output-location>");
System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
System.exit(1);
}
private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
throws TimeoutException {
return wordCounts
.writeStream()
.format("console")
.outputMode("complete")
.option("checkpointLocation", checkpointLocation)
.start();
}
private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
String outputLocation) throws TimeoutException {
return wordCounts
.writeStream()
.format("csv")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.trigger(Trigger.ProcessingTime("1 minutes"))
.option("path", outputLocation)
.start();
}
}
Dies ist eine Python-Beispielanwendung für Data Flow.
#!/usr/bin/env python3
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
substring, to_timestamp, explode, split, length
import argparse
import os
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--auth-type', default='PLAIN')
parser.add_argument('--bootstrap-port', default='9092')
parser.add_argument('--bootstrap-server')
parser.add_argument('--checkpoint-location')
parser.add_argument('--encryption', default='SASL_SSL')
parser.add_argument('--ocid')
parser.add_argument('--output-location')
parser.add_argument('--output-mode', default='file')
parser.add_argument('--stream-password')
parser.add_argument('--raw-stream')
parser.add_argument('--stream-username')
args = parser.parse_args()
if args.bootstrap_server is None:
args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
if args.raw_stream is None:
args.raw_stream = os.environ.get('RAW_STREAM')
if args.stream_username is None:
args.stream_username = os.environ.get('STREAM_USERNAME')
if args.stream_password is None:
args.stream_password = os.environ.get('STREAM_PASSWORD')
assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
assert args.output_location is not None, "Output location (--output-location) must be set"
assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
spark = (
SparkSession.builder
.appName('StructuredKafkaWordCount')
.config('failOnDataLoss', 'true')
.config('spark.sql.streaming.minBatchesToRetain', '10')
.config('spark.sql.shuffle.partitions', '1')
.config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
.getOrCreate()
)
# Uncomment following line to enable debug log level.
# spark.sparkContext.setLogLevel('DEBUG')
# Configure Kafka connection settings.
if args.ocid is not None:
jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
args.auth_type = 'OCI-RSA-SHA256'
else:
jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
raw_kafka_options = {
'kafka.sasl.jaas.config': jaas_template.format(
username=args.stream_username, password=args.stream_password,
ocid=args.ocid
),
'kafka.sasl.mechanism': args.auth_type,
'kafka.security.protocol': args.encryption,
'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
args.bootstrap_port),
'subscribe': args.raw_stream,
'kafka.max.partition.fetch.bytes': 1024 * 1024,
'startingOffsets': 'latest'
}
# Reading raw Kafka stream.
raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
# Cast raw lines to a string.
lines = raw.selectExpr('CAST(value AS STRING)')
# Split value column into timestamp and words columns.
parsedLines = (
lines.select(
to_timestamp(substring('value', 1, 25))
.alias('timestamp'),
lines.value.substr(lit(26), length('value') - 25)
.alias('words'))
)
# Split words into array and explode single record into multiple.
words = (
parsedLines.select(
col('timestamp'),
explode(split('words', ' ')).alias('word')
)
)
# Do time window aggregation
wordCounts = (
words
.withWatermark('timestamp', '1 minutes')
.groupBy('word', window('timestamp', '1 minute'))
.count()
.selectExpr('CAST(window.start AS timestamp) AS START_TIME',
'CAST(window.end AS timestamp) AS END_TIME',
'word AS WORD',
'CAST(count AS long) AS COUNT')
)
# Reduce partitions to a single.
wordCounts = wordCounts.coalesce(1)
wordCounts.printSchema()
# Output it to the chosen channel.
if args.output_mode == 'console':
print("Writing aggregates to console")
query = (
wordCounts.writeStream
.option('checkpointLocation', args.checkpoint_location)
.outputMode('complete')
.format('console')
.option('truncate', False)
.start()
)
else:
print("Writing aggregates to Object Storage")
query = (
wordCounts.writeStream
.format('csv')
.outputMode('append')
.trigger(processingTime='1 minutes')
.option('checkpointLocation', args.checkpoint_location)
.option('path', args.output_location)
.start()
)
query.awaitTermination()
main()
Verbindung zu einer Streamingquelle in einem privaten Subnetz herstellen
Führen Sie diese Schritte aus, um eine Verbindung zu einer Streamingquelle in einem privaten Subnetz herzustellen.
-
Erstellen Sie einen privaten Endpunkt, wenn noch keiner vorhanden ist.
-
Richten Sie Oracle Streaming Service ein, und erstellen Sie einen Stream wie unter Verbindung zu Oracle Cloud Infrastructure Streaming herstellen beschrieben.
Kopieren Sie den FDQN der Streamingquelle, um Traffic zwischen VNICs innerhalb des privaten Subnetzes zuzulassen, das zum Erstellen des privaten Data Flow-Endpunkts verwendet wird. Wenn sich die Streamingquelle in einem anderen Subnetz als der private Data Flow-Endpunkt befindet, lassen Sie Traffic zwischen dem Streaming-Subnetz und dem privaten Data Flow-Endpunktsubnetz zu.