Uso de Apache Avro

Apache Avro proporciona servicios de serialización e intercambio de datos para Apache Hadoop.

Problemas de incompatibilidad

Datos/Esquema

Si el esquema tiene un valor por defecto, como "null" (con comillas) en lugar de null (sin comillas), se produce un fallo en la validación del esquema.

Se recomienda corregir el esquema para utilizar null (sin comillas). Para obtener más información, consulte https://issues.apache.org/jira/browse/AVRO-2509.

Incompatibilidad de API

Todo el acceso a los campos se realiza por métodos setter/getter. El código generado por Avro a partir de getters/setters de esquema es diferente en Avro 1.8.2/1.9 frente a Avro 1.11.3. Por ejemplo, counters.groups frente a counters.getGroups.

Para canalizar objetos serializables, los paquetes en los que están deben declararse en la propiedad del sistema org.apache.avro.SERIALIZABLE_PACKAGE.

Nota

Cambie el código de aplicación volviendo a compilar antes de utilizar Avro 1.11.3 e identifique todos los paquetes que se deben mostrar en org.apache.avro.SERIALIZABLE_PACKAGE.

Para utilizar una versión específica de Avro: enlace al runbook

  • Coloque la versión personalizada de Avro en una carpeta independiente y utilice la misma carpeta durante el envío del trabajo.

Uso de versiones Avro personalizadas en componentes

MR de Hadoop

Pasos para utilizar una versión Avro personalizada

Los archivos jar de Avro deben estar presentes en HADOOP_CLASSPATH y con el archivo jar -libjars.

Ejemplo:

export HADOOP_CLASSPATH="$(hadoop classpath):/path/to/avro-1.8.2.jar:/path/to/avro-mapred-1.8.2.jar
hadoop jar avro2.jar AvroTest -libjars /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar <args>
                    

Sqoop

Pasos para utilizar una versión Avro personalizada

Los archivos jar de Avro deben estar presentes en HADOOP_CLASSPATH y con el archivo jar -libjars.

Ejemplo:

Oozie

Pasos para utilizar una versión Avro personalizada
  • Sqoop no admite el paso de frascos dinámicamente mientras se ejecuta.
  • Debe colocar explícitamente los archivos jar avro-1.8.2 en Sqoop lib (/usr/lib/sqoop/lib).

Spark

Pasos para utilizar una versión Avro personalizada

Debe agregar la ruta de los tarros de Avro a `mapreduce.application.classpath`.

Ejemplo:

mapreduce.application.classpath = $PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:...:/path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar

Spark

Pasos para utilizar una versión Avro personalizada

Debe agregar la ruta de los archivos jar de Avro a través de --jars y --files.

Ejemplo:

spark-submit \
  --class AvroWordCount \
  --master yarn \
  --deploy-mode client \
  --name "WordCount Avro Java Example" \
  --conf spark.executor.memory=2g \
  --conf spark.executor.cores=2 \
  --conf spark.executor.instances=2 \
  --conf spark.driver.memory=1g \
    --jars /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar \
  avro-test.jar \
  --files /path/to/avro-1.8.2.jar:/path/to/avro-ipc-1.8.2.jar:/path/to/avro-mapred-1.8.2-hadoop2.jar \
  hdfs:///user/ambari-qa/input/sample.avro \
  hdfs:///user/ambari-qa/output25

Hive

Pasos para utilizar una versión de Avro personalizada

  • Hive no admite el paso de frascos de forma dinámica mientras se ejecuta.
  • Debe colocar explícitamente los archivos jar avro-1.8.2 en la biblioteca de Hive (/usr/lib/hive/lib).

Ejemplo:

CREATE TABLE users_from_avro_schema STORED AS AVRO TBLPROPERTIES ('avro.schema.url'='hdfs://rpavro1nha-mn0.bdsdnstest.bdsclitest.oraclevcn.com:8020/user/yarn/text_schema.avsc');
LOAD DATA INPATH '/user/yarn/sample.avro' INTO TABLE users_from_avro_schema;

Flume

Pasos para utilizar una versión Avro personalizada

Debe especificar los archivos jar de Avro antiguos en HADOOP_CLASSPATH.

Ejemplo:

> export HADOOP_CLASSPATH="/usr/lib/flume/lib/path/to/avro-1.10.2.jar*:$(hadoop classpath)"

 > flume-ng agent -n MY_AGENT -f flume.conf -Dflume.root.logger=INFO,console

#list hdfs sources
MY_AGENT.sources = my-source
MY_AGENT.channels = my-channel
MY_AGENT.sinks = my-sink
  
#source
MY_AGENT.sources.my-source.type = org.apache.flume.source.kafka.KafkaSource
MY_AGENT.sources.my-source.channels = my-channel
MY_AGENT.sources.my-source.batchSize = 10000
MY_AGENT.sources.my-source.batchDurationMillis = 5000
MY_AGENT.sources.my-source.kafka.bootstrap.servers = rpavro2nha-wn1.bdsdnstest.bdsclitest.oraclevcn.com:6667
MY_AGENT.sources.my-source.kafka.topics = truck_events_stream
MY_AGENT.sources.my-source.kafka.consumer.group.id = truck_events_streamgrp_1
MY_AGENT.sources.my-source.kafka.consumer.client.id = truck_events_stream_clnt_1
MY_AGENT.sources.my-source.kafka.auto.commit.enable = true
MY_AGENT.sources.my-source.kafka.consumer.session.timeout.ms=100000
MY_AGENT.sources.my-source.kafka.consumer.request.timeout.ms=120000
MY_AGENT.sources.my-source.kafka.consumer.auto.offset.reset=earliest
  
#channel
MY_AGENT.channels.my-channel.type = memory
MY_AGENT.channels.my-channel.capacity = 100000000
MY_AGENT.channels.my-channel.transactionCapacity = 100000
MY_AGENT.channels.my-channel.parseAsFlumeEvent = false
  
#Sink
MY_AGENT.sinks.my-sink.type = hdfs
MY_AGENT.sinks.my-sink.channel = my-channel
MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text
MY_AGENT.sinks.my-sink.hdfs.fileType = DataStream
MY_AGENT.sinks.my-sink.hdfs.useLocalTimeStamp = true
MY_AGENT.sinks.my-sink.hdfs.path = hdfs://rpavro2nha/user/flume/flume-output
MY_AGENT.sinks.my-sink.hdfs.rollCount=0
MY_AGENT.sinks.my-sink.hdfs.rollSize=0
MY_AGENT.sinks.my-sink.hdfs.batchSize=100000
MY_AGENT.sinks.my-sink.hdfs.maxOpenFiles=2000
MY_AGENT.sinks.my-sink.hdfs.callTimeout=50000
MY_AGENT.sinks.my-sink.hdfs.fileSuffix=.avro
  
MY_AGENT.sinks.my-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
MY_AGENT.sinks.my-sink.serializer.schemaURL = hdfs:///user/flume/truck_events.avsc



>  /usr/lib/kafka/bin/kafka-console-producer.sh \  --bootstrap-server hostname:6667 \
  --topic truck_events_stream
> {"driverId": 1001, "truckId": 101, "eventTime": "2025-04-01T12:35:00", "eventType": "NORMAL", "latitude": 37.7749, "longitude": -122.4194, "eventKey": "NORM-1001-101", "correlationId": "c1001", "driverName": "John Smith", "routeId": 5001, "routeName": "Bay Area Express", "eventDate": "2025-04-01", "miles": 125}