Flume Study 03: Collecting log4j Logs with Flume

Date: 2022-02-28 21:48:27

The previous posts covered the basics of Flume, including its architecture and how to configure, start, and stop an agent. This post focuses on having Flume receive log4j logs.
As mentioned before, log4j needs to deliver its logs to Flume over Avro, so all of the configurations below are based on an Avro source.

Sending log4j output to Flume's console

  • Edit the configuration file log4j-agent.properties:
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = so1
agent.channels = c1
agent.sinks = s1

# For each one of the sources, the type is defined
agent.sources.so1.type = avro
agent.sources.so1.bind = 0.0.0.0
agent.sources.so1.port = 44444

# The channel can be defined as follows.
# agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.s1.type = logger

#Specify the channel the sink should use
# agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
agent.channels.c1.keep-alive = 30

# Bind the source and sink to the channel
agent.sources.so1.channels = c1
agent.sinks.s1.channel = c1


# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# agent.channels.memoryChannel.capacity = 100
  • Start the agent: ./start.sh log4j-agent.properties agent (a plain flume-ng equivalent is sketched below)
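If you are not using the start.sh wrapper from the earlier posts, the same thing can be done with the stock flume-ng launcher; a minimal sketch (the DEBUG console logger is what makes the AvroSource lines further down visible):

bin/flume-ng agent --conf conf --conf-file log4j-agent.properties --name agent -Dflume.root.logger=DEBUG,console

You can also smoke-test the Avro source before touching log4j, using the Avro client bundled with Flume (any small local file works as the payload):

bin/flume-ng avro-client --host localhost --port 44444 --filename /etc/hosts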
  • Write a simple program that logs through log4j; the code is as follows:
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class WriteLog {

    protected static final Log logger = LogFactory.getLog("xxx");

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            // log the current system timestamp once a second
            logger.info(new Date().getTime());
            Thread.sleep(1000);
        }
    }
}

So how should log4j.xml be configured? It only takes one additional appender:

<appender name="flume-avro" class="org.apache.flume.clients.log4jappender.Log4jAppender">
    <param name="Hostname" value="localhost" />
    <param name="Port" value="44444" />
    <param name="UnsafeMode" value="true" />
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%l] [%rms] %m" />
    </layout>
</appender>
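For this appender class to resolve, the flume-ng-log4jappender artifact has to be on the application's classpath. A minimal sketch of the Maven dependency (the version here is an assumption; match it to your Flume installation):

<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.5.2</version>
</dependency>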
  • Next, run that main method and watch Flume's output:
2015-02-13 16:40:33,027 (New I/O  worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816832971"}, "body": {"bytes": "2015-02-13 16:40:32 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [289ms] 1423816832971"}}
2015-02-13 16:40:33,031 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816832971} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:34,052 (New I/O  worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816834050"}, "body": {"bytes": "2015-02-13 16:40:34 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [1368ms] 1423816834050"}}
2015-02-13 16:40:34,053 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816834050} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:35,056 (New I/O  worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816835053"}, "body": {"bytes": "2015-02-13 16:40:35 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [2371ms] 1423816835053"}}
2015-02-13 16:40:35,056 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816835053} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:36,059 (New I/O  worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816836057"}, "body": {"bytes": "2015-02-13 16:40:36 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [3375ms] 1423816836057"}}
2015-02-13 16:40:36,059 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816836057} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:37,064 (New I/O  worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816837062"}, "body": {"bytes": "2015-02-13 16:40:37 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [4380ms] 1423816837062"}}

That is how log4j logs end up on Flume's console.

Multi-node Flume output

Everything so far has used a single Flume node. If the log volume is large, or many systems produce logs, the Flume layer needs multiple nodes. That is fairly simple: install Flume on several machines, then just adjust log4j.xml:

<appender name="flume-load-balancing-avro" class="org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender">
    <param name="Hosts" value="xxx:12306 xxx:12306 xxx:12306" />
    <param name="Selector" value="ROUND_ROBIN" />
    <param name="MaxBackoff" value="2000" />
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%l] [%rms] %m" />
    </layout>
</appender>

<appender name="flume-async" class="org.apache.log4j.AsyncAppender">
    <param name="BufferSize" value="256" />
    <appender-ref ref="flume-load-balancing-avro" />
</appender>
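These appenders still need to be attached to a logger before any events flow. A minimal sketch of the wiring in log4j.xml, assuming the root logger should go through the async wrapper:

<root>
    <priority value="info" />
    <appender-ref ref="flume-async" />
</root>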

You can test for yourself that all the nodes are doing work. More than one selection algorithm is available for the Selector (ROUND_ROBIN and RANDOM); see my earlier post flume学习01-flume介绍 (Flume Study 01: Introduction to Flume) for details.

Sending log4j logs through Flume into HBase

  • Only the multi-node setup is covered here
  • Edit the configuration file:
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = avroSource
agent.channels = memChannel
agent.sinks = hbaseSink

# For each one of the sources, the type is defined
agent.sources.avroSource.type = avro
agent.sources.avroSource.bind = 0.0.0.0
agent.sources.avroSource.port = 12306

# The channel can be defined as follows.
# agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.hbaseSink.type = hbase
agent.sinks.hbaseSink.table = flume
agent.sinks.hbaseSink.columnFamily = cf
agent.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

#Specify the channel the sink should use
# agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 1000
agent.channels.memChannel.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.avroSource.channels = memChannel
agent.sinks.hbaseSink.channel = memChannel
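The sink writes into the table and column family named above, so they must already exist; a minimal sketch of creating them in the HBase shell:

create 'flume', 'cf'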
  • Apply the same configuration on the other nodes and start them all
  • log4j.xml:
<appender name="flume-load-balancing-avro" class="org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender">
        <param name="Hosts" value="xxx:12306 xxx:12306 xxx:12306" />
        <param name="Selector" value="ROUND_ROBIN" />
        <param name="MaxBackoff" value="2000" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%l] [%rms] %m" />
        </layout>
    </appender>

    <appender name="flume-async" class="org.apache.log4j.AsyncAppender">
        <param name="BufferSize" value="256" />
        <appender-ref ref="flume-load-balancing-avro" />
    </appender>
  • Run the earlier Java main method and you will see the logs land in HBase
  • If you want rows written to HBase in a format of your own, you need a custom HbaseEventSerializer; here is a simple example:
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

import com.google.common.base.Charsets;

public class XXXHbaseEventSerializer implements HbaseEventSerializer {

    private static final Map<String, byte[]> LOG_LEVEL = new HashMap<String, byte[]>();
    static {
        LOG_LEVEL.put("10000", "DEBUG".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("20000", "INFO".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("30000", "WARN".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("40000", "ERROR".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("50000", "FATAL".getBytes(Charsets.UTF_8));
    }

    // column family
    private byte[] cf;
    // column name for the log level
    private byte[] levelCol;
    // column name for the log body
    private byte[] plCol;
    // log body
    private byte[] payload;
    // log level
    private byte[] level;
    // system identifier
    private String systemId;
    // log timestamp
    private String timestamp;

    private byte[] incCol;

    private byte[] incrementRow;

    public XXXHbaseEventSerializer() {

    }

    @Override
    public void configure(Context context) {
        this.levelCol = "l".getBytes(Charsets.UTF_8);
        this.plCol = "m".getBytes(Charsets.UTF_8);
        this.incCol = "iCol".getBytes(Charsets.UTF_8);
        incrementRow = "incRow".getBytes(Charsets.UTF_8);
    }

    @Override
    public void configure(ComponentConfiguration conf) {
    }

    @Override
    public void initialize(Event event, byte[] cf) {
        this.cf = cf;

        Map<String, String> headers = event.getHeaders();
        // Log level: 10000 = DEBUG, 20000 = INFO, 30000 = WARN, 40000 = ERROR, 50000 = FATAL
        this.level = LOG_LEVEL.get(headers.get("flume.client.log4j.log.level"));
        // System ID (the log4j logger name)
        this.systemId = headers.get("flume.client.log4j.logger.name");
        // Timestamp, e.g. 1421995677371
        this.timestamp = headers.get("flume.client.log4j.timestamp");
        // Log body
        this.payload = event.getBody();

        // Alternative: derive the systemId from the body itself, expecting the
        // structure <systemId>\t<log body> once converted to a String:
        // String body = new String(event.getBody(), Charsets.UTF_8);
        // int index = body.indexOf("\t");
        // if (index == -1) {
        //     this.systemId = "default";
        //     this.payload = body.getBytes(Charsets.UTF_8);
        // } else {
        //     this.systemId = body.substring(0, index);
        //     this.payload = body.substring(index + 1).getBytes(Charsets.UTF_8);
        // }
    }

    @Override
    public List<Row> getActions() throws FlumeException {
        List<Row> actions = new LinkedList<Row>();
        if (plCol != null) {
            byte[] rowKey;
            try {
                rowKey = XXXRowKeyGenerator.getRowKey(systemId, timestamp);
                Put put = new Put(rowKey);
                put.add(cf, plCol, payload);
                put.add(cf, levelCol, level);
                actions.add(put);
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }

        }
        return actions;
    }

    @Override
    public List<Increment> getIncrements() {
        List<Increment> incs = new LinkedList<Increment>();
        if (incCol != null) {
            Increment inc = new Increment(incrementRow);
            inc.addColumn(cf, incCol, 1);
            incs.add(inc);
        }
        return incs;
    }

    @Override
    public void close() {
    }
}
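The XXXRowKeyGenerator used in getActions() is not shown here. Purely as a hypothetical illustration (the class body below is an assumption, not the original implementation), a row key generator could look like this:

import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Charsets;

// Hypothetical sketch only; not the original XXXRowKeyGenerator.
public class XXXRowKeyGenerator {

    // Monotonically increasing suffix so that two events logged in the
    // same millisecond still get distinct row keys.
    private static final AtomicLong SEQ = new AtomicLong();

    public static byte[] getRowKey(String systemId, String timestamp) {
        // <systemId>:<timestamp>:<seq> keeps rows grouped per system and
        // time-ordered within a system.
        String key = systemId + ":" + timestamp + ":" + SEQ.incrementAndGet();
        return key.getBytes(Charsets.UTF_8);
    }
}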

Then change one line in the Flume configuration (the jar containing this serializer also needs to be on Flume's classpath, for example dropped into Flume's lib directory):

agent.sinks.hbaseSink.serializer = xxx.xxx.xxx.xxx.XXXHbaseEventSerializer

Notes:

  • Configure Flume according to your own needs; read around, think through your Flume topology first, and write the configuration to match it
  • Flume has many configuration options and is quite powerful; refer to the Flume User Guide for the details