基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

时间:2021-03-22 06:51:19

1、【基础】redis能带给我们什么福利

Redis(Remote Dictionary Server)官网:https://redis.io/

Redis命令:https://redis.io/commands

Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs, geospatial indexes with radius queries and streams. Redis has built-in replication, Lua scripting, LRU eviction, transactions and different levels of on-disk persistence, and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster.
//-------------------------------------
Redis是一个开源(BSD许可),内存数据结构存储,用作数据库,缓存和消息代理。 它支持数据结构,如字符串,散列,列表,集合,带有范围查询的排序集,位图,超级日志,具有半径查询和流的地理空间索引。 Redis具有内置复制,Lua脚本,LRU驱逐,事务和不同级别的磁盘持久性,并通过Redis Sentinel提供高可用性并使用Redis Cluster自动分区。

1.1、Redis前世今生

  1. 最开始使用本机内存的NativeCache(NativeCache无法分布式共享),随着网站规模越大,我们需要一个分布式的缓存产品,Memcache诞生。
  2. 随着memcache缓存大行其道,互联网规模进一步扩大,对应用程序性能要求越来越高以及应用场景的越来越多 【09年】,比如内存数据库,异构化消息队列 等等,而原来市面上的memcache 暴露了以下几个缺点:

    1. memcache就是一个巨大的hash表,数据结构单一,我们知道编程语言中数据结构类型众多。
      数据结构类型:【List,HashSet, Dictionary, SortDictionary, BitArray, Queue, Stack, SortList。。。。】
    2. memcache 无法持久化,导致只能作为缓存使用,重启之后数据就会丢失。
    3. 无法做到规模化的集群,memcache可以使用 一致性hash 的方式做到一个简单的memcahce集群,非常依赖于客户端实现,也并非无损的。
      set username  jack     hash(username)=8亿 ,沿着顺时针走,碰到的第一个server节点就是要存放的节点。。。
      基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

      所以我们非常渴望有一个东西可以解决上面三个问题,自己研发太费时费力,刚好redis就是为了解决这些头疼的问题。

1.2、redis给我们带来了哪些福利

  • 概况
    可以在redis官网上看到,目前redis支持的数据类型之多,非常丰富:
    Redis数据类型 String Bitmap List(双端队列) Set Geo Hash HyperLogLogs Stream SortetSet(SkipList)
    C#数据类型 String BitArray (LinkedList+Stack+Queue+List) HashSet --- Dictionary ---  --- SortDictionary(红黑树)
  • 持久化
    使用AOF追加模式,RDB模式,以及混合模式,既然能缓存,就可以当做一个memroy db使用。
    • AOF: 使用大量的操作命令进行数据恢复。
    • RDB: 内存快照磁盘化。
    • FixMode:混合两种。
  • 集群
    Redis自带的Cluster集群模式,Sentinel 和  第三方豌豆荚的Codis集群搭建。

2、【搭建】使用centos和docker化快速部署

虚拟机CentOS7安装步骤:https://www.cnblogs.com/wyt007/p/10295834.html

XShell6破解版:链接: https://pan.baidu.com/s/1YtnkN4_yAOU5Dc1j69ltrg 提取码: nchp

2.1、centos7平台的部署

  • 安装
    首先到Redis官网获取Redis最新下载地址:http://download.redis.io/releases/redis-5.0.3.tar.gz
    然后在CentOS7上面进行安装
    mkdir /data
    cd /data
    wget http://download.redis.io/releases/redis-5.0.3.tar.gz
    tar xzf redis-5.0..tar.gz
    mv redis-5.0. redis
    cd redis
    make

    如果出现 gcc:命令未找到 ,安装gcc并重新执行 make
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

    yum -y install gcc automake autoconf libtool make
    //如果以上命令出现[Errno 256] No more mirrors to try.执行下面命令再重新安装gcc
    yum clean all

    如果出现:致命错误:jemalloc/jemalloc.h:没有那个文件或目录,则执行下方命令
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

    make MALLOC=libc 
  • 这时候我们查看是否成功安装Redis(/data/redis/src/  目录下有无redis-cli 与redis-server),并将它们拷贝到上级文件夹
    cd /data/redis/src/
    cp redis-cli ../
    cp redis-server ../
  • 启动Redis
    [root@localhost src]# cd /data/redis/
    [root@localhost redis]# ./redis-server ./redis-conf

    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

  • 查看端口
    netstat -tlnp

    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

  • 测试存储
    [root@localhost ~]# cd /data/redis/
    [root@localhost redis]# ./redis-cli
    127.0.0.1:> set username jack
    OK
    127.0.0.1:> get username
    "jack"
    127.0.0.1:> dbsize
    (integer)
    127.0.0.1:> keys *
    ) "username"
  • 退出客户端命令
    quit 
  • 配置Redis
    Redis启动完成后是无法进行外网访问的,因此我们需要修改redis.conf

    protect-mode 保护模式
    bind 绑定网卡接口

    bind 127.0.0.1   =>  bind 0.0.0.0
    protected-mode yes => protected-mode no

    现实场景:redis是生产内网部署,对外不开放端口。。。

  • 需要密码验证(可选)
    修改redis.conf默认参数 # requirepass foobared
    连接之后命令 auth <password>
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】
  • 修改文件存储目录rdb + logfile + aof(可选)
    • rdb 修改redis.conf默认参数 dir ./ 文件夹路径
    • logfile 修改redis.conf默认参数  logfile "" 文件名称,可以改成“redis.log”
  • 后台执行 
    修改redis.conf默认参数 daemonize no ,改成 daemonize yes 
    会生成pid文件 /var/run/redis_6379.pid 存放进程号
    [root@localhost redis]# ./redis-server ./redis.conf
    [root@localhost redis]# netstat -tlnp
    Active Internet connections (only servers)
    Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
    tcp 0.0.0.0: 0.0.0.0:* LISTEN /./redis-serve
    tcp 0.0.0.0: 0.0.0.0:* LISTEN /systemd
    tcp 0.0.0.0: 0.0.0.0:* LISTEN /X
    tcp 192.168.122.1: 0.0.0.0:* LISTEN /dnsmasq
    tcp 0.0.0.0: 0.0.0.0:* LISTEN /sshd
    tcp 127.0.0.1: 0.0.0.0:* LISTEN /cupsd
    tcp 127.0.0.1: 0.0.0.0:* LISTEN /master
    tcp 127.0.0.1: 0.0.0.0:* LISTEN /sshd: root@pts
    tcp 127.0.0.1: 0.0.0.0:* LISTEN /sshd: root@pt
    tcp 127.0.0.1: 0.0.0.0:* LISTEN /sshd: root@pt
    tcp6 ::: :::* LISTEN /systemd
    tcp6 ::: :::* LISTEN /X
    tcp6 ::: :::* LISTEN /vsftpd
    tcp6 ::: :::* LISTEN /sshd
    tcp6 ::: :::* LISTEN /cupsd
    tcp6 ::: :::* LISTEN /master
    tcp6 ::: :::* LISTEN /sshd: root@pts
    tcp6 ::: :::* LISTEN /sshd: root@pt
    tcp6 ::: :::* LISTEN /sshd: root@pt
    [root@localhost redis]# tail /var/run/redis_6379.pid

