时间:2024-01-21 19:33:03

Overview

<Flume><Source Code><Flume源码阅读笔记>

  • source采集的日志首先会传入ChannelProcessor, 在其内首先会通过Interceptors进行过滤加工,然后通过ChannelSelector选择channel。

  • Source和Sink之间是异步的,sink只需要监听自己关系的Channel的变化即可。

  • sink存在写失败的情况,flume提供了如下策略:

    • 默认是一个sink,若写入失败,则该事务失败,稍后重试。

    • 故障转移策略:给多个sink定义优先级,失败时会路由到下一个优先级的sink。sink只要抛出一次异常就会被认为是失败了,则从存活Sink中移除,然后指数级时间等待重试,默认是等待1s开始重试,最大等待重试时间是30s.

      <Flume><Source Code><Flume源码阅读笔记>

  • flume还提供了负载均衡策略:默认提供轮训和随机两种算法。通过抽象一个类似ChannelSelector的SinkSelector进行选择。

  • 以上,对于Source和sink如何异步、channel如何实现事务机制,详见后面的具体源码分析。

The whole process

  • 首先是flume的启动, 提供了两种启动方式:使用EmbeddedAgent内嵌在Java应用中或使用Application单独启动一个进程。 一般使用Application起一个进程比较多,我们这里也主要分析这种方式。

  • 程序入口:org.apache.flume.node.Application的main方法。

  • 注:因为暂时还没有了解到Zookeeper原理,所以这里关于ZK的部分就跳过了。

  • flume启动流程大致如下:

    1. 设置默认值启动参数,参数是否是必须的

      Options options = new Options();

      Option option = new Option("n", "name", true, "the name of this agent");
      option.setRequired(true);
      options.addOption(option);

      option = new Option("f", "conf-file", true,
      "specify a config file (required if -z missing)");
      option.setRequired(false);
      options.addOption(option);
      ......
    2. 解析命令行参数

      if (commandLine.hasOption('h')) {
      new HelpFormatter().printHelp("flume-ng agent", options, true);
      return;
      }
      String agentName = commandLine.getOptionValue('n');
      boolean reload = !commandLine.hasOption("no-reload-conf"); // 是否reload配置文件

      if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
      isZkConfigured = true;
      }
    3. Zookepper相关:暂时略

    4. 打开配置文件

         
       if (isZkConfigured) {
      ... // 若配置了zk,则使用zk参数启动
      } else {
      // 打开配置文件,如果不存在则快速失败
      File configurationFile = new File(commandLine.getOptionValue('f'));

      // 确保没有配置文件的时候agent会启动失败
      if (!configurationFile.exists()) {
      ...// If command line invocation, then need to fail fast
      }
      List<LifecycleAware> components = Lists.newArrayList();

      // 若需要定期reload配置文件
      if (reload) {
      // 使用EventBus事件总线, to allow publish-subscribe-style communication
      EventBus eventBus = new EventBus(agentName + "-event-bus");
      // 读取配置文件,使用定期轮训拉起策略,默认30s拉取一次
      PollingPropertiesFileConfigurationProvider configurationProvider =
      new PollingPropertiesFileConfigurationProvider(
      agentName, configurationFile, eventBus, 30);
      components.add(configurationProvider);
      // 向Application注册组件
      application = new Application(components);
      // 向EventBus注册本应用,EB会自动注册Application中使用@Subscribe声明的方法
      // TODO: EventBus, and why reload configuration
      eventBus.register(application);
      } else {
      // 若配置文件不支持定期reload
      PropertiesFileConfigurationProvider configurationProvider =
      new PropertiesFileConfigurationProvider(agentName, configurationFile);
      application = new Application();
      // 直接使用配置文件初始化Flume组件
      application.handleConfigurationEvent(configurationProvider.getConfiguration());
      }
      }
    5. reload conf:若需要reload,则使用事件总线EventBus实现,Application的handleConfigurationEvent是事件订阅者,PollingPropertiesFileConfigurationProvider是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而handleConfigurationEvent会收到该配置变更重新进行初始化。

    6. handleConfigurationEvent:

       @Subscribe
      public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
      // MaterializedConfiguration存储Flume运行时需要的组件:Source、Channel、Sink、SourceRunner、SinkRunner等。其是通过ConfigurationProvider进行初始化获取,比如PollingPropertiesFileConfigurationProvider会读取配置文件然后进行组件的初始化。
      stopAllComponents(); // 停止所有组件
      startAllComponents(conf);// 使用配置文件初始化所有组件
      }
    7. startAllComponents

      • 要首先启动channels,等待所有channels启动才能继续。然后启动SinkRunner,准备好消费者。最后启动SourceRunner开始进行采集日志。

      • LifecycleSupervisor是组件守护哨兵,对这些组件进行守护,出问题时默认策略是自动重启。

      • 这里的启动都是supervisor.supervise(entry.getValue(),new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); 这是如何启动的,我们后面再介绍。

      private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
      logger.info("Starting new configuration:{}", materializedConfiguration);

      this.materializedConfiguration = materializedConfiguration;

      // 启动channels。
      for (Entry<String, Channel> entry :
      materializedConfiguration.getChannels().entrySet()) {
      try {
      logger.info("Starting Channel " + entry.getKey());
      // TODO: LifecycleSupervisor启动
      // new SupervisorPolicy.AlwaysRestartPolicy():使用失败时总是重启的策略
      // LifecycleState.START: 初始化组件默认状态为START
      supervisor.supervise(entry.getValue(),
      new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
      }
      }

      /*
      * Wait for all channels to start.
      */
      for (Channel ch : materializedConfiguration.getChannels().values()) {
      while (ch.getLifecycleState() != LifecycleState.START
      && !supervisor.isComponentInErrorState(ch)) {
      try {
      logger.info("Waiting for channel: " + ch.getName() +
      " to start. Sleeping for 500 ms");
      Thread.sleep(500);
      } catch (InterruptedException e) {
      logger.error("Interrupted while waiting for channel to start.", e);
      Throwables.propagate(e);
      }
      }
      }

      // 启动sinkRunner
      for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
      try {
      logger.info("Starting Sink " + entry.getKey());
      supervisor.supervise(entry.getValue(),
      new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
      }
      }

      // 启动SourceRunner TODO: SourceRunner & SinkRunner
      for (Entry<String, SourceRunner> entry :
      materializedConfiguration.getSourceRunners().entrySet()) {
      try {
      logger.info("Starting Source " + entry.getKey());
      supervisor.supervise(entry.getValue(),
      new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
      }
      }

      this.loadMonitoring();
      }
    8. 之后main函数调用了application.start();

      /**
      其循环Application注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持reload配置文件,则之前启动Application时注册过PollingPropertiesFileConfigurationProvider组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启
      **/
      public synchronized void start() {
      // private final List<LifecycleAware> components;
      for (LifecycleAware component : components) {
      // private final LifecycleSupervisor supervisor;
      supervisor.supervise(component,
      new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      }
      }

      相应的stop函数。首先是main函数中:

      final Application appReference = application;
      // Runtinme.getRuntime(): Returns the runtime object associated with the current Java application.
      /**
      addShutdownHook: 注册一个新的虚拟机关闭钩子。
      虚拟机shutdown有两种情况:1)当最后一个非守护进行户外那个退出或调用system.exit时,程序正常退出;2)JVM通过ctrl-c等被用户中断。
      **/
      Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
      @Override
      public void run() {
      appReference.stop();
      }
      });
      public synchronized void stop() {
      // 关闭守护哨兵和监控服务。
      supervisor.stop();
      if (monitorServer != null) {
      monitorServer.stop();
      }
      }
    9. 至此,Application整个流程就分析完了。

  • 整体流程可以总结为:

    1. 首先初始化命令行配置;

    2. 接着读取配置文件;

    3. 根据是否需要reload初始化配置文件中的组件;如果需要reload会使用EventBus进行发布订阅变化;

    4. 接着创建Application,创建守护哨兵LifecycleSupervisor,并先停止所有组件,接着启动所有组件;启动顺序:Channel、SinkRunner、SourceRunner,并把这些组件注册给守护哨兵、初始化监控服务MonitorService;停止顺序:SourceRunner、SinkRunner、Channel;

    5. 如果配置文件需要定期reload,则需要注册PollingPropertiesFileConfigurationProvider到守护哨兵;

    6. 最后注册虚拟机关闭钩子,停止守护哨兵和监控服务。

