By parametric I mean a lock that can be used with a parameter and lock only the threads that use the same parameter. The parameter can have thousands of different values, and it's impossible to create a dictionary of threading.Lock
objects, one per parameter value.
通过参数我是指可以与参数一起使用的锁,并且仅锁定使用相同参数的线程。该参数可以包含数千个不同的值,并且无法创建threading.Lock对象的字典,每个参数值一个。
A function on my web server accepts two parameters, a group and something else. Every time the function is called it checks if a group file has changed (very fast), and if it has changed it does something with it (very slow). The slow process for each group is completely independent from the other groups and can happen at the same time, but two elements of each group cannot be processed at the same time, nor while the group is being processed.
我的Web服务器上的一个函数接受两个参数,一个组和其他东西。每次调用该函数时,它都会检查组文件是否已更改(非常快),如果已更改,则会对其执行某些操作(非常慢)。每个组的缓慢进程完全独立于其他组,并且可以同时发生,但是每个组的两个元素不能同时处理,也不能在处理组时处理。
I was able to get it to work using a global list of groups being processed, but now that I'm done with it I think it's really ugly and there must be a better way.
我能够使用正在处理的组的全局列表来使其工作,但是现在我已经完成了它,我认为它真的很难看并且必须有更好的方法。
The snippet below shows what I'm looking for. It uses an imaginary LockWithGroup
. Is there something similar in Python?
下面的代码段显示了我正在寻找的内容。它使用虚构的LockWithGroup。 Python中有类似的东西吗?
process_lock = threading.LockWithGroup()
def process_element(group, element):
print('Start', group)
with process_lock(group):
if needs_update(group):
print('Updating', group)
update_group(group)
print('Updated', group)
with process_lock(group):
retval = do_something_with(group, element)
print('End', group)
return retval
process_element('g1', e1) # a
process_element('g1', e2) # b
process_element('g1', e3) # c
process_element('g2', e4) # d
process_element('g2', e5) # e
Output:
> Start g1 # a
> Start g2 # d
> Updating g1 # a
> Updating g2 # d
> Updated g1 # a
> End g1 # a
> Start g1 # b
> End g1 # b
> Start g1 # c
> Updated g2 # d
> End g2 # d
> Start g2 # e
> End g1 # c
> End g2 # e
1 个解决方案
#1
0
Inspired by the answer to the post mentioned in the comment I created a class that seems to do the job.
受到评论中提到的帖子答案的启发,我创建了一个似乎可以完成这项工作的课程。
I used the code from that answer, added timeout
and blocking
arguments and put it inside a class so I can use it as a context manager. The class uses static methods, so it can be instantiated once or it can be created many times (as the test below does in slow_worker_2
).
我使用了该答案中的代码,添加了超时和阻塞参数并将其放在一个类中,因此我可以将其用作上下文管理器。该类使用静态方法,因此可以实例化一次,也可以多次创建(如下面的测试在slow_worker_2中所做的那样)。
The first part of the code is the class, the second part tests both explicit acquire
and release
and the context manager with with
.
代码的第一部分是类,第二部分测试显式获取和释放以及带有的上下文管理器。
import threading
import time
namespace_lock = threading.Lock()
namespace = {}
counters = {}
class NamespaceLock:
def __init__(self, group):
self.group = group
def __enter__(self):
self.__class__.acquire_lock(self.group)
def __exit__(self, exc_type, exc_val, exc_tb):
self.__class__.release_lock(self.group)
@staticmethod
def acquire_lock(value, blocking=True, timeout=-1.0):
with namespace_lock:
if value in namespace:
counters[value] += 1
else:
namespace[value] = threading.Lock()
counters[value] = 1
return namespace[value].acquire(blocking=blocking, timeout=timeout)
@staticmethod
def release_lock(value):
with namespace_lock:
if counters[value] == 1:
del counters[value]
lock = namespace.pop(value)
else:
counters[value] -= 1
lock = namespace[value]
lock.release()
def slow_worker_1(group, seconds):
if NamespaceLock.acquire_lock(group, timeout=2.5):
print('Start {} {}'.format(group, seconds))
time.sleep(seconds)
print('End {} {}'.format(group, seconds))
NamespaceLock.release_lock(group)
else:
print('Timeout {} {}'.format(group, seconds))
def slow_worker_2(group, seconds):
with NamespaceLock(group):
print('Start {} {}'.format(group, seconds))
time.sleep(seconds)
print('End {} {}'.format(group, seconds))
def join_all(name):
for t in threading.enumerate():
if t.name == name:
t.join()
if __name__ == '__main__':
print('explicit acquire and release')
threading.Thread(target=slow_worker_1, args=('g1', 1), name='worker').start()
threading.Thread(target=slow_worker_1, args=('g2', 2), name='worker').start()
threading.Thread(target=slow_worker_1, args=('g1', 3), name='worker').start()
threading.Thread(target=slow_worker_1, args=('g2', 4), name='worker').start()
threading.Thread(target=slow_worker_1, args=('g1', 5), name='worker').start()
join_all('worker')
print('context manager')
threading.Thread(target=slow_worker_2, args=('g1', 1), name='worker').start()
threading.Thread(target=slow_worker_2, args=('g2', 2), name='worker').start()
threading.Thread(target=slow_worker_2, args=('g1', 3), name='worker').start()
threading.Thread(target=slow_worker_2, args=('g2', 4), name='worker').start()
threading.Thread(target=slow_worker_2, args=('g1', 5), name='worker').start()
join_all('worker')
#1
0
Inspired by the answer to the post mentioned in the comment I created a class that seems to do the job.
受到评论中提到的帖子答案的启发,我创建了一个似乎可以完成这项工作的课程。
I used the code from that answer, added timeout
and blocking
arguments and put it inside a class so I can use it as a context manager. The class uses static methods, so it can be instantiated once or it can be created many times (as the test below does in slow_worker_2
).
我使用了该答案中的代码,添加了超时和阻塞参数并将其放在一个类中,因此我可以将其用作上下文管理器。该类使用静态方法,因此可以实例化一次,也可以多次创建(如下面的测试在slow_worker_2中所做的那样)。
The first part of the code is the class, the second part tests both explicit acquire
and release
and the context manager with with
.
代码的第一部分是类,第二部分测试显式获取和释放以及带有的上下文管理器。
import threading
import time
namespace_lock = threading.Lock()
namespace = {}
counters = {}
class NamespaceLock:
def __init__(self, group):
self.group = group
def __enter__(self):
self.__class__.acquire_lock(self.group)
def __exit__(self, exc_type, exc_val, exc_tb):
self.__class__.release_lock(self.group)
@staticmethod
def acquire_lock(value, blocking=True, timeout=-1.0):
with namespace_lock:
if value in namespace:
counters[value] += 1
else:
namespace[value] = threading.Lock()
counters[value] = 1
return namespace[value].acquire(blocking=blocking, timeout=timeout)
@staticmethod
def release_lock(value):
with namespace_lock:
if counters[value] == 1:
del counters[value]
lock = namespace.pop(value)
else:
counters[value] -= 1
lock = namespace[value]
lock.release()
def slow_worker_1(group, seconds):
if NamespaceLock.acquire_lock(group, timeout=2.5):
print('Start {} {}'.format(group, seconds))
time.sleep(seconds)
print('End {} {}'.format(group, seconds))
NamespaceLock.release_lock(group)
else:
print('Timeout {} {}'.format(group, seconds))
def slow_worker_2(group, seconds):
with NamespaceLock(group):
print('Start {} {}'.format(group, seconds))
time.sleep(seconds)
print('End {} {}'.format(group, seconds))
def join_all(name):
for t in threading.enumerate():
if t.name == name:
t.join()
if __name__ == '__main__':
print('explicit acquire and release')
threading.Thread(target=slow_worker_1, args=('g1', 1), name='worker').start()
threading.Thread(target=slow_worker_1, args=('g2', 2), name='worker').start()
threading.Thread(target=slow_worker_1, args=('g1', 3), name='worker').start()
threading.Thread(target=slow_worker_1, args=('g2', 4), name='worker').start()
threading.Thread(target=slow_worker_1, args=('g1', 5), name='worker').start()
join_all('worker')
print('context manager')
threading.Thread(target=slow_worker_2, args=('g1', 1), name='worker').start()
threading.Thread(target=slow_worker_2, args=('g2', 2), name='worker').start()
threading.Thread(target=slow_worker_2, args=('g1', 3), name='worker').start()
threading.Thread(target=slow_worker_2, args=('g2', 4), name='worker').start()
threading.Thread(target=slow_worker_2, args=('g1', 5), name='worker').start()
join_all('worker')