2.2、docker上进行部署

Docker安装步骤:https://www.cnblogs.com/wyt007/p/10295834.html

  • 启动Docker
    service docker start
  • 列出容器内容
    docker ps

    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

    我们可以看到容器内是空的,我们接下来前往DockerHub下载安装redis(部分内容需要*)

  • 安装端口并绑定端口
    我这里是因为已经在虚拟机安装了Redis,占用了redis的6379端口,所以用外网6378端口映射docker6379端口
    安装完成会自动启动
    docker run --name some-redis -p : -d redis

    这时候在再查看Docker容器

    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

    更复杂的配置,应该自己写一个redis.conf,通过docker-compose 部署进去。而不是自己敲命令。
    dockerfile需要拷贝redis.conf

  • 移除docker中的redis
    docker kill 90b45b58a571
    docker rm 90b45b58a571

3、【SDK】C#的sdk快速操作和两款可视化工具介绍

3.1、StackExchange.Redis

github地址:https://github.com/StackExchange/StackExchange.Redis/

使用文档:https://stackexchange.github.io/StackExchange.Redis/

String的应用

web网站上保存用户信息,模拟session。
netcore 中使用redis作为分布式session共享。 {框架集成}

Hash的应用 记录每个店铺的数据库连接串。(分库的场景) key: shopid  value:connectionstring
Set的应用 判断某一个用户是否在黑名单中。 O(1)
List的应用 消息队列  client -> 短信队列 <- 发送处理程序   -> 运营商
  • 安装
    Install-Package StackExchange.Redis
  • 使用示例
    class Program
    {
    static void Main(string[] args)
    {
    ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
    IDatabase db = redis.GetDatabase(); //////cookie(ui,sessionid)
    //////redis(sessionid,userinfo)
    //db.StringSet("sessionid", "jack", TimeSpan.FromSeconds(5)); //while (true)
    //{
    // var info = db.StringGet("sessionid"); // Console.WriteLine(info); // Thread.Sleep(1000);
    //} ////key: shopID value: connectionstring
    //db.HashSet("connetions", "1", "mysql://192.168.1.1/mydb");
    //db.HashSet("connetions", "2", "mysql://192.168.1.2/mydb");
    //db.HashSet("connetions", "3", "mysql://192.168.1.3/mydb");
    //db.HashSet("connetions", "4", "mysql://192.168.1.4/mydb");
    //db.HashSet("connetions", "5", "mysql://192.168.1.5/mydb"); //Console.WriteLine(db.HashGet("connetions", "3")); ////黑名单
    //db.SetAdd("blacklist", "1");
    //db.SetAdd("blacklist", "2");
    //db.SetAdd("blacklist", "3");
    //db.SetAdd("blacklist", "4"); //var r = db.SetContains("blacklist", 40); ////消息队列
    //db.ListLeftPush("sms", "18721073333");
    //db.ListLeftPush("sms", "18521073333");
    //db.ListLeftPush("sms", "18121073033"); //Console.WriteLine(db.ListRightPop("sms"));
    //Console.WriteLine(db.ListRightPop("sms"));
    //Console.WriteLine(db.ListRightPop("sms")); Console.ReadKey();
    }
    }
  • asp.net core使用redis存储session
    Session是我们在web开发中经常使用的对象,它默认是存在本机的,但是在ASP.NET Core中我们可以十分方便的将Session的存储介质改为分布式缓存(Redis)或者数据库(SqlServer)。分布式的缓存可以提高ASP.NET Core 应用的性能和可伸缩性 ,尤其是在托管在云中或服务器场环境中
    • 添加引用

      Microsoft.Extensions.Caching.Redis
    • 配置服务
      public void ConfigureServices(IServiceCollection services)
      {
      ... //添加了redis作为分布式缓存
      services.AddDistributedRedisCache(option =>
      {
      option.InstanceName = "session";
      option.Configuration = "192.168.181.131:6379";
      }); //添加session
      services.AddSession(options =>
      {
      //options.IdleTimeout = TimeSpan.FromMinutes(10); //session活期时间
      //options.Cookie.HttpOnly = true;//设为httponly
      }); ...
      } public void Configure(IApplicationBuilder app, IHostingEnvironment env)
      {
      ... //使用session
      app.UseSession(); ...
      }
    • 设置session
      //using Microsoft.AspNetCore.Http;
      HttpContext.Session.SetString("userinfo", "jack");
    • 显示数据
      @using Microsoft.AspNetCore.Http;
      @Context.Session.GetString("userinfo")
      @Context.Session.Id

      基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

3.2、可视化操作

4、【SDK】StackExchange强类型工具使用和自己动手封装连接池

4.1、StackExchange.Redis的强类型扩展

为什么要使用强类型扩展?我们可以先看一段代码:

class Program
{
static void Main(string[] args)
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
IDatabase db = redis.GetDatabase(); var userModel = new UserModel()
{
UserName = "jack",
Email = "sdfasdf@qq.com",
IsVip = true
};
db.StringSet("userinfo", JsonConvert.SerializeObject(userModel));
var info = db.StringGet("userinfo");
var model = JsonConvert.DeserializeObject<UserModel>(info); Console.ReadKey();
}
} public class UserModel
{
public string UserName { get; set; }
public string Email { get; set; }
public bool IsVip { get; set; }
}

要存储数据先要进行序列化成String,然后进行存储,取出时又要进行反序列化,那么有没有更好的方式来处理这个问题呢? StackExchange.Redis.Extensions 为我们提供了很好的扩展

