redis在游戏服务器中的使用初探(一) 环境搭建
redis在游戏服务器中的使用初探(二) 客户端开源库选择
redis在游戏服务器中的使用初探(三) 信息存储
redis在游戏服务器中的使用初探(四) redis应用
在学习分布式对象存储的期间,有这么一个需求
"多个接口服务(本文当作客户端Clinet)需要以固定间隔向所有的多个服务器发送心跳,保证服务器确认客户端状态。
服务器在接收到文件读取请求时候,会广播询问所有数据服务器(本文也当作服务器)存储的数据情况"
以上一对多 的询问,是需要消息队列来进行通讯的
但是其实 redis也可以作为轻量级的消息队列来完成这个需求。
结构图
服务器开启一个线程进行redis订阅模式,当有人在指定频道发布消息时,所有订阅该频道的节点都可以接收到消息。
但是订阅操作如果我们不想采取固定时间间隔去获取频道是否有消息这么LOW的方案,其实是需要做成异步模式的。
而windows下 hredis异步模式是需要libevent支持的。 两者都是linux下运行良好的开源库,在windows下却是问题多多。
经过多次尝试,我决定放弃使用这两个开源库而选择cpp-redis。(linux下使用hredis和libevent ,有时间会试试)
流程如下:
一个服务节点 需要开启一个线程 进行客户端消息队列的订阅,每当收到消息就会调用收到消息的回调函数
而最初开启的服务节点的运行线程会定时的在服务器消息队列发布询问数据存储的信息。
客户端节点则相反 开启一个线程 定时向客户端消息队列发布心跳信息。
最初开启的客户端节点进行服务器消息队列的订阅,若收到服务器的数据存储询问,则进行本身是否存储该数据的判断
由于资源有限,最开始我们开启了5个线程 来模拟 2个服务器和 3个客户端
代码如下
1 #include <iostream> 2 #include <Winsock2.h> 3 #include <thread> 4 #include <mutex> 5 6 #include "cpp_redis/cpp_redis" 7 #include "tacopie/tacopie" 8 9 using namespace std; 10 11 const int serverThreadNum = 2; 12 const int clientThreadNum = 3; 13 const int heartBeatTime = 1; 14 const int ServerQueryTime = 1; 15 const std::string clientChanName = "ClientChan"; 16 const std::string serverChanName = "ServerChan"; 17 std::mutex g_mutex; 18 19 class WinsockGuard { 20 public: 21 WinsockGuard() { 22 WORD version = MAKEWORD(2, 2); 23 if (WSAStartup(version, &data) != 0) { 24 std::cerr << "WSAStartup() failure!" << std::endl; 25 return; 26 } 27 } 28 29 ~WinsockGuard() { 30 WSACleanup(); 31 } 32 private: 33 WSADATA data; 34 }; 35 36 bool SubcribCommFunc(int threadNum,bool isServer) { 37 cpp_redis::subscriber sub; 38 39 try { 40 sub.connect("127.0.0.1", 6379, [](const std::string& host, std::size_t port, cpp_redis::subscriber::connect_state status) { 41 if (status == cpp_redis::subscriber::connect_state::dropped) { 42 {std::lock_guard<std::mutex> l(g_mutex); std::cout << "client disconnected from " << host << ":" << port << std::endl; } 43 //should_exit.notify_all(); 44 } 45 }); 46 47 } 48 catch (std::exception& e) { 49 {std::lock_guard<std::mutex> l(g_mutex); std::cerr << "in " << __FUNCTION__ << ".err = " << e.what() << std::endl; } 50 return false; 51 } 52 std::string chanName; 53 if (isServer) {chanName = clientChanName;} 54 else {chanName = serverChanName;} 55 56 sub.subscribe(chanName.c_str(), [threadNum, isServer](const std::string& chan, const std::string& msg) { 57 string s; 58 if (isServer)s = "server "; 59 else s = "client "; 60 s += to_string(threadNum);s += " recv "; 61 {std::lock_guard<std::mutex> l(g_mutex); std::cout << s.c_str() << chan << ": " << msg << std::endl; } 62 //todo Check heatbeat or response 63 }); 64 sub.commit(); 65 66 while (1) { 67 std::this_thread::sleep_for(std::chrono::seconds(50000)); 68 } 69 70 return true; 71 } 72 73 bool RecvClientInfo(int i) { 74 return SubcribCommFunc(i,true); 75 } 76 77 bool PublishCommFunc(int threadNum, bool isServer, string publishStr) { 78 cpp_redis::client client; 79 try { 80 client.connect("127.0.0.1", 6379, [threadNum, isServer,&publishStr](const std::string& host, std::size_t port, cpp_redis::client::connect_state status) { 81 if (status == cpp_redis::client::connect_state::dropped) { 82 {std::lock_guard<std::mutex> l(g_mutex); std::cout << "disconnected from " << host << ":" << port << std::endl; } 83 } 84 }); 85 while (1) { 86 std::string chanName; 87 if (isServer) {chanName = serverChanName;} 88 else { chanName = clientChanName;} 89 90 client.publish(chanName.c_str(), publishStr.c_str()); 91 client.commit(); 92 93 int PubliLoopTime = 9; 94 if (isServer) {PubliLoopTime = ServerQueryTime;} 95 else {PubliLoopTime = heartBeatTime;} 96 97 std::this_thread::sleep_for(std::chrono::seconds(PubliLoopTime)); 98 } 99 } 100 catch (std::exception& e) { 101 {std::lock_guard<std::mutex> l(g_mutex); std::cerr << "in " << __FUNCTION__ << ".err = " << e.what() << std::endl; } 102 return false; 103 } 104 105 return true; 106 } 107 108 void QueryWhoSaveDataLoop(int i) { 109 string s = "Server thread ";s += to_string(i);s += " query Who save data? "; 110 PublishCommFunc(i, true, s); 111 return; 112 } 113 114 void ServerFunc(int i) { 115 {std::lock_guard<std::mutex> l(g_mutex);std::cout << "Enter ServerFunc threadNo = " << i << std::endl;} 116 //开启一个订阅客户端消息队列的线程 接受客户端的心跳包 117 thread t = thread(RecvClientInfo, i); 118 t.detach(); 119 120 //开启一个定时检测心跳超时的客户端 todo 121 122 //本线程不定时随机 发送一个询问各个客户端是否保存有数据 123 QueryWhoSaveDataLoop(i); 124 125 std::this_thread::sleep_for(std::chrono::seconds(500)); 126 } 127 128 void SendHeatBeatOnTime(int threadNum, int sendTime) { 129 string s = "client thread ";s += to_string(threadNum);s += " send heartbeat"; 130 PublishCommFunc(threadNum, false, s); 131 } 132 133 void ClientFunc(int i) { 134 {std::lock_guard<std::mutex> l(g_mutex);std::cout << "Enter ClientFunc threadNo = " << i << std::endl;} 135 136 //开启一个线程 定时发送心跳包 137 int s = heartBeatTime; 138 std::thread t = thread(SendHeatBeatOnTime, i, s); 139 t.detach(); 140 141 SubcribCommFunc(i, false); 142 } 143 144 void Start() { 145 thread serverThread[serverThreadNum]; 146 thread clientThread[clientThreadNum]; 147 148 for (int i = 0; i < serverThreadNum; i++) { 149 serverThread[i] = thread(ServerFunc, i); 150 } 151 for (int i = 0; i < clientThreadNum; i++) { 152 clientThread[i] = thread(ClientFunc, i); 153 } 154 //================================================== 155 for (int i = 0; i < serverThreadNum; i++) { 156 serverThread[i].join(); 157 } 158 for (int i = 0; i < clientThreadNum; i++) { 159 clientThread[i].join(); 160 } 161 } 162 163 int main() 164 { 165 WinsockGuard g; 166 Start(); 167 std::cout << "Finish!\n"; 168 }
开启redis 运行代码如图
番外: 补上我在ubuntu下进行的libevent + hiredis的异步测试
首先是安装源头更新 更新 gcc g++ make 等工具
sudo apt-get update
sudo apt-get install g++ gcc
安装 redis server
sudo apt-get install redis-server
现在可以通过下面的命令查看到该进程:
ps -ef|grep redis
然后安装 hiredis 和 libevent
sudo apt-get install libhiredis-dev
sudo apt-get install libevent-dev
安装完成验证下是否正确安装
编写libevent 示例代码
1 #include <event.h> 2 #include <stdio.h> 3 4 struct event ev; 5 struct timeval tv; 6 7 8 void time_cb(int fd, short event, void *argc) 9 { 10 printf( "timer wakeup\n"); 11 event_add(&ev, &tv); 12 } 13 14 int main() 15 { 16 struct event_base *base = event_init(); 17 18 tv.tv_sec = 1; 19 tv.tv_usec = 0; 20 evtimer_set(&ev, time_cb, NULL); 21 event_add(&ev, &tv); 22 event_base_dispatch(base); 23 24 return 0; 25 }
执行编译命令并运行 gcc -o eventexe libeventTest.c -levent
./eventexe 执行无错误则验证通过
编写hiredis示例代码
1 #include <stdio.h> 2 #include <hiredis/hiredis.h> 3 int main() 4 { 5 redisContext *conn = redisConnect("127.0.0.1",6379); 6 if(conn != NULL && conn->err) 7 { 8 printf("connection error: %s\n",conn->errstr); 9 return 0; 10 } 11 redisReply *reply = (redisReply*)redisCommand(conn,"set foo 1234"); 12 freeReplyObject(reply); 13 14 reply = redisCommand(conn,"get foo"); 15 printf("%s\n",reply->str); 16 freeReplyObject(reply); 17 18 redisFree(conn); 19 return 0; 20 }
执行编译命令并运行 gcc -o hiredisCli hiredisTest.c -lhiredis
./hiredisCli 执行无错误则验证通过
libevent和hiredis都确认无误后 开始测试异步代码
编写异步示例代码
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <string.h> 4 #include <signal.h> 5 6 #include <hiredis/hiredis.h> 7 #include <hiredis/async.h> 8 #include <hiredis/adapters/libevent.h> 9 10 11 #include <stdio.h> 12 #include <stdlib.h> 13 #include <string.h> 14 #include <signal.h> 15 16 void getCallback(redisAsyncContext *c, void *r, void *privdata) { 17 redisReply *reply = r; 18 if (reply == NULL) return; 19 printf("argv[%s]: %s\n", (char*)privdata, reply->str); 20 21 /* Disconnect after receiving the reply to GET */ 22 redisAsyncDisconnect(c); 23 } 24 25 void connectCallback(const redisAsyncContext *c, int status) { 26 if (status != REDIS_OK) { 27 printf("Error: %s\n", c->errstr); 28 return; 29 } 30 printf("Connected...\n"); 31 } 32 33 void disconnectCallback(const redisAsyncContext *c, int status) { 34 if (status != REDIS_OK) { 35 printf("Error: %s\n", c->errstr); 36 return; 37 } 38 printf("Disconnected...\n"); 39 } 40 41 int main (int argc, char **argv) { 42 signal(SIGPIPE, SIG_IGN); 43 struct event_base *base = event_base_new(); 44 45 redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); 46 if (c->err) { 47 /* Let *c leak for now... */ 48 printf("Error: %s\n", c->errstr); 49 return 1; 50 } 51 52 redisLibeventAttach(c,base); 53 redisAsyncSetConnectCallback(c,connectCallback); 54 redisAsyncSetDisconnectCallback(c,disconnectCallback); 55 redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); 56 redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); 57 event_base_dispatch(base); 58 return 0; 59 }
执行编译命令并运行
gcc -o async async.c -lhiredis -levent
./async
测试成功