淘宝开源网络框架tbnet 之 iocomponent

时间:2022-08-30 07:25:50

在上篇文章中,我们简要的讨论了tbnet里面的packet类,这个类主要肩负了其他数据包的轮廓定义,而接下来的这个类则主要是用于抽象tbnet库的输入输出操作,这个类就是今天我们要讨论的iocomponent类,这个类很强大,其封装了tbnet里面的所有的数据包输入输出操作,接下来,我们就来看看这个无比强大的类吧,首先从成员变量说起,代码如下:

protected:
Transport *_owner;
Socket *_socket; // Ò»¸öSocketµÄÎļþ¾ä±ú
SocketEvent *_socketEvent;
int _state; // Á¬½Ó״̬
atomic_t _refcount; // ÒýÓüÆÊý
bool _autoReconn; // ÊÇ·ñÖØÁ¬
bool _isServer; // ÊÇ·ñΪ·þÎñÆ÷¶Ë
bool _inUsed; // ÊÇ·ñÔÚÓÃ
int64_t _lastUseTime; // ×î½üʹÓõÄϵͳʱ¼ä

private:
IOComponent *_prev; // ÓÃÓÚÁ´±í
IOComponent *_next; // ÓÃÓÚÁ´±í

在该成员变量中,owner这个变量,之前分析transport中有所涉及,其作用就是指定这个iocomponent对象的归属问题,而socket变量则是直接与底层的通讯打交道的接口,而_socketEvent变量,其作用跟owner差不多,也是指明归属,我们来回忆下,在transport中,当创建了一个iocomponent对象时,我们调用了其setSocketEvent和将对应的socket插入到了socketEvent中,不知道大家是否还记得,这类应用在一些比较底层的代码里面用的很多,在iocomponent类中,我们可以看到其创建的对象包含了前向指针和后向指针,可以看出iocomponent对象时基于链表存储的,至于其他的几个成员,就是一些基本属性了,例如_refcount这个主要是用于记录这个对象被应用的次数,主要是基于共享对象来考虑,而_autoReconn 则是针对TCPComponent对象而言的,_isServer这个对象则是针对TCPAccepter对象来说的,_isUsed指明对象是否可用,_lastUserTime则是用于记录对象最后一次使用时间,变量说的差不多了,我们就来看看一些比较重要的函数吧,代码如下:

class IOComponent {
friend class Transport;

public:
enum {
TBNET_CONNECTING = 1,
TBNET_CONNECTED,
TBNET_CLOSED,
TBNET_UNCONNECTED
};

public:
/*
* ¹¹Ô캯Êý
*/
IOComponent(Transport *owner, Socket *socket);
...
virtual bool init(bool isServer = false) = 0;

/*
* ¹Ø±Õ
*/
virtual void close() {}

/*
* µ±ÓÐÊý¾Ý¿Éдµ½Ê±±»Transportµ÷ÓÃ
*
* @return ÊÇ·ñ³É¹¦, true - ³É¹¦, false - ʧ°Ü¡£
*/
virtual bool handleWriteEvent() = 0;

/*
* µ±ÓÐÊý¾Ý¿É¶Áʱ±»Transportµ÷ÓÃ
*
* @return ÊÇ·ñ³É¹¦, true - ³É¹¦, false - ʧ°Ü¡£
*/
virtual bool handleReadEvent() = 0;

/*
* ÉèÖÃSocketEvent
*/
void setSocketEvent(SocketEvent *socketEvent) {
_socketEvent = socketEvent;
}

/*
* ÉèÖÃÄܶÁд
*
* @param writeOn дÊÇ·ñ´ò¿ª
*/
void enableWrite(bool writeOn) {
if (_socketEvent) {
_socketEvent->setEvent(_socket, true, writeOn);
}
}
/*
* Ôö¼ÓÒýÓüÆÊý
*/
int addRef() {
return atomic_add_return(1, &_refcount);
}

/*
* ¼õÉÙÒýÓüÆÊý
*/
void subRef() {
atomic_dec(&_refcount);
}

/*
* È¡³öÒýÓüÆÊý
*/
int getRef() {
return atomic_read(&_refcount);
}
...
Transport *getOwner();
};

