很久没有写博客了,这半年多一直很忙,没有更新博客。今天心血来潮准备写一篇,刚好把最近研究的新东西拿出来给大家分享一下!我以前写过一个后台程序框架(应用于很多项目,如运营商***拦截系统、国内某视频聊天应用的后台系统等),其中的网络部分是基于ACE实现的。最近准备淘汰ACE,大部分组件功能打算重写,网络这块打算用libevent来实现。在做的过程中发现了一些问题:能找到的例子都是单线程实现的,即使有多线程的例子也写得不够详细,都是很简单的实现。经过一周时间对源码和API的分析,我自己做了实现,经过测试还没有发现问题,效率上比之前的框架有了很大的提升。今天给大家贴出来,做分享交流。
NetFrame.h
//
// NetFrame.h
// Frame
//
// Created by chenjianjun on 15/9/7.
// Copyright (c) 2015年 jsbn. All rights reserved.
//
#ifndef __Frame__NetFrame__
#define __Frame__NetFrame__
#include <event.h>
#include <glog/logging.h>
#include "Common.h"
#include "Thread.h"
namespace NAME_SPACE {
// Owner of the shared libevent event_base and of the thread that runs its
// dispatch loop. Other classes in this code reach the base through the public
// static member _base.
class NetFrame {
public:
// Returns the NetFrame instance (see the definition in NetFrame.cpp).
static NetFrame* Instance();
// Creates the event_base and starts the loop thread.
// Returns FUNC_SUCCESS or FUNC_FAILED.
int NetWorkInit();
// Stops the loop thread and frees the event_base. Returns FUNC_SUCCESS.
int NetWorkExit();
protected:
// Not publicly constructible/destructible; obtain the object via Instance().
NetFrame();
~NetFrame();
private:
// Runnable handed to _main_loop_thread; its Run() calls event_base_dispatch().
class NetRunnable:public Runnable {
public:
NetRunnable();
~NetRunnable();
protected:
// Thread body; the argument is the NetFrame passed to Thread::Start().
virtual void Run(void*);
};
// NetRunnable::Run() reads the private _run_flg.
friend class NetRunnable;
// Project macro; presumably disables copy construction and assignment —
// TODO confirm against its definition.
DISALLOW_EVIL_CONSTRUCTORS(NetFrame);
public:
// The event_base shared by listeners, bufferevents and timers.
// nullptr until NetWorkInit() and again after NetWorkExit().
static struct event_base* _base;
private:
// Thread that runs the dispatch loop.
Thread _main_loop_thread;
// True while the loop thread should keep running.
volatile bool _run_flg;
// Runnable executed by _main_loop_thread.
NetRunnable _runnable;
};
}
#endif /* defined(__Frame__NetFrame__) */
NetFrame.cpp
//
// NetFrame.cpp
// Frame
//
// Created by chenjianjun on 15/9/7.
// Copyright (c) 2015年 jsbn. All rights reserved.
//
#include "NetFrame.h"
#include <event2/event.h>
namespace NAME_SPACE {
// Storage for the shared event_base pointer; set by NetWorkInit() and reset to
// nullptr by NetWorkExit().
struct event_base* NetFrame::_base = nullptr;
// Returns the address of the NetFrame object declared by the
// LIBJINGLE_DEFINE_STATIC_LOCAL macro.
// NOTE(review): the macro is defined elsewhere; presumably it creates a
// function-local static that is constructed once — confirm.
NetFrame* NetFrame::Instance() {
LIBJINGLE_DEFINE_STATIC_LOCAL(NetFrame, manager, ());
return &manager;
}
// Starts in the "not running" state; the event base is created later by
// NetWorkInit().
NetFrame::NetFrame()
    : _run_flg(false) {
}

// Shuts the network layer down if it is still running.
NetFrame::~NetFrame() {
    NetWorkExit();
}

// The runnable carries no state of its own.
NetFrame::NetRunnable::NetRunnable() {
}

NetFrame::NetRunnable::~NetRunnable() {
}
void NetFrame::NetRunnable::Run(void* arg) {
NetFrame* pNetFrame = (NetFrame*)arg;
while (pNetFrame->_run_flg) {
Thread::SleepMs(2000);
event_base_dispatch(NetFrame::_base);
}
}
int NetFrame::NetWorkInit() {
if (_run_flg) {
return FUNC_SUCCESS;
}
evthread_use_pthreads();
_base = event_base_new();
evthread_make_base_notifiable(_base);
// 开启事件监听主线程
_run_flg = true;
if (_main_loop_thread.Start(&_runnable, this)) {
return FUNC_SUCCESS;
}
// 开始线程失败置运行标志
_run_flg = false;
return FUNC_FAILED;
}
// Shuts the network layer down. When it is running: clears the run flag, asks
// the dispatch loop to exit, stops the loop thread, then frees the event base
// and resets _base. Always returns FUNC_SUCCESS.
int NetFrame::NetWorkExit() {
    if (_run_flg) {
        _run_flg = false;
        event_base_loopexit(NetFrame::_base, nullptr);
        _main_loop_thread.Stop();
        event_base_free(_base);
        _base = nullptr;
    }
    return FUNC_SUCCESS;
}
}
服务器对象类
ServerWorker.h
//
// ServerWorker.h
// 服务器对象类
//
// Created by chenjianjun on 15/9/7.
// Copyright (c) 2015年 jsbn. All rights reserved.
//
#ifndef __ServerWorker_H_
#define __ServerWorker_H_
#include <string>
#include <event2/listener.h>
#include "NetSignal.h"
namespace NAME_SPACE {
// TCP listening endpoint built on libevent's evconnlistener. Accepted
// connections and listener errors are forwarded to a TCPServerSignal.
// NOTE(review): ~ServerWorker() is declared below, but no definition appears
// next to the other member definitions in ServerWorker.cpp — confirm it exists.
class ServerWorker {
public:
/**
* @brief Server constructor
*
* @param listen_ip local IP to listen on
* @param listen_port local port to listen on (stored as unsigned short)
*
* @return
*/
ServerWorker(std::string listen_ip, int listen_port);
// Overload without an IP: the listen IP is left empty, and StartWork() then
// binds with the zero-initialized address.
ServerWorker(int listen_port);
~ServerWorker();
/**
* @brief Start listening
*
* @param pTCPServerSignal signal object that receives accept/error notifications
*
* @return true when the listener was created; false when it already exists
*         or could not be created
*/
bool StartWork(TCPServerSignal* pTCPServerSignal);
/**
* @brief Stop listening (frees the listener if one exists)
*
* @param
*
* @return
*/
void StopWork();
// Returns the listening socket handle (assigned in StartWork()).
SOCKET GetFd() { return _listen_fd; }
public:
/**
* @brief New-connection handler. Not to be called from outside; it is invoked
*        from the libevent callback.
*
* @param fd socket handle of the accepted connection
* @param sa address of the client
*/
void Accept(int fd, struct sockaddr_in *sa);
/**
* @brief Listener-error handler. Not to be called from outside; it is invoked
*        from the libevent callback.
*
* @param fd socket handle
* @param msg error code
*/
void AcceptError(int fd, EM_NET_EVENT msg);
private:
// libevent connection listener; nullptr while not listening.
evconnlistener* _listener;
// IP to listen on (may be empty).
std::string _listen_ip;
// Port to listen on.
unsigned short _listen_port;
// Listening socket handle.
SOCKET _listen_fd;
// Signal object notified of accepted connections and errors; not owned.
TCPServerSignal* _pTCPServerSignal;
};
}
#endif /* defined(__ServerWorker_H_) */
ServerWorker.cpp
//
// ServerWorker.cpp
// Frame
//
// Created by chenjianjun on 15/9/7.
// Copyright (c) 2015年 jsbn. All rights reserved.
//
#include "ServerWorker.h"
#include "NetFrame.h"
namespace NAME_SPACE {
// Listener callback for a newly accepted client connection.
// Enables SO_LINGER with a zero linger time on the accepted socket and then
// hands the socket and the peer address to the ServerWorker passed as
// user_data.
static void ListenerEventCb(evconnlistener *listener, evutil_socket_t fd,
                            sockaddr *sa, int socklen, void *user_data) {
    struct linger linger_opt;
    linger_opt.l_onoff = 1;
    linger_opt.l_linger = 0;
    setsockopt(fd, SOL_SOCKET, SO_LINGER,
               static_cast<void*>(&linger_opt), sizeof(linger_opt));

    ServerWorker* const worker = static_cast<ServerWorker*>(user_data);
    worker->Accept(fd, reinterpret_cast<struct sockaddr_in*>(sa));
}
// 监听失败回调处理函数
static void ListenerErrorCb(struct evconnlistener *listener, void *user_data) {
ServerWorker* pServerWorker = (ServerWorker*)user_data;
pServerWorker->AcceptError(pServerWorker->GetFd(),ENE_ACCEPT_ERROR);
//evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
}
// Creates a worker that will listen on listen_ip:listen_port once StartWork()
// is called. Members are initialized in declaration order, and the listening
// socket handle starts at -1 so that GetFd() never returns an indeterminate
// value before StartWork() has run.
ServerWorker::ServerWorker(std::string listen_ip, int listen_port)
    : _listener(nullptr),
      _listen_ip(listen_ip),
      _listen_port(static_cast<unsigned short>(listen_port)),
      _listen_fd(-1),
      _pTCPServerSignal(nullptr) {}
// Creates a worker with no listen IP; StartWork() then binds listen_port with
// the zero-initialized address. Members are initialized in declaration order,
// and the listening socket handle starts at -1 so that GetFd() never returns
// an indeterminate value before StartWork() has run.
ServerWorker::ServerWorker(int listen_port)
    : _listener(nullptr),
      _listen_ip(),
      _listen_port(static_cast<unsigned short>(listen_port)),
      _listen_fd(-1),
      _pTCPServerSignal(nullptr) {}
// Binds the configured address and starts accepting connections on the shared
// event base.
//
// pTCPServerSignal receives SignalAccept / SignalAcceptError notifications; it
// is not owned by this object.
// Returns true when the listener is running; false when a listener already
// exists, the configured IP is not a valid IPv4 address, or the listener could
// not be created.
bool ServerWorker::StartWork(TCPServerSignal* pTCPServerSignal) {
    if (_listener) {
        return false;
    }

    sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    if (!_listen_ip.empty()) {
        sin.sin_addr.s_addr = ::inet_addr(_listen_ip.c_str());
        if (sin.sin_addr.s_addr == INADDR_NONE) {
            LOG(ERROR)<<"监听IP地址无效,IP["<<_listen_ip<<":"<<_listen_port<<"]";
            return false;
        }
    }
    sin.sin_port = htons(_listen_port);

    // Publish the signal sink BEFORE the listener goes live: the event loop
    // runs on another thread, and Accept() drops connections while the sink is
    // still null.
    TCPServerSignal* const previous_signal = _pTCPServerSignal;
    _pTCPServerSignal = pTCPServerSignal;

    _listener = evconnlistener_new_bind(NetFrame::_base,
                                        ListenerEventCb,
                                        (void*)this,
                                        LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE,
                                        -1,
                                        (sockaddr*)&sin,
                                        sizeof(sockaddr_in));
    if (nullptr == _listener) {
        _pTCPServerSignal = previous_signal;
        LOG(ERROR)<<"创建监听器失败,IP["<<_listen_ip<<":"<<_listen_port<<"]";
        return false;
    }

    // Install the accept-error callback.
    evconnlistener_set_error_cb(_listener, ListenerErrorCb);
    // Remember the listening socket handle.
    _listen_fd = evconnlistener_get_fd(_listener);
    return true;
}
// Stops listening: frees the listener, if any, and forgets it.
// Calling this while no listener exists is a no-op.
void ServerWorker::StopWork() {
    if (_listener == nullptr) {
        return;
    }
    evconnlistener_free(_listener);
    _listener = nullptr;
}
// Forwards an accepted connection (socket handle and peer address) through
// SignalAccept. Does nothing when no signal object is attached.
void ServerWorker::Accept(int fd, struct sockaddr_in *sa) {
    if (_pTCPServerSignal == nullptr) {
        return;
    }
    _pTCPServerSignal->SignalAccept(fd, sa);
}
// Forwards a listener error through SignalAcceptError. Does nothing when no
// signal object is attached.
void ServerWorker::AcceptError(int fd, EM_NET_EVENT msg) {
    if (_pTCPServerSignal == nullptr) {
        return;
    }
    _pTCPServerSignal->SignalAcceptError(fd, msg);
}
}
被动客户端连接类
PassiveTCPClient.h
//
// PassiveTCPClient.h
// Frame
//
// Created by chenjianjun on 15/9/7.
// Copyright (c) 2015年 jsbn. All rights reserved.
//
#ifndef __PassiveTCPClient_H_
#define __PassiveTCPClient_H_
#include <string>
#include <event.h>
#include <event2/listener.h>
#include "NetSignal.h"
namespace NAME_SPACE {
// Server-side wrapper around one accepted TCP connection: a libevent
// bufferevent for I/O plus a periodic timer used as a heartbeat check.
class PassiveTCPClient {
public:
/**
* @brief Constructor
*
* @param fd socket handle of the accepted connection
* @param sa address of the client
* @param heart_time heartbeat period (used as the seconds field of the timer timeout)
*
* @return
*/
PassiveTCPClient(SOCKET fd, struct sockaddr_in* sa, short heart_time = 10);
~PassiveTCPClient();
/**
* @brief Start working: creates the bufferevent and the heartbeat timer
*
* @param TCPClientSignal* signal object that receives data/event notifications
*
* @return true on success, false when already started or on failure
*/
bool StartWork(TCPClientSignal*);
/**
* @brief Stop working: frees the bufferevent and the heartbeat timer
*
* @param
*
* @return
*/
void StopWork();
/**
* @brief Send data
*
* @param pdata data to send
* @param len length of the data
*
* @return FUNC_SUCCESS or FUNC_FAILED
*/
int SendData(void* pdata, size_t len);
// Returns the socket handle given to the constructor.
SOCKET GetFd() { return _fd; }
// Heartbeat flag accessors; the read callback sets the flag and the timer
// callback clears it.
void SetHeartFlg(bool flg) { _heart_flg = flg; }
bool GetHeartFlg() { return _heart_flg; }
public:
/**
* @brief Delivers received data. Not to be called from outside; it is invoked
*        from the libevent callback.
*
* @param void* data
* @param size_t length of the data
*/
void PutRecvData(void*, size_t);
/**
* @brief Event handling. Not to be called from outside; it is invoked from
*        the libevent callbacks.
*
* @param events set of event flags
*/
void ProcEvent(short events);
public:
/*
Signal (connector) object. Keeping it separate from this class is important
(author's note): without the separation there is a low-probability crash,
because the signal is emitted from inside a callback, and if this object is
deleted externally at that moment memory is accessed after it was freed. See
the implementation (it uses a scoped lock, and the crash happens when that
lock is released).
*/
TCPClientSignal* _pTCPClientSignal;
private:
// Client IP.
std::string _client_ip;
// Client port.
unsigned short _client_port;
// Socket handle.
SOCKET _fd;
// Heartbeat period.
short _heart_time;
// bufferevent
struct bufferevent *_bev;
// Heartbeat timer event.
struct event *_event;
// Heartbeat flag.
volatile bool _heart_flg;
};
}
#endif /* defined(__PassiveTCPClient_H_) */
PassiveTCPClient.cpp
//
// PassiveTCPClient.cpp
// 被动TCP客户端
//
// Created by chenjianjun on 15/9/7.
// Copyright (c) 2015年 jsbn. All rights reserved.
//
#include "PassiveTCPClient.h"
#include "NetFrame.h"
namespace NAME_SPACE {
// Heartbeat timer callback for a PassiveTCPClient (passed as `data`).
// If the heartbeat flag is set, it is cleared for the next period; otherwise
// a BEV_EVENT_TIMEOUT is reported to the client.
void PassiveTCPTimeOutEventCb(evutil_socket_t fd, short, void *data) {
    PassiveTCPClient* const client = static_cast<PassiveTCPClient*>(data);
    if (!client->GetHeartFlg()) {
        // No traffic was flagged during the last period: heartbeat timeout.
        client->ProcEvent(BEV_EVENT_TIMEOUT);
        return;
    }
    // Traffic was seen; clear the flag.
    client->SetHeartFlg(false);
}
void PassiveTCPReadEventCb(struct bufferevent *bev, void *data) {
PassiveTCPClient* pPassiveTCPClient = (PassiveTCPClient*)data;
static char databuf[40960];
size_t datalen = 0;
size_t nbytes;
while ((nbytes = evbuffer_get_length(bev->input)) > 0) {
evbuffer_remove(bev->input, databuf+datalen, sizeof(databuf)-datalen);
datalen += nbytes;
}
// 有数据往来,设置标志
pPassiveTCPClient->SetHeartFlg(true);
// 数据接收回调
pPassiveTCPClient->PutRecvData(databuf, datalen);
}
void PassiveTCPEventCb(struct bufferevent *bev, short events, void *data) {
PassiveTCPClient* pPassiveTCPClient = (PassiveTCPClient*)data;
// 处理事件
pPassiveTCPClient->ProcEvent(events);
}
// Wraps an accepted socket. `sa` supplies the peer IP and port and
// `heart_time` the heartbeat period; no libevent objects are created until
// StartWork().
//
// Members are initialized in declaration order. _event now starts as nullptr:
// StopWork() (also run by the destructor) tests it, so it must not be
// indeterminate when StartWork() was never called or failed early.
PassiveTCPClient::PassiveTCPClient(SOCKET fd, struct sockaddr_in* sa, short heart_time)
    : _pTCPClientSignal(nullptr),
      _client_ip(inet_ntoa(sa->sin_addr)),
      _client_port(ntohs(sa->sin_port)),
      _fd(fd),
      _heart_time(heart_time),
      _bev(nullptr),
      _event(nullptr),
      _heart_flg(false)
{}
// Releases the libevent objects via StopWork() and then drops the reference to
// the signal object.
PassiveTCPClient::~PassiveTCPClient()
{
    StopWork();
    _pTCPClientSignal = nullptr;
}
// Creates the bufferevent and the heartbeat timer for this connection on the
// shared event base and enables reading.
//
// pTCPClientSignal receives data and event notifications; it is not owned.
// Returns true on success; false when already started or when any libevent
// call fails. Whenever the failure happens after the bufferevent was created,
// the bufferevent is freed again, which also closes the socket
// (BEV_OPT_CLOSE_ON_FREE).
bool PassiveTCPClient::StartWork(TCPClientSignal* pTCPClientSignal) {
    if (_bev) {
        return false;
    }

    _bev = bufferevent_socket_new(NetFrame::_base,
                                  _fd,
                                  BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE);
    if (_bev == nullptr) {
        return false;
    }

    _event = event_new(NetFrame::_base,
                       _fd,
                       EV_TIMEOUT|EV_PERSIST,
                       PassiveTCPTimeOutEventCb, this);
    if (_event == nullptr) {
        bufferevent_free(_bev);
        _bev = nullptr;
        return false;
    }

    _pTCPClientSignal = pTCPClientSignal;

    // Arm the heartbeat timer.
    struct timeval timeout = {_heart_time, 0};
    if (event_add(_event, &timeout) != 0) {
        event_free(_event);
        _event = nullptr;
        bufferevent_free(_bev);
        _bev = nullptr;
        return false;
    }

    bufferevent_setcb(_bev, PassiveTCPReadEventCb, nullptr, PassiveTCPEventCb, this);
    if (bufferevent_enable(_bev, EV_READ) != 0) {
        event_free(_event);
        _event = nullptr;
        bufferevent_free(_bev);
        _bev = nullptr;
        return false;
    }
    return true;
}
// Releases the bufferevent (after disabling reads) and then the heartbeat
// timer, resetting both pointers. Either step is skipped when the
// corresponding object does not exist.
// _pTCPClientSignal is deliberately left untouched: the signal object is
// released by whoever passed it in.
void PassiveTCPClient::StopWork() {
    if (_bev != nullptr) {
        bufferevent_disable(_bev, EV_READ);
        bufferevent_free(_bev);
        _bev = nullptr;
    }

    if (_event != nullptr) {
        event_del(_event);
        event_free(_event);
        _event = nullptr;
    }
}
// Queues `len` bytes from `pdata` on the bufferevent.
// Returns FUNC_FAILED when the client has not been started or when
// bufferevent_write() reports an error, FUNC_SUCCESS otherwise.
int PassiveTCPClient::SendData(void* pdata, size_t len) {
    if (_bev == nullptr) {
        return FUNC_FAILED;
    }
    const int rc = bufferevent_write(_bev, pdata, len);
    return (rc < 0) ? FUNC_FAILED : FUNC_SUCCESS;
}
// Forwards received bytes, tagged with this connection's socket handle,
// through SignalRecvData. Does nothing when no signal object is attached.
void PassiveTCPClient::PutRecvData(void* data, size_t len) {
    if (_pTCPClientSignal == nullptr) {
        return;
    }
    _pTCPClientSignal->SignalRecvData(_fd, data, len);
}
// Translates libevent event flags into signal notifications:
//  - BEV_EVENT_CONNECTED                      -> ENE_CONNECTED
//  - any of READING/WRITING/EOF/ERROR/TIMEOUT -> ENE_CLOSE
// Both checks are made independently. Nothing is emitted when no signal object
// is attached.
void PassiveTCPClient::ProcEvent(short events) {
    if (_pTCPClientSignal == nullptr) {
        return;
    }

    const short close_mask = BEV_EVENT_READING | BEV_EVENT_WRITING | BEV_EVENT_EOF
                           | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT;

    if ((events & BEV_EVENT_CONNECTED) != 0) {
        _pTCPClientSignal->SignalEvent(_fd, ENE_CONNECTED);
    }

    if ((events & close_mask) != 0) {
        _pTCPClientSignal->SignalEvent(_fd, ENE_CLOSE);
    }
}
}
主动客户端连接类
ActiveTCPClient.h
//
// ActiveTCPClient.h
// Frame 主动TCP客户端连接类
//
// Created by chenjianjun on 15/9/8.
// Copyright (c) 2015年 jsbn. All rights reserved.
//
#ifndef __ActiveTCPClient_H_
#define __ActiveTCPClient_H_
#include <string>
#include <event.h>
#include "NetSignal.h"
#include "RWLock.h"
namespace NAME_SPACE {
// Outgoing (actively connecting) TCP client: a libevent bufferevent that
// connects to host_name:host_port, plus a heartbeat timer that is created once
// the connection is established.
class ActiveTCPClient {
public:
// host_name is passed to inet_addr() in StartWork(); heart_time is used as the
// seconds field of the heartbeat timer timeout.
explicit ActiveTCPClient(std::string host_name,
unsigned short host_port,
short heart_time = 10);
~ActiveTCPClient();
// Attaches the signal object that receives data/event notifications (not owned).
void SetTCPClientSignal(TCPClientSignal* pTCPClientSignal) { _pTCPClientSignal = pTCPClientSignal;}
/**
* @brief Start working: creates the socket and bufferevent and starts connecting
*
* @param
*
* @return true when the connect was started, false otherwise
*/
bool StartWork();
/**
* @brief Stop working: frees the heartbeat timer and the bufferevent
*
* @param
*
* @return
*/
void StopWork();
/**
* @brief Send data
*
* @param pdata data to send
* @param len length of the data
*
* @return FUNC_SUCCESS or FUNC_FAILED
*/
int SendData(void* pdata, size_t len);
// Returns the current socket handle.
SOCKET GetFd() { return _fd; }
// Heartbeat flag accessors; the read callback sets the flag.
void SetHeartFlg(bool flg) { _heart_flg = flg; }
bool GetHeartFlg() { return _heart_flg; }
// True once the connection state is 2 (connected).
bool IsConnect() { return _connect_flg == 2;}
public:
/**
* @brief Delivers received data. Not to be called from outside; it is invoked
*        from the libevent callback.
*
* @param void* data
* @param size_t length of the data
*/
void PutRecvData(void*, size_t);
/**
* @brief Event handling. Not to be called from outside; it is invoked from
*        the libevent callbacks.
*
* @param events set of event flags
*/
void ProcEvent(short events);
public:
/*
Signal (connector) object. Keeping it separate from this class is important
(author's note): without the separation there is a low-probability crash,
because the signal is emitted from inside a callback, and if this object is
deleted externally at that moment memory is accessed after it was freed. See
the implementation (it uses a scoped lock, and the crash happens when that
lock is released).
*/
TCPClientSignal* _pTCPClientSignal;
private:
// Server address.
std::string _host_name;
// Server port.
unsigned short _host_port;
// bufferevent
struct bufferevent *_bev;
// Heartbeat period.
short _heart_time;
// Socket handle of the connection.
SOCKET _fd;
// Heartbeat timer event.
struct event *_event;
// Heartbeat flag.
volatile bool _heart_flg;
// Read/write lock taken by StartWork/StopWork (write) and SendData (read).
RWLock* _m_rw_loacl;
// Connection state: 0 = not connected, 1 = connecting, 2 = connected.
volatile unsigned char _connect_flg;
};
}
#endif /* defined(__ActiveTCPClient_H_) */
ActiveTCPClient.cpp
//
// ActiveTCPClient.cpp
// Frame
//
// Created by chenjianjun on 15/9/8.
// Copyright (c) 2015年 jsbn. All rights reserved.
//
#include "ActiveTCPClient.h"
#include "NetFrame.h"
namespace NAME_SPACE {
// Heartbeat timer callback for an ActiveTCPClient (passed as `data`).
//
// Mirrors the passive client's timer callback: when the read callback has
// flagged traffic since the last tick, the flag is cleared and the connection
// is kept; only a period without any flagged traffic is reported as
// BEV_EVENT_TIMEOUT. Previously the flag was set but never consulted, so every
// tick reported a timeout even on a busy connection.
void ActiveTCPTimeOutEventCb(evutil_socket_t fd, short, void *data) {
    ActiveTCPClient* pActiveTCPClient = static_cast<ActiveTCPClient*>(data);
    if (pActiveTCPClient->GetHeartFlg()) {
        // Traffic was seen during the last period; clear the flag.
        pActiveTCPClient->SetHeartFlg(false);
        return;
    }
    // Heartbeat timeout.
    pActiveTCPClient->ProcEvent(BEV_EVENT_TIMEOUT);
}
void ActiveTCPEventCb(struct bufferevent *bev, short events, void *data) {
ActiveTCPClient* pActiveTCPClient = (ActiveTCPClient*)data;
pActiveTCPClient->ProcEvent(events);
}
void ActiveTCPReadEventCb(struct bufferevent *bev, void *data) {
ActiveTCPClient* pActiveTCPClient = (ActiveTCPClient*)data;
static char databuf[40960];
size_t datalen = 0;
size_t nbytes;
while ((nbytes = evbuffer_get_length(bev->input)) > 0) {
evbuffer_remove(bev->input, databuf+datalen, sizeof(databuf)-datalen);
datalen += nbytes;
}
// 有数据往来,设置标志
pActiveTCPClient->SetHeartFlg(true);
// 数据接收回调
pActiveTCPClient->PutRecvData(databuf, datalen);
}
// Records the server address and heartbeat period and creates the read/write
// lock; no socket or libevent object is created until StartWork().
//
// Members are initialized in declaration order, and _fd now starts at -1 (the
// value StopWork() also uses) so that GetFd() never returns an indeterminate
// handle before the first StartWork().
ActiveTCPClient::ActiveTCPClient(std::string host_name, unsigned short host_port, short heart_time)
    : _pTCPClientSignal(nullptr),
      _host_name(host_name),
      _host_port(host_port),
      _bev(nullptr),
      _heart_time(heart_time),
      _fd(-1),
      _event(nullptr),
      _heart_flg(false),
      _m_rw_loacl(RWLock::Create()),
      _connect_flg(0) {
}
// Tears the connection down via StopWork(), drops the reference to the signal
// object and finally destroys the read/write lock created in the constructor.
ActiveTCPClient::~ActiveTCPClient()
{
    StopWork();
    _pTCPClientSignal = nullptr;
    delete _m_rw_loacl;
}
bool ActiveTCPClient::StartWork() {
WriteLockScoped wLock(*_m_rw_loacl);
if (_bev) {
return false;
}
_fd = socket(AF_INET, SOCK_STREAM, 0);
evutil_make_socket_nonblocking(_fd);
if (_fd < 0) {
return false;
}
_bev = bufferevent_socket_new(NetFrame::_base, _fd, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE);
if (_bev == nullptr) {
close(_fd);
return false;
}
struct sockaddr_in sSvrAddr;
memset(&sSvrAddr, 0, sizeof(sSvrAddr));
sSvrAddr.sin_family = AF_INET;
sSvrAddr.sin_addr.s_addr = inet_addr(_host_name.c_str());
sSvrAddr.sin_port = htons(_host_port);
int addrlen = sizeof(struct sockaddr_in);
// 置为连接中状态
_connect_flg = 1;
if (bufferevent_socket_connect(_bev, (struct sockaddr*)&sSvrAddr, addrlen) < 0) {
_connect_flg = 0;
StopWork();
return false;
}
bufferevent_setcb(_bev, ActiveTCPReadEventCb, nullptr, ActiveTCPEventCb, this);
bufferevent_enable(_bev, EV_READ);
return true;
}
// Tears the connection down under the write lock: marks the client as not
// connected, releases the heartbeat timer, then disables and frees the
// bufferevent and resets the socket handle to -1. Steps whose object does not
// exist are skipped.
void ActiveTCPClient::StopWork() {
    WriteLockScoped wLock(*_m_rw_loacl);

    _connect_flg = 0;

    if (_event != nullptr) {
        event_del(_event);
        event_free(_event);
        _event = nullptr;
    }

    if (_bev != nullptr) {
        bufferevent_disable(_bev, EV_READ);
        bufferevent_free(_bev);
        _bev = nullptr;
        _fd = -1;
    }
}
// Queues `len` bytes from `pdata` on the bufferevent while holding the read
// lock. Returns FUNC_FAILED when there is no bufferevent, the connection state
// is not 2 (connected), or bufferevent_write() reports an error; FUNC_SUCCESS
// otherwise. A log line is written on entry and again after a successful
// write.
int ActiveTCPClient::SendData(void* pdata, size_t len) {
    LOG(INFO)<<"发送数据1..........";
    ReadLockScoped rLock(*_m_rw_loacl);

    const bool ready = (_bev != nullptr) && (_connect_flg == 2);
    if (!ready) {
        return FUNC_FAILED;
    }

    const int rc = bufferevent_write(_bev, pdata, len);
    if (rc < 0) {
        return FUNC_FAILED;
    }

    LOG(INFO)<<"发送数据32..........";
    return FUNC_SUCCESS;
}
void ActiveTCPClient::ProcEvent(short events) {
if (!_pTCPClientSignal) {
return;
}
if (events & BEV_EVENT_CONNECTED) {
// 已连接状态
_connect_flg = 2;
// 连接建立,开启心跳计数
_event = event_new(NetFrame::_base,
_fd,
EV_TIMEOUT|EV_PERSIST,
ActiveTCPTimeOutEventCb,
this);
// 设置心跳检测时间
struct timeval timeout = {_heart_time, 0};
event_add(_event, &timeout);
_pTCPClientSignal->SignalEvent(_fd, ENE_CONNECTED);
} else if (events & (BEV_EVENT_READING|BEV_EVENT_WRITING|BEV_EVENT_EOF|BEV_EVENT_TIMEOUT)) {
_pTCPClientSignal->SignalEvent(_fd, ENE_CLOSE);
} else {
_pTCPClientSignal->SignalEvent(_fd, EVE_UNKNOWN);
}
}
// Forwards received bytes, tagged with this connection's socket handle,
// through SignalRecvData. Does nothing when no signal object is attached.
void ActiveTCPClient::PutRecvData(void* data, size_t len) {
    if (_pTCPClientSignal == nullptr) {
        return;
    }
    _pTCPClientSignal->SignalRecvData(_fd, data, len);
}
}
NetSignal.h
//借鉴了google的一个开源项目里面的sigslot机制,这里就不贴出来了,最后上一个测试代码
// NetSignal.h
// Frame
//
// Created by chenjianjun on 15/9/8.
// Copyright (c) 2015年 jsbn. All rights reserved.
//
#ifndef __NetSignal_H_
#define __NetSignal_H_
#include "Sigslot.h"
#include "Common.h"
namespace NAME_SPACE {
// Bundle of the sigslot signals emitted by a ServerWorker.
class TCPServerSignal {
public:
TCPServerSignal() {}
~TCPServerSignal() {}
// Emitted for each accepted client: socket handle and client address.
sigslot::signal2<SOCKET , struct sockaddr_in*> SignalAccept;
// Emitted on a listener error: socket handle and error code.
sigslot::signal2<SOCKET , EM_NET_EVENT> SignalAcceptError;
};
// Bundle of the sigslot signals emitted by PassiveTCPClient and ActiveTCPClient.
class TCPClientSignal {
public:
TCPClientSignal() {}
~TCPClientSignal() {}
// Emitted when data was received: socket handle, data pointer, data length.
sigslot::signal3<SOCKET, void*, size_t> SignalRecvData;
// Emitted for connection events: socket handle and event code.
sigslot::signal2<SOCKET, EM_NET_EVENT> SignalEvent;
};
}
#endif /* defined(__NetSignal_H_) */
#include <glog/logging.h>
#include <map>
#include "NetFrame.h"
#include "ServerWorker.h"
#include "PassiveTCPClient.h"
#include "ActiveTCPClient.h"
#include "NetSignal.h"
#include "RWLock.h"
using namespace NAME_SPACE;
// Echo test server: listens on a fixed address, keeps one PassiveTCPClient per
// accepted socket and sends every received buffer straight back.
// The object is its own signal sink: it derives from TCPServerSignal and
// TCPClientSignal and connects those signals to its member slots.
//
// Changes from the previous version:
//  - _pServerWorker is initialized to nullptr and checked before use;
//  - Start() no longer creates a second worker and a second set of slot
//    connections when called again;
//  - Accept() stops (instead of "starts") a stale client before deleting it.
class TestServer : public sigslot::has_slots<>, public TCPClientSignal, public TCPServerSignal {
public:
    TestServer() : _pServerWorker(nullptr) {
        pthread_mutex_init(&_work_mutex, nullptr);
    }

