你如何对Celery任务进行单元测试?

时间:2022-12-22 02:30:26

The Celery documentation mentions testing Celery within Django but doesn't explain how to test a Celery task if you are not using Django. How do you do this?

Celery文档提到在Django中测试Celery,但如果你没有使用Django,则没有解释如何测试Celery任务。你怎么做到这一点?

8 个解决方案

#1


41  

It is possible to test tasks synchronously using any unittest lib out there. I normaly do 2 different test sessions when working with celery tasks. The first one (as I'm suggesting bellow) is completely synchronous and should be the one that makes sure the algorithm does what it should do. The second session uses the whole system (including the broker) and makes sure I'm not having serialization issues or any other distribution, comunication problem.

可以使用任何unittest lib同步测试任务。在使用芹菜任务时,我正常做两个不同的测试会议。第一个(正如我建议的那样)是完全同步的,应该是确保算法做它应该做的事情。第二个会话使用整个系统(包括代理)并确保我没有序列化问题或任何其他分发,通信问题。

So:

所以:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

And your test:

而你的测试:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

Hope that helps!

希望有所帮助!

#2


45  

I use this:

我用这个:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Docs: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

文档:http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER lets you run your task synchronous, and you don't need a celery server.

CELERY_ALWAYS_EAGER允许您同步运行任务,而不需要芹菜服务器。

#3


25  

Depends on what exactly you want to be testing.

取决于您想要测试的内容。

  • Test the task code directly. Don't call "task.delay(...)" just call "task(...)" from your unit tests.
  • 直接测试任务代码。不要调用“task.delay(...)”从单元测试中调用“task(...)”。
  • Use CELERY_ALWAYS_EAGER. This will cause your tasks to be called immediately at the point you say "task.delay(...)", so you can test the whole path (but not any asynchronous behavior).
  • 使用CELERY_ALWAYS_EAGER。这将导致在您说“task.delay(...)”时立即调用您的任务,因此您可以测试整个路径(但不是任何异步行为)。

#4


18  

unittest

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.test fixtures

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

Addendum: make send_task respect eager

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task

#5


15  

For those on Celery 4 it's:

对于芹菜4上的人来说:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

Because the settings names have been changed and need updating if you choose to upgrade, see

由于设置名称已更改,如果您选择升级需要更新,请参阅

http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names

http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names

#6


14  

As of Celery 3.0, one way to set CELERY_ALWAYS_EAGER in Django is:

从Celery 3.0开始,在Django中设置CELERY_ALWAYS_EAGER的一种方法是:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())

#7


3  

In my case (and I assume many others), all I wanted was to test the inner logic of a task using pytest.

在我的情况下(我假设许多其他人),我想要的只是使用pytest测试任务的内部逻辑。

TL;DR; ended up mocking everything away (OPTION 2)

TL; DR;最终嘲笑一切(选项2)


Example Use Case:

示例用例:

proj/tasks.py

PROJ / tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

测试/ test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

but, since shared_task decorator does a lot of celery internal logic, it isn't really a unit tests.

但是,由于shared_task装饰器做了很多芹菜内部逻辑,它实际上并不是单元测试。

So, for me, there were 2 options:

所以,对我来说,有两个选择:

OPTION 1: Separate internal logic

选项1:单独的内部逻辑

proj/tasks_logic.py

PROJ / tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

PROJ / tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

This looks very odd, and other than making it less readable, it requires to manually extract and pass attributes that are part of the request, for instance the task_id in case you need it, which make the logic less pure.

这看起来很奇怪,除了使其不太可读之外,它还需要手动提取和传递属于请求的属性,例如在需要时使用task_id,这会使逻辑变得不那么纯净。

OPTION 2: mocks
mocking away celery internals

选项2:嘲笑剔除芹菜内部

tests/__init__.py

测试/ __ init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

