ACE_linux:Reactor与Proactor两种模式的区别

时间:2022-09-09 14:38:02

一、概念:

 

Reactor与Proactor两种模式的区别。这里我们只关注read操作,因为write操作也是差不多的。下面是Reactor的做法:

  1. 某个事件处理器宣称它对某个socket上的读事件很感兴趣;

  2. 事件分离者等着这个事件的发生;

  3. 当事件发生了,事件分离器被唤醒,这负责通知先前那个事件处理器;

  4. 事件处理器收到消息,于是去那个socket上读数据了. 如果需要,它再次宣称对这个socket上的读事件感兴趣,一直重复上面的步骤;

下面再来看看真正意义的异步模式Proactor是如何做的:

  1. 事件处理器直接投递发一个读操作(当然,操作系统必须支持这个异步操作)。这个时候,事件处理器根本不关心读事件,它只管发这么个请求,它魂牵梦萦的是这个读操作的完成事件。这个事件处理器很拽,发个命令就不管具体的事情了,只等着别人(系统)帮他搞定的时候给他回个话。

  2. 事件分离器等着这个读事件的完成(比较下与Reactor的不同);

  3. 当事件分离器默默等待完成事情到来的同时,操作系统已经在一边开始干活了,它从目标读取数据,放入用户提供的缓存区中,最后通知事件分离器,这个事情我搞完了;

  4. 事件分离器通知之前的事件处理器: 你吩咐的事情搞定了;

  5. 事件处理器这时会发现想要读的数据已经乖乖地放在他提供的缓存区中,想怎么处理都行了。如果有需要,事件处理器还像之前一样发起另外一个读操作,和上面的几个步骤一样。

 

二、代码示例:

 

ACE_Proactor::run_event_loop();  循环启动
ACE_Proactor::end_event_loop();  循环停止

-----------------------------------

 Reactor:

 

 

-----------------------------------

-----------------------------------

Proactor:

ACE_linux:Reactor与Proactor两种模式的区别ACE_linux:Reactor与Proactor两种模式的区别
  1 #include "ace/Proactor.h"
2 #include "ace/Asynch_Acceptor.h"
3
4 class HA_Proactive_Service : public ACE_Service_Handler
5 {
6 public:
7 HA_Proactive_Service()
8 {
9 ACE_OS::printf("Service_Handler constructed for accepter \n");
10 }
11 ~HA_Proactive_Service ()
12 {
13 if (this->handle () != ACE_INVALID_HANDLE)
14 {
15 ACE_OS::closesocket (this->handle ());
16 }
17 }
18
19 virtual void open (ACE_HANDLE h, ACE_Message_Block&)
20 {
21 //在OPEN函数中完成读写操作
22 this->handle (h);
23 if (this->reader_.open (*this) != 0 ||
24 this->writer_.open (*this) != 0 )
25 {
26 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
27 ACE_TEXT ("HA_Proactive_Service open")));
28
29 return;
30 }
31 ACE_OS::printf("ready!\n");
32
33 //异步发送
34 send_to_remote();
35 //异步读取
36 reveive_from_remote();
37
38 // mb is now controlled by Proactor framework.
39 return;
40 }
41
42 //异步读完成后会调用此函数
43 virtual void handle_read_stream
44 (const ACE_Asynch_Read_Stream::Result &result)
45 {
46 ACE_Message_Block &mb = result.message_block ();
47 if (!result.success () || result.bytes_transferred () == 0)
48 {
49 mb.release ();
50
51 return;
52 }
53 //else
54 //输出读取内容
55 ACE_OS::printf("received:%s\n",mb.rd_ptr());
56 mb.release();
57 //继续下一次异步读取
58 reveive_from_remote();
59
60 return;
61 }
62 //异步写完成后会调用此函数
63 virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
64 {
65 result.message_block ().release();
66 ACE_OS::sleep(1);
67 //上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
68 //handle_write_stream,所以会一直发送
69 send_to_remote();
70 return;
71 }
72 //remote
73 void reveive_from_remote(void)
74 {
75 ACE_Message_Block *mb;
76 ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
77 if (this->reader_.read (*mb, mb->space ()) != 0)
78 {
79 ACE_OS::printf("Begin read fail\n");
80
81 return;
82 }
83 }
84 //把当前时间发送到远端
85 void send_to_remote(void)
86 {
87 std::string book = "S: ";
88 time_t now = ACE_OS::gettimeofday().sec();
89 book = book+ ctime(&now);
90 ACE_Message_Block *mb = new ACE_Message_Block(100);
91 //获取当前时间的字符串格式
92 mb->copy(book.c_str() );
93 //send message to accepter
94 if (this->writer_.write(*mb,mb->length()) !=0)
95 {
96 ACE_OS::printf("Begin write fail\n");
97
98 return;
99 }
100 else
101 {
102 ACE_OS::printf("sended %s\n",mb->rd_ptr());
103 }
104 }
105
106 // Listing 3
107 private:
108 ACE_Asynch_Read_Stream reader_;
109 ACE_Asynch_Write_Stream writer_;
110 };
111
112
113 int main(int argc, char *argv[])
114 {
115 int port=3000;
116 ACE_Asynch_Acceptor<HA_Proactive_Service> acceptor;
117
118 if (acceptor.open (ACE_INET_Addr (port)) == -1)
119 return -1;
120
121 while(true)
122 ACE_Proactor::instance ()->handle_events ();
123
124 return 0;
125 }
Acceptor.cpp

 