    ~TestServer() {
        pthread_mutex_destroy(&_work_mutex);
    }

    // Creates the listener on first use and starts listening.
    // Returns FUNC_SUCCESS, or FUNC_FAILED when the listener could not be
    // started (including when it is already running).
    int Start() {
        if (_pServerWorker == nullptr) {
            _pServerWorker = new ServerWorker("192.168.1.74",8088);
            SignalAccept.connect(this, &TestServer::Accept);
            SignalAcceptError.connect(this, &TestServer::Event);
            SignalRecvData.connect(this, &TestServer::RecvData);
            SignalEvent.connect(this, &TestServer::Event);
        }
        if (!_pServerWorker->StartWork(this)) {
            LOG(ERROR)<<"服务器监听启动失败";
            return FUNC_FAILED;
        }
        return FUNC_SUCCESS;
    }

    // Stops listening and destroys every client.
    // NOTE(review): the ServerWorker itself is not deleted, as before; no
    // definition of ~ServerWorker() is visible in this file — confirm it
    // exists before adding a delete here.
    void Stop() {
        if (_pServerWorker != nullptr) {
            _pServerWorker->StopWork();
        }
        pthread_mutex_lock(&_work_mutex);
        std::map<SOCKET, PassiveTCPClient*>::iterator it = _map_clients.begin();
        while (it != _map_clients.end()) {
            it->second->StopWork();
            delete it->second;
            _map_clients.erase(it++);
        }
        pthread_mutex_unlock(&_work_mutex);
    }

