C#基于Redis实现分布式锁

时间:2024-03-08 18:31:58

  【本博客属于原创,如需转载,请注明出处:https://www.cnblogs.com/gdouzz/p/12097968.html

  最近研究库存的相关,在高峰期经常出现超卖等等情况,最后根据采用是基于Redis来实现了分布式锁,特此拿出来和大家分享。

  准备工作:centos7,Redis,Nginx,以及JMeter测试工具。

  分布式锁的引出

  在传统的程序中,我们写了如下最简单对库存操作的代码如下:

  下面是基于AspNetCore.WebAPI 创建的一个对库存进行操作(减少)的接口,我相信很多同志都能够写出这种加lock来保证高并发的时候,库存不会出现超卖,这种做法的性能问题,不属于我们这篇文章的讨论范围,我们要讨论的是,这种写法到底会不会造成超卖的情况出现呢?,如果这是传统的企业内部应用,单体架构,如下图所示,也能满足需求;   

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using StackExchange.Redis;

namespace RedisDistributedLockMvc.Controllers
{
    [Route("api/inv")]
    [ApiController]
    public class InvController : ControllerBase
    {

        private static readonly object LockObject = new object();
        private static readonly ConfigurationOptions Options = new ConfigurationOptions()
        {
            EndPoints = { { "192.168.232.132", 6379 } },
            Password = "123456"
        };


        [HttpGet]
        public ActionResult<string> Get()
        {
            string msg = null;
            lock (LockObject)
            {
                int invQty = GetInvQty();
                if (invQty > 0)
                {
                    invQty = invQty - 1;
                    SetInvQty(invQty);
                    msg = $"扣减成功,当前库存:{invQty}"; 
                }
                else
                {
                    msg = "扣减失败,库存不足";
                }
            }
            Console.WriteLine(msg);
            return msg;
        }


        private int GetInvQty()
        {
            var qty = 0;
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                qty = Convert.ToInt32(db.StringGet("InvQty"));
            }
            return qty;
        }

