【ZooKeeper学习笔记】-4. ZooKeeper分布式锁

时间:2024-07-13 22:40:40

4.1 ZooKeeper分布式锁原理

  • 核心思想:当用户获取到锁时就创建节点,使用完锁就删除节点
  1. 每当一个用户想要获取锁时就在/lock节点下创建一个 **临时顺序 **节点
  2. 然后获取/lock节点下的全部子节点,如果发现当前节点编号是最小的,则该节点对应的客户端获取到锁,使用完锁后,删除该节点
  3. 如果发现节点编号不是最小的,则对前一个比自己小的编号节点,并注册事件监听器,监听删除事件
  4. 如果后续发现比自己小的节点被删除,则客户端会接收到来自ZooKeeper的通知,然后再次判断所对应节点编号是否是最小的,重复上述步骤

注意:这里创建临时节点是因为防止获取到锁的客户端宕机了,进而导致锁永远不会被删的情况;这是创建顺序节点是方便编号的排序

Cutator提供了下面五种分布式锁的方式:

  • InterProcessMutex(分布式可重入排他锁)
  • InterProcessSemaphoreMutex(分布式不可重入排他锁)
  • InterProcessReadWriteLock(分布式读写锁)
  • InterProcessMutliLock(将多个锁作为单个实体管理的容器)
  • InterProcessSemaphoreV2(共享信号量)

4.2 分布式锁实战(模拟12306抢票)

代码如下:

package org.example;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class ZooKeeperLockTest {

    private static int tickets = 10; // 票数

    public static void main(String[] args) {
        // 建立连接
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(3000)
                .connectionTimeoutMs(3000)
                .retryPolicy(new ExponentialBackoffRetry(3000, 1))
                .namespace("")
                .build();
        client.start();
        // 获取分布式锁
        InterProcessMutex lock = new InterProcessMutex(client, "/lock");
        Thread t1 = new Thread(() -> {
            while (true) {
                try {
                    boolean hasLock = lock.acquire(3, TimeUnit.SECONDS);
                    if (hasLock && tickets > 0) {
                        // 不断抢票
                        System.out.println("线程" + Thread.currentThread().getName() + "抢到了当前第" + tickets + "张票");
                        tickets--;
                        if (tickets <= 0) {
                            break;
                        }
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }, "携程");

        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    boolean hasLock = lock.acquire(3, TimeUnit.SECONDS);
                    if (hasLock && tickets > 0) {
                        // 不断抢票
                        System.out.println("线程" + Thread.currentThread().getName() + "抢到了当前第" + tickets + "张票");
                        tickets--;
                        if (tickets <= 0) {
                            break;
                        }
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }, "飞猪");
        t1.start();
        t2.start();
    }
}