Redis连接池

时间:2024-06-01 18:02:34

        本次实现的Redis连接池是一个单例且多线程安全的连接池。

        主要实现的功能为:读取配置,根据配置中的每一项建立对应的redis连接并加入到连接池中,然后从连接池中取出连接使用。每当配置发生修改,就重新把配置中的连接加入到池子中。

  • 通用类

        实现一些各个模块都会用到的基础接口,并定义通用的宏等。

        有些类利用了RAII思想:构造时获得对象、初始化锁、加锁等,析构时释放对象、销毁锁、解锁等。

#pragma once
#include <pthread.h>

#include <algorithm>
#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <string>
#include <vector>

#include "../../hiredis/hiredis/hiredis.h"

// Deletes the object `x` points to and resets `x` to nullptr.
// `delete` on a null pointer is a no-op, so no explicit null check is needed.
// The do/while(0) wrapper makes the macro expand to a single statement, so it
// is safe inside an unbraced if/else; the argument is parenthesised at each use.
// Note: `x` is evaluated twice, so pass a plain lvalue without side effects.
#define SAFE_DELETE(x) do { delete (x); (x) = nullptr; } while (0)

class AutoReply
{
public:
  AutoReply(redisReply* reply)
  {
    reply_ = nullptr;
    if(reply)
    {
      reply_ = reply;
    }
  }

  ~AutoReply()
  {
    if(reply_)
    {
      freeReplyObject(reply_);
      reply_ = nullptr;

    }
  }

  redisReply* get()
  {
    return reply_;
  }

  bool isErr()
  {
    if (!reply_)
      return true;
    if (reply_->type == REDIS_REPLY_ERROR)
    {
      std::cout << "reply error " << reply_->str << std::endl;
      return true;
    }

    if (reply_->type == REDIS_REPLY_STATUS)
    {
      std::string tmp_str;
       //大写转小写
      //std::transform(reply_->str, reply_->str + strlen(reply_->str), tmp_str.begin(), tolower);
      for(int i = 0; i < strlen(reply_->str); ++i)
      {
        char c = reply_->str[i];
        if(isupper(reply_->str[i]))
        {
          c = tolower(c);
        }
        tmp_str += tolower(c);
      }
      //std::cout << "iserr str" << tmp_str.c_str() << std::endl;
      if (strcmp(tmp_str.c_str(), "ok") != 0)
      {
        std::cout << "reply statue not ok " << tmp_str << "replystr: " << reply_->str << std::endl;
        return true;
      }
    }
    return false;
  }

    bool isPingErr()
  {
    if (!reply_)
      return true;
    if (reply_->type == REDIS_REPLY_ERROR)
    {
      std::cout << "reply error " << reply_->str << std::endl;
      return true;
    }

    if (reply_->type == REDIS_REPLY_STATUS)
    {
      std::string tmp_str;
    //大写转小写
      std::transform(reply_->str, reply_->str + strlen(reply_->str), tmp_str.begin(), tolower);
      //std::cout << "isPingErr str" << tmp_str.c_str() << std::endl;
      if (strcmp(tmp_str.c_str(), "pong") != 0)
      {
        std::cout << "reply statue not pong " << tmp_str << std::endl;
        return true;
      }
    }
    return false;
  }

  AutoReply(const AutoReply& ar)
  {
    reply_ = ar.reply_;
  }
private:
  AutoReply operator=(const AutoReply& ar);
  redisReply* reply_;
};


// Minimal wrapper around a pthread mutex: initialised with default attributes
// in the constructor and destroyed in the destructor.
class qMutex
{
private:
  pthread_mutex_t mt_;

  // Copying is disabled: both members are declared private and never defined.
  qMutex(const qMutex& qmt);
  qMutex operator=(const qMutex& qmt);

public:
  qMutex() { pthread_mutex_init(&mt_, nullptr); }

  ~qMutex() { pthread_mutex_destroy(&mt_); }

