Using Apache Avro
Apache Avro provides data serialization and data exchange services for Apache Hadoop.
Incompatibility Issues
Data/Schema
A schema default value of "null" (with quotes) instead of null (without quotes) causes schema validation to fail.
We recommend correcting the schema to use null (without quotes). For more information, see https://issues.apache.org/jira/browse/AVRO-2509.
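For example, for a nullable field (the field name here is illustrative), this declaration fails validation because the default is the string "null":
{"name": "comment", "type": ["null", "string"], "default": "null"}
while this one passes because the default is the JSON literal null:
{"name": "comment", "type": ["null", "string"], "default": null}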
API Incompatibility
All field access is through setter/getter methods. The code Avro generates from a schema differs between Avro 1.8.2/1.9 and Avro 1.11.3. For example, counters.groups vs. counters.getGroups().
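In practice, application code written against the old generated classes must be updated. A minimal sketch, where jobStats and its counters field stand in for hypothetical Avro-generated classes:
// Avro 1.8.2/1.9 generated classes exposed public fields, so this compiled:
long groupsOld = jobStats.counters.groups;
// Avro 1.11.3 generated classes keep fields private; use the accessors instead:
long groupsNew = jobStats.getCounters().getGroups();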
To marshal serializable objects, the packages they belong to must be declared in the system property org.apache.avro.SERIALIZABLE_PACKAGES.
Change application code and recompile before moving to Avro 1.11.3, and identify all packages that must be listed in org.apache.avro.SERIALIZABLE_PACKAGES.
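For example, if the serializable classes live in the hypothetical packages com.example.model and com.example.events, the property can be passed as a comma-separated list at launch:
java -Dorg.apache.avro.SERIALIZABLE_PACKAGES=com.example.model,com.example.events \
  -cp my-avro-app.jar com.example.MyAvroJob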
To use a specific Avro version:
- Place the custom Avro version in a separate folder and use the same folder during job submission.
Using Custom Avro Versions in Components
Hadoop MR
Steps to Use a Custom Avro Version
The Avro jars must be present both in HADOOP_CLASSPATH and in the -libjars argument.
Example:
export HADOOP_CLASSPATH="$(hadoop classpath):/path/to/avro-1.8.2.jar:/path/to/avro-mapred-1.8.2.jar"
hadoop jar avro2.jar AvroTest -libjars /path/to/avro-1.8.2.jar,/path/to/avro-ipc-1.8.2.jar,/path/to/avro-mapred-1.8.2-hadoop2.jar <args>
Sqoop
Steps to Use a Custom Avro Version
The Avro jars must be present both in HADOOP_CLASSPATH and in the -libjars argument.
Example:
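The original example isn't shown here; a representative sketch with a hypothetical MySQL source (connection string, credentials, table, and paths are placeholders):
export HADOOP_CLASSPATH="$(hadoop classpath):/path/to/avro-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar"
sqoop import -libjars /path/to/avro-1.8.2.jar,/path/to/avro-mapred-1.8.2-hadoop2.jar \
  --connect jdbc:mysql://dbhost:3306/sales \
  --username sqoop_user -P \
  --table orders \
  --as-avrodatafile \
  --target-dir /user/sqoop/orders_avro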
Oozie
- Sqoop doesn't support passing jars dynamically while running.
- The avro-1.8.2 jars must be placed explicitly in the Sqoop lib directory (/usr/lib/sqoop/lib), as sketched below.
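A sketch of staging the jars (the source paths are placeholders for wherever the custom Avro jars are kept):
sudo cp /path/to/avro-1.8.2.jar /path/to/avro-mapred-1.8.2-hadoop2.jar /usr/lib/sqoop/lib/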
Spark
Steps to Use a Custom Avro Version
The path of the Avro jars must be added to mapreduce.application.classpath.
Example:
mapreduce.application.classpath = $PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:...:/path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar
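If you manage mapred-site.xml directly, the same value can be set there (on an Ambari-managed cluster, change the property through Ambari instead); a sketch:
<property>
  <name>mapreduce.application.classpath</name>
  <value>$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:...:/path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar</value>
</property>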
Spark
Steps to Use a Custom Avro Version
The path of the Avro jars must be added through --jars and --files.
Example:
spark-submit \
--class AvroWordCount \
--master yarn \
--deploy-mode client \
--name "WordCount Avro Java Example" \
--conf spark.executor.memory=2g \
--conf spark.executor.cores=2 \
--conf spark.executor.instances=2 \
--conf spark.driver.memory=1g \
--jars /path/to/avro-1.8.2.jar,/path/to/avro-ipc-1.8.2.jar,/path/to/avro-mapred-1.8.2-hadoop2.jar \
--files /path/to/avro-1.8.2.jar,/path/to/avro-ipc-1.8.2.jar,/path/to/avro-mapred-1.8.2-hadoop2.jar \
avro-test.jar \
hdfs:///user/ambari-qa/input/sample.avro \
hdfs:///user/ambari-qa/output25
Hive
Steps to Use a Custom Avro Version
- Hive doesn't support passing jars dynamically while running.
- The avro-1.8.2 jars must be placed explicitly in the Hive lib directory (/usr/lib/hive/lib), as sketched below.
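A sketch of staging the jars (the source paths are placeholders; the Hive services typically need a restart afterward to pick up the new jars):
sudo cp /path/to/avro-1.8.2.jar /path/to/avro-mapred-1.8.2-hadoop2.jar /usr/lib/hive/lib/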
Example:
CREATE TABLE users_from_avro_schema STORED AS AVRO TBLPROPERTIES ('avro.schema.url'='hdfs://rpavro1nha-mn0.bdsdnstest.bdsclitest.oraclevcn.com:8020/user/yarn/text_schema.avsc');
LOAD DATA INPATH '/user/yarn/sample.avro' INTO TABLE users_from_avro_schema;
Flume
Steps to Use a Custom Avro Version
The old Avro jars must be specified in HADOOP_CLASSPATH.
Example:
> export HADOOP_CLASSPATH="/usr/lib/flume/lib/avro-1.10.2.jar:$(hadoop classpath)"
> flume-ng agent -n MY_AGENT -f flume.conf -Dflume.root.logger=INFO,console
#name the sources, channels, and sinks for this agent
MY_AGENT.sources = my-source
MY_AGENT.channels = my-channel
MY_AGENT.sinks = my-sink
#source
MY_AGENT.sources.my-source.type = org.apache.flume.source.kafka.KafkaSource
MY_AGENT.sources.my-source.channels = my-channel
MY_AGENT.sources.my-source.batchSize = 10000
MY_AGENT.sources.my-source.batchDurationMillis = 5000
MY_AGENT.sources.my-source.kafka.bootstrap.servers = rpavro2nha-wn1.bdsdnstest.bdsclitest.oraclevcn.com:6667
MY_AGENT.sources.my-source.kafka.topics = truck_events_stream
MY_AGENT.sources.my-source.kafka.consumer.group.id = truck_events_streamgrp_1
MY_AGENT.sources.my-source.kafka.consumer.client.id = truck_events_stream_clnt_1
MY_AGENT.sources.my-source.kafka.auto.commit.enable = true
MY_AGENT.sources.my-source.kafka.consumer.session.timeout.ms=100000
MY_AGENT.sources.my-source.kafka.consumer.request.timeout.ms=120000
MY_AGENT.sources.my-source.kafka.consumer.auto.offset.reset=earliest
#channel
MY_AGENT.channels.my-channel.type = memory
MY_AGENT.channels.my-channel.capacity = 100000000
MY_AGENT.channels.my-channel.transactionCapacity = 100000
#Sink
MY_AGENT.sinks.my-sink.type = hdfs
MY_AGENT.sinks.my-sink.channel = my-channel
MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text
MY_AGENT.sinks.my-sink.hdfs.fileType = DataStream
MY_AGENT.sinks.my-sink.hdfs.useLocalTimeStamp = true
MY_AGENT.sinks.my-sink.hdfs.path = hdfs://rpavro2nha/user/flume/flume-output
MY_AGENT.sinks.my-sink.hdfs.rollCount=0
MY_AGENT.sinks.my-sink.hdfs.rollSize=0
MY_AGENT.sinks.my-sink.hdfs.batchSize=100000
MY_AGENT.sinks.my-sink.hdfs.maxOpenFiles=2000
MY_AGENT.sinks.my-sink.hdfs.callTimeout=50000
MY_AGENT.sinks.my-sink.hdfs.fileSuffix=.avro
MY_AGENT.sinks.my-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
MY_AGENT.sinks.my-sink.serializer.schemaURL = hdfs:///user/flume/truck_events.avsc
> /usr/lib/kafka/bin/kafka-console-producer.sh \
--bootstrap-server hostname:6667 \
--topic truck_events_stream
> {"driverId": 1001, "truckId": 101, "eventTime": "2025-04-01T12:35:00", "eventType": "NORMAL", "latitude": 37.7749, "longitude": -122.4194, "eventKey": "NORM-1001-101", "correlationId": "c1001", "driverName": "John Smith", "routeId": 5001, "routeName": "Bay Area Express", "eventDate": "2025-04-01", "miles": 125}