LifecycleSupervisor

  • 守护哨兵,负责监控和重启组件

  • My: 所有需要被监控和重启的组件都应implements LifecycleAware

    public class LifecycleSupervisor implements LifecycleAware {
    public LifecycleSupervisor() {
    lifecycleState = LifecycleState.IDLE;
    // 存放被监控的组件
    supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
    // 存放正在被监控的组件
    monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
    // 创建监控服务线程池
    monitorService = new ScheduledThreadPoolExecutor(10,
    new ThreadFactoryBuilder().setNameFormat(
    "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
    .build());
    monitorService.setMaximumPoolSize(20);
    monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
    // 定期清理被取消的组件
    purger = new Purger();
    // 默认不进行清理
    needToPurge = false;
    }
    ... // start() & stop()... // 进行组件守护
    public synchronized void supervise(LifecycleAware lifecycleAware,
    SupervisorPolicy policy, LifecycleState desiredState) {
    if (this.monitorService.isShutdown()
    || this.monitorService.isTerminated()
    || this.monitorService.isTerminating()) {
    ...// 如果哨兵已停止则抛出异常
    }

    // 初始化守护组件
    Supervisoree process = new Supervisoree();
    process.status = new Status();

    // 默认策略是失败重启
    process.policy = policy;
    process.status.desiredState = desiredState; // 初始化组件默认状态,一般为START
    process.status.error = false;

    // 组件监控器,用于定时获取组件的最新状态,或重启组件。后面会介绍MonitorRunnable具体做什么。
    MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    // 以固定时间间隔执行monitorRunnable线程
    // scheduleWithFixedDelay: Creates and executes a periodic action. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.
    // 所以需要把所有异常捕获,才能保证定时任务继续执行。
    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
    monitorRunnable, 0, 3, TimeUnit.SECONDS);
    monitorFutures.put(lifecycleAware, future);
    }
  • MonitorRunnable:负责进行组件状态迁移或组件故障恢复

    public static class MonitorRunnable implements Runnable {

    public ScheduledExecutorService monitorService;
    public LifecycleAware lifecycleAware;
    public Supervisoree supervisoree;

    @Override
    public void run() {
    long now = System.currentTimeMillis();

    try {
    if (supervisoree.status.firstSeen == null) {
    logger.debug("first time seeing {}", lifecycleAware);

    supervisoree.status.firstSeen = now; // 记录第一次状态查看时间
    }

    supervisoree.status.lastSeen = now; // 记录最后一次状态查看时间
    synchronized (lifecycleAware) {
    // 如果守护组件被丢弃或出错了,则直接返回
    if (supervisoree.status.discard) {
    // 也就是此时已经调用了unsupervise
    logger.info("Component has already been stopped {}", lifecycleAware);
    return;
    } else if (supervisoree.status.error) {
    logger.info("Component {} is in error state, and Flume will not"
    + "attempt to change its state", lifecycleAware);
    return;
    }

    // 更新最后一次查看到的状态
    supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();

    // 如果组件的状态和守护组件看到的状态不一致,则以守护组件的状态为准,然后进行初始化
    if (!lifecycleAware.getLifecycleState().equals(
    supervisoree.status.desiredState)) {
    switch (supervisoree.status.desiredState) {
    // 如果是启动状态,则启动组件。# 最开始的时候组件应该就是这么启动的
    case START:
    try {
    lifecycleAware.start();
    } catch (Throwable e) {
    logger.error("Unable to start " + lifecycleAware
    + " - Exception follows.", e);
    if (e instanceof Error) {
    // This component can never recover, shut it down.
    supervisoree.status.desiredState = LifecycleState.STOP;
    try {
    lifecycleAware.stop();
    logger.warn("Component {} stopped, since it could not be"
    + "successfully started due to missing dependencies",
    lifecycleAware);
    } catch (Throwable e1) {
    logger.error("Unsuccessful attempt to "
    + "shutdown component: {} due to missing dependencies."
    + " Please shutdown the agent"
    + "or disable this component, or the agent will be"
    + "in an undefined state.", e1);
    supervisoree.status.error = true;
    if (e1 instanceof Error) {
    throw (Error) e1;
    }
    // Set the state to stop, so that the conf poller can
    // proceed.
    }
    }
    supervisoree.status.failures++;
    }
    break;
    case STOP:
    try {
    lifecycleAware.stop();
    } catch (Throwable e) {
    logger.error("Unable to stop " + lifecycleAware
    + " - Exception follows.", e);
    if (e instanceof Error) {
    throw (Error) e;
    }
    supervisoree.status.failures++;
    }
    break;
    default:
    logger.warn("I refuse to acknowledge {} as a desired state",
    supervisoree.status.desiredState);
    }

    if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
    logger.error(
    "Policy {} of {} has been violated - supervisor should exit!",
    supervisoree.policy, lifecycleAware);
    }
    }
    }
    } catch (Throwable t) {
    logger.error("Unexpected error", t);
    }
    logger.debug("Status check complete");
    }
    }

