Lesson 53: Hands-On Kafka+Flume, Success! Implementing the Technical Skeleton of a Large-Scale Spark Ad-Click Project: Spark+Kafka+Flume in Practice

Date: 2021-09-28 12:02:26


Flume is installed on worker4 of the cluster, at 192.168.189.5.


1. Download Flume
http://flume.apache.org/download.html
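
For example, the 1.6.0 binary tarball used below can be pulled straight from the Apache archive (pick a current mirror from the download page if you prefer):

# wget http://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz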


2. Upload the archive to worker4 (192.168.189.5)
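
If the download was done on another machine, one way to get the tarball onto worker4 is scp (the source path and target directory here are just examples):

# scp apache-flume-1.6.0-bin.tar.gz root@192.168.189.5:/root/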


3. Extract it on worker4 (192.168.189.5)
# tar -zxvf apache-flume-1.6.0-bin.tar.gz
# mv apache-flume-1.6.0-bin /usr/local/


4. Configure on worker4 (192.168.189.5)
root@worker4:/usr/local/apache-flume-1.6.0-bin# cd  conf
root@worker4:/usr/local/apache-flume-1.6.0-bin/conf# ls
flume-conf.properties.template  flume-env.sh.template
flume-env.ps1.template          log4j.properties
root@worker4:/usr/local/apache-flume-1.6.0-bin/conf# cp flume-conf.properties.template  flume.conf
root@worker4:/usr/local/apache-flume-1.6.0-bin/conf# cp flume-env.sh.template  flume-env.sh
root@worker4:/usr/local/apache-flume-1.6.0-bin/conf# 


5. Delete the copied flume.conf; a fresh one is written in the next step:
# rm flume.conf


6. Create a new flume.conf on worker4 (192.168.189.5)


root@worker4:/usr/local/apache-flume-1.6.0-bin/conf# cat flume.conf
agent1.channels = channel1
agent1.sources = source1
agent1.sinks = sink1


agent1.sources.source1.type = spooldir
agent1.sources.source1.channels = channel1
agent1.sources.source1.spoolDir = /usr/local/IMF_flume/IMF_Flume_Dir
agent1.sources.source1.fileHeader = false
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp




agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.brokerList =  master:9092,worker1:9092,worker2:9092
agent1.sinks.sink1.topic = IMFHelloKafka
agent1.sinks.sink1.requiredAcks = 1
agent1.sinks.sink1.batchSize = 20
agent1.sinks.sink1.channel = channel1


agent1.channels.channel1.type   = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
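
This defines a single pipeline: the spooling-directory source watches /usr/local/IMF_flume/IMF_Flume_Dir and stamps each event with a timestamp header, the memory channel buffers up to 1000 events (100 per transaction), and the Kafka sink publishes batches of up to 20 events to the IMFHelloKafka topic on the three brokers. The spooling directory must exist before the agent starts, or the source will fail its startup check; create it now if you have not already:

# mkdir -p /usr/local/IMF_flume/IMF_Flume_Dir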


7. Configure /etc/profile on worker4 (192.168.189.5)


export FLUME_HOME=/usr/local/apache-flume-1.6.0-bin
export PATH=.:$PATH:$JAVA_HOME/bin:$SCALA_HOME/bin:$HADOOP_HOME/bin:$SPARK_HOME/bin:$HIVE_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin:$FLUME_HOME/bin

# source /etc/profile
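
A quick sanity check that the PATH change took effect: run flume-ng from any directory; it should report version 1.6.0.

# flume-ng version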




8. Start the ZooKeeper and Kafka clusters on master (192.168.189.1), worker1 (192.168.189.2), and worker2 (192.168.189.3)




root@master:~# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
root@master:~# nohup /usr/local/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 2941








root@worker1:~# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
root@worker1:~# nohup /usr/local/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 2523






root@worker2:~# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
root@worker2:~# nohup /usr/local/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 2517
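
The sink will publish to the IMFHelloKafka topic. If your brokers do not auto-create topics (auto.create.topics.enable=false), create it by hand first; one partition with a replication factor of 3 (an arbitrary choice for this three-broker test cluster) is plenty:

# kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 3 --partitions 1 --topic IMFHelloKafka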






9. Start Flume on worker4 (192.168.189.5)
flume-ng agent --conf /usr/local/apache-flume-1.6.0-bin/conf/ -f flume.conf -Dflume.root.logger=DEBUG,console -n agent1
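
Here --conf points at the directory containing flume-env.sh and log4j.properties, -f names the agent configuration file (a relative path is resolved against the current directory, so run this from the conf directory or pass an absolute path), -n must match the agent name defined in flume.conf (agent1), and -Dflume.root.logger=DEBUG,console routes DEBUG-level logging to the terminal so we can watch events flow.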










10. Test it: open a new terminal on worker4 (192.168.189.5) and write data into the monitored directory /usr/local/IMF_flume/IMF_Flume_Dir


root@worker4:/usr/local/IMF_flume/IMF_Flume_Dir# echo "hello flume hello kafka " > /usr/local/IMF_flume/IMF_Flume_Dir/helloflumekafka.txt


root@worker4:/usr/local/IMF_flume/IMF_Flume_Dir# ls
helloflumekafka.txt.COMPLETED


root@worker4:/usr/local/IMF_flume/IMF_Flume_Dir# cat helloflumekafka.txt.COMPLETED
hello flume hello kafka 
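
The .COMPLETED suffix is how the spooling-directory source marks a file it has fully ingested; by default the file is renamed in place rather than deleted, which is why its contents are still readable here.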








11. In the Kafka cluster, view the data delivered by Flume on worker2 (192.168.189.3)
root@worker2:~# kafka-console-consumer.sh --zookeeper master:2181,worker1:2181,worker2:2181 --from-beginning --topic IMFHelloKafka
hello flume hello kafka 
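
The --from-beginning flag makes the consumer replay the topic from the earliest offset, so the message shows up even though it was produced before this consumer was started.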










12. Flume log output on worker4 (192.168.189.5)


 
2016-10-04 20:46:28,339 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2016-10-04 20:46:28,380 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started
2016-10-04 20:46:28,388 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:46:28,400 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:flume.conf
2016-10-04 20:46:28,429 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,429 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1021)] Created context for sink1: brokerList
2016-10-04 20:46:28,430 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: sink1 Agent: agent1
2016-10-04 20:46:28,430 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,431 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,431 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,432 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,433 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
2016-10-04 20:46:28,434 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:314)] Starting validation of configuration for agent: agent1, initial-configuration: AgentConfiguration[agent1]
SOURCES: {source1={ parameters:{interceptors.i1.type=timestamp, channels=channel1, spoolDir=/usr/local/IMF_flume/IMF_Flume_Dir, fileHeader=false, type=spooldir, interceptors=i1} }}
CHANNELS: {channel1={ parameters:{type=memory, transactionCapacity=100, capacity=1000} }}
SINKS: {sink1={ parameters:{channel=channel1, topic=IMFHelloKafka, requiredAcks=1, batchSize=20, type=org.apache.flume.sink.kafka.KafkaSink, brokerList=master:9092,worker1:9092,worker2:9092} }}

2016-10-04 20:46:28,449 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:469)] Created channel channel1
2016-10-04 20:46:28,463 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:675)] Creating sink: sink1 using OTHER
2016-10-04 20:46:28,467 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:372)] Post validation configuration for agent1
AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[agent1]
SOURCES: {source1={ parameters:{interceptors.i1.type=timestamp, channels=channel1, spoolDir=/usr/local/IMF_flume/IMF_Flume_Dir, fileHeader=false, type=spooldir, interceptors=i1} }}
CHANNELS: {channel1={ parameters:{type=memory, transactionCapacity=100, capacity=1000} }}
SINKS: {sink1={ parameters:{channel=channel1, topic=IMFHelloKafka, requiredAcks=1, batchSize=20, type=org.apache.flume.sink.kafka.KafkaSink, brokerList=master:9092,worker1:9092,worker2:9092} }}

2016-10-04 20:46:28,473 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Channels:channel1

2016-10-04 20:46:28,474 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sinks sink1

2016-10-04 20:46:28,474 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:138)] Sources source1

2016-10-04 20:46:28,475 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [agent1]
2016-10-04 20:46:28,476 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
2016-10-04 20:46:28,486 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel channel1 type memory
2016-10-04 20:46:28,579 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel channel1
2016-10-04 20:46:28,581 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source source1, type spooldir
2016-10-04 20:46:28,618 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: sink1, type: org.apache.flume.sink.kafka.KafkaSink
2016-10-04 20:46:28,619 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:63)] Sink type org.apache.flume.sink.kafka.KafkaSink is a custom type
2016-10-04 20:46:28,629 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:198)] Using batch size: 20
2016-10-04 20:46:28,629 (conf-file-poller-0) [INFO - org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:207)] Using the static topic: IMFHelloKafka this may be over-ridden by event headers
2016-10-04 20:46:28,630 (conf-file-poller-0) [INFO - org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:34)] context={ parameters:{channel=channel1, topic=IMFHelloKafka, requiredAcks=1, batchSize=20, type=org.apache.flume.sink.kafka.KafkaSink, brokerList=master:9092,worker1:9092,worker2:9092} }
2016-10-04 20:46:28,631 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:214)] Kafka producer properties: {metadata.broker.list=master:9092,worker1:9092,worker2:9092, request.required.acks=1, key.serializer.class=kafka.serializer.StringEncoder, serializer.class=kafka.serializer.DefaultEncoder}
2016-10-04 20:46:28,647 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel channel1 connected to [source1, sink1]
2016-10-04 20:46:28,697 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:Spool Directory source source1: { spoolDir: /usr/local/IMF_flume/IMF_Flume_Dir } }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@51ef4f7e counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }
2016-10-04 20:46:28,698 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel channel1
2016-10-04 20:46:29,301 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
2016-10-04 20:46:29,307 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: channel1 started
2016-10-04 20:46:29,308 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink sink1
2016-10-04 20:46:29,335 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source source1
2016-10-04 20:46:29,350 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:78)] SpoolDirectorySource source starting with directory: /usr/local/IMF_flume/IMF_Flume_Dir
2016-10-04 20:46:29,368 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.(ReliableSpoolingFileEventReader.java:138)] Initializing ReliableSpoolingFileEventReader with directory=/usr/local/IMF_flume/IMF_Flume_Dir, metaDir=.flumespool, deserializer=LINE
2016-10-04 20:46:29,588 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.(ReliableSpoolingFileEventReader.java:160)] Successfully created and deleted canary file: /usr/local/IMF_flume/IMF_Flume_Dir/flume-spooldir-perm-check-8298061474023286457.canary
2016-10-04 20:46:29,590 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:111)] SpoolDirectorySource source started
2016-10-04 20:46:29,594 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2016-10-04 20:46:29,594 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: source1 started
2016-10-04 20:46:29,838 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Verifying properties
2016-10-04 20:46:29,871 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property key.serializer.class is overridden to kafka.serializer.StringEncoder
2016-10-04 20:46:29,871 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property metadata.broker.list is overridden to master:9092,worker1:9092,worker2:9092
2016-10-04 20:46:29,872 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property request.required.acks is overridden to 1
2016-10-04 20:46:29,873 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property serializer.class is overridden to kafka.serializer.DefaultEncoder
2016-10-04 20:46:29,971 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2016-10-04 20:46:29,971 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: sink1 started
2016-10-04 20:46:29,973 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting
2016-10-04 20:46:59,351 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:47:29,354 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:47:44,786 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2016-10-04 20:47:44,787 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:114)] {Event} IMFHelloKafka : null : hello flume hello kafka 
2016-10-04 20:47:44,788 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:116)] event #0
2016-10-04 20:47:44,790 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /usr/local/IMF_flume/IMF_Flume_Dir/helloflumekafka.txt to /usr/local/IMF_flume/IMF_Flume_Dir/helloflumekafka.txt.COMPLETED
2016-10-04 20:47:44,791 (pool-3-thread-1) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:384)] Successfully rolled file /usr/local/IMF_flume/IMF_Flume_Dir/helloflumekafka.txt to /usr/local/IMF_flume/IMF_Flume_Dir/helloflumekafka.txt.COMPLETED
2016-10-04 20:47:47,804 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Handling 1 events
2016-10-04 20:47:47,901 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:0,host:master,port:9092 with correlation id 0 for 1 topic(s) Set(IMFHelloKafka)
2016-10-04 20:47:48,234 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Created socket with SO_TIMEOUT = 10000 (requested 10000), SO_RCVBUF = 186240 (requested -1), SO_SNDBUF = 102400 (requested 102400).
2016-10-04 20:47:48,235 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to master:9092 for producing
2016-10-04 20:47:48,306 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Disconnecting from master:9092
2016-10-04 20:47:48,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Successfully fetched metadata for 1 topic(s) Set(IMFHelloKafka)
2016-10-04 20:47:48,323 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Getting broker partition info for topic IMFHelloKafka
2016-10-04 20:47:48,326 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Partition [IMFHelloKafka,0] has leader 1
2016-10-04 20:47:48,329 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Broker partitions registered for topic: IMFHelloKafka are 0
2016-10-04 20:47:48,343 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Sending 1 messages with no compression to [IMFHelloKafka,0]
2016-10-04 20:47:48,372 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Producer sending messages with correlation id 2 for topics [IMFHelloKafka,0] to broker 1 on worker1:9092
2016-10-04 20:47:48,387 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Created socket with SO_TIMEOUT = 10000 (requested 10000), SO_RCVBUF = 186240 (requested -1), SO_SNDBUF = 102400 (requested 102400).
2016-10-04 20:47:48,387 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to worker1:9092 for producing
2016-10-04 20:47:48,429 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - kafka.utils.Logging$class.debug(Logging.scala:52)] Producer sent messages with correlation id 2 for topics [IMFHelloKafka,0] to broker 1 on worker1:9092
2016-10-04 20:47:59,356 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:48:29,357 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:48:59,359 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes
2016-10-04 20:49:29,361 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:flume.conf for changes









 


13. Verified: the Flume + Kafka test ran with complete success!


Test results: the message written into the spool directory was delivered end to end, as shown in steps 10 through 12 above.