  // Acquires the underlying mutex.
  void lock() { pthread_mutex_lock(&mt_); }

  // Releases the underlying mutex.
  void unlock() { pthread_mutex_unlock(&mt_); }
};

/**
 * Scope guard for qMutex: locks in the constructor, unlocks in the destructor.
 *
 * Non-copyable. A copy would refer to the same mutex and unlock it a second
 * time when destroyed.
 */
class AutoMutex
{
public:
  // Locks `mutex`; it must outlive this guard because only a reference is kept.
  AutoMutex(qMutex& mutex):mutex_(mutex)
  {
    mutex_.lock();
  }
  ~AutoMutex()
  {
    mutex_.unlock();
  }
private:
  AutoMutex(const AutoMutex& am) = delete;
  AutoMutex& operator=(const AutoMutex& am) = delete;
  qMutex& mutex_;
};
  • 操作redis类

        主要是连接redis,操作redis,有密码需要使用auth命令验证密码。

#pragma once

#include "common.h"

// Connection status tracked by redisConn.
enum ConnectState {
  CONNECTSTATE_NONE   = 0,  // value a redisConn starts with
  CONNECTSTATE_CONN   = 1,  // connected
  CONNECTSTATE_UNCONN = 2,  // disconnected
};

/**
 * One connection to a Redis server, built on a hiredis redisContext.
 *
 * Every data member has an in-class initial value, so no field is ever read
 * uninitialised regardless of what the constructor assigns.
 */
class redisConn{
public:
  // Stores the connection parameters; `id` is the caller's identifier for this
  // connection.
  redisConn(size_t id, std::string ip = "127.0.0.1", size_t port = 6379, std::string passwd = "123456");
  ~redisConn();

  // Opens a blocking connection with redisConnect() and then sends AUTH.
  bool connect();
  // Opens a connection with redisConnectNonBlock(); AUTH is not sent.
  bool connectUnblock();

  // Frees the hiredis context (if any) and marks the connection disconnected.
  bool disConnect();

  // True while the state is CONNECTSTATE_CONN.
  bool isRun();

  // Sends AUTH with the stored password; true when the reply is not an error.
  bool auth();

  // Sends PING; true when the reply is not an error.
  bool ping();
  // Re-establishes the connection via connect()/auth().
  bool reconnect();
  // Resets the reconnect attempt counter to zero.
  void resetReconnectTime()
  {
    AutoMutex mt(locker_);
    reconnect_times_ = 0;
  }
  // Pings the server and falls back to reconnect() when the ping fails.
  bool keepAlive();

  // SET key val; true when the reply is not an error.
  bool set(const char* key, const char* val);
  // GET key; returns "" when the command fails or the reply has no string.
  std::string get(const char* key);
private:
  size_t id_ = 0;

  std::string ip_;
  size_t port_ = 0;
  std::string passwd_;
  ConnectState state_ = CONNECTSTATE_NONE;  // connection state
  size_t reconnect_times_ = 0;              // reconnect attempts so far

  redisContext* context_ = nullptr;
  redisReply* reply_ = nullptr;

  qMutex locker_;  // guards the members above for thread safety
};
#include "redisConn.h"

// Stores the connection parameters only; no connection is opened here.
// All members are initialised in declaration order. reconnect_times_ and
// reply_ used to be left uninitialised although keepAlive() reads the counter.
redisConn::redisConn(size_t id, std::string ip, size_t port, std::string passwd)
  : id_(id),
    ip_(ip),
    port_(port),
    passwd_(passwd),
    state_(CONNECTSTATE_NONE),
    reconnect_times_(0),
    context_(nullptr),
    reply_(nullptr)
{
}
// Releases the connection by delegating to disConnect().
redisConn::~redisConn()
{
  this->disConnect();
}

