Cassandra启动过程详解
这里的分析从CassandraDaemon.java文件开始。
一、配置文件storage-config.xml的读取和log4j的配置文件log4j.property的设置。
配置文件的读取和解析都是在org.apache.cassandra.config.DatabaseDescriptor类中完成的,这个类的作用非常简单,就是读取配置文件中各个配置项所定义的值,经过简单的验证,符合条件就将其值赋给DatabaseDescriptor的私有静态常量。值得注意的是关于Keyspace的解析,按照ColumnFamily的配置信息构建成org.apache.cassandra.config.CFMetaData对象,最后把这些所有ColumnFamily放入Keyspace的HashMap对象org.apache.cassandra.config.KSMetaData中,每个Keyspace就是一个Table。这些信息都是作为基本的元信息,可以通过DatabaseDescriptor类直接获取。
二、Keyspace的初始化。
这里主要调用Table.open(tableName)方法创建每个Table的实例。创建Table的实例将完成:1)获取该Table的元信息TableMatedate。2)创建改Table下每个ColumnFamily的存储操作对象ColumnFamilyStore。3)启动定时程序,检查该ColumnFamily的Memtable设置的MemtableFlushAfterMinutes是否已经过期,过期立即写到磁盘。详细过程可参见我前面关于该方法的详细代码跟踪分析。
一个Keyspace对应一个Table,一个Table持有多个ColumnFamilyStore,而一个ColumnFamily对应一个ColumnFamilyStore。Table并没有直接持有ColumnFamily的引用而是持有ColumnFamilyStore,这是因为ColumnFamilyStore类中不仅定义了对ColumnFamily的各种操作而且它还持有ColumnFamily在各种状态下数据对象的引用,所以持有了ColumnFamilyStore就可以操作任何与ColumnFamily相关的数据了。
三、Commitlog日志文件的恢复。
这里调用CmmitLog.recover()方法主要完成这几个操作,发现是否有没有被写到磁盘的数据,恢复这个数据,构建新的日志文件。CommitLog日志文件的恢复策略是,在头文件中发现没有被序列化的最新的ColumnFamilyId,然后取出这个这个被序列化RowMutation对象的起始地址,反序列化成为RowMutation对象,后面的操作和新添一条数据的流程是一样的,如果这个RowMutation对象中的数据被成功写到磁盘中,那么会在CommitLog去掉已经被持久化的ColumnFamilyId。
四、检查数据文件是否需要压缩
调用CompactionManager.instance.checkAllColumnFamilies()检查CF对应的数据文件是否需要压缩。将相似大小的SStable放到一个bucket中,然后调用submitMinorIfNeeded(cfs)。
五、启动存储服务
这是启动过程最重要的一步,需要启动很多服务。具体步骤有:
5.1)创建StorageMetadata
调用方法SystemTable.initMetadata()创建StorageMetadata。元数据只创建一次,如果元数据已经存在,则直接返回。StorageMetadata将包含三个关键信息:本节点的Token、当前generation以及ClusterName。这三个信息被存在StorageService类的属性metadata中(metadata是StorageMetadata类型的对象),以便后面随时调用。
Cassandra判断如果是第一次启动,Cassandra将会创建三列分别存储这些信息并将它们存在在系统表的LocationInfoColumnFamily中,key是“L”。这里的Token判断用户是否指定,如果指定了则使用用户指定的,否则随机生成一个Token,但是这个Token有可能在后面被修改;generation=System.currentTimeMillis()/ 1000,ClusterName为读取配置文件得到的值。
如果不是第一次启动将会更新这三个值:读取数据文件中的Token信息,generation信息以及ClusterName信息后设置Token值和ClusterName的值,更新generation的值为max(当前时间秒数,old_generation+1)。这里有点要注意的是,如果在后续的过程中更改了配置文件中ClusterName的名字,这会跟数据文件中存储的信息不一致,最终会导致Cassandra无法启动。
5.2)创建所有目录
调用方法DatabaseDescriptor.createAllDirectories()创建所有的目录。包括数据文件目录data/以及日志文件目录commitlog/。同时还为keyspaces创建了数据文件目录的子目录data/system和data/keyspace,...(keyspace为用户定义的keyspace)。当然这个方法早在Table.open()已经调用过了,在这里再次调用可能是为了某些测试需要。
5.3)启动GCInspector.instance.start服务
主要是统计统计当前系统中资源的使用情况(主要就是内存使用和回收情况),将这个信息记录到日志文件中,这个可以作为系统的监控日志使用。
5.4)启动消息监听服务
这个消息监听服务就是监听整个集群中其它节点发送到本节点的所有消息,Cassandra会根据每个消息的类型,做出相应的反应。消息监听代码如下:
public void listen(InetAddress localEp) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
final ServerSocket ss = serverChannel.socket();
ss.setReuseAddress(true);
ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
socketThread = new SocketThread(ss, "ACCEPT-" + localEp);
socketThread.start();
}
这里用到了nio里面的异步IO与连网的部分。监听端口默认为7000。创建一个线程SocketThread用于监听消息。
每接收到一个消息,就创建一个新的线程newIncomingTcpConnection(socket).start()进行消息响应;该线程run方法中主要是对消息进行魔数的验证,以及读取消息头部和消息体等内容,然后将消息内容反序列化的任务MessageDeserializationTask递交到相应的消息反序列化线程池messageDeserializerExecutor_中。
MessageDeserializationTask反序列化消息内容后调用MessagingService.receive()处理消息。
receive()方法中创建MessageDeliveryTask任务对象,根据消息类型得到相应的stage的线程池对象,如果没有对应的线程池,则使用messageDeserializerExecutor_。
stage线程池执行MessageDeliveryTask任务,该任务主要是根据消息中的Verb,调用相应的VerbHandler.doVerb()方法来完成消息的处理。比如GossipDigestAckVerbHandler.doVerb()用来处理Gossip阶段的ACK消息。
5.5)启动StorageLoadBalancer.instance.startBroadcasting服务
调用方法loadTimer_.schedule(newLoadDisseminator(), 2 * Gossiper.intervalInMillis_,BROADCAST_INTERVAL),定时得到节点负载信息,2个Gossiper心跳后开始,间隔时间为60s。该任务得到节点数据总量(包括所有Data文件、FIlter文件以及Index文件),并将其更新到ApplicationState中,然后就可以通过这个state来和其它节点交换信息。这个load信息在数据的存储和新节点加入的时候,会有参考价值。
5.6)启动Gossiper服务
在启动Gossiper服务之前,将StorageService注册为观察者,一旦节点的某些状态发生变化,而这些状态是StorageService感兴趣的,StorageService的onChange方法就会触发。Gossiper服务就是一个定时程序,它会创建一个EndPointState对象。EndPointState对象持有HeartBeatState的引用和ApplicationState的一个引用集Map<String,ApplicationState> applicationState_ = newHashtable<String,ApplicationState>()。对于每个Application对象,EndPointState只保存一个最新的值,所以新值会覆盖旧值。
HeartBeatState对象记录了当前心跳的generation和version,这个generation和前面的StorageMetadata存储的generation是一致的,在节点每次启动的时候更新;而version是从0开始的,每次更新加1;每个节点有一个HeartBeatState对象与之关联。
ApplicationState的一个引用集Map<String,ApplicationState> applicationState_则是记录一些状态信息,比如前面startBroadcasting()过程中记录节点负载情况。ApplicationState对象包含state值和version值。比如表示节点负载的状态信息可能表示形式为(5.2,45),意思就是在version为45的时候负载为5.2;相似地,节点启动的状态信息可能表示形式为(bLpassF3XD8Kyks,56),前面的值表示启动的token,后面的56是version值。是需要注意的是创建ApplicationState对象时,version值加1。
还有一个结构需要注意,就是Gossiper中的Map<InetAddress,EndPointState>endPointStateMap__,它保存了它监听到的所有节点的EndPointState信息,包括它自己的。
Gossiper这个定时程序每隔一秒钟随机向定义的节点发送一个消息,而这个消息是保持集群中节点状态一致的唯一途径(具体过程后面详述)。
5.7)判断启动模式。
启动模式跟配置文件中的AutoBootstrap这一项相关。那这个配置项与Token和负载又有何关联性?其实表面上看起来这个配置项是当这个节点启动时是否自动加入集群。但是,当你设置成False时它是不是就不加入集群呢?显然不是,这还要看你有没有配置seeds,如果你配置了其它seed,那么它仍然会去加入集群。
那么到底有何区别,通过分析其启动代码发现,这个配置项不仅跟seed配置项有关而且和Cassandra是否是第一次启动也有关。Cassandra的启动规则大慨如下:
1)当AutoBootstrap设为FALSE,
第一次启动时Cassandra会设置系统表中key为Bootstrap,CF为STATUS_CF的Column为B的值为TRUE,以表示这是由系统自动设置的,其实这个只是作为一个标志来判断你以后的启动情况。标记启动方式,主要是防止用户再修改AutoBootstrap的启动模式。
调用TokenMetadata.updateToken()更新token。
加入ApplicationState对象到EPS的map<key,AS>中。key为MOVE,state为NORMAL:Token。
设置模式为“Normal”。
2)当AutoBootstrap设为TRUE,第一次启动,Cassandra会判断当前节点配置在seeds中,Cassandra的启动情况和1是一样的。
3)当AutoBootstrap设为TRUE,第一次启动,并且没有配置为seed,Cassandra将会有一个漫长的启动过程,当然这个时间的长短和你的当前的集群的数据量有很大的关系。
设置模式为“Joining:getting load information”。
等待90s(BROADCAST_INTERVAL+RING_DELAY)为了节点获得所有其他节点的负载信息。
如果tokenMetadata已经包含了本节点ip,则抛出异常。
设置模式为“Joining:getting bootstrap token”。
如果在配置文件中指定了InitialToken,则返回这个InitialToken。否则调用getBalancedToken(metadata,load)。
getBalancedToken()方法首先调用getBootstrapSource()方法得到负载最大的节点的ip地址,(如果没有任何节点的负载信息,则抛出运行时异常;这个排序过程是先以落入该节点token范围内的正处于bootstrap的节点数目排序,数目越多优先级越低。如果落入其中的bootstrap节点数目相同,再以负载大小排序)。然后向这个节点发送消息,获取其一半key范围所对应的Token,这个Token是前半部分值(如果key的数目<3,则返回一个随机Token值)。
-
public static Token getBalancedToken(TokenMetadata metadata, Map<InetAddress, Double> load) {
InetAddress maxEndpoint = getBootstrapSource(metadata, load);
Token<?> t = getBootstrapTokenFrom(maxEndpoint);
return t;
}
调用startBootstrap()方法。
-
private void startBootstrap(Token token) throws IOException {
isBootstrapMode = true; //设置isBootstrapMode。
SystemTable.updateToken(token); //更新本节点的Token值。
//加入状态信息。(MOVE, appstate(BOOT:Token))
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter
partitioner_.getTokenFactory().toString(token)));
//设置模式为等待range重新分布
setMode("Joining: sleeping " + RING_DELAY + " for pending range setup", true);
try {
Thread.sleep(RING_DELAY);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
//设置模式为正在启动
setMode("Bootstrapping", true);
//开始启动
new BootStrapper(FBUtilities.getLocalAddress(), token, tokenMetadata_).startBootstrap(); // handles
} -
调用BootStrapper.startBootstrap()方法完成启动,发送STREAM_REQUEST消息,请求数据。
public void startBootstrap() throws IOException { for (String table : DatabaseDescriptor.getNonSystemTables()) { Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources(table); /* Send messages to respective folks to stream data over to me*/ for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet()) { InetAddress source = entry.getKey(); StorageService.instance.addBootstrapSource(source, table); StreamIn.requestRanges(source, table, entry.getValue()); } } }
1)首先调用getRangesWithSources(table)得到该节点负责的ranges所对应的节点集合。
2)对于上一步得到的节点集合,移除掉不存活的节点,然后将活着的节点加入到Multimap<InetAddress,Range> sources 集合中。通过循环对每一个活着的节点,将其加入到bootstrapSet中(即作为bootstrapsource),然后调用StreamIn.requestRanges()请求该节点对应范围内的数据。
3)requestRanges(ip,tableName,ranges)方法构建流请求消息StreamRequestMessage,然后调用MessagingService.instance.sendOneWay(message,source)发送消息。
4)流请求消息的序列化格式为
streammetadata
.length
local
ip
table
name
ranges.size
type
RANGE
lefttoken
length+byte
righttoken
length+byte
...
<—metadata.len-><————————metadata——<———————ranges————————>>
5)流数据需要经历STREAM_REQUEST,STREAM_INITIATE, STREAM_INITIATE_DONE, STREAM_COMPLETE,STREAM_FINISHED..等阶段。在最后会调用finishBootstrapping()方法,其中设置启动标志,并在setToken()中设置系统表中token值,并调用updateToken()更新token环。最后加入状态信息<MOVE,NORMAL:Token>并设置mode为Normal。
private void finishBootstrapping() {
isBootstrapMode = false;
SystemTable.setBootstrapped(true);
setToken(getLocalToken());
Gossiper.instance.addLocalApplicationState(MOVE_STATE,
new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken())));
setMode("Normal", false);
}
参考资料
cassandra详解 http://www.ibm.com/developerworks/cn/opensource/os-cn-cassandraxu1/
分布式系统设计