Flume-NG启动过程源码分析(二)(原创)

时间:2022-07-21 08:06:55

  在上一节中讲解了——Flume-NG启动过程源码分析(一)(原创)  本节分析配置文件的解析,即PollingPropertiesFileConfigurationProvider.FileWatcherRunnable.run中的eventBus.post(getConfiguration())。分析getConfiguration()方法。此方法在AbstractConfigurationProvider类中实现了,并且这个类也初始化了三大组件的工厂类:this.sourceFactory = new DefaultSourceFactory();this.sinkFactory = new DefaultSinkFactory();this.channelFactory = new DefaultChannelFactory()。

  getConfiguration()的具体代码如下:  

public MaterializedConfiguration getConfiguration() {
MaterializedConfiguration conf = new SimpleMaterializedConfiguration();//三大组件
//加载配置文件,PropertiesFileConfigurationProvider中,解析配置文件,得出代理名字,sources...,各个配置属性和值
FlumeConfiguration fconfig = getFlumeConfiguration();
AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());//配置文件
if (agentConf != null) {
Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
try {
loadChannels(agentConf, channelComponentMap);
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
Set<String> channelNames =
new HashSet<String>(channelComponentMap.keySet());
for(String channelName : channelNames) {
ChannelComponent channelComponent = channelComponentMap.
get(channelName);
if(channelComponent.components.isEmpty()) {
LOGGER.warn(String.format("Channel %s has no components connected" +
" and has been removed.", channelName));
channelComponentMap.remove(channelName);
Map<String, Channel> nameChannelMap = channelCache.
get(channelComponent.channel.getClass());
if(nameChannelMap != null) {
nameChannelMap.remove(channelName);
}
} else {
LOGGER.info(String.format("Channel %s connected to %s",
channelName, channelComponent.components.toString()));
conf.addChannel(channelName, channelComponent.channel);
}
}
for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
conf.addSinkRunner(entry.getKey(), entry.getValue());
}
} catch (InstantiationException ex) {
LOGGER.error("Failed to instantiate component", ex);
} finally {
channelComponentMap.clear();
sourceRunnerMap.clear();
sinkRunnerMap.clear();
}
} else {
LOGGER.warn("No configuration found for this host:{}", getAgentName());
}
return conf;
}

  1、SimpleMaterializedConfiguration对象构造了三大组件

 public SimpleMaterializedConfiguration() {
channels = new HashMap<String, Channel>();
sourceRunners = new HashMap<String, SourceRunner>();
sinkRunners = new HashMap<String, SinkRunner>();
}

  2、getFlumeConfiguration()方法是在AbstractConfigurationProvider的子类PropertiesFileConfigurationProvider中实现了。这个方法读取配置文件,然后解析成name(输姓名全称,即等号左侧的全部)、value(等号的右侧)对,存入一个Map当中,返回一个封装了这个Map的FlumeConfiguration对象。FlumeConfiguration类的构造函数会遍历这个Map的所有<name,value>对,调用addRawProperty(String name, String value)处理<name,value>对,如果为false就忽略了。遍历完后调用validateConfiguration()来验证和删除配置不当组件。

  一、addRawProperty方法会先做一些合法性检查,首次启动Flume会构造一个AgentConfiguration对象aconf,然后agentConfigMap.put(agentName, aconf),以后动态加载配置文件时只需要AgentConfiguration aconf = agentConfigMap.get(agentName)就可以得到,然后调用aconf.addProperty(configKey, value)处理,configKey是配置文件中等号左侧去掉agent名字和点之后的内容。agentConfigMap是封装了该agent和其所有组件的配置信息的一个Map。    

  (1)addProperty(configKey, value)方法首先会依次判断是否是sources、sinks、channels、sinkgroups四大组件的总配,不允许重复,将对应的value赋值给这四个String类型的对象。例如:

    caiji-agent.sources = log-source
    caiji-agent.sinks = avro-sink16 avro-sink15
    caiji-agent.channels = mem-channel
    caiji-agent.sinkgroups = sg

  ps:以上是举例,到这一步时其实已经没了"caiji-agent."这个字段了。举一个具体的匹配例子,其他3个和这个相同只是匹配内容不同而已,代码如下:  

  if (key.equals(BasicConfigurationConstants.CONFIG_SOURCES)) {  //等于sources,在此是配置组件名称时
if (sources == null) {
sources = value;
return true;
} else { //重复指定source
logger
.warn("Duplicate source list specified for agent: " + agentName);
errorList.add(new FlumeConfigurationError(agentName,
BasicConfigurationConstants.CONFIG_SOURCES,
FlumeConfigurationErrorType.DUPLICATE_PROPERTY,
ErrorOrWarning.ERROR));
return false;
}
}

  BasicConfigurationConstants.CONFIG_SOURCES的值是sources。

  (2)、addProperty(configKey, value)中如果configKey参数均不是上述四大组件总配,则是具体的单个组件的详细参数配置。则调用parseConfigKey(String key, String prefix)方法来判断解析sources、sinks、channels、sinkgroups的具体组件,还是举一个代码例子,其他3个和此相同:

  ComponentNameAndConfigKey cnck = parseConfigKey(key,
BasicConfigurationConstants.CONFIG_SOURCES_PREFIX); if (cnck != null) {
// it is a source
String name = cnck.getComponentName();
Context srcConf = sourceContextMap.get(name); //这个map,key是组件名字,value是其相应的配置属性context if (srcConf == null) {
srcConf = new Context();
sourceContextMap.put(name, srcConf);//j将组件放入map
} srcConf.put(cnck.getConfigKey(), value); //将属性名和对应的值放入context
return true;
}

  BasicConfigurationConstants.CONFIG_SOURCES_PREFIX的值是"sources.",注意这个带点。另外,这里还会构造对应四大组件的四个存储配置信息的Map<String, Context>  :sourceContextMap、channelContextMap、sinkContextMap和sinkGroupContextMap,这四个Map分别存储对应的总配信息中指定个数的组件的对应配置信息,比如上述总配信息中,sourceContextMap、channelContextMap和sinkGroupContextMap依次存储着<log-source,log-source的配置信息>、<mem-channel,mem-channel的配置信息>、<sg,sg的配置信息>以及sinkContextMap的<avro-sink16,avro-sink16的配置信息>、<avro-sink15,avro-sink15的配置信息>。

  parseConfigKey(String key, String prefix)中的prefix用来鉴别是什么类型的组件。这个方法返回一个封装了解析后的(name是组件名称, configKey对应的组件属性名称)的对象。

  二、validateConfiguration()来验证和删除配置不当组件。此方法会遍历agentConfigMap中的每个agent,判断agent对应的配置文件是否合法aconf.isValid(),不合法就从agentConfigMap中删除这个agent。aconf.isValid()是AgentConfiguration.isValid()方法。

  (1)、会先判断channels是否为空,不为空的话,就判断channels组件集channelSet是否合法channelSet = validateChannels(channelSet)。validateChannels就是遍历所有的channel:先判断是否有对应的配置信息context,没有的话就删除组件;有对应配置信息的话,再判断是否是flume内置的type类型,还是自定义的,如果是自定义的还要判断是否有外部的config配置类,如果没有有配置config参数指定外部配置类,则自定义的type会自动设置为OTHER,否则config设置为指定的外部配置类。

  构造ChannelConfiguration对象conf(这个类继承自ComponentConfiguration)=ComponentConfigurationFactory.create(channelName, config, ComponentType.CHANNEL),其中config指的是配置类,如果配置了就会根据配置类进行初始化返回一个配置类对象,此时不会设置conf.isNotFoundConfigClass();如果没有配置config参数,默认的类型及自定义的类型都会爆异常,在异常处理时,则返回的都是一个instance = ChannelConfiguration(name)对象且内置类型会instance.setNotFoundConfigClass(),因为内置的channel也没实现配置类,自定义的类型不会设置conf.isNotFoundConfigClass()。

  然后conf.configure(channelContext)执行配置,在这说明内置的channel通过封装使得Context配置信息变成ComponentConfiguration配置信息;如果是自定义的类型且有外部配置类即有config参数时,会将这个channel对象及配置信息放入channelConfigMap中;否则,没有config参数的channel对象,包括内置的以及自定义没有外围配置类config的,存放在newContextMap中。然后是重置channelContextMap = newContextMap,重置的目的是为了以后分类对内置和自定义的channel分别做处理,这在后面要讲的loadChannels时会用到。然后返回的内容是channelContextMap 和channelConfigMap中的key值的综合与channelSet的交集部分。

  validateChannels(channelSet)方法会最终返回一个两个set(一个是从总配中解析出来的;另外一个是从channelContextMap中解析出来的,后者对应配置文件的实际配置信息,因为存在:1、在总配中声明组件但在具体配置时没有配置;2、没有在总配中声明的组件,但在具体配置时有详细配置信息)的交集。

  (2)、validateSources(channelSet)和(1)的大体思路是一样的,只不过需要获取在总配中的sources和每个source的channels(在这可能指定多个channel)的交集,并将交集重新配置:srcContext.put(BasicConfigurationConstants.CONFIG_CHANNELS,this.getSpaceDelimitedList(channels));//将set转化为String

  另外在srcConf.configure(srcContext)中除了获取该source的channels之外,还会对selector(默认是REPLICATING)进行配置。需要时再讲。channelSet(通过检查合格的channel,活动的channel)作为参数传进来的目的是要配置文件中source对应的channels取交集,除去在配置文件中无效的channel。

  方法的返回值的思路参考(1)。  值得注意的是,从代码可以看出每个agent可以配置多个source。证据在此:Set<String> sourceSet =new HashSet<String>(Arrays.asList(sources.split("\\s+")))。sourceSet的组成也分两部分:一部分是有指定config外部配置的sourceConfigMap只可能是自定义的组件非内置的,另一部分是没有指定参数config的source,后者可能包括自定义的以及内置的。

  (3)、validateSinks(channelSet)和(2)基本类似,只不过每个sink只能配一个channel,不像source可以有多个”爱妾“,所有在这只需要判断sink的channel是否包含在channelSet之中:channelSet.contains(sinkConf.getChannel()),不包含就异常退出。其他可参考(1)的思路。返回的sinkSet也分为两部分可参考(2)中返回的sourceSet的两部分组成。

  (4)、validateGroups(sinkSet)遍历所有sinkgroups,获取合法的sink,并将正确的sinkgroupConfigMap.put(sinkgroupName, conf)。