ACE_linux:Reactor与Proactor两种模式的区别ACE_linux:Reactor与Proactor两种模式的区别
  1 #include "ace/Proactor.h"
2 #include "ace/Asynch_Connector.h"
3
4 class HA_Proactive_Service : public ACE_Service_Handler
5 {
6 public:
7 HA_Proactive_Service()
8 {
9 ACE_OS::printf("Service_Handler constructed for connector \n");
10 }
11 ~HA_Proactive_Service ()
12 {
13 if (this->handle () != ACE_INVALID_HANDLE)
14 {
15 ACE_OS::closesocket (this->handle ());
16 }
17 }
18
19 virtual void open (ACE_HANDLE h, ACE_Message_Block&)
20 {
21 //在OPEN函数中完成读写操作
22 this->handle (h);
23
24 if (this->reader_.open (*this) != 0 ||
25 this->writer_.open (*this) != 0 )
26 {
27 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
28 ACE_TEXT ("HA_Proactive_Service open")));
29
30 return;
31 }
32
33 ACE_OS::printf("connceted!\n");
34 //异步发送
35 send_to_remote();
36 //异步读取
37 reveive_from_remote();
38
39
40 return;
41 }
42
43 //异步读完成后会调用此函数
44 virtual void handle_read_stream
45 (const ACE_Asynch_Read_Stream::Result &result)
46 {
47 ACE_Message_Block &mb = result.message_block ();
48 if (!result.success () || result.bytes_transferred () == 0)
49 {
50 mb.release ();
51
52 return;
53 }
54 //else
55 //输出读取内容
56 ACE_OS::printf("received:%s\n",mb.rd_ptr());
57 mb.release();
58 //继续下一次异步读取
59 reveive_from_remote();
60
61 return;
62 }
63 //异步写完成后会调用此函数
64 virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
65 {
66 result.message_block ().release();
67 ACE_OS::sleep(1);
68 //上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
69 //handle_write_stream,所以会一直发送
70 send_to_remote();
71 return;
72 }
73 //remote
74 void reveive_from_remote(void)
75 {
76 ACE_Message_Block *mb;
77 ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
78 if (this->reader_.read (*mb, mb->space ()) != 0)
79 {
80 ACE_OS::printf("Begin read fail\n");
81
82 return;
83 }
84 }
85 //把当前时间发送到远端
86 void send_to_remote(void)
87 {
88 std::string book = "C: ";
89 time_t now = ACE_OS::gettimeofday().sec();
90 book = book+ ctime(&now);
91 ACE_Message_Block *mb = new ACE_Message_Block(100);
92 //获取当前时间的字符串格式
93 mb->copy(book.c_str() );
94 //send message to accepter
95 if (this->writer_.write(*mb,mb->length()) !=0)
96 {
97 ACE_OS::printf("Begin write fail\n");
98
99 return;
100 }
101 else
102 {
103 ACE_OS::printf("sended %s\n",mb->rd_ptr());
104 }
105 }
106
107 // Listing 3
108 private:
109 ACE_Asynch_Read_Stream reader_;
110 ACE_Asynch_Write_Stream writer_;
111 };
112
113 int main(int argc, char *argv[])
114 {
115
116 ACE_INET_Addr addr(3000,"127.0.0.1");
117
118 ACE_Asynch_Connector<HA_Proactive_Service> connector;
119
120 connector.open();
121 if (connector.connect(addr) == -1)
122 return -1;
123
124 while(true)
125 ACE_Proactor::instance ()->handle_events ();
126
127 return 0;
128 }
Connector.cpp

ACE_Proactor::run_event_loop(); <==>
while(true)
    ACE_Proactor::instance ()->handle_events ();

 //增加线程池

ACE_linux:Reactor与Proactor两种模式的区别ACE_linux:Reactor与Proactor两种模式的区别
  1 #include "ace/Proactor.h"
