I'm using celery, I have several tasks which needed to be executed in order.
我正在使用芹菜,我有几个任务需要按顺序执行。
For example I have this task:
例如,我有一个任务:
@celery.task
def tprint(word):
print word
And I want to do something like this:
我想做这样的事情:
>>> chain(tprint.s('a') | tprint.s('b'))()
Then I get TypeError: tprint() takes exactly 1 argument (2 given)
.
然后我得到了TypeError: tprint()只接受一个参数(2给定)。
The same with chord, in this situation which I need a task to be executed after a group of tasks:
和和弦一样,在这种情况下,我需要一个任务在一组任务之后执行:
>>> chord([tprint.s('a'), tprint.s('b')])(tprint.s('c'))
So how to deal with this situation? I don't care the result of each task, but they need to be executed in order.
那么如何应对这种情况呢?我不关心每个任务的结果,但是它们需要按顺序执行。
Add a second parameter won't work:
添加第二个参数不起作用:
@celery.task
def tprint(word, ignore=None):
print word
>>> chain(tprint.s('a', 0) | tprint.s('b'))()
This will print out 'a' and 'None'.
这将打印出“a”和“None”。
4 个解决方案
#1
42
There is a built-in functionality to ignore result in chaining and others - immutable subtask. You can use .si() shortcut instead of .s() or .subtask(immutable=True)
有一个内置的功能可以忽略链接的结果和其他不可变的子任务。您可以使用.si()快捷方式而不是.s()或.subtask(不可变=True)
More details here: http://docs.celeryproject.org/en/master/userguide/canvas.html#immutability
更多细节在这里:http://docs.celeryproject.org/en/master/userguide/canvas.html #不变性
#2
2
One possible solution has already posted, but I'd like to add further clarification and an alternate solution (and in some cases a superior one).
一个可能的解决方案已经发布了,但是我想要添加进一步的澄清和一个替代的解决方案(在某些情况下是一个高级的解决方案)。
The error you're seeing, which indicates that your task's signature needs to take into account a second parameter, is due to the fact that when calling tasks in a chain
, Celery automatically pushes each tasks result
as the first parameter of the following task.
您所看到的错误,表明您的任务的签名需要考虑第二个参数,这是因为当调用链中的任务时,芹菜会自动将每个任务结果作为以下任务的第一个参数。
From the docs:
从文档:
Tasks can be linked together, which in practice means adding a callback task:
任务可以链接在一起,这在实践中意味着添加一个回调任务:
>>> res = add.apply_async((2, 2), link=mul.s(16))
>>> res.get()
4
The linked task will be applied with the result of its parent task as the first argument
将该链接任务应用于其父任务的结果作为第一个参数。
Therefore, in your case, you could rewrite your task like this:
因此,在你的情况下,你可以这样改写你的任务:
@celery.task
def tprint(result, word):
print word
If you're not going to do anything with the result, you may as well ignore it, by
changing the decorator thus:
如果你不打算做任何事情的结果,你可以忽略它,通过改变装饰者这样:
@celery.task(ignore_result=True)
And then you won't have to change your task signature.
然后你就不用改变你的任务签名了。
Sorry, that last point needs further research.
对不起,最后一点需要进一步研究。
#3
-1
You can try doing something like this. Instead of having a single parameter for function tprint you can have 2 parameters
你可以试着这样做。没有一个函数tprint参数,你可以有两个参数。
def tprint(word, x=None):
print word
then
然后
chain(tprint.s('a', 0) | tprint.s('b'))()
#4
-2
Finally find a workaround for this, a chain decorator will do this job.
最后找到一个变通方法,一个链式装饰器就可以完成这项工作。
I don't know how exactly celery did it, but celery seems force bind previous task's result to next task's first argument.
我不知道芹菜究竟是怎么做到的,但是芹菜似乎把先前任务的结果与下一个任务的第一个论点联系起来。
So here's an example:
这里有一个例子:
def chain_deco(func):
@functools.wraps(func)
def wrapper(chain=None, *args, **kwargs):
if chain is False:
return False
func(*args, **kwargs)
return True
return wrapper
@celery.task
@chain_deco
def hello(word):
print "hello %s" % word
Now this will give the right output.
这将给出正确的输出。
>>> (hello.s(word='a') | hello.s(word='b'))()
OR
或
>>> (hello.s('a') | hello.s('b'))(True)
And decorator also provide the ability to stop a chain in the middle(make the later cascade fail.)
而且decorator还提供了在中间停止链的能力(使后级级联失败)。
The same mechanism should work for chord
too.
同样的机制也应该适用于和弦。
#1
42
There is a built-in functionality to ignore result in chaining and others - immutable subtask. You can use .si() shortcut instead of .s() or .subtask(immutable=True)
有一个内置的功能可以忽略链接的结果和其他不可变的子任务。您可以使用.si()快捷方式而不是.s()或.subtask(不可变=True)
More details here: http://docs.celeryproject.org/en/master/userguide/canvas.html#immutability
更多细节在这里:http://docs.celeryproject.org/en/master/userguide/canvas.html #不变性
#2
2
One possible solution has already posted, but I'd like to add further clarification and an alternate solution (and in some cases a superior one).
一个可能的解决方案已经发布了,但是我想要添加进一步的澄清和一个替代的解决方案(在某些情况下是一个高级的解决方案)。
The error you're seeing, which indicates that your task's signature needs to take into account a second parameter, is due to the fact that when calling tasks in a chain
, Celery automatically pushes each tasks result
as the first parameter of the following task.
您所看到的错误,表明您的任务的签名需要考虑第二个参数,这是因为当调用链中的任务时,芹菜会自动将每个任务结果作为以下任务的第一个参数。
From the docs:
从文档:
Tasks can be linked together, which in practice means adding a callback task:
任务可以链接在一起,这在实践中意味着添加一个回调任务:
>>> res = add.apply_async((2, 2), link=mul.s(16))
>>> res.get()
4
The linked task will be applied with the result of its parent task as the first argument
将该链接任务应用于其父任务的结果作为第一个参数。
Therefore, in your case, you could rewrite your task like this:
因此,在你的情况下,你可以这样改写你的任务:
@celery.task
def tprint(result, word):
print word
If you're not going to do anything with the result, you may as well ignore it, by
changing the decorator thus:
如果你不打算做任何事情的结果,你可以忽略它,通过改变装饰者这样:
@celery.task(ignore_result=True)
And then you won't have to change your task signature.
然后你就不用改变你的任务签名了。
Sorry, that last point needs further research.
对不起,最后一点需要进一步研究。
#3
-1
You can try doing something like this. Instead of having a single parameter for function tprint you can have 2 parameters
你可以试着这样做。没有一个函数tprint参数,你可以有两个参数。
def tprint(word, x=None):
print word
then
然后
chain(tprint.s('a', 0) | tprint.s('b'))()
#4
-2
Finally find a workaround for this, a chain decorator will do this job.
最后找到一个变通方法,一个链式装饰器就可以完成这项工作。
I don't know how exactly celery did it, but celery seems force bind previous task's result to next task's first argument.
我不知道芹菜究竟是怎么做到的,但是芹菜似乎把先前任务的结果与下一个任务的第一个论点联系起来。
So here's an example:
这里有一个例子:
def chain_deco(func):
@functools.wraps(func)
def wrapper(chain=None, *args, **kwargs):
if chain is False:
return False
func(*args, **kwargs)
return True
return wrapper
@celery.task
@chain_deco
def hello(word):
print "hello %s" % word
Now this will give the right output.
这将给出正确的输出。
>>> (hello.s(word='a') | hello.s(word='b'))()
OR
或
>>> (hello.s('a') | hello.s('b'))(True)
And decorator also provide the ability to stop a chain in the middle(make the later cascade fail.)
而且decorator还提供了在中间停止链的能力(使后级级联失败)。
The same mechanism should work for chord
too.
同样的机制也应该适用于和弦。