Zookeeper3.5.7源码分析

时间:2024-01-26 07:00:29

文章目录

  • 一、Zookeeper算法一致性
    • 1、Paxos 算法
      • 1.1 概述
      • 1.2 算法流程
      • 1.3 算法缺陷
    • 2、ZAB 协议
      • 2.1 概述
      • 2.2 Zab 协议内容
    • 3、CAP理论
  • 二、源码详解
    • 1、辅助源码
      • 1.1 持久化源码(了解)
      • 1.2 序列化源码
    • 2、ZK 服务端初始化源码解析
      • 2.1 启用脚本分析
      • 2.2 ZK 服务端启动入口
      • 2.3 解析参数 zoo.cfg 和 myid
      • 2.4 过期快照删除
      • 2.5 初始化通信组件
    • 3、ZK 服务端加载数据源码解析
      • 3.1 冷启动数据恢复快照数据
      • 3.2 冷启动数据恢复编辑日志
    • 4、ZK 选举源码解析
      • 4.1 选举准备
      • 4.2 选举执行
    • 5、Follower 和 Leader 状态同步源码
    • 6、服务端启动
      • 6.1 Leader 启动
      • 6.2 Follower 启动
    • 7、客户端启动
      • 7.1 创建 ZookeeperMain
      • 7.2 初始化监听器
      • 7.3 解析连接地址
      • 7.4 创建通信
      • 7.5 执行 run()

一、Zookeeper算法一致性

1、Paxos 算法

1.1 概述

Paxos算法:一种基于消息传递且具有高度容错特性的一致性算法。Paxos算法解决的问题:就是如何快速正确的在一个分布式系统中对某个数据值达成一致,并且保证不论发生任何异常,都不会破坏整个系统的一致性。

在一个Paxos系统中,首先将所有节点划分为Proposer(提议者),Acceptor(接受者),和Learner(学习者)。(注意:每个节点都可以身兼数职),一个完整的Paxos算法流程分为三个阶段:

Prepare准备阶段

  • Proposer向多个Acceptor发出Propose请求Promise(承诺)
  • Acceptor针对收到的Propose请求进行Promise(承诺)

Accept接受阶段

  • Proposer收到多数Acceptor承诺的Promise后,向Acceptor发出Propose请求
  • Acceptor针对收到的Propose请求进行Accept处理

Learn学习阶段:Proposer将形成的决议发送给所有Learners

1.2 算法流程

1.3 算法缺陷

在网络复杂的情况下,一个应用 Paxos 算法的分布式系统,可能很久无法收敛,甚至陷入活锁的情况。造成这种情况的原因是系统中有一个以上的 Proposer,多个Proposers 相互争夺Acceptor,造成迟迟无法达成一致的情况。针对这种情况,一种改进的 Paxos 算法被提出:从系统中选出一个节点作为 Leader,只有 Leader 能够发起提案。这样,一次 Paxos 流程中只有一个Proposer,不会出现活锁的情况

2、ZAB 协议

2.1 概述

Zab 借鉴了 Paxos 算法,是特别为 Zookeeper 设计的支持崩溃恢复的原子广播协议。基于该协议,Zookeeper 设计为只有一台客户端(Leader)负责处理外部的写事务请求,然后Leader 客户端将数据同步到其他 Follower 节点。即 Zookeeper 只有一个 Leader 可以发起提案

2.2 Zab 协议内容

Zab 协议包括两种基本的模式:消息广播、崩溃恢复

3、CAP理论

CAP理论告诉我们,一个分布式系统不可能同时满足以下三种

  • 一致性(C:Consistency)

    在分布式环境中,一致性是指数据在多个副本之间是否能够保持数据一致的特性。在一致性的需求下,当一个系统在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致的状态。

  • 可用性(A:Available)

    可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个操作请求总是能够在有限的时间内返回结果

  • 分区容错性(P:Partition Tolerance)

    分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非是整个网络环境都发生了故障

这三个基本需求,最多只能同时满足其中的两项,因为P是必须的,因此往往选择就在CP或者AP中。ZooKeeper保证的是CP

  • ZooKeeper不能保证每次服务请求的可用性。(注:在极端环境下,ZooKeeper可能会丢弃一些请求,消费者程序需要重新请求才能获得结果)。所以说,ZooKeeper不能保证服务可用性。
  • 进行Leader选举时集群都是不可用