bool redisConn::connect()
{
  if (context_ != nullptr)
  {
    redisFree(context_);
    context_ = nullptr;      
  }
  context_ = redisConnect(ip_.c_str(), port_);
  if (!context_)
  {
    std::cerr << "connect fail" << std::endl;
    return false;
  }
  if(context_->err)
  {
    std::cerr << "connect err " << context_->errstr << std::endl;
    redisFree(context_);
    context_ = nullptr;
    return false;
  }
  state_ = CONNECTSTATE_CONN;
  std::cout << "connect succ" << std::endl;

  //验证密码
  auth();
  return true;
}

bool redisConn::connectUnblock()
{
  if (context_)
  {
    redisFree(context_);
    context_ = nullptr;
  }
  
  context_ = redisConnectNonBlock(ip_.c_str(), port_);
  if(!context_)
  {
    std::cout << "connect fail" << std::endl;
    return false;
  }

  if(context_->err)
  {
    std::cout << "connect err " << context_->errstr << std::endl;
    redisFree(context_);
    context_ = nullptr;
    return false; 
  }
  state_ = CONNECTSTATE_CONN;
  std::cout << "conn no block succ" << std::endl;

  //验证密码
  //auth();
  return true;
}

bool redisConn::disConnect()
{
  AutoMutex mt(locker_);
  if (context_)
  {
    redisFree(context_);
    context_ = nullptr;
    //SAFE_DELETE(context_);
  }
  state_ = CONNECTSTATE_UNCONN;
  std::cout << "disConnect succ" << std::endl;
  return true;
}

// Reports, under the lock, whether the state is CONNECTSTATE_CONN.
bool redisConn::isRun()
{
  AutoMutex guard(locker_);
  const bool connected = (CONNECTSTATE_CONN == state_);
  return connected;
}

bool redisConn::auth()
{
  AutoMutex mt(locker_);
  AutoReply reply = (redisReply*)redisCommand(context_, "auth %s", passwd_.c_str());
  if (reply.isErr())
    return false;
  std::cout << "auth succ" << std::endl;
  return true;
}

bool redisConn::ping()
{
  //检查服务器是否在运行
  AutoMutex mt(locker_);
  AutoReply reply = (redisReply*)redisCommand(context_, "ping");
  if(reply.isPingErr())
    return false;
  return true;
}

bool redisConn::reconnect()
{
  if(!connect() && !auth())
  {
    std::cout << "connect un block fail" << std::endl;
    return false;
  }
  state_ = CONNECTSTATE_CONN;
  std::cout << "reconnect succ" << std::endl;
  return true;
}

bool redisConn::keepAlive()
{
  if (state_ == CONNECTSTATE_CONN)
  {
    if(!ping())
    {
      if(!reconnect())
      {
        std::cout << "断线" << ip_ << ":" << port_ << std::endl;
        state_ = CONNECTSTATE_UNCONN;
        return false;

      }
      return true;
    }
    else if(state_ == CONNECTSTATE_UNCONN)
    {
      //断线重连
      if (reconnect_times_ < 10)
      {
        reconnect_times_++;
        if(reconnect())
        {
          reconnect_times_ = 0;
          state_ = CONNECTSTATE_CONN;
          return true;
        }
      }
    }
  }

  return false;
}


// Executes `SET key val`.
// Returns false when key or val is null, when there is no context, or when
// the reply is missing or an error; true otherwise.
bool redisConn::set(const char* key, const char* val)
{
  // A null pointer must never reach the "%s" conversions below.
  if(!key || !val)
    return false;

  AutoMutex mt(locker_);
  if(!context_)
    return false;
  AutoReply reply(static_cast<redisReply*>(redisCommand(context_, "set %s %s", key, val)));
  return !reply.isErr();
}