conf =(SinkGroupConfiguration) ComponentConfigurationFactory.create(sinkgroupName, "sinkgroup", ComponentType.SINKGROUP)已不像上述三种组件有外围的配置类,sinkgroup没有自定义的功能,也就没有指定外围配置类的功能,所以是固定的"sinkgroup",该方法返回的是SinkGroupConfiguration(name)。conf.configure(context)会获取配置文件中的sinkgroups,并对processor的配置类进行配置。

validGroupSinks(sinkSet, usedSinks, conf)方法删除不符合条件的:1、和已知的其他sinkgroup有相同的sink;2、使用了非活动的sink,满足这俩种任何一个对应的sink将会被删除。解析出来的sinkgroup(有可能为null,可能少一个或者多个,又或者都满足)都同一存入sinkgroupConfigMap,和其他三个组件有所不同,其他三个组件的ConfigMap都是存放自定义且有外围实现配置类的。

  以上4个组件均可以在总配中配置多项,且上述4个方法的返回值均是符合要求的组件,去除了声明但是没配置的以及配置但没声明等组件。

  (5)、再判断返回的sink和source是否为空。

  (6)、将合法的四个组件均转换为String。getSpaceDelimitedList(Set<String> entries)就是将set转换为String。

     this.sources = getSpaceDelimitedList(sourceSet);
