Apache Avroの使用
Apache Avroは、Apache Hadoopのデータ・シリアライズおよびデータ交換サービスを提供します。
非互換性の問題
データ/スキーマ
スキーマにnull (引用符なし)ではなく"null" (引用符付き)などのデフォルト値がある場合、スキーマ検証が失敗します。
null (引用符なし)を使用するようにスキーマを修正することをお薦めします。詳細は、https://issues.apache.org/jira/browse/AVRO-2509を参照してください。
APIの非互換性
すべてのフィールド・アクセスは、setter/getterメソッドによって行われます。Avro 1.8.2/1.9とAvro 1.11.3では、スキーマ・ゲッター/セッターから生成されたAvroコードが異なります。たとえば、counters.groupsとcounters.getGroupsです。
シリアライズ可能オブジェクトをマーシャリングするには、含まれているパッケージをシステム・プロパティorg.apache.avro.SERIALIZABLE_PACKAGE
で宣言する必要があります。
Avro 1.11.3を使用する前に再コンパイルしてアプリケーション・コードを変更し、
org.apache.avro.SERIALIZABLE_PACKAGE
にリストする必要があるすべてのパッケージを識別します。特定のAvroバージョンを使用するには : ランブックへのリンク
- カスタムAvroバージョンを別のフォルダに配置し、ジョブの発行時に同じフォルダを使用します。
コンポーネントでのカスタムAvroバージョンの使用
Hadoop MR
カスタムAvroバージョンを使用する手順
Avro jarは、HADOOP_CLASSPATH
と-libjars
jarの両方に存在する必要があります。
例:
export HADOOP_CLASSPATH="$(hadoop classpath):/path/to/avro-1.8.2.jar:/path/to/avro-mapred-1.8.2.jar
hadoop jar avro2.jar AvroTest -libjars /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar <args>
Sqoop
カスタムAvroバージョンを使用する手順
Avro jarは、HADOOP_CLASSPATH
と-libjars
jarの両方に存在する必要があります。
例:
Oozie
- Sqoopでは、実行中に動的にjarを渡すことはできません。
avro-1.8.2
jarsをSqoop lib (/usr/lib/sqoop/lib
)に明示的に配置する必要があります。
Spark
カスタムAvroバージョンを使用する手順
Avro jarのパスを`mapreduce.application.classpath
`に追加する必要があります。
例:
mapreduce.application.classpath = $PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:...:/path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar
Spark
カスタムAvroバージョンを使用する手順
--jars
および--files
を介してAvro jarのパスを追加する必要があります。
例:
spark-submit \
--class AvroWordCount \
--master yarn \
--deploy-mode client \
--name "WordCount Avro Java Example" \
--conf spark.executor.memory=2g \
--conf spark.executor.cores=2 \
--conf spark.executor.instances=2 \
--conf spark.driver.memory=1g \
--jars /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar \
avro-test.jar \
--files /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar \
hdfs:///user/ambari-qa/input/sample.avro \
hdfs:///user/ambari-qa/output25
Hive
カスタムAvroバージョンを使用する手順
- Hiveでは、実行中にjarを動的に渡すことはできません。
- Hive lib
(/usr/lib/hive/lib
にavro-1.8.2
jarを明示的に配置する必要があります)。
例:
CREATE TABLE users_from_avro_schema STORED AS AVRO TBLPROPERTIES ('avro.schema.url'='hdfs://rpavro1nha-mn0.bdsdnstest.bdsclitest.oraclevcn.com:8020/user/yarn/text_schema.avsc');
LOAD DATA INPATH '/user/yarn/sample.avro' INTO TABLE users_from_avro_schema;
Flume
カスタムAvroバージョンを使用する手順
HADOOP_CLASSPATH
に古いAvro jarを指定する必要があります。
例:
> export HADOOP_CLASSPATH="/usr/lib/flume/lib/path/to/avro-1.10.2.jar*:$(hadoop classpath)"
> flume-ng agent -n MY_AGENT -f flume.conf -Dflume.root.logger=INFO,console
#list hdfs sources
MY_AGENT.sources = my-source
MY_AGENT.channels = my-channel
MY_AGENT.sinks = my-sink
#source
MY_AGENT.sources.my-source.type = org.apache.flume.source.kafka.KafkaSource
MY_AGENT.sources.my-source.channels = my-channel
MY_AGENT.sources.my-source.batchSize = 10000
MY_AGENT.sources.my-source.batchDurationMillis = 5000
MY_AGENT.sources.my-source.kafka.bootstrap.servers = rpavro2nha-wn1.bdsdnstest.bdsclitest.oraclevcn.com:6667
MY_AGENT.sources.my-source.kafka.topics = truck_events_stream
MY_AGENT.sources.my-source.kafka.consumer.group.id = truck_events_streamgrp_1
MY_AGENT.sources.my-source.kafka.consumer.client.id = truck_events_stream_clnt_1
MY_AGENT.sources.my-source.kafka.auto.commit.enable = true
MY_AGENT.sources.my-source.kafka.consumer.session.timeout.ms=100000
MY_AGENT.sources.my-source.kafka.consumer.request.timeout.ms=120000
MY_AGENT.sources.my-source.kafka.consumer.auto.offset.reset=earliest
#channel
MY_AGENT.channels.my-channel.type = memory
MY_AGENT.channels.my-channel.capacity = 100000000
MY_AGENT.channels.my-channel.transactionCapacity = 100000
MY_AGENT.channels.my-channel.parseAsFlumeEvent = false
#Sink
MY_AGENT.sinks.my-sink.type = hdfs
MY_AGENT.sinks.my-sink.channel = my-channel
MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text
MY_AGENT.sinks.my-sink.hdfs.fileType = DataStream
MY_AGENT.sinks.my-sink.hdfs.useLocalTimeStamp = true
MY_AGENT.sinks.my-sink.hdfs.path = hdfs://rpavro2nha/user/flume/flume-output
MY_AGENT.sinks.my-sink.hdfs.rollCount=0
MY_AGENT.sinks.my-sink.hdfs.rollSize=0
MY_AGENT.sinks.my-sink.hdfs.batchSize=100000
MY_AGENT.sinks.my-sink.hdfs.maxOpenFiles=2000
MY_AGENT.sinks.my-sink.hdfs.callTimeout=50000
MY_AGENT.sinks.my-sink.hdfs.fileSuffix=.avro
MY_AGENT.sinks.my-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
MY_AGENT.sinks.my-sink.serializer.schemaURL = hdfs:///user/flume/truck_events.avsc
> /usr/lib/kafka/bin/kafka-console-producer.sh \ --bootstrap-server hostname:6667 \
--topic truck_events_stream
> {"driverId": 1001, "truckId": 101, "eventTime": "2025-04-01T12:35:00", "eventType": "NORMAL", "latitude": 37.7749, "longitude": -122.4194, "eventKey": "NORM-1001-101", "correlationId": "c1001", "driverName": "John Smith", "routeId": 5001, "routeName": "Bay Area Express", "eventDate": "2025-04-01", "miles": 125}