Source

SourceRunner

  • 首先是SourceRunner,它控制how a source is driven。​

    <Flume><Source Code><Flume源码阅读笔记>

  • 它是一个用来实例化derived classes(派生类)的抽象类。 根据指定的source,来通过其内的static factory method 来实例化runner。

      // 根据指定source的类型来实例化一个source runner的静态工厂方法
    // 输入是要运行的source,返回可以运行指定source的runner
    public static SourceRunner forSource(Source source) {
    SourceRunner runner = null;

    if (source instanceof PollableSource) {
    runner = new PollableSourceRunner();
    ((PollableSourceRunner) runner).setSource((PollableSource) source);
    } else if (source instanceof EventDrivenSource) {
    runner = new EventDrivenSourceRunner();
    ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    } else {
    throw new IllegalArgumentException("No known runner type for source "
    + source);
    }

    return runner;
    }

EventDrivenSourceRunner

  • starts、stops and manages EventDrivenSource event-driven sources

  • 其内有如下几个方法:

    • 构造方法

      public EventDrivenSourceRunner() {
      lifecycleState = LifecycleState.IDLE;
      }
    • start()

      @Override
      public void start() {
      Source source = getSource(); //获取Source
      ChannelProcessor cp = source.getChannelProcessor(); //Channel处理器
      cp.initialize(); //初始化Channel处理器
      source.start(); //启动Source
      lifecycleState = LifecycleState.START; //本组件状态改成启动状态
      }
    • stop()、toString()、getLifecycleState()

PollableSourceRunner

public class PollableSourceRunner extends SourceRunner {
@Override
public void start() {
PollableSource source = (PollableSource) getSource(); //获取Source
ChannelProcessor cp = source.getChannelProcessor(); //Channel处理器
cp.initialize(); //初始化channel处理器
source.start(); //启动source

runner = new PollingRunner(); //新建一个PollingRunner线程来拉取数据
runner.source = source;
runner.counterGroup = counterGroup;
runner.shouldStop = shouldStop;

runnerThread = new Thread(runner);
runnerThread.setName(getClass().getSimpleName() + "-" +
source.getClass().getSimpleName() + "-" + source.getName());
runnerThread.start();

lifecycleState = LifecycleState.START;
}
}
  • PollingRunner线程

@Override
public void run() {
while (!shouldStop.get()) { //如果没有停止,则一直在死循环运行
counterGroup.incrementAndGet("runner.polls"); //原子操作

try {
//调用PollableSource的process方法进行轮训拉取,然后判断是否遇到了失败补偿
if (source.process().equals(PollableSource.Status.BACKOFF)) {/
counterGroup.incrementAndGet("runner.backoffs");

//失败补偿时暂停线程处理,等待超时时间之后重试
Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (InterruptedException e) {
}
}
}
}
}
  • TODO

Source