    // Sends `len` bytes to the client registered for `fd`, if any.
    // Always returns 0.
    int SendData(SOCKET fd, void* data, size_t len) {
        pthread_mutex_lock(&_work_mutex);
        std::map<SOCKET, PassiveTCPClient*>::iterator it = _map_clients.find(fd);
        if (it != _map_clients.end()) {
            it->second->SendData(data, len);
        }
        pthread_mutex_unlock(&_work_mutex);
        return 0;
    }

public:
    // Slot: data received.
    void RecvData(SOCKET fd, void* data, size_t len) {
        // Echo the data back. A real program would put it on a queue and let
        // another thread process it.
        SendData(fd, data, len);
    }

    // Slot: socket event (also connected to the accept-error signal). Stops and
    // destroys the client registered for `fd`, if any.
    void Event(SOCKET fd, EM_NET_EVENT msg) {
        LOG(ERROR)<<"收到事件通知."<< msg;
        pthread_mutex_lock(&_work_mutex);
        std::map<SOCKET, PassiveTCPClient*>::iterator it = _map_clients.find(fd);
        if (it != _map_clients.end()) {
            it->second->StopWork();
            delete it->second;
            _map_clients.erase(it);
        }
        pthread_mutex_unlock(&_work_mutex);
    }

    // Slot: a client connected. Replaces any stale entry for the same handle
    // and registers a new PassiveTCPClient with a 15 second heartbeat.
    void Accept(SOCKET fd, struct sockaddr_in* sa) {
        LOG(ERROR)<<"收到客户端连接.";
        pthread_mutex_lock(&_work_mutex);
        std::map<SOCKET, PassiveTCPClient*>::iterator it = _map_clients.find(fd);
        if (it != _map_clients.end()) {
            // A stale entry must be stopped (not started) before it is destroyed.
            it->second->StopWork();
            delete it->second;
            _map_clients.erase(it);
        }
        PassiveTCPClient* pPassiveTCPClient = new PassiveTCPClient(fd, sa, 15);
        if (!pPassiveTCPClient->StartWork(this)) {
            // NOTE(review): the client object (and possibly the socket) is not
            // released on this path, as before. Releasing it here is only safe
            // once PassiveTCPClient guarantees a well-defined state after a
            // failed StartWork() — confirm and then add the cleanup.
            LOG(ERROR)<<"启动客户端失败";
        } else {
            _map_clients[fd] = pPassiveTCPClient;
        }
        pthread_mutex_unlock(&_work_mutex);
    }

private:
    // Listener; created by the first Start().
    ServerWorker* _pServerWorker;
    // Guards _map_clients.
    pthread_mutex_t _work_mutex;
    // Connected clients, keyed by socket handle.
    std::map<SOCKET, PassiveTCPClient*> _map_clients;
};
// Echo test client: connects to a fixed server address, echoes every received
// buffer back and uses a background thread to (re)connect whenever the
// connection is down.
//
// Changes from the previous version:
//  - _pActiveTCPClient is initialized to nullptr, so Stop() on a client that
//    was never started no longer tests an indeterminate pointer;
//  - a failed Start() resets the pointer after deleting the client, so a later
//    Stop() cannot delete it a second time;
//  - a second Start() while a client exists is a no-op instead of leaking the
//    first client;
//  - Event() checks the pointer before using it.
class TestClient : public sigslot::has_slots<>, public TCPClientSignal, public Runnable {
public:
    TestClient() : _pActiveTCPClient(nullptr), _is_run_flg(false) {
    }