StackExchange.Redis.Extensions githun地址:https://github.com/imperugo/StackExchange.Redis.Extensions

  • 安装
    Install-Package StackExchange.Redis.Extensions.Core
    Install-Package StackExchange.Redis.Extensions.Newtonsoft //序列化方式
  • 使用
    var cacheClient = new StackExchangeRedisCacheClient(redis,new NewtonsoftSerializer());
    cacheClient.Add("userinfo", userModel);
    var model = cacheClient.Get<UserModel>("userinfo");
  • 完整代码示例
    class Program
    {
    static void Main(string[] args)
    {
    ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
    IDatabase db = redis.GetDatabase(); var userModel = new UserModel()
    {
    UserName = "jack",
    Email = "sdfasdf@qq.com",
    IsVip = true
    };
    db.StringSet("userinfo", JsonConvert.SerializeObject(userModel));
    var info = db.StringGet("userinfo");
    var model = JsonConvert.DeserializeObject<UserModel>(info); var cacheClient = new StackExchangeRedisCacheClient(redis,new NewtonsoftSerializer());
    cacheClient.Add("userinfo", userModel);
    model = cacheClient.Get<UserModel>("userinfo"); Console.ReadKey();
    }
    } public class UserModel
    {
    public string UserName { get; set; }
    public string Email { get; set; }
    public bool IsVip { get; set; }
    }
  • 缺点
    功能比底层要慢 + 功能要少。 暂时没有Stream

4.2、StackExchange.Redis连接问题

4.2.1、Socket连接过多的问题导致sdk挂掉

  • 原因描述:作为实例变量,会有什么后果。。。 如果每次调用都new一下,会有太多的socket。。。 频繁的打开和关闭。。
  • 解决办法:
    • 全局唯一的connection
    • 自己定义connection连接池

4.2.2、自定义connection连接池

  • 创建连接池 RedisConnectionPool.cs
    public class RedisConnectionPool
    {
    private static ConcurrentQueue<ConnectionMultiplexer> connectionPoolQueue = new ConcurrentQueue<ConnectionMultiplexer>(); private static int minConnectionNum;
    private static int maxConnectionNum; private static string host;
    private static int port; //通过构造函数 或者 config形式 获取 max,min host,port
    public static void InitializeConnectionPool()
    {
    minConnectionNum = ; maxConnectionNum = ; host = "192.168.181.131";
    port = ; for (int i = ; i < minConnectionNum; i++)
    {
    var client = OpenConnection(host, port); PushConnection(client);
    } Console.WriteLine($"{0} 个 connection 初始化完毕!");
    } /*
    * 1. 如果说池中没有connection了,那么你需要OpenConnection
    *
    * 2. 如果池中获取到了connection,并且isConnected=false,那么直接close
    *
    */
    public static ConnectionMultiplexer GetConnection()
    {
    while (connectionPoolQueue.Count > )
    {
    connectionPoolQueue.TryDequeue(out ConnectionMultiplexer client); if (!client.IsConnected)
    {
    client.Close();
    continue;
    } return client;
    } return OpenConnection(host, port);
    } /// <summary>
    /// 1. 如果 queue的个数 >=max 直接踢掉
    ///
    /// 2. client的IsConnected 如果为false, close
    /// </summary>
    /// <param name="client"></param>
    /// <returns></returns> public static void PushConnection(ConnectionMultiplexer client)
    {
    if (connectionPoolQueue.Count >= maxConnectionNum)
    {
    client.Close();
    return;
    } if (!client.IsConnected)
    {
    client.Close();
    return;
    } connectionPoolQueue.Enqueue(client);
    } public static ConnectionMultiplexer OpenConnection(string host, int port)
    {
    ConnectionMultiplexer client = ConnectionMultiplexer.Connect($"{host}:{port}");
    return client;
    }
    }
  • 使用方法
    RedisConnectionPool.InitializeConnectionPool();
    for (int m = ; m < ; m++)
    {
    ConnectionMultiplexer client = null; try
    {
    client = RedisConnectionPool.GetConnection(); var db = client.GetDatabase(); db.StringSet("username", "jack"); Console.WriteLine(db.StringGet("username") + " " + m);
    }
    finally
    {
    if (client != null)
    {
    RedisConnectionPool.PushConnection(client);
    }
    }
    //ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
    //Console.WriteLine(m);
    }

5、【内存结构】阅读redis源码中的五大基础对象

源码由Redis官方下载下来并解压,然后用VS2017打开,源码在src文件夹下,redis存储结构:
基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

  • RedisServer
    源码位置: src/server.h 
     redisServer 包含16个 redisDb 在 src/server.c 的 mian() 构造函数中,查看 void initServer(void) ,可以看到创建16个DB
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

    我们可以看到 server.dbnum 默认值为16
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

  • RedisDb
    源码位置: src/server.h 
    我们可以看到 dict *dict 数据字典,过期时间,长度等等
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】
  • redisObject
    源码位置: src/server.h 
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】
    我们可以看到有个 *ptr 属性,指向 sds(sds.h)、quicklist(quicklist.h)、dict(dict.h)、rax(rax.h) 
    可以在redis-cli中查看redisObject属性
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】
  • sds
    源码位置: sds.h

    sds => char[] 中了一个封装,把内存优化到了极致

    typedef char *sds;
    
    /* Note: sdshdr5 is never used, we just access the flags byte directly.
    * However is here to document the layout of type 5 SDS strings. */
    struct __attribute__ ((__packed__)) sdshdr5 {
    unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
    char buf[];
    };
    struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* used */
    uint8_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
    };
    struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
    };
    struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
    };
    struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
    };
  • redisClient
    源码位置: src/server.h ,包含三大重要参数:

    • redisDb *db  要进行操作的数据库

    • int argc  命令的数量
    • robj **argv  命令的所有参数

    • 查询示例

      set name jack
      
      ↓↓↓↓↓↓↓
      
      argv[]=set
      argv[]=name
      argv[]=jack ↓↓↓↓↓↓↓ commandTables [
        {set => setCommand}
        {get => getCommand}
      ]

      基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

6、【String】字符串命令介绍和源码阅读及秒杀和防重验证sdk实践

6.1、String中常见命令详解

Redis中String命令:https://redis.io/commands#string

Redis命令 incr decr incrby decrby
C#命令 ++ -- Interlocked.Incrment Interlocked.Decrement
命令示例 redis> SET mykey "10"
"OK"

redis> INCR mykey

(integer) 11

redis> GET mykey

"11"
redis> SET mykey "10"
"OK"

redis> DECR mykey

(integer) 9

redis> SET mykey "234293482390480948029348230948"

"OK"

redis> DECR mykey

ERR ERR value is not an integer or out of range
redis> SET mykey "10"
"OK"

redis> INCRBY mykey 5

(integer) 15
redis> SET mykey "10"
"OK"

redis> DECRBY mykey 3

