3 ACE_Reactor 同步框架 网络聊天室
ACE_Reactor 框架
网络聊天室
项目文件:
chunli@Linux:~/ace/AceChatRoom$ tree
.
├── ChatMain.cpp
├── ChatRoom.cpp
├── ChatRoom.h
├── ParticipantAcceptor.cpp
├── ParticipantAcceptor.h
├── Participant.cpp
├── Participant.h
├── SignalHandler.cpp
└── SignalHandler.h
0 directories, 9 files
chunli@Linux:~/ace/AceChatRoom$
主程序:
chunli@Linux:~/ace/AceChatRoom$ cat ChatMain.cpp #include <ace/Reactor.h>#include "ParticipantAcceptor.h"#include "SignalHandler.h"int main() { SignalHandler sh; ParticipantAcceptor acceptor; ACE_INET_Addr addr(8868); if (acceptor.open(addr) == -1) return 1; return ACE_Reactor::instance()->run_reactor_event_loop();}chunli@Linux:~/ace/AceChatRoom$
ChatRoom类文件
chunli@Linux:~/ace/AceChatRoom$ cat ChatRoom.h #ifndef CHATROOM_H_#define CHATROOM_H_#include <list>#include <ace/Singleton.h>#include <ace/Null_Mutex.h>class Participant;class ChatRoom {public: void join(Participant* user); void leave(Participant* user); void forwardMsg(const char* msg);private: std::list<Participant*> users;};// 不加锁的方式typedef ACE_Singleton<ChatRoom, ACE_Null_Mutex> Room;#endif /* CHATROOM_H_ */chunli@Linux:~/ace/AceChatRoom$ chunli@Linux:~/ace/AceChatRoom$ cat ChatRoom.cpp #include <cstring>#include <iostream>#include "ChatRoom.h"#include "Participant.h"void ChatRoom::join(Participant* user) {users.push_back(user);}void ChatRoom::leave(Participant* user) {std::list<Participant*>::iterator it = users.begin();for (; it != users.end(); ++it) {if (*it == user) {users.erase(it);break;}}}void ChatRoom::forwardMsg(const char* msg) {std::list<Participant*>::const_iterator it = users.begin();for (; it != users.end(); ++it) {ACE_SOCK_Stream& sock = (*it)->socket();if (sock.send(msg, std::strlen(msg)) == -1)(*it)->handle_close(ACE_INVALID_HANDLE, 0);}}chunli@Linux:~/ace/AceChatRoom$
ParticipantAcceptor类文件
chunli@Linux:~/ace/AceChatRoom$ cat ParticipantAcceptor.h #ifndef PARTICIPANTACCEPTOR_H_#define PARTICIPANTACCEPTOR_H_#include <ace/Reactor.h>#include <ace/Event_Handler.h>#include <ace/SOCK_Acceptor.h>class ParticipantAcceptor: ACE_Event_Handler {public: ParticipantAcceptor(ACE_Reactor* reactor = ACE_Reactor::instance()); virtual ~ParticipantAcceptor(); int open(const ACE_INET_Addr& addr); virtual ACE_HANDLE get_handle() const; virtual int handle_input(ACE_HANDLE h = ACE_INVALID_HANDLE); virtual int handle_close(ACE_HANDLE h, ACE_Reactor_Mask closeMask);private: ACE_SOCK_Acceptor acceptor;};#endif /* PARTICIPANTACCEPTOR_H_ */chunli@Linux:~/ace/AceChatRoom$ chunli@Linux:~/ace/AceChatRoom$ chunli@Linux:~/ace/AceChatRoom$ cat ParticipantAcceptor.cpp #include <ace/Log_Msg.h>#include "ParticipantAcceptor.h"#include "ChatRoom.h"#include "Participant.h"ParticipantAcceptor::ParticipantAcceptor(ACE_Reactor* reactor) { this->reactor(reactor);}ParticipantAcceptor::~ParticipantAcceptor() { handle_close(ACE_INVALID_HANDLE, 0);}int ParticipantAcceptor::open(const ACE_INET_Addr& addr) { if (acceptor.open(addr, 0) == -1) ACE_ERROR_RETURN((LM_ERROR, "%p\n", "acceptor.open"), -1); return reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);}ACE_HANDLE ParticipantAcceptor::get_handle() const { return acceptor.get_handle();}int ParticipantAcceptor::handle_input(ACE_HANDLE h) { Participant* user = new Participant(reactor()); if (acceptor.accept(user->socket()) == -1) ACE_ERROR_RETURN((LM_ERROR, "%p\n", "acceptor.accept"), -1); if (user->open() == -1) { ACE_ERROR_RETURN((LM_ERROR, "%p\n", "acceptor.accept"), -1); user->handle_close(ACE_INVALID_HANDLE, 0); } else { Room::instance()->join(user); } return 0;}int ParticipantAcceptor::handle_close(ACE_HANDLE h, ACE_Reactor_Mask closeMask) { if (acceptor.get_handle() != ACE_INVALID_HANDLE) { ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL; reactor()->remove_handler(this, m); acceptor.close(); } 
return 0;}chunli@Linux:~/ace/AceChatRoom$
Participant类文件
chunli@Linux:~/ace/AceChatRoom$ cat Participant.h #ifndef PARTICIPANT_H_#define PARTICIPANT_H_#include <ace/Reactor.h>#include <ace/Event_Handler.h>#include <ace/SOCK_Acceptor.h>class Participant: ACE_Event_Handler {public: static ACE_Time_Value maxMsgInterval; Participant(ACE_Reactor* reactor = ACE_Reactor::instance()); int open(); virtual ACE_HANDLE get_handle() const; virtual int handle_input(ACE_HANDLE h = ACE_INVALID_HANDLE); virtual int handle_timeout(const ACE_Time_Value& t, const void* = 0); virtual int handle_close(ACE_HANDLE h, ACE_Reactor_Mask closeMask); ACE_SOCK_Stream& socket();private: ACE_Time_Value lastMsgTime; ACE_SOCK_Stream sock;};#endif /* PARTICIPANT_H_ */chunli@Linux:~/ace/AceChatRoom$ cat Participant.cpp #include <ace/Log_Msg.h>#include <ace/Timer_Queue.h>#include "Participant.h"#include "ChatRoom.h"//ACE_Time_Value Participant::maxMsgInterval = ACE_Time_Value(5);ACE_Time_Value Participant::maxMsgInterval = ACE_Time_Value(20);//20秒没有在聊天室说话的人,就被closeParticipant::Participant(ACE_Reactor* reactor) { this->reactor(reactor);}int Participant::open() { lastMsgTime = reactor()->timer_queue()->gettimeofday(); int result = reactor()->register_handler(this, ACE_Event_Handler::READ_MASK); if (result != 0) return result; result = reactor()->schedule_timer(this, 0, ACE_Time_Value::zero, maxMsgInterval); return result;}ACE_HANDLE Participant::get_handle() const { return sock.get_handle();}int Participant::handle_input(ACE_HANDLE h) { char buf[512] = ""; ssize_t recvBytes = sock.recv(buf, sizeof(buf)); if (recvBytes <= 0) ACE_ERROR_RETURN((LM_ERROR, "%p\n", "sock.recv"), -1); lastMsgTime = reactor()->timer_queue()->gettimeofday(); Room::instance()->forwardMsg(buf); return 0;}int Participant::handle_timeout(const ACE_Time_Value& t, const void*) { if (t - lastMsgTime > maxMsgInterval) reactor()->remove_handler(this, ACE_Event_Handler::READ_MASK); return 0;}int Participant::handle_close(ACE_HANDLE h, ACE_Reactor_Mask closeMask) { if (sock.get_handle() != 
ACE_INVALID_HANDLE) { ACE_Reactor_Mask m = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL; reactor()->cancel_timer(this); reactor()->remove_handler(this, m); sock.close(); Room::instance()->leave(this); delete this; } return 0;}ACE_SOCK_Stream& Participant::socket() { return sock;}chunli@Linux:~/ace/AceChatRoom$
SignalHandler类文件
chunli@Linux:~/ace/AceChatRoom$ cat SignalHandler.h #ifndef SIGNALHANDLER_H_#define SIGNALHANDLER_H_#include <ace/Signal.h>#include <ace/Reactor.h>#include <ace/Event_Handler.h>class SignalHandler: ACE_Event_Handler {public: SignalHandler(ACE_Reactor* reactor = ACE_Reactor::instance()); virtual int handle_signal(int signum, siginfo_t*, ucontext_t *);};#endif /* SIGNALHANDLER_H_ */chunli@Linux:~/ace/AceChatRoom$ cat SignalHandler.cpp #include <ace/Log_Msg.h>#include "SignalHandler.h"#include "ChatRoom.h"SignalHandler::SignalHandler(ACE_Reactor* reactor) { this->reactor(reactor); ACE_Sig_Set signals; signals.fill_set(); this->reactor()->register_handler(signals, this);}int SignalHandler::handle_signal(int signum, siginfo_t*, ucontext_t*) { switch (signum) { case SIGINT: ACE_DEBUG((LM_DEBUG, "signal SIGINT, but not be terminated!\n")); break; case SIGUSR1: ACE_DEBUG((LM_DEBUG, "signal SIGUSR1, broadcast greeting ...\n")); Room::instance()->forwardMsg("hello every one!\n"); break; case SIGUSR2: ACE_DEBUG((LM_DEBUG, "signal SIGUSR2, shutdown chat room ...\n")); this->reactor()->end_reactor_event_loop(); break; } return 0;}chunli@Linux:~/ace/AceChatRoom$
编译运行:
chunli@Linux:~/ace/AceChatRoom$ g++ *.cpp -lACE -Wall && ./a.out 

