Zookeeper系列四:Zookeeper在大型分布式系统中的应用、Zookeeper实现分布式锁

时间:2024-01-21 14:22:29

一、Zookeeper在大型分布式系统中的应用

1. 有些分布式系统是master-slave模式的,master是一个单节点,一旦master挂掉了整个集群就挂掉了,所以一般master都会有一个备份master-back,一旦master挂掉了,备份master就会顶上去

那么ZK是如何实现的呢?

前置条件:

统一的一个临时节点:TemporaryNode(/ds/TemporaryNode仅仅这样一个节点)

第一步:zk有这样一个持久节点/ds

第二步:master1和master2同时启动,同时向/ds这个持久节点申请创建临时子节点TemporaryNode(同一时间只有一个请求能够创建成功)。

如果master1创建成功,这个节点(TemporaryNode)就不允许master2创建(锁的机制)

master1的状态变为active,真正的master。路径:/ds/TemporaryNode

master2的状态变为standby(master-back)。

master2同时对节点/ds/TemporaryNode注册事件监听。

第三步:master1挂掉或者超过一定时间没有响应。TemporaryNode节点会被删除(master2注册的事件机制就会起作用),就会通知master2,master2就会创建临时节点/ds/TemporaryNode,同时修改状态为active。

备注:

假如master1并没有挂掉,只有由于网络延时导致,当网络顺畅的时候就会出现“脑裂”状态。都认为自己是active。出现两个master

解决脑裂的办法:对/ds/TemporaryNode加一个权限ACL控制(节点删除以后,权限同时也不在了)。master1对于这个节点/ds/TemporaryNode没有权限。自己把状态改成standby。

实际的案例1:Hadoop(NameNode、ResourceManager),普通的部署NameNode、ResourceManager仅仅是单节点。Hadoop HA(NameNode和ResourceManager有多个备份)

二、Zookeeper实现分布式锁

分布式锁主要用于在分布式环境中包括跨主机、跨进程、跨网络,导致共享资源不一致的问题,保证数据的一致性。

1. 分布式锁的实现思路

说明:

这种实现会有一个缺点,即当有很多进程在等待锁的时候,在释放锁的时候会有很多进程就过来争夺锁,这种现象称为 “惊群效应”

2. 分布式锁优化后的实现思路

 

3. Zookeeper分布式锁的代码实现

准备工作:

1)安装Zookeeper,具体参考我前面的我文章Zookeeper系列一:Zookeeper介绍、Zookeeper安装配置、ZK Shell的使用

2)新建一个maven项目ZK-Demo,然后在pom.xml里面引入相关的依赖

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>

3.1 Zookeeper分布式锁的核心代码实现

实现逻辑参考“2. 分布式锁优化后的实现思路”中的流程图

package com.study.demo.lock;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 
* @Description: Zookeeper分布式锁的核心代码实现
* @author leeSmall
* @date 2018年9月4日
*
*/
public class DistributedLock implements Lock {
    private static Logger logger = LoggerFactory.getLogger(DistributedLock.class);

    private static final String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";
    private static final String LOCK_PATH = "/LOCK";

    private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());

    private CountDownLatch cdl;

    private String beforePath;// 当前请求的节点前一个节点
    private String currentPath;// 当前请求的节点

    // 判断有没有LOCK目录,没有则创建
    public DistributedLock() {
        if (!this.client.exists(LOCK_PATH)) {
            this.client.createPersistent(LOCK_PATH);
        }
    }

    public void lock() {
        //尝试去获取分布式锁失败
        if (!tryLock()) {
            //对次小节点进行监听
            waitForLock();
            lock();
        } 
        else {
            logger.info(Thread.currentThread().getName() + " 获得分布式锁!");
        }
    }
    
    public boolean tryLock() {
        // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        if (currentPath == null || currentPath.length() <= 0) {
            // 创建一个临时顺序节点
            currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
            System.out.println("---------------------------->" + currentPath);
        }

        // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
        List<String> childrens = this.client.getChildren(LOCK_PATH);
        //由小到大排序所有子节点
        Collections.sort(childrens);
        //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
        if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
            return true;
        } 
        //找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath
        else {
            int wz = Collections.binarySearch(childrens, currentPath.substring(6));
            beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
        }

        return false;
    }

    //等待锁,对次小节点进行监听
    private void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");
                if (cdl != null) {
                    cdl.countDown();
                }
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };

        // 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher
        this.client.subscribeDataChanges(beforePath, listener);
        if (this.client.exists(beforePath)) {
            cdl = new CountDownLatch(1);
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.client.unsubscribeDataChanges(beforePath, listener);
    }
    
    //完成业务逻辑以后释放锁
    public void unlock() {
        // 删除当前临时节点
        client.delete(currentPath);
    }

    // ==========================================
    public void lockInterruptibly() throws InterruptedException {

    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public Condition newCondition() {
        return null;
    }
}

 3.2 在业务里面使用分布式锁

package com.study.demo.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 
* @Description: 在业务里面使用分布式锁
* @author leeSmall
* @date 2018年9月4日
*
*/
public class OrderServiceImpl implements Runnable {
    private static OrderCodeGenerator ong = new OrderCodeGenerator();

    private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);
    // 同时并发的线程数
    private static final int NUM = 10;
    // 按照线程数初始化倒计数器,倒计数器
    private static CountDownLatch cdl = new CountDownLatch(NUM);

    private Lock lock = new DistributedLock();

    // 创建订单接口
    public void createOrder() {
        String orderCode = null;

        //准备获取锁
        lock.lock();
        try {
            // 获取订单编号
            orderCode = ong.getOrderCode();
        } catch (Exception e) {
            // TODO: handle exception
        } finally {
            //完成业务逻辑以后释放锁
            lock.unlock();
        }

        // ……业务代码

        logger.info("insert into DB使用id:=======================>" + orderCode);
    }

    
    public void run() {
        try {
            // 等待其他线程初始化
            cdl.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // 创建订单
        createOrder();
    }

    public static void main(String[] args) {
        for (int i = 1; i <= NUM; i++) {
            // 按照线程数迭代实例化线程
            new Thread(new OrderServiceImpl()).start();
            // 创建一个线程,倒计数器减1
            cdl.countDown();
        }
    }
}
package com.study.demo.lock;

import java.text.SimpleDateFormat;
import java.util.Date;

public class OrderCodeGenerator {
    // 自增长序列
    private static int i = 0;

    // 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号
    public String getOrderCode() {
        Date now = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        return sdf.format(now) + ++i;
    }

}