PostgreSQL异步客户端(并模拟redis 数据结构)

时间:2022-09-07 10:41:56

以前为了不在游戏逻辑(对象属性)变更时修改数据库,就弄了个varchar字段来表示json,由服务器逻辑(读取到内存)去操作它。

但这对运维相当不友好,也不能做一些此Json数据里查询。

所以后面就用了下ssdb,然而就在前几天才了解到postgresql支持json了(其实早在两年前就行了吧···)

就这点差不多就可以算当作mongodb用了,不过还是不支持redis的高级数据结构。

于是我就想模拟(实现)下redis(的数据结构)。

就抽空看了下它的c api库:libpq,发现其请求-等待模型,在网络延迟高的时候,特别影响qps。所以我就写了一个异步客户端,并简易模拟了redis的kv,hash。

开8个链接到pg server,其速度比1个链接快5倍。 在我的测试中,每秒打到30k QPS

(目前不支持list,以及后期还要通过储存过程对现在的hash实现进行改造优化)

#include <string>
#include <list>
#include <iostream>
#include <unordered_map>
#include <memory>
#include <queue>
#include <assert.h>
#include <functional>
#include <sstream>
#include <chrono> #include "fdset.h" #include "libpq-events.h"
#include "libpq-fe.h"
#include "libpq/libpq-fs.h" using namespace std; class AsyncPGClient
{
public:
/*TODO::传递错误信息*/
typedef std::function<void(const PGresult*)> RESULT_CALLBACK;
typedef std::function<void(bool value)> BOOL_RESULT_CALLBACK;
typedef std::function<void(const string& value)> STRING_RESULT_CALLBACK;
typedef std::function<void(const std::unordered_map<string, string>& value)> STRINGMAP_RESULT_CALLBACK; AsyncPGClient() : mKVTableName("kv_data"), mHashTableName("hashmap_data")
{
mfdset = ox_fdset_new();
} ~AsyncPGClient()
{
for (auto& kv : mConnections)
{
PQfinish((*kv.second).pgconn);
} ox_fdset_delete(mfdset);
mfdset = nullptr;
} void get(const string& key, const STRING_RESULT_CALLBACK& callback = nullptr)
{
mStringStream << "SELECT key, value FROM public." << mKVTableName << " where key = '" << key << "';"; postQuery(mStringStream.str(), [callback](const PGresult* result){
if (callback != nullptr && result != nullptr)
{
if (PQntuples(result) == && PQnfields(result) == )
{
callback(PQgetvalue(result, , ));
}
}
});
} void set(const string& key, const string& v, const BOOL_RESULT_CALLBACK& callback = nullptr)
{
mStringStream << "INSERT INTO public." << mKVTableName << "(key, value) VALUES('" << key << "', '" << v << "') ON CONFLICT(key) DO UPDATE SET value = EXCLUDED.value;"; postQuery(mStringStream.str(), [callback](const PGresult* result){
if (callback != nullptr)
{
if (PQresultStatus(result) == PGRES_COMMAND_OK)
{
callback(true);
}
else
{
cout << PQresultErrorMessage(result);
callback(false);
}
}
});
} void hget(const string& hashname, const string& key, const STRING_RESULT_CALLBACK& callback = nullptr)
{
hmget(hashname, { key }, [callback](const std::unordered_map<string, string>& value){
if (callback != nullptr && !value.empty())
{
callback((*value.begin()).second);
}
});
} void hmget(const string& hashname, const std::vector<string>& keys, const STRINGMAP_RESULT_CALLBACK& callback = nullptr)
{
mStringStream << "SELECT key, value FROM public." << mHashTableName << " where ";
auto it = keys.begin();
do
{
mStringStream << "key='" << (*it) << "'"; ++it;
} while (it != keys.end() && &(mStringStream << " or ") != nullptr);
mStringStream << ";"; postQuery(mStringStream.str(), [callback](const PGresult* result){
if (callback != nullptr)
{
std::unordered_map<string, string> ret;
if (PQresultStatus(result) == PGRES_TUPLES_OK)
{
int num = PQntuples(result);
int fileds = PQnfields(result);
if (fileds == )
{
for (int i = ; i < num; i++)
{
ret[PQgetvalue(result, i, )] = PQgetvalue(result, i, );
}
}
} callback(ret);
}
});
} void hset(const string& hashname, const string& key, const string& value, const BOOL_RESULT_CALLBACK& callback = nullptr)
{
mStringStream << "INSERT INTO public." << mHashTableName << "(hashname, key, value) VALUES('" << hashname << "', '" << key << "', '" << value
<< "') ON CONFLICT (hashname, key) DO UPDATE SET value = EXCLUDED.value;"; postQuery(mStringStream.str(), [callback](const PGresult* result){
if (callback != nullptr)
{
callback(PQresultStatus(result) == PGRES_COMMAND_OK);
}
});
} void hgetall(const string& hashname, const STRINGMAP_RESULT_CALLBACK& callback = nullptr)
{
mStringStream << "SELECT key, value FROM public." << mHashTableName << " where hashname = '" << hashname << "';";
postQuery(mStringStream.str(), [callback](const PGresult* result){
if (callback != nullptr)
{
std::unordered_map<string, string> ret;
if (PQresultStatus(result) == PGRES_TUPLES_OK)
{
int num = PQntuples(result);
int fileds = PQnfields(result);
if (fileds == )
{
for (int i = ; i < num; i++)
{
ret[PQgetvalue(result, i, )] = PQgetvalue(result, i, );
}
}
} callback(ret);
}
});
} void postQuery(const string&& query, const RESULT_CALLBACK& callback = nullptr)
{
mPendingQuery.push({ std::move(query), callback});
mStringStream.str(std::string());
mStringStream.clear();
} void postQuery(const string& query, const RESULT_CALLBACK& callback = nullptr)
{
mPendingQuery.push({ query, callback });
mStringStream.str(std::string());
mStringStream.clear();
} public:
void poll(int millSecond)
{
ox_fdset_poll(mfdset, millSecond); std::vector<int> closeFds; for (auto& it : mConnections)
{
auto fd = it.first;
auto connection = it.second;
auto pgconn = connection->pgconn; if (ox_fdset_check(mfdset, fd, ReadCheck))
{
if (PQconsumeInput(pgconn) > && PQisBusy(pgconn) == )
{
bool successGetResult = false; while (true)
{
auto result = PQgetResult(pgconn);
if (result != nullptr)
{
successGetResult = true;
if (connection->callback != nullptr)
{
connection->callback(result);
connection->callback = nullptr;
}
PQclear(result);
}
else
{
break;
}
} if (successGetResult)
{
mIdleConnections.push_back(connection);
}
} if (PQstatus(pgconn) == CONNECTION_BAD)
{
closeFds.push_back(fd);
}
} if (ox_fdset_check(mfdset, fd, WriteCheck))
{
if (PQflush(pgconn) == )
{
//移除可写检测
ox_fdset_del(mfdset, fd, WriteCheck);
}
}
} for (auto& v : closeFds)
{
removeConnection(v);
}
} void trySendPendingQuery()
{
while (!mPendingQuery.empty() && !mIdleConnections.empty())
{
auto& query = mPendingQuery.front();
auto& connection = mIdleConnections.front(); if (PQsendQuery(connection->pgconn, query.request.c_str()) == )
{
cout << PQerrorMessage(connection->pgconn) << endl;
if (query.callback != nullptr)
{
query.callback(nullptr);
}
}
else
{
ox_fdset_add(mfdset, PQsocket(connection->pgconn), WriteCheck);
connection->callback = query.callback;
} mPendingQuery.pop();
mIdleConnections.pop_front();
}
} size_t pendingQueryNum() const
{
return mPendingQuery.size();
} size_t getWorkingQuery() const
{
return mConnections.size() - mIdleConnections.size();
} void createConnection( const char *pghost, const char *pgport,
const char *pgoptions, const char *pgtty,
const char *dbName, const char *login, const char *pwd,
int num)
{
for (int i = ; i < num; i++)
{
auto pgconn = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName, login, pwd);
if (PQstatus(pgconn) == CONNECTION_OK)
{
auto connection = std::make_shared<Connection>(pgconn, nullptr);
mConnections[PQsocket(pgconn)] = connection;
PQsetnonblocking(pgconn, );
ox_fdset_add(mfdset, PQsocket(pgconn), ReadCheck);
mIdleConnections.push_back(connection);
}
else
{
cout << PQerrorMessage(pgconn);
PQfinish(pgconn);
pgconn = nullptr;
}
} if (!mConnections.empty())
{
sCreateTable((*mConnections.begin()).second->pgconn, mKVTableName, mHashTableName);
}
} private:
void removeConnection(int fd)
{
auto it = mConnections.find(fd);
if (it != mConnections.end())
{
auto connection = (*it).second;
for (auto it = mIdleConnections.begin(); it != mIdleConnections.end(); ++it)
{
if ((*it)->pgconn == connection->pgconn)
{
mIdleConnections.erase(it);
break;
}
} ox_fdset_del(mfdset, fd, ReadCheck | WriteCheck);
PQfinish(connection->pgconn);
mConnections.erase(fd);
}
} private:
static void sCreateTable(PGconn* conn, const string& kvTableName, const string& hashTableName)
{
{
string query = "CREATE TABLE public.";
query += kvTableName;
query += "(key character varying NOT NULL, value json, CONSTRAINT key PRIMARY KEY(key))";
PGresult* exeResult = PQexec(conn, query.c_str());
auto status = PQresultStatus(exeResult);
auto errorStr = PQresultErrorMessage(exeResult);
PQclear(exeResult);
} {
string query = "CREATE TABLE public.";
query += hashTableName;
query += "(hashname character varying, key character varying, value json, "
"CONSTRAINT hk PRIMARY KEY (hashname, key))";
PGresult* exeResult = PQexec(conn, query.c_str());
auto status = PQresultStatus(exeResult);
auto errorStr = PQresultErrorMessage(exeResult);
PQclear(exeResult);
}
} private:
struct QueryAndCallback
{
std::string request;
RESULT_CALLBACK callback;
}; struct Connection
{
PGconn* pgconn;
RESULT_CALLBACK callback; Connection(PGconn* p, RESULT_CALLBACK c)
{
pgconn = p;
callback = c;
}
}; const string mKVTableName;
const string mHashTableName; stringstream mStringStream;
fdset_s* mfdset; std::unordered_map<int, shared_ptr<Connection>> mConnections;
std::list<shared_ptr<Connection>> mIdleConnections; std::queue<QueryAndCallback> mPendingQuery; /*TODO::监听wakeup支持*/
/*TODO::考虑固定分配connection给某业务*/ /*TODO::编写储存过程,替换现有的hashtable模拟方式,如循环使用jsonb_set以及 select value->k1, value->k2 from ...*/
/*TODO::编写储存过程,实现list*/
}; int main()
{
using std::chrono::system_clock; AsyncPGClient asyncClient;
asyncClient.createConnection("192.168.12.1", "", nullptr, nullptr, "postgres", "postgres", "", );
system_clock::time_point startTime = system_clock::now(); auto nowTime = time(NULL); for (int i = ; i < ; i++)
{
if(false)
{
string test = "INSERT INTO public.kv_data(key, value) VALUES ('";
test += std::to_string(nowTime*+i);
test += "', '{\"hp\":100000}') ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value;"; asyncClient.postQuery(test);
}
else
{
asyncClient.postQuery("select * from public.kv_data where key='dd';");
}
} asyncClient.postQuery("INSERT INTO public.kv_data(key, value) VALUES ('dodo5', '{\"hp\":100000}') "
" ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value", [](const PGresult* result){
cout << "fuck" << endl;
}); asyncClient.get("dd", [](const string& value){
cout << "get dd : " << value << endl;
}); asyncClient.set("dd", "{\"hp\":456}", [](bool isOK){
cout << "set dd : " << isOK << endl;
}); asyncClient.hget("heros:dodo", "hp", [](const string& value){
cout << "hget heros:dodo:" << value << endl;
}); asyncClient.hset("heros:dodo", "hp", "{\"hp\":1}", [](bool isOK){
cout << "hset heros:dodo:" << isOK << endl;
}); asyncClient.hmget("heros:dodo", { "hp", "money" }, [](const unordered_map<string, string>& kvs){
cout << "hmget:" << endl;
for (auto& kv : kvs)
{
cout << kv.first << " : " << kv.second << endl;
}
}); asyncClient.hgetall("heros:dodo", [](const unordered_map<string, string>& kvs){
cout << "hgetall:" << endl;
for (auto& kv : kvs)
{
cout << kv.first << " : " << kv.second << endl;
}
}); while (true)
{
asyncClient.poll();
asyncClient.trySendPendingQuery();
if (asyncClient.pendingQueryNum() == && asyncClient.getWorkingQuery() == )
{
break;
}
} auto elapsed = system_clock::now() - startTime;
cout << "cost :" << chrono::duration<double>(elapsed).count() << "s" << endl;
cout << "enter any key exit" << endl;
cin.get();
return ;
}

