Apache Avro verwenden
Apache Avro bietet Datenserialisierungs- und Datenaustauschdienste für Apache Hadoop.
Inkompatibilitätsprobleme
Daten/Schema
Wenn das Schema einen Standardwert wie "null" (mit Anführungszeichen) anstelle von null (ohne Anführungszeichen) aufweist, führt dies zu einem Fehler bei der Schemavalidierung.
Es wird empfohlen, das Schema zu korrigieren, um null (ohne Anführungszeichen) zu verwenden. Weitere Informationen finden Sie unter https://issues.apache.org/jira/browse/AVRO-2509.
API-Inkompatibilität
Der gesamte Feldzugriff erfolgt über Setter-/getter-Methoden. Avro-generierter Code von Schema-Gettern/Settern unterscheidet sich in Avro 1.8.2/1.9 gegenüber Avro 1.11.3. Beispiel: counters.groups und counters.getGroups.
Um serialisierbare Objekte zu marshalen, müssen die darin enthaltenen Packages in der Systemeigenschaft org.apache.avro.SERIALIZABLE_PACKAGE
deklariert werden.
Ändern Sie den Anwendungscode, indem Sie ihn neu kompilieren, bevor Sie Avro 1.11.3 verwenden, und identifizieren Sie alle Pakete, die in
org.apache.avro.SERIALIZABLE_PACKAGE
aufgeführt werden müssen.So verwenden Sie eine bestimmte Avro-Version: Link zum Runbook
- Platzieren Sie die benutzerdefinierte Avro-Version in einem separaten Ordner, und verwenden Sie während der Jobweiterleitung denselben Ordner.
Benutzerdefinierte Avro-Versionen in Komponenten verwenden
Hadoop - MR
Schritte zur Verwendung einer benutzerdefinierten Avro-Version
Avro-Gläser müssen sowohl in HADOOP_CLASSPATH
als auch in -libjars
-Gläser vorhanden sein.
Beispiel:
export HADOOP_CLASSPATH="$(hadoop classpath):/path/to/avro-1.8.2.jar:/path/to/avro-mapred-1.8.2.jar
hadoop jar avro2.jar AvroTest -libjars /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar <args>
Sqoop
Schritte zur Verwendung einer benutzerdefinierten Avro-Version
Avro-Gläser müssen sowohl in HADOOP_CLASSPATH
als auch in -libjars
-Gläser vorhanden sein.
Beispiel:
Oozie
- Sqoop unterstützt das dynamische Übergeben von JAR-Dateien während der Ausführung nicht.
avro-1.8.2
-JARs müssen explizit in Sqoop lib (/usr/lib/sqoop/lib
) eingefügt werden.
Spark
Schritte zur Verwendung einer benutzerdefinierten Avro-Version
Der Pfad der Avro-JARs muss zu `mapreduce.application.classpath
` hinzugefügt werden.
Beispiel:
mapreduce.application.classpath = $PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:...:/path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar
Spark
Schritte zur Verwendung einer benutzerdefinierten Avro-Version
Der Pfad der Avro-JARs muss über --jars
und --files
hinzugefügt werden.
Beispiel:
spark-submit \
--class AvroWordCount \
--master yarn \
--deploy-mode client \
--name "WordCount Avro Java Example" \
--conf spark.executor.memory=2g \
--conf spark.executor.cores=2 \
--conf spark.executor.instances=2 \
--conf spark.driver.memory=1g \
--jars /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar \
avro-test.jar \
--files /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar \
hdfs:///user/ambari-qa/input/sample.avro \
hdfs:///user/ambari-qa/output25
Hive
Schritte zur Verwendung einer benutzerdefinierten Avro-Version
- Hive unterstützt das dynamische Übergeben von JAR-Dateien während der Ausführung nicht.
- Es muss explizit
avro-1.8.2
-JARs in Hive lib(/usr/lib/hive/lib
) eingefügt werden.
Beispiel:
CREATE TABLE users_from_avro_schema STORED AS AVRO TBLPROPERTIES ('avro.schema.url'='hdfs://rpavro1nha-mn0.bdsdnstest.bdsclitest.oraclevcn.com:8020/user/yarn/text_schema.avsc');
LOAD DATA INPATH '/user/yarn/sample.avro' INTO TABLE users_from_avro_schema;
Flume
Schritte zur Verwendung einer benutzerdefinierten Avro-Version
Die alten Avro-JARs müssen in HADOOP_CLASSPATH
angegeben werden.
Beispiel:
> export HADOOP_CLASSPATH="/usr/lib/flume/lib/path/to/avro-1.10.2.jar*:$(hadoop classpath)"
> flume-ng agent -n MY_AGENT -f flume.conf -Dflume.root.logger=INFO,console
#list hdfs sources
MY_AGENT.sources = my-source
MY_AGENT.channels = my-channel
MY_AGENT.sinks = my-sink
#source
MY_AGENT.sources.my-source.type = org.apache.flume.source.kafka.KafkaSource
MY_AGENT.sources.my-source.channels = my-channel
MY_AGENT.sources.my-source.batchSize = 10000
MY_AGENT.sources.my-source.batchDurationMillis = 5000
MY_AGENT.sources.my-source.kafka.bootstrap.servers = rpavro2nha-wn1.bdsdnstest.bdsclitest.oraclevcn.com:6667
MY_AGENT.sources.my-source.kafka.topics = truck_events_stream
MY_AGENT.sources.my-source.kafka.consumer.group.id = truck_events_streamgrp_1
MY_AGENT.sources.my-source.kafka.consumer.client.id = truck_events_stream_clnt_1
MY_AGENT.sources.my-source.kafka.auto.commit.enable = true
MY_AGENT.sources.my-source.kafka.consumer.session.timeout.ms=100000
MY_AGENT.sources.my-source.kafka.consumer.request.timeout.ms=120000
MY_AGENT.sources.my-source.kafka.consumer.auto.offset.reset=earliest
#channel
MY_AGENT.channels.my-channel.type = memory
MY_AGENT.channels.my-channel.capacity = 100000000
MY_AGENT.channels.my-channel.transactionCapacity = 100000
MY_AGENT.channels.my-channel.parseAsFlumeEvent = false
#Sink
MY_AGENT.sinks.my-sink.type = hdfs
MY_AGENT.sinks.my-sink.channel = my-channel
MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text
MY_AGENT.sinks.my-sink.hdfs.fileType = DataStream
MY_AGENT.sinks.my-sink.hdfs.useLocalTimeStamp = true
MY_AGENT.sinks.my-sink.hdfs.path = hdfs://rpavro2nha/user/flume/flume-output
MY_AGENT.sinks.my-sink.hdfs.rollCount=0
MY_AGENT.sinks.my-sink.hdfs.rollSize=0
MY_AGENT.sinks.my-sink.hdfs.batchSize=100000
MY_AGENT.sinks.my-sink.hdfs.maxOpenFiles=2000
MY_AGENT.sinks.my-sink.hdfs.callTimeout=50000
MY_AGENT.sinks.my-sink.hdfs.fileSuffix=.avro
MY_AGENT.sinks.my-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
MY_AGENT.sinks.my-sink.serializer.schemaURL = hdfs:///user/flume/truck_events.avsc
> /usr/lib/kafka/bin/kafka-console-producer.sh \ --bootstrap-server hostname:6667 \
--topic truck_events_stream
> {"driverId": 1001, "truckId": 101, "eventTime": "2025-04-01T12:35:00", "eventType": "NORMAL", "latitude": 37.7749, "longitude": -122.4194, "eventKey": "NORM-1001-101", "correlationId": "c1001", "driverName": "John Smith", "routeId": 5001, "routeName": "Bay Area Express", "eventDate": "2025-04-01", "miles": 125}