go etcd分布式锁的实现和使用

时间:2025-02-21 08:27:10

etcd的分布式锁

在分布式系统中,通常需要使用分布式锁解决跨网络的多线程的共享资源竞争问题。
使用etcd实现分布式锁可以确保确保了锁的安全性、互斥性,同时可以通过Lease确保持有锁的client的活性,当client发送宕机或者网络分区的时候,不会出现死锁。并且基于Raft保证了数据的强一致性,不会因为etcd实例宕机出现锁数据丢失的问题。
这里介绍一下clientv3提供的concurrency包里的分布式锁的实现原理和使用示例。

concurrency分布式锁源码介绍

首先方法创建Session对象,本质是创建了一份租约,可以通过续租保证锁的安全性,当持有锁的client宕机不能主动释放锁的时候,锁会因为租约到期而自动释放,避免产生死锁。

type Session struct {
	client *v3.Client
	opts   *sessionOptions
	id     v3.LeaseID

	cancel context.CancelFunc
	donec  <-chan struct{}
}

使用Session对象通过 创建了一个Mutex对象,这个对象是分布式锁的核心逻辑,咱们通过源码分析下

type Mutex struct {
	s *Session

	pfx   string //前缀,实际上就是加锁的key + "/"
	myKey string //在etcd中真实的key,就是pfx/LeaseID的格式
	myRev int64 //创建key的Revision
	hdr   *pb.ResponseHeader
}


func (m *Mutex) Lock(ctx context.Context) error {
	s := m.s
	client := m.s.Client()

    //根据要加锁资源的key和租约ID为这个锁创建一个myKey
	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
	//使用事务,如果myKey的CreateRevision=0,则代表key不存在,就创建一个,如果存在就获取myKey的信息和同样prefix中Revision最小的key,
	//同样prefix代表锁的是同一个资源。
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	get := v3.OpGet(m.myKey)
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return err
	}

	//如果myKey已经存在则代表是同一个租约再次请求锁,实现了可重入锁
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}


    //如果myKey的Revision是同样prefix的key中最小的,就直接获取到锁。
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}

	//监听等待比自己Revision小的key释放锁
	hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	//如果等待失败,释放锁键
	if werr != nil {
		m.Unlock(client.Ctx())
	} else {
		m.hdr = hdr
	}
	return werr
}

分布式锁的demo

下面是ectd分布式锁的应用,通过两个goroutine模拟两个client同时请求锁,可以看到后面的请求是阻塞的,直到持有锁的client释放。


package main

import (
	"log"
	"time"
	"/coreos/etcd/clientv3"
	"/coreos/etcd/clientv3/concurrency"
	"context"
	"fmt"
)

func main(){
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: time.Second * 5,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	s1, err := concurrency.NewSession(cli, concurrency.WithTTL(5))
	if err != nil {
		log.Fatal(err)
	}
	defer s1.Close()

	//会话1上锁成功,然后开启goroutine去新建一个会话去上锁,5秒钟后会话1解锁。
	m1 := concurrency.NewMutex(s1, "mylock")
	if err := m1.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("session1 上锁成功。 time:%d \n",time.Now().Unix())

	g2 := make(chan struct{})
	go func() {
		defer close(g2)
		s2, err := concurrency.NewSession(cli, concurrency.WithTTL(5))
		if err != nil {
			log.Fatal(err)
		}
		defer s2.Close()
		m2 := concurrency.NewMutex(s2, "mylock")
		if err := m2.Lock(context.TODO()); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("session2 上锁成功。 time:%d \n",time.Now().Unix())
		if err := m2.Unlock(context.TODO()); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("session2 解锁。 time:%d \n",time.Now().Unix())

	}()

	time.Sleep(5 * time.Second)
	if err := m1.Unlock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("session1 解锁。 time:%d \n",time.Now().Unix())
	<- g2
}

/**
结果输出:
session1 上锁成功。 time:1641454818 
session1 解锁。 time:1641454823 
session2 上锁成功。 time:1641454823 
session2 解锁。 time:1641454823 
*/

相关文章