Introduction à la diffusion en continu Spark
Pour pouvoir utiliser la diffusion en continu Spark avec le service de flux de données, vous devez la configurer.
Différence | Exécution sans diffusion en continu | Exécution avec diffusion en continu |
---|---|---|
Authentification | Utilise un jeton OBO (au nom de) l'utilisateur demandeur. Les jetons OBO expirent au bout de 24 heures, ce qui n'est pas adapté aux tâches de longue durée. | Permet d'accéder à Oracle Cloud Infrastructure Object Storage à l'aide de jetons de session liés au principal de ressource de l'exécution. Cette approche convient aux tâches de longue durée. |
Politique de redémarrage | Échoue si le code de sortie d'exécution Spark est différent de zéro. | Redémarre jusqu'à dix fois si le code de sortie d'exécution Spark est différent de zéro. |
Politique d'application de correctifs | Aucune politique d'application de correctifs, car les tâches sont censées durer moins de 24 heures. | Correctifs mensuels automatiques. |
Connexion à Oracle Cloud Infrastructure Streaming
Voyez comment vous connecter à Oracle Cloud Infrastructure Streaming.
- Configurez le service de diffusion en continu Oracle et créez un flux.Note
Les limites suivantes s'appliquent au service de diffusion en continu Oracle :- Les messages d'un flux sont conservés pendant au moins 24 heures, et au plus sept jours.
- Tous les messages d'un flux sont supprimés à la fin de la période de conservation, qu'ils aient été lus ou non.
- La période de conservation d'un flux ne peut pas être modifiée après la création du flux.
- Une location présente une limite par défaut de zéro ou cinq partitions selon votre licence. Si vous avez besoin de partitions supplémentaires, vous pouvez demander une augmentation de limite de service.
- Le nombre de partitions pour un flux ne peut pas être modifié après la création du flux.
- Un flux peut prendre en charge jusqu'à 50 groupes de consommateurs lisant le flux.
- Le taux total d'écriture de données de chaque partition est de 1 Mo par seconde. Le nombre de demandes PUT n'est pas limité, à condition que la limite d'écriture de données ne soit pas dépassée.
- Le nombre de demandes GET par seconde par groupe de consommateurs pour chaque partition est de cinq. Comme un flux peut prendre en charge jusqu'à 50 groupes de consommateurs, et qu'une seule partition dans un flux peut être lue par un seul consommateur d'un groupe de consommateurs, une partition peut prendre en charge jusqu'à 250 demandes GET par seconde.
- Les producteurs peuvent publier un message de 1 Mo au maximum dans un flux.
- La taille d'une demande ne peut pas dépasser 1 Mo. La taille d'une demande est la somme de ses clés et messages après leur décodage à partir de Base64.
- Ajoutez des politiques de diffusion en continu au service de flux de données.
- Utilisez un mot de passe brut ou un jeton d'authentification. Cette méthode convient aux tests rapides inter-environnements. Par exemple, le prototypage d'applications de diffusion en continu structurée Spark, pour une exécution locale dans le service de flux de données sur le service de diffusion en continu Oracle. Note
Le codage permanent, ou exposition, du mot de passe dans les arguments de l'application n'est pas considéré comme sécurisé. N'utilisez donc pas cette méthode pour les exécutions de production. - L'authentification avec le principal de ressource est plus sécurisée que le mot de passe brut ou le jeton d'authentification. Il s'agit d'un moyen plus flexible de vous authentifier auprès du service de diffusion en continu Oracle. Configurez des politiques de diffusion en continu pour utiliser l'authentification avec le principal de ressource.
Un exemple d'application Java et un exemple d'application Python sont disponibles.
Voici un exemple d'application Java pour le service de flux de données.
package example;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
public class StructuredKafkaWordCount {
public static void main(String[] args) throws Exception {
Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
log.info("Started StructuredKafkaWordCount");
Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
log.error("Exception uncaught: ", e);
});
//Uncomment following line to enable debug log level.
//Logger.getRootLogger().setLevel(Level.DEBUG);
if (args.length < 4) {
printUsage();
}
String bootstrapServers = args[0];
String topics = args[1];
String checkpointLocation = args[2];
String type = args[3];
String outputLocation = null;
switch (type) {
case "console":
System.err.println("Using console output sink");
break;
case "csv":
if (args.length < 5) {
printUsage();
}
outputLocation = args[4];
System.err.println("Using csv output sink, output location = " + outputLocation);
break;
default:
printUsage();
}
SparkSession spark;
SparkConf conf = new SparkConf();
if (conf.contains("spark.master")) {
spark = SparkSession.builder()
.appName("StructuredKafkaWordCount")
.config("spark.sql.streaming.minBatchesToRetain", "10")
.config("spark.sql.shuffle.partitions", "1")
.config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
.getOrCreate();
} else {
spark = SparkSession.builder()
.appName("StructuredKafkaWordCount")
.master("local[*]")
.config("spark.sql.streaming.minBatchesToRetain", "10")
.config("spark.sql.shuffle.partitions", "1")
.config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
.getOrCreate();
}
// Create DataFrame representing the stream of input lines from Kafka
Dataset<Row> lines = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
.option("kafka.sasl.jaas.config",
"com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
.option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING)");
// Split the lines into timestamp and words
StructType wordsSchema = StructType$.MODULE$.apply(
new StructField[]{
StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
}
);
ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
Dataset<Row> words = lines
.flatMap(
(FlatMapFunction<Row, Row>) row -> {
// parse Kafka record in format: "timestamp(iso8601) text"
String text = row.getString(0);
String timestampString = text.substring(0, 25);
String message = text.substring(26);
Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());
return Arrays.asList(message.split(" ")).stream()
.map(word -> RowFactory.create(timestamp, word)).iterator();
}
, encoder);
// Time window aggregation
Dataset<Row> wordCounts = words
.withWatermark("timestamp", "1 minutes")
.groupBy(
functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
functions.col("value")
)
.count()
.selectExpr("CAST(window.start AS timestamp) AS START_TIME",
"CAST(window.end AS timestamp) AS END_TIME",
"value AS WORD", "CAST(count AS long) AS COUNT");
wordCounts.printSchema();
// Reducing to a single partition
wordCounts = wordCounts.coalesce(1);
// Start streaming query
StreamingQuery query = null;
switch (type) {
case "console":
query = outputToConsole(wordCounts, checkpointLocation);
break;
case "csv":
query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
break;
default:
System.err.println("Unknown type " + type);
System.exit(1);
}
query.awaitTermination();
}
private static void printUsage() {
System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
"<subscribe-topics> <checkpoint-location> <type> ...");
System.err.println("<type>: console");
System.err.println("<type>: csv <output-location>");
System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
System.exit(1);
}
private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
throws TimeoutException {
return wordCounts
.writeStream()
.format("console")
.outputMode("complete")
.option("checkpointLocation", checkpointLocation)
.start();
}
private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
String outputLocation) throws TimeoutException {
return wordCounts
.writeStream()
.format("csv")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.trigger(Trigger.ProcessingTime("1 minutes"))
.option("path", outputLocation)
.start();
}
}
Voici un exemple d'application Python pour le service de flux de données.
#!/usr/bin/env python3
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
substring, to_timestamp, explode, split, length
import argparse
import os
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--auth-type', default='PLAIN')
parser.add_argument('--bootstrap-port', default='9092')
parser.add_argument('--bootstrap-server')
parser.add_argument('--checkpoint-location')
parser.add_argument('--encryption', default='SASL_SSL')
parser.add_argument('--ocid')
parser.add_argument('--output-location')
parser.add_argument('--output-mode', default='file')
parser.add_argument('--stream-password')
parser.add_argument('--raw-stream')
parser.add_argument('--stream-username')
args = parser.parse_args()
if args.bootstrap_server is None:
args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
if args.raw_stream is None:
args.raw_stream = os.environ.get('RAW_STREAM')
if args.stream_username is None:
args.stream_username = os.environ.get('STREAM_USERNAME')
if args.stream_password is None:
args.stream_password = os.environ.get('STREAM_PASSWORD')
assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
assert args.output_location is not None, "Output location (--output-location) must be set"
assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
spark = (
SparkSession.builder
.appName('StructuredKafkaWordCount')
.config('failOnDataLoss', 'true')
.config('spark.sql.streaming.minBatchesToRetain', '10')
.config('spark.sql.shuffle.partitions', '1')
.config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
.getOrCreate()
)
# Uncomment following line to enable debug log level.
# spark.sparkContext.setLogLevel('DEBUG')
# Configure Kafka connection settings.
if args.ocid is not None:
jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
args.auth_type = 'OCI-RSA-SHA256'
else:
jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
raw_kafka_options = {
'kafka.sasl.jaas.config': jaas_template.format(
username=args.stream_username, password=args.stream_password,
ocid=args.ocid
),
'kafka.sasl.mechanism': args.auth_type,
'kafka.security.protocol': args.encryption,
'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
args.bootstrap_port),
'subscribe': args.raw_stream,
'kafka.max.partition.fetch.bytes': 1024 * 1024,
'startingOffsets': 'latest'
}
# Reading raw Kafka stream.
raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
# Cast raw lines to a string.
lines = raw.selectExpr('CAST(value AS STRING)')
# Split value column into timestamp and words columns.
parsedLines = (
lines.select(
to_timestamp(substring('value', 1, 25))
.alias('timestamp'),
lines.value.substr(lit(26), length('value') - 25)
.alias('words'))
)
# Split words into array and explode single record into multiple.
words = (
parsedLines.select(
col('timestamp'),
explode(split('words', ' ')).alias('word')
)
)
# Do time window aggregation
wordCounts = (
words
.withWatermark('timestamp', '1 minutes')
.groupBy('word', window('timestamp', '1 minute'))
.count()
.selectExpr('CAST(window.start AS timestamp) AS START_TIME',
'CAST(window.end AS timestamp) AS END_TIME',
'word AS WORD',
'CAST(count AS long) AS COUNT')
)
# Reduce partitions to a single.
wordCounts = wordCounts.coalesce(1)
wordCounts.printSchema()
# Output it to the chosen channel.
if args.output_mode == 'console':
print("Writing aggregates to console")
query = (
wordCounts.writeStream
.option('checkpointLocation', args.checkpoint_location)
.outputMode('complete')
.format('console')
.option('truncate', False)
.start()
)
else:
print("Writing aggregates to Object Storage")
query = (
wordCounts.writeStream
.format('csv')
.outputMode('append')
.trigger(processingTime='1 minutes')
.option('checkpointLocation', args.checkpoint_location)
.option('path', args.output_location)
.start()
)
query.awaitTermination()
main()
Connexion à une source de diffusion en continu dans un sous-réseau privé
Suivez ces étapes pour vous connecter à une source de diffusion en continu dans un sous-réseau privé.
-
Créez un point d'extrémité privé s'il n'existe pas déjà.
-
Configurez le service de diffusion en continu Oracle et créez un flux, comme décrit dans Connexion à Oracle Cloud Infrastructure Streaming
Copiez le nom de domaine complet de la source de diffusion en continu pour autoriser le trafic entre les cartes vNIC du sous-réseau privé utilisé pour créer le point d'extrémité privé du service de flux de données. Si la source de diffusion en continu se trouve dans un sous-réseau différent du point d'extrémité privé du service de flux de données, autorisez le trafic entre le sous-réseau de diffusion en continu et le sous-réseau de point d'extrémité privé du service de flux de données.