this.channels = getSpaceDelimitedList(channelSet);
this.sinks = getSpaceDelimitedList(sinkSet);
this.sinkgroups = getSpaceDelimitedList(sinkgroupSet);

  (7)、最终返回true。validateConfiguration()中若返回的是false则删除此agent。

  这样就完成了FlumeConfiguration对象的构造,本文开始的2步骤中getFlumeConfiguration()也得到了。

  3、loadChannels(agentConf, channelComponentMap)方法。该方法首先是会先保存旧的channels,从channelCache拷贝到channelsNotReused暂存。然后是获取channelNames=agentConf.getChannelSet()和compMap=agentConf.getChannelConfigMap(),前者是所有的channel的名字,后者是所有chennel对象中有在配置文件中配置config参数,即外部配置类的channel配置信息,所以Configurables.configure(channel, comp)会对有config外围配置类的进行配置;           agentConf.getChannelContext().get(chName)则是获取没有外围配置类(没有configcan参数,包括自定义的及内置的)的channel,自定义的必须实现Configurable接口,所以Configurables.configure(channel, context)会起作用对自定义的channel进行配置,我们再自定义或者在看源码时看到的configure(context)方法会在此时调用。在上述两次Configurables.configure分别会调用一次getOrCreateChannel方法,该方法除了返回指定的channel之外,还会将和channelsNotReused内同名的channel删除,这样保证channelsNotReused中是没有重新使用的channel,使得最后从channelCache 删除。但channelCache中可能还存在已失效的channel,因此需要根据channelsNotReused剩余的从channelCache中全部删除,可使channelCache中缓存的就是正在使用的channel。因为channelSet可能会包含两种:一种就是内置的;一种就是自定义的。所以就分两种类型初始化并配置。  

  channelComponentMap则是所有(包括内置和自定义(如果有的话))channel的配置信息。一般来说,自定义channel很少,内置的channel类型能满足绝大说的情况。

  由此,我们也可以看出外部的配置类至少需要具备以下几个条件:一、必须有configure(ComponentConfiguration conf)方法;二、实现ConfigurableComponent接口;三、必须有String类型做参数的构造方法。

  ChannelComponent类用来channel(对象)及其对应的sink和source(名字)。是channelComponentMap中value,key是channel的名字。

  channelsNotReused存在的意义就是当动态加载的时候能够清除channelCache中不在新的配置文件中的channel。

  4、loadSources(agentConf, channelComponentMap, sourceRunnerMap)方法。loadSources和loadChannels有一些相似。

  首先会获取getSourceSet()source集合,以及getSourceConfigMap()有外部配置类的channel,然后遍历soureSet对有外部配置类的source创建对应的source对象,并执行source的configure(context)方法进行配置;然后对每个source获取对应的channels,兵构造ChannelProcessor,执行ChannelProcessor.configure方法;根据source的类型构造SourceRunner。

