MyCat源码分析系列之——配置信息和启动流程

时间:2021-11-21 17:29:46

更多MyCat源码分析,请戳MyCat源码分析系列


 MyCat配置信息

除了一些默认的配置参数,大多数的MyCat配置信息是通过读取若干.xml/.properties文件获取的,主要包括:

1)server.xml:系统和用户相关配置

2)schema.xml:虚拟库、表、数据节点配置等

3)rule.xml:分片规则设置

4)cacheservice.properties:缓存相关设置

5)dnindex.properties:datahost主从切换配置文件

6)sequence_conf.properties:本地全局序列号配置文件

而在代码层面,与配置相关的类主要包括3个:

1)MycatConfig:最为重要的配置类

2)ReloadConfig:用于通过管理端口执行mysql> reload @@config或config_all命令,重新载入配置文件

3)RollbackConfig:用于通过管理端口执行mysql> rollback @@config命令,将配置信息回滚至reload之前的状态

接下来重点介绍MycatConfig,它最关键的属性如下:

private volatile SystemConfig system;
private volatile MycatCluster cluster;
private volatile MycatCluster _cluster;
private volatile QuarantineConfig quarantine;
private volatile QuarantineConfig _quarantine;
private volatile Map<String, UserConfig> users;
private volatile Map<String, UserConfig> _users;
private volatile Map<String, SchemaConfig> schemas;
private volatile Map<String, SchemaConfig> _schemas;
private volatile Map<String, PhysicalDBNode> dataNodes;
private volatile Map<String, PhysicalDBNode> _dataNodes;
private volatile Map<String, PhysicalDBPool> dataHosts;
private volatile Map<String, PhysicalDBPool> _dataHosts;
  • SystemConfig:包含了诸多系统相关的配置参数(如端口、编码、线程池大小、BufferPool大小、隔离级别等)
  • MycatCluster:MyCat集群配置信息
  • QuarantineConfig:用户权限隔离(黑白名单)
  • UserConfig:用户配置,包含用户名/密码和允许访问的虚拟库
  • SchemaConfig:虚拟库配置,包括所有下属的表配置(

    TableConfig

    )以及这些表涉及的datanode
  • PhysicalDBNode:datanode相关,对应一个数据库实例中的数据库
  • PhysicalDBPool:datahost相关,里面包含了DataHostConfig配置,包括所有writeHosts和readHosts以及读写分离类型等

注意到除了SystemConfig,其余属性都还有一个前缀加了_的同名属性,这些属性其实是作为备份的,用于reload/rollback配置文件时的切换。reload和rollback相关的方法如下:

public void reload(Map<String, UserConfig> users,
Map
<String, SchemaConfig> schemas,
Map
<String, PhysicalDBNode> dataNodes,
Map
<String, PhysicalDBPool> dataHosts, MycatCluster cluster,
QuarantineConfig quarantine,
boolean reloadAll) {
apply(users, schemas, dataNodes, dataHosts, cluster, quarantine,reloadAll);
this.reloadTime = TimeUtil.currentTimeMillis();
this.status = reloadAll?RELOAD_ALL:RELOAD;
}
public void rollback(Map<String, UserConfig> users,
Map
<String, SchemaConfig> schemas,
Map
<String, PhysicalDBNode> dataNodes,
Map
<String, PhysicalDBPool> dataHosts, MycatCluster cluster,
QuarantineConfig quarantine) {
apply(users, schemas, dataNodes, dataHosts, cluster, quarantine,status
==RELOAD_ALL);
this.rollbackTime = TimeUtil.currentTimeMillis();
this.status = ROLLBACK;
}

在reload的时候注意到有reload和reload_all,区别就在于前者不会重新加载与datahost/datanode相关的更改,而后者会。

 


 启动流程

MyCat的启动类为MycatStartup,而主体为MycatServer,其中主要分为两个步骤:

1)初始化:此过程在MycatServer构造函数时执行,包括配置文件的读取、CacheService和RouteService的创建等

public MycatServer() {
this.config = new MycatConfig();
this.timer = new Timer(NAME + "Timer", true);
this.sqlRecorder = new SQLRecorder(config.getSystem()
.getSqlRecordCount());
this.isOnline = new AtomicBoolean(true);
cacheService
= new CacheService();
routerService
= new RouteService(cacheService);
// load datanode active index from properties
dnIndexProperties = loadDnIndexProps();
try {
sqlInterceptor
= (SQLInterceptor) Class.forName(
config.getSystem().getSqlInterceptor()).newInstance();
}
catch (Exception e) {
throw new RuntimeException(e);
}
catletClassLoader
= new DynaClassLoader(SystemConfig.getHomePath()
+ File.separator + "catlet", config.getSystem()
.getCatletClassCheckSeconds());
this.startupTime = TimeUtil.currentTimeMillis();
}

2)运行:此过程由startup()方法触发,包括处理器对象创建、bufferpool创建、处理线程池创建、AIOConnector/NIOConnector创建与启动、两个AIOAcceptor/NIOAcceptor创建与启动、后端数据库的初始连接建立、定时器线程池/定时任务创建与启动

