Using Apache Flume
Apache Flume efficiently collects and moves large amounts of streaming event data.
For more information about Apache Flume, see the Flume documentation.
Flume Configuration Properties
Flume configuration properties included in Big Data Service 3.1.1 or later.
Configuration | Property | Description
---|---|---
flume-env | flume_java_opts | Flume Hadoop classpath
flume-env | flume_user_classpath | Flume user classpath
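As a sketch only, assuming flume_java_opts carries extra JVM options for the agent and flume_user_classpath appends jars for custom sources or sinks (both values below are illustrative, not defaults):

# flume-env (illustrative values)
flume_java_opts=-Xmx2048m
flume_user_classpath=/opt/flume/plugins/*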
Apache Flume source, channel, and sink configurations
A Flume event is a unit of data flow with a byte payload and an optional set of string attributes. A Flume agent is a (JVM) process that hosts the components through which events flow from an external source to the next destination (hop).
A Flume source is designed to consume events from any external source, for example an IoT device, in a format the Flume source can read. Depending on the configured Flume source, the format can be Avro, JSON, plain text, and so on. This data is eventually received by the Flume sink.
When a Flume source receives an event, the event is stored in a channel. The most commonly used channels are the memory channel, the file channel, and the Kafka channel. The channel holds the data until it's read by the sink.
The Flume sink removes the data from the channel and forwards it to another Flume source or to external storage, for example HDFS or Object Storage, where downstream processes consume it.
The following are examples of source, channel, and sink configurations.
# Example: netcat source -> memory channel -> HDFS sink
# list sources, channels, and sinks
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = <HDFS_PATH>
hdfs_agent.sinks.hdfs_write.hdfs.rollInterval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs.fileType = DataStream
hdfs_agent.channels.memchannel.capacity = 10000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
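To try this configuration, one approach (a sketch; the file name and conf directory are assumptions) is to save the agent definition as hdfs_agent.conf and start it with the flume-ng launcher, then feed the netcat source:

# start the agent; --name must match the property prefix (hdfs_agent)
flume-ng agent --conf /etc/flume/conf --conf-file hdfs_agent.conf --name hdfs_agent
# from another terminal, send a test event to the netcat source on port 33333
echo "test event" | nc localhost 33333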
# Example: netcat source -> memory channel -> HBase sink
agent1.sources = netcat1
agent1.sinks = hbase
agent1.channels = memory1
agent1.sources.netcat1.type = netcat
agent1.sources.netcat1.bind = 0.0.0.0
agent1.sources.netcat1.port = 11111
agent1.sinks.hbase.type = hbase2
agent1.sinks.hbase.table = test_table
agent1.sinks.hbase.columnFamily = test_cf
agent1.sinks.hbase.serializer = org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
agent1.sources.netcat1.channels = memory1
agent1.sinks.hbase.channel = memory1
agent1.sinks.hbase.kerberosPrincipal = <User_Principal>
agent1.sinks.hbase.kerberosKeytab = <Keytab_path>
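The HBase sink writes into an existing table, so test_table and its column family test_cf must be created first, for example from the HBase shell:

# create the table and column family referenced by the sink
create 'test_table', 'test_cf'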
To configure an HDFS connector on the cluster (used here to write to Object Storage through an oci:// path):
# Example: netcat source -> memory channel -> Object Storage (oci://) sink
# list sources, channels, and sinks
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = oci://TRAINING@bdsdevcluster/new
hdfs_agent.sinks.hdfs_write.hdfs.rollInterval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs.fileType = DataStream
hdfs_agent.channels.memchannel.capacity = 100000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
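The oci:// path above is served by the OCI HDFS connector, which authenticates through properties in core-site.xml. A minimal sketch, assuming API-key authentication (all values are placeholders; the endpoint depends on the region):

<property>
  <name>fs.oci.client.hosts</name>
  <value>https://objectstorage.<region>.oraclecloud.com</value>
</property>
<property>
  <name>fs.oci.client.auth.tenantId</name>
  <value><tenancy_OCID></value>
</property>
<property>
  <name>fs.oci.client.auth.userId</name>
  <value><user_OCID></value>
</property>
<property>
  <name>fs.oci.client.auth.fingerprint</name>
  <value><API_key_fingerprint></value>
</property>
<property>
  <name>fs.oci.client.auth.pemfilepath</name>
  <value><path_to_private_key.pem></value>
</property>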
# Example: Kafka source -> memory channel -> HDFS sink
# list sources, channels, and sinks
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = org.apache.flume.source.kafka.KafkaSource
hdfs_agent.sources.hdfs_source.kafka.bootstrap.servers = kafkassltest-mn0.bmbdcsad1.bmbdcs.oraclevcn.com:6667
hdfs_agent.sources.hdfs_source.kafka.topics = flume-kafka-test
hdfs_agent.sources.hdfs_source.batchSize = 50
hdfs_agent.sources.hdfs_source.kafka.consumer.group.id = customgid
hdfs_agent.sources.hdfs_source.kafka.consumer.security.protocol = SASL_PLAINTEXT
hdfs_agent.sources.hdfs_source.kafka.consumer.sasl.mechanism = GSSAPI
hdfs_agent.sources.hdfs_source.kafka.consumer.sasl.kerberos.service.name = kafka
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = <HDFS_PATH>
hdfs_agent.sinks.hdfs_write.hdfs.rollInterval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs.fileType = DataStream
hdfs_agent.channels.memchannel.capacity = 100000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
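Because the consumer uses SASL_PLAINTEXT with GSSAPI, the agent JVM also needs a Kerberos JAAS configuration for the KafkaClient login context. A sketch, reusing the keytab and principal placeholders from the examples and an illustrative file path:

# /etc/flume/conf/flume_jaas.conf (illustrative path)
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="<Keytab_path>"
  principal="<User_Principal>";
};

The agent is then started with -Djava.security.auth.login.config=/etc/flume/conf/flume_jaas.conf, for example through the flume_java_opts property listed in the table above.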
# Example: netcat and spool directory sources configured with HBase and HDFS sinks
# list sources, channels, and sinks
hdfs_agent.sources = hdfs_source spooldir-source
hdfs_agent.channels = filechannel memchannel
hdfs_agent.sinks = hdfs_write hbase
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
# configure spooldir source
hdfs_agent.sources.spooldir-source.type = spooldir
hdfs_agent.sources.spooldir-source.spoolDir = /usr/lib/flume/spooldir
hdfs_agent.sources.spooldir-source.fileHeader = false
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = oci://TRAINING@bdsdevcluster/new
hdfs_agent.sinks.hdfs_write.hdfs.rollInterval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs.fileType = DataStream
# configure hbase sink
hdfs_agent.sinks.hbase.type = hbase2
hdfs_agent.sinks.hbase.table = test_table
hdfs_agent.sinks.hbase.columnFamily = test_cf
hdfs_agent.sinks.hbase.serializer = org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer
hdfs_agent.channels.filechannel.capacity = 10000
hdfs_agent.channels.filechannel.type = file
hdfs_agent.channels.filechannel.transactionCapacity = 100
hdfs_agent.channels.filechannel.checkpointDir = /usr/lib/flume/filecheckpoint
hdfs_agent.channels.filechannel.dataDirs = /usr/lib/flume/filechannel
hdfs_agent.channels.memchannel.capacity = 10000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sources.spooldir-source.channels = filechannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hbase.channel = filechannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
hdfs_agent.sinks.hbase.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hbase.kerberosKeytab = <Keytab_path>
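Before starting this agent, the spool directory and the file channel's checkpoint and data directories must exist and be writable by the user running Flume. A minimal sketch, assuming the paths from the example and an assumed flume service user:

# create the directories referenced by the spooldir source and the file channel
sudo mkdir -p /usr/lib/flume/spooldir /usr/lib/flume/filecheckpoint /usr/lib/flume/filechannel
sudo chown -R flume:flume /usr/lib/flume
# start the agent, then drop a file into the spool directory to feed the HBase sink
flume-ng agent --conf /etc/flume/conf --conf-file hdfs_agent.conf --name hdfs_agent
echo "spooled event" > /usr/lib/flume/spooldir/test-01.txt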