Flume实战案例运维篇
作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
一.Flume概述
1>.什么是Flume
Flume是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接收方。
官方地址:http://flume.apache.org/。
2>.Flume特性
(1)高可靠性 Flume提供了end to end的数据可靠性机制
(2)易于扩展 Agent为分布式架构,可水平扩展
(3)易于恢复 Channel中保存了与数据源有关的事件,用于失败时的恢复
(4)功能丰富 Flume内置了多种组件,包括不同数据源和不同存储方式
3>.Flume常用组件
(1)Source: 数据源,简单的说就是agent获取数据的入口。 (2)Channel: 管道,数据流通和存储的通道。一个source必须至少和一个channel关联。 (3)Sink: 用来接收channel传输的数据并将之传送到指定的地方,成功后从channel中删除。
4>.Flume架构
二.部署Flume环境
1>.下载flume组件
[root@node101.yinzhengjie.org.cn ~]# yum -y install wget Loaded plugins: fastestmirror Determining fastest mirrors * base: mirrors.tuna.tsinghua.edu.cn * extras: mirrors.aliyun.com * updates: mirror.bit.edu.cn base | 3.6 kB 00:00:00 extras | 3.4 kB 00:00:00 updates | 3.4 kB 00:00:00 updates/7/x86_64/primary_db | 6.5 MB 00:00:02 Resolving Dependencies --> Running transaction check ---> Package wget.x86_64 0:1.14-18.el7_6.1 will be installed --> Finished Dependency Resolution Dependencies Resolved ============================================================================================================================================================================================================================================================================== Package Arch Version Repository Size ============================================================================================================================================================================================================================================================================== Installing: wget x86_64 1.14-18.el7_6.1 updates 547 k Transaction Summary ============================================================================================================================================================================================================================================================================== Install 1 Package Total download size: 547 k Installed size: 2.0 M Downloading packages: wget-1.14-18.el7_6.1.x86_64.rpm | 547 kB 00:00:00 Running transaction check Running transaction test Transaction test succeeded Running transaction Installing : wget-1.14-18.el7_6.1.x86_64 1/1 Verifying : wget-1.14-18.el7_6.1.x86_64 1/1 Installed: wget.x86_64 0:1.14-18.el7_6.1 Complete! [root@node105.yinzhengjie.org.cn ~]#
[root@node101.yinzhengjie.org.cn ~]# wget http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz --2019-07-19 14:29:35-- http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz Resolving mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)... 101.6.8.193, 2402:f000:1:408:8100::1 Connecting to mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)|101.6.8.193|:80... connected. HTTP request sent, awaiting response... 302 Found Location: http://103.238.48.8/mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz [following] --2019-07-19 14:29:35-- http://103.238.48.8/mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz Connecting to 103.238.48.8:80... connected. HTTP request sent, awaiting response... 200 OK Length: 67938106 (65M) [application/x-gzip] Saving to: ‘apache-flume-1.9.0-bin.tar.gz’ 100%[====================================================================================================================================================================================================================================>] 67,938,106 2.87MB/s in 22s 2019-07-19 14:29:57 (2.95 MB/s) - ‘apache-flume-1.9.0-bin.tar.gz’ saved [67938106/67938106] [root@node105.yinzhengjie.org.cn ~]#
2>.解压flume
[root@node105.yinzhengjie.org.cn ~]# ll total 66348 -rw-r--r-- 1 root root 67938106 Jan 2 2019 apache-flume-1.9.0-bin.tar.gz [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# tar -zxf apache-flume-1.9.0-bin.tar.gz -C /home/softwares/ [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ll /home/softwares/apache-flume-1.9.0-bin/ total 168 drwxr-xr-x 2 mysql mysql 62 Jul 19 14:31 bin -rw-rw-r-- 1 mysql mysql 85602 Nov 29 2018 CHANGELOG drwxr-xr-x 2 mysql mysql 127 Jul 19 14:31 conf -rw-r--r-- 1 mysql mysql 5681 Nov 16 2017 DEVNOTES -rw-r--r-- 1 mysql mysql 2873 Nov 16 2017 doap_Flume.rdf drwxrwxr-x 12 mysql mysql 4096 Dec 18 2018 docs drwxr-xr-x 2 root root 8192 Jul 19 14:31 lib -rw-rw-r-- 1 mysql mysql 43405 Dec 10 2018 LICENSE -rw-r--r-- 1 mysql mysql 249 Nov 29 2018 NOTICE -rw-r--r-- 1 mysql mysql 2483 Nov 16 2017 README.md -rw-rw-r-- 1 mysql mysql 1958 Dec 10 2018 RELEASE-NOTES drwxr-xr-x 2 root root 68 Jul 19 14:31 tools [root@node105.yinzhengjie.org.cn ~]#
3>.配置flume的环境变量
[root@node105.yinzhengjie.org.cn ~]# vi /etc/profile [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# tail -3 /etc/profile #Add by yinzhengjie FLUME_HOME=/home/softwares/apache-flume-1.9.0-bin PATH=$PATH:$FLUME_HOME/bin [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# source /etc/profile [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# flume-ng version Flume 1.9.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: d4fcab4f501d41597bc616921329a4339f73585e Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018 From source with checksum 35db629a3bda49d23e9b3690c80737f9 [root@node105.yinzhengjie.org.cn ~]#
4>.自定义flume的配置文件存放目录
[root@node105.yinzhengjie.org.cn ~]# mkdir -pv /home/data/flume/{log,job,shell} mkdir: created directory ‘/home/data’ mkdir: created directory ‘/home/data/flume’ mkdir: created directory ‘/home/data/flume/log’ mkdir: created directory ‘/home/data/flume/job’ mkdir: created directory ‘/home/data/flume/shell’ [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ll /home/data/flume/ total 0 drwxr-xr-x 2 root root 6 Jul 19 14:42 job #用于存放flume启动的agent端的配置文件 drwxr-xr-x 2 root root 6 Jul 19 14:42 log #用于存放日志文件 drwxr-xr-x 2 root root 6 Jul 19 14:42 shell #用于存放启动脚本 [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]#
三.Flume案例
1>.监控端口数据(netcat source-memory channel-logger sink)
[root@node105.yinzhengjie.org.cn ~]# yum -y install telnet net-tools Loaded plugins: fastestmirror Determining fastest mirrors * base: mirror.bit.edu.cn * extras: mirrors.aliyun.com * updates: mirrors.aliyun.com base | 3.6 kB 00:00:00 extras | 3.4 kB 00:00:00 updates | 3.4 kB 00:00:00 updates/7/x86_64/primary_db | 6.5 MB 00:00:02 Package net-tools-2.0-0.24.20131004git.el7.x86_64 already installed and latest version Resolving Dependencies --> Running transaction check ---> Package telnet.x86_64 1:0.17-64.el7 will be installed --> Finished Dependency Resolution Dependencies Resolved ============================================================================================================================================================================================================================================================================== Package Arch Version Repository Size ============================================================================================================================================================================================================================================================================== Installing: telnet x86_64 1:0.17-64.el7 base 64 k Transaction Summary ============================================================================================================================================================================================================================================================================== Install 1 Package Total download size: 64 k Installed size: 113 k Downloading packages: telnet-0.17-64.el7.x86_64.rpm | 64 kB 00:00:04 Running transaction check Running transaction test Transaction test succeeded Running transaction Installing : 1:telnet-0.17-64.el7.x86_64 1/1 Verifying : 1:telnet-0.17-64.el7.x86_64 1/1 Installed: telnet.x86_64 1:0.17-64.el7 Complete! [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-netcat.conf # 这里的“yinzhengjie”是agent的名称,它是我们自定义的。我们分别给“yinzhengjie”的sources,sinks,channels的别名分别为r1,k1和c1 yinzhengjie.sources = r1 yinzhengjie.sinks = k1 yinzhengjie.channels = c1 yinzhengjie.sources.r1.type = netcat yinzhengjie.sources.r1.bind = node105.yinzhengjie.org.cn yinzhengjie.sources.r1.port = 8888 # 指定sink的类型,我们这里指定的为logger,即控制台输出。 yinzhengjie.sinks.k1.type = logger # 指定channel的类型为memory,指定channel的容量是1000,每次传输的容量是100 yinzhengjie.channels.c1.type = memory yinzhengjie.channels.c1.capacity = 1000 yinzhengjie.channels.c1.transactionCapacity = 100 # 绑定source和sink yinzhengjie.sources.r1.channels = c1 yinzhengjie.sinks.k1.channel = c1 [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# flume-ng agent --conf /home/softwares/apache-flume-1.9.0-bin/conf --name yinzhengjie --conf-file /home/data/flume/job/flume-netcat.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=10501 -Dflume.root.logger==INFO,console Warning: JAVA_HOME is not set! Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10501 -Dflume.root.logger==INFO,console -cp '/home/softwares/apache-flume-1.9.0-bin/conf:/home/softwares/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= o rg.apache.flume.node.Application --name yinzhengjie --conf-file /home/data/flume/job/flume-netcat.conf2019-07-19 15:07:00,130 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore path specified. 2019-07-19 15:07:00,136 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore password specified. 2019-07-19 15:07:00,136 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore type specified. 2019-07-19 15:07:00,136 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore path specified. 2019-07-19 15:07:00,154 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore password specified. 2019-07-19 15:07:00,154 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore type specified. 2019-07-19 15:07:00,154 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL include protocols specified. 2019-07-19 15:07:00,154 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL exclude protocols specified. 2019-07-19 15:07:00,154 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL include cipher suites specified. 2019-07-19 15:07:00,154 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL exclude cipher suites specified. 2019-07-19 15:07:00,265 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting 2019-07-19 15:07:00,272 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:79)] Configuration provider started 2019-07-19 15:07:00,274 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:131)] Checking file:/home/data/flume/job/flume-netcat.conf for changes 2019-07-19 15:07:00,274 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:138)] Reloading configuration file:/home/data/flume/job/flume-netcat.conf 2019-07-19 15:07:00,278 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1 2019-07-19 15:07:00,279 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1207)] Created context for r1: type 2019-07-19 15:07:00,282 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1 2019-07-19 15:07:00,283 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1207)] Created context for c1: type 2019-07-19 15:07:00,283 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1117)] Added sinks: k1 Agent: yinzhengjie 2019-07-19 15:07:00,283 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1 2019-07-19 15:07:00,283 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1 2019-07-19 15:07:00,283 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1 2019-07-19 15:07:00,293 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1207)] Created context for k1: channel 2019-07-19 15:07:00,294 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1 2019-07-19 15:07:00,294 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1 2019-07-19 15:07:00,294 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1 2019-07-19 15:07:00,294 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1 2019-07-19 15:07:00,294 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:350)] Starting validation of configuration for agent: yinzhengjie 2019-07-19 15:07:00,295 (conf-file-poller-0) [INFO - org.apache.flume.conf.LogPrivacyUtil.<clinit>(LogPrivacyUtil.java:51)] Logging of configuration details is disabled. To see configuration details in the log run the agent with -Dorg.apache.flume.log.printconfig=true J VM argument. Please note that this is not recommended in production systems as it may leak private information to the logfile.2019-07-19 15:07:00,295 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateConfigFilterSet(FlumeConfiguration.java:623)] Agent configuration for 'yinzhengjie' has no configfilters. 2019-07-19 15:07:00,310 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:583)] Created channel c1 2019-07-19 15:07:00,314 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:861)] Creating sink: k1 using LOGGER 2019-07-19 15:07:00,315 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:158)] Channels:c1 2019-07-19 15:07:00,315 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:159)] Sinks k1 2019-07-19 15:07:00,315 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:160)] Sources r1 2019-07-19 15:07:00,316 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:163)] Post-validation flume configuration contains configuration for agents: [yinzhengjie] 2019-07-19 15:07:00,316 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:151)] Creating channels 2019-07-19 15:07:00,340 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory 2019-07-19 15:07:00,343 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel c1 2019-07-19 15:07:00,344 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type netcat 2019-07-19 15:07:00,354 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger 2019-07-19 15:07:00,358 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:120)] Channel c1 connected to [r1, k1] 2019-07-19 15:07:00,379 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:162)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3344b1b counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }2019-07-19 15:07:00,383 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1 2019-07-19 15:07:00,386 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Waiting for channel: c1 to start. Sleeping for 500 ms 2019-07-19 15:07:00,470 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean. 2019-07-19 15:07:00,471 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started 2019-07-19 15:07:00,887 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1 2019-07-19 15:07:00,890 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1 2019-07-19 15:07:00,893 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting 2019-07-19 15:07:00,958 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.log.Log.initialized(Log.java:180)] Logging to org.slf4j.impl.Log4jLoggerAdapter(org.eclipse.jetty.util.log) via org.eclipse.jetty.util.log.Slf4jLog 2019-07-19 15:07:00,959 (conf-file-poller-0) [INFO - org.eclipse.jetty.util.log.Log.initialized(Log.java:192)] Logging initialized @1169ms to org.eclipse.jetty.util.log.Slf4jLog 2019-07-19 15:07:00,964 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] org.eclipse.jetty.server.Server@346a3eed added {qtp1818551798{STOPPED,8<=0<=200,i=0,q=0},AUTO} 2019-07-19 15:07:00,967 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:141)] Polling sink runner starting 2019-07-19 15:07:00,982 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.30.1.105:8888] 2019-07-19 15:07:00,985 (lifecycleSupervisor-1-4) [DEBUG - org.apache.flume.source.NetcatSource.start(NetcatSource.java:191)] Source started 2019-07-19 15:07:00,986 (Thread-2) [DEBUG - org.apache.flume.source.NetcatSource$AcceptHandler.run(NetcatSource.java:271)] Starting accept handler 2019-07-19 15:07:01,025 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] HttpConnectionFactory@5ac8a68a[HTTP/1.1] added {HttpConfiguration@7454628a{32768/8192,8192/8192,https://:0,[]},POJO} 2019-07-19 15:07:01,028 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] ServerConnector@65639034{null,[]}{0.0.0.0:0} added {org.eclipse.jetty.server.Server@346a3eed,UNMANAGED} 2019-07-19 15:07:01,029 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] ServerConnector@65639034{null,[]}{0.0.0.0:0} added {qtp1818551798{STOPPED,8<=0<=200,i=0,q=0},AUTO} 2019-07-19 15:07:01,030 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] ServerConnector@65639034{null,[]}{0.0.0.0:0} added {org.eclipse.jetty.util.thread.ScheduledExecutorScheduler@1da3d1e8,AUTO} 2019-07-19 15:07:01,030 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] ServerConnector@65639034{null,[]}{0.0.0.0:0} added {org.eclipse.jetty.io.ArrayByteBufferPool@4fa9a485,POJO} 2019-07-19 15:07:01,031 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] ServerConnector@65639034{null,[http/1.1]}{0.0.0.0:0} added {HttpConnectionFactory@5ac8a68a[HTTP/1.1],AUTO} 2019-07-19 15:07:01,032 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.server.AbstractConnector.addConnectionFactory(AbstractConnector.java:406)] ServerConnector@65639034{HTTP/1.1,[http/1.1]}{0.0.0.0:0} added HttpConnectionFactory@5ac8a68a[HTTP/1.1] 2019-07-19 15:07:01,033 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] ServerConnector@65639034{HTTP/1.1,[http/1.1]}{0.0.0.0:0} added {org.eclipse.jetty.server.ServerConnector$ServerConnectorManage r@4fc180ce,MANAGED}2019-07-19 15:07:01,046 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] org.eclipse.jetty.server.Server@346a3eed added {ServerConnector@65639034{HTTP/1.1,[http/1.1]}{0.0.0.0:10501},AUTO} 2019-07-19 15:07:01,101 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] org.eclipse.jetty.server.Server@346a3eed added {org.apache.flume.instrumentation.http.HTTPMetricsServer$HTTPMetricsHandler@52e 879b7,MANAGED}2019-07-19 15:07:01,101 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarting(AbstractLifeCycle.java:185)] starting org.eclipse.jetty.server.Server@346a3eed 2019-07-19 15:07:01,102 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] org.eclipse.jetty.server.Server@346a3eed added {org.eclipse.jetty.server.handler.ErrorHandler@4ed57293,AUTO} 2019-07-19 15:07:01,103 (conf-file-poller-0) [INFO - org.eclipse.jetty.server.Server.doStart(Server.java:372)] jetty-9.4.6.v20170531 2019-07-19 15:07:01,142 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:110)] starting org.eclipse.jetty.server.Server@346a3eed 2019-07-19 15:07:01,143 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarting(AbstractLifeCycle.java:185)] starting qtp1818551798{STOPPED,8<=0<=200,i=0,q=0} 2019-07-19 15:07:01,170 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarted(AbstractLifeCycle.java:177)] STARTED @1380ms qtp1818551798{STARTED,8<=8<=200,i=7,q=0} 2019-07-19 15:07:01,171 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarting(AbstractLifeCycle.java:185)] starting org.apache.flume.instrumentation.http.HTTPMetricsServer$HTTPMetricsHandler@52e879b7 2019-07-19 15:07:01,171 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:110)] starting org.apache.flume.instrumentation.http.HTTPMetricsServer$HTTPMetricsHandler@52e879b7 2019-07-19 15:07:01,171 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarted(AbstractLifeCycle.java:177)] STARTED @1381ms org.apache.flume.instrumentation.http.HTTPMetricsServer$HTTPMetricsHandler@52e879b7 2019-07-19 15:07:01,171 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarting(AbstractLifeCycle.java:185)] starting org.eclipse.jetty.server.handler.ErrorHandler@4ed57293 2019-07-19 15:07:01,172 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:110)] starting org.eclipse.jetty.server.handler.ErrorHandler@4ed57293 2019-07-19 15:07:01,172 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarted(AbstractLifeCycle.java:177)] STARTED @1382ms org.eclipse.jetty.server.handler.ErrorHandler@4ed57293 2019-07-19 15:07:01,172 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarting(AbstractLifeCycle.java:185)] starting ServerConnector@65639034{HTTP/1.1,[http/1.1]}{0.0.0.0:10501} 2019-07-19 15:07:01,173 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] ServerConnector@65639034{HTTP/1.1,[http/1.1]}{0.0.0.0:10501} added {sun.nio.ch.ServerSocketChannelImpl[/0.0.0.0:10501],POJO} 2019-07-19 15:07:01,173 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarting(AbstractLifeCycle.java:185)] starting org.eclipse.jetty.util.thread.ScheduledExecutorScheduler@1da3d1e8 2019-07-19 15:07:01,174 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarted(AbstractLifeCycle.java:177)] STARTED @1384ms org.eclipse.jetty.util.thread.ScheduledExecutorScheduler@1da3d1e8 2019-07-19 15:07:01,174 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarting(AbstractLifeCycle.java:185)] starting HttpConnectionFactory@5ac8a68a[HTTP/1.1] 2019-07-19 15:07:01,174 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarted(AbstractLifeCycle.java:177)] STARTED @1384ms HttpConnectionFactory@5ac8a68a[HTTP/1.1] 2019-07-19 15:07:01,175 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarting(AbstractLifeCycle.java:185)] starting org.eclipse.jetty.server.ServerConnector$ServerConnectorManager@4fc180ce 2019-07-19 15:07:01,178 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] org.eclipse.jetty.io.ManagedSelector@1fe6c0aa id=0 keys=-1 selected=-1 added {EatWhatYouKill@41905dc0/org.eclipse.jetty.io.Man agedSelector$SelectorProducer@7dcfdb71/IDLE/0/1,AUTO}2019-07-19 15:07:01,178 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] org.eclipse.jetty.server.ServerConnector$ServerConnectorManager@4fc180ce added {org.eclipse.jetty.io.ManagedSelector@1fe6c0aa id=0 keys=-1 selected=-1,AUTO}2019-07-19 15:07:01,179 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarting(AbstractLifeCycle.java:185)] starting org.eclipse.jetty.io.ManagedSelector@1fe6c0aa id=0 keys=-1 selected=-1 2019-07-19 15:07:01,179 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarting(AbstractLifeCycle.java:185)] starting EatWhatYouKill@41905dc0/org.eclipse.jetty.io.ManagedSelector$SelectorProducer@7dcfdb71/IDLE/0/1 2019-07-19 15:07:01,179 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarted(AbstractLifeCycle.java:177)] STARTED @1389ms EatWhatYouKill@41905dc0/org.eclipse.jetty.io.ManagedSelector$SelectorProducer@7dcfdb71/IDLE/0/1 2019-07-19 15:07:01,266 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.thread.QueuedThreadPool.execute(QueuedThreadPool.java:381)] queue org.eclipse.jetty.io.ManagedSelector$$Lambda$1/1484530269@6d147c43 2019-07-19 15:07:01,267 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarted(AbstractLifeCycle.java:177)] STARTED @1477ms org.eclipse.jetty.io.ManagedSelector@1fe6c0aa id=0 keys=0 selected=0 2019-07-19 15:07:01,267 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarted(AbstractLifeCycle.java:177)] STARTED @1477ms org.eclipse.jetty.server.ServerConnector$ServerConnectorManager@4fc180ce 2019-07-19 15:07:01,268 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.ContainerLifeCycle.addBean(ContainerLifeCycle.java:322)] ServerConnector@65639034{HTTP/1.1,[http/1.1]}{0.0.0.0:10501} added {acceptor-0@2a11399b,POJO} 2019-07-19 15:07:01,268 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.thread.QueuedThreadPool.execute(QueuedThreadPool.java:381)] queue acceptor-0@2a11399b 2019-07-19 15:07:01,268 (conf-file-poller-0) [INFO - org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:280)] Started ServerConnector@65639034{HTTP/1.1,[http/1.1]}{0.0.0.0:10501} 2019-07-19 15:07:01,269 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarted(AbstractLifeCycle.java:177)] STARTED @1479ms ServerConnector@65639034{HTTP/1.1,[http/1.1]}{0.0.0.0:10501} 2019-07-19 15:07:01,269 (conf-file-poller-0) [INFO - org.eclipse.jetty.server.Server.doStart(Server.java:444)] Started @1479ms 2019-07-19 15:07:01,269 (conf-file-poller-0) [DEBUG - org.eclipse.jetty.util.component.AbstractLifeCycle.setStarted(AbstractLifeCycle.java:177)] STARTED @1479ms org.eclipse.jetty.server.Server@346a3eed 2019-07-19 15:07:01,269 (qtp1818551798-22) [DEBUG - org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)] run org.eclipse.jetty.io.ManagedSelector$$Lambda$1/1484530269@6d147c43 2019-07-19 15:07:01,270 (qtp1818551798-22) [DEBUG - org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:123)] EatWhatYouKill@41905dc0/org.eclipse.jetty.io.ManagedSelector$SelectorProducer@7dcfdb71/PRODUCING/0/1 execute true 2019-07-19 15:07:01,270 (qtp1818551798-22) [DEBUG - org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:206)] EatWhatYouKill@41905dc0/org.eclipse.jetty.io.ManagedSelector$SelectorProducer@7dcfdb71/PRODUCING/0/1 produce non-blocking 2019-07-19 15:07:01,270 (qtp1818551798-22) [DEBUG - org.eclipse.jetty.io.ManagedSelector$SelectorProducer.select(ManagedSelector.java:233)] Selector loop waiting on select 2019-07-19 15:07:01,272 (qtp1818551798-23) [DEBUG - org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)] run acceptor-0@2a11399b 2019-07-19 15:07:31,271 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:131)] Checking file:/home/data/flume/job/flume-netcat.conf for changes 2019-07-19 15:07:47,034 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:316)] Starting connection handler 2019-07-19 15:07:58,158 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:328)] Chars read = 10 2019-07-19 15:07:58,174 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:332)] Events processed = 1 2019-07-19 15:08:01,273 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:131)] Checking file:/home/data/flume/job/flume-netcat.conf for changes 2019-07-19 15:08:02,988 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: E5 B0 B9 E6 AD A3 E6 9D B0 E5 88 B0 E6 AD A4 E4 ................ } 2019-07-19 15:08:31,274 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:131)] Checking file:/home/data/flume/job/flume-netcat.conf for changes 2019-07-19 15:08:33,975 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:328)] Chars read = 29 2019-07-19 15:08:33,976 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:332)] Events processed = 1 2019-07-19 15:08:33,976 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 79 69 6E 7A 68 65 6E 67 6A 69 65 20 64 61 6F 20 yinzhengjie dao } 2019-07-19 15:08:51,938 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:328)] Chars read = 8 2019-07-19 15:08:51,938 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:332)] Events processed = 1 2019-07-19 15:08:51,938 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 67 6F 6C 61 6E 67 0D golang. } 2019-07-19 15:08:54,481 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:328)] Chars read = 8 2019-07-19 15:08:54,481 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:332)] Events processed = 1 2019-07-19 15:08:54,481 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 70 79 74 68 6F 6E 0D python. } 2019-07-19 15:08:56,285 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:328)] Chars read = 6 2019-07-19 15:08:56,285 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:332)] Events processed = 1 2019-07-19 15:08:56,285 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6A 61 76 61 0D java. } 2019-07-19 15:09:01,277 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:131)] Checking file:/home/data/flume/job/flume-netcat.conf for changes
[root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 50 *:10501 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# telnet node105.yinzhengjie.org.cn 8888 Trying 172.30.1.105... Connected to node105.yinzhengjie.org.cn. Escape character is '^]'. 尹正杰到此一游! OK yinzhengjie dao ci yi you ! OK golang OK python OK java OK
[root@node105.yinzhengjie.org.cn ~]# vi /home/data/flume/shell/start-netcat.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-netcat.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/data/flume/job/ --conf-file=/home/data/flume/job/flume-netcat.conf --name yinzhengjie -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:8649 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-g anglia-flume-netcat.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/softwares/apache-flume-1.9.0-bin/conf --conf-file=/home/data/flume/job/flume-netcat.conf --name yinzhengjie -Dflume.monitoring.type=http -Dflume.monitoring.port=10501 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-netcat.l og 2>&1 &[root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# chmod +x /home/data/flume/shell/start-netcat.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ll /home/data/flume/shell/start-netcat.sh -rwxr-xr-x 1 root root 902 Jul 19 15:15 /home/data/flume/shell/start-netcat.sh [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-netcat.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 50 *:10501 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# tail -100f /home/data/flume/log/flume-netcat.log Warning: JAVA_HOME is not set! Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10501 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.App lication --conf-file=/home/data/flume/job/flume-netcat.conf --name yinzhengjielog4j:WARN No appenders could be found for logger (org.apache.flume.util.SSLUtil). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Warning: JAVA_HOME is not set! Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10501 -Dflume.root.logger=INFO,console -cp '/home/softwares/apache-flume-1.9.0-bin/conf:/home/softwares/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= or g.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-netcat.conf --name yinzhengjie2019-07-19 15:14:19,044 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting 2019-07-19 15:14:19,054 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:138)] Reloading configuration file:/home/data/flume/job/flume-netcat.conf 2019-07-19 15:14:19,060 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1 2019-07-19 15:14:19,060 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1 2019-07-19 15:14:19,060 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1117)] Added sinks: k1 Agent: yinzhengjie 2019-07-19 15:14:19,061 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1 2019-07-19 15:14:19,061 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1 2019-07-19 15:14:19,061 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1 2019-07-19 15:14:19,061 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1 2019-07-19 15:14:19,061 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1 2019-07-19 15:14:19,061 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1 2019-07-19 15:14:19,061 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1 2019-07-19 15:14:19,062 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateConfigFilterSet(FlumeConfiguration.java:623)] Agent configuration for 'yinzhengjie' has no configfilters. 2019-07-19 15:14:19,080 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:163)] Post-validation flume configuration contains configuration for agents: [yinzhengjie] 2019-07-19 15:14:19,080 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:151)] Creating channels 2019-07-19 15:14:19,093 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory 2019-07-19 15:14:19,098 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel c1 2019-07-19 15:14:19,105 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type netcat 2019-07-19 15:14:19,110 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger 2019-07-19 15:14:19,112 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:120)] Channel c1 connected to [r1, k1] 2019-07-19 15:14:19,129 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:162)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@5b204d1f counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }2019-07-19 15:14:19,133 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1 2019-07-19 15:14:19,135 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Waiting for channel: c1 to start. Sleeping for 500 ms 2019-07-19 15:14:19,221 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean. 2019-07-19 15:14:19,221 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started 2019-07-19 15:14:19,636 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1 2019-07-19 15:14:19,637 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1 2019-07-19 15:14:19,638 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting 2019-07-19 15:14:19,681 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.30.1.105:8888] 2019-07-19 15:14:19,694 (conf-file-poller-0) [INFO - org.eclipse.jetty.util.log.Log.initialized(Log.java:192)] Logging initialized @1117ms to org.eclipse.jetty.util.log.Slf4jLog 2019-07-19 15:14:19,812 (conf-file-poller-0) [INFO - org.eclipse.jetty.server.Server.doStart(Server.java:372)] jetty-9.4.6.v20170531 2019-07-19 15:14:19,935 (conf-file-poller-0) [INFO - org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:280)] Started ServerConnector@48970ee9{HTTP/1.1,[http/1.1]}{0.0.0.0:10501} 2019-07-19 15:14:19,935 (conf-file-poller-0) [INFO - org.eclipse.jetty.server.Server.doStart(Server.java:444)] Started @1358ms 2019-07-19 15:15:05,701 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 79 69 6E 7A 68 65 6E 67 6A 69 65 20 64 61 6F 20 yinzhengjie dao }
[root@node105.yinzhengjie.org.cn ~]# yum -y install epel-release Loaded plugins: fastestmirror Loading mirror speeds from cached hostfile * base: mirror.bit.edu.cn * extras: mirrors.aliyun.com * updates: mirrors.aliyun.com Resolving Dependencies --> Running transaction check ---> Package epel-release.noarch 0:7-11 will be installed --> Finished Dependency Resolution Dependencies Resolved ============================================================================================================================================================================================================================================================================== Package Arch Version Repository Size ============================================================================================================================================================================================================================================================================== Installing: epel-release noarch 7-11 extras 15 k Transaction Summary ============================================================================================================================================================================================================================================================================== Install 1 Package Total download size: 15 k Installed size: 24 k Downloading packages: epel-release-7-11.noarch.rpm | 15 kB 00:00:00 Running transaction check Running transaction test Transaction test succeeded Running transaction Installing : epel-release-7-11.noarch 1/1 Verifying : epel-release-7-11.noarch 1/1 Installed: epel-release.noarch 0:7-11 Complete! [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# yum list jq Loaded plugins: fastestmirror Loading mirror speeds from cached hostfile epel/x86_64/metalink | 6.1 kB 00:00:00 * base: mirror.bit.edu.cn * epel: mirrors.tuna.tsinghua.edu.cn * extras: mirrors.aliyun.com * updates: mirrors.aliyun.com epel | 5.3 kB 00:00:00 (1/3): epel/x86_64/group_gz | 88 kB 00:00:00 epel/x86_64/updateinfo FAILED http://ftp.jaist.ac.jp/pub/Linux/Fedora/epel/7/x86_64/repodata/52f0298e60c86c08c5a90ffdff1f223a1166be2d7e011c9015ecfc8dc8bdf38b-updateinfo.xml.bz2: [Errno 14] HTTP Error 404 - Not Found ] 0.0 B/s | 0 B --:--:-- ETA Trying other mirror. To address this issue please refer to the below wiki article https://wiki.centos.org/yum-errors If above article doesn't help to resolve this issue please use https://bugs.centos.org/. (2/3): epel/x86_64/updateinfo | 990 kB 00:00:00 (3/3): epel/x86_64/primary_db | 6.8 MB 00:00:04 Available Packages jq.x86_64 1.5-1.el7 epel [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# yum -y install jq Loaded plugins: fastestmirror Loading mirror speeds from cached hostfile * base: mirror.bit.edu.cn * epel: mirrors.yun-idc.com * extras: mirrors.aliyun.com * updates: mirrors.aliyun.com Resolving Dependencies --> Running transaction check ---> Package jq.x86_64 0:1.5-1.el7 will be installed --> Processing Dependency: libonig.so.2()(64bit) for package: jq-1.5-1.el7.x86_64 --> Running transaction check ---> Package oniguruma.x86_64 0:5.9.5-3.el7 will be installed --> Finished Dependency Resolution Dependencies Resolved ============================================================================================================================================================================================================================================================================== Package Arch Version Repository Size ============================================================================================================================================================================================================================================================================== Installing: jq x86_64 1.5-1.el7 epel 153 k Installing for dependencies: oniguruma x86_64 5.9.5-3.el7 epel 129 k Transaction Summary ============================================================================================================================================================================================================================================================================== Install 1 Package (+1 Dependent package) Total download size: 282 k Installed size: 906 k Downloading packages: warning: /var/cache/yum/x86_64/7/epel/packages/jq-1.5-1.el7.x86_64.rpm: Header V3 RSA/SHA256 Signature, key ID 352c64e5: NOKEY Public key for jq-1.5-1.el7.x86_64.rpm is not installed (1/2): jq-1.5-1.el7.x86_64.rpm | 153 kB 00:00:00 (2/2): oniguruma-5.9.5-3.el7.x86_64.rpm | 129 kB 00:00:02 ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ Total 118 kB/s | 282 kB 00:00:02 Retrieving key from file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-7 Importing GPG key 0x352C64E5: Userid : "Fedora EPEL (7) <epel@fedoraproject.org>" Fingerprint: 91e9 7d7c 4a5e 96f1 7f3e 888f 6a2f aea2 352c 64e5 Package : epel-release-7-11.noarch (@extras) From : /etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-7 Running transaction check Running transaction test Transaction test succeeded Running transaction Installing : oniguruma-5.9.5-3.el7.x86_64 1/2 Installing : jq-1.5-1.el7.x86_64 2/2 Verifying : oniguruma-5.9.5-3.el7.x86_64 1/2 Verifying : jq-1.5-1.el7.x86_64 2/2 Installed: jq.x86_64 0:1.5-1.el7 Dependency Installed: oniguruma.x86_64 0:5.9.5-3.el7 Complete! [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# curl http://node105.yinzhengjie.org.cn:10501/metrics | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 264 0 264 0 0 21605 0 --:--:-- --:--:-- --:--:-- 22000 { "CHANNEL.c1": { #这是c1的CHANEL监控数据,c1该名称在flume-netcat.conf中配置文件中定义的。 "ChannelCapacity": "1000", #channel的容量,目前仅支持File Channel,Memory channel的统计数据。 "ChannelFillPercentage": "0.0", #channel已填入的百分比。 "Type": "CHANNEL", #很显然,这里是CHANNEL监控项,类型为CHANNEL。 "ChannelSize": "0", #目前channel中事件的总数量,目前仅支持File Channel,Memory channel的统计数据。 "EventTakeSuccessCount": "64", #sink成功从channel读取事件的总数量。 "EventTakeAttemptCount": "227", #sink尝试从channel拉取事件的总次数。这不意味着每次时间都被返回,因为sink拉取的时候channel可能没有任何数据。 "StartTime": "1563520459221", #channel启动时的毫秒值时间。 "EventPutAttemptCount": "64", #Source尝试写入Channe的事件总次数。 "EventPutSuccessCount": "64", #成功写入channel且提交的事件总次数。 "StopTime": "0" #channel停止时的毫秒值时间,为0表示一直在运行。 } } [root@node105.yinzhengjie.org.cn ~]# 温馨提示: 如果你还要想了解更多度量值,可参考官方文档:http://flume.apache.org/FlumeUserGuide.html#monitoring。
[root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 50 *:10501 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# netstat -untalp | grep 8888 tcp 0 0 172.30.1.105:8888 0.0.0.0:* LISTEN 3816/java tcp 0 0 172.30.1.105:8888 172.30.1.105:47672 TIME_WAIT - [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# jps 3816 Application 4426 Jps [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# kill 3816 [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# jps 4440 Jps [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# netstat -untalp | grep 8888 tcp 0 0 172.30.1.105:8888 172.30.1.105:47672 TIME_WAIT - [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]#
2>.实时读取本地文件到HDFS集群(需要flume节点配置hadoop集群环境哟,exec source - memory channel - hdfs sink)
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-hdfs.conf yinzhengjie2.sources = file_source yinzhengjie2.sinks = hdfs_sink yinzhengjie2.channels = memory_channel yinzhengjie2.sources.file_source.type = exec yinzhengjie2.sources.file_source.command = tail -F /var/log/messages yinzhengjie2.sources.file_source.shell = /bin/bash -c yinzhengjie2.sinks.hdfs_sink.type = hdfs yinzhengjie2.sinks.hdfs_sink.hdfs.path = hdfs://node101.yinzhengjie.org.cn:8020/flume/%Y%m%d/%H #上传文件的前缀 yinzhengjie2.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105- #是否按照时间滚动文件夹 yinzhengjie2.sinks.hdfs_sink.hdfs.round = true #多少时间单位创建一个新的文件夹 yinzhengjie2.sinks.hdfs_sink.hdfs.roundValue = 1 #重新定义时间单位 yinzhengjie2.sinks.hdfs_sink.hdfs.roundUnit = hour #是否使用本地时间戳 yinzhengjie2.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 yinzhengjie2.sinks.hdfs_sink.hdfs.batchSize = 1000 #设置文件类型,可支持压缩 yinzhengjie2.sinks.hdfs_sink.hdfs.fileType = DataStream #多久生成一个新的文件 yinzhengjie2.sinks.hdfs_sink.hdfs.rollInterval = 600 #设置每个文件的滚动大小 yinzhengjie2.sinks.hdfs_sink.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 yinzhengjie2.sinks.hdfs_sink.hdfs.rollCount = 0 #最小副本数 yinzhengjie2.sinks.hdfs_sink.hdfs.minBlockReplicas = 1 yinzhengjie2.channels.memory_channel.type = memory yinzhengjie2.channels.memory_channel.capacity = 1000 yinzhengjie2.channels.memory_channel.transactionCapacity = 1000 yinzhengjie2.sources.file_source.channels = memory_channel yinzhengjie2.sinks.hdfs_sink.channel = memory_channel [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-hdfs.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/softwares/apache-flume-1.9.0-bin/conf --conf-file=/home/data/flume/job/flume-hdfs.conf --name yinzhengjie2 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:864 9 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-ganglia-flume-hdfs.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-hdfs.conf --name yinzhengjie2 -Dflume.monitoring.type=http -Dflume.monitoring.port=10502 -Dflume.root.logger=INFO,console >> /home/data/flu me/log/flume-hdfs.log 2>&1 &[root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# chmod +x /home/data/flume/shell/start-hdfs.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-hdfs.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 50 *:10502 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# jps 5643 Application 5757 Jps [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# tail -100f /home/data/flume/log/flume-hdfs.log Warning: JAVA_HOME is not set! Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10502 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/lib/*' - Djava.library.path= org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-hdfs.conf --name yinzhengjie2log4j:WARN No appenders could be found for logger (org.apache.flume.util.SSLUtil). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10502 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/sof twares/hadoop-2.6.0/etc/hadoop:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-hdfs.conf --name yinzhengjie2SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/19 17:27:18 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 19/07/19 17:27:18 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/data/flume/job/flume-hdfs.conf 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:file_source 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:file_source 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:file_source 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:file_source 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Added sinks: hdfs_sink Agent: yinzhengjie2 19/07/19 17:27:18 WARN conf.FlumeConfiguration: Agent configuration for 'yinzhengjie2' has no configfilters. 19/07/19 17:27:18 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [yinzhengjie2] 19/07/19 17:27:18 INFO node.AbstractConfigurationProvider: Creating channels 19/07/19 17:27:18 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory 19/07/19 17:27:18 INFO node.AbstractConfigurationProvider: Created channel memory_channel 19/07/19 17:27:18 INFO source.DefaultSourceFactory: Creating instance of source file_source, type exec 19/07/19 17:27:18 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs_sink, type: hdfs 19/07/19 17:27:18 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [file_source, hdfs_sink] 19/07/19 17:27:18 INFO node.Application: Starting new configuration:{ sourceRunners:{file_source=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:file_source,state:IDLE} }} sinkRunners:{hdfs_sink=Sink Runner: { policy:org.apache.flume.sink.DefaultSinkProcessor@331a821d counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }19/07/19 17:27:18 INFO node.Application: Starting Channel memory_channel 19/07/19 17:27:18 INFO node.Application: Waiting for channel: memory_channel to start. Sleeping for 500 ms 19/07/19 17:27:18 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory_channel: Successfully registered new MBean. 19/07/19 17:27:18 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory_channel started 19/07/19 17:27:19 INFO node.Application: Starting Sink hdfs_sink 19/07/19 17:27:19 INFO node.Application: Starting Source file_source 19/07/19 17:27:19 INFO source.ExecSource: Exec source starting with command: tail -F /var/log/messages 19/07/19 17:27:19 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: file_source: Successfully registered new MBean. 19/07/19 17:27:19 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: file_source started 19/07/19 17:27:19 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs_sink: Successfully registered new MBean. 19/07/19 17:27:19 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs_sink started 19/07/19 17:27:19 INFO util.log: Logging initialized @1347ms to org.eclipse.jetty.util.log.Slf4jLog 19/07/19 17:27:19 INFO server.Server: jetty-9.4.6.v20170531 19/07/19 17:27:19 INFO server.AbstractConnector: Started ServerConnector@3ab21218{HTTP/1.1,[http/1.1]}{0.0.0.0:10502} 19/07/19 17:27:19 INFO server.Server: Started @1695ms 19/07/19 17:27:23 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false 19/07/19 17:27:23 INFO hdfs.BucketWriter: Creating hdfs://node101.yinzhengjie.org.cn:8020/flume/20190719/17/172.30.1.105-.1563528443487.tmp ^C [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# hdfs dfs -ls /flume/20190719/17 Found 1 items -rw-r--r-- 3 root supergroup 815 2019-07-19 17:27 /flume/20190719/17/172.30.1.105-.1563528443487.tmp [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# curl http://node105.yinzhengjie.org.cn:10502/metrics | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 951 0 951 0 0 72739 0 --:--:-- --:--:-- --:--:-- 73153 { "SOURCE.file_source": { "AppendBatchAcceptedCount": "0", #成功提交到channel的批次的总数量。 "GenericProcessingFail": "0", #常规处理失败的次数 "EventAcceptedCount": "9", #成功写出到channel的事件总数量。 "AppendReceivedCount": "0", #每批只有一个事件的事件总数量(与RPC调用的一个append调用相等)。 "StartTime": "1563528439426", #SOURCE启动时的毫秒值时间。 "AppendBatchReceivedCount": "0", #接收到事件批次的总数量。 "ChannelWriteFail": "0", #往CHANNEL写失败的次数 "EventReceivedCount": "9", #目前为止source已经接收到的事件总数量。 "EventReadFail": "0", #时间读取失败的次数 "Type": "SOURCE", #当前类型为SOURRCE "AppendAcceptedCount": "0", #逐条录入的次数,单独传入的事件到Channel且成功返回的事件总数量。 "OpenConnectionCount": "0", #目前与客户端或sink保持连接的总数量,目前仅支持avro source展现该度量。 "StopTime": "0" #SOURCE停止时的毫秒值时间,0代表一直运行着 }, "CHANNEL.memory_channel": { "ChannelCapacity": "1000", #channel的容量,目前仅支持File Channel,Memory channel的统计数据。 "ChannelFillPercentage": "0.0", #channel已填入的百分比。 "Type": "CHANNEL", #当前类型为CHANNEL "ChannelSize": "0", #目前channel中事件的总数量,目前仅支持File Channel,Memory channel的统计数据。 "EventTakeSuccessCount": "9", #sink成功从channel读取事件的总数量。 "EventTakeAttemptCount": "36", #sink尝试从channel拉取事件的总次数。这不意味着每次时间都被返回,因为sink拉取的时候channel可能没有任何数据。 "StartTime": "1563528438997", #CHANNEL启动时的毫秒值时间。 "EventPutAttemptCount": "9", #Source尝试写入Channe的事件总次数。 "EventPutSuccessCount": "9", #成功写入channel且提交的事件总次数。 "StopTime": "0" #CHANNEL停止时的毫秒值时间。 }, "SINK.hdfs_sink": { "ConnectionCreatedCount": "1", #下一个阶段(或存储系统)创建链接的数量(如HDFS创建一个文件)。 "BatchCompleteCount": "0", #批量处理event的个数等于批处理大小的数量。 "EventWriteFail": "0", #时间写失败的次数 "BatchEmptyCount": "26", #批量处理event的个数为0的数量(空的批量的数量),如果数量很大表示source写入数据的速度比sink处理数据的速度慢很多。 "EventDrainAttemptCount": "9", #sink尝试写出到存储的事件总数量。 "StartTime": "1563528439448", #SINK启动时的毫秒值时间。 "BatchUnderflowCount": "1", #批量处理event的个数小于批处理大小的数量(比sink配置使用的最大批量尺寸更小的批量的数量),如果该值很高也表示sink比source更快。 "ChannelReadFail": "0", #从CHANNEL读取失败的次数 "ConnectionFailedCount": "0", #连接失败的次数 "ConnectionClosedCount": "0", #连接关闭的次数 "Type": "SINK", #当前类型为SINK "EventDrainSuccessCount": "9", #sink成功写出到存储的事件总数量。 "StopTime": "0" #SINK停止时的毫秒值时间。 } } [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]#
3>.实时指定目录文件内容到HDFS集群(需要flume节点配置hadoop集群环境哟,spooldir source - memory channel - hdfs sink)
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-dir.conf yinzhengjie3.sources = spooldir_source yinzhengjie3.sinks = hdfs_sink yinzhengjie3.channels = memory_channel # Describe/configure the source yinzhengjie3.sources.spooldir_source.type = spooldir yinzhengjie3.sources.spooldir_source.spoolDir = /yinzhengjie/data/flume/upload yinzhengjie3.sources.spooldir_source.fileSuffix = .COMPLETED yinzhengjie3.sources.spooldir_source.fileHeader = true #忽略所有以.tmp结尾的文件,不上传 yinzhengjie3.sources.spooldir_source.ignorePattern = ([^ ]*\.tmp) #获取源文件名称,方便下面的sink调用变量fileName yinzhengjie3.sources.spooldir_source.basenameHeader = true yinzhengjie3.sources.spooldir_source.basenameHeaderKey = fileName # Describe the sink yinzhengjie3.sinks.hdfs_sink.type = hdfs yinzhengjie3.sinks.hdfs_sink.hdfs.path = hdfs://node101.yinzhengjie.org.cn:8020/flume #上传文件的前缀 yinzhengjie3.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105-upload- #是否按照时间滚动文件夹 yinzhengjie3.sinks.hdfs_sink.hdfs.round = true #多少时间单位创建一个新的文件夹 yinzhengjie3.sinks.hdfs_sink.hdfs.roundValue = 1 #重新定义时间单位 yinzhengjie3.sinks.hdfs_sink.hdfs.roundUnit = hour #是否使用本地时间戳 yinzhengjie3.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 yinzhengjie3.sinks.hdfs_sink.hdfs.batchSize = 100 #设置文件类型,可支持压缩 yinzhengjie3.sinks.hdfs_sink.hdfs.fileType = DataStream #多久生成一个新的文件 yinzhengjie3.sinks.hdfs_sink.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是128M yinzhengjie3.sinks.hdfs_sink.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 yinzhengjie3.sinks.hdfs_sink.hdfs.rollCount = 0 #最小冗余数 yinzhengjie3.sinks.hdfs_sink.hdfs.minBlockReplicas = 1 #和source的basenameHeader,basenameHeaderKey两个属性一起用可以保持原文件名称上传 yinzhengjie3.sinks.hdfs_sink.hdfs.filePrefix = %{fileName} # Use a channel which buffers events in memory yinzhengjie3.channels.memory_channel.type = memory yinzhengjie3.channels.memory_channel.capacity = 1000 yinzhengjie3.channels.memory_channel.transactionCapacity = 1000 # Bind the source and sink to the channel yinzhengjie3.sources.spooldir_source.channels = memory_channel yinzhengjie3.sinks.hdfs_sink.channel = memory_channel [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-dir.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-dir.conf --name yinzhengjie3 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:8649 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-gang lia-flume-dir.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-dir.conf --name yinzhengjie3 -Dflume.monitoring.type=http -Dflume.monitoring.port=10503 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-dir.log 2>&1 & [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# mkdir -pv /yinzhengjie/data/flume/upload mkdir: created directory ‘/yinzhengjie’ mkdir: created directory ‘/yinzhengjie/data’ mkdir: created directory ‘/yinzhengjie/data/flume’ mkdir: created directory ‘/yinzhengjie/data/flume/upload’ [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# echo http://www.cnblogs.com/yinzhengjie>/yinzhengjie/data/flume/upload/yinzhengjie.blog [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# echo http://www.cnblogs.com/yinzhengjie>/yinzhengjie/data/flume/upload/yinzhengjie2.tmp [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# echo http://www.cnblogs.com/yinzhengjie>/yinzhengjie/data/flume/upload/yinzhengjie3.txt [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-dir.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 *:10503 *:* LISTEN 0 128 *:22 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# tail -100f /home/data/flume/log/flume-dir.log Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10503 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/softwares/hadoop-2.6.0/etc/hadoop:/home/software s/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-dir.conf --name yinzhengjie3SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/19 18:28:43 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 19/07/19 18:28:43 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/data/flume/job/flume-dir.conf 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:spooldir_source 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:spooldir_source 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:spooldir_source 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:spooldir_source 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Added sinks: hdfs_sink Agent: yinzhengjie3 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:spooldir_source 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:spooldir_source 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:spooldir_source 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:spooldir_source 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/19 18:28:43 WARN conf.FlumeConfiguration: Agent configuration for 'yinzhengjie3' has no configfilters. 19/07/19 18:28:43 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [yinzhengjie3] 19/07/19 18:28:43 INFO node.AbstractConfigurationProvider: Creating channels 19/07/19 18:28:43 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory 19/07/19 18:28:43 INFO node.AbstractConfigurationProvider: Created channel memory_channel 19/07/19 18:28:43 INFO source.DefaultSourceFactory: Creating instance of source spooldir_source, type spooldir 19/07/19 18:28:43 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs_sink, type: hdfs 19/07/19 18:28:43 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [spooldir_source, hdfs_sink] 19/07/19 18:28:43 INFO node.Application: Starting new configuration:{ sourceRunners:{spooldir_source=EventDrivenSourceRunner: { source:Spool Directory source spooldir_source: { spoolDir: /yinzhengjie/data/flume/upload } }} sinkRunners:{hdfs_sink=SinkRunner: { policy:org .apache.flume.sink.DefaultSinkProcessor@440e91df counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }19/07/19 18:28:43 INFO node.Application: Starting Channel memory_channel 19/07/19 18:28:43 INFO node.Application: Waiting for channel: memory_channel to start. Sleeping for 500 ms 19/07/19 18:28:43 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory_channel: Successfully registered new MBean. 19/07/19 18:28:43 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory_channel started 19/07/19 18:28:43 INFO node.Application: Starting Sink hdfs_sink 19/07/19 18:28:43 INFO node.Application: Starting Source spooldir_source 19/07/19 18:28:43 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /yinzhengjie/data/flume/upload 19/07/19 18:28:43 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs_sink: Successfully registered new MBean. 19/07/19 18:28:43 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs_sink started 19/07/19 18:28:43 INFO util.log: Logging initialized @1358ms to org.eclipse.jetty.util.log.Slf4jLog 19/07/19 18:28:43 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: spooldir_source: Successfully registered new MBean. 19/07/19 18:28:43 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: spooldir_source started 19/07/19 18:28:44 INFO server.Server: jetty-9.4.6.v20170531 19/07/19 18:28:44 INFO server.AbstractConnector: Started ServerConnector@1d367324{HTTP/1.1,[http/1.1]}{0.0.0.0:10503} 19/07/19 18:28:44 INFO server.Server: Started @1609ms 19/07/19 18:29:16 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one. 19/07/19 18:29:16 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /yinzhengjie/data/flume/upload/yinzhengjie.blog to /yinzhengjie/data/flume/upload/yinzhengjie.blog.COMPLETED 19/07/19 18:29:16 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false 19/07/19 18:29:16 INFO hdfs.BucketWriter: Creating hdfs://node101.yinzhengjie.org.cn:8020/flume/yinzhengjie.blog.1563532156286.tmp 19/07/19 18:30:17 INFO hdfs.HDFSEventSink: Writer callback called. 19/07/19 18:30:17 INFO hdfs.BucketWriter: Closing hdfs://node101.yinzhengjie.org.cn:8020/flume/yinzhengjie.blog.1563532156286.tmp 19/07/19 18:30:18 INFO hdfs.BucketWriter: Renaming hdfs://node101.yinzhengjie.org.cn:8020/flume/yinzhengjie.blog.1563532156286.tmp to hdfs://node101.yinzhengjie.org.cn:8020/flume/yinzhengjie.blog.1563532156286 19/07/19 18:31:03 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one. 19/07/19 18:31:03 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /yinzhengjie/data/flume/upload/yinzhengjie3.txt to /yinzhengjie/data/flume/upload/yinzhengjie3.txt.COMPLETED 19/07/19 18:31:03 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false 19/07/19 18:31:03 INFO hdfs.BucketWriter: Creating hdfs://node101.yinzhengjie.org.cn:8020/flume/yinzhengjie3.txt.1563532263271.tmp 19/07/19 18:32:03 INFO hdfs.HDFSEventSink: Writer callback called. 19/07/19 18:32:03 INFO hdfs.BucketWriter: Closing hdfs://node101.yinzhengjie.org.cn:8020/flume/yinzhengjie3.txt.1563532263271.tmp 19/07/19 18:32:03 INFO hdfs.BucketWriter: Renaming hdfs://node101.yinzhengjie.org.cn:8020/flume/yinzhengjie3.txt.1563532263271.tmp to hdfs://node101.yinzhengjie.org.cn:8020/flume/yinzhengjie3.txt.1563532263271
[root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ll /yinzhengjie/data/flume/upload/ total 12 -rw-r--r-- 1 root root 35 Jul 19 18:30 yinzhengjie2.tmp -rw-r--r-- 1 root root 35 Jul 19 18:31 yinzhengjie3.txt.COMPLETED -rw-r--r-- 1 root root 35 Jul 19 18:29 yinzhengjie.blog.COMPLETED [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# hdfs dfs -ls /flume Found 3 items drwxr-xr-x - root supergroup 0 2019-07-19 18:01 /flume/20190719 -rw-r--r-- 3 root supergroup 35 2019-07-19 18:30 /flume/yinzhengjie.blog.1563532156286 -rw-r--r-- 3 root supergroup 35 2019-07-19 18:32 /flume/yinzhengjie3.txt.1563532263271 [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# curl http://node105.yinzhengjie.org.cn:10503/metrics | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 955 0 955 0 0 4936 0 --:--:-- --:--:-- --:--:-- 4948 { "CHANNEL.memory_channel": { "ChannelCapacity": "1000", "ChannelFillPercentage": "0.0", "Type": "CHANNEL", "ChannelSize": "0", "EventTakeSuccessCount": "2", "EventTakeAttemptCount": "72", "StartTime": "1563532123413", "EventPutAttemptCount": "2", "EventPutSuccessCount": "2", "StopTime": "0" }, "SOURCE.spooldir_source": { "AppendBatchAcceptedCount": "2", "GenericProcessingFail": "0", "EventAcceptedCount": "2", "AppendReceivedCount": "0", "StartTime": "1563532123986", "AppendBatchReceivedCount": "2", "ChannelWriteFail": "0", "EventReceivedCount": "2", "EventReadFail": "0", "Type": "SOURCE", "AppendAcceptedCount": "0", "OpenConnectionCount": "0", "StopTime": "0" }, "SINK.hdfs_sink": { "ConnectionCreatedCount": "2", "BatchCompleteCount": "0", "EventWriteFail": "0", "BatchEmptyCount": "68", "EventDrainAttemptCount": "2", "StartTime": "1563532123890", "BatchUnderflowCount": "2", "ChannelReadFail": "0", "ConnectionFailedCount": "0", "ConnectionClosedCount": "2", "Type": "SINK", "EventDrainSuccessCount": "2", "StopTime": "0" } } [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]#
4>.Flume与Flume之间数据传递,多Flume汇总数据到单Flume(需要flume节点配置hadoop集群环境哟,大致架构如下图所示)
flume-1监控文件yinzhengjie.log,flume-2监控某一个端口的数据流,flume-1与flume-2将数据发送给flume-3,flume3将最终数据写入到HDFS。
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-aggregation.conf # Name the components on this agent aggregation.sources = avro_source aggregation.sinks = hdfs_sink aggregation.channels = memory_channel # Describe/configure the source aggregation.sources.avro_source.type = avro aggregation.sources.avro_source.bind = node105.yinzhengjie.org.cn aggregation.sources.avro_source.port = 6666 # Describe the sink aggregation.sinks.hdfs_sink.type = hdfs aggregation.sinks.hdfs_sink.hdfs.path = hdfs://node101.yinzhengjie.org.cn:8020/flume/%Y%m%d/%H #上传文件的前缀 aggregation.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105- #是否按照时间滚动文件夹 aggregation.sinks.hdfs_sink.hdfs.round = true #多少时间单位创建一个新的文件夹 aggregation.sinks.hdfs_sink.hdfs.roundValue = 1 #重新定义时间单位 aggregation.sinks.hdfs_sink.hdfs.roundUnit = hour #是否使用本地时间戳 aggregation.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 aggregation.sinks.hdfs_sink.hdfs.batchSize = 100 #设置文件类型,可支持压缩 aggregation.sinks.hdfs_sink.hdfs.fileType = DataStream #多久生成一个新的文件 aggregation.sinks.hdfs_sink.hdfs.rollInterval = 600 #设置每个文件的滚动大小大概是128M aggregation.sinks.hdfs_sink.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 aggregation.sinks.hdfs_sink.hdfs.rollCount = 0 #最小冗余数 aggregation.sinks.hdfs_sink.hdfs.minBlockReplicas = 1 # Describe the channel aggregation.channels.memory_channel.type = memory aggregation.channels.memory_channel.capacity = 1000 aggregation.channels.memory_channel.transactionCapacity = 100 # Bind the source and sink to the channel aggregation.sources.avro_source.channels = memory_channel aggregation.sinks.hdfs_sink.channel = memory_channel [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# vi /home/data/flume/shell/start-aggregation.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-aggregation.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-aggregation.conf --name aggregation -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:8649 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flu me-ganglia-flume-aggregation.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-aggregation.conf --name aggregation -Dflume.monitoring.type=http -Dflume.monitoring.port=10511 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-aggregation.log 2>&1 & [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# chmod +x /home/data/flume/shell/start-aggregation.sh [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-aggregation.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# jps 8147 Application 8207 Jps [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 172.30.1.105:6666 *:* LISTEN 0 50 *:10511 *:* LISTEN 0 128 *:22 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# tail -1000f /home/data/flume/log/flume-aggregation.log Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10511 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/softwares/hadoop-2.6.0/etc/hadoop:/home/software s/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-aggregation.conf --name aggregationSLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/20 21:18:38 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 19/07/20 21:18:38 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/data/flume/job/flume-aggregation.conf 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Added sinks: hdfs_sink Agent: aggregation 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 21:18:38 WARN conf.FlumeConfiguration: Agent configuration for 'aggregation' has no configfilters. 19/07/20 21:18:39 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [aggregation] 19/07/20 21:18:39 INFO node.AbstractConfigurationProvider: Creating channels 19/07/20 21:18:39 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory 19/07/20 21:18:39 INFO node.AbstractConfigurationProvider: Created channel memory_channel 19/07/20 21:18:39 INFO source.DefaultSourceFactory: Creating instance of source avro_source, type avro 19/07/20 21:18:39 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs_sink, type: hdfs 19/07/20 21:18:39 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [avro_source, hdfs_sink] 19/07/20 21:18:39 INFO node.Application: Starting new configuration:{ sourceRunners:{avro_source=EventDrivenSourceRunner: { source:Avro source avro_source: { bindAddress: node105.yinzhengjie.org.cn, port: 6666 } }} sinkRunners:{hdfs_sink=SinkRunner: { policy:org.apache. flume.sink.DefaultSinkProcessor@77a41618 counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }19/07/20 21:18:39 INFO node.Application: Starting Channel memory_channel 19/07/20 21:18:39 INFO node.Application: Waiting for channel: memory_channel to start. Sleeping for 500 ms 19/07/20 21:18:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory_channel: Successfully registered new MBean. 19/07/20 21:18:39 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory_channel started 19/07/20 21:18:39 INFO node.Application: Starting Sink hdfs_sink 19/07/20 21:18:39 INFO node.Application: Starting Source avro_source 19/07/20 21:18:39 INFO source.AvroSource: Starting Avro source avro_source: { bindAddress: node105.yinzhengjie.org.cn, port: 6666 }... 19/07/20 21:18:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs_sink: Successfully registered new MBean. 19/07/20 21:18:39 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs_sink started 19/07/20 21:18:39 INFO util.log: Logging initialized @1309ms to org.eclipse.jetty.util.log.Slf4jLog 19/07/20 21:18:39 INFO server.Server: jetty-9.4.6.v20170531 19/07/20 21:18:40 INFO server.AbstractConnector: Started ServerConnector@72e46d99{HTTP/1.1,[http/1.1]}{0.0.0.0:10511} 19/07/20 21:18:40 INFO server.Server: Started @1811ms 19/07/20 21:18:40 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: avro_source: Successfully registered new MBean. 19/07/20 21:18:40 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: avro_source started 19/07/20 21:18:40 INFO source.AvroSource: Avro source avro_source started.
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-my_netcat.conf # Name the components on this agent my_netcat.sources = netcat_source my_netcat.sinks = avro_sink my_netcat.channels = memory_channel # Describe/configure the source my_netcat.sources.netcat_source.type = netcat my_netcat.sources.netcat_source.bind = node105.yinzhengjie.org.cn my_netcat.sources.netcat_source.port = 8888 # Describe the sink my_netcat.sinks.avro_sink.type = avro my_netcat.sinks.avro_sink.hostname = node105.yinzhengjie.org.cn my_netcat.sinks.avro_sink.port = 6666 # Use a channel which buffers events in memory my_netcat.channels.memory_channel.type = memory my_netcat.channels.memory_channel.capacity = 1000 my_netcat.channels.memory_channel.transactionCapacity = 100 # Bind the source and sink to the channel my_netcat.sources.netcat_source.channels = memory_channel my_netcat.sinks.avro_sink.channel = memory_channel [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-my_netcat.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_netcat.conf --name my_netcat -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:8649 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-g anglia-flume-my_netcat.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_netcat.conf --name my_netcat -Dflume.monitoring.type=http -Dflume.monitoring.port=10512 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-my_netcat.log 2>&1 & [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# chmod +x /home/data/flume/shell/start-my_netcat.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 172.30.1.105:6666 *:* LISTEN 0 50 *:10511 *:* LISTEN 0 128 *:22 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-my_netcat.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 172.30.1.105:6666 *:* LISTEN 0 50 *:10511 *:* LISTEN 0 50 *:10512 *:* LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# tail -100f /home/data/flume/log/flume-my_netcat.log Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10512 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/softwares/hadoop-2.6.0/etc/hadoop:/home/software s/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-my_netcat.conf --name my_netcatSLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/20 21:28:56 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 19/07/20 21:28:56 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/data/flume/job/flume-my_netcat.conf 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Added sinks: avro_sink Agent: my_netcat 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Processing:avro_sink 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Processing:avro_sink 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Processing:avro_sink 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Processing:avro_sink 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 21:28:56 WARN conf.FlumeConfiguration: Agent configuration for 'my_netcat' has no configfilters. 19/07/20 21:28:56 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [my_netcat] 19/07/20 21:28:56 INFO node.AbstractConfigurationProvider: Creating channels 19/07/20 21:28:56 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory 19/07/20 21:28:56 INFO node.AbstractConfigurationProvider: Created channel memory_channel 19/07/20 21:28:56 INFO source.DefaultSourceFactory: Creating instance of source netcat_source, type netcat 19/07/20 21:28:56 INFO sink.DefaultSinkFactory: Creating instance of sink: avro_sink, type: avro 19/07/20 21:28:57 INFO sink.AbstractRpcSink: Connection reset is set to 0. Will not reset connection to next hop 19/07/20 21:28:57 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [netcat_source, avro_sink] 19/07/20 21:28:57 INFO node.Application: Starting new configuration:{ sourceRunners:{netcat_source=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcat_source,state:IDLE} }} sinkRunners:{avro_sink=SinkRunner: { policy:org.apache.flume.sink. DefaultSinkProcessor@54ee68c1 counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }19/07/20 21:28:57 INFO node.Application: Starting Channel memory_channel 19/07/20 21:28:57 INFO node.Application: Waiting for channel: memory_channel to start. Sleeping for 500 ms 19/07/20 21:28:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory_channel: Successfully registered new MBean. 19/07/20 21:28:57 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory_channel started 19/07/20 21:28:57 INFO node.Application: Starting Sink avro_sink 19/07/20 21:28:57 INFO node.Application: Starting Source netcat_source 19/07/20 21:28:57 INFO source.NetcatSource: Source starting 19/07/20 21:28:57 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.30.1.105:8888] 19/07/20 21:28:57 INFO sink.AbstractRpcSink: Starting RpcSink avro_sink { host: node105.yinzhengjie.org.cn, port: 6666 }... 19/07/20 21:28:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: avro_sink: Successfully registered new MBean. 19/07/20 21:28:57 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: avro_sink started 19/07/20 21:28:57 INFO sink.AbstractRpcSink: Rpc sink avro_sink: Building RpcClient with hostname: node105.yinzhengjie.org.cn, port: 6666 19/07/20 21:28:57 INFO sink.AvroSink: Attempting to create Avro Rpc client. 19/07/20 21:28:57 INFO api.NettyAvroRpcClient: Using default maxIOWorkers 19/07/20 21:28:57 INFO util.log: Logging initialized @1360ms to org.eclipse.jetty.util.log.Slf4jLog 19/07/20 21:28:57 INFO server.Server: jetty-9.4.6.v20170531 19/07/20 21:28:57 INFO server.AbstractConnector: Started ServerConnector@eb2eb8e{HTTP/1.1,[http/1.1]}{0.0.0.0:10512} 19/07/20 21:28:57 INFO server.Server: Started @1716ms 19/07/20 21:28:58 INFO sink.AbstractRpcSink: Rpc sink avro_sink started.
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-my_exec.conf # Name the components on this agent my_exec.sources = exec_source my_exec.sinks = avro_sink my_exec.channels = memory_channel # Describe/configure the source my_exec.sources.exec_source.type = exec my_exec.sources.exec_source.command = tail -F /yinzhengjie/data/flume/blog.txt my_exec.sources.exec_source.shell = /bin/bash -c # Describe the sink my_exec.sinks.avro_sink.type = avro my_exec.sinks.avro_sink.hostname = node105.yinzhengjie.org.cn my_exec.sinks.avro_sink.port = 6666 # Describe the channel my_exec.channels.memory_channel.type = memory my_exec.channels.memory_channel.capacity = 1000 my_exec.channels.memory_channel.transactionCapacity = 100 # Bind the source and sink to the channel my_exec.sources.exec_source.channels = memory_channel my_exec.sinks.avro_sink.channel = memory_channel [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-my_exec.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_exec.conf --name my_exec -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:8649 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-gangl ia-flume-my_exec.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_exec.conf --name my_exec -Dflume.monitoring.type=http -Dflume.monitoring.port=10513 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-my_exec.log 2>&1 & [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 172.30.1.105:6666 *:* LISTEN 0 50 *:10511 *:* LISTEN 0 50 *:10512 *:* LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-my_exec.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 172.30.1.105:6666 *:* LISTEN 0 50 *:10511 *:* LISTEN 0 50 *:10512 *:* LISTEN 0 50 *:10513 *:* LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# tail -100f /home/data/flume/log/flume-my_exec.log Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10513 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/softwares/hadoop-2.6.0/etc/hadoop:/home/software s/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-my_exec.conf --name my_execSLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/20 21:44:00 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 19/07/20 21:44:00 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/data/flume/job/flume-my_exec.conf 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Added sinks: avro_sink Agent: my_exec 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Processing:exec_source 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Processing:avro_sink 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Processing:avro_sink 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Processing:avro_sink 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Processing:exec_source 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Processing:avro_sink 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Processing:exec_source 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Processing:exec_source 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 21:44:00 WARN conf.FlumeConfiguration: Agent configuration for 'my_exec' has no configfilters. 19/07/20 21:44:00 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [my_exec] 19/07/20 21:44:00 INFO node.AbstractConfigurationProvider: Creating channels 19/07/20 21:44:00 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory 19/07/20 21:44:00 INFO node.AbstractConfigurationProvider: Created channel memory_channel 19/07/20 21:44:00 INFO source.DefaultSourceFactory: Creating instance of source exec_source, type exec 19/07/20 21:44:00 INFO sink.DefaultSinkFactory: Creating instance of sink: avro_sink, type: avro 19/07/20 21:44:00 INFO sink.AbstractRpcSink: Connection reset is set to 0. Will not reset connection to next hop 19/07/20 21:44:00 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [exec_source, avro_sink] 19/07/20 21:44:00 INFO node.Application: Starting new configuration:{ sourceRunners:{exec_source=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:exec_source,state:IDLE} }} sinkRunners:{avro_sink=SinkRunner: { policy:org.apache.flume.sink.Defaul tSinkProcessor@2739d05f counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }19/07/20 21:44:00 INFO node.Application: Starting Channel memory_channel 19/07/20 21:44:00 INFO node.Application: Waiting for channel: memory_channel to start. Sleeping for 500 ms 19/07/20 21:44:00 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory_channel: Successfully registered new MBean. 19/07/20 21:44:00 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory_channel started 19/07/20 21:44:01 INFO node.Application: Starting Sink avro_sink 19/07/20 21:44:01 INFO node.Application: Starting Source exec_source 19/07/20 21:44:01 INFO source.ExecSource: Exec source starting with command: tail -F /yinzhengjie/data/flume/blog.txt 19/07/20 21:44:01 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: exec_source: Successfully registered new MBean. 19/07/20 21:44:01 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: exec_source started 19/07/20 21:44:01 INFO sink.AbstractRpcSink: Starting RpcSink avro_sink { host: node105.yinzhengjie.org.cn, port: 6666 }... 19/07/20 21:44:01 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: avro_sink: Successfully registered new MBean. 19/07/20 21:44:01 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: avro_sink started 19/07/20 21:44:01 INFO sink.AbstractRpcSink: Rpc sink avro_sink: Building RpcClient with hostname: node105.yinzhengjie.org.cn, port: 6666 19/07/20 21:44:01 INFO sink.AvroSink: Attempting to create Avro Rpc client. 19/07/20 21:44:01 INFO api.NettyAvroRpcClient: Using default maxIOWorkers 19/07/20 21:44:01 INFO util.log: Logging initialized @1342ms to org.eclipse.jetty.util.log.Slf4jLog 19/07/20 21:44:01 INFO server.Server: jetty-9.4.6.v20170531 19/07/20 21:44:01 INFO server.AbstractConnector: Started ServerConnector@79d1663b{HTTP/1.1,[http/1.1]}{0.0.0.0:10513} 19/07/20 21:44:01 INFO server.Server: Started @1660ms 19/07/20 21:44:01 INFO sink.AbstractRpcSink: Rpc sink avro_sink started.
[root@node105.yinzhengjie.org.cn ~]# telnet node105.yinzhengjie.org.cn 8888 Trying 172.30.1.105... Connected to node105.yinzhengjie.org.cn. Escape character is '^]'. yinzhengjie dao ci yi you! OK
[root@node105.yinzhengjie.org.cn ~]# echo "https://www.cnblogs.com/yinzhengjie" >> /yinzhengjie/data/flume/blog.txt [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# hdfs dfs -cat /flume/20190720/21/172.30.1.105-.1563630502053.tmp #查看上面写入的2条测试数据 yinzhengjie dao ci yi you! https://www.cnblogs.com/yinzhengjie [root@node105.yinzhengjie.org.cn ~]#
5>.挑选器案例
channel selector: 通道挑选器,选择指定的event发送到指定的channel (1)Replicating Channel Selector 默认为副本挑选器,事件均以副本方式输出,换句话说就是有几个channel就发送几个副本。 (2)multiplexing selector 多路复用挑选器,作用就是可以将不同的内容发送到指定的channel 详情请参考: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-channel-selectors
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-my_avro.conf # Name the components on this agent my_avro.sources = avro_source my_avro.sinks = hdfs_sink my_avro.channels = memory_channel # Describe/configure the source my_avro.sources.avro_source.type = avro my_avro.sources.avro_source.bind = node105.yinzhengjie.org.cn my_avro.sources.avro_source.port = 8888 # 定义到hdfs的sink my_avro.sinks.hdfs_sink.type = hdfs my_avro.sinks.hdfs_sink.hdfs.path = hdfs://node101.yinzhengjie.org.cn:8020/flume/%Y%m%d/%H #上传文件的前缀 my_avro.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105- #是否按照时间滚动文件夹 my_avro.sinks.hdfs_sink.hdfs.round = true #多少时间单位创建一个新的文件夹 my_avro.sinks.hdfs_sink.hdfs.roundValue = 1 #重新定义时间单位 my_avro.sinks.hdfs_sink.hdfs.roundUnit = hour #是否使用本地时间戳 my_avro.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 my_avro.sinks.hdfs_sink.hdfs.batchSize = 100 #设置文件类型,可支持压缩 my_avro.sinks.hdfs_sink.hdfs.fileType = DataStream #多久生成一个新的文件 my_avro.sinks.hdfs_sink.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是128M my_avro.sinks.hdfs_sink.hdfs.rollSize = 134210000 #文件的滚动与Event数量无关 my_avro.sinks.hdfs_sink.hdfs.rollCount = 0 #最小冗余数 my_avro.sinks.hdfs_sink.hdfs.minBlockReplicas = 1 # Describe the channel my_avro.channels.memory_channel.type = memory my_avro.channels.memory_channel.capacity = 1000 my_avro.channels.memory_channel.transactionCapacity = 100 # Bind the source and sink to the channel my_avro.sources.avro_source.channels = memory_channel my_avro.sinks.hdfs_sink.channel = memory_channel [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-my_avro.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_avro.conf --name my_avro -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:8649 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-gangl ia-flume-my_avro.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_avro.conf --name my_avro -Dflume.monitoring.type=http -Dflume.monitoring.port=10514 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-my_avro.log 2>&1 & [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-my_avro.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 *:10514 *:* LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# tail -100f /home/data/flume/log/flume-my_avro.log Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10514 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/softwares/hadoop-2.6.0/etc/hadoop:/home/software s/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-my_avro.conf --name my_avroSLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/20 22:16:38 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 19/07/20 22:16:38 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/data/flume/job/flume-my_avro.conf 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Added sinks: hdfs_sink Agent: my_avro 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 22:16:38 WARN conf.FlumeConfiguration: Agent configuration for 'my_avro' has no configfilters. 19/07/20 22:16:38 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [my_avro] 19/07/20 22:16:38 INFO node.AbstractConfigurationProvider: Creating channels 19/07/20 22:16:38 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory 19/07/20 22:16:38 INFO node.AbstractConfigurationProvider: Created channel memory_channel 19/07/20 22:16:38 INFO source.DefaultSourceFactory: Creating instance of source avro_source, type avro 19/07/20 22:16:38 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs_sink, type: hdfs 19/07/20 22:16:38 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [avro_source, hdfs_sink] 19/07/20 22:16:38 INFO node.Application: Starting new configuration:{ sourceRunners:{avro_source=EventDrivenSourceRunner: { source:Avro source avro_source: { bindAddress: node105.yinzhengjie.org.cn, port: 8888 } }} sinkRunners:{hdfs_sink=SinkRunner: { policy:org.apache. flume.sink.DefaultSinkProcessor@77a41618 counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }19/07/20 22:16:38 INFO node.Application: Starting Channel memory_channel 19/07/20 22:16:38 INFO node.Application: Waiting for channel: memory_channel to start. Sleeping for 500 ms 19/07/20 22:16:38 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory_channel: Successfully registered new MBean. 19/07/20 22:16:38 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory_channel started 19/07/20 22:16:39 INFO node.Application: Starting Sink hdfs_sink 19/07/20 22:16:39 INFO node.Application: Starting Source avro_source 19/07/20 22:16:39 INFO source.AvroSource: Starting Avro source avro_source: { bindAddress: node105.yinzhengjie.org.cn, port: 8888 }... 19/07/20 22:16:39 INFO util.log: Logging initialized @1334ms to org.eclipse.jetty.util.log.Slf4jLog 19/07/20 22:16:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs_sink: Successfully registered new MBean. 19/07/20 22:16:39 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs_sink started 19/07/20 22:16:39 INFO server.Server: jetty-9.4.6.v20170531 19/07/20 22:16:39 INFO server.AbstractConnector: Started ServerConnector@72e46d99{HTTP/1.1,[http/1.1]}{0.0.0.0:10514} 19/07/20 22:16:39 INFO server.Server: Started @1740ms 19/07/20 22:16:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: avro_source: Successfully registered new MBean. 19/07/20 22:16:39 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: avro_source started 19/07/20 22:16:39 INFO source.AvroSource: Avro source avro_source started.
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-my_file_roll.conf # Name the components on this agent my_file_roll.sources = avro_source my_file_roll.sinks = file_roll_sink my_file_roll.channels = memory_channel # Describe/configure the source my_file_roll.sources.avro_source.type = avro my_file_roll.sources.avro_source.bind = node105.yinzhengjie.org.cn my_file_roll.sources.avro_source.port = 9999 # Describe the sink my_file_roll.sinks.file_roll_sink.type = file_roll #输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。 my_file_roll.sinks.file_roll_sink.sink.directory = /yinzhengjie/data/flume/output # Describe the channel my_file_roll.channels.memory_channel.type = memory my_file_roll.channels.memory_channel.capacity = 1000 my_file_roll.channels.memory_channel.transactionCapacity = 100 # Bind the source and sink to the channel my_file_roll.sources.avro_source.channels = memory_channel my_file_roll.sinks.file_roll_sink.channel = memory_channel [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-my_file_roll.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_file_roll.conf --name my_file_roll -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:8649 -Dflume.root.logger=INFO,console >> /home/data/flume/log/f lume-ganglia-flume-my_file_roll.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_file_roll.conf --name my_file_roll -Dflume.monitoring.type=http -Dflume.monitoring.port=10515 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-my_file_roll.log 2>&1 & [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 *:10514 *:* LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-my_file_roll.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 *:10514 *:* LISTEN 0 50 *:10515 *:* LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# tail -100f /home/data/flume/log/flume-my_file_roll.log Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10515 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/softwares/hadoop-2.6.0/etc/hadoop:/home/software s/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-my_file_roll.conf --name my_file_rollSLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/20 22:25:32 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 19/07/20 22:25:32 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/data/flume/job/flume-my_file_roll.conf 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Processing:file_roll_sink 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Processing:file_roll_sink 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Processing:file_roll_sink 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Added sinks: file_roll_sink Agent: my_file_roll 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/20 22:25:32 WARN conf.FlumeConfiguration: Agent configuration for 'my_file_roll' has no configfilters. 19/07/20 22:25:32 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [my_file_roll] 19/07/20 22:25:32 INFO node.AbstractConfigurationProvider: Creating channels 19/07/20 22:25:32 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory 19/07/20 22:25:32 INFO node.AbstractConfigurationProvider: Created channel memory_channel 19/07/20 22:25:32 INFO source.DefaultSourceFactory: Creating instance of source avro_source, type avro 19/07/20 22:25:32 INFO sink.DefaultSinkFactory: Creating instance of sink: file_roll_sink, type: file_roll 19/07/20 22:25:32 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [avro_source, file_roll_sink] 19/07/20 22:25:32 INFO node.Application: Starting new configuration:{ sourceRunners:{avro_source=EventDrivenSourceRunner: { source:Avro source avro_source: { bindAddress: node105.yinzhengjie.org.cn, port: 9999 } }} sinkRunners:{file_roll_sink=SinkRunner: { policy:org.ap ache.flume.sink.DefaultSinkProcessor@5dda816 counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }19/07/20 22:25:32 INFO node.Application: Starting Channel memory_channel 19/07/20 22:25:32 INFO node.Application: Waiting for channel: memory_channel to start. Sleeping for 500 ms 19/07/20 22:25:32 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory_channel: Successfully registered new MBean. 19/07/20 22:25:32 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory_channel started 19/07/20 22:25:33 INFO node.Application: Starting Sink file_roll_sink 19/07/20 22:25:33 INFO node.Application: Starting Source avro_source 19/07/20 22:25:33 INFO source.AvroSource: Starting Avro source avro_source: { bindAddress: node105.yinzhengjie.org.cn, port: 9999 }... 19/07/20 22:25:33 INFO util.log: Logging initialized @1346ms to org.eclipse.jetty.util.log.Slf4jLog 19/07/20 22:25:33 INFO sink.RollingFileSink: Starting org.apache.flume.sink.RollingFileSink{name:file_roll_sink, channel:memory_channel}... 19/07/20 22:25:33 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: file_roll_sink: Successfully registered new MBean. 19/07/20 22:25:33 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: file_roll_sink started 19/07/20 22:25:33 INFO sink.RollingFileSink: RollingFileSink file_roll_sink started. 19/07/20 22:25:33 INFO server.Server: jetty-9.4.6.v20170531 19/07/20 22:25:33 INFO server.AbstractConnector: Started ServerConnector@1bca8c57{HTTP/1.1,[http/1.1]}{0.0.0.0:10515} 19/07/20 22:25:33 INFO server.Server: Started @1818ms 19/07/20 22:25:33 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: avro_source: Successfully registered new MBean. 19/07/20 22:25:33 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: avro_source started 19/07/20 22:25:33 INFO source.AvroSource: Avro source avro_source started.
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-replica.conf # Name the components on this agent replica.sources = exec_source replica.sinks = hdfs_sink file_roll_sink replica.channels = hdfs_channel file_roll_channel # 将数据流复制给多个channel replica.sources.exec_source.selector.type = replicating # Describe/configure the source replica.sources.exec_source.type = exec replica.sources.exec_source.command = tail -F /yinzhengjie/data/flume/blog.txt replica.sources.exec_source.shell = /bin/bash -c # 定义要输出到hdfs的sink,注意端口号 replica.sinks.hdfs_sink.type = avro replica.sinks.hdfs_sink.hostname = node105.yinzhengjie.org.cn replica.sinks.hdfs_sink.port = 8888 # 定义要输出到local filesystem的sink replica.sinks.file_roll_sink.type = avro replica.sinks.file_roll_sink.hostname = node105.yinzhengjie.org.cn replica.sinks.file_roll_sink.port = 9999 # Describe the channel replica.channels.hdfs_channel.type = memory replica.channels.hdfs_channel.capacity = 1000 replica.channels.hdfs_channel.transactionCapacity = 100 replica.channels.file_roll_channel.type = memory replica.channels.file_roll_channel.capacity = 1000 replica.channels.file_roll_channel.transactionCapacity = 100 # Bind the source and sink to the channel replica.sources.exec_source.channels = hdfs_channel file_roll_channel replica.sinks.hdfs_sink.channel = hdfs_channel replica.sinks.file_roll_sink.channel = file_roll_channel [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-replica.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-replica.conf --name replica -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:8649 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-gangl ia-flume-replica.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-replica.conf --name replica -Dflume.monitoring.type=http -Dflume.monitoring.port=10516 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-replica.log 2>&1 & [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 172.30.1.105:9999 *:* LISTEN 0 50 *:10514 *:* LISTEN 0 50 *:10515 *:* LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-replica.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 172.30.1.105:9999 *:* LISTEN 0 50 *:10514 *:* LISTEN 0 50 *:10515 *:* LISTEN 0 50 *:10516 *:* LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# tail -100f /home/data/flume/log/flume-replica.log Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10516 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/softwares/hadoop-2.6.0/etc/hadoop:/home/software s/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-replica.conf --name replicaSLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/20 22:35:04 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 19/07/20 22:35:04 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/data/flume/job/flume-replica.conf 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:file_roll_channel 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:hdfs_channel 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:hdfs_channel 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:file_roll_channel 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:file_roll_sink 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:hdfs_channel 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Added sinks: hdfs_sink file_roll_sink Agent: replica 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:file_roll_sink 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:exec_source 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:file_roll_sink 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:exec_source 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:exec_source 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:exec_source 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:file_roll_channel 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:hdfs_sink 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:exec_source 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Processing:file_roll_sink 19/07/20 22:35:04 WARN conf.FlumeConfiguration: Agent configuration for 'replica' has no configfilters. 19/07/20 22:35:04 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [replica] 19/07/20 22:35:04 INFO node.AbstractConfigurationProvider: Creating channels 19/07/20 22:35:04 INFO channel.DefaultChannelFactory: Creating instance of channel hdfs_channel type memory 19/07/20 22:35:04 INFO node.AbstractConfigurationProvider: Created channel hdfs_channel 19/07/20 22:35:04 INFO channel.DefaultChannelFactory: Creating instance of channel file_roll_channel type memory 19/07/20 22:35:04 INFO node.AbstractConfigurationProvider: Created channel file_roll_channel 19/07/20 22:35:04 INFO source.DefaultSourceFactory: Creating instance of source exec_source, type exec 19/07/20 22:35:04 INFO sink.DefaultSinkFactory: Creating instance of sink: file_roll_sink, type: avro 19/07/20 22:35:04 INFO sink.AbstractRpcSink: Connection reset is set to 0. Will not reset connection to next hop 19/07/20 22:35:04 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs_sink, type: avro 19/07/20 22:35:04 INFO sink.AbstractRpcSink: Connection reset is set to 0. Will not reset connection to next hop 19/07/20 22:35:04 INFO node.AbstractConfigurationProvider: Channel hdfs_channel connected to [exec_source, hdfs_sink] 19/07/20 22:35:04 INFO node.AbstractConfigurationProvider: Channel file_roll_channel connected to [exec_source, file_roll_sink] 19/07/20 22:35:04 INFO node.Application: Starting new configuration:{ sourceRunners:{exec_source=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:exec_source,state:IDLE} }} sinkRunners:{file_roll_sink=SinkRunner: { policy:org.apache.flume.sink.D efaultSinkProcessor@444b86bf counterGroup:{ name:null counters:{} } }, hdfs_sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2d990d41 counterGroup:{ name:null counters:{} } }} channels:{hdfs_channel=org.apache.flume.channel.MemoryChannel{name: hdfs_channel}, file_roll_channel=org.apache.flume.channel.MemoryChannel{name: file_roll_channel}} }19/07/20 22:35:04 INFO node.Application: Starting Channel hdfs_channel 19/07/20 22:35:04 INFO node.Application: Starting Channel file_roll_channel 19/07/20 22:35:04 INFO node.Application: Waiting for channel: hdfs_channel to start. Sleeping for 500 ms 19/07/20 22:35:04 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: file_roll_channel: Successfully registered new MBean. 19/07/20 22:35:04 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: file_roll_channel started 19/07/20 22:35:04 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: hdfs_channel: Successfully registered new MBean. 19/07/20 22:35:04 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: hdfs_channel started 19/07/20 22:35:04 INFO node.Application: Starting Sink file_roll_sink 19/07/20 22:35:04 INFO node.Application: Starting Sink hdfs_sink 19/07/20 22:35:04 INFO sink.AbstractRpcSink: Starting RpcSink hdfs_sink { host: node105.yinzhengjie.org.cn, port: 8888 }... 19/07/20 22:35:04 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs_sink: Successfully registered new MBean. 19/07/20 22:35:04 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs_sink started 19/07/20 22:35:04 INFO sink.AbstractRpcSink: Rpc sink hdfs_sink: Building RpcClient with hostname: node105.yinzhengjie.org.cn, port: 8888 19/07/20 22:35:04 INFO sink.AvroSink: Attempting to create Avro Rpc client. 19/07/20 22:35:04 INFO api.NettyAvroRpcClient: Using default maxIOWorkers 19/07/20 22:35:04 INFO sink.AbstractRpcSink: Starting RpcSink file_roll_sink { host: node105.yinzhengjie.org.cn, port: 9999 }... 19/07/20 22:35:04 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: file_roll_sink: Successfully registered new MBean. 19/07/20 22:35:04 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: file_roll_sink started 19/07/20 22:35:04 INFO sink.AbstractRpcSink: Rpc sink file_roll_sink: Building RpcClient with hostname: node105.yinzhengjie.org.cn, port: 9999 19/07/20 22:35:04 INFO sink.AvroSink: Attempting to create Avro Rpc client. 19/07/20 22:35:04 INFO api.NettyAvroRpcClient: Using default maxIOWorkers 19/07/20 22:35:04 INFO node.Application: Starting Source exec_source 19/07/20 22:35:04 INFO source.ExecSource: Exec source starting with command: tail -F /yinzhengjie/data/flume/blog.txt 19/07/20 22:35:04 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: exec_source: Successfully registered new MBean. 19/07/20 22:35:04 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: exec_source started 19/07/20 22:35:04 INFO util.log: Logging initialized @1370ms to org.eclipse.jetty.util.log.Slf4jLog 19/07/20 22:35:05 INFO server.Server: jetty-9.4.6.v20170531 19/07/20 22:35:05 INFO server.AbstractConnector: Started ServerConnector@3e973d1f{HTTP/1.1,[http/1.1]}{0.0.0.0:10516} 19/07/20 22:35:05 INFO server.Server: Started @1627ms 19/07/20 22:35:05 INFO sink.AbstractRpcSink: Rpc sink hdfs_sink started. 19/07/20 22:35:05 INFO sink.AbstractRpcSink: Rpc sink file_roll_sink started.
[root@node105.yinzhengjie.org.cn ~]# cat /yinzhengjie/data/flume/blog.txt https://www.cnblogs.com/yinzhengjie [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# hdfs dfs -ls /flume/20190720/22 Found 1 items -rw-r--r-- 3 root supergroup 36 2019-07-20 22:36 /flume/20190720/22/172.30.1.105-.1563633311415 [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# hdfs dfs -cat /flume/20190720/22/172.30.1.105-.1563633311415 https://www.cnblogs.com/yinzhengjie [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ll /yinzhengjie/data/flume/output/ total 4 -rw-r--r-- 1 root root 0 Jul 20 22:25 1563632732571-1 -rw-r--r-- 1 root root 0 Jul 20 22:30 1563632732571-10 -rw-r--r-- 1 root root 0 Jul 20 22:30 1563632732571-11 -rw-r--r-- 1 root root 0 Jul 20 22:31 1563632732571-12 -rw-r--r-- 1 root root 0 Jul 20 22:31 1563632732571-13 -rw-r--r-- 1 root root 0 Jul 20 22:32 1563632732571-14 -rw-r--r-- 1 root root 0 Jul 20 22:32 1563632732571-15 -rw-r--r-- 1 root root 0 Jul 20 22:33 1563632732571-16 -rw-r--r-- 1 root root 0 Jul 20 22:33 1563632732571-17 -rw-r--r-- 1 root root 0 Jul 20 22:34 1563632732571-18 -rw-r--r-- 1 root root 0 Jul 20 22:34 1563632732571-19 -rw-r--r-- 1 root root 0 Jul 20 22:26 1563632732571-2 -rw-r--r-- 1 root root 36 Jul 20 22:35 1563632732571-20 -rw-r--r-- 1 root root 0 Jul 20 22:35 1563632732571-21 -rw-r--r-- 1 root root 0 Jul 20 22:36 1563632732571-22 -rw-r--r-- 1 root root 0 Jul 20 22:36 1563632732571-23 -rw-r--r-- 1 root root 0 Jul 20 22:37 1563632732571-24 -rw-r--r-- 1 root root 0 Jul 20 22:37 1563632732571-25 -rw-r--r-- 1 root root 0 Jul 20 22:38 1563632732571-26 -rw-r--r-- 1 root root 0 Jul 20 22:38 1563632732571-27 -rw-r--r-- 1 root root 0 Jul 20 22:39 1563632732571-28 -rw-r--r-- 1 root root 0 Jul 20 22:39 1563632732571-29 -rw-r--r-- 1 root root 0 Jul 20 22:26 1563632732571-3 -rw-r--r-- 1 root root 0 Jul 20 22:40 1563632732571-30 -rw-r--r-- 1 root root 0 Jul 20 22:40 1563632732571-31 -rw-r--r-- 1 root root 0 Jul 20 22:41 1563632732571-32 -rw-r--r-- 1 root root 0 Jul 20 22:41 1563632732571-33 -rw-r--r-- 1 root root 0 Jul 20 22:42 1563632732571-34 -rw-r--r-- 1 root root 0 Jul 20 22:42 1563632732571-35 -rw-r--r-- 1 root root 0 Jul 20 22:43 1563632732571-36 -rw-r--r-- 1 root root 0 Jul 20 22:43 1563632732571-37 -rw-r--r-- 1 root root 0 Jul 20 22:27 1563632732571-4 -rw-r--r-- 1 root root 0 Jul 20 22:27 1563632732571-5 -rw-r--r-- 1 root root 0 Jul 20 22:28 1563632732571-6 -rw-r--r-- 1 root root 0 Jul 20 22:28 1563632732571-7 -rw-r--r-- 1 root root 0 Jul 20 22:29 1563632732571-8 -rw-r--r-- 1 root root 0 Jul 20 22:29 1563632732571-9 [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# cat /yinzhengjie/data/flume/output/1563632732571-20 https://www.cnblogs.com/yinzhengjie [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-my_multiplexing_selector.conf # Name the components on this agent my_multiplexing_selector.sources = avro_source my_multiplexing_selector.sinks = Java_sink Go_sink Python_sink my_multiplexing_selector.channels = Java_channel Go_channel Python_channel # Describe/configure the source my_multiplexing_selector.sources.avro_source.type = avro my_multiplexing_selector.sources.avro_source.bind = node105.yinzhengjie.org.cn my_multiplexing_selector.sources.avro_source.port = 8888 # 指定挑选器类型为:多路复用 my_multiplexing_selector.sources.avro_source.selector.type = multiplexing # 指定event header的key值 my_multiplexing_selector.sources.avro_source.selector.header = language # 指定event header的key值对应的value值,编辑文件内容如:language java my_multiplexing_selector.sources.avro_source.selector.mapping.JAVA = Java_channel my_multiplexing_selector.sources.avro_source.selector.mapping.GOLANG = Go_channel my_multiplexing_selector.sources.avro_source.selector.default = Python_channel # Describe the sink my_multiplexing_selector.sinks.Java_sink.type = file_roll my_multiplexing_selector.sinks.Java_sink.sink.directory= /yinzhengjie/language/java my_multiplexing_selector.sinks.Java_sink.sink.rollInterval = 0 my_multiplexing_selector.sinks.Go_sink.type = file_roll my_multiplexing_selector.sinks.Go_sink.sink.directory= /yinzhengjie/language/golang my_multiplexing_selector.sinks.Go_sink.sink.rollInterval = 0 my_multiplexing_selector.sinks.Python_sink.type = file_roll my_multiplexing_selector.sinks.Python_sink.sink.directory= /yinzhengjie/language/python my_multiplexing_selector.sinks.Python_sink.sink.rollInterval = 0 # Use a channel which buffers events in memory my_multiplexing_selector.channels.Java_channel.type = memory my_multiplexing_selector.channels.Java_channel.capacity = 100000 my_multiplexing_selector.channels.Java_channel.transactionCapacity = 10000 my_multiplexing_selector.channels.Go_channel.type = memory my_multiplexing_selector.channels.Go_channel.capacity = 100000 my_multiplexing_selector.channels.Go_channel.transactionCapacity = 10000 my_multiplexing_selector.channels.Python_channel.type = memory my_multiplexing_selector.channels.Python_channel.capacity = 100000 my_multiplexing_selector.channels.Python_channel.transactionCapacity = 10000 # Bind the source and sink to the channel my_multiplexing_selector.sources.avro_source.channels = Java_channel Go_channel Python_channel my_multiplexing_selector.sinks.Java_sink.channel = Java_channel my_multiplexing_selector.sinks.Go_sink.channel = Go_channel my_multiplexing_selector.sinks.Python_sink.channel = Python_channel [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-my_multiplexing_selector.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_multiplexing_selector.conf --name my_multiplexing_selector -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:8649 -Dflume.root.logger=INFO,console > > /home/data/flume/log/flume-ganglia-flume-my_multiplexing_selector.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_multiplexing_selector.conf --name my_multiplexing_selector -Dflume.monitoring.type=http -Dflume.monitoring.port=10522 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume- my_multiplexing_selector.log 2>&1 &[root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-my_multiplexing_selector.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 50 *:10522 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat a.txt language java language php language shell language golang language python language scanla language js language vbs language c++ language linux [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# cat header.txt language java [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# flume-ng avro-client -H node105.yinzhengjie.org.cn -p 8888 -F a.txt -R header.txt Warning: No configuration directory set! Use --conf <dir> to override. Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -cp '/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/softwares/hadoop-2.6.0/etc/hadoop:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoo p-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.client.avro.AvroCLIClient -H node105.yinzhengjie.org.cn -p 8888 -F a.txt -R header.txtSLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/21 00:09:22 INFO api.NettyAvroRpcClient: Using default maxIOWorkers [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ll /yinzhengjie/language/ -R /yinzhengjie/language/: total 0 drwxr-xr-x 2 root root 29 Jul 21 00:08 golang drwxr-xr-x 2 root root 29 Jul 21 00:08 java drwxr-xr-x 2 root root 29 Jul 21 00:08 python /yinzhengjie/language/golang: total 0 -rw-r--r-- 1 root root 0 Jul 21 00:08 1563638927737-1 /yinzhengjie/language/java: total 0 -rw-r--r-- 1 root root 0 Jul 21 00:08 1563638927710-1 /yinzhengjie/language/python: total 4 -rw-r--r-- 1 root root 143 Jul 21 00:09 1563638927710-1 [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# cat /yinzhengjie/language/python/1563638927710-1 language java language php language shell language golang language python language scanla language js language vbs language c++ language linux [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# tail -100f /home/data/flume/log/flume-my_multiplexing_selector.log Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10522 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/softwares/hadoop-2.6.0/etc/hadoop:/home/software s/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-my_multiplexing_selector.conf --name my_multiplexing_selectorSLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/21 00:08:47 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 19/07/21 00:08:47 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/data/flume/job/flume-my_multiplexing_selector.conf 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Java_sink 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Go_channel 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Go_channel 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Java_sink 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Python_sink 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Go_sink 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Java_channel 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Python_channel 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Python_sink 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Python_sink 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Go_sink 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Java_sink 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Go_channel 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Added sinks: Java_sink Go_sink Python_sink Agent: my_multiplexing_selector 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Python_channel 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Java_channel 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Python_channel 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Python_sink 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Java_sink 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Go_sink 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:avro_source 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Java_channel 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Processing:Go_sink 19/07/21 00:08:47 WARN conf.FlumeConfiguration: Agent configuration for 'my_multiplexing_selector' has no configfilters. 19/07/21 00:08:47 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [my_multiplexing_selector] 19/07/21 00:08:47 INFO node.AbstractConfigurationProvider: Creating channels 19/07/21 00:08:47 INFO channel.DefaultChannelFactory: Creating instance of channel Java_channel type memory 19/07/21 00:08:47 INFO node.AbstractConfigurationProvider: Created channel Java_channel 19/07/21 00:08:47 INFO channel.DefaultChannelFactory: Creating instance of channel Go_channel type memory 19/07/21 00:08:47 INFO node.AbstractConfigurationProvider: Created channel Go_channel 19/07/21 00:08:47 INFO channel.DefaultChannelFactory: Creating instance of channel Python_channel type memory 19/07/21 00:08:47 INFO node.AbstractConfigurationProvider: Created channel Python_channel 19/07/21 00:08:47 INFO source.DefaultSourceFactory: Creating instance of source avro_source, type avro 19/07/21 00:08:47 INFO sink.DefaultSinkFactory: Creating instance of sink: Java_sink, type: file_roll 19/07/21 00:08:47 INFO sink.DefaultSinkFactory: Creating instance of sink: Python_sink, type: file_roll 19/07/21 00:08:47 INFO sink.DefaultSinkFactory: Creating instance of sink: Go_sink, type: file_roll 19/07/21 00:08:47 INFO node.AbstractConfigurationProvider: Channel Java_channel connected to [avro_source, Java_sink] 19/07/21 00:08:47 INFO node.AbstractConfigurationProvider: Channel Go_channel connected to [avro_source, Go_sink] 19/07/21 00:08:47 INFO node.AbstractConfigurationProvider: Channel Python_channel connected to [avro_source, Python_sink] 19/07/21 00:08:47 INFO node.Application: Starting new configuration:{ sourceRunners:{avro_source=EventDrivenSourceRunner: { source:Avro source avro_source: { bindAddress: node105.yinzhengjie.org.cn, port: 8888 } }} sinkRunners:{Java_sink=SinkRunner: { policy:org.apache. flume.sink.DefaultSinkProcessor@12f20d45 counterGroup:{ name:null counters:{} } }, Python_sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@40b91f49 counterGroup:{ name:null counters:{} } }, Go_sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@328a1feb counterGroup:{ name:null counters:{} } }} channels:{Java_channel=org.apache.flume.channel.MemoryChannel{name: Java_channel}, Go_channel=org.apache.flume.channel.MemoryChannel{name: Go_channel}, Python_channel=org.apache.flume.channel.MemoryChannel{name: Python_channel}} }19/07/21 00:08:47 INFO node.Application: Starting Channel Java_channel 19/07/21 00:08:47 INFO node.Application: Starting Channel Go_channel 19/07/21 00:08:47 INFO node.Application: Starting Channel Python_channel 19/07/21 00:08:47 INFO node.Application: Waiting for channel: Java_channel to start. Sleeping for 500 ms 19/07/21 00:08:47 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: Go_channel: Successfully registered new MBean. 19/07/21 00:08:47 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: Go_channel started 19/07/21 00:08:47 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: Java_channel: Successfully registered new MBean. 19/07/21 00:08:47 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: Java_channel started 19/07/21 00:08:47 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: Python_channel: Successfully registered new MBean. 19/07/21 00:08:47 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: Python_channel started 19/07/21 00:08:48 INFO node.Application: Starting Sink Java_sink 19/07/21 00:08:48 INFO node.Application: Starting Sink Python_sink 19/07/21 00:08:48 INFO sink.RollingFileSink: Starting org.apache.flume.sink.RollingFileSink{name:Python_sink, channel:Python_channel}... 19/07/21 00:08:48 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: Python_sink: Successfully registered new MBean. 19/07/21 00:08:48 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: Python_sink started 19/07/21 00:08:48 INFO sink.RollingFileSink: RollInterval is not valid, file rolling will not happen. 19/07/21 00:08:48 INFO sink.RollingFileSink: RollingFileSink Python_sink started. 19/07/21 00:08:48 INFO node.Application: Starting Sink Go_sink 19/07/21 00:08:48 INFO node.Application: Starting Source avro_source 19/07/21 00:08:48 INFO sink.RollingFileSink: Starting org.apache.flume.sink.RollingFileSink{name:Java_sink, channel:Java_channel}... 19/07/21 00:08:48 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: Java_sink: Successfully registered new MBean. 19/07/21 00:08:48 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: Java_sink started 19/07/21 00:08:48 INFO sink.RollingFileSink: RollInterval is not valid, file rolling will not happen. 19/07/21 00:08:48 INFO sink.RollingFileSink: RollingFileSink Java_sink started. 19/07/21 00:08:48 INFO sink.RollingFileSink: Starting org.apache.flume.sink.RollingFileSink{name:Go_sink, channel:Go_channel}... 19/07/21 00:08:48 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: Go_sink: Successfully registered new MBean. 19/07/21 00:08:48 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: Go_sink started 19/07/21 00:08:48 INFO sink.RollingFileSink: RollInterval is not valid, file rolling will not happen. 19/07/21 00:08:48 INFO sink.RollingFileSink: RollingFileSink Go_sink started. 19/07/21 00:08:48 INFO source.AvroSource: Starting Avro source avro_source: { bindAddress: node105.yinzhengjie.org.cn, port: 8888 }... 19/07/21 00:08:48 INFO util.log: Logging initialized @1387ms to org.eclipse.jetty.util.log.Slf4jLog 19/07/21 00:08:48 INFO server.Server: jetty-9.4.6.v20170531 19/07/21 00:08:48 INFO server.AbstractConnector: Started ServerConnector@773090d9{HTTP/1.1,[http/1.1]}{0.0.0.0:10522} 19/07/21 00:08:48 INFO server.Server: Started @1824ms 19/07/21 00:08:48 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: avro_source: Successfully registered new MBean. 19/07/21 00:08:48 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: avro_source started 19/07/21 00:08:48 INFO source.AvroSource: Avro source avro_source started. 19/07/21 00:09:22 INFO ipc.NettyServer: [id: 0x047099b3, /172.30.1.105:48044 => /172.30.1.105:8888] OPEN 19/07/21 00:09:22 INFO ipc.NettyServer: [id: 0x047099b3, /172.30.1.105:48044 => /172.30.1.105:8888] BOUND: /172.30.1.105:8888 19/07/21 00:09:22 INFO ipc.NettyServer: [id: 0x047099b3, /172.30.1.105:48044 => /172.30.1.105:8888] CONNECTED: /172.30.1.105:48044 19/07/21 00:09:23 INFO ipc.NettyServer: [id: 0x047099b3, /172.30.1.105:48044 :> /172.30.1.105:8888] DISCONNECTED 19/07/21 00:09:23 INFO ipc.NettyServer: [id: 0x047099b3, /172.30.1.105:48044 :> /172.30.1.105:8888] UNBOUND 19/07/21 00:09:23 INFO ipc.NettyServer: [id: 0x047099b3, /172.30.1.105:48044 :> /172.30.1.105:8888] CLOSED 19/07/21 00:09:23 INFO ipc.NettyServer: Connection to /172.30.1.105:48044 disconnected.
6>.主机拦截器案例
拦截器(interceptor): 是source端的在处理过程中能够对数据(event)进行修改或丢弃的组件。常见的拦截器有:
(1)host interceptor 将发送的event添加主机名的header (2)timestamp interceptor 将发送的event添加时间戳的header 更多拦截器可参考官方文档: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-interceptors
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-host_interceptor.conf # Name the components on this agent my_host_interceptor.sources = netcat_source my_host_interceptor.sinks = logger_sink my_host_interceptor.channels = memory_channel # Describe/configure the source my_host_interceptor.sources.netcat_source.type = netcat my_host_interceptor.sources.netcat_source.bind = node105.yinzhengjie.org.cn my_host_interceptor.sources.netcat_source.port = 8888 # 指定添加拦截器 my_host_interceptor.sources.netcat_source.interceptors = i1 my_host_interceptor.sources.netcat_source.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder my_host_interceptor.sources.netcat_source.interceptors.i1.preserveExisting = false # 指定header的key my_host_interceptor.sources.netcat_source.interceptors.i1.hostHeader = hostname # 指定header的value为主机ip my_host_interceptor.sources.netcat_source.interceptors.i1.useIP = true # Describe the sink my_host_interceptor.sinks.logger_sink.type = logger # Use a channel which buffers events in memory my_host_interceptor.channels.memory_channel.type = memory my_host_interceptor.channels.memory_channel.capacity = 100000 my_host_interceptor.channels.memory_channel.transactionCapacity = 10000 # Bind the source and sink to the channel my_host_interceptor.sources.netcat_source.channels = memory_channel my_host_interceptor.sinks.logger_sink.channel = memory_channel [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-my_host_interceptor.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-host_interceptor.conf --name my_host_interceptor -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:8649 -Dflume.root.logger=INFO,console >> /home/data/ flume/log/flume-ganglia-flume-my_host_interceptor.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-host_interceptor.conf --name my_host_interceptor -Dflume.monitoring.type=http -Dflume.monitoring.port=10520 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume-my_host_inter ceptor.log 2>&1 &[root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-my_host_interceptor.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 50 *:10520 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# telnet node105.yinzhengjie.org.cn 8888 #连接到指定端口并发送测试数据 Trying 172.30.1.105... Connected to node105.yinzhengjie.org.cn. Escape character is '^]'. yinzhengjie dao ci yi you! OK
[root@node105.yinzhengjie.org.cn ~]# tail -100f /home/data/flume/log/flume-my_host_interceptor.log Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10520 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/softwares/hadoop-2.6.0/etc/hadoop:/home/software s/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-host_interceptor.conf --name my_host_interceptorSLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/20 23:10:03 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 19/07/20 23:10:03 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/data/flume/job/flume-host_interceptor.conf 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:logger_sink 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Added sinks: logger_sink Agent: my_host_interceptor 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:logger_sink 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:10:03 WARN conf.FlumeConfiguration: Agent configuration for 'my_host_interceptor' has no configfilters. 19/07/20 23:10:03 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [my_host_interceptor] 19/07/20 23:10:03 INFO node.AbstractConfigurationProvider: Creating channels 19/07/20 23:10:03 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory 19/07/20 23:10:03 INFO node.AbstractConfigurationProvider: Created channel memory_channel 19/07/20 23:10:03 INFO source.DefaultSourceFactory: Creating instance of source netcat_source, type netcat 19/07/20 23:10:03 INFO sink.DefaultSinkFactory: Creating instance of sink: logger_sink, type: logger 19/07/20 23:10:03 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [netcat_source, logger_sink] 19/07/20 23:10:03 INFO node.Application: Starting new configuration:{ sourceRunners:{netcat_source=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcat_source,state:IDLE} }} sinkRunners:{logger_sink=SinkRunner: { policy:org.apache.flume.sin k.DefaultSinkProcessor@2739d05f counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }19/07/20 23:10:03 INFO node.Application: Starting Channel memory_channel 19/07/20 23:10:03 INFO node.Application: Waiting for channel: memory_channel to start. Sleeping for 500 ms 19/07/20 23:10:03 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory_channel: Successfully registered new MBean. 19/07/20 23:10:03 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory_channel started 19/07/20 23:10:04 INFO node.Application: Starting Sink logger_sink 19/07/20 23:10:04 INFO node.Application: Starting Source netcat_source 19/07/20 23:10:04 INFO source.NetcatSource: Source starting 19/07/20 23:10:04 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.30.1.105:8888] 19/07/20 23:10:04 INFO util.log: Logging initialized @1344ms to org.eclipse.jetty.util.log.Slf4jLog 19/07/20 23:10:04 INFO server.Server: jetty-9.4.6.v20170531 19/07/20 23:10:04 INFO server.AbstractConnector: Started ServerConnector@6a5cf88c{HTTP/1.1,[http/1.1]}{0.0.0.0:10520} 19/07/20 23:10:04 INFO server.Server: Started @1584ms 19/07/20 23:10:21 INFO sink.LoggerSink: Event: { headers:{hostname=172.30.1.105} body: 79 69 6E 7A 68 65 6E 67 6A 69 65 20 64 61 6F 20 yinzhengjie dao }
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/job/flume-my_timestamp_interceptor.conf # Name the components on this agent my_timestamp_interceptor.sources = netcat_source my_timestamp_interceptor.sinks = logger_sink my_timestamp_interceptor.channels = memory_channel # Describe/configure the source my_timestamp_interceptor.sources.netcat_source.type = netcat my_timestamp_interceptor.sources.netcat_source.bind = node105.yinzhengjie.org.cn my_timestamp_interceptor.sources.netcat_source.port = 8888 # 指定添加拦截器 my_timestamp_interceptor.sources.netcat_source.interceptors = i1 my_timestamp_interceptor.sources.netcat_source.interceptors.i1.type = timestamp # Describe the sink my_timestamp_interceptor.sinks.logger_sink.type = logger # Use a channel which buffers events in memory my_timestamp_interceptor.channels.memory_channel.type = memory my_timestamp_interceptor.channels.memory_channel.capacity = 100000 my_timestamp_interceptor.channels.memory_channel.transactionCapacity = 10000 # Bind the source and sink to the channel my_timestamp_interceptor.sources.netcat_source.channels = memory_channel my_timestamp_interceptor.sinks.logger_sink.channel = memory_channel [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# cat /home/data/flume/shell/start-my_timestamp_interceptor.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #将监控数据发送给ganglia,需要指定ganglia服务器地址,使用请确认是否部署好ganglia服务! #nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_timestamp_interceptor.conf --name my_timestamp_interceptor -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=node105.yinzhengjie.org.cn:8649 -Dflume.root.logger=INFO,console > > /home/data/flume/log/flume-ganglia-flume-my_timestamp_interceptor.log 2>&1 & #启动flume自身的监控参数,默认执行以下脚本 nohup flume-ng agent -c /home/data/flume/job --conf-file=/home/data/flume/job/flume-my_timestamp_interceptor.conf --name my_timestamp_interceptor -Dflume.monitoring.type=http -Dflume.monitoring.port=10521 -Dflume.root.logger=INFO,console >> /home/data/flume/log/flume- my_timestamp_interceptor.log 2>&1 &[root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# /home/data/flume/shell/start-my_timestamp_interceptor.sh [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]# ss -ntl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 50 172.30.1.105:8888 *:* LISTEN 0 50 *:10521 *:* LISTEN 0 128 :::22 :::* [root@node105.yinzhengjie.org.cn ~]# [root@node105.yinzhengjie.org.cn ~]#
[root@node105.yinzhengjie.org.cn ~]# tail -100f /home/data/flume/log/flume-my_timestamp_interceptor.log Warning: JAVA_HOME is not set! Info: Including Hadoop libraries found via (/home/softwares/hadoop-2.6.0/bin/hadoop) for HDFS access Info: Including Hive libraries found via () for Hive access + exec /home/softwares/jdk1.8.0_201/bin/java -Xmx20m -Dflume.monitoring.type=http -Dflume.monitoring.port=10521 -Dflume.root.logger=INFO,console -cp '/home/data/flume/job:/home/softwares/apache-flume-1.9.0-bin/lib/*:/home/softwares/hadoop-2.6.0/etc/hadoop:/home/software s/hadoop-2.6.0/share/hadoop/common/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/common/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/hdfs/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/yarn/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/softwares/hadoop-2.6.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/softwares/hadoop-2.6.0/lib/native org.apache.flume.node.Application --conf-file=/home/data/flume/job/flume-my_timestamp_interceptor.conf --name my_timestamp_interceptorSLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/softwares/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/softwares/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 19/07/20 23:25:03 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 19/07/20 23:25:03 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/data/flume/job/flume-my_timestamp_interceptor.conf 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Added sinks: logger_sink Agent: my_timestamp_interceptor 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Processing:logger_sink 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Processing:logger_sink 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Processing:memory_channel 19/07/20 23:25:03 INFO conf.FlumeConfiguration: Processing:netcat_source 19/07/20 23:25:03 WARN conf.FlumeConfiguration: Agent configuration for 'my_timestamp_interceptor' has no configfilters. 19/07/20 23:25:04 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [my_timestamp_interceptor] 19/07/20 23:25:04 INFO node.AbstractConfigurationProvider: Creating channels 19/07/20 23:25:04 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory 19/07/20 23:25:04 INFO node.AbstractConfigurationProvider: Created channel memory_channel 19/07/20 23:25:04 INFO source.DefaultSourceFactory: Creating instance of source netcat_source, type netcat 19/07/20 23:25:04 INFO sink.DefaultSinkFactory: Creating instance of sink: logger_sink, type: logger 19/07/20 23:25:04 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [netcat_source, logger_sink] 19/07/20 23:25:04 INFO node.Application: Starting new configuration:{ sourceRunners:{netcat_source=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcat_source,state:IDLE} }} sinkRunners:{logger_sink=SinkRunner: { policy:org.apache.flume.sin k.DefaultSinkProcessor@64de9bea counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }19/07/20 23:25:04 INFO node.Application: Starting Channel memory_channel 19/07/20 23:25:04 INFO node.Application: Waiting for channel: memory_channel to start. Sleeping for 500 ms 19/07/20 23:25:04 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory_channel: Successfully registered new MBean. 19/07/20 23:25:04 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory_channel started 19/07/20 23:25:04 INFO node.Application: Starting Sink logger_sink 19/07/20 23:25:04 INFO node.Application: Starting Source netcat_source 19/07/20 23:25:04 INFO source.NetcatSource: Source starting 19/07/20 23:25:04 INFO util.log: Logging initialized @1344ms to org.eclipse.jetty.util.log.Slf4jLog 19/07/20 23:25:04 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.30.1.105:8888] 19/07/20 23:25:04 INFO server.Server: jetty-9.4.6.v20170531 19/07/20 23:25:04 INFO server.AbstractConnector: Started ServerConnector@47d7718d{HTTP/1.1,[http/1.1]}{0.0.0.0:10521} 19/07/20 23:25:04 INFO server.Server: Started @1597ms 19/07/20 23:25:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1563636333779} body: 79 69 6E 7A 68 65 6E 67 6A 69 65 20 64 61 6F 20 yinzhengjie dao }