    ~TestClient() {
    }

    // Creates the client, wires the slots and starts the reconnect thread.
    // Returns FUNC_SUCCESS, or FUNC_FAILED when the thread could not be started.
    int Start() {
        if (_pActiveTCPClient != nullptr) {
            return FUNC_SUCCESS;
        }
        _pActiveTCPClient = new ActiveTCPClient("192.168.1.5", 8088, 15);
        _pActiveTCPClient->SetTCPClientSignal(this);
        SignalEvent.connect(this, &TestClient::Event);
        SignalRecvData.connect(this, &TestClient::RecvData);
        _is_run_flg = true;
        if (!_connect_thread.Start(this)) {
            _is_run_flg = false;
            SignalEvent.disconnect(this);
            SignalRecvData.disconnect(this);
            delete _pActiveTCPClient;
            _pActiveTCPClient = nullptr;
            return FUNC_FAILED;
        }
        return FUNC_SUCCESS;
    }

    // Stops the reconnect thread, disconnects the slots and destroys the client.
    void Stop() {
        if (_pActiveTCPClient) {
            _is_run_flg = false;
            _connect_thread.Stop();
            SignalEvent.disconnect(this);
            SignalRecvData.disconnect(this);
            _pActiveTCPClient->StopWork();
            delete _pActiveTCPClient;
            _pActiveTCPClient = nullptr;
        }
    }

