Windows上的Python多处理和网络

时间:2022-06-02 18:18:07

I'm trying to implement a tcp 'echo server'. Simple stuff:

我正在尝试实现一个tcp'echo server'。简单的东西:

  1. Client sends a message to the server.
  2. 客户端向服务器发送消息。
  3. Server receives the message
  4. 服务器接收消息
  5. Server converts message to uppercase
  6. 服务器将消息转换为大写
  7. Server sends modified message to client
  8. 服务器向客户端发送修改后的消
  9. Client prints the response.
  10. 客户端打印响应。

It worked well, so I decided to parallelize the server; make it so that it could handle multiple clients at time. Since most Python interpreters have a GIL, multithreading won't cut it. I had to use multiproces... And boy, this is where things went downhill.

它工作得很好,所以我决定并行化服务器;使它能够处理多个客户端。由于大多数Python解释器都有GIL,因此多线程不会削减它。我不得不使用多处理器......而男孩,这就是事情发生了变化的地方。

I'm using Windows 10 x64 and the WinPython suit with Python 3.5.2 x64.

我正在使用Windows 10 x64和WinPython适用于Python 3.5.2 x64。

My idea is to create a socket, intialize it (bind and listen), create sub processes and pass the socket to the children. But for the love of me... I can't make this work, my subprocesses die almost instantly. Initially I had some issues 'pickling' the socket... So I googled a bit and thought this was the issue. So I tried passing my socket thru a multiprocessing queue, through a pipe and my last attempt was 'forkpickling' and passing it as a bytes object during the processing creating. Nothing works.

我的想法是创建一个套接字,初始化它(绑定和监听),创建子进程并将套接字传递给子进程。但是对于我的爱...我无法做到这一点,我的子过程几乎立即死亡。最初我有一些问题'腌制'插座...所以我google了一下,并认为这是问题。所以我尝试通过一个管道将多个处理队列传递给我的套接字,我最后一次尝试是'forkpickling'并在处理创建过程中将其作为一个字节对象传递。什么都行不通。

Can someone please shed some light here? Tell me whats wrong? Maybe the whole idea (sharing sockets) is bad... And if so, PLEASE tell me how can I achieve my initial objective: enabling my server to ACTUALLY handle multiple clients at once (on Windows) (don't tell me about threading, we all know python's threading won't cut it ¬¬)

有人可以在这里说清楚吗?告诉我出了什么事?也许整个想法(共享套接字)是坏的...如果是这样,请告诉我如何实现我的初始目标:使我的服务器能够立即处理多个客户端(在Windows上)(不要告诉我有关线程的信息) ,我们都知道python的线程不会削减它¬¬)

It also worth noting that no files are create by the debug function. No process lived long enough to run it, I believe.

还值得注意的是,调试功能不会创建任何文件。我相信没有任何过程可以长时间运行它。

The typical output of my server code is (only difference between runs is the process numbers):

我的服务器代码的典型输出是(只有运行之间的差异是进程号):

Server is running...
Degree of parallelism: 4
Socket created.
Socket bount to: ('', 0)
Process 3604 is alive: True
Process 5188 is alive: True
Process 6800 is alive: True
Process 2844 is alive: True

Press ctrl+c to kill all processes.

Process 3604 is alive: False
Process 3604 exit code: 1
Process 5188 is alive: False
Process 5188 exit code: 1
Process 6800 is alive: False
Process 6800 exit code: 1
Process 2844 is alive: False
Process 2844 exit code: 1
The children died...
Why god?
WHYYyyyyy!!?!?!?

The server code:

服务器代码:

# Imports
import socket 
import packet
import sys
import os
from time import sleep
import multiprocessing as mp
import pickle
import io

# Constants
DEGREE_OF_PARALLELISM = 4
DEFAULT_HOST = ""
DEFAULT_PORT = 0

def _parse_cmd_line_args():
    arguments = sys.argv
    if len(arguments) == 1:
        return DEFAULT_HOST, DEFAULT_PORT
    else:
        raise NotImplemented()

