【RabbitMQ 项目】客户端:连接模块
#pragma once
#include "muduo/protobuf/codec.h"
#include "muduo/protobuf/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"
#include "../common/ThreadPool.hpp"
#include "Channel.hpp"
#include <functional>
#include <iostream>
namespace ns_connection
{
using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
using ProtobufDispatcherPtr = std::shared_ptr<ProtobufDispatcher>;
using ChannelPtr = std::shared_ptr<ns_channel::Channel>;
using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;
using CommonResponsePtr = std::shared_ptr<ns_protocol::CommomResponse>;
using PushMessageResonsePtr = std::shared_ptr<ns_protocol::PushMessageResponse>;
/***********
* Connection是对底层用于通信的TCP套接字封装(muduo库中的TcpConnectionPtr)
* 一个Connection中包含多个信道,当Connection关闭,信道也会销毁
* ******************/
class Connection
{
private:
muduo::net::EventLoopThread _loopThread;
muduo::CountDownLatch _latch;
muduo::net::TcpClient _client;
muduo::net::TcpConnectionPtr _connPtr;
ProtobufDispatcherPtr _distpatcherPtr;
ProtobufCodecPtr _codecPtr;
ns_channel::ChannelManager _channelManager;
ThreadPoolPtr _threadPoolPtr;
public:
Connection(const std::string &serverIp, int serverPort, const ThreadPoolPtr &threadPoolPtr)
: _loopThread(),
_latch(1),
_client(_loopThread.startLoop(), muduo::net::InetAddress(serverIp, serverPort), "client"),
_connPtr(),
_channelManager(),
_threadPoolPtr(threadPoolPtr)
{
// 构造成员
_distpatcherPtr = std::make_shared<ProtobufDispatcher>((std::bind(&Connection::onUnknownMessage,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3)));
_codecPtr = std::make_shared<ProtobufCodec>((std::bind(&ProtobufDispatcher::onProtobufMessage,
_distpatcherPtr.get(),
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3)));
// 给Client注册两个回调函数
_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));
_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codecPtr.get(), std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
_distpatcherPtr->registerMessageCallback<ns_protocol::CommomResponse>(std::bind(&Connection::onCommonResponse,
this, std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
_distpatcherPtr->registerMessageCallback<ns_protocol::PushMessageResponse>(std::bind(&Connection::onRecvMessage,
this, std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
connect();
}
void connect()
{
_client.connect(); // 非阻塞
_latch.wait();
}
ChannelPtr openChannel()
{
// 只是在本地建立了信道
auto channelPtr = _channelManager.openChannel(_connPtr, _codecPtr);
// 通过该信道发送建立信道的请求,要服务端也建立对应的信道
if (!channelPtr->openChannel())
{
LOG(WARNING) << "打开信道失败" << endl;
// 关闭本地的信道,防止内存泄漏
_channelManager.closeChannel(channelPtr->_id);
}
return channelPtr;
}
void closeChannel(const ChannelPtr& channelPtr)
{
// 发送关闭信道的请求,让服务端关闭信道
channelPtr->closeChannel();
// 把本地信道关掉
_channelManager.closeChannel(channelPtr->_id);
}
private:
// 给_client设置的回调
void onConnection(muduo::net::TcpConnectionPtr connPtr)
{
if (connPtr->connected())
{
_connPtr = connPtr;
_latch.countDown();
}
else
{
_connPtr.reset();
}
}
void onUnknownMessage(const muduo::net::TcpConnectionPtr &connPtr,
const MessagePtr &resp, muduo::Timestamp time)
{
LOG(WARNING) << "未知响应" << endl;
}
// 业务处理函数
void onCommonResponse(const muduo::net::TcpConnectionPtr &connPtr,
const CommonResponsePtr &respPtr, muduo::Timestamp time)
{
//LOG(DEBUG) << "收到CommonResponse, respId: " << respPtr->response_id() << endl;
std::string channeId = respPtr->channel_id();
auto channelPtr = _channelManager.getChannel(channeId);
channelPtr->putCommonResponse(respPtr);
}
void onRecvMessage(const muduo::net::TcpConnectionPtr &connPtr,
const PushMessageResonsePtr &respPtr, muduo::Timestamp time)
{
//LOG(DEBUG) << "收到消息, body: " << respPtr->msg().saved_info().body() << endl;
std::string channeId = respPtr->channel_id();
auto channelPtr = _channelManager.getChannel(channeId);
// 把处理消息的任务交给线程池来做
_threadPoolPtr->push(std::bind(&ns_channel::Channel::consumeMessage, channelPtr.get(),
respPtr->qname(), respPtr->msg()));
}
};
}