Uso de Apache Flume

Apache Flume recopila y mueve de forma eficaz grandes cantidades de datos de eventos de transmisión.

Para obtener más información sobre Apache Flume, consulte la documentación de Flume.

Propiedades de configuración de Flume

Propiedades de configuración de Flume incluidas en Big Data Service 3.1.1 o posterior.

Configuración Propiedad Descripción
flume-env flume_java_opts Classpath de Hadoop de Flink
flume_user_classpath Flume user classpath

Configuraciones de origen, canal y fregadero de Apache Flume

Un evento Flume es una unidad de flujo de datos que tiene una carga útil de bytes y un juego opcional de atributos de cadena. Un agente de Flume es un proceso (JVM) que aloja los componentes que los eventos fluyen de un origen externo al siguiente destino (salto).

El origen de Flume está diseñado para consumir los eventos de cualquier fuente externa, por ejemplo, un dispositivo IOT en un formato legible para el origen de Flume. El formato puede ser Avro, JSON, texto sin formato, etc. según el origen de Flume configurado. Estos datos son recibidos por el fregadero Flume.

Cuando un origen de Flume recibe un evento, se almacena en un canal. Los canales más utilizados se encuentran en el canal de memoria, el canal de archivos y el canal de Kafka. El canal contiene los datos hasta que los lee el fregadero.

El receptor de Flume elimina los datos del canal y los reenvía a otro origen de Flume o a un almacenamiento externo, por ejemplo, HDFS o a un almacenamiento de objetos para que los procesos descendentes los consuman.

A continuación, se muestran ejemplos de configuraciones de origen, canal y receptor.

Ejemplo de origen de Netcat con fregadero HDFS
#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
 
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
 
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = <HDFS_PATH>
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
 
hdfs_agent.channels.memchannel.capacity = 10000
hdfs_agent.channels.memchannel.type = memory
 
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
Ejemplo de origen de Netcat y fregadero HBase
agent1.sources = netcat1
agent1.sinks = hbase
agent1.channels = Qmemory1
 
agent1.sources.netcat1.type = netcat
agent1.sources.netcat1.bind = 0.0.0.0
agent1.sources.netcat1.port = 11111
 
agent1.sinks.hbase.type = hbase2
agent1.sinks.hbase.table = test_table
agent1.sinks.hbase.columnFamily = test_cf
agent1.sinks.hbase.serializer = org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer
 
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
 
agent1.sources.netcat1.channels = memory1
agent1.sinks.hbase.channel = memory1
 
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
Ejemplo de origen de HDFS y fregadero de Object Storage

Para configurar un conector HDFS en el cluster:

#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
 
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
 
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = oci://TRAINING@bdsdevcluster/new
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
 
hdfs_agent.channels.memchannel.capacity = 100000
hdfs_agent.channels.memchannel.type = memory
 
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
Ejemplo de origen de Kafka y fregadero de HDFS
#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write

# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = org.apache.flume.source.kafka.KafkaSource
hdfs_agent.sources.hdfs_source.kafka.bootstrap.servers = kafkassltest-mn0.bmbdcsad1.bmbdcs.oraclevcn.com:6667
hdfs_agent.sources.hdfs_source.kafka.topics = flume-kafka-test
hdfs_agent.sources.hdfs_source.batchSize = 50
hdfs_agent.sources.hdfs_source.kafka.consumer.group.id = customgid
hdfs_agent.sources.hdfs_source.kafka.consumer.security.protocol = SASL_PLAINTEXT
hdfs_agent.sources.hdfs_source.kafka.consumer.sasl.mechanism = GSSAPI
hdfs_agent.sources.hdfs_source.kafka.consumer.sasl.kerberos.service.name = kafka

# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = <HDFS_PATH>
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream

hdfs_agent.channels.memchannel.capacity = 100000
hdfs_agent.channels.memchannel.type = memory

hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel 

hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
Ejemplo de varias configuraciones de orígenes, canales y receptores
#netcat and spool directory sources configured with Hbase and HDFS sinks
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>

#list hdfs sources
hdfs_agent.sources = hdfs_source spooldir-source
hdfs_agent.channels = filechannel memchannel
hdfs_agent.sinks = hdfs_write hbase
 
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
 
# configure spooldir source
hdfs_agent.sources.spooldir-source.type = spooldir
hdfs_agent.sources.spooldir-source.spoolDir = /usr/lib/flume/spooldir
hdfs_agent.sources.spooldir-source.fileHeader = false
 
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = oci://TRAINING@bdsdevcluster/new
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
 
# configure hbase sink
hdfs_agent.sinks.hbase.type=hbase2
hdfs_agent.sinks.hbase.table=test_table
hdfs_agent.sinks.hbase.columnFamily= test_cf
hdfs_agent.sinks.hbase.serializer=org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer
 
hdfs_agent.channels.filechannel.capacity = 10000
hdfs_agent.channels.filechannel.type = file
hdfs_agent.channels.filechannel.transactionCapacity = 100
hdfs_agent.channels.filechannel.checkpointDir = /usr/lib/flume/filecheckpoint
hdfs_agent.channels.filechannel.dataDirs = /usr/lib/flume/filechannel
 
hdfs_agent.channels.memchannel.capacity = 10000
hdfs_agent.channels.memchannel.type = memory
 
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sources.spooldir-source.channels = filechannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hbase.channel = filechannel
 
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
 
hdfs_agent.sinks.hbase.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hbase.kerberosKeytab = <Keytab_path>