Apache Flume verwenden
Apache Flume sammelt und verschiebt große Mengen von Streaming-Ereignisdaten effizient.
Weitere Informationen zu Apache Flume finden Sie in der Flume-Dokumentation.
Flume-Konfigurationseigenschaften
Flume-Konfigurationseigenschaften in Big Data Service 3.1.1 oder höher.
Konfiguration | Eigenschaft | Beschreibung |
---|---|---|
flume-env |
flume_java_opts |
Hadoop-Classpath flinken |
flume_user_classpath |
Benutzer-Classpath fortsetzen |
Apache Flume-Konfigurationen für Quelle, Kanal und Sink
Ein Flume-Ereignis ist eine Einheit des Datenflusses mit einer Byte-Payload und einer optionalen Gruppe von Zeichenfolgenattributen. Ein Flume-Agent ist ein (JVM-)Prozess, der die Komponenten hostet, die Ereignisse von einer externen Quelle zum nächsten Ziel (Hop) fließen.
Die Flume-Quelle ist so konzipiert, dass sie die Ereignisse von einer beliebigen externen Quelle konsumiert, z. B. von einem IOT-Gerät in einem lesbaren Format für die Flume-Quelle. Das Format kann Avro, JSON, Nur-Text usw. gemäß der konfigurierten Flume-Quelle sein. Diese Daten werden von der Flume-Spüle empfangen.
Wenn eine Flume-Quelle ein Ereignis empfängt, wird es in einem Kanal gespeichert. Die am häufigsten verwendeten Kanäle befinden sich im Speicherkanal, im Dateikanal und im Kafka-Kanal. Der Kanal speichert die Daten, bis sie von der Senke gelesen werden.
Die Flume-Senke entfernt die Daten aus dem Kanal und leitet sie an eine andere Flume-Quelle oder einen externen Speicher weiter, z.B. HDFS oder einen Objektspeicher, damit nachgelagerte Prozesse konsumiert werden können.
Im Folgenden finden Sie Beispiele für Quell-, Kanal- und Sinkkonfigurationen.
#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = <HDFS_PATH>
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
hdfs_agent.channels.memchannel.capacity = 10000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
agent1.sources = netcat1
agent1.sinks = hbase
agent1.channels = Qmemory1
agent1.sources.netcat1.type = netcat
agent1.sources.netcat1.bind = 0.0.0.0
agent1.sources.netcat1.port = 11111
agent1.sinks.hbase.type = hbase2
agent1.sinks.hbase.table = test_table
agent1.sinks.hbase.columnFamily = test_cf
agent1.sinks.hbase.serializer = org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
agent1.sources.netcat1.channels = memory1
agent1.sinks.hbase.channel = memory1
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
So richten Sie einen HDFS-Connector im Cluster ein:
#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = oci://TRAINING@bdsdevcluster/new
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
hdfs_agent.channels.memchannel.capacity = 100000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = org.apache.flume.source.kafka.KafkaSource
hdfs_agent.sources.hdfs_source.kafka.bootstrap.servers = kafkassltest-mn0.bmbdcsad1.bmbdcs.oraclevcn.com:6667
hdfs_agent.sources.hdfs_source.kafka.topics = flume-kafka-test
hdfs_agent.sources.hdfs_source.batchSize = 50
hdfs_agent.sources.hdfs_source.kafka.consumer.group.id = customgid
hdfs_agent.sources.hdfs_source.kafka.consumer.security.protocol = SASL_PLAINTEXT
hdfs_agent.sources.hdfs_source.kafka.consumer.sasl.mechanism = GSSAPI
hdfs_agent.sources.hdfs_source.kafka.consumer.sasl.kerberos.service.name = kafka
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = <HDFS_PATH>
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
hdfs_agent.channels.memchannel.capacity = 100000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
#netcat and spool directory sources configured with Hbase and HDFS sinks
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
#list hdfs sources
hdfs_agent.sources = hdfs_source spooldir-source
hdfs_agent.channels = filechannel memchannel
hdfs_agent.sinks = hdfs_write hbase
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
# configure spooldir source
hdfs_agent.sources.spooldir-source.type = spooldir
hdfs_agent.sources.spooldir-source.spoolDir = /usr/lib/flume/spooldir
hdfs_agent.sources.spooldir-source.fileHeader = false
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = oci://TRAINING@bdsdevcluster/new
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
# configure hbase sink
hdfs_agent.sinks.hbase.type=hbase2
hdfs_agent.sinks.hbase.table=test_table
hdfs_agent.sinks.hbase.columnFamily= test_cf
hdfs_agent.sinks.hbase.serializer=org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer
hdfs_agent.channels.filechannel.capacity = 10000
hdfs_agent.channels.filechannel.type = file
hdfs_agent.channels.filechannel.transactionCapacity = 100
hdfs_agent.channels.filechannel.checkpointDir = /usr/lib/flume/filecheckpoint
hdfs_agent.channels.filechannel.dataDirs = /usr/lib/flume/filechannel
hdfs_agent.channels.memchannel.capacity = 10000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sources.spooldir-source.channels = filechannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hbase.channel = filechannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
hdfs_agent.sinks.hbase.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hbase.kerberosKeytab = <Keytab_path>