(integer) 7
  • incr命令的应用场景:【简单的解决秒杀问题】 
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

    库存:1000   人数:10w

    购买:3000 只放3000进来。

    购买:1000

    待付款减库存,还是购买成功减库存,这是业务的事情!

    用max来进行人员的过滤。。。
    简单示例:

    class Program
    {
    static void Main(string[] args)
    {
    ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379"); IDatabase db = redis.GetDatabase();
    while (true)
    {
    var num = db.StringIncrement("max1");
    if (num>)
    {
    Console.WriteLine("当前请求>3000");
    break;
    }
    Console.WriteLine(num);
    }
    Console.ReadKey();
    }
    }
  • SetNx + Expire,Set 
    应用场景:解决订单场景中的重复提交问题。  【SetNx=Set if Not eXists】如果key存在,那么value不进行复制。。。
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

    setnx token 12345 (处理成功)
    setnx token 12345 (处理失败)
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

    EXPIRE设置过期时间
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

    转化成Set

    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

    说明10秒之内重复SET是不被允许的
    c#代码示例:

    class Program
    {
    static void Main(string[] args)
    {
    ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379"); IDatabase db = redis.GetDatabase();
    while (true)
    {
    var b = db.StringSet("token", "", TimeSpan.FromSeconds(), When.NotExists);
    Console.WriteLine(b); Thread.Sleep(TimeSpan.FromSeconds());
    }
    Console.ReadKey();
    }
    }

    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

6.2、源码解读

本篇示例解读incr,其他请自行参照本篇解读

我们首先查看 src/server.h 中的 redisCommand ,找到 incr 对应的 incrCommand ,然后定位到 t_string.c

void incrCommand(client *c) {
incrDecrCommand(c,);//这里可以看到是+1
}

然后找到 incrDecrCommand 的定义方法

void incrDecrCommand(client *c, long long incr) {
long long value, oldvalue;
robj *o, *new; o = lookupKeyWrite(c->db,c->argv[]);
if (o != NULL && checkType(c,o,OBJ_STRING)) return;
if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return; oldvalue = value;
if ((incr < && oldvalue < && incr < (LLONG_MIN-oldvalue)) ||
(incr > && oldvalue > && incr > (LLONG_MAX-oldvalue))) {
addReplyError(c,"increment or decrement would overflow");
return;
}
value += incr; if (o && o->refcount == && o->encoding == OBJ_ENCODING_INT &&
(value < || value >= OBJ_SHARED_INTEGERS) &&
value >= LONG_MIN && value <= LONG_MAX)
{
new = o;
o->ptr = (void*)((long)value);
} else {
new = createStringObjectFromLongLongForValue(value);
if (o) {
dbOverwrite(c->db,c->argv[],new);
} else {
dbAdd(c->db,c->argv[],new);
}
}
signalModifiedKey(c->db,c->argv[]);
notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[],c->db->id);
server.dirty++;
addReply(c,shared.colon);
addReply(c,new);
addReply(c,shared.crlf);
}

基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

7、【String】位图命令介绍和黑名单场景应用

7.1、bitmap思想

  • 场景示例
    customerid: 1-32 都是黑名单用户,那么如何更省内存的存储。 
    HashSet<int> hashSet=new HashSet<int>();
    hashSet.Add(customerid=)
    ...
    hashSet.Add(customerid=)
    • int类型存储:32bit  * 32 = 1024bit
    • byte类型存储:8bit * 32 =  256bit
    • bitmap类型存储:1个int  = 32bit
      基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

      customerid 作为 数组的 position

      0,1 标识 标识 该 position 是否拥有值。。。

  • 重要场景
    如果用户有500W,其中100W是刷单用户。
    如果某个店铺的刷单用户<10W,则可以使用 set 
    如果某个店铺的刷单用户>10W,则要使用 bitmap
  • bitmap 主要适用于比较小的情况,如果key=21亿,那么要产生21亿/32=几千万个int
    普通模式只要一个int就可以了

7.2、setbit, getbit, bitcount 的使用

  • setbit:设置当前position到底是0还是1
  • getbit:获取当前position的value。
  • bitcount: 判断当前有多少黑名单用户
  • redis-cli示例:
    192.168.181.131:>setbit blacklist      //key=1黑名单
    "" 192.168.181.131:>setbit blacklist //key=2不是黑名单
    "" 192.168.181.131:>setbit blacklist
    "" 192.168.181.131:>setbit blacklist
    "" 192.168.181.131:>getbit blacklist //查询是否是黑名单
    "" 192.168.181.131:>getbit blacklist //查询是否是黑名单
    "" 192.168.181.131:>bitcount blacklist //查询黑名单数量
    ""
  • SDK示例:
    class Program
    {
    static void Main(string[] args)
    {
    ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379"); IDatabase db = redis.GetDatabase(); db.StringSetBit("blacklist", , true);
    db.StringSetBit("blacklist", , true); Console.WriteLine(db.StringGetBit("blacklist", ));
    Console.WriteLine(db.StringGetBit("blacklist", ));
    Console.WriteLine(db.StringGetBit("blacklist", ));
    Console.WriteLine(db.StringGetBit("blacklist", )); Console.ReadKey();
    }
    }

    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

  • C#非SDK实现:
    class Program
    {
    static void Main(string[] args)
    {
    BitArray bitArray=new BitArray(); bitArray[] = true;
    bitArray[] = true; Console.WriteLine(bitArray[]);
    Console.WriteLine(bitArray[]);
    Console.WriteLine(bitArray[]);
    Console.WriteLine(bitArray[]); Console.ReadKey();
    }
    }

    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

8、【List】常用命令介绍及源码阅读和sdk使用

redis命令(List):https://redis.io/commands#list

8.1、List

  • 链表结构解析
    List是无环双向列表,相邻节点的查找的复杂度未O(1),如下图所示
    基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】
  • 常见方法
    lpush(左进),rpop(右出) ,rpush,lpop。这四种方法,可以作为堆栈(Stack)和链表(LinkList)使用

    lpush,rpop 这就是队列
    lpush,lpop 这就是堆栈 (括号的语法检查)

8.2、阻塞版的 bxxx

获取队列数据的时方法:

  • 写一个死循环(sleep(10ms))  消息导致cpu过高
  • 如果队列没有数据,那么线程卡住(卡住客户端),一直等待获取(阻塞)
    192.168.181.131:>lpush sms
    "" 192.168.181.131:>lpush sms
    "" 192.168.181.131:>lpush sms
    "" 192.168.181.131:>llen sms
    "" 192.168.181.131:>blpop sms
    ) "sms"
    ) ""
    192.168.181.131:>blpop sms
    ) "sms"
    ) ""
    192.168.181.131:>blpop sms
    ) "sms"
    ) ""
    192.168.181.131:>blpop sms
    Connection error:Execution timeout

