一、socketserver
模块介绍
socketserver是标准库中的一个高级模块,用于网络客户端与服务器的实现。(version = "0.4")
在python2中写作SocketServer,在python3中写作socketserver。
socoketserver两个主要的类,一个是Server类,用于处理连接相关的网络操作,另外一个则是RequestHandler类,用于处理数据相关的操作,解决通信问题。并且提供两个MixIn 类,用于扩展 Server,实现多进程或多线程。
最基类的是服务器类BaseServer类,其中定义了相关的方法,不能直接使用这个类,只能用来继承,在子类中有俩,是作为同步服务器类使用,TCPServer和UDPServer,这两个类主要是和socket编程的时候是相同的,也就是会阻塞连接。TCPServer有一个子类为UNIXStreamServer,在UDPServer有一个子类为UnixDatagramServer,这两个是用在UNIX系统中的。
两个混合类,一个是ForkingMixin,主要是用fork的(在linux系统中使用产生进程,在Windows系统中无法使用fork产生进程),产生一个新的进程去处理;一个是ThreadingMixin,产生一个新的线程,主要是用来提供异步处理(简单理解为同时处理多个事情)的能力,其余tcpserver和udpserver组合,又产生了新的四个类,从而提供异步处理的能力。
socketserver框架
在python中,socketserver是一个已集成的模块,它有以下特点:
可用SocketServer框架创建TCP和UDP服务器。
在后台为你做好每一个基础步骤。
socketserver模块的用法
必须是BaseRequestHandler的子类
重写handle()函数
调用serve_forever处理客户端程序。
-
对TCP 服务端来说
self.request是客户端Socket对象
self.client_address是客户端的详细地址
socketserver模块中的各个类
服务器类
5种类型:BaseServer,TCPServer,UnixStreamServer,UDPServer,UnixDatagramServer。 注意:BaseServer不直接对外服务。
请求类
3种类型:BaseRequestHandler,StreamRequestHandler,DatagramRequestHandler
minln类
ForkingMinIn ,ThreadingMinIn
混合类
ThreadingUDPServer, ThreadingTCPServer,ForkingUDPServer,ForkingTCPServer,
ThreadingUnixStreamServer,ThreadingUnixDatagramServer
socketserver模块各个类继承关系图
服务器对象
class SocketServer.BaseServer:这是模块中的所有服务器对象的超类。它定义了接口,如下所述,但是大多数的方法不实现,在子类中进行细化。
BaseServer.fileno():返回服务器监听套接字的整数文件描述符。通常用来传递给select.select(), 以允许一个进程监视多个服务器。
BaseServer.handle_request():处理单个请求。处理顺序:get_request(), verify_request(), process_request()。如果用户提供handle()方法抛出异常,将调用服务器的handle_error()方法。如果self.timeout内没有请求收到, 将调用handle_timeout()并返回handle_request()。
BaseServer.serve_forever(poll_interval=0.5): 处理请求,直到一个明确的shutdown()请求。每poll_interval秒轮询一次shutdown。忽略self.timeout。如果你需要做周期性的任务,建议放置在其他线程。
BaseServer.shutdown():告诉serve_forever()循环停止并等待其停止。python2.6版本。
BaseServer.address_family: 地址家族,比如socket.AF_INET和socket.AF_UNIX。
BaseServer.RequestHandlerClass:用户提供的请求处理类,这个类为每个请求创建实例。
BaseServer.server_address:服务器侦听的地址。格式根据协议家族地址的各不相同,请参阅socket模块的文档。
BaseServer.socketSocket:服务器上侦听传入的请求socket对象的服务器。
服务器类支持下面的类变量:
BaseServer.allow_reuse_address:服务器是否允许地址的重用。默认为false ,并且可在子类中更改。
BaseServer.request_queue_size
请求队列的大小。如果单个请求需要很长的时间来处理,服务器忙时请求被放置到队列中,最多可以放request_queue_size个。一旦队列已满,来自客户端的请求将得到 “Connection denied”错误。默认值通常为5 ,但可以被子类覆盖。
BaseServer.socket_type:服务器使用的套接字类型; socket.SOCK_STREAM和socket.SOCK_DGRAM等。
BaseServer.timeout:超时时间,以秒为单位,或 None表示没有超时。如果handle_request()在timeout内没有收到请求,将调用handle_timeout()。
下面方法可以被子类重载,它们对服务器对象的外部用户没有影响。
BaseServer.finish_request():实际处理RequestHandlerClass发起的请求并调用其handle()方法。 常用。
BaseServer.get_request():接受socket请求,并返回二元组包含要用于与客户端通信的新socket对象,以及客户端的地址。
BaseServer.handle_error(request, client_address):如果RequestHandlerClass的handle()方法抛出异常时调用。默认操作是打印traceback到标准输出,并继续处理其他请求。
BaseServer.handle_timeout():超时处理。默认对于forking服务器是收集退出的子进程状态,threading服务器则什么都不做。
BaseServer.process_request(request, client_address) :调用finish_request()创建RequestHandlerClass的实例。如果需要,此功能可以创建新的进程或线程来处理请求,ForkingMixIn和ThreadingMixIn类做到这点。常用。
BaseServer.server_activate():通过服务器的构造函数来激活服务器。默认的行为只是监听服务器套接字。可重载。
BaseServer.server_bind():通过服务器的构造函数中调用绑定socket到所需的地址。可重载。
BaseServer.verify_request(request, client_address):返回一个布尔值,如果该值为True ,则该请求将被处理,反之请求将被拒绝。此功能可以重写来实现对服务器的访问控制。默认的实现始终返回True。client_address可以限定客户端,比如只处理指定ip区间的请求。 常用。
请求处理器
处理器接收数据并决定如何操作。它负责在socket层之上实现协议(i.e., HTTP, XML-RPC, or AMQP),读取数据,处理并写反应。可以重载的方法如下:
setup(): 准备请求处理. 默认什么都不做,StreamRequestHandler中会创建文件类似的对象以读写socket.
handle(): 处理请求。解析传入的请求,处理数据,并发送响应。默认什么都不做。常用变量:self.request,self.client_address,self.server。
finish(): 环境清理。默认什么都不做,如果setup产生异常,不会执行finish。
通常只需要重载handle。self.request的类型和数据报或流的服务不同。对于流服务,self.request是socket 对象;对于数据报服务,self.request是字符串和socket 。可以在子类StreamRequestHandler或DatagramRequestHandler中重载,重写setup()和finish() ,并提供self.rfile和self.wfile属性。 self.rfile和self.wfile可以读取或写入,以获得请求数据或将数据返回到客户端。
socketserver模块源码翻译
"""Generic socket server classes. # 通用socket server 类 This module tries to capture the various aspects of defining a server: # 该模块尽力从各种不同的方面定义server: For socket-based servers: - address family:
- AF_INET{,6}: IP (Internet Protocol) sockets (default)
- AF_UNIX: Unix domain sockets
- others, e.g. AF_DECNET are conceivable (see <socket.h>
- socket type:
- SOCK_STREAM (reliable stream, e.g. TCP)
- SOCK_DGRAM (datagrams, e.g. UDP) # 对于socket-based servers:(对于socketserver类中的服务类的基类有下面内容)
# -- address family:(地址家族)
# - AF_INET{,6}: IP socket (default)
# - AF_UNIX: Unix domain sockets
# - others, 如 AF_DECNET (见<socket.h>) (不常用)
# -- socket type:(套接字类型)
# - SOCK_STREAM (可靠连接 TCP)
# - SOCK_DGRAM (UDP) For request-based servers (including socket-based): - client address verification before further looking at the request
(This is actually a hook for any processing that needs to look
at the request before anything else, e.g. logging)
- how to handle multiple requests:
- synchronous (one request is handled at a time)
- forking (each request is handled by a new process)
- threading (each request is handled by a new thread) # 对于request-based servers:(对于通信请求服务类的基类)
# -- client address在发出进一步的请求之前需要认证(这实际上把所有需要发出请求的进程在通过认证之前给阻塞住了) # -- 如何处理多请求:(多进程多线程)
# - 同步 (一次只能处理一个请求)
# - forking (fork一个新的进程来处理一个请求)
# - threading (创建一个新的线程来处理一个请求) The classes in this module favor the server type that is simplest to
write: a synchronous TCP/IP server. This is bad class design, but
save some typing. (There's also the issue that a deep class hierarchy
slows down method lookups.) # 在这个模块的各种类中,最简单的服务器类型就是synchronous TCP/IP server。
# 这是一个糟糕的类设计,但是也保存了一些设计的类型理念。 There are five classes in an inheritance diagram, four of which represent
synchronous servers of four types: +------------+
| BaseServer |
+------------+
|
v
+-----------+ +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+ +------------------+
|
v
+-----------+ +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+ +--------------------+ # 下面是五个类的继承关系图表,其中的四个代表四种类型的同步服务器:
+----------------+
| BaseServer |
+----------------+
|
v
+---------------+ +-------------------------+
| TCPServer |------->| UnixStreamServer |
+---------------+ +-------------------------+
|
v
+---------------+ +-----------------------------+
| UDPServer |------->| UnixDatagramServer |
+---------------+ +-----------------------------+ Note that UnixDatagramServer derives from UDPServer, not from
UnixStreamServer -- the only difference between an IP and a Unix
stream server is the address family, which is simply repeated in both
unix server classes. # 注意:UnixDatagramServer继承于UDPServer,而不是UnixStreamServer,
# IP和Unix stream server之间仅有的差异就是address family,两个服务器类的内容多数是简单的重复。 Forking and threading versions of each type of server can be created
using the ForkingMixIn and ThreadingMixIn mix-in classes. For
instance, a threading UDP server class is created as follows: class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass # forking和threading 可以被创建用于ForkingMixIn和TreadingMixIn mix-in类。
# 例如: threading UDP server类会被如下方式创建:
# class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass The Mix-in class must come first, since it overrides a method defined
in UDPServer! Setting the various member variables also changes
the behavior of the underlying server mechanism. # Mix-in 这个类必须首先实现,因为它重写了定义UDPServer的方法。
# 设置不同的成员变量也改变了基本的服务器构造方法。 To implement a service, you must derive a class from
BaseRequestHandler and redefine its handle() method. You can then run
various versions of the service by combining one of the server classes
with your request handler class. # 为了实现一个服务,你必须从基类BaseRequestHandler中重新定义它的handle方法。
# 然后通过把服务类与你重写的Handle方法类结合,以此运行新的服务类 The request handler class must be different for datagram or stream
services. This can be hidden by using the request handler
subclasses StreamRequestHandler or DatagramRequestHandler. # 请求处理类的TCP和UDP的方式是不同的,
# 这个可以通过使用请求处理的子类StreamRequestHandler或者DatagramRequestHandler来隐藏。 Of course, you still have to use your head! For instance, it makes no sense to use a forking server if the service
contains state in memory that can be modified by requests (since the
modifications in the child process would never reach the initial state
kept in the parent process and passed to each child). In this case,
you can use a threading server, but you will probably have to use
locks to avoid two requests that come in nearly simultaneous to apply
conflicting changes to the server state. # 当然,你还可以思考其他的方法。
# 例如,如果服务中包含请求修改的内存的状态,那么使用forking server没有任何意义
# (因为在子进程中修改将不对父进程的初始化状态有影响,父进程也不会把这个修改的参数传递给
# 其他子进程)。这种情况下,你可以使用threading server,而且你更有可能需要用到“锁”,
# 以此来避免两个请求同时到达而使服务器状态产生冲突。 On the other hand, if you are building e.g. an HTTP server, where all
data is stored externally (e.g. in the file system), a synchronous
class will essentially render the service "deaf" while one request is
being handled -- which may be for a very long time if a client is slow
to read all the data it has requested. Here a threading or forking
server is appropriate. # 此外,如果你在搭建如HTTP服务器等,所有的数据都会存储在外部(如文件系统中),
# 当客户端的一项请求被处理时,并且客户端的读取数据的速度很慢,synchronous class将
# 会使服务不做出响应,这可能需要维持很长时间。 In some cases, it may be appropriate to process part of a request
synchronously, but to finish processing in a forked child depending on
the request data. This can be implemented by using a synchronous
server and doing an explicit fork in the request handler class
handle() method. # 在一些情况下,请求同步可能需要恰当的方法,但是为了在子进程中完成请求要受到请求数据
# 的影响。这可以通过使用同步服务器来实现,并且在请求处理类中的Handle方法中明确指定fork
# 的进程。 Another approach to handling multiple simultaneous requests in an
environment that supports neither threads nor fork (or where these are
too expensive or inappropriate for the service) is to maintain an
explicit table of partially finished requests and to use a selector to
decide which request to work on next (or whether to handle a new
incoming request). This is particularly important for stream services
where each client can potentially be connected for a long time (if
threads or subprocesses cannot be used). # 另一种处理多个同时发生的请求的方法是维系一张明确的完成请求的表单,使用
# select()方法来判定哪个请求应该在接下来做出响应(或者判断是否要处理新到来的请求),
# 当每一个客户端需要建立很长时间的连接时,这对于stream services来说非常重要。
# (前提是不使用线程和子进程的方法) Future work:
- Standard classes for Sun RPC (which uses either UDP or TCP)
- Standard mix-in classes to implement various authentication
and encryption schemes #未来工作:
# -Sun RPC的标准类(使用UDP或TCP)
# -用于实现各种身份验证的标准混合类和加密方案 XXX Open problems:
- What to do with out-of-band data? #开放问题:如何处理带外数据? BaseServer:
- split generic "request" functionality out into BaseServer class.
Copyright (C) 2000 Luke Kenneth Casson Leighton <lkcl@samba.org> example: read entries from a SQL database (requires overriding
get_request() to return a table entry from the database).
entry is processed by a RequestHandlerClass. #BaseServer:
# -将泛型“请求”功能分离到BaseServer类中。
# 版权所有(C) 2000 Luke Kenneth Casson Leighton
# 示例:从SQL数据库中读取条目(需要重写)
# get_request()以从数据库返回表项。
# 条目由RequestHandlerClass处理。 """ # Author of the BaseServer patch: Luke Kenneth Casson Leighton __version__ = "0.4" import socket
import selectors
import os
import errno
import sys try:
import threading
except ImportError:
import dummy_threading as threading
from io import BufferedIOBase
from time import monotonic as time __all__ = ["BaseServer", "TCPServer", "UDPServer",
"ThreadingUDPServer", "ThreadingTCPServer",
"BaseRequestHandler", "StreamRequestHandler",
"DatagramRequestHandler", "ThreadingMixIn"]
if hasattr(os, "fork"):
__all__.extend(["ForkingUDPServer", "ForkingTCPServer", "ForkingMixIn"])
if hasattr(socket, "AF_UNIX"):
__all__.extend(["UnixStreamServer", "UnixDatagramServer",
"ThreadingUnixStreamServer",
"ThreadingUnixDatagramServer"]) # poll/select have the advantage of not requiring any extra file descriptor,
# contrarily to epoll/kqueue (also, they require a single syscall).
if hasattr(selectors, 'PollSelector'):
_ServerSelector = selectors.PollSelector
else:
_ServerSelector = selectors.SelectSelector class BaseServer:
"""Base class for server classes.
#服务类的基类
Methods for the caller:
# 可供调用的方法
- __init__(server_address, RequestHandlerClass)
- serve_forever(poll_interval=0.5)
- shutdown()
- handle_request() # if you do not use serve_forever()
- fileno() -> int # for selector Methods that may be overridden:
# 可以被覆盖的方法
- server_bind()
- server_activate()
- get_request() -> request, client_address
- handle_timeout()
- verify_request(request, client_address)
- server_close()
- process_request(request, client_address)
- shutdown_request(request)
- close_request(request)
- service_actions()
- handle_error() Methods for derived classes:
# 派生类的方法
- finish_request(request, client_address) Class variables that may be overridden by derived classes or
instances:
# 可以被派生类或重写的类变量实例 - timeout
- address_family
- socket_type
- allow_reuse_address Instance variables: - RequestHandlerClass
- socket """ timeout = None def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
# 构造函数。可以扩展,不要覆盖。
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False def server_activate(self):
"""Called by constructor to activate the server. May be overridden.
# 由构造函数调用以激活服务器。可能会被覆盖。
"""
pass def serve_forever(self, poll_interval=0.5):
"""Handle one request at a time until shutdown. Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
# 每次处理一个请求,直到关闭。为每一个poll_interval秒进行关闭轮询。
# 忽略了self.timeout。如果你需要做周期性的任务,那就做另一个线程。
"""
self.__is_shut_down.clear()
try:
# XXX: Consider using another file descriptor or connecting to the
# socket to wake this up instead of polling. Polling reduces our
# responsiveness to a shutdown request and wastes cpu at all other
# times.
# 考虑使用另一个文件描述符或连接到
# 套接字来唤醒这个,而不是轮询。轮询会降低我们的
# 对关闭请求的响应性,会浪费cpu很多次。
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ) while not self.__shutdown_request:
ready = selector.select(poll_interval)
if ready:
self._handle_request_noblock() self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set() def shutdown(self):
"""Stops the serve_forever loop. Blocks until the loop has finished. This must be called while
serve_forever() is running in another thread, or it will
deadlock.
# 停止serve_forever循环。阻塞直到循环结束。这必须在while调用serve_forever()
# 正在另一个线程中运行,否则它会运行死锁。
"""
self.__shutdown_request = True
self.__is_shut_down.wait() def service_actions(self):
"""Called by the serve_forever() loop. May be overridden by a subclass / Mixin to implement any code that
needs to be run during the loop.
# 由serve_forever()循环调用。可能被子类/ Mixin覆盖以实现任何代码需要在循环中运行。
# 由serve_forever()循环调用。可能被子类/ Mixin覆盖以实现任何代码需要在循环中运行。
"""
pass # The distinction between handling, getting, processing and finishing a
# request is fairly arbitrary. Remember:
# 注意操作、获取、并发的区别,完成一个请求是相当随意的。
# - handle_request() is the top-level call. It calls selector.select(),
# 操作请求函数是一个高级别的调用,它调用了selector.select().
# get_request(), verify_request() and process_request()
# 获取请求函数\验证函数和进程线程函数
# - get_request() is different for stream or datagram sockets
# get_request()对于流(TCP)或数据报(UDP)套接字是不同的
# - process_request() is the place that may fork a new process or create a
# new thread to finish the request
# process_request()可以派生一个新进程或创建一个新线程完成请求
# - finish_request() instantiates the request handler class; this
# constructor will handle the request all by itself
# finish_request()实例化请求处理程序类;这构造函数将自己处理请求 def handle_request(self):
"""Handle one request, possibly blocking.
# 处理一个请求,可能会阻塞。
Respects self.timeout.
#遵守超时规则
"""
# Support people who used socket.settimeout() to escape
# handle_request before self.timeout was available.
# 支持使用socket.settimeout(),如果没有将设置默认的
# self.timeout以确保可用
timeout = self.socket.gettimeout()
if timeout is None:
timeout = self.timeout
elif self.timeout is not None:
timeout = min(timeout, self.timeout)
if timeout is not None:
deadline = time() + timeout # Wait until a request arrives or the timeout expires - the loop is
# necessary to accommodate early wakeups due to EINTR.
# 等待直到请求到达或超时过期——循环是必须的,因为要适应中断而被唤醒的各种情况。
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ) while True:
ready = selector.select(timeout)
if ready:
return self._handle_request_noblock()
else:
if timeout is not None:
timeout = deadline - time()
if timeout < 0:
return self.handle_timeout() def _handle_request_noblock(self):
"""Handle one request, without blocking.
# 以非阻塞请求的方式处理一个请求
I assume that selector.select() has returned that the socket is
readable before this function was called, so there should be no risk of
blocking in get_request().
#我假设select .select()返回的是套接字在调用此函数之前可读,
因此应该在get_request()中没有阻塞风险。
"""
try:
request, client_address = self.get_request()
except OSError:
return
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
except:
self.shutdown_request(request)
raise
else:
self.shutdown_request(request) def handle_timeout(self):
"""Called if no new request arrives within self.timeout.
# 如果self.timeout中没有新请求到达,则调用。
Overridden by ForkingMixIn.
被ForkingMixIn类覆盖
"""
pass def verify_request(self, request, client_address):
"""Verify the request. May be overridden.
# 验证这个请求,可能被覆盖
Return True if we should proceed with this request.
# 如果我们继续这个请求,返回True。
"""
return True def process_request(self, request, client_address):
"""Call finish_request.
# 调用 finish_request
Overridden by ForkingMixIn and ThreadingMixIn.
# 被ForkingMixIn and ThreadingMixIn覆盖
"""
self.finish_request(request, client_address)
self.shutdown_request(request) def server_close(self):
"""Called to clean-up the server.
# 调用以清理这个服务
May be overridden.
# 可能被覆盖
"""
pass def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
# 通过实例化RequestHandlerClass来完成一个请求。
self.RequestHandlerClass(request, client_address, self) def shutdown_request(self, request):
"""Called to shutdown and close an individual request."""
# 调用以关闭和关闭单个请求
self.close_request(request) def close_request(self, request):
"""Called to clean up an individual request."""
# 调用以清理单个请求
pass def handle_error(self, request, client_address):
"""Handle an error gracefully. May be overridden.
# 优雅地处理错误。可能会被覆盖。
The default is to print a traceback and continue.
# 默认情况是打印回溯记录并继续程序。
"""
print('-' * 40, file=sys.stderr)
print('Exception happened during processing of request from',
client_address, file=sys.stderr)
import traceback
traceback.print_exc()
print('-' * 40, file=sys.stderr) def __enter__(self):
return self def __exit__(self, *args):
self.server_close() class TCPServer(BaseServer):
"""Base class for various socket-based server classes.
# 用于各种基于套接字的服务器类的基类
Defaults to synchronous IP stream (i.e., TCP).
#默认为同步IP流(即TCP)
Methods for the caller:
# 可调用的方法
- __init__(server_address, RequestHandlerClass, bind_and_activate=True)
- serve_forever(poll_interval=0.5)
- shutdown()
- handle_request() # if you don't use serve_forever()
- fileno() -> int # for selector Methods that may be overridden:
# 可能被覆盖的方法
- server_bind()
- server_activate()
- get_request() -> request, client_address
- handle_timeout()
- verify_request(request, client_address)
- process_request(request, client_address)
- shutdown_request(request)
- close_request(request)
- handle_error() Methods for derived classes:
# 派生类的方法
- finish_request(request, client_address) Class variables that may be overridden by derived classes or
instances:
# 可以被派生类或重写的类变量实例
- timeout
- address_family
- socket_type
- request_queue_size (only for stream sockets)
- allow_reuse_address Instance variables:
# 实例化变量
- server_address
- RequestHandlerClass
- socket """ address_family = socket.AF_INET socket_type = socket.SOCK_STREAM request_queue_size = 5 allow_reuse_address = False def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
"""Constructor. May be extended, do not override."""
# 构造函数。可以扩展,不要覆盖。
BaseServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family,
self.socket_type)
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise def server_bind(self):
"""Called by constructor to bind the socket.
# 由构造函数调用以绑定套接字。
May be overridden.
# 可能被覆盖
"""
if self.allow_reuse_address:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address)
self.server_address = self.socket.getsockname() def server_activate(self):
"""Called by constructor to activate the server.
# 由构造函数调用以激活服务器
May be overridden.
# 可能被覆盖
"""
self.socket.listen(self.request_queue_size) def server_close(self):
"""Called to clean-up the server.
# 调用以清理服务
May be overridden.
# 可能被覆盖
"""
self.socket.close() def fileno(self):
"""Return socket file number.
# 返回套接字的文件描述符
Interface required by selector.
# selector需要的接口
"""
return self.socket.fileno() def get_request(self):
"""Get the request and client address from the socket.
# 从套接字获取请求和客户端地址
May be overridden.
# 可能被覆盖
"""
return self.socket.accept() def shutdown_request(self, request):
"""Called to shutdown and close an individual request."""
# 调用以关闭和关闭单个请求。
try:
# explicitly shutdown. socket.close() merely releases
# 明确地关闭。socket.close()仅仅是释放
# the socket and waits for GC to perform the actual close.
# 套接字并等待GC执行实际的关闭。
request.shutdown(socket.SHUT_WR)
except OSError:
pass # some platforms may raise ENOTCONN here #一些平台可能会在这里引发ENOTCONN错误
self.close_request(request) def close_request(self, request):
"""Called to clean up an individual request."""
# 调用以清理单个的请求
request.close() class UDPServer(TCPServer):
"""UDP server class."""
# UDP服务类
allow_reuse_address = False socket_type = socket.SOCK_DGRAM max_packet_size = 8192 def get_request(self):
data, client_addr = self.socket.recvfrom(self.max_packet_size)
return (data, self.socket), client_addr def server_activate(self):
# No need to call listen() for UDP.
# UDP服务类不需要listen()
pass def shutdown_request(self, request):
# No need to shutdown anything.
# UDP服务类不需要
self.close_request(request) def close_request(self, request):
# No need to close anything.
# UDP服务类不需要
pass if hasattr(os, "fork"):
class ForkingMixIn:
"""Mix-in class to handle each request in a new process."""
# 混合类在新进程中处理每个请求
timeout = 300
active_children = None
max_children = 40 def collect_children(self):
"""Internal routine to wait for children that have exited."""
# 等待子进程离开的内部程序
if self.active_children is None:
return # If we're above the max number of children, wait and reap them until
# we go back below threshold. Note that we use waitpid(-1) below to be
# able to collect children in size(<defunct children>) syscalls instead
# of size(<children>): the downside is that this might reap children
# which we didn't spawn, which is why we only resort to this when we're
# above max_children. # 如果我们超过了子进程的最大数量,那需要等待并回收他们直到我们回到阈值以下。
# 注意,我们使用下面的waitpid(-1)来回收(<僵尸进程>)系统调用代替了回收子进程
# 缺点是这可能会没有产生子进程,这就是为什么我们只有在最大子进程数量之上
# 才会使用它。
while len(self.active_children) >= self.max_children:
try:
pid, _ = os.waitpid(-1, 0)
self.active_children.discard(pid)
except ChildProcessError:
# we don't have any children, we're done
# 没有任何子进程
self.active_children.clear()
except OSError:
break # Now reap all defunct children.
# 现在回收所有的僵尸进程
for pid in self.active_children.copy():
try:
pid, _ = os.waitpid(pid, os.WNOHANG)
# if the child hasn't exited yet, pid will be 0 and ignored by
# 如果这个子进程没退出,PID会是0并被忽略
# discard() below
# 丢弃pid
self.active_children.discard(pid)
except ChildProcessError:
# someone else reaped it
# 其他的被回收了
self.active_children.discard(pid)
except OSError:
pass def handle_timeout(self):
"""Wait for zombies after self.timeout seconds of inactivity.
等着不活动超时秒僵尸进程出现
May be extended, do not override.
可能被扩展,不要被覆盖
"""
self.collect_children() def service_actions(self):
"""Collect the zombie child processes regularly in the ForkingMixIn.
定期在ForkingMixin中收集僵尸子进程
service_actions is called in the BaseServer's serve_forver loop.
service_actions在BaseServer的serve_forver循环中被调用。
"""
self.collect_children() def process_request(self, request, client_address):
"""Fork a new subprocess to process the request."""
# 派生一个新的子进程来处理请求
pid = os.fork()
if pid:
# Parent process
# 父进程
if self.active_children is None:
self.active_children = set()
self.active_children.add(pid)
self.close_request(request)
return
else:
# Child process.
# 子进程
# This must never return, hence os._exit()!
# 这里必须永远不能return,因此os._exit()!
status = 1
try:
self.finish_request(request, client_address)
status = 0
except Exception:
self.handle_error(request, client_address)
finally:
try:
self.shutdown_request(request)
finally:
os._exit(status) class ThreadingMixIn:
"""Mix-in class to handle each request in a new thread."""
# 混合类以处理新线程中的每个请求。
# Decides how threads will act upon termination of the
# main process
# 决定线程在主进程终止后将如何操作
daemon_threads = False def process_request_thread(self, request, client_address):
"""Same as in BaseServer but as a thread.
# 和在BaseServer中一样,但作为一个线程。
In addition, exception handling is done here.
# 此外,这里还进行了异常处理。
"""
try:
self.finish_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request) def process_request(self, request, client_address):
"""Start a new thread to process the request."""
# 启动一个新的线程来处理请求。
t = threading.Thread(target=self.process_request_thread,
args=(request, client_address))
t.daemon = self.daemon_threads
t.start() if hasattr(os, "fork"):
class ForkingUDPServer(ForkingMixIn, UDPServer): pass class ForkingTCPServer(ForkingMixIn, TCPServer): pass class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass if hasattr(socket, 'AF_UNIX'):
class UnixStreamServer(TCPServer):
address_family = socket.AF_UNIX class UnixDatagramServer(UDPServer):
address_family = socket.AF_UNIX class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass class BaseRequestHandler:
"""Base class for request handler classes.
# 请求处理程序类的基类。
This class is instantiated for each request to be handled. The
constructor sets the instance variables request, client_address
and server, and then calls the handle() method. To implement a
specific service, all you need to do is to derive a class which
defines a handle() method. 这个类是为每个要处理的请求实例化的。构造函数设置实例化变量request client_address,
然后调用handle()方法,实现一个具体的服务,
您需要做的就是派生一个类定义handle()方法。 The handle() method can find the request as self.request, the
client address as self.client_address, and the server (in case it
needs access to per-server information) as self.server. Since a
separate instance is created for each request, the handle() method
can define other arbitrary instance variables. # handle()方法可以将request查找为self.request,客户端地址为self.client_address,
# 以及服务器(以防万一)需要访问每个服务器的信息)作为self.server。
# 每个请求都创建单独的实例对象,handle()方法可以定义其他任意实例变量。 """ def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish() def setup(self):
pass def handle(self):
pass def finish(self):
pass # The following two classes make it possible to use the same service
# class for stream or datagram servers. # 以下两个类使使用相同的服务成为可能
# 用于流服务器(TCP)或数据报服务器(UDP)。 # Each class sets up these instance variables:
# - rfile: a file object from which receives the request is read
# - wfile: a file object to which the reply is written
# When the handle() method returns, wfile is flushed properly # 每个类都设置这些实例变量:
# - rfile:接收请求的文件对象
# - wfile:写回复的文件对象
# 当handle()方法返回时,wfile被适时地刷新 class StreamRequestHandler(BaseRequestHandler):
"""Define self.rfile and self.wfile for stream sockets."""
# 为流服务器定义self.rfile和self.wfile # Default buffer sizes for rfile, wfile.
# We default rfile to buffered because otherwise it could be
# really slow for large data (a getc() call per byte); we make
# wfile unbuffered because (a) often after a write() we want to
# read and we need to flush the line; (b) big writes to unbuffered
# files are typically optimized by stdio even when big reads
# aren't. # 默认的rfile, wfile 缓冲区大小
#我们默认将rfile设置为缓存,否则可能会这样对于大数据
# (每个字节一个 getc()调用)来说确实很慢;
#我们做wfile未缓存,因为(a)通常在写入()之后阅读,我们需要不断刷新;
#标准输入输出通常会对未缓冲的大文件写入操作进行优化,即使是大文件的读操作也是如此。 rbufsize = -1
wbufsize = 0 # A timeout to apply to the request socket, if not None.
# 应用于请求套接字(如果不是None)的超时。
timeout = None # Disable nagle algorithm for this socket, if True.
# Use only when wbufsize != 0, to avoid small packets. # 如果为这个套接字,禁用nagle算法。
#仅在wbufsize != 0时使用,以避免小数据包。
disable_nagle_algorithm = False def setup(self):
self.connection = self.request
if self.timeout is not None:
self.connection.settimeout(self.timeout)
if self.disable_nagle_algorithm:
self.connection.setsockopt(socket.IPPROTO_TCP,
socket.TCP_NODELAY, True)
self.rfile = self.connection.makefile('rb', self.rbufsize)
if self.wbufsize == 0:
self.wfile = _SocketWriter(self.connection)
else:
self.wfile = self.connection.makefile('wb', self.wbufsize) def finish(self):
if not self.wfile.closed:
try:
self.wfile.flush()
except socket.error:
# A final socket error may have occurred here, such as
# the local error ECONNABORTED. # 最后一个套接字错误可能发生在这里,例如本地错误被终止。
pass
self.wfile.close()
self.rfile.close() class _SocketWriter(BufferedIOBase):
"""Simple writable BufferedIOBase implementation for a socket Does not hold data in a buffer, avoiding any need to call flush()."""
# 一个套接字的简单可写的BufferedIOBase实现
# 不将数据保存在缓冲区中,避免调用flush() def __init__(self, sock):
self._sock = sock def writable(self):
return True def write(self, b):
self._sock.sendall(b)
with memoryview(b) as view:
return view.nbytes def fileno(self):
return self._sock.fileno() class DatagramRequestHandler(BaseRequestHandler):
"""Define self.rfile and self.wfile for datagram sockets.""" # 定义self.rfile和self.wfile用于数据报套接字
def setup(self):
from io import BytesIO
self.packet, self.socket = self.request
self.rfile = BytesIO(self.packet)
self.wfile = BytesIO() def finish(self):
self.socket.sendto(self.wfile.getvalue(), self.client_address)
先复习继承知识
例子1
class Base:
def __init__(self,name):
self.name=name
def func(self):
print("from base")
class Son(Base):
def func(self):
print("from son")
sonobj= Son("nick")
sonobj.func()
输出结果
from son
分析下执行过程
1、加载class Base ,加载class Son
2、执行sonobj= Son("nick")语句,生成sonobj对象,这里首先寻找__init__
初始化方法,Son类没有,找Son的父类Base,找到父类的__init__
方法,执行父类的方法,将参数"nick"传入,注意这里的self其实是sonobj,即类Son的对象,而非Base的对象,执行self.name=name,为sonobj对象增加了name属性,初始化过程结束。
3、执行sonobj.func()语句,执行对象sonobj的func方法,首先要在这个sonobj对象自身的方法下面找,发现有func方法就执行该方法,而不需要再到父类Base执行func方法。
例子2
class Base:
def func(self):
print("from base")
class Son(Base):
def __init__(self,name):
self.name=name
def func(self):
print("from son")
class Base2:
def func(self):
print("from base2")
class Grandson(Base2,Son):
pass
sonobj= Grandson("nick")
sonobj.func()
输出结果
from base2
简单分析执行结果
1、在python3中所有的类都是新式类,执行的继承顺序是广度优先,所以类Grandson继承的顺序是
2、首先执行初始化方法,在base2中没找到,然后在Son中找到了,执行该方法,增加了对象的name属性,
3、执行func方法,根据继承顺序,先左后右,先在base2中找到了,就直接执行func方法,不再继续寻找。程序结束。
socketserver源码分析tcp版本
服务端
import socketserver
class Myserver(socketserver.BaseRequestHandler): #
def handle(self):
print("服务器开始运行了")
print(self.request) # 相当于socket里的conn链接
print(self.client_address) # 相当于socket里的addr,即客户端的ip地址
while True:
try:
res = self.request.recv(1024)
print(res.decode("utf-8"))
msg = input(">>>").strip()
self.request.send(msg.encode("utf-8"))
except Exception as e:
print(e)
break
if __name__ == "__main__":
ip, port = "127.0.0.1", 8090
serverobj = socketserver.ThreadingTCPServer((ip, port), Myserver)
serverobj.serve_forever()
客户端
import socket
ip, port = "127.0.0.1", 8090
buffer_size = 1024
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((ip, port))
while True:
msg = input(">>>").strip()
if msg == "q": break
if not msg: continue
client.send(('客户端A:' + msg).encode("utf-8"))
res = client.recv(buffer_size)
print(res.decode("utf-8"))
分析内部执行过程:
1、
加载socketserver模块,加载class Myserver,执行ip, port = "127.0.0.1", 8090
语句
2、
执行
serverobj = socketserver.ThreadingTCPServer((ip, port), Myserver)
语句,寻找初始化方法,根据socketserver模块各个类继承关系图可知,ThreadingTCPServer继承了ThreadingMixIn, TCPServer,而TCPServer又继承了BaseServer。
2.1、
首先在左边的ThreadingMixIn寻找初始化方法,没有找到,再在TCPServer里寻找,找到了__init__
方法,执行该方法。
TCPServer下的__init__
方法
这里的参数server_address就是传入的(ip, port),而RequestHandlerClass就是自己定义的类Myserver
2.1.1、
这里首先执行BaseServer下的__init__
方法
BaseServer下的__init__
方法
执行该方法下面的4个语句,注意这里的self是ThreadingTCPServer的对象,而非BaseServer的对象,因为这次调用是ThreadingTCPServer的对象发起的,即服务端中的serverobj对象。
为serverobj增加了4个属性,将(ip,port)赋值为self.server_addressz属性,自己定义的类 Myserver赋值为RequestHandlerClass属性,执行threading.Event()赋值为self.__is_shut_down。
2.1.2、
执行
self.socket = socket.socket(self.address_family,self.socket_type)
语句
创建socket对象并将socket对象赋值给self.socket属性
2.1.3、
执行
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise
2.1.3.1、
执行self.server_bind()语句,注意这里的self还是serverobj对象,所以需要重新开始寻找server_bind方法,从左边的ThreadingMixIn寻找,没有找到,在右边的TCPServer里寻找,找到了server_bind()方法
由于self.allow_reuse_address=False,不执行这里的if语句
self.socket.bind(self.server_address)
执行了socket对象的绑定ip\port,
执行
self.server_address = self.socket.getsockname()
将返回套接字自己的地址。通常是一个元组(ipaddr,port)重新赋值给self.server_address 属性
2.1.3.2、
执行self.server_activate()语句,与上面一样,先从ThreadingMixIn找,再找TCPServer,执行TCPServer下面的server_activate方法,
self.socket.listen(self.request_queue_size)
执行socket对象的监听动作
2.1.3.3、
如果出错,执行self.server_close(),还是一样的寻找过程,这里就不写了,直接写找到的结果,执行
TCPServer下面的server_close()方法,执行self.socket.close()语句,即关闭socket
2.2、
至此,serverobj对象的初始化方法__init__
才执行完
3、
执行serverobj.serve_forever()语句
3.1、
与上面一样,也是从从ThreadingMixIn找serve_foreve()方法,再找TCPServer下面有没有serve_foreve()方法,发现TCPServer下面也没有,最后找TCPServer的父类BaseServer,在BaseServer下面找到了serve_forever()方法
代码如下
def serve_forever(self, poll_interval=0.5):
"""Handle one request at a time until shutdown.
Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
"""
self.__is_shut_down.clear()
try:
# XXX: Consider using another file descriptor or connecting to the
# socket to wake this up instead of polling. Polling reduces our
# responsiveness to a shutdown request and wastes cpu at all other
# times.
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)
while not self.__shutdown_request:
ready = selector.select(poll_interval)
if ready:
self._handle_request_noblock()
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
3.1.1、
执行self.__is_shut_down.clear()语句,清理线程事件状态(后面网络编程部分会学到)
3.1.2、
执行try语句
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ) #注册,表示关心EVENT_READ事件
执行IO多路复用(后面网络编程部分会学到),
3.1.3、
执行
while not self.__shutdown_request:
ready = selector.select(poll_interval)# 查询事件,在poll_interval时间里查询是不是发生了新的事件
if ready: #如果有新事件,则通过下面的语句处理请求
self._handle_request_noblock()
由于self.__shutdown_request默认值为False,所以这里是个死循环
3.1.3.1、
执行
self._handle_request_noblock()语句
寻找handle_request_noblock()方法与上面类似,最后是在BaseServer下面找到了handle_request_noblock()方法
def _handle_request_noblock(self):
"""Handle one request, without blocking.
I assume that selector.select() has returned that the socket is
readable before this function was called, so there should be no risk of
blocking in get_request().
"""
try:
request, client_address = self.get_request()
except OSError:
return
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
except:
self.shutdown_request(request)
raise
else:
self.shutdown_request(request)
3.1.3.1.1、
执行try语句
try:
request, client_address = self.get_request()
执行self.get_request()方法,将结果分别赋值给request, client_address,
首先和上面一样,寻找到对象的get_request()方法,在TCPServer下面找到了get_request()方法
return self.socket.accept()
即socket对象的accept()操作,获得与用户连接的socket和用户地址。即这里的request是conn链接
3.1.3.1.2、
执行
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
except:
self.shutdown_request(request)
raise
verify_request()来验证用户请求,找到BaseServer下面的verify_request(),
该方法默认返回True。
执行self.process_request(request, client_address)语句
找到ThreadingMixIn类下面的process_request()方法
def process_request(self, request, client_address):
"""Start a new thread to process the request."""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address)) #创建多线程
t.daemon = self.daemon_threads
t.start() #启动线程
这里创建了多线程,表示可以同时执行多个任务了。
这里多线程调用process_request_thread()方法
即执行
def process_request_thread(self, request, client_address):
"""Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
try:
self.finish_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request)
执行self.finish_request(request, client_address),即重新寻找finish_request()方法,在找到BaseServer下面的finish_request()方法
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)
这里是将自己创建的类Myserver实例化,即执行Myserver(request, client_address, self)Z实例化过程,这里产生的实例与serverobj没有任何关系,这里要找到初始化方法,Myserver下没有,找Myserver继承的socketserver.BaseRequestHandler
在socketserver.BaseRequestHandler下找到了init方法
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
这里的def __init__(self, request, client_address, server):
中self还是serverobj,增加了self.request、self.client_address、self.server 等属性给Myserver创建的对象
这里的self.server就是Myserver这个类,这里定义了一些空的方法self.setup()、self.handle()、self.finish(),子类可以通过派生增加功能
如果 try: self.process_request(request, client_address)出错则执行
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
调用执行BaseServer下面的handle_error()方法处理下错误,
然后调用执行BaseServer下面的shutdown_request()方法,
self.close_request(request)
而BaseServer这里又调用执行close_request()方法,这里又要重新根据继承关系找close_request()方法,在TCPServer下面找到close_request()
执行request.close()语句,即conn.close().
3.1.3.1.3、
其他情况执行
self.shutdown_request(request)
也是执行上面的request.close()语句。
3.1.4、
至此,self._handle_request_noblock()执行完成。
3.1.5、
执行self.service_actions()语句
3.1.6
执行
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
设置__shutdown_request
属性,更改self.__is_shut_down
的属性
3.2
serverobj.serve_forever()执行完成
参考资料
[1]https://blog.csdn.net/xuxiaofei123456789/article/details/50774221