【RabbitMQ 项目】客户端:连接模块

时间:2024-10-01 07:29:55
#pragma once #include "muduo/protobuf/codec.h" #include "muduo/protobuf/dispatcher.h" #include "muduo/base/Logging.h" #include "muduo/base/Mutex.h" #include "muduo/net/EventLoop.h" #include "muduo/net/TcpClient.h" #include "muduo/net/EventLoopThread.h" #include "muduo/base/CountDownLatch.h" #include "../common/ThreadPool.hpp" #include "Channel.hpp" #include <functional> #include <iostream> namespace ns_connection { using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>; using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>; using ProtobufDispatcherPtr = std::shared_ptr<ProtobufDispatcher>; using ChannelPtr = std::shared_ptr<ns_channel::Channel>; using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>; using CommonResponsePtr = std::shared_ptr<ns_protocol::CommomResponse>; using PushMessageResonsePtr = std::shared_ptr<ns_protocol::PushMessageResponse>; /*********** * Connection是对底层用于通信的TCP套接字封装(muduo库中的TcpConnectionPtr) * 一个Connection中包含多个信道,当Connection关闭,信道也会销毁 * ******************/ class Connection { private: muduo::net::EventLoopThread _loopThread; muduo::CountDownLatch _latch; muduo::net::TcpClient _client; muduo::net::TcpConnectionPtr _connPtr; ProtobufDispatcherPtr _distpatcherPtr; ProtobufCodecPtr _codecPtr; ns_channel::ChannelManager _channelManager; ThreadPoolPtr _threadPoolPtr; public: Connection(const std::string &serverIp, int serverPort, const ThreadPoolPtr &threadPoolPtr) : _loopThread(), _latch(1), _client(_loopThread.startLoop(), muduo::net::InetAddress(serverIp, serverPort), "client"), _connPtr(), _channelManager(), _threadPoolPtr(threadPoolPtr) { // 构造成员 _distpatcherPtr = std::make_shared<ProtobufDispatcher>((std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); _codecPtr = std::make_shared<ProtobufCodec>((std::bind(&ProtobufDispatcher::onProtobufMessage, _distpatcherPtr.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); // 给Client注册两个回调函数 _client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1)); _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codecPtr.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _distpatcherPtr->registerMessageCallback<ns_protocol::CommomResponse>(std::bind(&Connection::onCommonResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _distpatcherPtr->registerMessageCallback<ns_protocol::PushMessageResponse>(std::bind(&Connection::onRecvMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); connect(); } void connect() { _client.connect(); // 非阻塞 _latch.wait(); } ChannelPtr openChannel() { // 只是在本地建立了信道 auto channelPtr = _channelManager.openChannel(_connPtr, _codecPtr); // 通过该信道发送建立信道的请求,要服务端也建立对应的信道 if (!channelPtr->openChannel()) { LOG(WARNING) << "打开信道失败" << endl; // 关闭本地的信道,防止内存泄漏 _channelManager.closeChannel(channelPtr->_id); } return channelPtr; } void closeChannel(const ChannelPtr& channelPtr) { // 发送关闭信道的请求,让服务端关闭信道 channelPtr->closeChannel(); // 把本地信道关掉 _channelManager.closeChannel(channelPtr->_id); } private: // 给_client设置的回调 void onConnection(muduo::net::TcpConnectionPtr connPtr) { if (connPtr->connected()) { _connPtr = connPtr; _latch.countDown(); } else { _connPtr.reset(); } } void onUnknownMessage(const muduo::net::TcpConnectionPtr &connPtr, const MessagePtr &resp, muduo::Timestamp time) { LOG(WARNING) << "未知响应" << endl; } // 业务处理函数 void onCommonResponse(const muduo::net::TcpConnectionPtr &connPtr, const CommonResponsePtr &respPtr, muduo::Timestamp time) { //LOG(DEBUG) << "收到CommonResponse, respId: " << respPtr->response_id() << endl; std::string channeId = respPtr->channel_id(); auto channelPtr = _channelManager.getChannel(channeId); channelPtr->putCommonResponse(respPtr); } void onRecvMessage(const muduo::net::TcpConnectionPtr &connPtr, const PushMessageResonsePtr &respPtr, muduo::Timestamp time) { //LOG(DEBUG) << "收到消息, body: " << respPtr->msg().saved_info().body() << endl; std::string channeId = respPtr->channel_id(); auto channelPtr = _channelManager.getChannel(channeId); // 把处理消息的任务交给线程池来做 _threadPoolPtr->push(std::bind(&ns_channel::Channel::consumeMessage, channelPtr.get(), respPtr->qname(), respPtr->msg())); } }; }

相关文章