cassandra 服务启动流程

时间:2025-01-27 08:57:39

cassandra 服务启动流程

1.  setup

1)   CassandraDaemon ->main

 publicstaticvoidmain(String[]args)

   {

        instance.activate();

   }

2)   系统参数初始化

    配置文件的读取和解析都是在 类中完成的,这个类的作用非常简单,就是读取配置文件中各个配置项所定义的值,经过简单的验证,符合条件就将其值赋给 DatabaseDescriptor 的私有静态常量。具体的实现过程如下:

() //加载了系统设置,静态变量,加载了系统的默认参数:

->applyConfig(loadConfig());

下面是获取系统需要的表格,

->// Hardcoded system keyspaces

  List<KSMetaData> systemKeyspaces =(());

       assert () == ();

       for (KSMetaData ksmd : systemKeyspaces)

            (ksmd);

    ->每一个表格写入到Schema

       /**

     *Load specific keyspace into Schema

     *

     *@param keyspaceDef The keyspace to load up

     *

     *@return self to support chaining calls

    */

   public Schema load(KSMetaData keyspaceDef)

    {

       for (CFMetaData cfm : ().values())

           load(cfm);

       setKeyspaceDefinition(keyspaceDef);

       return this;

    }

 

最后的cfm都是存放到:private final ConcurrentBiMap<Pair<String,String>, UUID> cfIdMap = new ConcurrentBiMap<>();

上面的这段代码获取到系统需要默认的表格,但是这边还没有创建表格。

3)  keyspacemeta

a)        for(MemoryPoolMXBean pool:())

("{} {}: {}",(), (), ());//输出cassandra jvm的所有pool的信息

输出结果如下:

INFO 07:20:32 Heap size: 124780544/954728448

INFO 07:21:12 Code Cache Non-heap memory: init = 2555904(2496K) used =828800(809K) committed = 2555904(2496K) max = 50331648(49152K)

INFO 07:21:28 PS Eden Space Heap memory: init = 33030144(32256K) used =33030144(32256K) committed = 33030144(32256K) max = 347602944(339456K)

INFO 07:21:29 PS Survivor Space Heap memory: init = 5242880(5120K) used =5227632(5105K) committed = 5242880(5120K) max = 5242880(5120K)

INFO 07:22:43 PS Old Gen Heap memory: init = 86507520(84480K) used =351840(343K) committed = 86507520(84480K) max = 715653120(698880K)

INFO 07:22:49 PS Perm Gen Non-heap memory: init = 22020096(21504K) used =16674864(16284K) committed = 22020096(21504K) max = 85983232(83968K)

b)        检查目录是否存在和权限

c)        启动内存初始化

private CacheService()

 {

............................................

       keyCache = initKeyCache();

       //keycache初始化

       rowCache = initRowCache();

        // rowCache初始化

       counterCache = initCounterCache();

      // counterCache处理化

   }

下面我们分析keycache处理化的实现过程:

private AutoSavingCache<KeyCacheKey,RowIndexEntry> initKeyCache() {

  longkeyCacheInMemoryCapacity = () * 1024 *1024;

  ICache<KeyCacheKey,RowIndexEntry> kc;

  kc =(keyCacheInMemoryCapacity);

 AutoSavingCache<KeyCacheKey,RowIndexEntry> keyCache = new AutoSavingCache<>(kc,CacheType.KEY_CACHE, new KeyCacheSerializer());

int keyCacheKeysToSave =();

((),keyCacheKeysToSave);

       return keyCache;

   }

 

分析(keyCacheInMemoryCapacity)实现过程:

创建了一个:ConcurrentLinkedHashMap<K, V> map;存储所用cache;

具体的创建过程如下:

ConcurrentLinkedHashMap<K, V> map =new <K, V>()

                                           .weigher(entryWeiger)

                                           .maximumWeightedCapacity(weightedCapacity)

                                           .concurrencyLevel(DEFAULT_CONCURENCY_LEVEL)

                                           .build();

d)        initialize keyspaces

        for (String keyspaceName :())

        {

            if (())

                ("openingkeyspace {}", keyspaceName);

            // disable auto compaction untilcommit log replay ends

            for (ColumnFamilyStore cfs :(keyspaceName).getColumnFamilyStores())

            {

                for (ColumnFamilyStore store :())

                {

                   ();//关闭完以后,关闭自动compaction功能

                }

            }

        }

 

4)  commlogrecover