在上述成员函数声明中,我们主要关注三个函数,init()、 handleWriteEvent()和handleReadEvent()函数,这三个函数基本上就涵盖了iocomponent对象的大部分功能,首先来看看这三个函数的定义吧,这三个函数在iocomponent类中被定义为虚函数,其实现是依据其子类定义的,而在tbnet中基于iocomponent类实现了三个子类::TCPAcceptor、TCPComponent以及UDPComponent类,我们就从TCPAcceptor类来说起吧,代码如下:

class TCPAcceptor : public IOComponent {

public:
/**
* ¹¹Ô캯Êý£¬ÓÉTransportµ
*
* @param owner: ÔËÊä²ã¶Ô
* @param socket: Socket¶ÔÏ
* @param streamer: Êý¾Ý°üµÄË«ÏòÁ÷£¬ÓÃpacket´´½¨£¬½â
* @param serverAdapter: ÓÃÔÚ·þÎñÆ÷¶Ë£¬µ±Connection³õʼ»¯¼°Channel´´
*/
TCPAcceptor(Transport *owner, Socket *socket,
IPacketStreamer *streamer, IServerAdapter *serverAdapter);

/*
* ³õʼ
*
* @return ÊÇ·ñ³É
*/
bool init(bool isServer = false);

/**
* µ±ÓÐÊý¾Ý¿É¶Áʱ±»Transp
*
* @return ÊÇ·ñ³É¹¦, true - ³É¹¦, false - Ê
*/
bool handleReadEvent();

/**
* ÔÚacceptÖÐÃ»Ó Ð´Ê
*/
bool handleWriteEvent() {
return true;
}

/*
* ³¬Ê±¼
*
* @param now µ±Ç°Ê±¼ä(µ¥Î
*/
void checkTimeout(int64_t now);

private:
IPacketStreamer *_streamer; // Êý¾Ý°ü½â
IServerAdapter *_serverAdapter; // ·þÎñÆ÷ÊÊ
};

首先来看看TCPAcceptor类的成员变量,_streamer和_serverAdapter,这两个变量都与数据包的传输有关系,后面会讨论到,在此先略过,在函数中我们主要看看上面提到的是三个函数的实现,代码如下:

bool TCPAcceptor::init(bool isServer) {
_socket->setSoBlocking(false);
return ((ServerSocket*)_socket)->listen();
}

在这里面,我们可以看到相关socket的操作,这个socket是从iocomponent继承下来的,有人可能会说那父类的socket又是怎么得到的呢,其实你看看TCPAcceptor构造函数就只知道了,呵呵,首先将将TCPAcceptor关联的socket设置为非阻塞的,然后调用监听函数,从而实现了监听功能,到此我们的服务器就可以开始监听接收客户端的连接请求了,在transport类中,我们也有提及这方面的,大家如果忘记可以看看transport那篇blog,接下来我们来看看其他两个函数的实现代码:

bool TCPAcceptor::handleReadEvent() {
Socket *socket;
while ((socket = ((ServerSocket*)_socket)->accept()) != NULL) {
//TBSYS_LOG(INFO, "ÓÐÐÂÁ¬½Ó½øÀ´, fd: %d", socket->getSocketHandle());
// TCPComponent, ÔÚ·þÎñÆ÷¶Ë
TCPComponent *component = new TCPComponent(_owner, socket, _streamer, _serverAdapter);

if (!component->init(true)) {
delete component;
return true;
}

// ¼ÓÈëµ½iocomponentsÖУ¬¼°×¢²á¿É¶Áµ½socketeventÖÐ
_owner->addComponent(component, true, false);
}

return true;
}

在这个函数中,accept这个函数,我想大家都应该看到了吧,通过accept来接受一个连接,然后将接收成功的连接封装成IOComponent对象,并将其插入到transport中,而最终的处理都会放在transport中,而这个函数的上层也是transport,总之,handleReadEvent这个函数其实只是一个中间过程,在TCPAcceptor中,并没有实现handleWriteEvent,至于原因我想大家应该知道的,因为在服务器端监听套接字只有读属性,并没有写属性;接下来吗,我们来看看另外一个IOComponent的子类吧,代码如下:

