1)先看一下整个start方法构成
public void start() throws Throwable { logger.info("## start the canal server[{}:{}]", ip, port); // 创建整个canal的工作节点 final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port); initCid(path); if (zkclientx != null) { this.zkclientx.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { } public void handleNewSession() throws Exception { initCid(path); } @Override public void handleSessionEstablishmentError(Throwable error) throws Exception { logger.error("failed to connect to zookeeper", error); } }); } // 优先启动embeded服务 embededCanalServer.start(); // 尝试启动一下非lazy状态的通道 for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) { final String destination = entry.getKey(); InstanceConfig config = entry.getValue(); // 创建destination的工作节点 if (!embededCanalServer.isStart(destination)) { // HA机制启动 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } if (autoScan) { instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction); } } if (autoScan) { instanceConfigMonitors.get(globalInstanceConfig.getMode()).start(); for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { if (!monitor.isStart()) { monitor.start(); } } } // 启动网络接口 canalServer.start(); }
1.1)path =/otter/canal/cluster/172.20.0.20:11111
1.2)initcid(path) 因为本地没有zookeeper 所以该方法,可暂时不用解释
private void initCid(String path) { // logger.info("## init the canalId = {}", cid); // 初始化系统目录 if (zkclientx != null) { try { zkclientx.createEphemeral(path); } catch (ZkNoNodeException e) { // 如果父目录不存在,则创建 String parentDir = path.substring(0, path.lastIndexOf('/')); zkclientx.createPersistent(parentDir, true); zkclientx.createEphemeral(path); } catch (ZkNodeExistsException e) { // ignore // 因为第一次启动时创建了cid,但在stop/start的时可能会关闭和新建,允许出现NodeExists问题s } } }1.3)
// 优先启动embeded服务 embededCanalServer.start();
实现:
public void start() { if (!isStart()) { super.start(); canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() { public CanalInstance apply(String destination) { return canalInstanceGenerator.generate(destination); } }); // lastRollbackPostions = new MapMaker().makeMap(); } }
调用父类start方法,
protected volatile boolean running = false; // 是否处于运行中
public void start() { if (running) { throw new CanalException(this.getClass().getName() + " has startup , don't repeat start"); } running = true; }
初始化canalInstances 为null,size为0
解释: 本地embda 没有任何启动,只是初始化了一些数据
1.4)遍历集合中实例, 进行启动
for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) { final String destination = entry.getKey(); InstanceConfig config = entry.getValue(); // 创建destination的工作节点 if (!embededCanalServer.isStart(destination)) { // HA机制启动 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } }
instanceConfigs="example" -> "InstanceConfig[globalConfig=InstanceConfig[globalConfig=<null>,mode=SPRING,lazy=false,managerAddress=<null>,springXml=classpath:spring/file-instance.xml],mode=<null>,lazy=<null>,managerAddress=<null>,springXml=<null>]"
判断传入的destination 实例是否启动,若未启动,则进入方法,
public boolean isStart(String destination) { return canalInstances.containsKey(destination) && canalInstances.get(destination).isStart(); }
1.5)启动监控类
ServerRunningMonitor为针对server的running节点控制类,每一个destination都有一个ServerRunningMonitor来监控。
public synchronized void start() { super.start(); try { processStart(); if (zkClient != null) { // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); initRunning(); } else { processActiveEnter();// 没有zk,直接启动 } } catch (Exception e) { logger.error("start failed", e); // 没有正常启动,重置一下状态,避免干扰下一次start stop(); } }
1.6)
processStart();为zk启动,本地则直接进入方法processActiveEnter();// 没有zk,直接启动
private void processActiveEnter() { if (listener != null) { listener.processActiveEnter(); } }
1.7)再次回调到
1.8)进入嵌入启动类 调用start方法,
public void start(final String destination) { final CanalInstance canalInstance = canalInstances.get(destination); if (!canalInstance.isStart()) { try { MDC.put("destination", destination); canalInstance.start(); logger.info("start CanalInstances[{}] successfully", destination); } finally { MDC.remove("destination"); } } }
在获取 final CanalInstance canalInstance = canalInstances.get(destination)的时候,因为canalInstances 是一个map 在初始化的时候为null,只有当获取其方法的时候才会调用其app的方法,也就是canalInstanceGenerator.generate(destination);,
只是generate方法在controller里的initGlobalConfig里,所以才会再次返回调用其中的generate方法。
现在才会调用其apply中的方法,和上面的1.3)对应
其中该canalInstanceGenerator接口有两个实现类
因为目前走的是本地,也就是spring模式 ,然后initbean ,通过加载配置文件里的 name=instance的类
public class PropertyPlaceholderConfigurer extends org.springframework.beans.factory.config.PropertyPlaceholderConfigurer implements ResourceLoaderAware, InitializingBean { private static final String PLACEHOLDER_PREFIX = "${"; private static final String PLACEHOLDER_SUFFIX = "}"; private ResourceLoader loader; private String[] locationNames; public PropertyPlaceholderConfigurer(){ setIgnoreUnresolvablePlaceholders(true); } public void setResourceLoader(ResourceLoader loader) { this.loader = loader; } public void setLocationNames(String[] locations) { this.locationNames = locations; } public void afterPropertiesSet() throws Exception { Assert.notNull(loader, "no resourceLoader"); if (locationNames != null) { for (int i = 0; i < locationNames.length; i++) { locationNames[i] = resolveSystemPropertyPlaceholders(locationNames[i]); } } if (locationNames != null) { List<Resource> resources = new ArrayList<Resource>(locationNames.length); for (String location : locationNames) { location = trimToNull(location); if (location != null) { resources.add(loader.getResource(location)); } } super.setLocations(resources.toArray(new Resource[resources.size()])); } }
Spring容器初始化的时候,会读取xml或者annotation对Bean进行初始化。初始化的时候,这个PropertyPlaceholderConfigurer会拦截Bean的初始化,
初始化的时候会对配置的${pname}进行替换,根据我们Properties中配置的进行替换。从而实现表达式的替换操作 。
locationNames 里两个值:classpath:canal.properties 和classpath:example/instance.properties 其中example为传入参数替换的, placeholder为canal.instance.destination 加载配置文件里参数
首先加载file.instance.xml 里的方法,can.instance, 加载数据库,然后加载mybatis
其中中间括号里为变量
配置文件
# binlog ddl isolation canal.instance.get.ddl.isolation = false ################################################# ######### destinations ############# ################################################# canal.destinations= example # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = spring canal.instance.global.lazy = false #canal.instance.global.manager.address = 127.0.0.1:1099 #canal.instance.global.spring.xml = classpath:spring/local-instance.xml #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
继续依次加载
通过配置文件初始化bean,和静态类,成员变量,等值替换
1.7) 准备启动 ,canalinstance 的实现类canalinstancewithspring
1.9)调用父类abstractcanalinstance里的start方法
public void start() { super.start(); if (!metaManager.isStart()) { metaManager.start(); } if (!alarmHandler.isStart()) { alarmHandler.start(); } if (!eventStore.isStart()) { eventStore.start(); } if (!eventSink.isStart()) { eventSink.start(); } if (!eventParser.isStart()) { beforeStartEventParser(eventParser); eventParser.start(); afterStartEventParser(eventParser); } logger.info("start successful...."); }
第一行super.start 为如下,看项目是否在运行,全局变量