        private void SetInvQty(int qty)
        {
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                db.StringSet("InvQty", qty);
            }
        }
    }
}

      随着业务的越来越复杂,这种单体架构的形式,已经满足不了我们的正常业务需求,很多公司演变成了下面这种架构模式;

  下面是我在测试环境搭建的过程(为了让自己有更深刻的体会,我建议大家按照上面的架构图搭建一个简单的环境);

  1、把上面的AspNetCore代码发布到Centos 7机器上,分别指向该机器的不同端口(5000,5001),等同于部署了两份;

  2、在Centos 7机器上安装Nginx,然后修改nginx配置文件,指向刚刚配置配置的地址;

  特别说明:如果觉得上面操作很难,在Centos中,可以通过yum源来安装相应的软件,通过使用xshell来编写命令,如果是对文件操作的,不熟悉命令可以用WinScp这种软件进行可视化修改完后保存。然后需要特别注意的是,防火墙以及相关的端口和服务是否启动。

  除此之外还要特别留意,Nginx是一个进程,刚刚两个不同的端口,分别对应不同的进程,这三个进程之间的安全是通过叫(seLinux)来管理的,如果nginx配置好之后,外网访问还是报502,可以尝试着把seLinux关闭(当然不推荐关闭,也有相关的解决方案)。

  当然,如果有同学想尝试这个过程,碰到问题的也可以联系我:QQ:3484677573,说明是在博客园看到的即可。

  搭建环境完毕之后,接下来开始我们的测试;

  测试之前,因为要模拟高并发的环境,我们采用的jmeter作为我们压测的工具,简单的使用教程如下:

  首先从官网下载和安装jmeter,安装完之后,找到安装下的bin目录,找到jmeter.bat,双击即可启动jmeter。

   打开jmeter之后,添加线程组,比较简单,可以指定线程的个数等等。

  接下来再添加一个HttpRequest,如下图所示,根据提示,输入相关的内容,然后点击上面的运行,即可开始测试。

   至此,测试工具也准备OK了,那我们开始测试,先假设我们Redis里面有50个库存;

  接下来在centos启动linux,以及运行我们的服务,如下图所示(5000和50001);

 

   使用jmeter进行压测,调用,看看是否会出现超卖的情况。

   1、先开启50个线程,进行压测,看看输出结果,两个服务输出的结果如下:

  把结果设置成50,确实出现了超卖的情况,只要两台机器输出的当前库存是一样,就说明出现了超卖。

    说明我们最开始的那段代码在分布式环境下,或者在我们最常见的负载均衡部署方式下面是不行的,所以就提出了我们分布式锁的解决方案。

  PS:我以前也觉得上面这种加锁的方法,好像是不能用在分布式环境中,经过上面这么一折腾,我印象更加深刻了,也有了更清晰的认识。

  分布式锁的解决方案

  分布式锁的解决方案,在业界内也有很多,也有很多成熟的框架,下面我们介绍一种基于Redis来实现的分布式锁解决方案;

  先解释一下,前面的做法为什么不行和我们为什么要采取redis来做分布式锁

  1、上面这种Lock属于进程内的锁,当只有一个进程的时候(只部署了一台服务器)是没有问题的,当存在多台服务器的时候,就会出问题;

     2、之所以采取Redis来做分布式锁,Redis是单线程的,当我们有N个请求同时到达的时候,它会通过队列的形式变成串行访问;

     话不多说,直接看代码

  这个版本的分布式锁,我们做了最简单的考虑

  1、锁超时的问题(通过对Redis官网给出的SetNx方法,对应的就是StackExchange.dll里面的 db.StringSet("InvQty222", "111", TimeSpan.FromSeconds(900), When.NotExists, CommandFlags.None);

  2、执行过程中,可能出异常的情况,在finally里面释放锁;

private static readonly object LockObject = new object();
        private static readonly ConfigurationOptions Options = new ConfigurationOptions()
        {
            EndPoints = { { "192.168.232.132", 6379 } },
            Password = "123456"
        };


        [HttpGet]
        public ActionResult<string> Get()
        {
            #region 用Lock方式实现锁
            //string msg = null;
            //lock (LockObject)
            //{
            //    int invQty = GetInvQty();
            //    if (invQty > 0)
            //    {
            //        invQty = invQty - 1;
            //        SetInvQty(invQty);
            //        msg = $"扣减成功,当前库存:{invQty}";
            //    }
            //    else
            //    {
            //        msg = "扣减失败,库存不足";
            //    }
            //}
            //Console.WriteLine(msg);
            //return msg;
            #endregion

            #region   Redis实现的第一版本分布式锁
            string msg = null;
            var isSuccess = SetLockVersion1("1");
            //如果key存在返回的就是false
            //如果key不存在返回,就set,返回true
            //除此之外,我们还要考虑的是这把锁的超时时间,
            //如果这把锁一直不释放(执行过程卡住了,那么要考虑把锁超时)
            if (isSuccess)
            {
                try
                {
                    int invQty = GetInvQty();
                    if (invQty > 0)
                    {
                        invQty = invQty - 1;
                        SetInvQty(invQty);
                        msg = $"扣减成功,当前库存:{invQty}";
                    }
                    else
                    {
                        msg = "扣减失败,库存不足";
                    }
                }
                finally
                {
                    //还要考虑执行过程中,如果出现了异常,也要把锁给释放掉。
                    UnLockVersion1();  //释放锁;
                }
            }
            else
            {
                msg = "资源正忙,请刷新后重试";
            }
            Console.WriteLine(msg);
            return msg;
            #endregion
        }


        private int GetInvQty()
        {
            var qty = 0;
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                qty = Convert.ToInt32(db.StringGet("InvQty"));
            }
            return qty;
        }

        private void SetInvQty(int qty)
        {
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                db.StringSet("InvQty", qty);
              
            }
        }

        private bool SetLockVersion1(string value)
        {
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                var flag= db.StringSet("LockValue", value, TimeSpan.FromSeconds(900), When.NotExists, CommandFlags.None); //如果存在了返回false,不存在才返回true;
                db.KeyExpire("LockValue", TimeSpan.FromSeconds(10));
                return flag;
            }
        }


        private bool UnLockVersion1()
        {
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                return db.KeyDelete("LockValue");
            }
        }

  按照上面的程序,我们再把代码部署到centos上,然后利用jmeter进行压测;

  经过我们这么一折腾,好像超卖的现象没有出现了;但是我们上面的做法还是比较的简单,很多情况都没有考虑在里面,就比如下面这几种情况;

  问题一、锁失效问题,问题根源,A线程加的锁,被B线程释放了。

  

 

    为了解决这种情况,我们可以在进来的时候,存一个clientId到Redis的锁里面,再失效key的时候,判断一下,当前clientId和redis锁里面的值是否一致,如果一致,就才释放。

string msg = null;
            string clientId = Guid.NewGuid().ToString();
            var isSuccess = SetLockVersion1(clientId);
            //如果key存在返回的就是false
            //如果key不存在返回,就set,返回true
            //除此之外,我们还要考虑的是这把锁的超时时间,
            //如果这把锁一直不释放(执行过程卡住了,那么要考虑把锁超时)
            if (isSuccess)
            {
                try
                {
                    int invQty = GetInvQty();
                    if (invQty > 0)
                    {
                        invQty = invQty - 1;
                        SetInvQty(invQty);
                        msg = $"扣减成功,当前库存:{invQty}";
                    }
                    else
                    {
                        msg = "扣减失败,库存不足";
                    }
                }
                finally
                {

                    if (clientId.Equals(GetLockValue()))
                    {
                        //还要考虑执行过程中,如果出现了异常,也要把锁给释放掉。
                        UnLockVersion1();  //释放锁;
                    }
                }
            }
            else
            {
                msg = "资源正忙,请刷新后重试";
            }
            Console.WriteLine(msg);
            return msg;

  问题二:锁超时的问题,如果客户端1需要执行这把锁的时间大于锁设定的超时时间,该怎么做呢

  1、开启一个守护线程(后台线程),假如你锁的设置30秒超时,那你每隔10秒去检查是不是还是client1持有锁,如果是那就延长10秒,类似于Watch dog思路。

  这个守护线程要注意的点就是,如果锁都没人使用了,这个守护线程要及时的关闭,不能一直开启着,如果不需要延长时间即不必要去延长。