def debug(data):
    pid = os.getpid()
    with open('C:\\Users\\Trauer\\Desktop\\debug\\'+str(pid)+'.txt', mode='a',
              encoding='utf8') as file:
        file.write(str(data) + '\n')

def handle_connection(client):
    client_data = client.recv(packet.MAX_PACKET_SIZE_BYTES)
    debug('received data from client: ' + str(len(client_data)))
    response = client_data.upper()
    client.send(response)    
    debug('sent data from client: ' + str(response))

def listen(picklez):    
    debug('started listen function')

    pid = os.getpid()
    server_socket = pickle.loads(picklez)
    debug('acquired socket')

    while True:
        debug('Sub process {0} is waiting for connection...'.format(str(pid)))

        client, address = server_socket.accept()
        debug('Sub process {0} accepted connection {1}'.format(str(pid),
              str(client)))

        handle_connection(client)        
        client.close()
        debug('Sub process {0} finished handling connection {1}'.
              format(str(pid),str(client)))

if __name__ == "__main__":    
#   Since most python interpreters have a GIL, multithreading won't cut
#   it... Oughta bust out some process, yo!
    host_port = _parse_cmd_line_args()
    print('Server is running...')
    print('Degree of parallelism: ' + str(DEGREE_OF_PARALLELISM))

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print('Socket created.')

    server_socket.bind(host_port)
    server_socket.listen(DEGREE_OF_PARALLELISM)
    print('Socket bount to: ' + str(host_port))        

    buffer = io.BytesIO()
    mp.reduction.ForkingPickler(buffer).dump(server_socket)
    picklez = buffer.getvalue()

    children = []
    for i in range(DEGREE_OF_PARALLELISM):        
        child_process = mp.Process(target=listen, args=(picklez,))
        child_process.daemon = True
        child_process.start()
        children.append(child_process)

        while not child_process.pid:
            sleep(.25)

        print('Process {0} is alive: {1}'.format(str(child_process.pid), 
              str(child_process.is_alive())))     
    print()    

    kids_are_alive = True
    while kids_are_alive:
        print('Press ctrl+c to kill all processes.\n')
        sleep(1) 

        exit_codes = []
        for child_process in children:
            print('Process {0} is alive: {1}'.format(str(child_process.pid), 
              str(child_process.is_alive())))
            print('Process {0} exit code: {1}'.format(str(child_process.pid), 
              str(child_process.exitcode)))
            exit_codes.append(child_process.exitcode)

        if all(exit_codes):
            # Why do they die so young? :(
            print('The children died...')
            print('Why god?')
            print('WHYYyyyyy!!?!?!?')
            kids_are_alive = False

edit: fixed the signature of "listen". My processes still die instantly.

编辑:修复了“听”的签名。我的进程仍然立即死亡。

edit2: User cmidi pointed out that this code does work on Linux; so my question is: How can I 'made this work' on Windows?

edit2:用户cmidi指出此代码适用于Linux;所以我的问题是:如何在Windows上“完成这项工作”?

2 个解决方案

#1


3  

You can directly pass a socket to a child process. multiprocessing registers a reduction for this, for which the Windows implementation uses the following DupSocket class from multiprocessing.resource_sharer:

您可以直接将套接字传递给子进程。多处理为此注册了一个减少,Windows实现使用multiprocessing.resource_sharer中的以下DupSocket类:

class DupSocket(object):
    '''Picklable wrapper for a socket.'''
    def __init__(self, sock):
        new_sock = sock.dup()
        def send(conn, pid):
            share = new_sock.share(pid)
            conn.send_bytes(share)
        self._id = _resource_sharer.register(send, new_sock.close)

    def detach(self):
        '''Get the socket.  This should only be called once.'''
        with _resource_sharer.get_connection(self._id) as conn:
            share = conn.recv_bytes()
            return socket.fromshare(share)

This calls the Windows socket share method, which returns the protocol info buffer from calling WSADuplicateSocket. It registers with the resource sharer to send this buffer over a connection to the child process. The child in turn calls detach, which receives the protocol info buffer and reconstructs the socket via socket.fromshare.

