从Spark Streaming 收集应用程序日志到Flume agent的配置方法

时间:2021-07-30 15:36:28

从Spark Streaming 收集应用程序日志到Flume agent的配置方法

由于 Spark 本身也使用 log4j 收集日志,所以我们在应用程序里再单独配置一个 log4j。先说一下 Spark Streaming 的启动方法:在提交作业时添加以下两个参数:--files "log4j-executor.properties" 和 --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-executor.properties"。比如:

# Submit the streaming job to YARN in cluster mode with 5 executors of 8 cores each.
# --files uploads log4j-executor.properties together with the job, and the
# executor JVM option -Dlog4j.configuration names that same file as the log4j
# configuration the executors should load.
spark-submit --class com.juniorchina.modeling.entry.spark.streaming.Main \
--num-executors 5 \
--executor-cores 8 \
--master yarn \
--deploy-mode cluster \
--files "log4j-executor.properties" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-executor.properties" \
user_tag-1.0-SNAPSHOT-jar-with-dependencies.jar

log4j-executor.properties配置

# log4j configuration for the Spark executors; it is selected with
# -Dlog4j.configuration=log4j-executor.properties on the spark-submit command line.
#
# Only the "console" and "flume" appenders are attached to a logger below.
# The "file", "LOGFILE" and "DAILY" appenders are defined but not referenced by
# any logger in this file.

# Root logger: INFO and above, console appender only.
log4j.rootCategory=INFO,console
# Application logger "UserModelJob": INFO and above, to the console and to Flume.
log4j.logger.UserModelJob=INFO,console,flume

# Do not additionally pass UserModelJob events to the root logger's appenders.
log4j.additivity.UserModelJob=false

# file
# Daily rolling file appender writing to logs/flume.log.
# To use it, add "file" to one of the logger appender lists above.
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Encoding=UTF8
log4j.appender.file.Threshold=INFO
log4j.appender.file.File=logs/flume.log
log4j.appender.file.DatePattern='_'yyyyMMdd'.log'
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%p] [%d{yy/MM/dd HH:mm:ss}] [%c{1}]: %m%n


# LOGFILE
# Stock log4j size-based rolling file appender (10KB per file, 10 backups).
# NOTE: ${flume.log.dir} and ${flume.log.file} are not defined in this file;
# they would have to be supplied as system properties before enabling this appender.
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=10KB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}|%p|%l|%t|%c|%m%n


# DAILY
# Warning: If you enable the following appender it will fill up your disk if you don't have a cleanup job!
# This uses the updated rolling file appender from log4j-extras that supports a reliable time-based rolling policy.
# See http://logging.apache.org/log4j/companions/extras/apidocs/org/apache/log4j/rolling/TimeBasedRollingPolicy.html
# To use it, add "DAILY" to one of the logger appender lists above.
# NOTE: it relies on the same undefined ${flume.log.dir} / ${flume.log.file} properties as LOGFILE.
log4j.appender.DAILY=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.DAILY.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.DAILY.rollingPolicy.ActiveFileName=${flume.log.dir}/${flume.log.file}.log
log4j.appender.DAILY.rollingPolicy.FileNamePattern=${flume.log.dir}/${flume.log.file}.%d{yyyyMMddHH}.log
log4j.appender.DAILY.layout=org.apache.log4j.PatternLayout
log4j.appender.DAILY.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss},%c,%m,(%C.%M:%L)%n



# console
# Console appender writing to System.out; used by both loggers configured above.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss},(%F),%c,%m%n

# flume
# Flume log4j appender used by the UserModelJob logger; Hostname/Port address
# the Flume agent that receives the events.
# NOTE(review): AppName is presumably the value sent in the application-name
# event header by the modified appender - confirm against the appender source.
# NOTE(review): with UnsafeMode=false the append() method shown in this article
# throws FlumeException on delivery failures instead of dropping the event.
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=192.168.5.151
log4j.appender.flume.Port=41414
log4j.appender.flume.AppName=usermodeling
log4j.appender.flume.UnsafeMode=false
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss},(%F),%c,%m%n



# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

log4jappender.Log4jAppender

