使用多路复用套接字 IO 提升性能

时间:2021-01-01 10:59:06

本文基于 《Python网络编程攻略》 第二章,使用 Python3 改写。
主要介绍以下内容:

  • 使用一些有用的技术提升套接字服务器的性能
  • select 模块
  • 使用 Diesel 并发库的示例

原文链接:使用多路复用套接字 I/O 提升性能

在套接字服务器程序中使用 ForkingMixIn

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import os
import socket
import threading
import socketserver

SERVER_HOST = 'localhost'
SERVER_PORT = 0
BUF_SIZE = 1024
ECHO_MSG = 'Hello echo server!'


class ForkingClient():
    """Blocking TCP client that sends one echo message and prints the reply."""

    def __init__(self, ip, port):
        """Open a TCP connection to the echo server at (ip, port)."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((ip, port))

    def run(self):
        """Send ECHO_MSG once, then print what the server answered."""
        pid = os.getpid()
        announcement = 'PID {} Sending echo message to the server: {}'
        print(announcement.format(pid, ECHO_MSG))

        payload = ECHO_MSG.encode()
        byte_count = self.sock.send(payload)
        print('Sent: {} characters, so far...'.format(byte_count))

        reply = self.sock.recv(BUF_SIZE)
        # The first five bytes of the reply are sliced off before printing.
        print('PID {} received: {}'.format(pid, reply[5:]))

    def shutdown(self):
        """Close the client socket."""
        self.sock.close()


class ForkingServerRequestHandler(socketserver.BaseRequestHandler):
    """Request handler that echoes the data back, prefixed with its own PID."""

    def handle(self):
        """Read one chunk from the client and answer with '<pid>: <data>'."""
        received = self.request.recv(BUF_SIZE)
        pid = os.getpid()
        # `received` is a bytes object, so its repr (b'...') is what lands in
        # the formatted reply text.
        reply = '{}: {}'.format(pid, received)
        log_line = 'Server sending response [current_porcess_id: data] <==> {}'
        print(log_line.format(reply))
        self.request.send(reply.encode())


class ForkingServer(socketserver.ForkingMixIn, socketserver.TCPServer):
    """TCP server combined with ForkingMixIn; adds no behaviour of its own."""


def main():
    """Run the forking echo server on a background thread and two clients.

    The server is created on (SERVER_HOST, SERVER_PORT); the address it is
    actually bound to is read back from ``server.server_address`` and handed
    to the clients. Server and client sockets are always released, even when
    a client fails.
    """
    server = ForkingServer((SERVER_HOST, SERVER_PORT),
                           ForkingServerRequestHandler)

    ip, port = server.server_address
    server_thread = threading.Thread(target=server.serve_forever)
    # Thread.setDaemon() is deprecated; assign the attribute instead.
    server_thread.daemon = True
    server_thread.start()
    print('Server loop running PID: {}'.format(os.getpid()))

    clients = []
    try:
        # Two clients, each created and run in turn.
        for _ in range(2):
            echo_client = ForkingClient(ip, port)
            clients.append(echo_client)
            echo_client.run()
    finally:
        server.shutdown()
        for echo_client in clients:
            echo_client.shutdown()
        # server_close() is the documented way to release the listening socket.
        server.server_close()


if __name__ == '__main__':
    main()

说明:通过 ForkingMixIn 创建进程,通过 ThreadingMixIn 创建线程

输出:

Server loop running PID: 13314
PID 13314 Sending echo message to the server: Hello echo server!
Sent: 18 characters, so far...
Server sending response [current_porcess_id: data] <==> 13316: b'Hello echo server!'
PID 13314 received: b": b'Hello echo server!'"
PID 13314 Sending echo message to the server: Hello echo server!
Sent: 18 characters, so far...
Server sending response [current_porcess_id: data] <==> 13317: b'Hello echo server!'
PID 13314 received: b": b'Hello echo server!'"

在套接字服务器程序中使用 ThreadingMixIn

或许基于某些原因你不想编写基于进程的应用程序,而更愿意编写多线程应用程序。
可能的原因有:
在线程之间共享应用的状态,避免进程间通信的复杂操作,等等。遇到这种需求,如果想使用 socketserver 库(Python 2 中名为 SocketServer)编写异步网络服务器,就得使用 ThreadingMixIn 类。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import os
import socket
import threading
import socketserver

SERVER_HOST = 'localhost'
SERVER_PORT = 0
BUF_SIZE = 1024
# ECHO_MSG = 'Hello echo server!'


def client(ip, port, message):
    """Send `message` to the server at (ip, port) and print one reply chunk.

    Args:
        ip: server host to connect to.
        port: server TCP port.
        message: text to send; it is encoded before sending.
    """
    # The `with` block closes the socket on every path, including a failed
    # connect(), which the previous try/finally did not cover.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((ip, port))
        sock.sendall(message.encode())
        response = sock.recv(BUF_SIZE)
        print('Client received <==> {}'.format(response.decode()))


class ThreadedTCPServerRequestHandler(socketserver.BaseRequestHandler):
    """Request handler that echoes the data back, prefixed with a thread name."""

    def handle(self):
        """Read one chunk and reply with '<current thread name>: <text>'."""
        raw = self.request.recv(BUF_SIZE)
        worker_name = threading.current_thread().name
        reply = '{}: {}'.format(worker_name, raw.decode())
        self.request.sendall(reply.encode())


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles each request in its own thread.

    The mix-in must be ThreadingMixIn: with ForkingMixIn every request would
    be served by a forked child process instead of a thread.
    """