8.3、Sdk实践

阻塞和非阻塞 对比一下。。(代码中控制进行非阻塞)

class Program
{
static void Main(string[] args)
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379"); IDatabase db = redis.GetDatabase(); db.ListLeftPush("sms", );
db.ListLeftPush("sms", );
db.ListLeftPush("sms", );
db.ListLeftPush("sms", );
while (true)
{
var info = db.ListLeftPop("sms");
Console.WriteLine(info);
Thread.Sleep();
} Console.ReadKey();
}
}

8.4、源码解读

我们首先查看 src/quicklist.h 中的 quicklist 我们可以看到由 quicklistNode  组成,包含头指针和尾指针

typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* total count of all entries in all ziplists */
unsigned long len; /* number of quicklistNodes */
int fill : ; /* fill factor for individual nodes */
unsigned int compress : ; /* depth of end nodes not to compress;0=off */
} quicklist;

然后我们查看 quicklistNode 可以看到 quicklistNode 为当前节点,包含前节点和后一节点

typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl;
unsigned int sz; /* ziplist size in bytes */
unsigned int count : ; /* count of items in ziplist */
unsigned int encoding : ; /* RAW==1 or LZF==2 */
unsigned int container : ; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : ; /* was this node previous compressed? */
unsigned int attempted_compress : ; /* node can't compress; too small */
unsigned int extra : ; /* more bits to steal for future usage */
} quicklistNode;

我们接下来查看 LLEN 命令的源码,我们首先查看 src/server.c 中的 redisCommand ,找到 llen 对应的 llenCommand ,然后定位到 t_list.c

void llenCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[],shared.czero);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
addReplyLongLong(c,listTypeLength(o));
}

我们接下来查看 listTypeLength 方法

unsigned long listTypeLength(const robj *subject) {
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
return quicklistCount(subject->ptr);
} else {
serverPanic("Unknown list encoding");
}
}

接下来查看 quicklistCount 方法

/* Return cached quicklist count */
unsigned long quicklistCount(const quicklist *ql) { return ql->count; }

于是我们可以看出 LLEN 命令实际上获取的是 ptr 指针指向的 count

我们接下来再看下 LPUSH 命令,我们还是要先查看 src/server.c 中的 lpush 对应的 lpushCommand 命令、

void lpushCommand(client *c) {
pushGenericCommand(c,LIST_HEAD);
}

然后查看 pushGenericCommand 方法

void pushGenericCommand(client *c, int where) {
int j, pushed = ;
robj *lobj = lookupKeyWrite(c->db,c->argv[]); if (lobj && lobj->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
} for (j = ; j < c->argc; j++) {
if (!lobj) {
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
dbAdd(c->db,c->argv[],lobj);
}
listTypePush(lobj,c->argv[j],where);
pushed++;
}
addReplyLongLong(c, (lobj ? listTypeLength(lobj) : ));
if (pushed) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; signalModifiedKey(c->db,c->argv[]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[],c->db->id);
}
server.dirty += pushed;
}

然后查看  listTypePush 方法

void listTypePush(robj *subject, robj *value, int where) {
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
value = getDecodedObject(value);
size_t len = sdslen(value->ptr);
quicklistPush(subject->ptr, value->ptr, len, pos);
decrRefCount(value);
} else {
serverPanic("Unknown list encoding");
}
}

然后查看 quicklistPush 方法,可以看到加入了头或者尾

/* Wrapper to allow argument-based switching between HEAD/TAIL pop */
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
int where) {
if (where == QUICKLIST_HEAD) {
quicklistPushHead(quicklist, value, sz);
} else if (where == QUICKLIST_TAIL) {
quicklistPushTail(quicklist, value, sz);
}
}

我们可以查看一下 quicklistPushHead ,可以看到count进行可+1

/* Add new entry to head node of quicklist.
*
* Returns 0 if used existing head.
* Returns 1 if new head created. */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(quicklist->head);
} else {
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD); quicklistNodeUpdateSz(node);
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}

9、【Hash】哈希命令介绍和分库连接串存储及源码阅读

9.1、Hash的底层结构

redis的哈希对象的底层存储可以使用ziplist(压缩列表)和hashtable。当hash对象可以同时满足一下两个条件时,哈希对象使用ziplist编码。

  • 哈希对象保存的所有键值对的键和值的字符串长度都小于64字节
  • 哈希对象保存的键值对数量小于512个

redis的hash架构就是标准的hashtab的结构,通过挂链解决冲突问题。

基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

类比成C#:

Dictionary<string,string> dict=new Dictionary<string,string>();
dict.Add("username","jack");
//假设hash(username) = 100
//table[100]=dictEntry(username,jack,next ) => model
dict.Add("password","");
//假设hash(password) = 100 //hash冲突进行挂链
//table[100]= dictEntry(pasword,12345,next ) -> dictEntry(username,jack,next )
var info= dict["username"];
info=jack;

可以看出next的作用是将冲突的hash进行挂链

9.2、使用常用的hash命令

Hash命令地址:https://redis.io/commands#hash

常用的Hash命令:hset,hget,hdel,hlen,hexists,hkeys,hvals,hgetall

  • 命令简单使用
    127.0.0.1:> flushdb
    OK
    127.0.0.1:> hset conn mysql://
    (integer)
    127.0.0.1:> hset conn mysql://
    (integer)
    127.0.0.1:> hlen conn
    (integer)
    127.0.0.1:> hexists conn
    (integer)
    127.0.0.1:> hexists conn
    (integer)
    127.0.0.1:> hget conn
    "mysql://2"
    127.0.0.1:> hdel conn
    (integer)
    127.0.0.1:> hlen conn
    (integer)
    127.0.0.1:> hset conn mysql://
    (integer)
    127.0.0.1:> hlen conn
    (integer)
    127.0.0.1:> hkeys conn
    ) ""
    ) ""
    127.0.0.1:> hvals conn
    ) "mysql://1"
    ) "mysql://3"
    127.0.0.1:> hgetall conn
    ) ""
    ) "mysql://1"
    ) ""
    ) "mysql://3"

9.3、SDK的使用

class Program
{
static void Main(string[] args)
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379"); IDatabase db = redis.GetDatabase(); db.HashSet("conn", , "mysql://10"); var info = db.HashGet("conn", ); Console.WriteLine(info); var len = db.HashLength("conn"); Console.WriteLine(len); var arr = db.HashKeys("conn"); Console.WriteLine(string.Join(",", arr)); Console.ReadKey();
}
}

