Apache Avro verwenden

Apache Avro bietet Datenserialisierungs- und Datenaustauschdienste für Apache Hadoop.

Inkompatibilitätsprobleme

Daten/Schema

Wenn das Schema einen Standardwert wie "null" (mit Anführungszeichen) anstelle von null (ohne Anführungszeichen) aufweist, führt dies zu einem Fehler bei der Schemavalidierung.

Es wird empfohlen, das Schema zu korrigieren, um null (ohne Anführungszeichen) zu verwenden. Weitere Informationen finden Sie unter https://issues.apache.org/jira/browse/AVRO-2509.

API-Inkompatibilität

Der gesamte Feldzugriff erfolgt über Setter-/getter-Methoden. Avro-generierter Code von Schema-Gettern/Settern unterscheidet sich in Avro 1.8.2/1.9 gegenüber Avro 1.11.3. Beispiel: counters.groups und counters.getGroups.

Um serialisierbare Objekte zu marshalen, müssen die darin enthaltenen Packages in der Systemeigenschaft org.apache.avro.SERIALIZABLE_PACKAGE deklariert werden.

Hinweis

Ändern Sie den Anwendungscode, indem Sie ihn neu kompilieren, bevor Sie Avro 1.11.3 verwenden, und identifizieren Sie alle Pakete, die in org.apache.avro.SERIALIZABLE_PACKAGE aufgeführt werden müssen.

So verwenden Sie eine bestimmte Avro-Version: Link zum Runbook

  • Platzieren Sie die benutzerdefinierte Avro-Version in einem separaten Ordner, und verwenden Sie während der Jobweiterleitung denselben Ordner.

Benutzerdefinierte Avro-Versionen in Komponenten verwenden

Hadoop - MR

Schritte zur Verwendung einer benutzerdefinierten Avro-Version

Avro-Gläser müssen sowohl in HADOOP_CLASSPATH als auch in -libjars-Gläser vorhanden sein.

Beispiel:

export HADOOP_CLASSPATH="$(hadoop classpath):/path/to/avro-1.8.2.jar:/path/to/avro-mapred-1.8.2.jar
hadoop jar avro2.jar AvroTest -libjars /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar <args>

Sqoop

Schritte zur Verwendung einer benutzerdefinierten Avro-Version

Avro-Gläser müssen sowohl in HADOOP_CLASSPATH als auch in -libjars-Gläser vorhanden sein.

Beispiel:

Oozie

Schritte zur Verwendung einer benutzerdefinierten Avro-Version
  • Sqoop unterstützt das dynamische Übergeben von JAR-Dateien während der Ausführung nicht.
  • avro-1.8.2-JARs müssen explizit in Sqoop lib (/usr/lib/sqoop/lib) eingefügt werden.

Spark

Schritte zur Verwendung einer benutzerdefinierten Avro-Version

Der Pfad der Avro-JARs muss zu `mapreduce.application.classpath` hinzugefügt werden.

Beispiel:

mapreduce.application.classpath = $PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:...:/path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar

Spark

Schritte zur Verwendung einer benutzerdefinierten Avro-Version

Der Pfad der Avro-JARs muss über --jars und --files hinzugefügt werden.

Beispiel:

spark-submit \
  --class AvroWordCount \
  --master yarn \
  --deploy-mode client \
  --name "WordCount Avro Java Example" \
  --conf spark.executor.memory=2g \
  --conf spark.executor.cores=2 \
  --conf spark.executor.instances=2 \
  --conf spark.driver.memory=1g \
    --jars /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar \
  avro-test.jar \
  --files /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar \
  hdfs:///user/ambari-qa/input/sample.avro \
  hdfs:///user/ambari-qa/output25

Hive

Schritte zur Verwendung einer benutzerdefinierten Avro-Version

  • Hive unterstützt das dynamische Übergeben von JAR-Dateien während der Ausführung nicht.
  • Es muss explizit avro-1.8.2-JARs in Hive lib (/usr/lib/hive/lib) eingefügt werden.