public void startup() throws IOException {

SystemConfig system
= config.getSystem();
int processorCount = system.getProcessors();

// server startup
LOGGER.info("===============================================");
LOGGER.info(NAME
+ " is ready to startup ...");
String inf
= "Startup processors ...,total processors:"
+ system.getProcessors() + ",aio thread pool size:"
+ system.getProcessorExecutor()
+ " \r\n each process allocated socket buffer pool "
+ " bytes ,buffer chunk size:"
+ system.getProcessorBufferChunk()
+ " buffer pool's capacity(buferPool/bufferChunk) is:"
+ system.getProcessorBufferPool()
/ system.getProcessorBufferChunk();
LOGGER.info(inf);
LOGGER.info(
"sysconfig params:" + system.toString());

// startup manager
ManagerConnectionFactory mf = new ManagerConnectionFactory();
ServerConnectionFactory sf
= new ServerConnectionFactory();
SocketAcceptor manager
= null;
SocketAcceptor server
= null;
aio
= (system.getUsingAIO() == 1);

// startup processors
int threadPoolSize = system.getProcessorExecutor();
processors
= new NIOProcessor[processorCount];
long processBuferPool = system.getProcessorBufferPool();
int processBufferChunk = system.getProcessorBufferChunk();
int socketBufferLocalPercent = system.getProcessorBufferLocalPercent();
bufferPool
= new BufferPool(processBuferPool, processBufferChunk,
socketBufferLocalPercent
/ processorCount);
businessExecutor
= ExecutorUtil.create("BusinessExecutor",
threadPoolSize);
timerExecutor
= ExecutorUtil.create("Timer", system.getTimerExecutor());
listeningExecutorService
= MoreExecutors.listeningDecorator(businessExecutor);

for (int i = 0; i < processors.length; i++) {
processors[i]
= new NIOProcessor("Processor" + i, bufferPool,
businessExecutor);
}

if (aio) {
LOGGER.info(
"using aio network handler ");
asyncChannelGroups
= new AsynchronousChannelGroup[processorCount];
// startup connector
connector = new AIOConnector();
for (int i = 0; i < processors.length; i++) {
asyncChannelGroups[i]
= AsynchronousChannelGroup
.withFixedThreadPool(processorCount,
new ThreadFactory() {
private int inx = 1;

@Override
public Thread newThread(Runnable r) {
Thread th
= new Thread(r);
th.setName(BufferPool.LOCAL_BUF_THREAD_PREX
+ "AIO" + (inx++));
LOGGER.info(
"created new AIO thread "
+ th.getName());
return th;
}
});

}
manager
= new AIOAcceptor(NAME + "Manager", system.getBindIp(),
system.getManagerPort(), mf,
this.asyncChannelGroups[0]);

// startup server

server
= new AIOAcceptor(NAME + "Server", system.getBindIp(),
system.getServerPort(), sf,
this.asyncChannelGroups[0]);

}
else {
LOGGER.info(
"using nio network handler ");
NIOReactorPool reactorPool
= new NIOReactorPool(
BufferPool.LOCAL_BUF_THREAD_PREX
+ "NIOREACTOR",
processors.length);
connector
= new NIOConnector(BufferPool.LOCAL_BUF_THREAD_PREX
+ "NIOConnector", reactorPool);
((NIOConnector) connector).start();

manager
= new NIOAcceptor(BufferPool.LOCAL_BUF_THREAD_PREX + NAME
+ "Manager", system.getBindIp(), system.getManagerPort(),
mf, reactorPool);

server
= new NIOAcceptor(BufferPool.LOCAL_BUF_THREAD_PREX + NAME
+ "Server", system.getBindIp(), system.getServerPort(), sf,
reactorPool);
}
// manager start
manager.start();
LOGGER.info(manager.getName()
+ " is started and listening on "
+ manager.getPort());
server.start();
// server started
LOGGER.info(server.getName() + " is started and listening on "
+ server.getPort());
LOGGER.info(
"===============================================");
// init datahost
Map<String, PhysicalDBPool> dataHosts = config.getDataHosts();
LOGGER.info(
"Initialize dataHost ...");
for (PhysicalDBPool node : dataHosts.values()) {
String index
= dnIndexProperties.getProperty(node.getHostName(),
"0");
if (!"0".equals(index)) {
LOGGER.info(
"init datahost: " + node.getHostName()
+ " to use datasource index:" + index);
}
node.init(Integer.valueOf(index));
node.startHeartbeat();
}
long dataNodeIldeCheckPeriod = system.getDataNodeIdleCheckPeriod();
timer.schedule(updateTime(),
0L, TIME_UPDATE_PERIOD);
timer.schedule(processorCheck(),
0L, system.getProcessorCheckPeriod());
timer.schedule(dataNodeConHeartBeatCheck(dataNodeIldeCheckPeriod),
0L,
dataNodeIldeCheckPeriod);
timer.schedule(dataNodeHeartbeat(),
0L,
system.getDataNodeHeartbeatPeriod());
timer.schedule(catletClassClear(),
30000);
}

从以上代码中不难看出,connector用于作为客户端与后端MySQL建立连接,而server和manager则作为服务端接受来自前端应用的连接请求,其中server负责常规业务流程(默认端口8066),而manager负责监控与管理(默认端口9066)。

而datahost的初始化行为由node.init(Integer.valueOf(index));触发,其目的是每个datahost中writehost会连续创建若干初始连接供使用(该数量由schema.xml中datahost标签的minCon属性确定),当后续连接不足时会创建新的连接(最大不超过maxCon)。

由此,MyCat程序启动完成,等待接受来自应用的连接请求和后续的命令处理。

 


为尊重原创成果,如需转载烦请注明本文出处:http://www.cnblogs.com/fernandolee24/p/5193983.html,特此感谢