9.4、源码解读

我们首先查看 src/dict.h 头文件中的 dict 我们可以看到由数组结构 dictht 组成

typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
} dict;

然后我们查看 dictht 包含 dictEntry

/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table;
unsigned long size; //开辟的大小空间
unsigned long sizemask; //求余使用
unsigned long used; //实际使用的大小空间
} dictht;

然后我们查看 dictEntry 、

typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next; //挂链使用
} dictEntry;

接下来我们查看 HLEN 命令,我们还是要先查看 src/server.c 中的 hlen 对应的 hlenCommand 命令

void hlenCommand(client *c) {
robj *o; if ((o = lookupKeyReadOrReply(c,c->argv[],shared.czero)) == NULL ||
checkType(c,o,OBJ_HASH)) return; addReplyLongLong(c,hashTypeLength(o));
}

然后我们接下来查看获取hash对象长度的 hashTypeLength 方法

/* Return the number of elements in a hash. */
unsigned long hashTypeLength(const robj *o) {
unsigned long length = ULONG_MAX; if (o->encoding == OBJ_ENCODING_ZIPLIST) {
length = ziplistLen(o->ptr) / ;
} else if (o->encoding == OBJ_ENCODING_HT) {
length = dictSize((const dict*)o->ptr);
} else {
serverPanic("Unknown hash encoding");
}
return length;
}

然后查看 dictSize 方法查看计算逻辑

#define dictSize(d) ((d)->ht[0].used+(d)->ht[1].used)

可以看到将两个 dictht 数组中的 used 相加,得到 hlen 结果

我们接下来再看下 HSET 命令,我们还是要先查看 src/server.c 中的 hset 对应的 hsetCommand 命令

void hsetCommand(client *c) {
int i, created = ;
robj *o; if ((c->argc % ) == ) {
addReplyError(c,"wrong number of arguments for HMSET");
return;
} if ((o = hashTypeLookupWriteOrCreate(c,c->argv[])) == NULL) return;
hashTypeTryConversion(o,c->argv,,c->argc-); for (i = ; i < c->argc; i += ) //遍历hset的两个参数
created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+]->ptr,HASH_SET_COPY); /* HMSET (deprecated) and HSET return value is different. */
char *cmdname = c->argv[]->ptr;
if (cmdname[] == 's' || cmdname[] == 'S') {
/* HSET */
addReplyLongLong(c, created);
} else {
/* HMSET */
addReply(c, shared.ok);
}
signalModifiedKey(c->db,c->argv[]);
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[],c->db->id);
server.dirty++;
}

然后我们查看 hashTypeSet 方法

int hashTypeSet(robj *o, sds field, sds value, int flags) {
int update = ;
//判断是否是压缩类型
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl, *fptr, *vptr; zl = o->ptr;
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
fptr = ziplistFind(fptr, (unsigned char*)field, sdslen(field), );
if (fptr != NULL) {
/* Grab pointer to the value (fptr points to the field) */
vptr = ziplistNext(zl, fptr);
serverAssert(vptr != NULL);
update = ; /* Delete value */
zl = ziplistDelete(zl, &vptr); /* Insert new value */
zl = ziplistInsert(zl, vptr, (unsigned char*)value,
sdslen(value));
}
} if (!update) {
/* Push new field/value pair onto the tail of the ziplist */
zl = ziplistPush(zl, (unsigned char*)field, sdslen(field),
ZIPLIST_TAIL);
zl = ziplistPush(zl, (unsigned char*)value, sdslen(value),
ZIPLIST_TAIL);
}
o->ptr = zl; /* Check if the ziplist needs to be converted to a hash table */
if (hashTypeLength(o) > server.hash_max_ziplist_entries)
hashTypeConvert(o, OBJ_ENCODING_HT);
} else if (o->encoding == OBJ_ENCODING_HT) {
dictEntry *de = dictFind(o->ptr,field);//hash(field)=int查看dictEntry是否有这个position
if (de) {
sdsfree(dictGetVal(de));
if (flags & HASH_SET_TAKE_VALUE) {
dictGetVal(de) = value;
value = NULL;
} else {
dictGetVal(de) = sdsdup(value);
}
update = ;
} else {
sds f,v;
if (flags & HASH_SET_TAKE_FIELD) {
f = field;
field = NULL;
} else {
f = sdsdup(field);
}
if (flags & HASH_SET_TAKE_VALUE) {
v = value;
value = NULL;
} else {
v = sdsdup(value);
}
dictAdd(o->ptr,f,v);
}
} else {
serverPanic("Unknown hash encoding");
} /* Free SDS strings we did not referenced elsewhere if the flags
* want this function to be responsible. */
if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
return update;
}

然后查看 dictFind 方法

static dictEntry *dictFind(dict *ht, const void *key) {
dictEntry *he;
unsigned int h; if (ht->size == ) return NULL;
h = dictHashKey(ht, key) & ht->sizemask;//求余取hash值
he = ht->table[h];//到table中进行查找
while(he) {//如果存在则还要进行挂链查找
if (dictCompareHashKeys(ht, key, he->key))
return he;
he = he->next;
}
return NULL;
}

然后查看 dictHashKey 方法

#define dictHashKey(ht, key) (ht)->type->hashFunction(key)

我们查看 dictAdd 方法

/* Add an element to the target hash table */
static int dictAdd(dict *ht, void *key, void *val) {
int index;
dictEntry *entry; /* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(ht, key)) == -)
return DICT_ERR; /* Allocates the memory and stores key */
entry = malloc(sizeof(*entry));//后进来的放在前面
entry->next = ht->table[index];
ht->table[index] = entry;//将实体放在table的对应索引中去 /* Set the hash entry fields. */
dictSetHashKey(ht, entry, key);
dictSetHashVal(ht, entry, val);
ht->used++; //最终将used++
return DICT_OK;
}

10、【Set,HyperLogLog】常用命令介绍和sdk使用

10.1、理解Set的底层数据结构

Set 应用场景: 黑名单。

Set 底层就是用了dict。

[key=xxx,value=null]

10.2、常用set命令

sadd(增加),sismember(是否包含),scard(统计个数),srem(删除),smembers(列出值)

server.natappfree.cc:>sadd blacklist
"" server.natappfree.cc:>sadd blacklist
"" server.natappfree.cc:>sismember blacklist
"" server.natappfree.cc:>sismember blacklist
"" server.natappfree.cc:>scard blacklist
"" server.natappfree.cc:>sadd blacklist
"" server.natappfree.cc:>scard blacklist
"" server.natappfree.cc:>smembers blacklist
) ""
) ""
) ""
server.natappfree.cc:>srem blacklist
"" server.natappfree.cc:>smembers blacklist
) ""
) ""

