Apache Avroの使用

Apache Avroは、Apache Hadoopのデータ・シリアライズおよびデータ交換サービスを提供します。

非互換性の問題

データ/スキーマ

スキーマにnull (引用符なし)ではなく"null" (引用符付き)などのデフォルト値がある場合、スキーマ検証が失敗します。

null (引用符なし)を使用するようにスキーマを修正することをお薦めします。詳細は、https://issues.apache.org/jira/browse/AVRO-2509を参照してください。

APIの非互換性

すべてのフィールド・アクセスは、setter/getterメソッドによって行われます。Avro 1.8.2/1.9とAvro 1.11.3では、スキーマ・ゲッター/セッターから生成されたAvroコードが異なります。たとえば、counters.groupscounters.getGroupsです。

シリアライズ可能オブジェクトをマーシャリングするには、含まれているパッケージをシステム・プロパティorg.apache.avro.SERIALIZABLE_PACKAGEで宣言する必要があります。

ノート

Avro 1.11.3を使用する前に再コンパイルしてアプリケーション・コードを変更し、org.apache.avro.SERIALIZABLE_PACKAGEにリストする必要があるすべてのパッケージを識別します。

特定のAvroバージョンを使用するには : ランブックへのリンク

  • カスタムAvroバージョンを別のフォルダに配置し、ジョブの発行時に同じフォルダを使用します。

コンポーネントでのカスタムAvroバージョンの使用

Hadoop MR

カスタムAvroバージョンを使用する手順

Avro jarは、HADOOP_CLASSPATH-libjars jarの両方に存在する必要があります。

例:

export HADOOP_CLASSPATH="$(hadoop classpath):/path/to/avro-1.8.2.jar:/path/to/avro-mapred-1.8.2.jar
hadoop jar avro2.jar AvroTest -libjars /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar <args>

Sqoop

カスタムAvroバージョンを使用する手順

Avro jarは、HADOOP_CLASSPATH-libjars jarの両方に存在する必要があります。

例:

Oozie

カスタムAvroバージョンを使用する手順
  • Sqoopでは、実行中に動的にjarを渡すことはできません。
  • avro-1.8.2 jarsをSqoop lib (/usr/lib/sqoop/lib)に明示的に配置する必要があります。

Spark

カスタムAvroバージョンを使用する手順

Avro jarのパスを`mapreduce.application.classpath`に追加する必要があります。

例:

mapreduce.application.classpath = $PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:...:/path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar

Spark

カスタムAvroバージョンを使用する手順

--jarsおよび--filesを介してAvro jarのパスを追加する必要があります。

例:

spark-submit \
  --class AvroWordCount \
  --master yarn \
  --deploy-mode client \
  --name "WordCount Avro Java Example" \
  --conf spark.executor.memory=2g \
  --conf spark.executor.cores=2 \
  --conf spark.executor.instances=2 \
  --conf spark.driver.memory=1g \
    --jars /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar \
  avro-test.jar \
  --files /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar \
  hdfs:///user/ambari-qa/input/sample.avro \
  hdfs:///user/ambari-qa/output25

Hive

カスタムAvroバージョンを使用する手順

  • Hiveでは、実行中にjarを動的に渡すことはできません。
  • Hive lib (/usr/lib/hive/libavro-1.8.2 jarを明示的に配置する必要があります)。

例:

CREATE TABLE users_from_avro_schema STORED AS AVRO TBLPROPERTIES ('avro.schema.url'='hdfs://rpavro1nha-mn0.bdsdnstest.bdsclitest.oraclevcn.com:8020/user/yarn/text_schema.avsc');
LOAD DATA INPATH '/user/yarn/sample.avro' INTO TABLE users_from_avro_schema;

Flume

カスタムAvroバージョンを使用する手順

HADOOP_CLASSPATHに古いAvro jarを指定する必要があります。

例:

> export HADOOP_CLASSPATH="/usr/lib/flume/lib/path/to/avro-1.10.2.jar*:$(hadoop classpath)"

 > flume-ng agent -n MY_AGENT -f flume.conf -Dflume.root.logger=INFO,console

#list hdfs sources
MY_AGENT.sources = my-source
MY_AGENT.channels = my-channel
MY_AGENT.sinks = my-sink
  
#source
MY_AGENT.sources.my-source.type = org.apache.flume.source.kafka.KafkaSource
MY_AGENT.sources.my-source.channels = my-channel
MY_AGENT.sources.my-source.batchSize = 10000
MY_AGENT.sources.my-source.batchDurationMillis = 5000
MY_AGENT.sources.my-source.kafka.bootstrap.servers = rpavro2nha-wn1.bdsdnstest.bdsclitest.oraclevcn.com:6667
MY_AGENT.sources.my-source.kafka.topics = truck_events_stream
MY_AGENT.sources.my-source.kafka.consumer.group.id = truck_events_streamgrp_1
MY_AGENT.sources.my-source.kafka.consumer.client.id = truck_events_stream_clnt_1
MY_AGENT.sources.my-source.kafka.auto.commit.enable = true
MY_AGENT.sources.my-source.kafka.consumer.session.timeout.ms=100000
MY_AGENT.sources.my-source.kafka.consumer.request.timeout.ms=120000
MY_AGENT.sources.my-source.kafka.consumer.auto.offset.reset=earliest
  
#channel
MY_AGENT.channels.my-channel.type = memory
MY_AGENT.channels.my-channel.capacity = 100000000
MY_AGENT.channels.my-channel.transactionCapacity = 100000
MY_AGENT.channels.my-channel.parseAsFlumeEvent = false
  
#Sink
MY_AGENT.sinks.my-sink.type = hdfs
MY_AGENT.sinks.my-sink.channel = my-channel
MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text
MY_AGENT.sinks.my-sink.hdfs.fileType = DataStream
MY_AGENT.sinks.my-sink.hdfs.useLocalTimeStamp = true
MY_AGENT.sinks.my-sink.hdfs.path = hdfs://rpavro2nha/user/flume/flume-output
MY_AGENT.sinks.my-sink.hdfs.rollCount=0
MY_AGENT.sinks.my-sink.hdfs.rollSize=0
MY_AGENT.sinks.my-sink.hdfs.batchSize=100000
MY_AGENT.sinks.my-sink.hdfs.maxOpenFiles=2000
MY_AGENT.sinks.my-sink.hdfs.callTimeout=50000
MY_AGENT.sinks.my-sink.hdfs.fileSuffix=.avro
  
MY_AGENT.sinks.my-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
MY_AGENT.sinks.my-sink.serializer.schemaURL = hdfs:///user/flume/truck_events.avsc



>  /usr/lib/kafka/bin/kafka-console-producer.sh \  --bootstrap-server hostname:6667 \
  --topic truck_events_stream
> {"driverId": 1001, "truckId": 101, "eventTime": "2025-04-01T12:35:00", "eventType": "NORMAL", "latitude": 37.7749, "longitude": -122.4194, "eventKey": "NORM-1001-101", "correlationId": "c1001", "driverName": "John Smith", "routeId": 5001, "routeName": "Bay Area Express", "eventDate": "2025-04-01", "miles": 125}