etcd的分布式锁
在分布式系统中,通常需要使用分布式锁解决跨网络的多线程的共享资源竞争问题。
使用etcd实现分布式锁可以确保确保了锁的安全性、互斥性,同时可以通过Lease确保持有锁的client的活性,当client发送宕机或者网络分区的时候,不会出现死锁。并且基于Raft保证了数据的强一致性,不会因为etcd实例宕机出现锁数据丢失的问题。
这里介绍一下clientv3提供的concurrency包里的分布式锁的实现原理和使用示例。
concurrency分布式锁源码介绍
首先方法创建Session对象,本质是创建了一份租约,可以通过续租保证锁的安全性,当持有锁的client宕机不能主动释放锁的时候,锁会因为租约到期而自动释放,避免产生死锁。
type Session struct {
client *v3.Client
opts *sessionOptions
id v3.LeaseID
cancel context.CancelFunc
donec <-chan struct{}
}
使用Session对象通过 创建了一个Mutex对象,这个对象是分布式锁的核心逻辑,咱们通过源码分析下
type Mutex struct {
s *Session
pfx string //前缀,实际上就是加锁的key + "/"
myKey string //在etcd中真实的key,就是pfx/LeaseID的格式
myRev int64 //创建key的Revision
hdr *pb.ResponseHeader
}
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
client := m.s.Client()
//根据要加锁资源的key和租约ID为这个锁创建一个myKey
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
//使用事务,如果myKey的CreateRevision=0,则代表key不存在,就创建一个,如果存在就获取myKey的信息和同样prefix中Revision最小的key,
//同样prefix代表锁的是同一个资源。
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
get := v3.OpGet(m.myKey)
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return err
}
//如果myKey已经存在则代表是同一个租约再次请求锁,实现了可重入锁
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
//如果myKey的Revision是同样prefix的key中最小的,就直接获取到锁。
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
//监听等待比自己Revision小的key释放锁
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
//如果等待失败,释放锁键
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
return werr
}
分布式锁的demo
下面是ectd分布式锁的应用,通过两个goroutine模拟两个client同时请求锁,可以看到后面的请求是阻塞的,直到持有锁的client释放。
package main
import (
"log"
"time"
"/coreos/etcd/clientv3"
"/coreos/etcd/clientv3/concurrency"
"context"
"fmt"
)
func main(){
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: time.Second * 5,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
s1, err := concurrency.NewSession(cli, concurrency.WithTTL(5))
if err != nil {
log.Fatal(err)
}
defer s1.Close()
//会话1上锁成功,然后开启goroutine去新建一个会话去上锁,5秒钟后会话1解锁。
m1 := concurrency.NewMutex(s1, "mylock")
if err := m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Printf("session1 上锁成功。 time:%d \n",time.Now().Unix())
g2 := make(chan struct{})
go func() {
defer close(g2)
s2, err := concurrency.NewSession(cli, concurrency.WithTTL(5))
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "mylock")
if err := m2.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Printf("session2 上锁成功。 time:%d \n",time.Now().Unix())
if err := m2.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Printf("session2 解锁。 time:%d \n",time.Now().Unix())
}()
time.Sleep(5 * time.Second)
if err := m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Printf("session1 解锁。 time:%d \n",time.Now().Unix())
<- g2
}
/**
结果输出:
session1 上锁成功。 time:1641454818
session1 解锁。 time:1641454823
session2 上锁成功。 time:1641454823
session2 解锁。 time:1641454823
*/