10.3、sdk操作

static void Main(string[] args)
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("server.natappfree.cc:39767");
IDatabase db = redis.GetDatabase(); db.SetAdd("blacklist", ""); var arr = db.SetMembers("blacklist"); Console.WriteLine(string.Join(",", arr)); var len = db.SetLength("blacklist"); Console.WriteLine($"len={len}"); db.SetRemove("blacklist", ""); Console.WriteLine(string.Join(",", db.SetMembers("blacklist"))); Console.ReadKey();
}

10.4、源码阅读

  • scard(统计个数)源码
    我们首先查看 SCARD 命令,我们还是要先查看 src/server.c 中的 scard 对应的 scardCommand 命令
    void scardCommand(client *c) {
    robj *o; if ((o = lookupKeyReadOrReply(c,c->argv[],shared.czero)) == NULL ||
    checkType(c,o,OBJ_SET)) return; addReplyLongLong(c,setTypeSize(o));
    }

    然后我们查看 setTypeSize 方法

    unsigned long setTypeSize(const robj *subject) {
    if (subject->encoding == OBJ_ENCODING_HT) {
    return dictSize((const dict*)subject->ptr);
    } else if (subject->encoding == OBJ_ENCODING_INTSET) {
    return intsetLen((const intset*)subject->ptr);
    } else {
    serverPanic("Unknown set encoding");
    }
    }

    然后我们查看 dictSize 方法

    #define dictSize(d) ((d)->ht[0].used+(d)->ht[1].used)

    这样就侧面印证了也是字典结构

  • sadd(增加)源码
    我们首先查看 SCARD 命令,我们还是要先查看 src/server.c 中的 sadd 对应的 saddCommand 命令
    void saddCommand(client *c) {
    robj *set;
    int j, added = ; set = lookupKeyWrite(c->db,c->argv[]);
    if (set == NULL) {
    set = setTypeCreate(c->argv[]->ptr);
    dbAdd(c->db,c->argv[],set);
    } else {
    if (set->type != OBJ_SET) {
    addReply(c,shared.wrongtypeerr);
    return;
    }
    } for (j = ; j < c->argc; j++) {//遍历多有的值,可以添加多个值
    if (setTypeAdd(set,c->argv[j]->ptr)) added++;
    }
    if (added) {
    signalModifiedKey(c->db,c->argv[]);
    notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[],c->db->id);
    }
    server.dirty += added;
    addReplyLongLong(c,added);
    }

    我们接下来查看 setTypeAdd 方法

    /* Add the specified value into a set.
    *
    * If the value was already member of the set, nothing is done and 0 is
    * returned, otherwise the new element is added and 1 is returned. */
    int setTypeAdd(robj *subject, sds value) {
    long long llval;
    if (subject->encoding == OBJ_ENCODING_HT) {
    dict *ht = subject->ptr;
    dictEntry *de = dictAddRaw(ht,value,NULL);
    if (de) {
    dictSetKey(ht,de,sdsdup(value));//添加key
    dictSetVal(ht,de,NULL);//添加value为null,所以这是内有值得hash字典
    return ;
    }
    } else if (subject->encoding == OBJ_ENCODING_INTSET) {
    if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
    uint8_t success = ;
    subject->ptr = intsetAdd(subject->ptr,llval,&success);
    if (success) {
    /* Convert to regular set when the intset contains
    * too many entries. */
    if (intsetLen(subject->ptr) > server.set_max_intset_entries)
    setTypeConvert(subject,OBJ_ENCODING_HT);
    return ;
    }
    } else {
    /* Failed to get integer from object, convert to regular set. */
    setTypeConvert(subject,OBJ_ENCODING_HT); /* The set *was* an intset and this value is not integer
    * encodable, so dictAdd should always work. */
    serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
    return ;
    }
    } else {
    serverPanic("Unknown set encoding");
    }
    return ;
    }

10.5、HyperLogLogs统计

命令文档地址:https://redis.io/commands#hyperloglog

  • 概况
    比如我有存储数据3,3,1,5 ,那么基数(去除重复后统计数)=3
    优点:特别能节省空间,redis: 12k的空间,就能处理long个数据。 2的64次方 个数据(字符,数字)
    只能处理count统计,有一定的误差,误差率在 0.8%。
    原理: 就是使用数学中的 概率算法,不存储数据本身,用 概率函数 预估基数值。f(x)=xxx.
    250万 int = 1M
    2.5亿 int 100M
  • pfadd, pfcount 命令使用
    server.natappfree.cc:>pfadd p1
    "" server.natappfree.cc:>pfadd p1
    "" server.natappfree.cc:>pfadd p1
    "" server.natappfree.cc:>pfcount p1
    ""

11、【SortedSet】跳跃表原理分析和topK场景中sdk应用

用途:用于范围查找。。  10-100 的人数等等。。。

11.1、理解SortedSet底层结构 (skiplist)

跳跃表。 (本质上是解决查找的一个问题)
树结构: avl,红黑树,伸展树。
链表结构: 层级链表

<1> 有序的链表 (二分查找)
level1: O(N)
level1: 10 - 46 4次
level 2: 3次
leve1 3: -
基于.NetCore的Redis5.0.3(最新版)快速入门、源码解析、集群搭建与SDK使用【原创】

level1: 做汽車: 上海 - 镇江 -南京 - 石家庄 - 北京 (100站)
level2: 做高铁: 上海 - 南京 - 天津 - 北京 (10站)
level3: 做飞机: 上海 - 北京 (1站)

11.2、源码对照

  • zskiplist
    我们首先查看  zskiplist  方法,我们还是要先查看  src/server.c  中的对应方法
    typedef struct zskiplist {
    struct zskiplistNode *header, *tail; //头尾节点
    unsigned long length;
    int level;
    } zskiplist;

    我们接下来查看  zskiplistNode  方法

    /* ZSETs use a specialized version of Skiplists */
    typedef struct zskiplistNode {
    sds ele; //原色
    double score; //得分,类似于权重
    struct zskiplistNode *backward; //回退指针
    struct zskiplistLevel {
    struct zskiplistNode *forward; //向前指针
    unsigned long span; //跳跃节点的区间
    } level[];
    } zskiplistNode;

11.3、应用场景介绍及sdk操作

首先我们初始化消费者客户的消费积分