public interface Source extends LifecycleAware, NamedComponent {
public void setChannelProcessor(ChannelProcessor channelProcessor);
public ChannelProcessor getChannelProcessor();
}
  • 继承了LifecycleAware接口,然后只提供了ChannelProcessor的setter和getter接口。其中:

    • 它的的所有逻辑的实现应该在LifecycleAware接口的start和stop中实现;

    • ChannelProcessor用来进行日志流的过滤和Channel的选择及调度。

  • 由上述的Runner我们知道,Source 提供了两种机制: PollableSource (轮训拉取)和 EventDrivenSource (事件驱动)

  • Source作用就是监听日志,采集,然后交给ChannelProcessor处理。

EventDrivenSource

  • 事件驱动型source不需要外部driver来获取event,EventDriven是一个implement Source的空接口。

  • 从这里开始~~~‘

Channel

  • 通过 Channel 实现了 Source 和 Sink 的解耦,可以实现多对多的关联,和 Source 、 Sink 的异步化

  • Channel exposes a transaction interface that can be used by its clients to ensure automic put(Event) and take() semantics.

ChannelProcesoor

  • 前面我们了解到Source采集日志后会交给ChannelProcessor处理,so接下来我们从ChannelProcessor入手,其依赖如下组件:

    private final ChannelSelector selector;  //Channel选择器,.flume.ChannelSelector
    private final InterceptorChain interceptorChain; //拦截器链,.flume.interceptor.InterceptorChain
    private ExecutorService execService; //用于实现可选Channel的ExecutorService,默认是单线程实现 [注:这个我在某个博客上看到的,但这个组件我在ChannelProcessor中没有搜到]
  • 我们来看ChannelProcessor是如何处理Event的:

    // Attempts to put the given event into each configured channel
    public void processEvent(Event event) {

    event = interceptorChain.intercept(event); //首先进行拦截器链过滤,TODO:intercep...
    // InterceptorChain实现了Interceptor接口,调用a list of other Interceptors. 实现event的过滤和加工。具体见后面
    if (event == null) {
    return;
    }

    // Process required channels
    //通过Channel选择器获取必须成功处理的Channel,然后事务中执行.
    List<Channel> requiredChannels = selector.getRequiredChannels(event);
    for (Channel reqChannel : requiredChannels) {
    Transaction tx = reqChannel.getTransaction(); // 继承自Channel接口的类要实现getTransaction()方法,TODO:getTransaction
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
    tx.begin(); //开始事务

    reqChannel.put(event); // 将event放到reqChannel

    tx.commit(); //提交事务
    } catch (Throwable t) {
    tx.rollback(); // 如果捕捉到throwable(including Error & Exception),则回滚事务
    if (t instanceof Error) {
    LOG.error("Error while writing to required channel: " + reqChannel, t);
    throw (Error) t;
    } else if (t instanceof ChannelException) {
    throw (ChannelException) t;
    } else {
    throw new ChannelException("Unable to put event on required " +
    "channel: " + reqChannel, t); //TODO: Channelexception可能会被handle,不然如何保证RequiredChannel的成功处理?
    }
    } finally {
    if (tx != null) {
    tx.close(); // 最后如果事务非空,还得关闭该事务
    }
    }
    }

    // Process optional channels
    //通过Channel选择器获取可选的Channel,这些Channel失败是可以忽略,不影响其他Channel的处理
    List<Channel> optionalChannels = selector.getOptionalChannels(event);
    for (Channel optChannel : optionalChannels) {
    Transaction tx = null;
    try {
    tx = optChannel.getTransaction();
    tx.begin();

    optChannel.put(event);

    tx.commit();
    } catch (Throwable t) {
    tx.rollback();
    LOG.error("Unable to put event on optional channel: " + optChannel, t);
    if (t instanceof Error) {
    throw (Error) t;
    }
    } finally {
    if (tx != null) {
    tx.close();
    }
    }
    }
    }
  • 看下flume内实现的channel类

    <Flume><Source Code><Flume源码阅读笔记>

Channel接口

public interface Channel extends LifecycleAware, NamedComponent {
// put() and get() must be invoked within an active Transaction boundary
public void put(Event event) throws ChannelException;
public Event take() throws ChannelException;
// @return: the transaction instance associated with this channel
public Transaction getTransaction();
}

AbstractChannel

  • abstract class AbstractChannel implements Channel, LifecycleAware, Configurable

  • 实现了lifecycleStatus的改变(在构造、start()和stop()方法中),实现了空configure()方法。没有做什么具体的channel相关的处理。

BasicChannelSemantics

  • 基本Channel语义的实现,包括Transaction类的thread-local语义的实现。

public abstract class BasicChannelSemantics extends AbstractChannel {

// 1. 事务使用ThreadLocal存储,保证事务线程安全
private ThreadLocal<BasicTransactionSemantics> currentTransaction
= new ThreadLocal<BasicTransactionSemantics>();

private boolean initialized = false;

protected void initialize() {} // 2. 进行一些初始化工作

// 3.提供给实现类(子类)的创建事务的回调
// 用于new Transaction对象,该对象必须继承自BasicTransactionSemantics
// 比如MemoryChannel覆盖了该方法,方法体内new了一个实例,该实例为其内私有类MemoryTransaction,该私有类继承了BasicTransactionSemantics。
// MemoryTransaction内部用两条双向并发阻塞队列LinkedBlockingDeque实现putList和takeList。具体的稍后看,会介绍到MemoryChannel TODO
protected abstract BasicTransactionSemantics createTransaction();

// 4. 往Channel中放Event,其直接委托给事务的put方法
// 确保该thread存在一个事务,然后将put方法委托给该线程的BasicTransactionSemantics实例
@Override
public void put(Event event) throws ChannelException {
// ThreadLocal<BasicTransactionSemantics>的实例currentTransaction
// 即取得当前线程的事务实例
BasicTransactionSemantics transaction = currentTransaction.get();
Preconditions.checkState(transaction != null,
"No transaction exists for this thread");
transaction.put(event);
}

// 5.从Channel获取Event,也是直接委托给事务的take方法实现
@Override
public Event take() throws ChannelException {
BasicTransactionSemantics transaction = currentTransaction.get();
Preconditions.checkState(transaction != null,
"No transaction exists for this thread");
return transaction.take();
}

@Override
public Transaction getTransaction() {
// 1. 如果channel is not ready, then 初始化该channel
if (!initialized) {
synchronized (this) {
if (!initialized) {
initialize();
initialized = true;
}
}
}

// 2. 如果当前线程没有open的事务(无事务或已关闭),则创建一个,并绑定到currentTransaction中
BasicTransactionSemantics transaction = currentTransaction.get();
if (transaction == null || transaction.getState().equals(
BasicTransactionSemantics.State.CLOSED)) {
transaction = createTransaction();
currentTransaction.set(transaction);
}
return transaction;
}
}

