The previous posts covered the basics of Flume, including its architecture and how to configure, start, and stop it. This post focuses on having Flume receive log4j logs.
As mentioned earlier, log4j delivers its logs to Flume over Avro, so all of the configurations below are based on an Avro source.
Writing log4j logs to Flume's console
- Edit the configuration file log4j-agent.properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = so1
agent.channels = c1
agent.sinks = s1
# For each one of the sources, the type is defined
agent.sources.so1.type = avro
agent.sources.so1.bind = 0.0.0.0
agent.sources.so1.port = 44444
agent.channels.c1.keep-alive = 30
# The channel can be defined as follows.
# agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.s1.type = logger
#Specify the channel the sink should use
# agent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
agent.sources.so1.channels = c1
agent.sinks.s1.channel = c1
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# agent.channels.memoryChannel.capacity = 100
- Start the agent: ./start.sh log4j-agent.properties agent
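start.sh here is the wrapper script from the earlier posts. If you are not using it, the equivalent stock Flume invocation is roughly the following (paths are assumptions; point --conf at your own conf directory):

bin/flume-ng agent --conf conf --conf-file log4j-agent.properties --name agent -Dflume.root.logger=DEBUG,console

Once it is up, you can also smoke-test the Avro source before wiring up log4j, e.g. with bin/flume-ng avro-client -H localhost -p 44444 -F <some file>.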
- Write a simple program that emits logs via log4j, as follows:
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class WriteLog {
    protected static final Log logger = LogFactory.getLog("xxx");

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            // log the current system timestamp once per second
            logger.info(new Date().getTime());
            Thread.sleep(1000);
        }
    }
}
So how do we configure log4j.xml? All it takes is adding an appender:
<appender name="flume-avro" class="org.apache.flume.clients.log4jappender.Log4jAppender">
<param name="Hostname" value="localhost" />
<param name="Port" value="44444" />
<param name="UnsafeMode" value="true" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%l] [%rms] %m" />
</layout>
</appender>
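For this appender to do anything, two more pieces are needed: the Flume log4j appender jar (the flume-ng-log4jappender artifact from the flume-ng-clients module) on the application's classpath, and a logger that actually references the appender. A minimal sketch, assuming you simply hang it off the root logger:

<root>
    <priority value="info" />
    <appender-ref ref="flume-avro" />
</root>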
- Next, run that main function and watch Flume's output:
2015-02-13 16:40:33,027 (New I/O worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816832971"}, "body": {"bytes": "2015-02-13 16:40:32 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [289ms] 1423816832971"}}
2015-02-13 16:40:33,031 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816832971} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:34,052 (New I/O worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816834050"}, "body": {"bytes": "2015-02-13 16:40:34 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [1368ms] 1423816834050"}}
2015-02-13 16:40:34,053 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816834050} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:35,056 (New I/O worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816835053"}, "body": {"bytes": "2015-02-13 16:40:35 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [2371ms] 1423816835053"}}
2015-02-13 16:40:35,056 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816835053} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:36,059 (New I/O worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816836057"}, "body": {"bytes": "2015-02-13 16:40:36 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [3375ms] 1423816836057"}}
2015-02-13 16:40:36,059 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816836057} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:37,064 (New I/O worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816837062"}, "body": {"bytes": "2015-02-13 16:40:37 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [4380ms] 1423816837062"}}
That is log4j's logs being written out to Flume's console.
Multi-node Flume output
Everything so far has been a single Flume node. If the log volume is large, or many systems are producing logs, we need multiple nodes in the Flume tier. This is fairly simple: install Flume on several machines, then just adjust log4j.xml:
<appender name="flume-load-balancing-avro" class="org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender">
<param name="Hosts" value="xxx:12306 xxx:12306 xxx:12306" />
<param name="Selector" value="ROUND_ROBIN" />
<param name="MaxBackoff" value="2000" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%l] [%rms] %m" />
</layout>
</appender>
<appender name="flume-async" class="org.apache.log4j.AsyncAppender">
<param name="BufferSize" value="256" />
<appender-ref ref="flume-load-balancing-avro" />
</appender>
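As before, some logger has to reference one of these appenders; a sketch, assuming you route the root logger through the async wrapper so the Avro round-trip stays off the application threads:

<root>
    <priority value="info" />
    <appender-ref ref="flume-async" />
</root>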
You can test for yourself that all the nodes are doing work. Several selector algorithms are available here (ROUND_ROBIN, RANDOM, and so on); see my earlier post flume学习01-flume介绍 for details.
Writing log4j logs through Flume into HBase
- Only the multi-node setup is covered here
- Edit the configuration file
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = avroSource
agent.channels = memChannel
agent.sinks = hbaseSink
# For each one of the sources, the type is defined
agent.sources.avroSource.type = avro
agent.sources.avroSource.bind = 0.0.0.0
agent.sources.avroSource.port = 12306
# The channel can be defined as follows.
# agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.hbaseSink.type = hbase
agent.sinks.hbaseSink.table = flume
agent.sinks.hbaseSink.columnFamily = cf
agent.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
#Specify the channel the sink should use
# agent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 1000
agent.channels.memChannel.transactionCapacity = 100
# Bind the source and sink to the channel
agent.sources.avroSource.channels = memChannel
agent.sinks.hbaseSink.channel = memChannel
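One thing this configuration does not do is create the HBase table: the HBase sink expects the table and column family it writes to to already exist. A minimal sketch of creating them in the hbase shell, using the names from the config above:

create 'flume', 'cf'

Once events start flowing, rows should show up via scan 'flume'.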
- Apply the same configuration on each of the other nodes and start them
- log4j.xml
<appender name="flume-load-balancing-avro" class="org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender">
<param name="Hosts" value="xxx:12306 xxx:12306 xxx:12306" />
<param name="Selector" value="ROUND_ROBIN" />
<param name="MaxBackoff" value="2000" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%l] [%rms] %m" />
</layout>
</appender>
<appender name="flume-async" class="org.apache.log4j.AsyncAppender">
<param name="BufferSize" value="256" />
<appender-ref ref="flume-load-balancing-avro" />
</appender>
- Run the same Java main function as before and you will see the logs land in HBase
- If you want the data written to HBase in a format of your own, you need a custom HbaseEventSerializer; here is a simple example:
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

import com.google.common.base.Charsets;

public class XXXHbaseEventSerializer implements HbaseEventSerializer {
    // maps log4j's numeric level (as sent in the Flume event headers) to a readable name
    private static final Map<String, byte[]> LOG_LEVEL = new HashMap<String, byte[]>();
    static {
        LOG_LEVEL.put("10000", "DEBUG".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("20000", "INFO".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("30000", "WARN".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("40000", "ERROR".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("50000", "FATAL".getBytes(Charsets.UTF_8));
    }
    // column family
    private byte[] cf;
    // column name for the log level
    private byte[] levelCol;
    // column name for the message body
    private byte[] plCol;
    // message body
    private byte[] payload;
    // log level
    private byte[] level;
    // system identifier
    private String systemId;
    // log timestamp
    private String timestamp;
    private byte[] incCol;
    private byte[] incrementRow;

    public XXXHbaseEventSerializer() {
    }
    @Override
    public void configure(Context context) {
        this.levelCol = "l".getBytes(Charsets.UTF_8);
        this.plCol = "m".getBytes(Charsets.UTF_8);
        this.incCol = "iCol".getBytes(Charsets.UTF_8);
        incrementRow = "incRow".getBytes(Charsets.UTF_8);
    }

    @Override
    public void configure(ComponentConfiguration conf) {
    }

    @Override
    public void initialize(Event event, byte[] cf) {
        this.cf = cf;
        Map<String, String> headers = event.getHeaders();
        // log level: 10000 = DEBUG, 20000 = INFO, 30000 = WARN, 40000 = ERROR
        this.level = LOG_LEVEL.get(headers.get("flume.client.log4j.log.level"));
        // system ID
        this.systemId = headers.get("flume.client.log4j.logger.name");
        // timestamp, e.g. 1421995677371
        this.timestamp = headers.get("flume.client.log4j.timestamp");
        // log message body
        this.payload = event.getBody();
        // // Alternative: expect the body (as a String) to look like <systemId>\t<message>
        // String body = new String(event.getBody(), Charsets.UTF_8);
        // int index = body.indexOf("\t");
        //
        // if (index == -1) {
        //     this.systemId = "default";
        //     this.payload = body.getBytes(Charsets.UTF_8);
        // } else {
        //     this.systemId = body.substring(0, index);
        //     this.payload = body.substring(index + 1).getBytes(Charsets.UTF_8);
        // }
    }
    @Override
    public List<Row> getActions() throws FlumeException {
        List<Row> actions = new LinkedList<Row>();
        if (plCol != null) {
            byte[] rowKey;
            try {
                // the row key is built from the system id and the timestamp
                rowKey = XXXRowKeyGenerator.getRowKey(systemId, timestamp);
                Put put = new Put(rowKey);
                put.add(cf, plCol, payload);
                put.add(cf, levelCol, level);
                actions.add(put);
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }
        }
        return actions;
    }

    @Override
    public List<Increment> getIncrements() {
        List<Increment> incs = new LinkedList<Increment>();
        if (incCol != null) {
            // bump a counter cell so the total number of events is easy to read back
            Increment inc = new Increment(incrementRow);
            inc.addColumn(cf, incCol, 1);
            incs.add(inc);
        }
        return incs;
    }

    @Override
    public void close() {
    }
}
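The XXXRowKeyGenerator used in getActions() is not shown above; how you build row keys is up to you. A minimal hypothetical sketch (the class name and key layout are assumptions, not part of Flume) that joins the system id and timestamp and appends a random suffix to avoid collisions within the same millisecond:

import java.util.UUID;

import com.google.common.base.Charsets;

public class XXXRowKeyGenerator {
    // assumed rowkey layout: <systemId>_<timestamp>_<random suffix>
    public static byte[] getRowKey(String systemId, String timestamp) {
        String suffix = UUID.randomUUID().toString().substring(0, 8);
        return (systemId + "_" + timestamp + "_" + suffix).getBytes(Charsets.UTF_8);
    }
}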
Then change one line in the Flume configuration:
agent.sinks.hbaseSink.serializer = xxx.xxx.xxx.xxx.XXXHbaseEventSerializer
Notes:
- Configure Flume according to your own needs; do your research, decide on your Flume topology first, and build the configuration around it
- Flume has many configuration options and is quite powerful; refer to the Flume user guide