2 #include "ace/Asynch_Acceptor.h"
3 #include "ace/Task_T.h"
4 #include "ace/Thread_Semaphore.h"
5
6 class Receive : public ACE_Service_Handler
7 {
8 public:
9 Receive()
10 {
11 ACE_OS::printf("Service_Handler constructed for accepter \n");
12 }
13 ~Receive ()
14 {
15 if (this->handle () != ACE_INVALID_HANDLE)
16 {
17 ACE_OS::closesocket (this->handle ());
18 }
19 }
20
21 virtual void open (ACE_HANDLE h, ACE_Message_Block&)
22 {
23 //在OPEN函数中完成读写操作
24 this->handle (h);
25 if (this->reader_.open (*this) != 0 ||
26 this->writer_.open (*this) != 0 )
27 {
28 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
29 ACE_TEXT ("Receive open")));
30
31 return;
32 }
33 ACE_OS::printf("ready!\n");
34
35 //异步发送
36 //send_to_remote();
37 //异步读取
38 reveive_from_remote();
39
40 // mb is now controlled by Proactor framework.
41 return;
42 }
43
44 //异步读完成后会调用此函数
45 virtual void handle_read_stream
46 (const ACE_Asynch_Read_Stream::Result &result)
47 {
48 ACE_Message_Block &mb = result.message_block ();
49 if (!result.success () || result.bytes_transferred () == 0)
50 {
51 mb.release ();
52
53 return;
54 }
55
56 //输出读取内容
57 //ACE_OS::printf("received:%s\n",mb.rd_ptr());
58 ACE_DEBUG((LM_DEBUG,"(%t)received:%s\n",mb.rd_ptr()));
59 mb.release();
60
61 //继续下一次异步读取
62 reveive_from_remote();
63
64 return;
65 }
66 //异步写完成后会调用此函数
67 virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
68 {
69 result.message_block ().release();
70 ACE_OS::sleep(1);
71 //上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
72 //handle_write_stream,所以会一直发送
73 send_to_remote();
74 return;
75 }
76 //remote
77 void reveive_from_remote(void)
78 {
79 ACE_Message_Block *mb;
80 ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
81 if (this->reader_.read (*mb, mb->space ()) != 0)
82 {
83 ACE_OS::printf("Begin read fail\n");
84
85 return;
86 }
87 }
88 //把当前时间发送到远端
89 void send_to_remote(void)
90 {
91 std::string book = "S: ";
92 time_t now = ACE_OS::gettimeofday().sec();
93 book = book+ ctime(&now);
94 ACE_Message_Block *mb = new ACE_Message_Block(100);
95 //获取当前时间的字符串格式
96 mb->copy(book.c_str() );
97 //send message to accepter
98 if (this->writer_.write(*mb,mb->length()) !=0)
99 {
100 ACE_OS::printf("Begin write fail\n");
101
102 return;
103 }
104 else
105 {
106 ACE_OS::printf("sended %s\n",mb->rd_ptr());
107 }
108 }
109
110 // Listing 3
111 private:
112 ACE_Asynch_Read_Stream reader_;
113 ACE_Asynch_Write_Stream writer_;
114 };
115
116 class Accepte : public ACE_Asynch_Acceptor<Receive>
117 {
118 public:
119 virtual Receive* make_handler(void)
120 {
121 return new Receive();
122 }
123
124 };
125
126 class Proactor_Task : public ACE_Task<ACE_MT_SYNCH>
127 {
128 public:
129
130 int star(int nMax)
131 {
132 create_proactor();
133 this->activate (THR_NEW_LWP, nMax);
134 for (;nMax>0;nMax--)
135 {
136 sem_.acquire();
137 }
138 return 0;
139 }
140 int stop()
141 {
142 ACE_Proactor::end_event_loop();
143 this->wait();
144 return 0;
145 }
146
147 virtual int svc (void)
148 {
149 ACE_DEBUG((LM_INFO,ACE_TEXT("svc method is invoked!\n")));
150 sem_.release(1);
151
152 ACE_Proactor::run_event_loop();
153
154 return 0;
155 }
156
157
158 int create_proactor()
159 {
160 ACE_Proactor::instance ();
161
162 return 0;
163 }
164
165 int release_proactor()
166 {
167 ACE_Proactor::close_singleton ();
168 return 0;
169 }
170
171 ACE_Thread_Semaphore sem_;
172 };
173
174 int ACE_TMAIN(int ,char*[])
175 {
176 Proactor_Task task;
177 task.star(3);
178
179 Accepte accepte;
180 accepte.open(ACE_INET_Addr (2222), 0, 1,ACE_DEFAULT_BACKLOG,1,ACE_Proactor::instance());
181
182 //主函数退出控制
183 {
184 int nExit=0;
185 while (nExit==0)
186 scanf("%d",&nExit);
187 }
188
189 return 0;
190 }
191
192 /*
193 int main(int argc, char *argv[])
194 {
195 int port=3000;
196 ACE_Asynch_Acceptor<Receive> acceptor;
197
198 if (acceptor.open (ACE_INET_Addr (port)) == -1)
199 return -1;
200
201 ACE_Proactor::run_event_loop();
202
203 return 0;
204 }
205 */
Proactor_Task.cpp

 

-----------------------------------

 

GOOD LUCK!