MemoryChannel

  • 当写入硬盘不实际或不需要数据持久化时,推荐使用。或在单元测试时使用。

  • 大部分channel会把put和take委托给事务去完成。

  • 纯内存的Channel实现,整个事务操作都是在内存中完成的。

  • 每个事务都有一个TakeList和PutList,分别用于存储事务相关的取数据和放数据,等事务提交时才完全同步到Channel Queue,或者失败把取数据回滚到Channel Queue。 TODO:整体理解何时commit、rollback。

    <Flume><Source Code><Flume源码阅读笔记>

    public class MemoryChannel extends BasicChannelSemantics {
    // TODO: about factory
    private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
    ...//一些常量定义:缺省值defaultCapacity、defaultTransCapacity、byteCapacitySlotSize..

    // 内部类,继承自BasicTransactionSemantics。TODO: About BasicTransactionSemantics
    private class MemoryTransaction extends BasicTransactionSemantics {
    // 每个事务都有两条双向并发阻塞队列,TODO: LinkedBlockingDeque
    private LinkedBlockingDeque<Event> takeList;
    private LinkedBlockingDeque<Event> putList;
    private final ChannelCounter channelCounter;
    ...//
    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
    putList = new LinkedBlockingDeque<Event>(transCapacity);
    takeList = new LinkedBlockingDeque<Event>(transCapacity);

    channelCounter = counter;
    }

    // 将event放到putList中
    // 整个doPut操作相对来说比较简单,就是往事务putList队列放入Event,如果满了则直接抛异常回滚事务;否则放入putList暂存,等事务提交时转移到Channel Queue。另外需要增加放入队列的字节数计数器,以便之后做字节容量限制
    @Override
    protected void doPut(Event event) throws InterruptedException {
    // channelCounter是一个计数器,记录当前队列放入Event数、取出event数、成功数等。
    channelCounter.incrementEventPutAttemptCount(); // 增加放入event计数器
    // estimateEventSize计算当前Event body大小,ceil():向上取整
    int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);

    // 往事务队列的putList中放入Event,如果满了,则抛异常回滚事务
    if (!putList.offer(event)) {
    throw new ChannelException(
    "Put queue for MemoryTransaction of capacity " +
    putList.size() + " full, consider committing more frequently, " +
    "increasing capacity or increasing thread count");
    }
    putByteCounter += eventByteSize; // 增加放入队列字节数计数器
    }

    // 从Channel Queue中取event放到takeList中
    @Override
    protected Event doTake() throws InterruptedException {
    channelCounter.incrementEventTakeAttemptCount();
    // 如果takeList队列没有剩余容量,即当前事务已经消费了最大容量的Event
    if (takeList.remainingCapacity() == 0) {
    throw new ChannelException("Take list for MemoryTransaction, capacity " +
    takeList.size() + " full, consider committing more frequently, " +
    "increasing capacity, or increasing thread count");
    }
    // queueStored试图获取一个信号量,超时直接返回null
    if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
    return null;
    }
    Event event;
    // 从Channel Queue获取一个Event, 对Channel Queue的操作必须加queueLock
    synchronized (queueLock) {
    event = queue.poll();
    }
    // 因为信号量的保证,Channel Queue不应该返回null,出现了就不正常了
    Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
    "signalling existence of entry");
    // 暂存到事务的takeList队列
    takeList.put(event);
    // 计算当前Event body大小并增加取出队列字节数计数器
    int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
    takeByteCounter += eventByteSize;

    return event;
    }

    // 等事务提交时,才将当前事务的put list同步到Channel Queue
    @Override
    protected void doCommit() throws InterruptedException {
    // /1、计算改变的Event数量,即取出数量-放入数
    int remainingChange = takeList.size() - putList.size();
    if (remainingChange < 0) {
    // bytesRemaining是字节容量信号量,超出容量则回滚事务
    if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
    throw new ChannelException("Cannot commit transaction. Byte capacity " +
    "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
    "reached. Please increase heap space/byte capacity allocated to " +
    "the channel as the sinks may not be keeping up with the sources");
    }
    if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
    bytesRemaining.release(putByteCounter);
    throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
    " Sinks are likely not keeping up with sources, or the buffer size is too tight");
    }
    }
    int puts = putList.size();
    int takes = takeList.size();
    synchronized (queueLock) {
    if (puts > 0) {
    while (!putList.isEmpty()) {
    if (!queue.offer(putList.removeFirst())) { // offer:添加一个元素并返回true
    throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
    }
    }
    }
    putList.clear();
    takeList.clear();
    }
    bytesRemaining.release(takeByteCounter);
    takeByteCounter = 0;
    putByteCounter = 0;

    queueStored.release(puts);
    if (remainingChange > 0) {
    queueRemaining.release(remainingChange);
    }
    if (puts > 0) {
    channelCounter.addToEventPutSuccessCount(puts);
    }
    if (takes > 0) {
    channelCounter.addToEventTakeSuccessCount(takes);
    }

    channelCounter.setChannelSize(queue.size());
    }

    // 事务失败时,将take list数据回滚到Channel Queue
    // 在回滚时,需要把takeList中暂存的事件回滚到Channel Queue,并回滚queueStored信号量。
    @Override
    protected void doRollback() {
    int takes = takeList.size();
    synchronized (queueLock) {
    Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
    "Not enough space in memory channel " +
    "queue to rollback takes. This should never happen, please report");
    while (!takeList.isEmpty()) {
    queue.addFirst(takeList.removeLast());
    }
    putList.clear();
    }
    bytesRemaining.release(putByteCounter);
    putByteCounter = 0;
    takeByteCounter = 0;

    queueStored.release(takes);
    channelCounter.setChannelSize(queue.size());
    }

    }

    private Object queueLock = new Object();
    // 在操作Channel Queue时都需要锁定,因为Channel Queue可能动态扩容(会被重新new)。用法就是synchronized(queueLock){...操作queue}
    @GuardedBy(value = "queueLock") // 用@GuardedBy注解告诉维护者这个变量被哪个锁保护着
    private LinkedBlockingDeque<Event> queue; // 由一个Channel Queue存储整个Channel的Event数据

    // Semaphore可控制某资源可被同时访问的个数,acquire()获取一个许可,若无等待,而release()释放一个许可
    // queueRemaining表示可存储事件容量。在提交事务时增加或减少该信号量
    // 1. 首先在configure()函数中初始化为一个capacity大小的信号量
    // 2. 在resize的时候,如果要缩容则要看是否还能acquire到oldCapacity - capacity个许可,不能则不允许缩容(很合理啊,不然就丢失数据了)。若是扩容,则queueRemaining.release(capacity - oldCapacity)
    // 3. 提交事务时,如果takeList.size() < putList.size(),则要检查是否有足够的queueRemaining
    private Semaphore queueRemaining;

    // 表示ChannelQueue已存储事件容量
    // 2. 在configure()中初始化为一个大小为0的信号量
    // 3. 在doTake()时tryAcquire是否有许可
    // 4. 在commit()时release(puts)增加puts个许可
    // 5. 在rollback()时release(takes)个许可
    private Semaphore queueStored;

    // maximum items in a transaction queue
    private volatile Integer transCapacity;
    private volatile int keepAlive;
    private volatile int byteCapacity;
    private volatile int lastByteCapacity;
    private volatile int byteCapacityBufferPercentage;
    private Semaphore bytesRemaining;
    private ChannelCounter channelCounter;

    public MemoryChannel() {
    super();
    }

    @Override
    public void configure(Context context) {
    // Read parameters from context
    // capacity、transactionCapacity、byteCapacity、byteCapacityBufferPercentage...
    }

    // 因为多个事务要操作ChannelQueue,还要考虑ChannelQueue的扩容问题,因此MemoryChannel使用了锁来实现;而容量问题则使用了信号量来实现。
    // 改变queue的容量,是通过新建一个LinkedBlockingDeque来实现的,并将原queue的东西加进来。
    private void resizeQueue(int capacity) throws InterruptedException {
    int oldCapacity;
    // 计算原queue的capacity,注意该方法需加锁
    synchronized (queueLock) {
    oldCapacity = queue.size() + queue.remainingCapacity();
    }

    if (oldCapacity == capacity) {
    return;
    } else if (oldCapacity > capacity) {
    // tryAcquire():从该信号量中获取指定数量的许可
    //首先要预占老容量-新容量的大小,以便缩容容量。如果获取失败,默认是记录日志,然后忽略
    if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
    LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
    } else {
    //否则,直接缩容,然后复制老Queue的数据,缩容时需要锁定queueLock,这一系列操作要线程安全
    synchronized (queueLock) {
    LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
    newQueue.addAll(queue);
    queue = newQueue;
    }
    }
    }