3个客户端连接上来(每个客户端既能看到自己发送的内容,也能看到其他客户端转发过来的内容;原始输出中的换行已丢失):

chunli@Linux:~$ nc localhost 8868
11111111111111111111111111111111111111111111222222222222222222222222333333333333333333333333333333311111111111111111111111111111111111111111111111111111111112222222222222222222222222222222333333333333333333333333333333
chunli@Linux:~$ nc localhost 8868
22222222222222222222222222222222222222222222222233333333333333333333333333333331111111111111111111111111111122222222222222222222222222222222222222222222222222222222222222333333333333333333333333333333
chunli@Linux:~$ 
chunli@Linux:~$ nc localhost 8868
33333333333333333333333333333333333333333333333333333333333333111111111111111111111111111112222222222222222222222222222222333333333333333333333333333333333333333333333333333333333333
chunli@Linux:~$ 

Ctrl+C杀不死:
chunli@Linux:~/ace/AceChatRoom$ g++ *.cpp -lACE -Wall && ./a.out 
^Csignal SIGINT, but not be terminated!

断开SSH终端:看到进程仍在运行,TTY 一栏变成了 ?,看起来像守护进程(SignalHandler 注册了所有信号,终端断开时的 SIGHUP 也被捕获并忽略,所以进程不会退出):
chunli@Linux:~$ ps aux | grep a.out
chunli 2978 0.0 0.1 22704 4208 ? S 10:54 0:00 ./a.out

杀死服务程序(普通信号都被捕获,只能用 kill -9;也可以发送 SIGUSR2 让事件循环正常结束):
chunli@Linux:~$ ps aux | grep a.out
chunli 2952 0.0 0.0 22704 2308 pts/7 S+ 10:50 0:00 ./a.out
chunli@Linux:~$ kill -9 2952
本文出自 “李春利” 博客,请务必保留此出处http://990487026.blog.51cto.com/10133282/1889648