    // Sends `len` bytes through the client, if one exists.
    // Always returns FUNC_SUCCESS.
    int SendData(void* data,size_t len) {
        if (_pActiveTCPClient) {
            _pActiveTCPClient->SendData(data, len);
        }
        return FUNC_SUCCESS;
    }

    // Slot: data received.
    void RecvData(SOCKET fd, void* data, size_t len) {
        // Echo the data back. A real program would put it on a queue and let
        // another thread process it.
        SendData(data, len);
    }

    // Slot: socket event. Anything other than ENE_CONNECTED tears the
    // connection down; the reconnect thread then starts it again.
    void Event(SOCKET fd, EM_NET_EVENT msg) {
        if (msg == ENE_CONNECTED) {
            return;
        }
        if (_pActiveTCPClient != nullptr) {
            _pActiveTCPClient->StopWork();
        }
    }

protected:
    // Reconnect thread body: every two seconds, start the client if it is not
    // connected.
    virtual void Run(void* arg) {
        while (_is_run_flg) {
            if (!_pActiveTCPClient->IsConnect()) {
                _pActiveTCPClient->StartWork();
            }
            Thread::SleepMs(2000);
        }
    }

private:
    // The connection; nullptr while not started.
    ActiveTCPClient* _pActiveTCPClient;
    // Run flag for the reconnect thread.
    volatile bool _is_run_flg;
    // Connection-check (reconnect) thread.
    Thread _connect_thread;
};
// Test driver: initializes the network layer, optionally runs the server or
// client test (both are commented out below; uncomment one to run it) and
// shuts the network layer down again.
int main(int argc,char* argv[]) {
// Initialize the network layer.
if (NetFrame::Instance()->NetWorkInit() != FUNC_SUCCESS) {
LOG(ERROR)<<"网络初期化失败....";
return -1;
}
// {
// // Test server
// TestServer mTestServer;
// mTestServer.Start();
// sleep(200);// Simulated test: sleep while the network library is exercised (the original note says 10 minutes; the value here is 200 seconds)
// mTestServer.Stop();
// }
// {
// // Test client
// TestClient mTestClient;
// mTestClient.Start();
// char buf[4] = "bye";
// for (int i = 0; i < 200; ++i) {
// memset(buf, 0x00, sizeof(buf));
// sprintf(buf, "%03d", i);
// sleep(1);
// mTestClient.SendData(buf, 3);
// }
//
// mTestClient.Stop();
// }
// Shut the network layer down.
NetFrame::Instance()->NetWorkExit();
return 0;
}
测试代码已被注释掉,取消注释后即可进行测试。
这里只贴出了主要代码,其他代码可以参考我的GitHub项目:
https://github.com/chenjianjun571/cioforandroid.git
https://github.com/chenjianjun571/cioforios.git