Beispiel:

CREATE TABLE users_from_avro_schema STORED AS AVRO TBLPROPERTIES ('avro.schema.url'='hdfs://rpavro1nha-mn0.bdsdnstest.bdsclitest.oraclevcn.com:8020/user/yarn/text_schema.avsc');
LOAD DATA INPATH '/user/yarn/sample.avro' INTO TABLE users_from_avro_schema;

Flume

Schritte zur Verwendung einer benutzerdefinierten Avro-Version

Die alten Avro-JARs müssen in HADOOP_CLASSPATH angegeben werden.

Beispiel:

> export HADOOP_CLASSPATH="/usr/lib/flume/lib/path/to/avro-1.10.2.jar*:$(hadoop classpath)"

 > flume-ng agent -n MY_AGENT -f flume.conf -Dflume.root.logger=INFO,console

#list hdfs sources
MY_AGENT.sources = my-source
MY_AGENT.channels = my-channel
MY_AGENT.sinks = my-sink
  
#source
MY_AGENT.sources.my-source.type = org.apache.flume.source.kafka.KafkaSource
MY_AGENT.sources.my-source.channels = my-channel
MY_AGENT.sources.my-source.batchSize = 10000
MY_AGENT.sources.my-source.batchDurationMillis = 5000
MY_AGENT.sources.my-source.kafka.bootstrap.servers = rpavro2nha-wn1.bdsdnstest.bdsclitest.oraclevcn.com:6667
MY_AGENT.sources.my-source.kafka.topics = truck_events_stream
MY_AGENT.sources.my-source.kafka.consumer.group.id = truck_events_streamgrp_1
MY_AGENT.sources.my-source.kafka.consumer.client.id = truck_events_stream_clnt_1
MY_AGENT.sources.my-source.kafka.auto.commit.enable = true
MY_AGENT.sources.my-source.kafka.consumer.session.timeout.ms=100000
MY_AGENT.sources.my-source.kafka.consumer.request.timeout.ms=120000
MY_AGENT.sources.my-source.kafka.consumer.auto.offset.reset=earliest
  
#channel
MY_AGENT.channels.my-channel.type = memory
MY_AGENT.channels.my-channel.capacity = 100000000
MY_AGENT.channels.my-channel.transactionCapacity = 100000
MY_AGENT.channels.my-channel.parseAsFlumeEvent = false
  
#Sink
MY_AGENT.sinks.my-sink.type = hdfs
MY_AGENT.sinks.my-sink.channel = my-channel
MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text
MY_AGENT.sinks.my-sink.hdfs.fileType = DataStream
MY_AGENT.sinks.my-sink.hdfs.useLocalTimeStamp = true
MY_AGENT.sinks.my-sink.hdfs.path = hdfs://rpavro2nha/user/flume/flume-output
MY_AGENT.sinks.my-sink.hdfs.rollCount=0
MY_AGENT.sinks.my-sink.hdfs.rollSize=0
MY_AGENT.sinks.my-sink.hdfs.batchSize=100000
MY_AGENT.sinks.my-sink.hdfs.maxOpenFiles=2000
MY_AGENT.sinks.my-sink.hdfs.callTimeout=50000
MY_AGENT.sinks.my-sink.hdfs.fileSuffix=.avro
  
MY_AGENT.sinks.my-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
MY_AGENT.sinks.my-sink.serializer.schemaURL = hdfs:///user/flume/truck_events.avsc



>  /usr/lib/kafka/bin/kafka-console-producer.sh \  --bootstrap-server hostname:6667 \
  --topic truck_events_stream
> {"driverId": 1001, "truckId": 101, "eventTime": "2025-04-01T12:35:00", "eventType": "NORMAL", "latitude": 37.7749, "longitude": -122.4194, "eventKey": "NORM-1001-101", "correlationId": "c1001", "driverName": "John Smith", "routeId": 5001, "routeName": "Bay Area Express", "eventDate": "2025-04-01", "miles": 125}