class TCPComponent : public IOComponent {
public:
/**
* ¹¹Ô캯Êý£¬ÓÉTransportµ
*
* @param owner: ÔËÊä²ã¶Ô
* @param host: ¼àÌýipµØÖ·»òhos
* @param port: ¼àÌý¶
* @param streamer: Êý¾Ý°üµÄË«ÏòÁ÷£¬ÓÃpacket´´½¨£¬½â
* @param serverAdapter: ÓÃÔÚ·þÎñÆ÷¶Ë£¬µ±Connection³õʼ»¯¼°Channel´´
*/
TCPComponent(Transport *owner, Socket *socket,
IPacketStreamer *streamer, IServerAdapter *serverAdapter);

/*
* Îö¹¹º
*/
~TCPComponent();

/*
* ³õʼ
*
* @return ÊÇ·ñ³É
*/
bool init(bool isServer = false);

/*
* ¹Ø
*/
void close();

/*
* µ±ÓÐÊý¾Ý¿Éдµ½Ê±±»Transportµ÷ÓÃ
*
* @return ÊÇ·ñ³É¹¦, true - ³É¹¦, false - ʧ°Ü¡£
*/
bool handleWriteEvent();

/*
* µ±ÓÐÊý¾Ý¿É¶Áʱ±»Transportµ÷ÓÃ
*
* @return ÊÇ·ñ³É¹¦, true - ³É¹¦, false - ʧ°Ü¡£
*/
bool handleReadEvent();

/*
* µÃµ½connection
*
* @return TCPConnection
*/
TCPConnection *getConnection() {
return _connection;
}

/*
* ³¬Ê±¼ì²é
*
* @param now µ±Ç°Ê±¼ä(µ¥Î»us)
*/
void checkTimeout(int64_t now);

/*
* Á¬½Óµ½socket
*/
bool socketConnect();

private:
// TCPÁ¬½Ó
TCPConnection *_connection;
int64_t _startConnectTime;
};

在这个类中,我们也只关注上面提到的三个函数,首先来看看init函数,代码如下:

bool TCPComponent::init(bool isServer) {
_socket->setSoBlocking(false);
_socket->setSoLinger(false, 0);
_socket->setReuseAddress(true);
_socket->setIntOption(SO_KEEPALIVE, 1);
_socket->setIntOption(SO_SNDBUF, 640000);
_socket->setIntOption(SO_RCVBUF, 640000);
// _socket->setTcpNoDelay(true);
if (!isServer) {
if (!socketConnect() && _autoReconn == false) {
return false;
}
} else {
_state = TBNET_CONNECTED;
}
_connection->setServer(isServer);
_isServer = isServer;

return true;
}

在这个函数中,主要是用于设置socket的一些属性,并且根据isSever类型来分类处理,如果是客户端的话则调用serverConnect函数来实现连接服务器的功能,而如果是服务器则会设置IOComponent的状态,因为服务器充当被动接收的角色,故如果是服务器的连接的话,一般来说应该是连接已经成功,接下来,我们来看看其他的两个函数,代码如下:

bool TCPComponent::handleWriteEvent() {
_lastUseTime = tbsys::CTimeUtil::getTime();
bool rc = true;
if (_state == TBNET_CONNECTED) {
rc = _connection->writeData();
} else if (_state == TBNET_CONNECTING) {
int error = _socket->getSoError();
if (error == 0) {
enableWrite(true);
_connection->clearOutputBuffer();
_state = TBNET_CONNECTED;
} else {
TBSYS_LOG(ERROR, "Á¬½Óµ½ %s ʧ°Ü: %s(%d)", _socket->getAddr().c_str(), strerror(error), error);
if (_socketEvent) {
_socketEvent->removeEvent(_socket);
}
_socket->close();
_state = TBNET_CLOSED;
}
}
return rc;
}

在该函数中,实现了写入数据的要求,主要是通过writeData函数调用来实现,这个函数是在connection中实现的,在这里我们先不深入,等到讨论connect这个类的时候,再来讨论,在此只需要知道当有写事件时,IOComponent会通过这个函数将数据发送出去即可,同理在handleReadData函数中,同样调用了connection类里的函数,代码如下:

bool TCPComponent::handleReadEvent() {
_lastUseTime = tbsys::CTimeUtil::getTime();
bool rc = false;
if (_state == TBNET_CONNECTED) {
rc = _connection->readData();
}
return rc;
}

现在我们花点时间来看一看TCPComponent类里的一个函数,代码如下:

bool TCPComponent::socketConnect() {
if (_state == TBNET_CONNECTED || _state == TBNET_CONNECTING) {
return true;
}
_socket->setSoBlocking(false);
if (_socket->connect()) {
if (_socketEvent) {
_socketEvent->addEvent(_socket, true, true);
}
_state = TBNET_CONNECTED;
_startConnectTime = tbsys::CTimeUtil::getTime();
} else {
int error = Socket::getLastError();
if (error == EINPROGRESS || error == EWOULDBLOCK) {
_state = TBNET_CONNECTING;
if (_socketEvent) {
_socketEvent->addEvent(_socket, true, true);
}
} else {
_socket->close();
_state = TBNET_CLOSED;
TBSYS_LOG(ERROR, "Á¬½Óµ½ %s ʧ°Ü, %s(%d)", _socket->getAddr().c_str(), strerror(error), err
return false;
}
}
return true;
}

这个函数是在init里面调用过,从名字上就可以知道这个函数的作用了,其实主要是由于客户端连接服务器,连接成功后会将iocomponent对应的socket插入到异步事件队列中,并且将iocomponent对象的状态设置为TBNET_CONNECTED,如果在连接过程中出现了中断事件,也会将socket插入到异步时间队列中,只是此时iocomponent对象的状态设置为TBNET_CONNECTING,好了,还剩下最后一个IOComponent子类了,代码如下:

class UDPComponent : public IOComponent {

public:
/**
* ¹¹Ô캯Êý£¬ÓÉTransportµ
*
* @param owner: Transport
* @param socket: Socket
* @param streamer: Êý¾Ý°üµÄË«ÏòÁ÷£¬ÓÃpacket´´½¨£¬½â
* @param serverAdapter: ÓÃÔÚ·þÎñÆ÷¶Ë£¬µ±Connection³õʼ»¯¼°Channel´´
*/
UDPComponent(Transport *owner, Socket *socket, IPacketStreamer *streamer, IServerAdapter *serverAdapter);

/*
* Îö¹¹º
*/
~UDPComponent();

/*
* ³õʼ
*
* @return ÊÇ·ñ³É
*/
bool init(bool isServer = false);

/*
* ¹Ø
*/
void close();

/*
* µ±ÓÐÊý¾Ý¿Éдµ½Ê±±»Tran
*
* @return ÊÇ·ñ³É¹¦, true - ³É¹¦, false - Ê
*/
bool handleWriteEvent();

/*
* µ±ÓÐÊý¾Ý¿É¶Áʱ±»Transp
*
* @return ÊÇ·ñ³É¹¦, true - ³É¹¦, false - Ê
*/
bool handleReadEvent();
private:
__gnu_cxx::hash_map<int, UDPConnection*> _connections; // UDPÁ¬½Ó¼¯ºÏ
IPacketStreamer *_streamer; // streamer
IServerAdapter *_serverAdapter; // serveradapter
};

在这个类中,我们也只需要关注上面所提及的三个函数,至于成员变量其实都差不多,只是这里面多了一个_connect映射容器,其作用就是用于记录每个连接,先来看看init函数吧,代码如下:

bool UDPComponent::init(bool isServer) {
if (!isServer) { // ²»Òªconnect, ÊÇaccept²ú

if (!_socket->connect()) {
return false;
}
}
_isServer = isServer;
return true;
}

从上述代码中可以看出,init函数中完成了connect服务器的流程,我们都知道基于UDP传输的特点就是无连接的,无状态的,因而这里并没有做过多地操作,其他两个函数在UDPComponent中也没有具体的实现,原因在上面也说过了,由于UDP协议并没有连接的概念,因此,在tbnet中,对于UDP协议这块支持;力度有限,但是也不影响到UDP的使用,如果有需要自己也可以重写这块,好了,这部分写的有点长,只是简要的论述了下具体的代码实现了,有机会可以将这部分自己重新实现一遍,也不错,这篇博文到此结束,下篇我们就研究tbnet库的ipacketstreamer,ipackethandler以及iserveradaper,尽情期待,多谢了。

总结

      tbnet中的IOComponent是很重要的一块,基本上封装了tbnet库的数据包的输入输出相关操作,希望大家能够好好地看看这部分,这部分设计思想还是很不错的,封装的程度也很好,值得学习,推荐之,多谢了。

如有需要,请注明转载,多谢