def main():
    """Run the threaded echo server on a background thread and three clients.

    The server is stopped and its listening socket is closed even if one of
    the clients raises.
    """
    server = ThreadedTCPServer((SERVER_HOST, SERVER_PORT),
                               ThreadedTCPServerRequestHandler)

    ip, port = server.server_address
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()
    print('Server loop running on thread: {}'.format(server_thread.name))

    try:
        for index in (1, 2, 3):
            client(ip, port, '"Hello" from client {}'.format(index))
    finally:
        server.shutdown()
        # shutdown() only stops serve_forever(); the listening socket has to
        # be released separately.
        server.server_close()


if __name__ == '__main__':
    main()

说明:在客户端和服务器的通信中用到了sendall()方法,以保证发送的数据无任何丢失。

输出:

Server loop running on thread: Thread-1
Client received <==> Thread-1: "Hello" from client 1
Client received <==> Thread-1: "Hello" from client 2
Client received <==> Thread-1: "Hello" from client 3

使用 select.select 编写一个聊天室服务器

在大型网络服务器应用程序中可能有几百或几千个客户端同时连接服务器,此时为每个客户端创建单独的线程或进程可能不切实际。由于内存可用量受限,且主机的 CPU 能力有限,我们需要一种更好的技术来处理大量的客户端。幸好,Python 提供的 select 模块能解决这一问题。

服务器端代码:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import select
import socket
import sys
import signal
from com import send, receive, parser


