Introducción al flujo de Spark
Para poder utilizar el flujo de Spark con Data Flow, debe configurarlo.
Qué es diferente | Ejecución no de flujo | Ejecución de flujo |
---|---|---|
Identificación | Utiliza un token en nombre del (OBO) usuario solicitante. Los tokens OBO caducan después de 24 horas, por lo que no son adecuados para trabajos de larga ejecución. | Accede a Oracle Cloud Infrastructure Object Storage mediante tokens de sesión vinculados a la entidad de recurso de la ejecución. Es adecuado para trabajos de larga ejecución. |
Política de reinicio | Falla si el código de salida de tiempo de ejecución de Spark es distinto de cero. | Reinicia hasta diez veces si el código de salida de tiempo de ejecución de Spark es distinto de cero. |
Política de parches | No hay ninguna política de aplicación de parches, ya que se espera que los trabajos duren menos de 24 horas. | Parches mensuales automáticos. |
Conexión a Oracle Cloud Infrastructure Streaming
Descubra cómo conectarse a Oracle Cloud Infrastructure Streaming.
- Configure el servicio Oracle Streaming y cree un flujo.Nota
El servicio Oracle Streaming tiene los límites siguientes:- Los mensajes de un flujo se conservan durante un mínimo de 24 horas y un máximo de siete días.
- Todos los mensajes de un flujo se suprimen después de caducar el período de retención, tanto si se han leído como si no.
- El período de retención de un flujo no se puede cambiar después de crear el flujo.
- Un arrendamiento tiene un límite por defecto de cero o cinco particiones en función de su licencia. Si necesita más particiones, puede solicitar un aumento del límite de servicio.
- El número de particiones de un flujo no se puede cambiar después de la creación del flujo.
- Un único flujo puede soportar hasta 50 grupos de consumidores leyendo de este.
- Cada partición tiene un total de escritura de datos de 1 MB por segundo. No hay ningún límite en el número de solicitudes PUT, siempre que no se supere el límite de escritura de datos.
- Cada partición tiene cinco solicitudes GET por segundo por grupo de consumidores. Como único flujo, puede soportar hasta 50 grupos de consumidores, y solo un consumidor de un grupo de consumidores puede leer una única partición de un flujo. Una partición puede soportar hasta 250 solicitudes GET por segundo.
- Los productores pueden publicar un mensaje de no más de 1 MB en un flujo.
- Una solicitud no puede tener más de 1 MB. El tamaño de una solicitud es la suma de sus claves y mensajes después de que se hayan descodificado desde Base64.
- Agregue políticas de flujo a Data Flow.
- Utilice una contraseña de texto sin formato o un token de autenticación. Este método es adecuado para pruebas rápidas entre entornos. Por ejemplo, la creación de prototipos de aplicación de flujo estructurado de Spark en la que desea ejecutar de forma local y en Data Flow en el servicio Oracle Streaming. Nota
La codificación o la exposición de la contraseña en argumentos de aplicación no se considera segura, por lo que no debe utilizar este método para las ejecuciones de producción. - La autenticación de la entidad de recurso es más segura que la contraseña de texto sin formato o el token de autenticación. Es una forma más flexible de autenticarse con el servicio Oracle Streaming. Configure políticas de flujo para utilizar la autenticación de entidad de recurso.
Dispone de una aplicación Java de ejemplo y una aplicación Python de ejemplo.
Esta es una aplicación Java de ejemplo para Data Flow.
package example;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
public class StructuredKafkaWordCount {
public static void main(String[] args) throws Exception {
Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
log.info("Started StructuredKafkaWordCount");
Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
log.error("Exception uncaught: ", e);
});
//Uncomment following line to enable debug log level.
//Logger.getRootLogger().setLevel(Level.DEBUG);
if (args.length < 4) {
printUsage();
}
String bootstrapServers = args[0];
String topics = args[1];
String checkpointLocation = args[2];
String type = args[3];
String outputLocation = null;
switch (type) {
case "console":
System.err.println("Using console output sink");
break;
case "csv":
if (args.length < 5) {
printUsage();
}
outputLocation = args[4];
System.err.println("Using csv output sink, output location = " + outputLocation);
break;
default:
printUsage();
}
SparkSession spark;
SparkConf conf = new SparkConf();
if (conf.contains("spark.master")) {
spark = SparkSession.builder()
.appName("StructuredKafkaWordCount")
.config("spark.sql.streaming.minBatchesToRetain", "10")
.config("spark.sql.shuffle.partitions", "1")
.config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
.getOrCreate();
} else {
spark = SparkSession.builder()
.appName("StructuredKafkaWordCount")
.master("local[*]")
.config("spark.sql.streaming.minBatchesToRetain", "10")
.config("spark.sql.shuffle.partitions", "1")
.config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
.getOrCreate();
}
// Create DataFrame representing the stream of input lines from Kafka
Dataset<Row> lines = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
.option("kafka.sasl.jaas.config",
"com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
.option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING)");
// Split the lines into timestamp and words
StructType wordsSchema = StructType$.MODULE$.apply(
new StructField[]{
StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
}
);
ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
Dataset<Row> words = lines
.flatMap(
(FlatMapFunction<Row, Row>) row -> {
// parse Kafka record in format: "timestamp(iso8601) text"
String text = row.getString(0);
String timestampString = text.substring(0, 25);
String message = text.substring(26);
Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());
return Arrays.asList(message.split(" ")).stream()
.map(word -> RowFactory.create(timestamp, word)).iterator();
}
, encoder);
// Time window aggregation
Dataset<Row> wordCounts = words
.withWatermark("timestamp", "1 minutes")
.groupBy(
functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
functions.col("value")
)
.count()
.selectExpr("CAST(window.start AS timestamp) AS START_TIME",
"CAST(window.end AS timestamp) AS END_TIME",
"value AS WORD", "CAST(count AS long) AS COUNT");
wordCounts.printSchema();
// Reducing to a single partition
wordCounts = wordCounts.coalesce(1);
// Start streaming query
StreamingQuery query = null;
switch (type) {
case "console":
query = outputToConsole(wordCounts, checkpointLocation);
break;
case "csv":
query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
break;
default:
System.err.println("Unknown type " + type);
System.exit(1);
}
query.awaitTermination();
}
private static void printUsage() {
System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
"<subscribe-topics> <checkpoint-location> <type> ...");
System.err.println("<type>: console");
System.err.println("<type>: csv <output-location>");
System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
System.exit(1);
}
private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
throws TimeoutException {
return wordCounts
.writeStream()
.format("console")
.outputMode("complete")
.option("checkpointLocation", checkpointLocation)
.start();
}
private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
String outputLocation) throws TimeoutException {
return wordCounts
.writeStream()
.format("csv")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.trigger(Trigger.ProcessingTime("1 minutes"))
.option("path", outputLocation)
.start();
}
}
Esta es una aplicación Python de ejemplo para Data Flow.
#!/usr/bin/env python3
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
substring, to_timestamp, explode, split, length
import argparse
import os
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--auth-type', default='PLAIN')
parser.add_argument('--bootstrap-port', default='9092')
parser.add_argument('--bootstrap-server')
parser.add_argument('--checkpoint-location')
parser.add_argument('--encryption', default='SASL_SSL')
parser.add_argument('--ocid')
parser.add_argument('--output-location')
parser.add_argument('--output-mode', default='file')
parser.add_argument('--stream-password')
parser.add_argument('--raw-stream')
parser.add_argument('--stream-username')
args = parser.parse_args()
if args.bootstrap_server is None:
args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
if args.raw_stream is None:
args.raw_stream = os.environ.get('RAW_STREAM')
if args.stream_username is None:
args.stream_username = os.environ.get('STREAM_USERNAME')
if args.stream_password is None:
args.stream_password = os.environ.get('STREAM_PASSWORD')
assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
assert args.output_location is not None, "Output location (--output-location) must be set"
assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
spark = (
SparkSession.builder
.appName('StructuredKafkaWordCount')
.config('failOnDataLoss', 'true')
.config('spark.sql.streaming.minBatchesToRetain', '10')
.config('spark.sql.shuffle.partitions', '1')
.config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
.getOrCreate()
)
# Uncomment following line to enable debug log level.
# spark.sparkContext.setLogLevel('DEBUG')
# Configure Kafka connection settings.
if args.ocid is not None:
jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
args.auth_type = 'OCI-RSA-SHA256'
else:
jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
raw_kafka_options = {
'kafka.sasl.jaas.config': jaas_template.format(
username=args.stream_username, password=args.stream_password,
ocid=args.ocid
),
'kafka.sasl.mechanism': args.auth_type,
'kafka.security.protocol': args.encryption,
'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
args.bootstrap_port),
'subscribe': args.raw_stream,
'kafka.max.partition.fetch.bytes': 1024 * 1024,
'startingOffsets': 'latest'
}
# Reading raw Kafka stream.
raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
# Cast raw lines to a string.
lines = raw.selectExpr('CAST(value AS STRING)')
# Split value column into timestamp and words columns.
parsedLines = (
lines.select(
to_timestamp(substring('value', 1, 25))
.alias('timestamp'),
lines.value.substr(lit(26), length('value') - 25)
.alias('words'))
)
# Split words into array and explode single record into multiple.
words = (
parsedLines.select(
col('timestamp'),
explode(split('words', ' ')).alias('word')
)
)
# Do time window aggregation
wordCounts = (
words
.withWatermark('timestamp', '1 minutes')
.groupBy('word', window('timestamp', '1 minute'))
.count()
.selectExpr('CAST(window.start AS timestamp) AS START_TIME',
'CAST(window.end AS timestamp) AS END_TIME',
'word AS WORD',
'CAST(count AS long) AS COUNT')
)
# Reduce partitions to a single.
wordCounts = wordCounts.coalesce(1)
wordCounts.printSchema()
# Output it to the chosen channel.
if args.output_mode == 'console':
print("Writing aggregates to console")
query = (
wordCounts.writeStream
.option('checkpointLocation', args.checkpoint_location)
.outputMode('complete')
.format('console')
.option('truncate', False)
.start()
)
else:
print("Writing aggregates to Object Storage")
query = (
wordCounts.writeStream
.format('csv')
.outputMode('append')
.trigger(processingTime='1 minutes')
.option('checkpointLocation', args.checkpoint_location)
.option('path', args.output_location)
.start()
)
query.awaitTermination()
main()
Conexión a un origen de Streaming en una subred privada
Siga estos pasos para conectarse a un origen de flujo en una subred privada.
-
Cree un punto final privado si aún no existe uno.
-
Configuración de Oracle Streaming Service y creación de un flujo como se describe en Conexión a Oracle Cloud Infrastructure Streaming
Copie el FDQN de origen de flujo para permitir el tráfico entre las VNIC de la subred privada utilizada para crear el punto final privado de Data Flow. Si el origen de flujo está en una subred diferente al punto final privado de Data Flow, permita el tráfico entre la subred de Streaming y la subred del punto final privado de Data Flow.