std::string redisConn::get(const char* key)
{
  AutoMutex mt(locker_);
  if(!context_)
    return "";
  AutoReply reply = (redisReply*)redisCommand(context_, "get %s", key);
  if(reply.isErr())
  {
    return "";
  }

  return reply.get()->str ? reply.get()->str : "";

}
  • 单例泛型类

        对应的连接池和读取配置只需要一个对象,可以实现为单例。

         使用这个单例:

  1. 继承该单例类(继承单例接口),并且将单例类设为友元(由于构造函数、拷贝构造函数、赋值运算符重载都设为私有,而单例类需要调用它们)。
  2. 设为友元。

        两种方式使用方法不同,看下面例子。

#pragma once

#include "common.h"

template<class T>
class Singleton
{
public:
    static T* getMe()
    {
        if(single_ == nullptr)
        {
            mt_.lock();
            if(single_ == nullptr)
            {
                single_ = new T;
            }
            mt_.unlock();
        }
        return single_;
    }

    static void delMe()
    {
        if(single_)
        {
            SAFE_DELETE(single_);
        }
    }
protected:
    //需要写出protected,不然继承的构造无法调用
    Singleton()
    {
    }
    ~Singleton()
    {
        delMe();
    }
private:
    //外部不允许赋值喝拷贝构造
    Singleton(const Singleton& );
    const Singleton& operator=(const Singleton& );
    static T* single_;
    static qMutex mt_;
};

template<class T>
T* Singleton<T>::single_ = nullptr;

template<class T>
qMutex Singleton<T>::mt_;

        使用实例:

 

  •  连接池类和读配置类

        连接池主要实现了将连接加入到连接池中,和从连接池中拿出连接。

        读取配置,配置的格式主要为:唯一标识——服务器ip——redis端口。如果需要,还可以读取密码等。

        连接池中的每一个连接有一个唯一标识,通过标识来获取对应的连接,标识可以对应某个功能(比如:如果是游戏中的排行榜,可以是不同的排行榜,也可以是不同玩家,每个玩家对应一个redis)。

#pragma once 

#include "qSingleton.h"
#include "redisConn.h"
#include <fstream>
#include <cstring>


struct ConnectInfo
{
  ConnectInfo()
  {
    state_ = 0;
  }

  redisConn* con_;
  int state_;
};

// Singleton pool of redisConn objects, keyed by a numeric id.
class redisPool : public Singleton<redisPool>
{
  friend class Singleton<redisPool>;

  using ConnectInfoT = std::map<size_t, ConnectInfo>;

  // Construction, destruction and copying are private; instances are obtained
  // through Singleton<redisPool>.
  redisPool();
  ~redisPool();
  redisPool(const redisPool& );
  const redisPool& operator=(const redisPool&);

  // Returns the entry stored for `id`, or nullptr when there is none.
  ConnectInfo* getConnectInfo(size_t id);

  ConnectInfoT connect_;

public:
  // Creates the connection for `id` if absent and connects it if not running.
  bool addClient(size_t id, std::string ip, size_t port);
  // Disconnects and destroys every pooled connection.
  bool delClient();
  // Runs keepAlive() on every pooled connection.
  bool keepAlive();

  // Brings the pool in line with the entries held by RedisConnCfgMgr.
  bool updateCfgConn();
  // Returns the connection stored for `id`, or nullptr when there is none.
  redisConn* getClientByID(size_t id);
};

// One configuration entry: connection id, server ip and port.
struct RedisConnCfg
{
  size_t id_;
  std::string ip_;
  size_t port_;

  // Starts with id 0, an empty ip and port 0.
  RedisConnCfg() : id_(0), ip_(), port_(0)
  {
  }

  // Writes "id:ip:port" followed by a newline to std::cout and flushes.
  void print()
  {
    std::cout << id_ << ":" << ip_ << ":" << port_ << '\n' << std::flush;
  }
};

// Singleton that loads and holds the list of Redis connection settings.
class RedisConnCfgMgr : public Singleton<RedisConnCfgMgr>
{
  friend class Singleton<RedisConnCfgMgr>;

  std::vector<RedisConnCfg> cfg_;