class ChatServer(object):
    """Chat server that multiplexes all clients with ``select.select``.

    Attributes:
        clientnum: number of currently registered clients.
        clientmap: maps a client socket to ``(address, login_name)``.
        outputs: sockets of the registered clients that receive broadcasts.
        server: the listening socket.
    """

    def __init__(self, port, backlog=5):
        """Bind to `port` on all interfaces, listen, and install a SIGINT handler.

        Args:
            port: TCP port to listen on.
            backlog: value passed to ``socket.listen``.
        """
        self.clientnum = 0
        self.clientmap = {}
        self.outputs = []   # client sockets that receive broadcast messages
        self._inputs = []   # objects watched for readability by run()

        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind(('', port))

        print('Server listening to port: {} ...'.format(port))
        self.server.listen(backlog)
        # Catch keyboard interrupts
        signal.signal(signal.SIGINT, self.signalhandler)

    def signalhandler(self, signalnum, stackframe):
        """SIGINT handler: close every client socket and the listening socket.

        run() leaves its loop when its next select() call fails on the
        closed sockets.
        """
        print('Shutting down server...')
        # Close existing client sockets
        for output in self.outputs:
            output.close()

        self.server.close()

    def get_client_name(self, client):
        """Return ``login_name@host`` for a registered client socket.

        Raises:
            KeyError: if `client` is not in ``clientmap``.
        """
        info = self.clientmap[client]
        host, name = info[0][0], info[1]

        return '@'.join((name, host))

    def _broadcast(self, msg, exclude=None):
        """Send `msg` to every registered client except `exclude`.

        A failing peer must not stop delivery to the others, so the error is
        printed and skipped; the broken socket is removed later by the normal
        read path in run().
        """
        for output in list(self.outputs):
            if output is exclude:
                continue
            try:
                send(output, msg)
            except OSError as e:
                print(e)

    def _drop_client(self, sock):
        """Unregister and close `sock`; safe to call more than once."""
        if sock in self._inputs:
            self._inputs.remove(sock)
        if sock in self.outputs:
            self.outputs.remove(sock)
        # Only count the client down if it was actually registered.
        if self.clientmap.pop(sock, None) is not None:
            self.clientnum -= 1
        sock.close()

    def _disconnect(self, sock):
        """Drop a registered client and tell the remaining clients it left."""
        # The name lookup needs the clientmap entry, so do it before dropping.
        name = self.get_client_name(sock)
        self._drop_client(sock)
        self._broadcast('\n(Now hung up: Client from {})'.format(name))

    def _accept_client(self):
        """Accept a connection, do the name handshake and register the client."""
        client, address = self.server.accept()
        print('Chat server: got connection {} from {}'
              .format(client.fileno(), address))
        try:
            # Read the login name. NOTE: this read blocks the whole server
            # until the new client has sent its greeting.
            login_name = receive(client).split('NAME: ')[1]
            # Tell the client which address the server sees it as.
            send(client, 'CLIENT: {}'.format(address[0]))
        except (OSError, IndexError) as e:
            # A broken or malformed handshake only costs this one connection.
            print(e)
            client.close()
            return

        self.clientnum += 1
        self.clientmap[client] = (address, login_name)
        self._inputs.append(client)

        # Announce to the clients already connected; the newcomer is added to
        # `outputs` afterwards so it does not receive its own announcement.
        msg = '\n(Connected: New client ({}) from {})'\
            .format(self.clientnum, self.get_client_name(client))
        self._broadcast(msg)
        self.outputs.append(client)

    def _handle_client(self, sock):
        """Relay one message from `sock`, or disconnect it on EOF or error."""
        if sock not in self.clientmap:
            # Already dropped earlier; nothing to do.
            return

        try:
            data = receive(sock)
        except OSError as e:
            print(e)
            self._disconnect(sock)
            return

        if data:
            # Relay to everyone except the sender.
            msg = '\n#[{}]>> {}'.format(self.get_client_name(sock), data)
            self._broadcast(msg, exclude=sock)
        else:
            print('Chat server: {} hung up'.format(sock.fileno()))
            self._disconnect(sock)

    def run(self):
        """Serve until a line is read from stdin or select() fails.

        Watches the listening socket, stdin and every client socket for
        readability. On exit all client sockets and the listening socket are
        closed.
        """
        self._inputs = [self.server, sys.stdin]
        self.outputs = []

        running = True

        try:
            while running:
                try:
                    # The write list stays empty on purpose: connected sockets
                    # are almost always writable, so listing them would make
                    # select() return immediately and spin the loop.
                    readable, _, _ = select.select(self._inputs, [], [])
                except (OSError, ValueError) as e:
                    # Raised once signalhandler() has closed the sockets
                    # (bad descriptor / negative fileno).
                    print(e)
                    break

                for sock in readable:
                    if sock is self.server:
                        self._accept_client()
                    elif sock is sys.stdin:
                        # Any line typed on the console stops the server.
                        sys.stdin.readline()
                        running = False
                    else:
                        self._handle_client(sock)
        finally:
            for output in list(self.outputs):
                output.close()
            self.server.close()

if __name__ == '__main__':
    role, listen_port = parser()

    # Only the literal name 'server' starts the chat server; with any other
    # --name value the script ends without doing anything.
    if role == 'server':
        ChatServer(listen_port).run()

客户端代码:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket
import sys
import select
from com import send, receive, parser