二、源码详解

Zookeeper源码下载地址:https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/

1、辅助源码

1.1 持久化源码(了解)

Leader 和 Follower 中的数据会在内存和磁盘中各保存一份。所以需要将内存中的数据持久化到磁盘中,在 org.apache.zookeeper.server.persistence 包下的相关类都是序列化相关的代码

public interface SnapShot {
    // 反序列化方法
    long deserialize(DataTree dt, Map<Long, Integer> sessions)
    throws IOException;
    // 序列化方法
    void serialize(DataTree dt, Map<Long, Integer> sessions,
    File name) throws IOException;
    
    /**
    *find the most recent snapshot file
    *查找最近的快照文件
    */
    File findMostRecentSnapshot() throws IOException;
    // 释放资源
    void close() throws IOException;
}

public interface TxnLog {
    // 设置服务状态
    void setServerStats(ServerStats serverStats);
    // 滚动日志
    void rollLog() throws IOException;
    // 追 加
    boolean append(TxnHeader hdr, Record r) throws IOException;
    // 读取数据
    TxnIterator read(long zxid) throws IOException;
    // 获取最后一个 zxid
    long getLastLoggedZxid() throws IOException;
    // 删除日志
    boolean truncate(long zxid) throws IOException;
    // 获 取 DbId
    long getDbId() throws IOException;
    // 提 交
    void commit() throws IOException;
    // 日志同步时间
    long getTxnLogSyncElapsedTime();
    // 关闭日志
    void close() throws IOException;
    // 读取日志的接口
    public interface TxnIterator {
        // 获取头信息
        TxnHeader getHeader();
        // 获取传输的内容
        Record getTxn();
        // 下一条记录
        boolean next() throws IOException;
        // 关闭资源
        void close() throws IOException;
        // 获取存储的大小
        long getStorageSize() throws IOException;
    }
}

1.2 序列化源码

zookeeper-jute 代码是关于Zookeeper 序列化相关源码

2、ZK 服务端初始化源码解析

2.1 启用脚本分析

zkServer.sh start 底层的实际执行内容,所以程序的入口是 QuorumPeerMain.java 类

nohup "$JAVA" 
+ 一堆提交参数
+ $ZOOMAIN(org.apache.zookeeper.server.quorum.QuorumPeerMain)
+ "$ZOOCFG"(zkEnv.sh 文件中 ZOOCFG="zoo.cfg")

2.2 ZK 服务端启动入口

源码里查找QuorumPeerMain类

public static void main(String[] args) {
    // 创建了一个 zk 节点
    QuorumPeerMain main = new QuorumPeerMain();
    try {
    // 初始化节点并运行,args 相当于提交参数中的 zoo.cfg
    main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
    ... ...
    }
    LOG.info("Exiting normally"); System.exit(0);
}

protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException{
    // 管理 zk 的配置信息
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
    // 1 解析参数,zoo.cfg 和 myid
    config.parse(args[0]);
    }
    // 2 启动定时任务,对过期的快照,执行删除(默认该功能关闭)
    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
    .getDataDir(), config.getDataLogDir(), config
    .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start();
    
    if (args.length == 1 && config.isDistributed()) {
    // 3 启动集群
    runFromConfig(config);
    } else {
    LOG.warn("Either no config or no quorum defined in config, running "
    + " in standalone mode");
    // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args);
    }
}

2.3 解析参数 zoo.cfg 和 myid

public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);
    try {
    // 校验文件路径及是否存在
    File configFile = (new VerifyingFileFactory.Builder(LOG)
    .warnForRelativePath()
    .failForNonExistingPath()
    .build()).create(path);
    
    Properties cfg = new Properties();
    FileInputStream in = new FileInputStream(configFile); 
    // 加载配置文件
    cfg.load(in);
    configFileStr = path;
    } finally {
    in.close();
    }
    // 解析配置文件
    parseProperties(cfg);
    } catch (IOException e) {
    throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
    throw new ConfigException("Error processing " + path, e);
    }
    ... ...
}

// parseProperties(cfg)方法拉到最下面setupQuorumPeerConfig

void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
            throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
    
    setupMyId();
    setupClientPort();
    setupPeerType();
    checkValidity();
}