class Program
{
static void Main(string[] args)
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379"); IDatabase db = redis.GetDatabase(); var rand = new Random();
for (int i = ; i <= ; i++)
{
var customerID = i;
var totalTradeMoney = rand.Next(, );
db.SortedSetAdd("shop", i, totalTradeMoney);
}
Console.WriteLine("插入成功!"); Console.ReadKey();
}
}

统计积分范围个数

192.168.181.131:>zcount shop
""

新增一个用户的积分

192.168.181.131:>zadd shop
""

删除一个用户的积分

192.168.181.131:>zrem shop
""

统计所有用户数量

192.168.181.131:>zcard shop
""

查询排序后10个

192.168.181.131:>zrange shop
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
192.168.181.131:>zrange shop with192.168.181.:>scores
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""

查询top10

192.168.181.131:>zrevrange shop
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
192.168.181.131:>zrevrange shop withscores
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""
) ""

查询排名

192.168.181.131:>zrank shop192.168.181.:>
""

实现业务逻辑:判断某一个用户是否在消费力前 25 % 的人群,如果是,就是优质客户了。(老客户)

class Program
{
static void Main(string[] args)
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379"); IDatabase db = redis.GetDatabase(); var len = db.SortedSetLength("shop"); var customerRank = len * 0.25; // 高端客户
var customerID = ; var dbRank = db.SortedSetRank("shop", customerID, Order.Descending); Console.ReadKey();
}
}

实现业务逻辑:获取top10%的客户,专门做重点维护。

class Program
{
static void Main(string[] args)
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379"); IDatabase db = redis.GetDatabase(); var len = db.SortedSetLength("shop");
var top10 = len * 0.1; var vals = db.SortedSetRangeByRankWithScores("shop", order: Order.Descending); Console.ReadKey();
}
}

12、【序列化】理解redis的三大序列化存储机制

12.1、RDB (Redis Database)

默认存储文件: dump.rdb

数据库快照,加载速度快。redis意外退出会丢数据。

snapshot: 历史时间点上的某一刻的全量数据。

我们可以查看 redis.conf 中的设置内存刷新数据到磁盘的触发点

# Save the DB on disk:
#
# save <seconds> <changes>
#
# Will save the DB if both the given number of seconds and the given
# number of write operations against the DB occurred.
#
# In the example below the behaviour will be to save:
# after sec ( min) if at least key changed
# after sec ( min) if at least keys changed
# after sec if at least keys changed
#
# Note: you can disable saving completely by commenting out all "save" lines.
#
# It is also possible to remove all the previously configured save
# points by adding a save directive with a single empty string argument
# like in the following example:
#
# save "" save 1 //900秒内有1个数据改变则触发
save 10 //300秒内有10个数据改变则触发
save 10000 //60秒内有10000个数据改变则触发

也可以修改 redis.conf 中设置的保存文件名

# The filename where to dump the DB
dbfilename dump.rdb

我们来模拟一下数据

./redis-server ./redis.conf
set username jack
kill -
./redis-server ./redis.conf get username
(nil)

查看dump文件

[root@localhost redis]# od -c ./mydata/dump.rdb
R E D I S \t r e d i s
- v e r . . \n r e d i
s - b i t s @ c t i m e
N \ \b u s e d - m e m @
\r \ \f a o f - p r e a m b l
e \ t Z Y

使用redis工具查看dump文件

[root@localhost redis]# ./src/redis-check-rdb ./mydata/dump.rdb
[offset ] Checking RDB file ./mydata/dump.rdb
[offset ] AUX FIELD redis-ver = '5.0.3'
[offset ] AUX FIELD redis-bits = ''
[offset ] AUX FIELD ctime = ''
[offset ] AUX FIELD used-mem = ''
[offset ] AUX FIELD aof-preamble = ''
[offset ] Checksum OK
[offset ] \o/ RDB looks OK! \o/
[info] keys read
[info] expires
[info] already expired

12.2、AOF (Append Only File)

我们可以查看 redis.conf 中的设置内存刷新数据附加的触发点

# no: don't fsync, just let the OS flush the data when it wants. Faster.
# always: fsync after every write to the append only log. Slow, Safest.
# everysec: fsync only one time every second. Compromise.
#
# The default is "everysec", as that's usually the right compromise between
# speed and data safety. It's up to you to understand if you can relax this to
# "no" that will let the operating system flush the output buffer when
# it wants, for better performances (but if you can live with the idea of
# some data loss consider the default persistence mode that's snapshotting),
# or on the contrary, use "always" that's very slow but a bit safer than
# everysec.
#
# More details please check the following article:
# http://antirez.com/post/redis-persistence-demystified.html
#
# If unsure, use "everysec". # appendfsync always //来一条附加一条到disk
appendfsync everysec //每秒附加一条到disk
# appendfsync no //由操作系统来决定

接下来我们关闭RDB,开启AOF

#save
#save
#save appendonly yes

然后进行数据存储

127.0.0.1:> set username jack
OK
127.0.0.1:> set password
OK

查看生成的 appendonly.aof 文件

[root@localhost mydata]# cat appendonly.aof
*
$
SELECT
$ *
$
set
$
username
$
jack
*
$
set
$
password
$

AOF:加载慢,丢失数据少

RDB:加载快,丢失数据多

12.3、混合模式rdb + aof 模式

既保证加载速度快,有保证了丢失数据少。

如何开启?我们可以修改 redis.conf 中的设置内存刷新数据到磁盘的触发点

# When rewriting the AOF file, Redis is able to use an RDB preamble in the
# AOF file for faster rewrites and recoveries. When this option is turned
# on the rewritten AOF file is composed of two different stanzas:
#
# [RDB file][AOF tail]
#
# When loading Redis recognizes that the AOF file starts with the "REDIS"
# string and loads the prefixed RDB file, and continues loading the AOF
# tail.
aof-use-rdb-preamble yes

我们接下来输入存储数据

127.0.0.1:> flushall
OK
127.0.0.1:> set username jack
OK
127.0.0.1:> set password
OK

这时候 appendonly.aof 中会被追加存储命令信息

[root@localhost redis]# cat appendonly.aof
*
$
SELECT
$ *
$
flushall
*
$
set
$
username
$
jack
*
$
set
$
password
$ *
$
set
$
username
$
jack
*
$
set
$
password
$ *
$
flushall
*
$
set
$
username
$
jack
*
$
set
$
password
$

接下来执行 bgrewriteaof 命令将aof文件内容写入rdb

127.0.0.1:> bgrewriteaof
Background append only file rewriting started

这时候再查看 appendonly.aof

[root@localhost redis]# cat appendonly.aof
REDIS0009 redis-ver5.0.3
redis-bits