  // Construction, destruction and copying are private; instances are obtained
  // through Singleton<RedisConnCfgMgr>.
  RedisConnCfgMgr();
  ~RedisConnCfgMgr();
  RedisConnCfgMgr(const RedisConnCfgMgr&);
  const RedisConnCfgMgr& operator=(const RedisConnCfgMgr& c);

public:
  // Reads the configuration file into the entry list.
  bool loadCfg();

  // Read-only view of the loaded entries.
  const std::vector<RedisConnCfg>& getRedisConnCfg()
  {
    return cfg_;
  }
};
#include "redisPool.h"

// Nothing to set up: the connection map starts out empty.
redisPool::redisPool() {}

// Tears down every pooled connection via delClient().
redisPool::~redisPool()
{
  this->delClient();
}

bool redisPool::addClient(size_t id, std::string ip, size_t port)
{
  ConnectInfo* pInfo = getConnectInfo(id);
  if(!pInfo)
  {
    pInfo = &connect_[id];
    pInfo->con_ = new redisConn(id, ip.c_str(), port, "123456");
    pInfo->state_ = 1;
  }

  if(!pInfo->con_->isRun())
  {
    if(pInfo->con_->connect() && pInfo->con_->auth())
    {
      return true;
    }
    else 
    {
      pInfo->con_->disConnect();
      return false;
    }
  }

  return true;
}

bool redisPool::delClient()
{
  ConnectInfoT::iterator it = connect_.begin();
  for(; it != connect_.end(); ++it)
  {
    it->second.con_->disConnect();
    it->second.state_ = 0;
    SAFE_DELETE(it->second.con_);
    it->second.con_ = nullptr;
  }  
  connect_.clear();
}

// Looks up the connection stored under `id`; nullptr when the id is unknown.
redisConn* redisPool::getClientByID(size_t id)
{
  const ConnectInfoT::iterator pos = connect_.find(id);
  return (pos == connect_.end()) ? nullptr : pos->second.con_;
}

// Looks up the pool entry stored under `id`; nullptr when the id is unknown.
ConnectInfo* redisPool::getConnectInfo(size_t id)
{
  const ConnectInfoT::iterator pos = connect_.find(id);
  if(pos == connect_.end())
    return nullptr;
  return &pos->second;
}

bool redisPool::keepAlive()
{
  ConnectInfoT::iterator it = connect_.begin();
  for(; it != connect_.end(); ++it)
  {
    it->second.con_->keepAlive();
  }
}

bool redisPool::updateCfgConn()
{
  //重置连接状态
  ConnectInfoT::iterator it = connect_.begin();
  for(; it != connect_.end(); ++it)
  {
    it->second.state_ = 0;
  }
  //更新配置中的redis到redisPool中
  const std::vector<RedisConnCfg>& cfg_vec = RedisConnCfgMgr::getMe()->getRedisConnCfg();
  for(int i = 0; i < cfg_vec.size(); ++i)
  {
    //在连接池中进行重连
    const RedisConnCfg& tmp = cfg_vec[i];
    it = connect_.find(tmp.id_);
    if(it != connect_.end())
    {
      it->second.con_->resetReconnectTime();
      it->second.con_->keepAlive();
      it->second.state_ = 1;
    }
    else
    {
      //不在进行添加并连接redis
      addClient(tmp.id_, tmp.ip_, tmp.port_);
    }
  }

  //删除掉未连接的redis
  it = connect_.begin();
  for(; it != connect_.end(); )
  {
    if(!it->second.state_)
    {
      it->second.con_->disConnect();
      SAFE_DELETE(it->second.con_);
      connect_.erase(it++);
    }
    else
    {
      it++;
    }
  }

  return true;
}

// Nothing to set up: the entry list starts out empty.
RedisConnCfgMgr::RedisConnCfgMgr() {}

// Nothing to release explicitly; the entry list cleans up after itself.
RedisConnCfgMgr::~RedisConnCfgMgr() {}

