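Source Code Analysis: The Flume Startup Process

Date: 2021-01-01 04:22:59

A detailed analysis of how a Flume-NG agent starts up.

Startup process

Flume's main function lives in Application.java. The flume-ng shell launcher script starts Flume with java: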

$EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*

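The main function checks a series of arguments, the most important being no-reload-conf. Depending on the resulting reload flag, it decides whether the configuration file is reloaded dynamically, and then calls start: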

List<LifecycleAware> components = Lists.newArrayList();

if (reload) {
  EventBus eventBus = new EventBus(agentName + "-event-bus");
  PollingPropertiesFileConfigurationProvider configurationProvider =
      new PollingPropertiesFileConfigurationProvider(
          agentName, configurationFile, eventBus, 30);
  components.add(configurationProvider);
  application = new Application(components);
  eventBus.register(application);
} else {
  PropertiesFileConfigurationProvider configurationProvider =
      new PropertiesFileConfigurationProvider(
          agentName, configurationFile);
  application = new Application();
  application.handleConfigurationEvent(configurationProvider
      .getConfiguration());
}
application.start();

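There is a lot to unpack here:


1.1 The LifecycleAware interface

Classes that implement this interface have a lifecycle with a well-defined set of state transitions: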

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface LifecycleAware {

  /**
   * <p>
   * Starts a service or component.
   * </p>
   * <p>
   * Implementations should determine the result of any start logic and effect
   * the return value of {@link #getLifecycleState()} accordingly.
   * </p>
   *
   * @throws LifecycleException
   * @throws InterruptedException
   */
  public void start();

  /**
   * <p>
   * Stops a service or component.
   * </p>
   * <p>
   * Implementations should determine the result of any stop logic and effect
   * the return value of {@link #getLifecycleState()} accordingly.
   * </p>
   *
   * @throws LifecycleException
   * @throws InterruptedException
   */
  public void stop();

  /**
   * <p>
   * Return the current state of the service or component.
   * </p>
   */
  public LifecycleState getLifecycleState();

}

For example, the sample given in the interface's Javadoc:

 * Example usage
 * </p>
 * <code>
 * public class MyService implements LifecycleAware {
 *
 *   private LifecycleState lifecycleState;
 *
 *   public MyService() {
 *     lifecycleState = LifecycleState.IDLE;
 *   }
 *
 *   @Override
 *   public void start(Context context) throws LifecycleException,
 *       InterruptedException {
 *
 *     ...your code does something.
 *
 *     lifecycleState = LifecycleState.START;
 *   }
 *
 *   @Override
 *   public void stop(Context context) throws LifecycleException,
 *       InterruptedException {
 *
 *     try {
 *       ...you stop services here.
 *     } catch (SomethingException) {
 *       lifecycleState = LifecycleState.ERROR;
 *     }
 *
 *     lifecycleState = LifecycleState.STOP;
 *   }
 *
 *   @Override
 *   public LifecycleState getLifecycleState() {
 *     return lifecycleState;
 *   }
 *
 * }
 * </code>
 */

Every Flume node and every Flume component (source, channel, sink) implements this interface.
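The Javadoc sample above declares start(Context) and stop(Context), which does not match the no-argument methods of the interface itself. As a minimal sketch of an implementation that does match, the block below uses stand-in definitions of LifecycleState and LifecycleAware (assumptions, reduced to what the sketch needs, so that it compiles without Flume on the classpath) and a hypothetical MyService:

```java
// Stand-ins mirroring the shape of Flume's types; not the real Flume classes.
enum LifecycleState { IDLE, START, STOP, ERROR }

interface LifecycleAware {
  void start();
  void stop();
  LifecycleState getLifecycleState();
}

// Hypothetical component: records its state on each lifecycle transition.
class MyService implements LifecycleAware {
  private volatile LifecycleState lifecycleState = LifecycleState.IDLE;

  @Override
  public void start() {
    // ... acquire resources, spawn worker threads ...
    lifecycleState = LifecycleState.START;
  }

  @Override
  public void stop() {
    try {
      // ... release resources ...
      lifecycleState = LifecycleState.STOP;
    } catch (RuntimeException e) {
      // A failed shutdown is reported through the state, not by throwing.
      lifecycleState = LifecycleState.ERROR;
    }
  }

  @Override
  public LifecycleState getLifecycleState() {
    return lifecycleState;
  }
}
```

The state field is volatile here because, in a supervised setup, the thread that calls start() and the thread that reads getLifecycleState() are not necessarily the same.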


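1.2 EventBus: event listening and the publish/subscribe pattern

Back to the code above: main first checks whether no-reload-conf was set. If it was not (that is, reload is true), it creates an EventBus. EventBus is the event-handling mechanism from Guava (Google's Java library), an elegant implementation of the observer (publish/subscribe) design pattern.

With Guava, subscribing to events no longer requires implementing a dedicated listener interface; annotating the handler method with @Subscribe is enough.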
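To make the annotation-driven dispatch concrete, here is a toy stand-in written against the JDK only. It is not Guava's implementation: Subscribe, MiniEventBus and ConfigListener are hypothetical names, and the real EventBus does considerably more (handler caching, dead events, error handling). The sketch only shows the core idea that register() stores a listener and post() reflectively calls every annotated method whose parameter type accepts the event.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Stand-in for Guava's @Subscribe; runtime retention is needed for reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Subscribe {}

// Toy bus: delivers an event to every annotated one-argument method whose
// parameter type is compatible with the event.
class MiniEventBus {
  private final List<Object> listeners = new ArrayList<>();

  void register(Object listener) {
    listeners.add(listener);
  }

  void post(Object event) {
    for (Object listener : listeners) {
      for (Method m : listener.getClass().getDeclaredMethods()) {
        if (m.isAnnotationPresent(Subscribe.class)
            && m.getParameterCount() == 1
            && m.getParameterTypes()[0].isInstance(event)) {
          try {
            m.setAccessible(true);
            m.invoke(listener, event);
          } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("subscriber failed", e);
          }
        }
      }
    }
  }
}

// Hypothetical listener: no interface to implement, just an annotated method.
class ConfigListener {
  final List<String> seen = new ArrayList<>();

  @Subscribe
  public void onConfig(String conf) {
    seen.add(conf);
  }
}
```

Posting a String to a bus with a registered ConfigListener invokes onConfig; posting an Integer is silently ignored because no handler accepts that type.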

Let's see how Flume uses it:

PollingPropertiesFileConfigurationProvider configurationProvider =
    new PollingPropertiesFileConfigurationProvider(
        agentName, configurationFile, eventBus, 30);

The eventBus is passed to PollingPropertiesFileConfigurationProvider. On the one hand, this class extends PropertiesFileConfigurationProvider, so it is a provider of configuration; on the other hand, it implements the LifecycleAware interface, so it has a lifecycle.

So what does it do during that lifecycle?

@Override
public void start() {
  LOGGER.info("Configuration provider starting");

  Preconditions.checkState(file != null,
      "The parameter file must not be null");

  executorService = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
          .build());

  FileWatcherRunnable fileWatcherRunnable =
      new FileWatcherRunnable(file, counterGroup);

  executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
      TimeUnit.SECONDS);

  lifecycleState = LifecycleState.START;

  LOGGER.debug("Configuration provider started");
}

@Override
public void stop() {
  LOGGER.info("Configuration provider stopping");

  executorService.shutdown();
  try {
    while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
      LOGGER.debug("Waiting for file watcher to terminate");
    }
  } catch (InterruptedException e) {
    LOGGER.debug("Interrupted while waiting for file watcher to terminate");
    Thread.currentThread().interrupt();
  }
  lifecycleState = LifecycleState.STOP;
  LOGGER.debug("Configuration provider stopped");
}

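As you can see, start() creates a single-threaded scheduled executor, executorService, which runs the fileWatcherRunnable task with a 30-second delay between runs (the interval passed to the constructor). FileWatcherRunnable watches the Flume configuration file: when the file's last-modified time has advanced, it publishes the freshly loaded configuration on the bus by calling eventBus.post(getConfiguration()):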

@Override
public void run() {
  LOGGER.debug("Checking file:{} for changes", file);

  counterGroup.incrementAndGet("file.checks");

  long lastModified = file.lastModified();

  if (lastModified > lastChange) {
    LOGGER.info("Reloading configuration file:{}", file);

    counterGroup.incrementAndGet("file.loads");

    lastChange = lastModified;

    try {
      eventBus.post(getConfiguration());
    } catch (Exception e) {
      LOGGER.error("Failed to load configuration data. Exception follows.",
          e);
    } catch (NoClassDefFoundError e) {
      LOGGER.error("Failed to start agent because dependencies were not " +
          "found in classpath. Error follows.", e);
    } catch (Throwable t) {
      // caught because the caller does not handle or log Throwables
      LOGGER.error("Unhandled error", t);
    }
  }
}
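The polling logic can be isolated into a small JDK-only sketch. ChangePoller and its members are hypothetical names; the modification time comes from a LongSupplier so the check can be exercised without touching the file system, and a counter stands in for the eventBus.post(...) call. It assumes, as the snippet above suggests, that the last seen timestamp starts at 0, so the very first check counts as a change.

```java
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Hypothetical poller: counts reloads instead of posting to an event bus.
class ChangePoller implements Runnable {
  private final LongSupplier lastModifiedSource;
  private long lastChange = 0L; // assumption: nothing has been loaded yet
  final AtomicInteger reloads = new AtomicInteger();

  ChangePoller(LongSupplier lastModifiedSource) {
    this.lastModifiedSource = lastModifiedSource;
  }

  // Convenience factory for the real use case: watch a file's mtime.
  static ChangePoller forFile(File file) {
    return new ChangePoller(file::lastModified);
  }

  @Override
  public void run() {
    long lastModified = lastModifiedSource.getAsLong();
    if (lastModified > lastChange) {
      lastChange = lastModified;
      reloads.incrementAndGet(); // the place where a reload would be published
    }
  }

  // Runs the poller on a single scheduler thread; the caller must shut the
  // returned executor down when it is no longer needed.
  static ScheduledExecutorService schedule(ChangePoller poller, long intervalSeconds) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleWithFixedDelay(poller, 0, intervalSeconds, TimeUnit.SECONDS);
    return executor;
  }
}
```

Because the comparison is a strict "greater than" on the modification time, touching the file is enough to trigger a reload even if its contents are unchanged, while two consecutive checks of an untouched file trigger only one.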

Listeners interested in that event then handle it. Here Flume's own Application is the one interested in configuration changes:

eventBus.register(application);
// which is equivalent to
eventBus.register(listener);

In Application, the method annotated with @Subscribe defines how the event is handled:

@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
  stopAllComponents();
  startAllComponents(conf);
}

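At this point the reload behavior is clear: unless Flume is started with the no-reload-conf option, it reloads its configuration dynamically, checking the configuration file every 30 seconds (the interval passed in main) and restarting all components when the file has been modified. If the option is given, the configuration is loaded only once, at startup.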
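The option is phrased negatively, so the condition is easy to read backwards: reloading is on by default and the flag turns it off. A minimal sketch of that relationship, assuming a plain scan of the argument array in place of whatever command-line parser Flume's main actually uses; ReloadFlag and reloadEnabled are hypothetical names:

```java
import java.util.Arrays;

class ReloadFlag {
  // Reloading is enabled unless the negative flag is present.
  static boolean reloadEnabled(String[] args) {
    return !Arrays.asList(args).contains("--no-reload-conf");
  }
}
```

With this helper, an agent started without the flag takes the polling branch, and one started with it takes the load-once branch.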


1.3 Before and after Application.start()

1.3.1 Before start:
components.add(configurationProvider);
application = new Application(components);

The class that watches the file for changes is registered as a component:

public Application(List<LifecycleAware> components) {
  this.components = components;
  supervisor = new LifecycleSupervisor();
}

This brings in another important class, LifecycleSupervisor:

public LifecycleSupervisor() {
  lifecycleState = LifecycleState.IDLE;
  supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
  monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
  monitorService = new ScheduledThreadPoolExecutor(10,
      new ThreadFactoryBuilder().setNameFormat(
          "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
          .build());
  monitorService.setMaximumPoolSize(20);
  monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
  purger = new Purger();
  needToPurge = false;
}

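As you can see, it builds a scheduled thread pool for periodic tasks, with a core pool size of 10.


1.3.2 After start: application.start();

Each component is placed under supervision with the SupervisorPolicy.AlwaysRestartPolicy policy: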

public synchronized void start() {
  for (LifecycleAware component : components) {
    supervisor.supervise(component,
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  }
}

Now look at the main flow of the supervisor.supervise() method:

Supervisoree process = new Supervisoree();
process.status = new Status();

process.policy = policy;
process.status.desiredState = desiredState;
process.status.error = false;

MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;

supervisedProcesses.put(lifecycleAware, process);

ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
    monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);

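For each component, supervise() schedules a MonitorRunnable task on the ScheduledThreadPoolExecutor monitorService. The run() method of MonitorRunnable drives the component: depending on desiredState, it invokes the matching lifecycle method of that component. For example:

  1. PollingPropertiesFileConfigurationProvider has start and stop logic for watching the configuration file;
  2. ExecSource has start and stop logic for a source;
  3. MemoryChannel has start and stop logic for a channel;
  4. ElasticSearchSink has start and stop logic for a sink;

and so on.

So how often does each component's monitorRunnable run?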

ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
    monitorRunnable, 0, 3, TimeUnit.SECONDS);

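As the call shows, the task is scheduled with an initial delay of 0 s and then runs again 3 s after each run completes. Note that what repeats is the supervision check, not the component's own work: on each run the monitor compares the component's current lifecycle state with desiredState and calls start() or stop() only when the two differ. A source such as ExecSource does its data collection in whatever threads its own start() sets up, not on this 3-second cycle.

After application.start(), main also registers a shutdown hook: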
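The supervision loop can be sketched with the JDK alone. This is a simplified model under stated assumptions, not Flume's MonitorRunnable: State, CountingComponent and Monitor are hypothetical names, and restart policies and error tracking are left out. The point it illustrates is that the periodic task is a state check, which calls start() only while the component is not yet in the desired state.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

enum State { IDLE, START, STOP }

// Hypothetical component that counts how often start() is invoked.
class CountingComponent {
  volatile State state = State.IDLE;
  volatile int startCalls = 0;

  void start() {
    startCalls++;
    state = State.START;
  }

  void stop() {
    state = State.STOP;
  }
}

// One monitor per component: acts only when actual and desired state differ.
class Monitor implements Runnable {
  private final CountingComponent component;
  private final State desiredState;

  Monitor(CountingComponent component, State desiredState) {
    this.component = component;
    this.desiredState = desiredState;
  }

  @Override
  public void run() {
    if (component.state == desiredState) {
      return; // already where it should be; nothing to do on this tick
    }
    switch (desiredState) {
      case START:
        component.start();
        break;
      case STOP:
        component.stop();
        break;
      default:
        break;
    }
  }

  // Schedules the check with no initial delay and a fixed delay between runs.
  static ScheduledFuture<?> supervise(ScheduledExecutorService pool, Monitor monitor) {
    return pool.scheduleWithFixedDelay(monitor, 0, 3, TimeUnit.SECONDS);
  }
}
```

Running the monitor twice against a fresh component calls start() once: the second run sees that the state already matches and returns immediately. With scheduleWithFixedDelay the 3 seconds are measured from the end of one run to the start of the next, so a slow start() pushes later checks back instead of overlapping with them.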

final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
  @Override
  public void run() {
    appReference.stop();
  }
});

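The hook runs when the JVM begins shutting down (for example on Ctrl+C or SIGTERM) and calls Application.stop(), so that the agent's components are stopped and their resources released in an orderly way.

That concludes this detailed analysis of the Flume agent startup process.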
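As a small illustration of the mechanism, the sketch below registers a named hook thread through the standard Runtime API. ShutdownHooks and register are hypothetical names, and the stop action is passed in as a Runnable rather than being an Application reference.

```java
// Hypothetical helper: wraps a stop action in a named, not-yet-started thread,
// which is the form Runtime.addShutdownHook expects, and registers it.
class ShutdownHooks {
  static Thread register(Runnable stopAction) {
    Thread hook = new Thread(stopAction, "agent-shutdown-hook");
    Runtime.getRuntime().addShutdownHook(hook);
    return hook;
  }
}
```

A call such as ShutdownHooks.register(application::stop) would mirror the snippet above. The JVM starts registered hooks on normal exit and on termination signals it can intercept, but not when the process is killed forcibly, so a hook is a best-effort cleanup path.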