一、Zookeeper在大型分布式系统中的应用
1. 有些分布式系统是master-slave模式的,master是一个单节点,一旦master挂掉了整个集群就挂掉了,所以一般master都会有一个备份master-back,一旦master挂掉了,备份master就会顶上去
那么ZK是如何实现的呢?
前置条件:
统一的一个临时节点:TemporaryNode(/ds/TemporaryNode仅仅这样一个节点)
第一步:zk有这样一个持久节点/ds
第二步:master1和master2同时启动,同时向/ds这个持久节点申请创建临时子节点TemporaryNode(同一时间只有一个请求能够创建成功)。
如果master1创建成功,这个节点(TemporaryNode)就不允许master2创建(锁的机制)
master1的状态变为active,真正的master。路径:/ds/TemporaryNode
master2的状态变为standby(master-back)。
master2同时对节点/ds/TemporaryNode注册事件监听。
第三步:master1挂掉或者超过一定时间没有响应。TemporaryNode节点会被删除(master2注册的事件机制就会起作用),就会通知master2,master2就会创建临时节点/ds/TemporaryNode,同时修改状态为active。
备注:
假如master1并没有挂掉,只有由于网络延时导致,当网络顺畅的时候就会出现“脑裂”状态。都认为自己是active。出现两个master
解决脑裂的办法:对/ds/TemporaryNode加一个权限ACL控制(节点删除以后,权限同时也不在了)。master1对于这个节点/ds/TemporaryNode没有权限。自己把状态改成standby。
实际的案例1:Hadoop(NameNode、ResourceManager),普通的部署NameNode、ResourceManager仅仅是单节点。Hadoop HA(NameNode和ResourceManager有多个备份)
二、Zookeeper实现分布式锁
分布式锁主要用于在分布式环境中包括跨主机、跨进程、跨网络,导致共享资源不一致的问题,保证数据的一致性。
1. 分布式锁的实现思路
说明:
这种实现会有一个缺点,即当有很多进程在等待锁的时候,在释放锁的时候会有很多进程就过来争夺锁,这种现象称为 “惊群效应”
2. 分布式锁优化后的实现思路
3. Zookeeper分布式锁的代码实现
准备工作:
1)安装Zookeeper,具体参考我前面的我文章Zookeeper系列一:Zookeeper介绍、Zookeeper安装配置、ZK Shell的使用
2)新建一个maven项目ZK-Demo,然后在pom.xml里面引入相关的依赖
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
3.1 Zookeeper分布式锁的核心代码实现
实现逻辑参考“2. 分布式锁优化后的实现思路”中的流程图
package com.study.demo.lock; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @Description: Zookeeper分布式锁的核心代码实现 * @author leeSmall * @date 2018年9月4日 * */ public class DistributedLock implements Lock { private static Logger logger = LoggerFactory.getLogger(DistributedLock.class); private static final String ZOOKEEPER_IP_PORT = "192.168.152.130:2181"; private static final String LOCK_PATH = "/LOCK"; private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer()); private CountDownLatch cdl; private String beforePath;// 当前请求的节点前一个节点 private String currentPath;// 当前请求的节点 // 判断有没有LOCK目录,没有则创建 public DistributedLock() { if (!this.client.exists(LOCK_PATH)) { this.client.createPersistent(LOCK_PATH); } } public void lock() { //尝试去获取分布式锁失败 if (!tryLock()) { //对次小节点进行监听 waitForLock(); lock(); } else { logger.info(Thread.currentThread().getName() + " 获得分布式锁!"); } } public boolean tryLock() { // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath if (currentPath == null || currentPath.length() <= 0) { // 创建一个临时顺序节点 currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock"); System.out.println("---------------------------->" + currentPath); } // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400 List<String> childrens = this.client.getChildren(LOCK_PATH); //由小到大排序所有子节点 Collections.sort(childrens); //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁 if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) { return true; } //找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath else { int wz = Collections.binarySearch(childrens, currentPath.substring(6)); beforePath = LOCK_PATH + '/' + childrens.get(wz - 1); } return false; } //等待锁,对次小节点进行监听 private void waitForLock() { IZkDataListener listener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------"); if (cdl != null) { cdl.countDown(); } } public void handleDataChange(String dataPath, Object data) throws Exception { } }; // 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher this.client.subscribeDataChanges(beforePath, listener); if (this.client.exists(beforePath)) { cdl = new CountDownLatch(1); try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } } this.client.unsubscribeDataChanges(beforePath, listener); } //完成业务逻辑以后释放锁 public void unlock() { // 删除当前临时节点 client.delete(currentPath); } // ========================================== public void lockInterruptibly() throws InterruptedException { } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } public Condition newCondition() { return null; } }
3.2 在业务里面使用分布式锁
package com.study.demo.lock; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Lock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @Description: 在业务里面使用分布式锁 * @author leeSmall * @date 2018年9月4日 * */ public class OrderServiceImpl implements Runnable { private static OrderCodeGenerator ong = new OrderCodeGenerator(); private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class); // 同时并发的线程数 private static final int NUM = 10; // 按照线程数初始化倒计数器,倒计数器 private static CountDownLatch cdl = new CountDownLatch(NUM); private Lock lock = new DistributedLock(); // 创建订单接口 public void createOrder() { String orderCode = null; //准备获取锁 lock.lock(); try { // 获取订单编号 orderCode = ong.getOrderCode(); } catch (Exception e) { // TODO: handle exception } finally { //完成业务逻辑以后释放锁 lock.unlock(); } // ……业务代码 logger.info("insert into DB使用id:=======================>" + orderCode); } public void run() { try { // 等待其他线程初始化 cdl.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } // 创建订单 createOrder(); } public static void main(String[] args) { for (int i = 1; i <= NUM; i++) { // 按照线程数迭代实例化线程 new Thread(new OrderServiceImpl()).start(); // 创建一个线程,倒计数器减1 cdl.countDown(); } } }
package com.study.demo.lock; import java.text.SimpleDateFormat; import java.util.Date; public class OrderCodeGenerator { // 自增长序列 private static int i = 0; // 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号 public String getOrderCode() { Date now = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); return sdf.format(now) + ++i; } }