Published: 2019-09-26 14:14:41
In big data projects, Flume is commonly used to ship data into the Kafka messaging system or into HDFS storage. This article walks through connecting the Flume side to the Kafka side; the work is almost entirely in the configuration files.
Flume configuration: create the agent's conf file
# Send data to Kafka and to HDFS through two separate sinks

# Name the components on this agent
agent.sources = r1
agent.sinks = k1 k2
agent.channels = c1 c2

# Describe/configure the source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /root/test.log
agent.sources.r1.shell = /bin/bash -c

## Kafka sink
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = kafkatest
agent.sinks.k1.brokerList = master:9092
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.batchSize = 2

# Memory channel for the Kafka sink
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
#agent.channels.c1.transactionCapacity = 100

# Bind the source to both channels; bind the Kafka sink to c1
agent.sources.r1.channels = c1 c2
agent.sinks.k1.channel = c1

## HDFS sink
agent.sinks.k2.type = hdfs
agent.sinks.k2.hdfs.path = hdfs://master:9000/data/flume/tail
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k2.hdfs.writeFormat = Text
#agent.sinks.k2.hdfs.rollInterval = 0
#agent.sinks.k2.hdfs.rollSize = 134217728
#agent.sinks.k2.hdfs.rollCount = 1000000
agent.sinks.k2.hdfs.batchSize = 10

# Memory channel for the HDFS sink
agent.channels.c2.type = memory
#agent.channels.c2.capacity = 1000
#agent.channels.c2.transactionCapacity = 100

# Bind the HDFS sink to c2
agent.sinks.k2.channel = c2
On the server, start the agent (note: no space is allowed after the comma in the logger option):

/usr/local/flume/bin/flume-ng agent -f flume-exec-total.conf -n agent -Dflume.root.logger=INFO,console

On the client, append a test line to the file that the exec source is tailing:

echo "wangzai doubi" >> /root/test.log
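To confirm the HDFS leg of the pipeline, the sink's output directory from the config above can be inspected. This is a minimal sketch, assuming the agent is running, the HDFS client is on the PATH, and Flume has already rolled at least one file (Flume names the files itself, e.g. FlumeData.&lt;timestamp&gt;):

```shell
# Append a few more events so the HDFS sink has something to flush
echo "hdfs check line 1" >> /root/test.log
echo "hdfs check line 2" >> /root/test.log

# List the files Flume has written under the configured hdfs.path
hdfs dfs -ls hdfs://master:9000/data/flume/tail

# Print their contents; the appended lines should appear here
hdfs dfs -cat 'hdfs://master:9000/data/flume/tail/*'
```

If the directory stays empty, check the agent's console log: with `-Dflume.root.logger=INFO,console`, HDFS connection errors are printed directly to the terminal.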
Kafka configuration
Create the Kafka topic:
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafkatest
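Before wiring Flume to the topic, it can be worth confirming that the topic was actually created with the expected partition and replica counts. A small check, assuming the same ZooKeeper quorum as above (the `--zookeeper` form applies to the older Kafka releases this article uses):

```shell
# List all topics; "kafkatest" should appear in the output
kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181

# Show partition count, replication factor, and leader assignment
kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafkatest
```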
Start a console consumer to verify that the events Flume publishes actually arrive on the topic:

/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafkatest --from-beginning
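To isolate problems, it also helps to test the topic independently of Flume: if a message typed into a console producer shows up in the consumer, Kafka itself is healthy and any missing data points to the Flume side. A quick sketch, assuming the broker address `master:9092` from the Flume config:

```shell
# In one terminal: produce test messages (type lines, Ctrl+C to stop)
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list master:9092 --topic kafkatest

# In another terminal: the consumer started above should echo each line
```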