一、什么是Redis
Redis是一个开源的,使用C语言编写的面向键值对类型的分布式Nosql数据库系统,功能类似Memcache,但比Memcache功能更丰富。官网地址:https://redis.io
Redis与Memcache的不同点:
支持持久化、主从复制;支持的类型多;
Memcache通过客户端驱动实现的集群,存储的时候通过算法来决定把值存在哪一台服务器中。
redis做集群与Memcach不同,每一个redis服务器都有一个完整的副本。
Redis支持的数据类型有5种:
string : key为string,value支持各种格式,会自动序列化
list(链表) :双向链表可实现stack和queue
Hash(哈希) :高效存储对象
Set(集合)/ZSet(有序集合) :可以进行交集、并集、差集操作
二、Redis安装和在.net中的使用
1 Redis结构
redis不正式支持windows,但是,微软开发并维护了针对Win64的Windows版本。windows版本下载地址:https://github.com/MicrosoftArchive/redis/releases,我们下载zip包,下载完成后解压如图所示:
Redis的文件结构还是比较明了的,这里不多赘述。
2 简单配置
Redis的相关配置是在配置文件redis.windows.conf(如果使用的是Linux,redis的配置文件为redis.conf,配置方法一样)中进行的,打开redis.windows.conf文件,修改以下几个节点:
绑定ip :bind 127.0.0.1
端口号: port
加密: requirepass
这里配置了Redis的IP,端口,和密码,其他的配置下面会介绍。
3 持久化配置
Rdis提供了两种持久化的策略:RDB方式和AOF方式。
RDB(Redis Database)
默认方式,将Redis的数据以快照的形式存放在磁盘上,持久化数据放在 dump.rdb中
修改配置文件: save
说明:九百秒大于10条记录就存储,储存在dump.rdb文件中(在这段时间内服务器出问题会来不及保存,造成数据丢失)
AOF(append-only file)
redis将收到的写命令都通过write函数追到到文件中,当redis重启时会通过重新执行文件中的写命令在内存中重建(数据不会丢失,影响了性能)
修改配置文件:
appendonly yes #默认是no,开启AOF
appendfsync #always (每次都写入,性能差但是持久化最有保障)
everysec #一秒写一次,推荐
#no (操作系统控制,性能好但是持久化没有保障)
数据恢复
当我们的redis服务器挂了,只需要有下边这两个文件的任何一个都可恢复数据
恢复数据的操作很简单:
通过 config get dir 命令找到Redis服务的目录,把上边的两个备份文件任选一个放在该目录下,重启redis服务即可,redis会自动读取备份文件中的记录,把内容写入内存中。
简单补充:如果AOF和RDB的同步文件都存在,优先使用的是AOF文件。
4 PowerShell测试
1.安装
#将Redis安装为Windows服务
.\redis-server --service-install redis.windows.conf --loglevel verbose --service-name Redis6379
2.测试
5 net使用Redis(ServiceStack插件)
下边给出了在.net中使用的代码:
static void Main(string[] args)
{
//---------获取redisClient对象
IRedisClientsManager clientManager = new PooledRedisClientManager(new string[]
{ "127.0.0.1:6379", "10.122.0.1" });
IRedisClient client = clientManager.GetClient(); //简单测试数据
UserInfo user1 = new UserInfo() { UserName = "zs", Age = };
UserInfo user2 = new UserInfo() { UserName = "ls", Age = }; //---------------------------------string类型-------------------------------------------//
//string存一个对象
client.Set<UserInfo>("user1", user1);
UserInfo userGot = client.Get<UserInfo>("user1");
Console.WriteLine(userGot.UserName); //string存一个list
List<UserInfo> userlist = new List<UserInfo>() { user1, user2 };
client.Set<List<UserInfo>>("userlist", userlist);//存入list
List<UserInfo> listGot = client.Get<List<UserInfo>>("userlist");//取出list
foreach (UserInfo user in listGot)
{
Console.WriteLine(user.UserName);
} //---------------------------------hash类型---------------------------------------// client.SetEntryInHash("usert", "UserName", "zs");
client.SetEntryInHash("usert", "Age", ""); List<string> listKeys = client.GetHashKeys("usert");//获取key 结果:UserName,Age
List<string> listValues = client.GetHashValues("usert");//获取值 结果:zs,18
List<string> listAllK = client.GetAllKeys();//获取所有的key。 //-------------------------list类型(只支持<string,string>)------------------------// //队列使用
client.EnqueueItemOnList("listQueue", "item1");
client.EnqueueItemOnList("listQueue", "item2");
int count = client.GetListCount("listQueue");
for (int i = ; i < count; i++)
{
Console.WriteLine(client.DequeueItemFromList("listQueue"));//结果:item1,item2 先进先出
} //栈使用
client.PushItemToList("listStack", "item1");
client.PushItemToList("listStack", "item2");
int count = client.GetListCount("listStack");
for (int i = ; i < count; i++)
{
Console.WriteLine(client.PopItemFromList("listStack"));//结果:item2,item1 先进后出
} //-------------------------Set类型只支持<string,string>--------------------------//
//对Set类型进行操作
client.AddItemToSet("set1", "");
client.AddItemToSet("set1", "");
client.AddItemToSet("set1", "");
client.AddItemToSet("set1", "");
client.AddItemToSet("set1",""); //client.RemoveItemFromSet("set1", "111");//删除111
//获取set中的数据
HashSet<string> hashset = client.GetAllItemsFromSet("set1");
foreach (string str in hashset)
{
Console.WriteLine(str);//每次abcde的顺序都不一样
} client.AddItemToSet("set2", "");
client.AddItemToSet("set2", "");
client.AddItemToSet("set2", "");
client.AddItemToSet("set2", ""); //求并集 结果:1~7
HashSet<string> hashset1 = client.GetUnionFromSets(new string[]{"set1", "set2"});
//求交集 结果:444 555
HashSet<string> hashset2 = client.GetIntersectFromSets(new string[] { "set1", "set2" });
//求差集(第一个set有,第二个set没有) 结果:111 222 333
HashSet<string> hashset3 = client.GetDifferencesFromSet("set1",new string[]{"set2"}); }
三、集群和密码
1 redis加密
redis没有实现访问控制这个功能,但是它提供了一个轻量级的认证方式,可以编辑redis.conf配置来启用认证。
加密方法1:初始化Redis密码:
配置文件中: requirepass 123321;(Ps:需重启Redis才能生效)
加密方法2:不重启Redis设置密码(shell中进行):
配置文件中: requirepass 123321;
shell执行 : redis 127.0.0.1:6379> config set requirepass 123321
登陆有密码的Redis:
redis-cli -p 6379
redis 127.0.0.1:6379> auth 123321
查询密码:
redis 127.0.0.1:6379> config get requirepass
(error) ERR operation not permitted
密码验证:
redis 127.0.0.1:6379> auth 123321
OK
再次查询:
redis 127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "123321"
2 搭建redis集群
redis主从复制机制中一个master可以有多个slave,而一个slave也可以有多个slave,如此可是使用redis搭建多级的服务器集群。
一般写操作在master服务器中,在通过主从复制写入slave服务器,查操作通过slave服务器中获取(因为写的操作一般比较少,而读的操作多)
怎么搭建集群:
1、三台电脑为例,一主二从,建立三个redis服务器MasterRedis,SlaveRedis1,SlavaRedis2
2、在Master的配置文件修改:
port 6379 ---- bind 192.168.2.153 ------ reqirepass 123321
3、在Slave1中
port 6379----- bind 192.168.2.154 ----- slaveof 192.168.2.153 6379 --- masterauth 123321 --- reqirepass 123456
在Slave2中
port 6379 ----- bind 192.168.2.155 ----- slaveof 192.168.2.153 6379 --- masterauth 123321 --- reqirepass 123456
4、推荐:Master关闭save和appendonly
Slave开启save和appendonly,bgrewriteaof
(Master只向内存写,不考虑持久化,让Slave来进行持久化和Aof日志记录,这样做的优势是Master的性能达到最好)
3 读写分离的简单实现(RedisExchange插件)
public class RedisHelper
{
#region 属性成员
//ConnectionMultiplexer实例是线程安全的,默认设计为单例(连接复用器)
//主服务器负责write操作
private static ConnectionMultiplexer writeRedis = ConnectionMultiplexer.Connect(new ConfigurationOptions()
{
EndPoints = { { "192.168.2.153", } },
Password = "",
AllowAdmin = true
});
//查询较多,从服务器负责read,查询负荷大的时候可以随时添加从服务器
private static ConnectionMultiplexer readRedis = ConnectionMultiplexer.Connect(new ConfigurationOptions()
{
EndPoints = { { "192.168.2.154", }, { "192.168.2.155", } },
Password = "",
});
static IDatabase masterDb = writeRedis.GetDatabase();
static IDatabase slaveDb = readRedis.GetDatabase();
#endregion
#region string类型操作
/// <summary>
/// string类型写入
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns>isSuccess</returns>
public static bool StringSet(string key, string value, TimeSpan? expiry = default(TimeSpan?))
{
return masterDb.StringSet(key, value, expiry);
}
/// <summary>
/// string类型读取
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public static string StringGet(string key)
{ return slaveDb.StringGet(key);
}
#endregion
#region 单个实例
/// <summary>
/// 单个实例写入
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="key"></param>
/// <param name="t"></param>
/// <param name="expiry"></param>
/// <returns></returns>
public static bool ObjSet<T>(string key, T t, TimeSpan? expiry = default(TimeSpan?))
{
string json = Newtonsoft.Json.JsonConvert.SerializeObject(t);
return masterDb.StringSet(key, json, expiry);
}
/// <summary>
/// 单个实例获取
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="key"></param>
/// <returns></returns>
public static T ObjGet<T>(string key) where T : class
{
try
{
string json = slaveDb.StringGet(key);
if (!string.IsNullOrWhiteSpace(json))
{
T t = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(json);
if (t != null)
{
return t;
}
}
return null;
}
catch (Exception)
{
throw;
}
}
#endregion //public static void List_Add<T>(String key,T t)
//{
// masterDb.ListRightPush(key,t)
//} #region 哈希类型
#region 简单属性
/// <summary>
/// set or update the HashValue for string key
/// </summary>
/// <param name="key"></param>
/// <param name="hashkey"></param>
/// <param name="value"></param>
/// <returns></returns>
public static void SampleHashSet(string key, string hashkey, RedisValue value)
{
masterDb.HashSet(key, hashkey, value);
}
/// <summary>
/// 获取简单类型的属性值
/// </summary>
/// <param name="key">Represents a key that can be stored in redis</param>
/// <param name="hashkey"></param>
/// <returns></returns>
public static RedisValue SampleHashGet(string key, string hashkey)
{
return slaveDb.HashGet(key, hashkey);
}
#endregion
#region 复杂属性
/// <summary>
/// 复杂属性存入
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="key"></param>
/// <param name="hashkey"></param>
/// <param name="t">defined class</param>
/// <returns></returns>
public static void ObjHashSet<T>(string key, string hashkey, T t) where T : class
{
string json = Newtonsoft.Json.JsonConvert.SerializeObject(t);
masterDb.HashSet(key, hashkey, json);
}
/// <summary>
/// 复杂属性取出
/// </summary>
/// <param name="key">Represents a key that can be stored in redis</param>
/// <param name="hashkey"></param>
/// <returns></returns>
public static T ObjHashGet<T>(string key, string hashkey) where T : class
{
try
{
string json = slaveDb.HashGet(key, hashkey).ToString();
if (!string.IsNullOrWhiteSpace(json))
{
T t = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(json);
if (t != null)
{
return t;
}
}
return null;
}
catch (Exception)
{
throw;
}
}
#endregion
#endregion
}
这里没有list,set,zset,只是简单展示了读写分离的思路。
补充:
1.redis分布式锁(进程锁)
参考:https://www.imooc.com/article/37701
在同一进程中,我们使用lock来处理多线程并发的问题,但在分布式系统中,服务部署在多台服务器上lock锁就不能解决高并发问题了,我们可以使用redis的分布式锁来处理:
实现思路:在执行业务代码前,先去设置一个分布式锁(就是给Redis设置一个Key,但是要这个Key不存再的情况下才可以设置成功)
如果设置成功,表示当前进程拿到锁,可以执行后续代码
如果设置失败,表示其它进程已经锁定,那么我们就要让当前进程休眠一下,然后再去重试设置锁
直到设置锁成功,才表示当前进程锁定,才可以执行自定义代码
在执行自宝义代码后,释放锁,这样其它进程就可以拿到锁了
ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect("0.0.0.0:1234");
var database= multiplexer.GetDatabase(); //加锁Lock,使用ThreadID用来模拟,实际应该使用进程号或者商品编号来替代ThreadId
while (true)
{
bool flag = database.LockTake("lock_key", Thread.CurrentThread.ManagedThreadId, TimeSpan.FromSeconds());
if (flag)
{
break;
} Thread.Sleep();
} //do something
//do something
//do something //解锁
database.LockRelease("lock_key", Thread.CurrentThread.ManagedThreadId);
2.redis的发布订阅
//发布
ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect("0.0.0.0:1234");
ISubscriber pub = multiplexer.GetSubscriber();
pub.Publish("myTopic", "myMessage"); //订阅
ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect("0.0.0.0:1234");
ISubscriber sub = multiplexer.GetSubscriber();
ChannelMessageQueue queue = sub.Subscribe("myTopic");
queue.OnMessage(msg => { Console.WriteLine(msg.Message); });
3.redis内存管理
redis通过配置文件的maxmemory可以设置最大占用内存,如果内存满了的话,最简单的处理方式是增加maxmemory的值。
如果设置了maxmemory后,超过maxmemory的值怎么办?使用两个配置:
maxmemory-policy,默认是noeviction,表示不删除key,只报错;
maxmemory-samples,设置一次删除几条数据
maxmemory-policy可选择的策略:
volatile-lru 使用LRU算法删除一个键(只对设置了生存时间的键)
allkeys-lru 使用LRU算法删除一个键
volatile-random 随机删除一个键(只对设置了生存时间的键)
allkeys-random 随机删除一个键
volatile-ttl 删除生存时间最近的一个键
noeviction 不删除键,只返回错误