class ChatClient(object):
    """Interactive chat client multiplexing stdin and the server socket.

    Attributes:
        name: login name sent to the server.
        host, port: server address.
        connected: True while the main loop should keep running.
        prompt: prompt text written before each wait for input.
        sock: the TCP socket connected to the server.
    """

    def __init__(self, name, port, host='localhost'):
        """Connect to the server and perform the name handshake.

        Prints a failure message and exits the process with status 1 when
        the connection or the handshake fails.
        """
        self.name = name
        self.connected = False
        self.host = host
        self.port = port

        # Initial prompt, replaced after the handshake by the address the
        # server reports back.
        fmt = '@'.join((name, socket.gethostname().split('.')[0]))
        self.prompt = '[{}]> '.format(fmt)

        # Connect to server at port
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((host, self.port))
            print('Now connected to chat server @ port {}'.format(self.port))
            self.connected = True

            # send my name ...
            send(self.sock, 'NAME: ' + self.name)
            data = receive(self.sock)

            # An empty or malformed reply (e.g. the server closed the
            # connection) is reported as a connection failure instead of
            # crashing with an IndexError.
            _, marker, addr = data.partition('CLIENT: ')
            if not marker:
                raise socket.error(
                    'unexpected handshake reply: {!r}'.format(data))
            fmt = '@'.join((self.name, addr))
            self.prompt = '[{}]> '.format(fmt)

        except socket.error as e:
            print('Failed to connect to chat server @ port {}\n{}'
                  .format(self.port, e))

            sys.exit(1)

    def run(self):
        """Relay stdin lines to the server and print incoming messages.

        Ends when the server closes the connection or on Ctrl-C; the socket
        is closed on every exit path.
        """
        try:
            while self.connected:
                sys.stdout.write(self.prompt)
                sys.stdout.flush()

                # File descriptor 0 is stdin.
                readable, _, _ = select.select([0, self.sock], [], [])

                for sock in readable:
                    if sock == 0:
                        data = sys.stdin.readline().strip()
                        if data:
                            send(self.sock, data)

                    elif sock == self.sock:
                        data = receive(self.sock)
                        if not data:
                            print('Client shutting down.')
                            self.connected = False
                            break
                        sys.stdout.write(data + '\n')
                        sys.stdout.flush()

        except KeyboardInterrupt:
            print('Client interrupted.')
        finally:
            self.sock.close()


if __name__ == '__main__':
    # Script entry point: read --name/--port and start the interactive client.
    login_name, server_port = parser()
    ChatClient(port=server_port, name=login_name).run()

com.py:

import pickle
import socket
import struct
import argparse


def send(channel, *args):
    """Pickle `args` and write it to `channel` as one length-prefixed message.

    Wire format (unchanged): ``struct.pack('L', htonl(len(payload)))``
    followed by the pickled tuple of `args`.

    Args:
        channel: connected socket-like object providing ``sendall``.
        *args: values to transmit; they are pickled together as a tuple.
    """
    payload = pickle.dumps(args)
    # NOTE: 'L' uses the native size of unsigned long, so both ends must
    # agree on that size; receive() reads the header with the same format.
    header = struct.pack('L', socket.htonl(len(payload)))
    # sendall() keeps writing until everything is sent; plain send() may
    # transmit only part of the buffer.
    channel.sendall(header + payload)