Interceptor

  • flume内部实现了很多自定义的Interceptor,如下图:

    <Flume><Source Code><Flume源码阅读笔记>

  • 同时还实现了InterceptorChain用来链式处理event。

InterceptorChain

  • Implementation of Interceptor that calls a list of other Interceptors

  • Interptor接口: 用于过滤、加工Event,然后返回一个新的Event。

  • 相比之下,InterceptorChain就是对event逐个(链式)调用其内的Interceptor(接口子类)实例的各个方法。

    public class InterceptorChain implements Interceptor {

    // list of interceptors that will be traversed, in order
    private List<Interceptor> interceptors;

    public InterceptorChain() {
    interceptors = Lists.newLinkedList(); // 构造方法,type LinkedList
    }
    public void setInterceptors(List<Interceptor> interceptors) {
    this.interceptors = interceptors; // set方法
    }

    // Interceptor接口的intercept方法: Interception of a single Event.事件拦截
    // @return: Original or modified event, or null if the Event is to be dropped.
    @Override
    public Event intercept(Event event) {
    for (Interceptor interceptor : interceptors) {
    if (event == null) {
    return null;
    }
    event = interceptor.intercept(event); // 注意:该类的实例会调用上面的set方法初始化intercptors,其中的intercptor是Interceptor接口子类的实例。所以这里的intercept()方法调用的是Interceptor的某个接口所覆盖的方法。[Interceptor有很多子类,下面有一个demo子类的分析,可以往下看HostInterceptor]
    }
    return event;
    }

    // Interceptor接口: Interception of a batch of events
    // @return: Output list of events
    @Override
    public List<Event> intercept(List<Event> events) {
    ... // 基本同上面的方法,不过调用的是interceptor.intercept(events);
    }

    // Interceptor: Any initialization / startup needed by the Interceptor.
    @Override
    public void initialize() {
    Iterator<Interceptor> iter = interceptors.iterator();
    while (iter.hasNext()) {
    Interceptor interceptor = iter.next();
    interceptor.initialize(); // 挨个对linkedlist中的interceptor实例进行initialize
    }
    }

    @Override
    public void close() {
    ...// 挨个对linkedlist中的interceptor实例进行close
    }

HostInterceptor

  • implements Interceptor

  • 功能:在所有拦截的events的header中上加上本机的host name或IP

    public class HostInterceptor implements Interceptor {
    ... // 一些private变量
    /**
    * Only {@link HostInterceptor.Builder} can build me
    */
    // private的构造方法,so只能通过下面的静态方法Builder实例化
    private HostInterceptor(boolean preserveExisting,
    boolean useIP, String header) {
    // 用xx.conf内的值初始化这些变量
    this.preserveExisting = preserveExisting;
    this.header = header;
    InetAddress addr;
    try {
    addr = InetAddress.getLocalHost(); //Returns the address of the local host.
    if (useIP) {
    //Returns the IP address string in textual presentation
    host = addr.getHostAddress();
    } else {
    // Gets the fully qualified domain name for this IP address.
    host = addr.getCanonicalHostName();
    }
    } catch (UnknownHostException e) {
    logger.warn("Could not get local host address. Exception follows.", e);
    }

    }

    @Override
    public void initialize() {
    // no-op
    }

    /**
    * Modifies events in-place.
    */
    @Override
    public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();

    // 如果要要保存当前的'host‘值并且当前已有头部,那么就不处理直接返回。
    if (preserveExisting && headers.containsKey(header)) {
    return event;
    }
    if (host != null) {
    headers.put(header, host); //将host添加到头部
    }

    return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
    ... // 为events中的每一个event调用intercept(Event event)
    }

    @Override
    public void close() {
    // no-op
    }

    /**
    * Builder which builds new instances of the HostInterceptor.
    */
    public static class Builder implements Interceptor.Builder {

    private boolean preserveExisting = PRESERVE_DFLT;
    private boolean useIP = USE_IP_DFLT;
    private String header = HOST;

    @Override
    public Interceptor build() {
    return new HostInterceptor(preserveExisting, useIP, header);
    }

    @Override
    public void configure(Context context) {
    preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
    useIP = context.getBoolean(USE_IP, USE_IP_DFLT);
    header = context.getString(HOST_HEADER, HOST);
    }
    }

    public static class Constants {
    public static String HOST = "host";
    ... // 一些配置的缺省值
    }
    }
  • demo Usage in xx.conf: more details see User Guide

    agent.sources.r1.interceptors = i1
    agent.sources.r1.interceptors.i1.type = host
    # preserveExisting: 是否保存当前已存在的'host'值,缺省是不保存
    agent.sources.r1.interceptors.i1.preserveExisting = true
    agent.sources.r1.interceptors.i1.useIP = false
    agent.sources.r1.interceptors.i1.hostHeader = hostname