代码地址:https://github.com/IronsDu/accumulation-dev/blob/master/examples/Pgedis.cpp

PostgreSQL异步客户端(并模拟redis 数据结构)的更多相关文章

  1. redis数据结构、持久化、缓存淘汰策略

    Redis 单线程高性能,它所有的数据都在内存中,所有的运算都是内存级别的运算,而且单线程避免了多线程的切换性能损耗问题.redis利用epoll来实现IO多路复用,将连接信息和事件放到队列中,依次放 ...

  2. Redis 数据结构与内存管理策略(上)

    Redis 数据结构与内存管理策略(上) 标签: Redis Redis数据结构 Redis内存管理策略 Redis数据类型 Redis类型映射 Redis 数据类型特点与使用场景 String.Li ...

  3. Redis学习系列六ZSet&lpar;有序列表&rpar;及Redis数据结构的过期

    一.简介 ZSet可以说是Redis中最有趣的数据结构了,因为他兼具了Hash集合和Set的双重特性,也是用的最多的,保证了value值的唯一性的同时,,同时又保证了高性能,最主要的是还可以给每个Va ...

  4. 5种Redis数据结构详解

    本文主要和大家分享 5种Redis数据结构详解,希望文中的案例和代码,能帮助到大家. 转载链接:https://www.php.cn/php-weizijiaocheng-388126.html 2. ...

  5. 深入Redis客户端&lpar;redis客户端属性、redis缓冲区、关闭redis客户端&rpar;

    深入Redis客户端(redis客户端属性.redis缓冲区.关闭redis客户端) Redis 数据库采用 I/O 多路复用技术实现文件事件处理器,服务器采用单线程单进程的方式来处理多个客户端发送过 ...

  6. Redis 数据结构使用场景

    转自http://get.ftqq.com/523.get 一.redis 数据结构使用场景 原来看过 redisbook 这本书,对 redis 的基本功能都已经熟悉了,从上周开始看 redis 的 ...

  7. Redis 数据结构与内存管理策略(下)

    Redis 数据结构与内存管理策略(下) 标签: Redis Redis数据结构 Redis内存管理策略 Redis数据类型 Redis类型映射 Redis 数据类型特点与使用场景 String.Li ...

  8. 第18章 Redis数据结构常用命令

    18-1 字符串的一些基本命令 18-1 :配置Spring关于Redis字符串的运行环境 <bean id="poolConfig" class="redis.c ...

  9. redis 模拟redis server接收信息

    一.实现说明 客户端使用jedis正常set值到redis服务器   2. 模拟服务器接收jedis发送的信息 二.jedis客户端代码 package com.ahd.redis; import r ...

