java代码读取yarn聚合目录日志

时间:2022-03-30 16:15:33

可以直接使用org.apache.hadoop.yarn.client.cli.LogsCLI(yarn logs -applicationId)中的main方法逻辑,如

public static void main(String[] args)
throws Exception
{
Configuration conf = new YarnConfiguration();

conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/core-site.xml"));
conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/yarn-site.xml"));
conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/hdfs-site.xml"));

LogsCLI logDumper = new LogsCLI();
logDumper.setConf(conf);
int exitCode = logDumper.run(args);
System.exit(exitCode);
}

也可以仿照他的逻辑自己实现如下:

Configuration conf = new YarnConfiguration();
conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/core-site.xml"));
conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/yarn-site.xml"));
conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/hdfs-site.xml"));
String appIdStr="application_1529377575914_8380";

PrintStream out=new PrintStream(appIdStr);
ApplicationId appId = null;
appId = ConverterUtils.toApplicationId(appIdStr);

Path remoteRootLogDir = new Path(conf.get("yarn.nodemanager.remote-app-log-dir", "/tmp/logs"));

String user = UserGroupInformation.getCurrentUser().getShortUserName();;
String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);

Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, appId, user, logDirSuffix);
RemoteIterator<FileStatus> nodeFiles;
try
{
Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);

nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
}
catch (FileNotFoundException fnf)
{
logDirNotExist(remoteAppLogDir.toString());
return -1;
}

boolean foundAnyLogs = false;
while (nodeFiles.hasNext())
{
FileStatus thisNodeFile = (FileStatus)nodeFiles.next();
if (!thisNodeFile.getPath().getName().endsWith(".tmp"))
{
AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
try
{
//System.out.println(thisNodeFile.getPath().getName());
AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
DataInputStream valueStream = reader.next(key);
for (;;)
{
if (valueStream != null)
{
String containerString = "\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();

out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
try
{
for (;;)
{
AggregatedLogFormat.LogReader.readAContainerLogsForALogType(valueStream, out, thisNodeFile.getModificationTime());

foundAnyLogs = true;
}

}
catch (EOFException eof)
{
key = new AggregatedLogFormat.LogKey();
valueStream = reader.next(key);

}

}else{
break;
}
}
}
finally
{
reader.close();
}
}
}
if (!foundAnyLogs)
{
emptyLogDir(remoteAppLogDir.toString());
return -1;
}
return 0;
}