which then allows me to mock the request object (again, in case you need things from the request, like the id, or the retries counter.

然后允许我模拟请求对象(同样,如果你需要来自请求的东西,比如id或重试计数器)。

tests/test_tasks.py

测试/ test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

This solution is much more manual, but, it gives me the control I need to actually unit test, without repeating myself, and without losing the celery scope.

这个解决方案更加手动,但是,它为我提供了实际单元测试所需的控制,无需重复自己,也不会失去芹菜范围。

#8


1  

Since Celery v4.0, py.test fixtures are provided to start a celery worker just for the test and shutdown when done:

从Celery v4.0开始,提供了py.test灯具,以便在完成测试和关机时启动芹菜工作者:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: gen93553@gnpill.local (running)>
    assert myfunc.delay().wait(3)

Among other fixtures described on http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test, you can change the celery default options by redefining the celery_config fixture this way:

在http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test中描述的其他灯具中,您可以通过以下方式重新定义celery_config灯具来更改芹菜默认选项:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

By default, the test worker uses an in-memory broker and result backend. No need to use a local Redis or RabbitMQ if not testing specific features.

默认情况下,测试工作者使用内存中介和结果后端。如果不测试特定功能,则无需使用本地Redis或RabbitMQ。

#1


41  

It is possible to test tasks synchronously using any unittest lib out there. I normaly do 2 different test sessions when working with celery tasks. The first one (as I'm suggesting bellow) is completely synchronous and should be the one that makes sure the algorithm does what it should do. The second session uses the whole system (including the broker) and makes sure I'm not having serialization issues or any other distribution, comunication problem.

可以使用任何unittest lib同步测试任务。在使用芹菜任务时,我正常做两个不同的测试会议。第一个(正如我建议的那样)是完全同步的,应该是确保算法做它应该做的事情。第二个会话使用整个系统(包括代理)并确保我没有序列化问题或任何其他分发,通信问题。

So:

所以:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

And your test:

而你的测试:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

Hope that helps!

希望有所帮助!

#2


45  

I use this:

我用这个:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Docs: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

文档:http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER lets you run your task synchronous, and you don't need a celery server.

CELERY_ALWAYS_EAGER允许您同步运行任务,而不需要芹菜服务器。

#3


25  

Depends on what exactly you want to be testing.

取决于您想要测试的内容。

  • Test the task code directly. Don't call "task.delay(...)" just call "task(...)" from your unit tests.
  • 直接测试任务代码。不要调用“task.delay(...)”从单元测试中调用“task(...)”。
  • Use CELERY_ALWAYS_EAGER. This will cause your tasks to be called immediately at the point you say "task.delay(...)", so you can test the whole path (but not any asynchronous behavior).
  • 使用CELERY_ALWAYS_EAGER。这将导致在您说“task.delay(...)”时立即调用您的任务,因此您可以测试整个路径(但不是任何异步行为)。

#4


18  

unittest

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.test fixtures

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

Addendum: make send_task respect eager

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task

#5


15  

For those on Celery 4 it's:

对于芹菜4上的人来说:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

Because the settings names have been changed and need updating if you choose to upgrade, see

由于设置名称已更改,如果您选择升级需要更新,请参阅

http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names

http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names

#6


14  

As of Celery 3.0, one way to set CELERY_ALWAYS_EAGER in Django is:

从Celery 3.0开始,在Django中设置CELERY_ALWAYS_EAGER的一种方法是:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())

#7


3  

In my case (and I assume many others), all I wanted was to test the inner logic of a task using pytest.

在我的情况下(我假设许多其他人),我想要的只是使用pytest测试任务的内部逻辑。

TL;DR; ended up mocking everything away (OPTION 2)

TL; DR;最终嘲笑一切(选项2)


Example Use Case:

示例用例:

proj/tasks.py

PROJ / tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

测试/ test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

but, since shared_task decorator does a lot of celery internal logic, it isn't really a unit tests.

但是,由于shared_task装饰器做了很多芹菜内部逻辑,它实际上并不是单元测试。

So, for me, there were 2 options:

所以,对我来说,有两个选择:

OPTION 1: Separate internal logic

选项1:单独的内部逻辑

proj/tasks_logic.py

PROJ / tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

PROJ / tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

This looks very odd, and other than making it less readable, it requires to manually extract and pass attributes that are part of the request, for instance the task_id in case you need it, which make the logic less pure.

这看起来很奇怪,除了使其不太可读之外,它还需要手动提取和传递属于请求的属性,例如在需要时使用task_id,这会使逻辑变得不那么纯净。

OPTION 2: mocks
mocking away celery internals

选项2:嘲笑剔除芹菜内部

tests/__init__.py

测试/ __ init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

which then allows me to mock the request object (again, in case you need things from the request, like the id, or the retries counter.

然后允许我模拟请求对象(同样,如果你需要来自请求的东西,比如id或重试计数器)。

tests/test_tasks.py

测试/ test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

This solution is much more manual, but, it gives me the control I need to actually unit test, without repeating myself, and without losing the celery scope.

这个解决方案更加手动,但是,它为我提供了实际单元测试所需的控制,无需重复自己,也不会失去芹菜范围。

#8


1  

Since Celery v4.0, py.test fixtures are provided to start a celery worker just for the test and shutdown when done:

从Celery v4.0开始,提供了py.test灯具,以便在完成测试和关机时启动芹菜工作者:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: gen93553@gnpill.local (running)>
    assert myfunc.delay().wait(3)

Among other fixtures described on http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test, you can change the celery default options by redefining the celery_config fixture this way:

在http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test中描述的其他灯具中,您可以通过以下方式重新定义celery_config灯具来更改芹菜默认选项:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

By default, the test worker uses an in-memory broker and result backend. No need to use a local Redis or RabbitMQ if not testing specific features.

默认情况下,测试工作者使用内存中介和结果后端。如果不测试特定功能,则无需使用本地Redis或RabbitMQ。