def _recv_exact(channel, count):
    """Read exactly `count` bytes from `channel`; return None on early EOF."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = channel.recv(remaining)
        if not chunk:
            # The peer closed the connection before `count` bytes arrived.
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def receive(channel):
    """Read one length-prefixed pickled message written by send().

    Args:
        channel: connected socket-like object providing ``recv``.

    Returns:
        The first element of the pickled tuple, or ``''`` when the peer
        closed the connection before a complete message arrived.
    """
    header = _recv_exact(channel, struct.calcsize('L'))
    if header is None:
        return ''
    size = socket.ntohl(struct.unpack('L', header)[0])

    # recv() may return fewer bytes than requested, so the payload has to be
    # accumulated rather than overwritten on every read.
    payload = _recv_exact(channel, size)
    if payload is None:
        return ''

    # SECURITY: pickle.loads executes arbitrary code from a malicious peer;
    # only use this protocol between trusted endpoints.
    return pickle.loads(payload)[0]

def parser():
    """Parse the required --name and --port options; return (name, port)."""
    arg_parser = argparse.ArgumentParser(
        description='Socket Server Example with select')
    arg_parser.add_argument('--name', action='store', dest='name',
                            required=True)
    arg_parser.add_argument('--port', action='store', dest='port',
                            type=int, required=True)
    options = arg_parser.parse_args()

    return options.name, options.port

输出:

服务器:

$ ./chat_server.py --name server --port 8800
Server listening to port: 8800 ...
Chat server: got connection 4 from ('127.0.0.1', 59936)
Chat server: got connection 5 from ('127.0.0.1', 59938)

客户端 1:

$ ./chat_client.py --name client1 --port 8800
Now connected to chat server @ port 8800
[client1@127.0.0.1]>
(Connected: New client (2) from client2@127.0.0.1)
[client1@127.0.0.1]> Hello from client 1
[client1@127.0.0.1]>
#[client2@127.0.0.1]>> Hello from client 2
[client1@127.0.0.1]>

客户端 2:

$ ./chat_client.py --name client2 --port 8800
Now connected to chat server @ port 8800
[client2@127.0.0.1]>
#[client1@127.0.0.1]>> Hello from client 1
[client2@127.0.0.1]> Hello from client 2
[client2@127.0.0.1]>

使用多路复用套接字 IO 提升性能

使用多路复用套接字 IO 提升性能

使用多路复用套接字 IO 提升性能

使用 select.epoll 多路复用 Web 服务器

Python 的 select 模块中有很多针对特定平台的网络事件管理函数。在 Linux 设备中可以使用 epoll。这个函数利用操作系统内核轮询网络事件,让脚本知道有事件发生了。这听起来比前面介绍的 select.select 方案更高效。

#!/usr/bin/env python3

import socket
import select
import argparse
import ntplib
from time import ctime

SERVER_HOST = 'localhost'

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'


def get_time():
    """Query the NTP server time1.aliyun.com and return its time as a string.

    The transmit timestamp of the NTP reply is formatted with time.ctime().
    """
    reply = ntplib.NTPClient().request('time1.aliyun.com')
    return ctime(reply.tx_time)

def server_response():
    """Build the complete HTTP response (headers plus body) as bytes.

    The Date header carries the string returned by get_time(). The
    Content-Length header is computed from the body so the two cannot drift
    apart (the previously hard-coded 25 was one more than the 24-byte body).
    """
    body = b'Hello from Epoll Server!'
    head = (b'HTTP/1.1 200 OK\r\nDate: %b\r\n'
            b'Content-Type: text/plain\r\nContent-Length: %d\r\n'
            b'\r\n') % (get_time().encode(), len(body))

    return head + body


class EpollServer(object):
    """Minimal HTTP responder driven by a ``select.epoll`` event loop.

    Every accepted connection gets the same canned response from
    server_response() once a blank line (end of the request headers) has
    been received.
    """

    def __init__(self, host=SERVER_HOST, port=0):
        """Create the non-blocking listening socket and register it with epoll."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((host, port))
        self.sock.listen(1)
        self.sock.setblocking(0)
        self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        print('Started Epoll Server')

        self.epoll = select.epoll()
        self.epoll.register(self.sock.fileno(), select.EPOLLIN)

    def run(self):
        """Run the event loop until an exception (e.g. Ctrl-C) ends it.

        The listening socket and the epoll object are always released on
        the way out.
        """
        connections = {}   # fileno -> client socket
        requests = {}      # fileno -> request bytes received so far
        responses = {}     # fileno -> response bytes still to be sent

        def close_connection(fileno):
            """Unregister and close one client and forget all its state."""
            self.epoll.unregister(fileno)
            connections.pop(fileno).close()
            requests.pop(fileno, None)
            responses.pop(fileno, None)

        try:
            while True:
                events = self.epoll.poll(1)

                for fileno, event in events:
                    if fileno == self.sock.fileno():
                        connection, address = self.sock.accept()
                        connection.setblocking(0)

                        self.epoll.register(connection.fileno(),
                                            select.EPOLLIN)

                        connections[connection.fileno()] = connection
                        requests[connection.fileno()] = b''
                        responses[connection.fileno()] = server_response()

                    elif event & select.EPOLLIN:
                        try:
                            chunk = connections[fileno].recv(1024)
                        except BlockingIOError:
                            # Nothing to read after all; wait for the next event.
                            continue
                        except OSError:
                            close_connection(fileno)
                            continue

                        if not chunk:
                            # The peer closed before completing its request.
                            # Without this, the still-readable socket would be
                            # reported on every poll() and spin the loop.
                            close_connection(fileno)
                            continue

                        requests[fileno] += chunk

                        if (EOL1 in requests[fileno] or
                                EOL2 in requests[fileno]):
                            # Request headers complete: switch to writing.
                            self.epoll.modify(fileno, select.EPOLLOUT)
                            text = requests[fileno].decode(errors='replace')
                            print('-' * 40 + '\n' + text[:-2])

                    elif event & select.EPOLLOUT:
                        try:
                            sent = connections[fileno].send(responses[fileno])
                        except BlockingIOError:
                            continue
                        except OSError:
                            close_connection(fileno)
                            continue

                        responses[fileno] = responses[fileno][sent:]

                        if not responses[fileno]:
                            # Everything sent: stop watching for I/O and shut
                            # the socket down; the resulting hang-up event is
                            # handled by the branch below.
                            self.epoll.modify(fileno, 0)
                            try:
                                connections[fileno].shutdown(socket.SHUT_RDWR)
                            except OSError:
                                close_connection(fileno)

                    elif event & (select.EPOLLHUP | select.EPOLLERR):
                        close_connection(fileno)

        finally:
            self.epoll.unregister(self.sock.fileno())
            self.epoll.close()
            self.sock.close()

