Flume is installed on worker4 (192.168.189.5) in the cluster.
1. Download Flume
http://flume.apache.org/download.html
2. Upload the archive to worker4 (192.168.189.5)
3. Extract the archive on worker4 (192.168.189.5)
# tar -zxvf apache-flume-1.6.0-bin.tar.gz
# mv apache-flume-1.6.0-bin /usr/local/
4. Configure Flume on worker4 (192.168.189.5)
root@worker4:/usr/local/apache-flume-1.6.0-bin# cd conf
root@worker4:/usr/local/apache-flume-1.6.0-bin/conf# ls
flume-conf.properties.template flume-env.sh.template
flume-env.ps1.template log4j.properties
root@worker4:/usr/local/apache-flume-1.6.0-bin/conf# cp flume-conf.properties.template flume.conf
root@worker4:/usr/local/apache-flume-1.6.0-bin/conf# cp flume-env.sh.template flume-env.sh
root@worker4:/usr/local/apache-flume-1.6.0-bin/conf#
5. Delete the flume.conf that was just copied from the template
# rm flume.conf
6. Create a new flume.conf on worker4 (192.168.189.5)
root@worker4:/usr/local/apache-flume-1.6.0-bin/conf# cat flume.conf
agent1.channels = channel1
agent1.sources = source1
agent1.sinks = sink1
agent1.sources.source1.type = spooldir
agent1.sources.source1.channels = channel1
agent1.sources.source1.spoolDir = /usr/local/IMF_flume/IMF_Flume_Dir
agent1.sources.source1.fileHeader = false
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.brokerList = master:9092,worker1:9092,worker2:9092
agent1.sinks.sink1.topic = IMFHelloKafka
agent1.sinks.sink1.requiredAcks = 1
agent1.sinks.sink1.batchSize = 20
agent1.sinks.sink1.channel = channel1
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
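A misspelled channel name in flume.conf is easy to miss, so it can help to check the wiring before starting the agent. The following is a minimal sketch, not part of Flume: `check_flume_channels` is a hypothetical helper that assumes simple `key = value` lines like the ones above and reports any channel that a source or sink references but that `<agent>.channels` does not declare.

```shell
#!/usr/bin/env bash
# check_flume_channels CONF AGENT
# Hypothetical helper (not shipped with Flume): prints every channel name that
# a source ("...sources.X.channels") or sink ("...sinks.X.channel") references
# but that is missing from "<agent>.channels".
# Returns 0 when the wiring is consistent, 1 otherwise.
check_flume_channels() {
  local conf="$1" agent="$2"
  awk -v agent="$agent" '
    # Skip comment lines and lines without "=".
    /^[ \t]*#/ || !/=/ { next }
    {
      # Split on the first "=" and trim whitespace around key and value.
      key = substr($0, 1, index($0, "=") - 1)
      val = substr($0, index($0, "=") + 1)
      gsub(/^[ \t]+|[ \t\r]+$/, "", key)
      gsub(/^[ \t]+|[ \t\r]+$/, "", val)
      n = split(key, part, ".")
      if (part[1] != agent) next
      if (n == 2 && part[2] == "channels") {
        m = split(val, names, /[ \t]+/)
        for (i = 1; i <= m; i++) declared[names[i]] = 1
      } else if (n == 4 && ((part[2] == "sources" && part[4] == "channels") ||
                            (part[2] == "sinks" && part[4] == "channel"))) {
        m = split(val, names, /[ \t]+/)
        for (i = 1; i <= m; i++) used[names[i]] = 1
      }
    }
    END {
      bad = 0
      for (c in used) if (!(c in declared)) { print c; bad = 1 }
      exit bad
    }
  ' "$conf"
}

# Usage, with the file and agent name from this walkthrough:
#   check_flume_channels /usr/local/apache-flume-1.6.0-bin/conf/flume.conf agent1
```

For the configuration above the function prints nothing and returns 0, because source1 and sink1 both point at channel1.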
7. Configure /etc/profile on worker4 (192.168.189.5)
export FLUME_HOME=/usr/local/apache-flume-1.6.0-bin
export PATH=.:$PATH:$JAVA_HOME/bin:$SCALA_HOME/bin:$HADOOP_HOME/bin:$SPARK_HOME/bin:$HIVE_HOME/bin:$FLUME_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin
# source /etc/profile
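After sourcing the profile, it is worth confirming that the Flume bin directory really ended up on PATH, since a wrapped or mistyped export line fails silently. The following is a small sketch; `path_has_dir` is a hypothetical helper name, and the fallback path is simply the FLUME_HOME value used above.

```shell
#!/usr/bin/env bash
# path_has_dir DIR [PATH_STRING]
# Returns 0 if DIR is one of the colon-separated entries of PATH_STRING
# (defaults to the current $PATH). A trailing slash on DIR is ignored.
path_has_dir() {
  local dir="${1%/}" path_string="${2-$PATH}"
  case ":${path_string}:" in
    *":${dir}:"*|*":${dir}/:"*) return 0 ;;
    *) return 1 ;;
  esac
}

# Report whether the Flume bin directory is reachable through PATH.
flume_bin="${FLUME_HOME:-/usr/local/apache-flume-1.6.0-bin}/bin"
if path_has_dir "$flume_bin"; then
  echo "flume-ng directory is on PATH: $flume_bin"
else
  echo "flume-ng directory is NOT on PATH: $flume_bin" >&2
fi
```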
7. Start the ZooKeeper cluster and the Kafka cluster on master (192.168.189.1), worker1 (192.168.189.2) and worker2 (192.168.189.3)
root@master:~# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
root@master:~# nohup /usr/local/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 2941
root@worker1:~# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
root@worker1:~# nohup /usr/local/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 2523
root@worker2:~# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
root@worker2:~# nohup /usr/local/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 2517
8. Start Flume on worker4 (192.168.189.5). Run the command from the conf directory, because -f flume.conf is a relative path
flume-ng agent --conf /usr/local/apache-flume-1.6.0-bin/conf/ -f flume.conf -Dflume.root.logger=DEBUG,console -n agent1
9. Test it: open another terminal on worker4 (192.168.189.5) and write data into the monitored directory /usr/local/IMF_flume/IMF_Flume_Dir
root@worker4:/usr/local/IMF_flume/IMF_Flume_Dir# echo "hello flume hello kafka " > /usr/local/IMF_flume/IMF_Flume_Dir/helloflumekafka.txt
root@worker4:/usr/local/IMF_flume/IMF_Flume_Dir# ls
helloflumekafka.txt.COMPLETED
root@worker4:/usr/local/IMF_flume/IMF_Flume_Dir# cat helloflumekafka.txt.COMPLETED
hello flume hello kafka
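The spooling directory source expects a file to be complete and unchanged once it appears in the monitored directory, and it does not expect file names to be reused. Redirecting echo straight into the directory works for a one-line test, but for larger files a safer pattern is to write the file elsewhere and then rename it into place. The following is a minimal sketch of that pattern; `spool_drop` is a hypothetical helper and the staging directory is an assumption that must sit on the same filesystem as the spool directory.

```shell
#!/usr/bin/env bash
# spool_drop STAGING_DIR SPOOL_DIR NAME
# Reads stdin into STAGING_DIR/NAME, then moves the finished file into
# SPOOL_DIR. Assumes both directories are on the same filesystem, so that
# mv is a rename and the source never sees a half-written file.
# Refuses a NAME that already exists in SPOOL_DIR, either as-is or with the
# .COMPLETED suffix that Flume appends after processing.
spool_drop() {
  local staging="$1" spool="$2" name="$3"
  if [ -e "$spool/$name" ] || [ -e "$spool/$name.COMPLETED" ]; then
    echo "refusing to reuse file name: $name" >&2
    return 1
  fi
  cat > "$staging/$name" || return 1
  mv "$staging/$name" "$spool/$name"
}

# Usage (the staging path is hypothetical):
#   echo "hello flume hello kafka" | \
#     spool_drop /usr/local/IMF_flume/staging /usr/local/IMF_flume/IMF_Flume_Dir hello2.txt
```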
10. In the Kafka cluster, view the data sent by Flume on worker2 (192.168.189.3)
root@worker2:~# kafka-console-consumer.sh --zookeeper master:2181,worker1:2181,worker2:2181 --from-beginning --topic IMFHelloKafka
hello flume hello kafka
11. Flume log output on worker4 (192.168.189.5)
2016-10-04 20:46:28,339 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2016-10-04 20:46:28,380 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started
2016-10-04 20:46:28,388 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:46:28,400 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:flume.conf
2016-10-04 20:46:28,429 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,429 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1021)] Created context for sink1: brokerList
2016-10-04 20:46:28,430 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: sink1 Agent: agent1
2016-10-04 20:46:28,430 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,431 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,431 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,432 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,433 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,434 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:314)] Starting validation of configuration for agent: agent1, initial-configuration: AgentConfiguration[agent1] SOURCES: {source1={ parameters:{interceptors.i1.type=timestamp, channels=channel1, spoolDir=/usr/local/IMF_flume/IMF_Flume_Dir, fileHeader=false, type=spooldir, interceptors=i1} }} CHANNELS: {channel1={ parameters:{type=memory, transactionCapacity=100, capacity=1000} }} SINKS: {sink1={ parameters:{channel=channel1, topic=IMFHelloKafka, requiredAcks=1, batchSize=20, type=org.apache.flume.sink.kafka.KafkaSink, brokerList=master:9092,worker1:9092,worker2:9092} }}
2016-10-04 20:46:28,449 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:469)] Created channel channel1
2016-10-04 20:46:28,463 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:675)] Creating sink: sink1 using OTHER
2016-10-04 20:46:28,467 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:372)] Post validation configuration for agent1 AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[agent1] SOURCES: {source1={ parameters:{interceptors.i1.type=timestamp, channels=channel1, spoolDir=/usr/local/IMF_flume/IMF_Flume_Dir, fileHeader=false, type=spooldir, interceptors=i1} }} CHANNELS: {channel1={ parameters:{type=memory, transactionCapacity=100, capacity=1000} }} SINKS: {sink1={ parameters:{channel=channel1, topic=IMFHelloKafka, requiredAcks=1, batchSize=20, type=org.apache.flume.sink.kafka.KafkaSink, brokerList=master:9092,worker1:9092,worker2:9092} }}
2016-10-04 20:46:28,473 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Channels:channel1
2016-10-04 20:46:28,474 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sinks sink1
2016-10-04 20:46:28,474 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:138)] Sources source1
2016-10-04 20:46:28,475 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [agent1]
2016-10-04 20:46:28,476 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
2016-10-04 20:46:28,486 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel channel1 type memory
2016-10-04 20:46:28,579 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel channel1
2016-10-04 20:46:28,581 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source source1, type spooldir
2016-10-04 20:46:28,618 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: sink1, type: org.apache.flume.sink.kafka.KafkaSink
2016-10-04 20:46:28,619 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:63)] Sink type org.apache.flume.sink.kafka.KafkaSink is a custom type
2016-10-04 20:46:28,629 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:198)] Using batch size: 20
2016-10-04 20:46:28,629 (conf-file-poller-0) [INFO - org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:207)] Using the static topic: IMFHelloKafka this may be over-ridden by event headers
2016-10-04 20:46:28,630 (conf-file-poller-0) [INFO - org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:34)] context={ parameters:{channel=channel1, topic=IMFHelloKafka, requiredAcks=1, batchSize=20, type=org.apache.flume.sink.kafka.KafkaSink, brokerList=master:9092,worker1:9092,worker2:9092} }
2016-10-04 20:46:28,631 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:214)] Kafka producer properties: {metadata.broker.list=master:9092,worker1:9092,worker2:9092, request.required.acks=1, key.serializer.class=kafka.serializer.StringEncoder, serializer.class=kafka.serializer.DefaultEncoder}
2016-10-04 20:46:28,647 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel channel1 connected to [source1, sink1]
2016-10-04 20:46:28,697 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:Spool Directory source source1: { spoolDir: /usr/local/IMF_flume/IMF_Flume_Dir } }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@51ef4f7e counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }
2016-10-04 20:46:28,698 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel channel1
2016-10-04 20:46:29,301 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
2016-10-04 20:46:29,307 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: channel1 started
2016-10-04 20:46:29,308 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink sink1
2016-10-04 20:46:29,335 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source source1
2016-10-04 20:46:29,350 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:78)] SpoolDirectorySource source starting with directory: /usr/local/IMF_flume/IMF_Flume_Dir
2016-10-04 20:46:29,368 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:138)] Initializing ReliableSpoolingFileEventReader with directory=/usr/local/IMF_flume/IMF_Flume_Dir, metaDir=.flumespool, deserializer=LINE
2016-10-04 20:46:29,588 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:160)] Successfully created and deleted canary file: /usr/local/IMF_flume/IMF_Flume_Dir/flume-spooldir-perm-check-8298061474023286457.canary
2016-10-04 20:46:29,590 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:111)] SpoolDirectorySource source started
2016-10-04 20:46:29,594 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2016-10-04 20:46:29,594 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: source1 started
2016-10-04 20:46:29,838 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Verifying properties
2016-10-04 20:46:29,871 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property key.serializer.class is overridden to kafka.serializer.StringEncoder
2016-10-04 20:46:29,871 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property metadata.broker.list is overridden to master:9092,worker1:9092,worker2:9092
2016-10-04 20:46:29,872 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property request.required.acks is overridden to 1
2016-10-04 20:46:29,873 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property serializer.class is overridden to kafka.serializer.DefaultEncoder
2016-10-04 20:46:29,971 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2016-10-04 20:46:29,971 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: sink1 started
2016-10-04 20:46:29,973 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting
2016-10-04 20:46:59,351 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:47:29,354 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:47:44,786 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2016-10-04 20:47:44,787 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:114)] {Event} IMFHelloKafka : null : hello flume hello kafka
2016-10-04 20:47:44,788 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:116)] event #0
2016-10-04 20:47:44,790 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /usr/local/IMF_flume/IMF_Flume_Dir/helloflumekafka.txt to /usr/local/IMF_flume/IMF_Flume_Dir/helloflumekafka.txt.COMPLETED
2016-10-04 20:47:44,791 (pool-3-thread-1) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:384)] Successfully rolled file /usr/local/IMF_flume/IMF_Flume_Dir/helloflumekafka.txt to /usr/local/IMF_flume/IMF_Flume_Dir/helloflumekafka.txt.COMPLETED
2016-10-04 20:47:47,804 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Handling 1 events
2016-10-04 20:47:47,901 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:0,host:master,port:9092 with correlation id 0 for 1 topic(s) Set(IMFHelloKafka)
2016-10-04 20:47:48,234 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Created socket with SO_TIMEOUT = 10000 (requested 10000), SO_RCVBUF = 186240 (requested -1), SO_SNDBUF = 102400 (requested 102400).
2016-10-04 20:47:48,235 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to master:9092 for producing
2016-10-04 20:47:48,306 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Disconnecting from master:9092
2016-10-04 20:47:48,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Successfully fetched metadata for 1 topic(s) Set(IMFHelloKafka)
2016-10-04 20:47:48,323 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Getting broker partition info for topic IMFHelloKafka
2016-10-04 20:47:48,326 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Partition [IMFHelloKafka,0] has leader 1
2016-10-04 20:47:48,329 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Broker partitions registered for topic: IMFHelloKafka are 0
2016-10-04 20:47:48,343 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Sending 1 messages with no compression to [IMFHelloKafka,0]
2016-10-04 20:47:48,372 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Producer sending messages with correlation id 2 for topics [IMFHelloKafka,0] to broker 1 on worker1:9092
2016-10-04 20:47:48,387 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Created socket with SO_TIMEOUT = 10000 (requested 10000), SO_RCVBUF = 186240 (requested -1), SO_SNDBUF = 102400 (requested 102400).
2016-10-04 20:47:48,387 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to worker1:9092 for producing
2016-10-04 20:47:48,429 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Producer sent messages with correlation id 2 for topics [IMFHelloKafka,0] to broker 1 on worker1:9092
2016-10-04 20:47:59,356 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:48:29,357 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:48:59,359 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:49:29,361 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
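With DEBUG logging on the console, the few lines that matter are easy to lose. If the console output is saved to a file, a quick per-level count shows at a glance whether anything above INFO was logged. The following is a rough sketch; `flume_log_levels` is a hypothetical helper that relies only on the `[LEVEL - ` marker visible in every entry above, and the log file name in the usage line is an assumption.

```shell
#!/usr/bin/env bash
# flume_log_levels FILE
# Counts log entries per level in saved Flume console output, where every
# entry contains "[LEVEL - " as in the log above. Uses grep -o, so it still
# counts correctly when several entries share one physical line.
# Output: one "LEVEL COUNT" line per level, sorted by level name.
flume_log_levels() {
  grep -oE '\[(TRACE|DEBUG|INFO|WARN|ERROR|FATAL) - ' "$1" \
    | sed -E 's/^\[([A-Z]+) - $/\1/' \
    | sort | uniq -c \
    | awk '{ print $2, $1 }'
}

# Usage (flume-console.log is a hypothetical file holding the output above):
#   flume_log_levels flume-console.log
```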
12. Verification: the Flume + Kafka test run completed successfully!
Test results