Celery任务不会在Django测试中抛出异常

时间:2021-01-22 19:19:11

I have a couple of celery tasks that are included in my Django tests. Unfortunately exceptions are not thrown when tasks are invoked via .delay(). I am setting CELERY_ALWAYS_EAGER to True.

我有几个芹菜任务,包含在我的Django测试中。不幸的是,当通过.delay()调用任务时,不会抛出异常。我将CELERY_ALWAYS_EAGER设置为True。

tasks.py

tasks.py

import celeryapp as app

@app.task()
def exception_task():
    print 'CELERY_ALWAYS_EAGER:', app.conf['CELERY_ALWAYS_EAGER']
    raise Exception('foo')

tests.py

tests.py

def test_exception_in_task(self):
        from tasks import exception_task
        exception_task.delay()

Output

产量

CELERY_ALWAYS_EAGER: True
.
----------------------------------------------------------------------
Ran 1 test in 0.686s

When removing the .delay the test exits with an error as excpected:

删除.delay时,测试将退出并出现错误:

ERROR: test_exception_in_task
Exception: foo

Versions

版本

celery==3.1.4
Django==1.6.4

2 个解决方案

#1


10  

Seems I additionally had to set CELERY_EAGER_PROPAGATES_EXCEPTIONS to True.

似乎我还必须将CELERY_EAGER_PROPAGATES_EXCEPTIONS设置为True。

#2


2  

Under celery 4.0, I had to use CELERY_TASK_EAGER_PROPAGATES

在celery 4.0下,我不得不使用CELERY_TASK_EAGER_PROPAGATES

#1


10  

Seems I additionally had to set CELERY_EAGER_PROPAGATES_EXCEPTIONS to True.

似乎我还必须将CELERY_EAGER_PROPAGATES_EXCEPTIONS设置为True。

#2


2  

Under celery 4.0, I had to use CELERY_TASK_EAGER_PROPAGATES

在celery 4.0下,我不得不使用CELERY_TASK_EAGER_PROPAGATES