private void setupMyId() throws IOException {
    File myIdFile = new File(dataDir, "myid");
    // standalone server doesn't need myid file.
    if (!myIdFile.isFile()) {
        return;
    }
    BufferedReader br = new BufferedReader(new FileReader(myIdFile));
    String myIdString;
    try {
        myIdString = br.readLine();
    } finally {
        br.close();
    }
    try {
        // 将解析 myid 文件中的 id 赋值给 serverId
        serverId = Long.parseLong(myIdString);
        MDC.put("myid", myIdString);
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("serverid " + myIdString
                + " is not a number");
    }
}

2.4 过期快照删除

可以启动定时任务,对过期的快照,执行删除。默认该功能时关闭的

// 2 启动定时任务,对过期的快照,执行删除(默认是关闭)
// config.getSnapRetainCount() = 3 最少保留的快照个数
// config.getPurgeInterval() = 0  默认 0 表示关闭
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
        .getDataDir(), config.getDataLogDir(), config
        .getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) { 
        LOG.warn("Purge task is already running."); return;
    }
    // 默认情况 purgeInterval=0,该任务关闭,直接返回
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled."); 
        return;
    }
    // 创建一个定时器
    timer = new Timer("PurgeTask", true);
    // 创建一个清理快照任务
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    // 如果 purgeInterval 设置的值是 1,表示 1 小时检查一次,判断是否有过期快照, 有则删除
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
    
    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

static class PurgeTask extends TimerTask { 
    private File logsDir;
    private File snapsDir; private int snapRetainCount;
    
    public PurgeTask(File dataDir, File snapDir, int count) { 
    logsDir = dataDir;
    snapsDir = snapDir; snapRetainCount = count;
}

    @Override
    public void run() {
        LOG.info("Purge task started."); 
        try {
            // 清理过期的数据
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
            LOG.info("Purge task completed.");
    }
}

public static void purge(File dataDir, File snapDir, int num) throws IOException { 
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }
    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir); 
    List<File> snaps = txnLog.findNRecentSnapshots(num);
    int numSnaps = snaps.size(); 
    if (numSnaps > 0) {
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}

2.5 初始化通信组件

if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
} else {
    LOG.warn("Either no config or no quorum defined in config, running "
            + " in standalone mode");
    // there is only server in the quorum -- run as standalone
    ZooKeeperServerMain.main(args);
}

// 通信协议默认 NIO
public void runFromConfig(QuorumPeerConfig config)throws IOException, AdminServerException{
......
LOG.info("Starting quorum peer");
try {
    ServerCnxnFactory cnxnFactory = null;
    ServerCnxnFactory secureCnxnFactory = null;
    // 通信组件初始化,默认是 NIO 通信
    if (config.getClientPortAddress() != null) {
        // zookeeperAdmin.md 文件中
        //Default is `NIOServerCnxnFactory
        cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
        config.getMaxClientCnxns(), false);
    }
    if (config.getSecureClientPortAddress() != null) {
        secureCnxnFactory = ServerCnxnFactory.createFactory();
        secureCnxnFactory.configure(config.getSecureClientPortAddress(),
        config.getMaxClientCnxns(), true);
    }
    // 把解析的参数赋值给该 zookeeper 节点
    quorumPeer = getQuorumPeer();
    quorumPeer.setTxnFactory(new FileTxnSnapLog(
    config.getDataLogDir(),
    config.getDataDir()));
    quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
    quorumPeer.enableLocalSessionsUpgrading(
    config.isLocalSessionsUpgradingEnabled());
    //quorumPeer.setQuorumPeers(config.getAllMembers());
    quorumPeer.setElectionType(config.getElectionAlg());
    quorumPeer.setMyid(config.getServerId());
    quorumPeer.setTickTime(config.getTickTime());
    quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
    quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
    quorumPeer.setInitLimit(config.getInitLimit());
    quorumPeer.setSyncLimit(config.getSyncLimit());
    quorumPeer.setConfigFileName(config.getConfigFilename());
    // 管理 zk 数据的存储
    quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
    quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
    if (config.getLastSeenQuorumVerifier()!=null) {
        quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(),false);
    }
   quorumPeer.initConfigInZKDatabase();
   // 管理 zk 的通信
   quorumPeer.setCnxnFactory(cnxnFactory);
   quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
   quorumPeer.setSslQuorum(config.isSslQuorum());
   quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
   quorumPeer.setLearnerType(config.getPeerType());
   quorumPeer.setSyncEnabled(config.