Selector

  • 先上一张所有selector的继承关系图

    <Flume><Source Code><Flume源码阅读笔记>

    可见ChannelSelector默认提供了两种实现:复制和多路复用。默认实现是ReplicatingChannelSelector。

ChannelSelector

  • interface

  • 基于不同实现政策,允许在channels的集合中选取channels子集。

    // NamedComponent接口:用于给component附加一个名字,包括setName()和getName()方法
    public interface ChannelSelector extends NamedComponent, Configurable {

    // @param channels:all channels the selector could select from.
    public void setChannels(List<Channel> channels);

    /**
    * Returns a list of required channels. 这些channels的写入失败会传达回接收事件的source.
    * @param: event
    * @return: the list of required channels that this selector has selected for
    * the given event.
    */
    public List<Channel> getRequiredChannels(Event event);
    /**
    * Returns a list of optional channels. 这些channels的写入失败会被忽略。
    * @param: event
    * @return: the list of optional channels that this selector has selected for
    * the given event.
    */
    public List<Channel> getOptionalChannels(Event event);

    /**
    * @return the list of all channels that this selector is configured to work
    * with.
    */
    public List<Channel> getAllChannels();
    } ​
    ## AbstractChannelSelector

    * abstract class

    ```java
    public abstract class AbstractChannelSelector implements ChannelSelector {

    private List<Channel> channels;
    private String name; ...// override ChannelSelctor的getAllChannels()、setChannels(List<Channel> channels)、setName(String name)、getName()方法。 //@return: A map of name to channel instance.
    protected Map<String, Channel> getChannelNameMap() {
    Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
    for (Channel ch : getAllChannels()) {
    // 对每一个Channel, 将Channel和其名字放到HashMap中
    channelNameMap.put(ch.getName(), ch);
    }
    return channelNameMap;
    }

    /**
    * Given a list of channel names as space delimited string,
    * returns list of channels.
    * @return List of {@linkplain Channel}s represented by the names.
    */
    // 根据(space分隔的channel名字的)字符串, 返回相应的channel,利用名字-channel的HashMap
    protected List<Channel> getChannelListFromNames(String channels,
    Map<String, Channel> channelNameMap) {
    List<Channel> configuredChannels = new ArrayList<Channel>();
    if (channels == null || channels.isEmpty()) { // 判空
    return configuredChannels;
    }
    String[] chNames = channels.split(" ");
    for (String name : chNames) {
    Channel ch = channelNameMap.get(name);
    if (ch != null) {
    configuredChannels.add(ch);
    } else {
    throw new FlumeException("Selector channel not found: "
    + name);
    }
    }
    return configuredChannels;
    }

    }

ReplicatingChannelSelector

  • ChannelSelector的一个具体实现,即把接收到的消息复制到每一个Channel。【与之对应的,MultiplexingChannelSelector会根据 Event Header 中的参数进行选择,以此来选择使用哪个 Channel】

  • Replicating channel selector. 允许event被放置到source所配置的所有channels中。

  • 实际的实现方式是,默认将所有channel加入requiredChannels中,optionalChannels为空。然后根据配置的"optional"将该配置对应的channel加入optionalChannels,并从requiredChannels中移除(添加和移除是在configure方法中实现的)。 TODO:看一下这个配置如何实现

    public class ReplicatingChannelSelector extends AbstractChannelSelector {

    // Configuration to set a subset of the channels as optional.
    public static final String CONFIG_OPTIONAL = "optional";
    List<Channel> requiredChannels = null; // 在configure()中被设置为getAllChannels()的返回值,即所有配置的channels
    List<Channel> optionalChannels = new ArrayList<Channel>();

    @Override
    public List<Channel> getRequiredChannels(Event event) {
    /*
    * Seems like there are lot of components within flume that do not call
    * configure method. It is conceiveable that custom component tests too
    * do that. So in that case, revert to old behavior.
    */
    // 如果component没有调用configure(),那么requiredChannels为null,此时再调用一次。
    // TODO: configure()方法是在哪里调用的? 同样的问题在很多class中都存在
    if (requiredChannels == null) {
    return getAllChannels();
    }
    return requiredChannels;
    }

    @Override
    public List<Channel> getOptionalChannels(Event event) {
    return optionalChannels;
    }

    @Override
    public void configure(Context context) {
    String optionalList = context.getString(CONFIG_OPTIONAL);
    requiredChannels = new ArrayList<Channel>(getAllChannels());
    Map<String, Channel> channelNameMap = getChannelNameMap();
    // 根据OptionList(String, 是空格分隔的channel名字),得到相应的Channel,并将channel放到optionalChannel&& 从requiredChannels中移除。
    if (optionalList != null && !optionalList.isEmpty()) {
    for (String optional : optionalList.split("\\s+")) {
    Channel optionalChannel = channelNameMap.get(optional);
    requiredChannels.remove(optionalChannel);
    if (!optionalChannels.contains(optionalChannel)) {
    optionalChannels.add(optionalChannel);
    }
    }
    }
    }
    }

Sink

Sink Runner

  • A driver for sinks that polls them, attempting to process events if any are available in the Channel. All sinks are polled.

    public class SinkRunner implements LifecycleAware {
    private PollingRunner runner; // 内部类,实现了Runnable接口
    private SinkProcessor policy; //
    }

Sink Processor

<Flume><Source Code><Flume源码阅读笔记>

  • 分为两类:

    • DefaultSinkProcessor处理单sink,直接传送不附加任何处理。

      public void start() {
      Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
      sink.start(); // start()方法直接启动single sink
      lifecycleState = LifecycleState.START;
      }
      // stop()方法类似,configure()方法为空
      public Status process() throws EventDeliveryException {
      return sink.process(); // 直接调用sink的process()
      }
      public void setSinks(List<Sink> sinks) {
      Preconditions.checkNotNull(sinks);
      Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
      + "only handle one sink, "
      + "try using a policy that supports multiple sinks");
      sink = sinks.get(0);
      }
    • 多sink处理(AbstractSinkProcessor),其中又包括两种:

      • FailoverSinkProcessor:故障切换—>通过维持一个sinks的优先级list —> 把故障sinks降级放到一个pool中被赋予一个冷冻周期。必须先调用setSinks()再configure()

        public void setSinks(List<Sink> sinks) {
        // needed to implement the start/stop functionality
        super.setSinks(sinks);

        this.sinks = new HashMap<String, Sink>();
        for (Sink sink : sinks) {
        this.sinks.put(sink.getName(), sink);
        }
        }
        private Sink moveActiveToDeadAndGetNext() {
        Integer key = liveSinks.lastKey();
        failedSinks.add(new FailedSink(key, activeSink, 1)); // 把当前liveSinks里的第一优先级key移除到failedSinks中
        liveSinks.remove(key);
        if (liveSinks.isEmpty()) return null;
        if (liveSinks.lastKey() != null) {
        return liveSinks.get(liveSinks.lastKey());
        } else {
        return null;
        }
        }
        ...
      • LoadBalancingSinkProcessor: 提供在多个sinks之间负载均衡的能力—> 维持一个active sinks的索引序列(load需分布在这些sinks上) —> 算法包括ROUND_ROBIN(default)和RANDOM选择机制。

        内部通过一个private interface SinkSelector实现。该接口下实现了两个私有静态类RoundRobinSinkSelectorRandomOrderSinkSelector.