Utiliser Apache Flume

Apache Flume collecte et déplace efficacement de grandes quantités de données d'événements de transmission en continu.

Pour plus d'informations sur Apache Flume, reportez-vous à la documentation Flume.

Propriétés de configuration Flume

Propriétés de configuration Flume incluses dans Big Data Service 3.1.1 ou version ultérieure.

Configuration Propriété Description
flume-env flume_java_opts Classpath Flink Hadoop
flume_user_classpath Variable d'environnement CLASSPATH utilisateur Flume

Configurations de source, de canal et de récepteur Apache Flume

Un événement Flume est une unité de flux de données ayant une charge utile d'octet et un ensemble facultatif d'attributs de chaîne. Un agent Flume est un processus (JVM) qui héberge les composants dont les événements circulent d'une source externe vers la destination suivante (hop).

La source Flume est conçue pour consommer les événements de n'importe quelle source externe, par exemple, un périphérique IOT dans un format lisible pour la source Flume. Le format peut être Avro, JSON, texte brut, etc. selon la source Flume configurée. Ces données sont reçues par le lavabo Flume.

Lorsqu'une source Flume reçoit un événement, il est stocké dans un canal. Les canaux les plus couramment utilisés sont les canaux de mémoire, de fichier et Kafka. Le canal conserve les données jusqu'à ce qu'elles soient lues par l'évier.

Le récepteur Flume supprime les données du canal et les transmet à une autre source Flume ou à un stockage externe, par exemple, HDFS ou un stockage d'objets pour que les processus en aval puissent les utiliser.

Voici des exemples de configuration de source, de canal et d'évier.

Exemple de source Netcat avec récepteur HDFS
#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
 
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
 
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = <HDFS_PATH>
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
 
hdfs_agent.channels.memchannel.capacity = 10000
hdfs_agent.channels.memchannel.type = memory
 
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
Exemple de source Netcat et de récepteur HBase
agent1.sources = netcat1
agent1.sinks = hbase
agent1.channels = Qmemory1
 
agent1.sources.netcat1.type = netcat
agent1.sources.netcat1.bind = 0.0.0.0
agent1.sources.netcat1.port = 11111
 
agent1.sinks.hbase.type = hbase2
agent1.sinks.hbase.table = test_table
agent1.sinks.hbase.columnFamily = test_cf
agent1.sinks.hbase.serializer = org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer
 
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
 
agent1.sources.netcat1.channels = memory1
agent1.sinks.hbase.channel = memory1
 
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
Exemple de source HDFS et de récepteur Object Storage

Pour configurer un connecteur HDFS sur le cluster, procédez comme suit :

#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
 
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
 
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = oci://TRAINING@bdsdevcluster/new
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
 
hdfs_agent.channels.memchannel.capacity = 100000
hdfs_agent.channels.memchannel.type = memory
 
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
Exemple de source Kafka et de récepteur HDFS
#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write

# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = org.apache.flume.source.kafka.KafkaSource
hdfs_agent.sources.hdfs_source.kafka.bootstrap.servers = kafkassltest-mn0.bmbdcsad1.bmbdcs.oraclevcn.com:6667
hdfs_agent.sources.hdfs_source.kafka.topics = flume-kafka-test
hdfs_agent.sources.hdfs_source.batchSize = 50
hdfs_agent.sources.hdfs_source.kafka.consumer.group.id = customgid
hdfs_agent.sources.hdfs_source.kafka.consumer.security.protocol = SASL_PLAINTEXT
hdfs_agent.sources.hdfs_source.kafka.consumer.sasl.mechanism = GSSAPI
hdfs_agent.sources.hdfs_source.kafka.consumer.sasl.kerberos.service.name = kafka

# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = <HDFS_PATH>
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream

hdfs_agent.channels.memchannel.capacity = 100000
hdfs_agent.channels.memchannel.type = memory

hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel 

hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
Exemple de configurations de sources, de canaux et de puits multiples
#netcat and spool directory sources configured with Hbase and HDFS sinks
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>

#list hdfs sources
hdfs_agent.sources = hdfs_source spooldir-source
hdfs_agent.channels = filechannel memchannel
hdfs_agent.sinks = hdfs_write hbase
 
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
 
# configure spooldir source
hdfs_agent.sources.spooldir-source.type = spooldir
hdfs_agent.sources.spooldir-source.spoolDir = /usr/lib/flume/spooldir
hdfs_agent.sources.spooldir-source.fileHeader = false
 
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = oci://TRAINING@bdsdevcluster/new
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
 
# configure hbase sink
hdfs_agent.sinks.hbase.type=hbase2
hdfs_agent.sinks.hbase.table=test_table
hdfs_agent.sinks.hbase.columnFamily= test_cf
hdfs_agent.sinks.hbase.serializer=org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer
 
hdfs_agent.channels.filechannel.capacity = 10000
hdfs_agent.channels.filechannel.type = file
hdfs_agent.channels.filechannel.transactionCapacity = 100
hdfs_agent.channels.filechannel.checkpointDir = /usr/lib/flume/filecheckpoint
hdfs_agent.channels.filechannel.dataDirs = /usr/lib/flume/filechannel
 
hdfs_agent.channels.memchannel.capacity = 10000
hdfs_agent.channels.memchannel.type = memory
 
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sources.spooldir-source.channels = filechannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hbase.channel = filechannel
 
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
 
hdfs_agent.sinks.hbase.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hbase.kerberosKeytab = <Keytab_path>