How to synchronize a Python dict with multiprocessing

Time: 2021-09-17 20:41:25

I am using Python 2.6 and the multiprocessing module for multi-threading. Now I would like to have a synchronized dict (where the only atomic operation I really need is the += operator on a value).

Should I wrap the dict with a multiprocessing.sharedctypes.synchronized() call? Or is there another way to go about it?
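
For context, multiprocessing.sharedctypes.synchronized() wraps shared ctypes objects such as those from RawValue or RawArray, not dicts. A minimal sketch of what it does cover (illustrative code, not from the question):

from multiprocessing.sharedctypes import RawValue, synchronized

counter = synchronized(RawValue("i", 0))  # a lock-wrapped shared int, not a dict
with counter.get_lock():                  # the += happens atomically under the lock
    counter.value += 1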

4 Answers

#1

54 votes

Intro

There seem to be a lot of armchair suggestions and no working examples. None of the other answers listed here even suggests using multiprocessing, which is quite disappointing and disturbing. As Python lovers we should support our built-in libraries, and while parallel processing and synchronization is never a trivial matter, I believe it can be made trivial with proper design. This is becoming extremely important in modern multi-core architectures and cannot be stressed enough! That said, I am far from satisfied with the multiprocessing library, as it is still in its infancy with quite a few pitfalls and bugs, and is geared towards functional programming (which I detest). Currently I still prefer the Pyro module (which is way ahead of its time) over multiprocessing, due to multiprocessing's severe limitation of being unable to share newly created objects while the server is running: the "register" class method of the manager objects will only actually register an object BEFORE the manager (or its server) is started. Enough chatter, more code:

Server.py

from multiprocessing.managers import SyncManager


class MyManager(SyncManager):
    pass


syncdict = {}
def get_dict():
    # the callable whose return value the manager serves to clients
    return syncdict

if __name__ == "__main__":
    # register BEFORE starting the manager (see "Annoying problems" below)
    MyManager.register("syncdict", get_dict)
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.start()
    raw_input("Press <Enter> to kill server".center(50, "-"))
    manager.shutdown()

In the above code example, Server.py makes use of multiprocessing's SyncManager, which can supply synchronized shared objects. This code will not work when run interactively in the interpreter, because the multiprocessing library is quite touchy about how to find the "callable" for each registered object. Running Server.py will start a customized SyncManager that shares the syncdict dictionary among multiple processes, and clients can connect to it either on the same machine or, if it is run on an IP address other than loopback, from other machines. In this case the server runs on loopback (127.0.0.1) on port 5000. The authkey parameter authenticates the connections used to manipulate syncdict. When Enter is pressed the manager is shut down.

Client.py

from multiprocessing.managers import SyncManager
import time

class MyManager(SyncManager):
    pass

MyManager.register("syncdict")

if __name__ == "__main__":
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.connect()
    syncdict = manager.syncdict()

    print "dict = %s" % (dir(syncdict))
    key = raw_input("Enter key to update: ")
    inc = float(raw_input("Enter increment: "))
    sleep = float(raw_input("Enter sleep time (sec): "))

    try:
        # if the key doesn't exist, create it; method calls only --
        # item assignment does not work through the proxy (see below)
        if not syncdict.has_key(key):
            syncdict.update([(key, 0)])
        # increment the key's value every `sleep` seconds, then print syncdict
        while True:
            syncdict.update([(key, syncdict.get(key) + inc)])
            time.sleep(sleep)
            print "%s" % (syncdict)
    except KeyboardInterrupt:
        print "Killed client"

The client must also create a customized SyncManager and register "syncdict", this time without passing in a callable to retrieve the shared dict. It then uses the customized SyncManager to connect via the loopback IP address (127.0.0.1) on port 5000, with an authkey that establishes an authenticated connection to the manager started in Server.py. It retrieves the shared dict syncdict by calling the registered callable on the manager. Finally it prompts the user for the following:

  1. The key in syncdict to operate on
  2. The amount to increment the value accessed by the key each cycle
  3. The amount of time to sleep per cycle, in seconds

The client then checks whether the key exists; if it doesn't, it creates it in syncdict. The client then enters an "endless" loop in which it increments the key's value, sleeps for the specified amount, and prints syncdict, repeating until a KeyboardInterrupt occurs (Ctrl+C).
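
Note that the get-then-update in the loop is not atomic across clients: two clients can read the same value and one increment can be lost. A minimal sketch of one way to close that window, assuming the server also registers a shared lock next to the dict (the synclock name and get_lock callable are my illustration, not part of the code above):

# Server.py additions (hypothetical):
#     import threading
#     synclock = threading.Lock()
#     def get_lock(): return synclock
#     MyManager.register("synclock", get_lock)

# Client.py: hold the shared lock across the read-modify-write
synclock = manager.synclock()
synclock.acquire()
try:
    syncdict.update([(key, syncdict.get(key) + inc)])
finally:
    synclock.release()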

Annoying problems

  1. The Manager's register methods MUST be called before the manager is started, otherwise you will get exceptions, even though a dir call on the Manager will reveal that it does indeed have the method that was registered.
  2. All manipulations of the dict must be done with methods, not dict assignments (syncdict["blast"] = 2 will fail miserably because of the way multiprocessing shares custom objects).
  3. Using SyncManager's dict method would alleviate annoying problem #2, except that annoying problem #1 prevents the proxy returned by SyncManager.dict() from being registered and shared. (SyncManager.dict() can only be called AFTER the manager is started, and register only works BEFORE the manager is started, so SyncManager.dict() is only useful when doing functional programming and passing the proxy to Processes as an argument, like the doc examples do; see the sketch after this list.)
  4. The server AND the client both have to register, even though intuitively it would seem that the client would be able to figure it out after connecting to the manager (please add this to your wish list, multiprocessing developers).
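
A minimal sketch of the SyncManager.dict() alternative mentioned in problem 3 (my illustration, modeled on the doc examples rather than on the answer's code): the proxy is created after start() and handed to each Process as an argument, and item assignment does work on this built-in DictProxy:

from multiprocessing import Process
from multiprocessing.managers import SyncManager

def worker(d, key, inc):
    # note: get-then-set is still not atomic across processes
    d[key] = d.get(key, 0) + inc

if __name__ == "__main__":
    manager = SyncManager()
    manager.start()
    d = manager.dict()  # only possible AFTER the manager has started
    procs = [Process(target=worker, args=(d, "counter", 1)) for _ in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()
    print dict(d)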

Closing

I hope you enjoyed this quite thorough and slightly time-consuming answer as much as I did writing it. I was having a great deal of trouble getting straight in my mind why I was struggling so much with the multiprocessing module, where Pyro makes it a breeze, and thanks to this answer I have hit the nail on the head. I hope this is useful to the Python community as input on how to improve the multiprocessing module, as I do believe it holds a great deal of promise but in its infancy falls short of what is possible. Despite the annoying problems described, I think this is still quite a viable alternative, and it is pretty simple. You could also use SyncManager.dict() and pass it to Processes as an argument the way the docs show; that would probably be an even simpler solution, depending on your requirements, but it feels unnatural to me.

#2

4 votes

I would dedicate a separate process to maintaining the "shared dict": just use e.g. xmlrpclib to make that tiny amount of code available to the other processes, exposing via xmlrpclib e.g. a function taking key and increment to perform the increment, and one taking just the key and returning the value, with the semantic details (is there a default value for missing keys, etc.) depending on your app's needs.

Then you can use any approach you like to implement the shared-dict dedicated process: anything from a single-threaded server with a simple in-memory dict all the way to a simple sqlite DB, etc. I suggest you start with code "as simple as you can get away with" (depending on whether you need a persistent shared dict, or whether persistence is unnecessary for you), then measure and optimize as and if needed.
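
A minimal sketch of this approach along those lines (Python 2 standard library only; the increment and get_value names are mine, not the answer's):

# server process: owns the dict and serializes all access to it
from SimpleXMLRPCServer import SimpleXMLRPCServer
import threading

shared = {}
lock = threading.Lock()  # belt and braces; the default server handles one request at a time

def increment(key, amount):
    with lock:
        shared[key] = shared.get(key, 0) + amount
        return shared[key]

def get_value(key):
    with lock:
        return shared.get(key, 0)

if __name__ == "__main__":
    server = SimpleXMLRPCServer(("127.0.0.1", 8000), allow_none=True)
    server.register_function(increment)
    server.register_function(get_value)
    server.serve_forever()

Any other process can then do:

import xmlrpclib

proxy = xmlrpclib.ServerProxy("http://127.0.0.1:8000/")
proxy.increment("hits", 1)
print proxy.get_value("hits")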

#3

4 votes

Regarding an appropriate solution to the concurrent-write issue: I did some very quick research and found that this article suggests a lock/semaphore solution (http://effbot.org/zone/thread-synchronization.htm).

While the example isn't specific to a dictionary, I'm pretty sure you could code a class-based wrapper object to help you work with dictionaries based on this idea, as sketched below.
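
A hedged sketch of such a wrapper (the LockedDict name and its method set are mine, not the article's); every read-modify-write happens inside one critical section, which gives exactly the atomic += the question asks for:

import threading

class LockedDict(object):
    # illustrative thread-safe dict wrapper, not from the linked article
    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    def increment(self, key, amount=1):
        with self._lock:  # the whole read-modify-write is one critical section
            self._data[key] = self._data.get(key, 0) + amount
            return self._data[key]

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)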

If I had a requirement to implement something like this in a thread-safe manner, I'd probably use the Python semaphore solution (assuming my earlier merge technique wouldn't work). I believe that semaphores generally slow down thread efficiency due to their blocking nature.

From the site:

A semaphore is a more advanced lock mechanism. A semaphore has an internal counter rather than a lock flag, and it only blocks if more than a given number of threads have attempted to hold the semaphore. Depending on how the semaphore is initialized, this allows multiple threads to access the same code section simultaneously.

import threading
shared_dict = {}
semaphore = threading.BoundedSemaphore()
semaphore.acquire()  # decrements the counter
try:
    shared_dict["key"] = shared_dict.get("key", 0) + 1  # work with the dictionary
finally:
    semaphore.release()  # increments the counter

#4

3 votes

Is there a reason that the dictionary needs to be shared in the first place? Could you have each thread maintain its own instance of a dictionary and either merge at the end of the thread's processing, or periodically use a callback to merge copies of the individual thread dictionaries together, as sketched below?
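
A minimal sketch of that merge pattern (illustrative names, shown with threads to match the answer's framing): each worker fills a private dict without any locking, and the partial dicts are combined once the workers finish:

import threading

def worker(local_counts, items):
    # each thread writes only to its own dict, so no lock is needed
    for item in items:
        local_counts[item] = local_counts.get(item, 0) + 1

partials = [{}, {}]
chunks = [["a", "b"], ["b", "c"]]
threads = [threading.Thread(target=worker, args=(d, c))
           for d, c in zip(partials, chunks)]
for t in threads: t.start()
for t in threads: t.join()

merged = {}
for d in partials:  # merge once all threads are done
    for k, v in d.items():
        merged[k] = merged.get(k, 0) + v
print merged  # {'a': 1, 'c': 1, 'b': 2}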

I don't know exactly what you are doing, so keep in mind that my written plan may not work verbatim. What I'm suggesting is more of a high-level design idea.