这将调用Windows套接字共享方法,该方法从调用WSADuplicateSocket返回协议信息缓冲区。它向资源共享器注册,以通过与子进程的连接发送此缓冲区。子进程调用detach,它接收协议信息缓冲区并通过socket.fromshare重建套接字。

It's not directly related to your problem, but I recommend that you redesign the server to instead call accept in the main process, which is the way this is normally done (e.g. in Python's socketserver.ForkingTCPServer module). Pass the resulting (conn, address) tuple to the first available worker over a multiprocessing.Queue, which is shared by all of the workers in the process pool. Or consider using a multiprocessing.Pool with apply_async.

它与您的问题没有直接关系,但我建议您重新设计服务器,而不是在主进程中调用accept,这是通常的方式(例如在Python的socketserver.ForkingTCPServer模块中)。将生成的(conn,address)元组通过multiprocessing.Queue传递给第一个可用的worker,该处理池由进程池中的所有worker共享。或者考虑使用带有apply_async的multiprocessing.Pool。

#2


0  

def listen() the target/start for your child processes does not take any argument but you are providing serialized socket as an argument args=(picklez,) to the child process this would cause an exception in the child process and exit immediately.

def listen()你的子进程的目标/开始没有任何参数,但你提供序列化套接字作为参数args =(picklez,)到子进程,这将导致子进程中的异常并立即退出。

TypeError: listen() takes no arguments (1 given)

def listen(picklez) should solve the problem this will provide one argument to the target of your child processes.

def listen(picklez)应该解决问题,这将为你的子进程的目标提供一个参数。

#1


3  

You can directly pass a socket to a child process. multiprocessing registers a reduction for this, for which the Windows implementation uses the following DupSocket class from multiprocessing.resource_sharer:

您可以直接将套接字传递给子进程。多处理为此注册了一个减少,Windows实现使用multiprocessing.resource_sharer中的以下DupSocket类:

class DupSocket(object):
    '''Picklable wrapper for a socket.'''
    def __init__(self, sock):
        new_sock = sock.dup()
        def send(conn, pid):
            share = new_sock.share(pid)
            conn.send_bytes(share)
        self._id = _resource_sharer.register(send, new_sock.close)

    def detach(self):
        '''Get the socket.  This should only be called once.'''
        with _resource_sharer.get_connection(self._id) as conn:
            share = conn.recv_bytes()
            return socket.fromshare(share)

This calls the Windows socket share method, which returns the protocol info buffer from calling WSADuplicateSocket. It registers with the resource sharer to send this buffer over a connection to the child process. The child in turn calls detach, which receives the protocol info buffer and reconstructs the socket via socket.fromshare.

这将调用Windows套接字共享方法,该方法从调用WSADuplicateSocket返回协议信息缓冲区。它向资源共享器注册,以通过与子进程的连接发送此缓冲区。子进程调用detach,它接收协议信息缓冲区并通过socket.fromshare重建套接字。

It's not directly related to your problem, but I recommend that you redesign the server to instead call accept in the main process, which is the way this is normally done (e.g. in Python's socketserver.ForkingTCPServer module). Pass the resulting (conn, address) tuple to the first available worker over a multiprocessing.Queue, which is shared by all of the workers in the process pool. Or consider using a multiprocessing.Pool with apply_async.

它与您的问题没有直接关系,但我建议您重新设计服务器,而不是在主进程中调用accept,这是通常的方式(例如在Python的socketserver.ForkingTCPServer模块中)。将生成的(conn,address)元组通过multiprocessing.Queue传递给第一个可用的worker,该处理池由进程池中的所有worker共享。或者考虑使用带有apply_async的multiprocessing.Pool。

#2


0  

def listen() the target/start for your child processes does not take any argument but you are providing serialized socket as an argument args=(picklez,) to the child process this would cause an exception in the child process and exit immediately.

def listen()你的子进程的目标/开始没有任何参数,但你提供序列化套接字作为参数args =(picklez,)到子进程,这将导致子进程中的异常并立即退出。

TypeError: listen() takes no arguments (1 given)

def listen(picklez) should solve the problem this will provide one argument to the target of your child processes.

def listen(picklez)应该解决问题,这将为你的子进程的目标提供一个参数。