bool RedisConnCfgMgr::loadCfg()
{
  std::ifstream infile("redisCfg.txt");
  if(!infile.is_open())
  {
    std::cout << "file open fail" << std::endl;
    return false;
  }

  std::vector<std::string> cfg_vec;
  char buf[1024];
  while(infile.getline(buf, sizeof(buf)))
  {
    //std::cout << buf << std::endl;
    char* p = strtok(buf, "-");
    while(p)
    {
      cfg_vec.push_back(p);
      p = strtok(NULL, "-");
    }
    if(cfg_vec.size() >= 3)
    {
      RedisConnCfg tmp;
      tmp.id_ = atoi(cfg_vec[0].c_str());
      tmp.ip_ = cfg_vec[1];
      tmp.port_ = atoi(cfg_vec[2].c_str());
      tmp.print();
      cfg_.push_back(tmp);
    }
    cfg_vec.clear();
  }
  infile.close();
  return true;
}

        测试:

#include "redisPool.h"


void* setHandleFunc(void* arg)
{
  //std::cout << pthread_self() << std::endl;
  RedisConnCfg* tmp = (RedisConnCfg*)arg;
  redisConn* conn =  redisPool::getMe()->getClientByID(tmp->id_);
  if(conn)
  {
    char kbuf[64];
    char vbuf[64];
    snprintf(kbuf, sizeof(kbuf), "k%d", tmp->id_);
    snprintf(vbuf, sizeof(vbuf), "v%ld", pthread_self());
    //std::cout << kbuf << "---" << vbuf << std::endl;
    conn->set(kbuf, vbuf);
  }
  return nullptr;
}

void* getHandleFunc(void* arg)
{
  //std::cout << pthread_self() << std::endl;
  RedisConnCfg* tmp = (RedisConnCfg*)arg;
  redisConn* conn =  redisPool::getMe()->getClientByID(tmp->id_);
  if(conn)
  {
    char kbuf[64];
    snprintf(kbuf, sizeof(kbuf), "k%d", tmp->id_);
    std::string str = conn->get(kbuf);
    std::cout << kbuf << ":" << str << std::endl;
  }
  return nullptr;
}

int main()
{
  //加载配置
  RedisConnCfgMgr::getMe()->loadCfg();
  redisPool::getMe()->updateCfgConn();

  std::vector<pthread_t> pt_vec;
  const std::vector<RedisConnCfg>& cfg_vec = RedisConnCfgMgr::getMe()->getRedisConnCfg();
  for(int i = 0; i < cfg_vec.size(); ++i)
  {
    //创建线程
    pthread_t pid = 0;
    pthread_create(&pid, nullptr, setHandleFunc, (void*)&cfg_vec[i]);
    pt_vec.push_back(pid);
  }

  for(int i = 0; i < pt_vec.size(); ++i)
  {
    pthread_detach(pt_vec[i]);
  }

  pt_vec.clear();
  for(int i = 0; i < cfg_vec.size(); ++i)
  {
    //创建线程
    pthread_t pid = 0;
    pthread_create(&pid, nullptr, getHandleFunc, (void*)&cfg_vec[i]);
    pt_vec.push_back(pid);
  }

  for(int i = 0; i < pt_vec.size(); ++i)
  {
    pthread_detach(pt_vec[i]);
  }

  while(1)
  {}

  return 0;
}

int main()
{
  RedisConnCfgMgr rm;
  rm.loadCfg();

  return 0;
}

         Makefile编译文件:

        主要需要下载hiredis库。

# Builds the demo binary from the three sources; links against hiredis and
# pthread, compiled as C++11 with debug info.
TARGET  := main
SOURCES := main.cpp redisPool.cpp redisConn.cpp

$(TARGET): $(SOURCES)
	g++ -g $^ -o $@ -std=c++11 -lhiredis  -lpthread


# Removes the built binary.
.PHONY: clean
clean:
	rm -rf $(TARGET)