if __name__ == '__main__':
    # Script entry point: --port is mandatory and parsed as an integer.
    cli = argparse.ArgumentParser(
        description='Socket Server Example with Epoll')
    cli.add_argument('--port', action='store', dest='port', type=int,
                     required=True)
    port = cli.parse_args().port

    server = EpollServer(host=SERVER_HOST, port=port)
    server.run()

说明:运行这个脚本,在网页浏览器(例如 Firefox 或 IE)中输入 http://localhost:8800/ 访问服务器

输出:

$ ./simple_web_server_with_epoll.py --port 8800
Started Epoll Server
----------------------------------------

GET / HTTP/1.1
Host: localhost:8800
User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate
Cookie: Hm_lvt_1948c8abb4e3b3ec7217fa1df21a3a4a=1500028185,1500032610,1500111003,1500347942; Hm_lvt_fc9fe892eb6094742712cb9c6c539865=1499869521; _ga=GA1.1.1414537744.1499914905
Connection: keep-alive
Upgrade-Insecure-Requests: 1

----------------------------------------
GET /favicon.ico HTTP/1.1
Host: localhost:8800
User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0
Accept: */*
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate
Cookie: Hm_lvt_1948c8abb4e3b3ec7217fa1df21a3a4a=1500028185,1500032610,1500111003,1500347942; Hm_lvt_fc9fe892eb6094742712cb9c6c539865=1499869521; _ga=GA1.1.1414537744.1499914905
Connection: keep-alive

----------------------------------------

GET /favicon.ico HTTP/1.1
Host: localhost:8800
User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate
Cookie: Hm_lvt_1948c8abb4e3b3ec7217fa1df21a3a4a=1500028185,1500032610,1500111003,1500347942; Hm_lvt_fc9fe892eb6094742712cb9c6c539865=1499869521; _ga=GA1.1.1414537744.1499914905
Connection: keep-alive

使用多路复用套接字 IO 提升性能

使用并发库 Diesel 多路复用回显服务器

有时你需要编写一个大型自定义网络应用程序,但不想重复输入初始化服务器的代码,比如说创建套接字、绑定地址、监听以及处理基本的错误等。有很多 Python 网络库都可以帮助你把样板代码删除。这里我们要使用一个提供这种功能的库,它叫作 Diesel。

Diesel 目前不兼容 Python3

#!/usr/bin/env python
# Diesel does not support Python 3 (checked 2017-07-20)

import diesel
import argparse


class EchoServer(object):
    """Echo service for Diesel: answers every received line with a prefix."""

    def handler(self, remote_addr):
        """Serve one connected client.

        Args:
            remote_addr: client address; items 0 and 1 are printed as
                host and port.
        """
        print('Echo client connected from: {}:{}'.format(remote_addr[0],
                                                         remote_addr[1]))

        while True:
            try:
                line = diesel.until_eol()
                diesel.send(': '.join(['Your said', line]))
            except Exception as e:
                # NOTE(review): every exception is printed and the loop keeps
                # going - confirm this is intended when the peer disconnects.
                print('Exception:', e)

def main(server_port):
    """Register the echo handler as a Diesel service on `server_port` and run."""
    application = diesel.Application()
    echo = EchoServer()
    service = diesel.Service(echo.handler, server_port)
    application.add_service(service)
    application.run()

if __name__ == '__main__':
    # Script entry point: --port is mandatory and parsed as an integer.
    cli = argparse.ArgumentParser(
        description='Echo server example with Diesel')
    cli.add_argument('--port', action='store', dest='port', type=int,
                     required=True)
    port = cli.parse_args().port
    main(port)

输出:

$ ./echo_server_with_diesel.py --port 8800
[2017/07/20 13:52:25] {diesel} WARNING|Starting diesel <hand-rolled select.epoll>
Echo client connected from: 127.0.0.1:59992
$ telnet localhost 8800
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello Diesel server?
Your said: Hello Diesel server?