public static SourceRunner forSource(Source source) {
SourceRunner runner = null; if (source instanceof PollableSource) {
runner = new PollableSourceRunner();
((PollableSourceRunner) runner).setSource((PollableSource) source);
} else if (source instanceof EventDrivenSource) {
runner = new EventDrivenSourceRunner();
((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
} else {
throw new IllegalArgumentException("No known runner type for source "
+ source);
} return runner;
}

  从上述代码可以看到source有两种,一种是实现PollableSource的就构造PollableSourceRunner;另外一种是实现EventDrivenSource接口的,就构造EventDrivenSourceRunner。两种的区别,在这里中有说明。然后将source对应的ChannelComponent加入:channelComponent.components.add(sourceName)。

  其次是getSourceContext(),对没有外部配置类的source进行加载。Configurables.configure(source, context)将会调用source.configure(context)对source自身进行配置。其它和上面基本相同。

  参数sourceRunnerMap则是保存了所有soure执行的方式。

  5、loadSinks(agentConf, channelComponentMap, sinkRunnerMap)方法。加载sink的过程中也是分两部分:

  首先是对有外部配置类的sink,先构造Sink对象,然后对其调用sink.configure方法进行参数配置;然后检查此sink是否有channel相连,接着将sinkName与sink一同加入sinks,并对相应的channel增加组件信息channelComponent.components.add(sinkName)。

  其次就是对没有指定外部配置类的sink进行和上述同样的操作,只不过Configurables.configure(sink, context)是调用的source的configure执行参数配置。

  最后调用loadSinkGroups(agentConf, sinks, sinkRunnerMap)

  6、loadSinkGroups(agentConf, sinks, sinkRunnerMap)方法,用于加载sinkGroups。sinkgroup与sink、source、channel不同,没有外部配置类,故只有getSinkGroupConfigMap()。加载sinkGroup方法首先会遍历所有的sinkgroup,获取每个sinkGroup对应的sink,然后构造SinkGroup对象,并对其进行参数配置Configurables.configure(group, groupConf),sinkRunnerMap.put(comp.getComponentName(),new SinkRunner(group.getProcessor()))这句代码则是将sinkgroup放入sinkRunnerMap,group.getProcessor()是获取processor的类型(null、org.apache.flume.sink.FailoverSinkProcessor容错、org.apache.flume.sink.DefaultSinkProcessor默认、org.apache.flume.sink.LoadBalancingSinkProcessor负载均衡,这四种之一)。

  然后对所有的sink遍历,如果sink没有参与sinkgroup则使用默认DefaultSinkProcessor,构造SinkProcessor对象,对SinkProcessor进行参数配置

Configurables.configure(pr, new Context())然后加入sinkRunnerMap.put(entry.getKey(),new SinkRunner(pr))。

  7、检查每个channel的channelComponent.components是否为空,为空则表明没有和这个channel相连接的组件应该删除。否则将所有的channel、SourceRunner、SinkRunner加入1中的MaterializedConfiguration对象。  

    ...
    conf.addChannel(channelName, channelComponent.channel);
    ...
    for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
conf.addSinkRunner(entry.getKey(), entry.getValue());
}

  由代码可知,将解析出来的都存储进MaterializedConfiguration的三大组件,即1中的三大组件。source与channel的对应关系存储在SourceRunner中的source中的channelProcessor中的selector中。sink与channel的关系在sink.setChannel(channelComponent.channel)设定。

  8、清空channelComponentMap、sourceRunnerMap、sinkRunnerMap。

  9、返回MaterializedConfiguration对象conf。

  

  接下来就返回到PollingPropertiesFileConfigurationProvider.FileWatcherRunnable.run()方法中的eventBus.post(getConfiguration())。会通知Application.handleConfigurationEvent(MaterializedConfiguration conf)方法,下一篇再讲这个。

总结:1、上面有指定外部配置类的必定是自定义的组件;2、每个配置文件可以配置多个agent,用命令选择使用哪个,每个agent可以配置多个source、多个sink、多个channel、多个sinkgroups,但是每个sink只能对应一个sink、每个source可以对应多个channel、每个channel可以对应多个sink也可以对应多个source。

问题:1、为什么只有channel有channelCache,source没有sourceCache,sink没有sinkCache??

   答:但是loadChannels方法的最后会将不再重复用的channel从channelCache中删除,每次调用loadChannels方法都会尝试去删除不再重用的channel,我认为是channel相对于其他组件比较少定制,变化也少,缓存后当重新加载配置文件时可以立即从缓存中获取channel(如果有的话)这样可以节省一些时间。source和sink都是易变的组件因此每次都重新加载。(这样是不是有点牵强,还是我理解错了?)

getConfiguration()这一个方法涉及数千行代码,花费时间颇多,涉及的变量非常多,而且不容易串联,上面所讲肯定有不妥之处,望大伙指正。后续仍会再详细阅读不断修改这篇文章。