对 org.apache.flume.clients.log4jappender.Log4jAppender 做了一些修改,在事件的 header 中添加了 app.name、app.event.type、app.source.host 三个参数。核心代码:

 /**
  * Converts a log4j event into a Flume event and sends it through the RPC client.
  *
  * <p>Every event carries these headers: timestamp, log level, application name,
  * the logger name (stored as the application event type) and the source host
  * IPv4 address. {@code GenericRecord} / {@code SpecificRecord} messages (or any
  * message when Avro reflection is enabled) are serialized with their schema;
  * every other message is formatted with the configured layout, or with
  * {@code toString()} when no layout is set, and sent as UTF8 text.
  *
  * <p>In unsafe mode, failures are reported through {@code LogLog} and the event
  * is dropped; otherwise a {@link FlumeException} is thrown.
  *
  * @param event the log4j event to forward
  * @throws FlumeException if the appender has no usable RPC client, the local
  *     host address cannot be determined, or delivery fails, and unsafe mode is off
  */
 @Override
 public synchronized void append(LoggingEvent event) throws FlumeException {
   // A null rpcClient means the appender was never activated (hostname/port +
   // activateOptions) or has already been closed.
   if (rpcClient == null) {
     String errorMsg = "Cannot Append to Appender! Appender either closed or" +
         " not setup correctly!";
     LogLog.error(errorMsg);
     if (!unsafeMode) {
       throw new FlumeException(errorMsg);
     }
     // Unsafe mode: try once to re-establish the connection and silently drop
     // the event if there is still no active client afterwards.
     reconnect();
     if (rpcClient == null || !rpcClient.isActive()) {
       return;
     }
   }

   if (!rpcClient.isActive()) {
     reconnect();
   }

   Map<String, String> hdrs = new HashMap<String, String>();
   hdrs.put(Log4jAvroHeaders.TIMESTAMP.toString(),
       String.valueOf(event.timeStamp));

   // The level is stored as its int value; the receiver can map the parsed
   // integer back to a log4j Level.
   hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
       String.valueOf(event.getLevel().toInt()));

   hdrs.put(Log4jAvroHeaders.APP_NAME.toString(), appName);
   // The logger name is sent as the application event type rather than under
   // the LOGGER_NAME header.
   hdrs.put(Log4jAvroHeaders.APP_Event_TYPE.toString(), event.getLoggerName());

   try {
     // String.valueOf keeps the header value "null" when no address is found.
     hdrs.put(Log4jAvroHeaders.APP_SOURCE_HOST.toString(),
         String.valueOf(findSourceHostIp()));
   } catch (SocketException e) {
     String msg = "Can't get localhost IP";
     // Log the cause as well: in unsafe mode it is not rethrown.
     LogLog.error(msg, e);
     if (unsafeMode) {
       return;
     }
     throw new FlumeException(msg + " Exception follows.", e);
   }

   Event flumeEvent;
   Object message = event.getMessage();
   if (message instanceof GenericRecord) {
     GenericRecord record = (GenericRecord) message;
     populateAvroHeaders(hdrs, record.getSchema(), message);
     flumeEvent = EventBuilder.withBody(serialize(record, record.getSchema()), hdrs);
   } else if (message instanceof SpecificRecord || avroReflectionEnabled) {
     Schema schema = ReflectData.get().getSchema(message.getClass());
     populateAvroHeaders(hdrs, schema, message);
     flumeEvent = EventBuilder.withBody(serialize(message, schema), hdrs);
   } else {
     hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
     String msg = layout != null ? layout.format(event) : message.toString();
     flumeEvent = EventBuilder.withBody(msg, Charset.forName("UTF8"), hdrs);
   }

   try {
     rpcClient.append(flumeEvent);
   } catch (Exception e) {
     String msg = "Flume append() failed.";
     // Log the cause as well: in unsafe mode it is not rethrown.
     LogLog.error(msg, e);
     if (unsafeMode) {
       return;
     }
     throw new FlumeException(msg + " Exception follows.", e);
   }
 }

 /**
  * Scans all network interfaces for an IPv4 address that is neither loopback
  * nor link-local.
  *
  * @return the textual form of the last matching address, or {@code null} if
  *     no interface or no matching address exists
  * @throws SocketException if the interfaces cannot be enumerated
  */
 private String findSourceHostIp() throws SocketException {
   String hostIp = null;
   Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
   // Guard against a null enumeration (no interfaces reported) instead of
   // failing with a NullPointerException inside append().
   while (interfaces != null && interfaces.hasMoreElements()) {
     Enumeration<InetAddress> addresses = interfaces.nextElement().getInetAddresses();
     while (addresses.hasMoreElements()) {
       InetAddress address = addresses.nextElement();
       if (address instanceof Inet4Address
           && !address.isLinkLocalAddress()
           && !address.isLoopbackAddress()) {
         // No early exit: the last matching address wins.
         hostIp = address.getHostAddress();
       }
     }
   }
   return hostIp;
 }

可以去github clone flume-log4j-appender https://github.com/kntao/flume/tree/flume-1.5/flume-ng-clients/flume-ng-log4jappender