代码入口:CommitLog.instance.recover();

    为了保证系统出现异常情况,现在系统选择从系统默认的commitlog恢复日志。这里主要完成这几个操作,发现是否有没有被写到磁盘的数据,恢复这个数据,构建新的日志文件。CommitLog 日志文件的恢复策略是,在头文件中发现没有被序列化的最新的ColumnFamily Id,然后取出这个这个被序列化 RowMutation 对象的起始地址,反序列化成为 RowMutation 对象,后面的操作和新添一条数据的流程是一样的,如果这个 RowMutation 对象中的数据被成功写到磁盘中,那么会在 CommitLog 去掉已经被持久化的 ColumnFamily Id。关于 CommitLog 日志文件的存储格式以及数据如何写到 CommitLog 文件中。

5)   auto compaction

在启动过程中,需要让每个keyspace去compaction,sstable的数据的也将flush到磁盘。所有如果在集群重启以后,这里会提交compact。

具体的实现代码如下:

   if(().shouldBeEnabled())

                       ();

6)  GCInspectorregister

     服务。主要是统计统计当前系统中资源的使用情况,将这个信息记录到日志文件中,这个可以作为系统的监控日志使用。

 

7)  ()

Ø  init StorageProxy

("");

Ø  init IndexSummaryManager

("");

Ø  从系统peers获取该节点的ring和hostid;

Ø  启动gossipservice,保证可以与其他节点通信;关于gossip怎么样通信,后面会详细分析其通信过程;

Ø  ();

Ø  ();

Ø  ().listen(());

Ø  ();

Ø  Thift  init

Ø  native transport int

2.  start

启动过程主要包含2个步骤:

1)        ();

2)        ();

   务启动工作已经setup步骤完成;下面专门分析nativeServer的启动过程,nativeServer使用了netty的通信模型,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。本文第四小节的代码是使用netty

的example,读者感兴趣可以调试。

 nativeserver 具体设置如下:

eventExecutorGroup = newRequestThreadPoolExecutor();

workerGroup= newNioEventLoopGroup();

ServerBootstrapbootstrap

 = new ServerBootstrap().group(workerGroup)

                     .channel()

                     .childOption(ChannelOption.TCP_NODELAY,true)

                     .childOption(ChannelOption.SO_KEEPALIVE,())

                      .childOption(,)

                      .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK,32 * 1024)

                      .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK,8 * 1024);

在运行过程中,会对channel进行操作注册:

protected voidinitChannel(Channel channel) throws Exception

        {

            ChannelPipeline pipeline =();

            //("debug",new LoggingHandler());

           ("frameDecoder", ());

            ("frameEncoder",frameEncoder);

            ("frameDecompressor",frameDecompressor);

            ("frameCompressor",frameCompressor);

           ("messageDecoder", messageDecoder);

            ("messageEncoder",messageEncoder);

            (,"executor", dispatcher);

        }

3.  接受ConcurrentLinkedHashMap数据结构

Cassandra 源码里面详细该数据结构,实现了可以用来实现一个基于LRU策略的缓存。

1)    linkedHashMap

import ;

import ;

import ;

import ;

public class TestLinkedHashMap {

  public static void main(String args[])

  {

   ("*************************LinkedHashMap*************");

   Map<Integer,String>map = new LinkedHashMap<Integer,String>();

   (6, "apple");

   (3, "banana");

   (2,"pear");

   for (Iterator it = ().iterator();();)

   {

    Objectkey = ();

    (key+"="+ (key));

   }

   ("*************************HashMap*************");

   Map<Integer,String>map1 = new  HashMap<Integer,String>();

   (6, "apple");

   (3, "banana");

   (2,"pear");

   for (Iterator it = ().iterator();();)

   {

    Objectkey = ();

    (key+"="+ (key));

   }

  }

}

输出:

运行结果如下:

*************************LinkedHashMap*************
6=apple
3=banana
2=pear
*************************HashMap**************************
2=pear
6=apple
3=banana

分析:

l   LinkedHashmap的特点是put进去的对象位置未发生变化,HashMap会发生变化;

l   LinkedHashMap非线程安全 需要采用googleConcurrentLinkedHashMap/p/concurrentlinkedhashmap/

l   可以实现last recently used 功能

2)    ConcurrentLinkedHashMap

ConcurrentHashMap的封装,可以用来实现一个基于LRU策略的缓存.

public static voidmain(String[] args) {

ConcurrentLinkedHashMap<Integer,Integer> map = new

<Integer,Integer>().maximumWeightedCapacity(2);

weigher(()).build();

(1, 1); 

(2, 2); 

(3, 3); 

((1));//null已经失效了 

((2));

}

3)    整体架构

l  它本质是额外维护了一个双向链表,每次读和写都要改变相应节点的位置,将其移至队列头;

l  什么时候判断容易已经满了,是根据weight。每个元素都有一个weight,每增加一个元素,weight累计;

l  当达到最大值的时候,就需要剔除最少操作的那个元素了,并且触发相关的事件;