SocketServer
在上一篇文章中我们学习了利用socket模块创建socket通信服务,但细心学习后就会发现利用socket模块创建的服务无法进行多进程的处理,当需要进行大量请求处理时,请求就会阻塞在队列中,甚至发生请求丢弃。并且如果我们需要大量的socket时,就需要重复创建许多socket、绑定端口..... ,对于程序员来说意味着重复书写大量无意义代码。
那有没有一种方式既能简化书写流程又能实现多线程开发呢 ? 答案是肯定的,这就是SocketServer模块。
SocketServer简化了网络服务器的编写。在进行socket创建时,使用SocketServer会大大减少创建的步骤,并且SocketServer使用了select它有4个类:TCPServer,UDPServer,UnixStreamServer,UnixDatagramServer。这4个类是同步进行处理的,另外通过ForkingMixIn和ThreadingMixIn类来支持异步。
使用SocketServer的步骤简介
1. 创建服务器的步骤。首先,你必须创建一个请求处理类,它是BaseRequestHandler的子类并重载其handle()方法。
2. 实例化一个服务器类,传入服务器的地址和请求处理程序类。
3. 最后,调用handle_request()(一般是调用其他事件循环或者使用select())或serve_forever()。
集成ThreadingMixIn类时需要处理异常关闭。daemon_threads指示服务器是否要等待线程终止,要是线程互相独立,必须要设置为True,默认是False。
无论用什么网络协议,服务器类有相同的外部方法和属性。
该模块在python3中已经更名为socketserver。
举例:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer
class MyServer(SocketServer.BaseRequestHandler):
def handle(self):
//some method....
if __name__='__main__':
server = SocketServer.ThreadingTCPServer(('127.0.0.1,9999'),MyServer)
server.serve_forever()
上面的步骤你可能会看不懂或者不理解为什么这么操作,下面我们进行详细解释。我们进行了如下的操作
一、自定义了一个MyServer类,继承自SocketServer模块中的BaseRequestHandler类。
二、在主函数中,使用SocketServer函数中的ThreadingTCPServer类进行了实例化操作。上边例子中实例化了对象为server,并在进行实例化时进行了参数的传递,参数一:服务器IP与端口号 参数二:自定义函数名称
源码分析
第二步中主函数操作。查看SocketServer模块源码[下面]可以发现。ThreadingTCPServer是继承自基类(ThreadingMixIn,TCPServer),但函数结构体是pass,也就是左右操作全部通过基类中方法进行执行,而基类中的TCPServer又有基类BaseServer。结构图如下
BaseServer
↑
TCPServer ThreadingMixIn
↑ ↑
ThreadingTCPServer
可以看出:ThreadingTCPServer的所有的方法都在它的基类函数中
各个基类作用分别是:
BaseServer:利用select创建多进程
TCPServer:创建每个进程的socket
ThreadingMixIn:Mix-in class to handle each request in a new thread.
第一步中的创建自定义类。继承自BaseRequestHandler类。从源码中看出他的作用就是接受请求,地址,和自定义名称,然后交给它的方法处理。默认的三个处理函数为pass,所以当我们使用时需要进行函数代码重构。
class BaseRequestHandler:
"""Base class for request handler classes.
This class is instantiated for each request to be handled. The
constructor sets the instance variables request, client_address
and server, and then calls the handle() method. To implement a
specific service, all you need to do is to derive a class which
defines a handle() method.
The handle() method can find the request as self.request, the
client address as self.client_address, and the server (in case it
needs access to per-server information) as self.server. Since a
separate instance is created for each request, the handle() method
can define arbitrary other instance variariables.
"""
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
1 """Generic socket server classes.
2 This module tries to capture the various aspects of defining a server:
3 For socket-based servers:
4 - address family:
5 - AF_INET{,6}: IP (Internet Protocol) sockets (default)
6 - AF_UNIX: Unix domain sockets
7 - others, e.g. AF_DECNET are conceivable (see <socket.h>
8 - socket type:
9 - SOCK_STREAM (reliable stream, e.g. TCP)
10 - SOCK_DGRAM (datagrams, e.g. UDP)
11
12 For request-based servers (including socket-based):
13
14 - client address verification before further looking at the request
15 (This is actually a hook for any processing that needs to look
16 at the request before anything else, e.g. logging)
17 - how to handle multiple requests:
18 - synchronous (one request is handled at a time)
19 - forking (each request is handled by a new process)
20 - threading (each request is handled by a new thread)
21
22 The classes in this module favor the server type that is simplest to
23 write: a synchronous TCP/IP server. This is bad class design, but
24 save some typing. (There's also the issue that a deep class hierarchy
25 slows down method lookups.)
26
27 There are five classes in an inheritance diagram, four of which represent
28 synchronous servers of four types:
29
30 +------------+
31 | BaseServer |
32 +------------+
33 |
34 v
35 +-----------+ +------------------+
36 | TCPServer |------->| UnixStreamServer |
37 +-----------+ +------------------+
38 |
39 v
40 +-----------+ +--------------------+
41 | UDPServer |------->| UnixDatagramServer |
42 +-----------+ +--------------------+
43
44 Note that UnixDatagramServer derives from UDPServer, not from
45 UnixStreamServer -- the only difference between an IP and a Unix
46 stream server is the address family, which is simply repeated in both
47 unix server classes.
48
49 Forking and threading versions of each type of server can be created
50 using the ForkingMixIn and ThreadingMixIn mix-in classes. For
51 instance, a threading UDP server class is created as follows:
52
53 class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
54
55 The Mix-in class must come first, since it overrides a method defined
56 in UDPServer! Setting the various member variables also changes
57 the behavior of the underlying server mechanism.
58
59 To implement a service, you must derive a class from
60 BaseRequestHandler and redefine its handle() method. You can then run
61 various versions of the service by combining one of the server classes
62 with your request handler class.
63
64 The request handler class must be different for datagram or stream
65 services. This can be hidden by using the request handler
66 subclasses StreamRequestHandler or DatagramRequestHandler.
67
68 Of course, you still have to use your head!
69
70 For instance, it makes no sense to use a forking server if the service
71 contains state in memory that can be modified by requests (since the
72 modifications in the child process would never reach the initial state
73 kept in the parent process and passed to each child). In this case,
74 you can use a threading server, but you will probably have to use
75 locks to avoid two requests that come in nearly simultaneous to apply
76 conflicting changes to the server state.
77
78 On the other hand, if you are building e.g. an HTTP server, where all
79 data is stored externally (e.g. in the file system), a synchronous
80 class will essentially render the service "deaf" while one request is
81 being handled -- which may be for a very long time if a client is slow
82 to read all the data it has requested. Here a threading or forking
83 server is appropriate.
84
85 In some cases, it may be appropriate to process part of a request
86 synchronously, but to finish processing in a forked child depending on
87 the request data. This can be implemented by using a synchronous
88 server and doing an explicit fork in the request handler class
89 handle() method.
90
91 Another approach to handling multiple simultaneous requests in an
92 environment that supports neither threads nor fork (or where these are
93 too expensive or inappropriate for the service) is to maintain an
94 explicit table of partially finished requests and to use select() to
95 decide which request to work on next (or whether to handle a new
96 incoming request). This is particularly important for stream services
97 where each client can potentially be connected for a long time (if
98 threads or subprocesses cannot be used).
99
100 Future work:
101 - Standard classes for Sun RPC (which uses either UDP or TCP)
102 - Standard mix-in classes to implement various authentication
103 and encryption schemes
104 - Standard framework for select-based multiplexing
105
106 XXX Open problems:
107 - What to do with out-of-band data?
108
109 BaseServer:
110 - split generic "request" functionality out into BaseServer class.
111 Copyright (C) 2000 Luke Kenneth Casson Leighton <lkcl@samba.org>
112
113 example: read entries from a SQL database (requires overriding
114 get_request() to return a table entry from the database).
115 entry is processed by a RequestHandlerClass.
116
117 """
118
119 # Author of the BaseServer patch: Luke Kenneth Casson Leighton
120
121 # XXX Warning!
122 # There is a test suite for this module, but it cannot be run by the
123 # standard regression test.
124 # To run it manually, run Lib/test/test_socketserver.py.
125
126 __version__ = "0.4"
127
128
129 import socket
130 import select
131 import sys
132 import os
133 import errno
134 try:
135 import threading
136 except ImportError:
137 import dummy_threading as threading
138
139 __all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
140 "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
141 "StreamRequestHandler","DatagramRequestHandler",
142 "ThreadingMixIn", "ForkingMixIn"]
143 if hasattr(socket, "AF_UNIX"):
144 __all__.extend(["UnixStreamServer","UnixDatagramServer",
145 "ThreadingUnixStreamServer",
146 "ThreadingUnixDatagramServer"])
147
148 def _eintr_retry(func, *args):
149 """restart a system call interrupted by EINTR"""
150 while True:
151 try:
152 return func(*args)
153 except (OSError, select.error) as e:
154 if e.args[0] != errno.EINTR:
155 raise
156
157 class BaseServer:
158
159 """Base class for server classes.
160
161 Methods for the caller:
162
163 - __init__(server_address, RequestHandlerClass)
164 - serve_forever(poll_interval=0.5)
165 - shutdown()
166 - handle_request() # if you do not use serve_forever()
167 - fileno() -> int # for select()
168
169 Methods that may be overridden:
170
171 - server_bind()
172 - server_activate()
173 - get_request() -> request, client_address
174 - handle_timeout()
175 - verify_request(request, client_address)
176 - server_close()
177 - process_request(request, client_address)
178 - shutdown_request(request)
179 - close_request(request)
180 - handle_error()
181
182 Methods for derived classes:
183
184 - finish_request(request, client_address)
185
186 Class variables that may be overridden by derived classes or
187 instances:
188
189 - timeout
190 - address_family
191 - socket_type
192 - allow_reuse_address
193
194 Instance variables:
195
196 - RequestHandlerClass
197 - socket
198
199 """
200
201 timeout = None
202
203 def __init__(self, server_address, RequestHandlerClass): 实例参数最终传递到这里(ip与端口,自定义类)
204 """Constructor. May be extended, do not override."""
205 self.server_address = server_address
206 self.RequestHandlerClass = RequestHandlerClass
207 self.__is_shut_down = threading.Event()
208 self.__shutdown_request = False
209
210 def server_activate(self):
211 """Called by constructor to activate the server.
212
213 May be overridden.
214
215 """
216 pass
217
218 def serve_forever(self, poll_interval=0.5):
219 """Handle one request at a time until shutdown.
220
221 Polls for shutdown every poll_interval seconds. Ignores
222 self.timeout. If you need to do periodic tasks, do them in
223 another thread.
224 """
225 self.__is_shut_down.clear()
226 try:
227 while not self.__shutdown_request:
228 # XXX: Consider using another file descriptor or
229 # connecting to the socket to wake this up instead of
230 # polling. Polling reduces our responsiveness to a
231 # shutdown request and wastes cpu at all other times.
232 r, w, e = _eintr_retry(select.select, [self], [], [],
233 poll_interval)
234 if self in r:
235 self._handle_request_noblock()
236 finally:
237 self.__shutdown_request = False
238 self.__is_shut_down.set()
239
240 def shutdown(self):
241 """Stops the serve_forever loop.
242
243 Blocks until the loop has finished. This must be called while
244 serve_forever() is running in another thread, or it will
245 deadlock.
246 """
247 self.__shutdown_request = True
248 self.__is_shut_down.wait()
249
250 # The distinction between handling, getting, processing and
251 # finishing a request is fairly arbitrary. Remember:
252 #
253 # - handle_request() is the top-level call. It calls
254 # select, get_request(), verify_request() and process_request()
255 # - get_request() is different for stream or datagram sockets
256 # - process_request() is the place that may fork a new process
257 # or create a new thread to finish the request
258 # - finish_request() instantiates the request handler class;
259 # this constructor will handle the request all by itself
260
261 def handle_request(self):
262 """Handle one request, possibly blocking.
263
264 Respects self.timeout.
265 """
266 # Support people who used socket.settimeout() to escape
267 # handle_request before self.timeout was available.
268 timeout = self.socket.gettimeout()
269 if timeout is None:
270 timeout = self.timeout
271 elif self.timeout is not None:
272 timeout = min(timeout, self.timeout)
273 fd_sets = _eintr_retry(select.select, [self], [], [], timeout) #调用select模块实现了多线程
274 if not fd_sets[0]:
275 self.handle_timeout()
276 return
277 self._handle_request_noblock()
278
279 def _handle_request_noblock(self):
280 """Handle one request, without blocking.
281
282 I assume that select.select has returned that the socket is
283 readable before this function was called, so there should be
284 no risk of blocking in get_request().
285 """
286 try:
287 request, client_address = self.get_request()
288 except socket.error:
289 return
290 if self.verify_request(request, client_address):
291 try:
292 self.process_request(request, client_address)
293 except:
294 self.handle_error(request, client_address)
295 self.shutdown_request(request)
296
297 def handle_timeout(self):
298 """Called if no new request arrives within self.timeout.
299
300 Overridden by ForkingMixIn.
301 """
302 pass
303
304 def verify_request(self, request, client_address):
305 """Verify the request. May be overridden.
306
307 Return True if we should proceed with this request.
308
309 """
310 return True
311
312 def process_request(self, request, client_address):
313 """Call finish_request.
314
315 Overridden by ForkingMixIn and ThreadingMixIn.
316
317 """
318 self.finish_request(request, client_address)
319 self.shutdown_request(request)
320
321 def server_close(self):
322 """Called to clean-up the server.
323
324 May be overridden.
325
326 """
327 pass
328
329 def finish_request(self, request, client_address):
330 """Finish one request by instantiating RequestHandlerClass."""
331 self.RequestHandlerClass(request, client_address, self)
332
333 def shutdown_request(self, request):
334 """Called to shutdown and close an individual request."""
335 self.close_request(request)
336
337 def close_request(self, request):
338 """Called to clean up an individual request."""
339 pass
340
341 def handle_error(self, request, client_address):
342 """Handle an error gracefully. May be overridden.
343
344 The default is to print a traceback and continue.
345
346 """
347 print '-'*40
348 print 'Exception happened during processing of request from',
349 print client_address
350 import traceback
351 traceback.print_exc() # XXX But this goes to stderr!
352 print '-'*40
353
354
355 class TCPServer(BaseServer): ThreadingTCPServer基类之一,它又有自己的基类BaseServer """Base class for various socket-based server classes.
356
357 Defaults to synchronous IP stream (i.e., TCP).
358
359 Methods for the caller:
360
361 - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
362 - serve_forever(poll_interval=0.5)
363 - shutdown()
364 - handle_request() # if you don't use serve_forever()
365 - fileno() -> int # for select()
366
367 Methods that may be overridden:
368
369 - server_bind()
370 - server_activate()
371 - get_request() -> request, client_address
372 - handle_timeout()
373 - verify_request(request, client_address)
374 - process_request(request, client_address)
375 - shutdown_request(request)
376 - close_request(request)
377 - handle_error()
378
379 Methods for derived classes:
380
381 - finish_request(request, client_address)
382
383 Class variables that may be overridden by derived classes or
384 instances:
385
386 - timeout
387 - address_family
388 - socket_type
389 - request_queue_size (only for stream sockets)
390 - allow_reuse_address
391
392 Instance variables:
393
394 - server_address
395 - RequestHandlerClass
396 - socket
397
398 """
399
400 address_family = socket.AF_INET
401
402 socket_type = socket.SOCK_STREAM
403
404 request_queue_size = 5
405
406 allow_reuse_address = False
407
408 def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
409 """Constructor. May be extended, do not override."""
410 BaseServer.__init__(self, server_address, RequestHandlerClass) 调用它自己基类函数BaseServer的初始化函数进行多线程的启动
411 self.socket = socket.socket(self.address_family,self.socket_type) 创建启动的独立线程socket
412 if bind_and_activate:
413 try:
414 self.server_bind()
415 self.server_activate()
416 except:
417 self.server_close()
418 raise
419
420 def server_bind(self):
421 """Called by constructor to bind the socket.
422
423 May be overridden.
424
425 """
426 if self.allow_reuse_address:
427 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
428 self.socket.bind(self.server_address) 绑定地址
429 self.server_address = self.socket.getsockname()
430 def server_activate(self):
431 """
432 Called by constructor to activate the server. May be overridden.可以被重构
433 """
434 self.socket.listen(self.request_queue_size) 端口检听
435
436 def server_close(self):
437 """Called to clean-up the server.
438 May be overridden.
439 """
440 self.socket.close() 关闭socket
441
442 def fileno(self):
443 """Return socket file number.
444
445 Interface required by select().
446
447 """
448 return self.socket.fileno()
449
450 def get_request(self):
451 """Get the request and client address from the socket.
452 May be overridden.
453 """
454 return self.socket.accept()
455
456 def shutdown_request(self, request):
457 """Called to shutdown and close an individual request."""
458 try:
459 #explicitly shutdown. socket.close() merely releases
460 #the socket and waits for GC to perform the actual close.
461 request.shutdown(socket.SHUT_WR)
462 except socket.error:
463 pass #some platforms may raise ENOTCONN here
464 self.close_request(request)
465
466 def close_request(self, request):
467 """Called to clean up an individual request."""
468 request.close()
469
470
471 class UDPServer(TCPServer):
472
473 """UDP server class."""
474
475 allow_reuse_address = False
476
477 socket_type = socket.SOCK_DGRAM
478
479 max_packet_size = 8192
480
481 def get_request(self):
482 data, client_addr = self.socket.recvfrom(self.max_packet_size)
483 return (data, self.socket), client_addr
484
485 def server_activate(self):
486 # No need to call listen() for UDP.
487 pass
488
489 def shutdown_request(self, request):
490 # No need to shutdown anything.
491 self.close_request(request)
492
493 def close_request(self, request):
494 # No need to close anything.
495 pass
496
497 class ForkingMixIn:
498
499 """Mix-in class to handle each request in a new process."""
500
501 timeout = 300
502 active_children = None
503 max_children = 40
504
505 def collect_children(self):
506 """Internal routine to wait for children that have exited."""
507 if self.active_children is None:
508 return
509
510 # If we're above the max number of children, wait and reap them until
511 # we go back below threshold. Note that we use waitpid(-1) below to be
512 # able to collect children in size(<defunct children>) syscalls instead
513 # of size(<children>): the downside is that this might reap children
514 # which we didn't spawn, which is why we only resort to this when we're
515 # above max_children.
516 while len(self.active_children) >= self.max_children:
517 try:
518 pid, _ = os.waitpid(-1, 0)
519 self.active_children.discard(pid)
520 except OSError as e:
521 if e.errno == errno.ECHILD:
522 # we don't have any children, we're done
523 self.active_children.clear()
524 elif e.errno != errno.EINTR:
525 break
526
527 # Now reap all defunct children.
528 for pid in self.active_children.copy():
529 try:
530 pid, _ = os.waitpid(pid, os.WNOHANG)
531 # if the child hasn't exited yet, pid will be 0 and ignored by
532 # discard() below
533 self.active_children.discard(pid)
534 except OSError as e:
535 if e.errno == errno.ECHILD:
536 # someone else reaped it
537 self.active_children.discard(pid)
538
539 def handle_timeout(self):
540 """Wait for zombies after self.timeout seconds of inactivity.
541
542 May be extended, do not override.
543 """
544 self.collect_children()
545
546 def process_request(self, request, client_address):
547 """Fork a new subprocess to process the request."""
548 self.collect_children()
549 pid = os.fork()
550 if pid:
551 # Parent process
552 if self.active_children is None:
553 self.active_children = set()
554 self.active_children.add(pid)
555 self.close_request(request) #close handle in parent process
556 return
557 else:
558 # Child process.
559 # This must never return, hence os._exit()!
560 try:
561 self.finish_request(request, client_address)
562 self.shutdown_request(request)
563 os._exit(0)
564 except:
565 try:
566 self.handle_error(request, client_address)
567 self.shutdown_request(request)
568 finally:
569 os._exit(1)
570
571
572 class ThreadingMixIn: 基类2
573 """Mix-in class to handle each request in a new thread."""
574
575 # Decides how threads will act upon termination of the
576 # main process
577 daemon_threads = False
578
579 def process_request_thread(self, request, client_address):
580 """Same as in BaseServer but as a thread.
581
582 In addition, exception handling is done here.
583
584 """
585 try:
586 self.finish_request(request, client_address)
587 self.shutdown_request(request)
588 except:
589 self.handle_error(request, client_address)
590 self.shutdown_request(request)
591
592 def process_request(self, request, client_address):
593 """Start a new thread to process the request."""
594 t = threading.Thread(target = self.process_request_thread,
595 args = (request, client_address))
596 t.daemon = self.daemon_threads
597 t.start()
598
599
600 class ForkingUDPServer(ForkingMixIn, UDPServer): pass
601 class ForkingTCPServer(ForkingMixIn, TCPServer): pass
602
603 class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
604 class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
605
606 if hasattr(socket, 'AF_UNIX'):
607
608 class UnixStreamServer(TCPServer):
609 address_family = socket.AF_UNIX
610
611 class UnixDatagramServer(UDPServer):
612 address_family = socket.AF_UNIX
613
614 class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
615
616 class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
617
618 class BaseRequestHandler:
619 自定义MyServer自继承这里,主要用于处理来自每个线程的请求,函数拥有三个方法,但是每个方法的结构体都没pass,所以我们在继承以后需要对方法进行重构。
620 """Base class for request handler classes.
621
622 This class is instantiated for each request to be handled. The
623 constructor sets the instance variables request, client_address
624 and server, and then calls the handle() method. To implement a
625 specific service, all you need to do is to derive a class which
626 defines a handle() method.
627
628 The handle() method can find the request as self.request, the
629 client address as self.client_address, and the server (in case it
630 needs access to per-server information) as self.server. Since a
631 separate instance is created for each request, the handle() method
632 can define arbitrary other instance variariables.
633 """
634
635 def __init__(self, request, client_address, server): 接收请求request,客户端地址client_address,自定义server
636 self.request = request 分别赋值
637 self.client_address = client_address
638 self.server = server
639 self.setup() 首先执行setup()函数
640 try:
641 self.handle() 再执行handle()
642 finally:
643 self.finish() 最后执行finish()
644
645 def setup(self): 三个函数结构体都为pass ,需要在继承时进行方法重构
646 pass
647
648 def handle(self):
649 pass
650
651 def finish(self):
652 pass
653
654
655 # The following two classes make it possible to use the same service
656 # class for stream or datagram servers.
657 # Each class sets up these instance variables:
658 # - rfile: a file object from which receives the request is read
659 # - wfile: a file object to which the reply is written
660 # When the handle() method returns, wfile is flushed properly
661
662
663 class StreamRequestHandler(BaseRequestHandler):
664
665 """Define self.rfile and self.wfile for stream sockets."""
666
667 # Default buffer sizes for rfile, wfile.
668 # We default rfile to buffered because otherwise it could be
669 # really slow for large data (a getc() call per byte); we make
670 # wfile unbuffered because (a) often after a write() we want to
671 # read and we need to flush the line; (b) big writes to unbuffered
672 # files are typically optimized by stdio even when big reads
673 # aren't.
674 rbufsize = -1
675 wbufsize = 0
676
677 # A timeout to apply to the request socket, if not None.
678 timeout = None
679
680 # Disable nagle algorithm for this socket, if True.
681 # Use only when wbufsize != 0, to avoid small packets.
682 disable_nagle_algorithm = False
683
684 def setup(self):
685 self.connection = self.request
686 if self.timeout is not None:
687 self.connection.settimeout(self.timeout)
688 if self.disable_nagle_algorithm:
689 self.connection.setsockopt(socket.IPPROTO_TCP,
690 socket.TCP_NODELAY, True)
691 self.rfile = self.connection.makefile('rb', self.rbufsize)
692 self.wfile = self.connection.makefile('wb', self.wbufsize)
693
694 def finish(self):
695 if not self.wfile.closed:
696 try:
697 self.wfile.flush()
698 except socket.error:
699 # An final socket error may have occurred here, such as
700 # the local error ECONNABORTED.
701 pass
702 self.wfile.close()
703 self.rfile.close()
704
705
706 class DatagramRequestHandler(BaseRequestHandler):
707
708 # XXX Regrettably, I cannot get this working on Linux;
709 # s.recvfrom() doesn't return a meaningful client address.
710
711 """Define self.rfile and self.wfile for datagram sockets."""
712
713 def setup(self):
714 try:
715 from cStringIO import StringIO
716 except ImportError:
717 from StringIO import StringIO
718 self.packet, self.socket = self.request
719 self.rfile = StringIO(self.packet)
720 self.wfile = StringIO()
721
722 def finish(self):
723 self.socket.sendto(self.wfile.getvalue(), self.client_address)
SocketServer源码
服务器类型
5种类型:BaseServer,TCPServer,UnixStreamServer,UDPServer,UnixDatagramServer。 注意:BaseServer不直接对外服务。
服务器对象
class SocketServer.BaseServer:这是模块中的所有服务器对象的超类。它定义了接口,如下所述,但是大多数的方法不实现,在子类中进行细化。
BaseServer.fileno():返回服务器监听套接字的整数文件描述符。通常用来传递给select.select(), 以允许一个进程监视多个服务器。
BaseServer.handle_request():处理单个请求。处理顺序:get_request(), verify_request(), process_request()。如果用户提供handle()方法抛出异常,将调用服务器的handle_error()方法。如果self.timeout内没有请求收到, 将调用handle_timeout()并返回handle_request()。
BaseServer.serve_forever(poll_interval=0.5): 处理请求,直到一个明确的shutdown()请求。每poll_interval秒轮询一次shutdown。忽略self.timeout。如果你需要做周期性的任务,建议放置在其他线程。
BaseServer.shutdown():告诉serve_forever()循环停止并等待其停止。python2.6版本。
BaseServer.address_family: 地址家族,比如socket.AF_INET和socket.AF_UNIX。
BaseServer.RequestHandlerClass:用户提供的请求处理类,这个类为每个请求创建实例。
BaseServer.server_address:服务器侦听的地址。格式根据协议家族地址的各不相同,请参阅socket模块的文档。
BaseServer.socketSocket:服务器上侦听传入的请求socket对象的服务器。
服务器类支持下面的类变量:
BaseServer.allow_reuse_address:服务器是否允许地址的重用。默认为false ,并且可在子类中更改。
BaseServer.request_queue_size
请求队列的大小。如果单个请求需要很长的时间来处理,服务器忙时请求被放置到队列中,最多可以放request_queue_size个。一旦队列已满,来自客户端的请求将得到 “Connection denied”错误。默认值通常为5 ,但可以被子类覆盖。
BaseServer.socket_type:服务器使用的套接字类型; socket.SOCK_STREAM和socket.SOCK_DGRAM等。
BaseServer.timeout:超时时间,以秒为单位,或 None表示没有超时。如果handle_request()在timeout内没有收到请求,将调用handle_timeout()。
下面方法可以被子类重载,它们对服务器对象的外部用户没有影响。
BaseServer.finish_request():实际处理RequestHandlerClass发起的请求并调用其handle()方法。 常用。
BaseServer.get_request():接受socket请求,并返回二元组包含要用于与客户端通信的新socket对象,以及客户端的地址。
BaseServer.handle_error(request, client_address):如果RequestHandlerClass的handle()方法抛出异常时调用。默认操作是打印traceback到标准输出,并继续处理其他请求。
BaseServer.handle_timeout():超时处理。默认对于forking服务器是收集退出的子进程状态,threading服务器则什么都不做。
BaseServer.process_request(request, client_address) :调用finish_request()创建RequestHandlerClass的实例。如果需要,此功能可以创建新的进程或线程来处理请求,ForkingMixIn和ThreadingMixIn类做到这点。常用。
BaseServer.server_activate():通过服务器的构造函数来激活服务器。默认的行为只是监听服务器套接字。可重载。
BaseServer.server_bind():通过服务器的构造函数中调用绑定socket到所需的地址。可重载。
BaseServer.verify_request(request, client_address):返回一个布尔值,如果该值为True ,则该请求将被处理,反之请求将被拒绝。此功能可以重写来实现对服务器的访问控制。默认的实现始终返回True。client_address可以限定客户端,比如只处理指定ip区间的请求。 常用。
请求处理器
处理器接收数据并决定如何操作。它负责在socket层之上实现协议(i.e., HTTP, XML-RPC, or AMQP),读取数据,处理并写反应。可以重载的方法如下:
setup(): 准备请求处理. 默认什么都不做,StreamRequestHandler中会创建文件类似的对象以读写socket.
handle(): 处理请求。解析传入的请求,处理数据,并发送响应。默认什么都不做。常用变量:self.request,self.client_address,self.server。
finish(): 环境清理。默认什么都不做,如果setup产生异常,不会执行finish。
通常只需要重载handle。self.request的类型和数据报或流的服务不同。对于流服务,self.request是socket 对象;对于数据报服务,self.request是字符串和socket 。可以在子类StreamRequestHandler或DatagramRequestHandler中重载,重写setup()和finish() ,并提供self.rfile和self.wfile属性。 self.rfile和self.wfile可以读取或写入,以获得请求数据或将数据返回到客户端。