随机推荐

  1. Gradle版本变更的问题

    了解相关三个概念 gradle .gradle wrapper . gradle plugin (1)Gradle  :  项目的构建工具,管理一个项目的依赖架包.性质和maven相似. (2)Gra ...

  2. 学习K&amp&semi;R时初学者经常遇到的一个问题——EOF

    学习K&R时初学者经常遇到的一个问题——EOF

  3. Linux下使用mke2fsk格式化虚拟磁盘分区的方法

    原文地址:http://www.2cto.com/os/201202/119963.html 我们用dd命令就可以创建一个raw格式的虚拟磁盘,通常Xen就是使用这种格式的虚拟磁盘,今天就来讨论下怎样 ...

  4. Android 从硬件到应用程序:一步一步爬上去 6 -- 我写的APP测试框架层硬件服务&lpar;终点&rpar;

    创Android Applicationproject:采用Eclipse的Android插入ADT创Androidproject,project名字Gpio,创建完成后,project文件夹pack ...

  5. loj&period;ac&colon;&num;10024&period; 「一本通 1&period;3 练习 3」质数方阵

    CSDN的博客 友键 题目描述 质数方阵是一个\(5×5\)的方阵,每行.每列.两条对角线上的数字可以看作是五位的素数.方格中的行按照从左到右的顺序组成一个素数,而列按照从上到下的顺序.两条对角线也是 ...

  6. Scala微服务架构 三

    四 Controller层 之前我们已经把基层架构搭建好了,那么要如何使用呢? 首先看看我的Controller层代码 @Singleton class BMAuthController @Injec ...

  7. css 填坑常用代码分享&lbrack;居家实用型&rsqb;

    原文地址 http://www.cnblogs.com/jikey/p/4233003.html 以下是常用的代码收集,没有任何技术含量,只是填坑的积累.转载请注明出处,谢谢. 一. css 2.x ...

  8. CentOS7 修改分辨率

    1. 修改文件: vi /boot/grub2/grub.cfg 2. 在linux16 开头的哪一行 增加 vga=0x341 修改为1024x768 3. 重启..

  9. MoreEffectiveC&plus;&plus;Item35&lpar;异常&rpar;&lpar;条款9-15&rpar;

    条款9 使用析构函数防止内存泄漏 条款10 在构造函数中防止内存泄漏 条款11 禁止异常信息传递到析构函数外 条款12 理解"抛出一个异常''与"传递一个参数"或调用一个 ...

  10. poj2002 数正方形 (哈希&plus;几何)

    题目传送门 题目大意:给你一堆点,问你能组成几个正方形. 思路:一开始想的是用对角线的长度来当哈希的key,但判断正方形会太复杂,然后就去找了一下正方形的判断方法,发现 已知: (x1,y1) (x2 ...