是否可以在芹菜周期任务上运行单元测试?

时间:2022-08-20 19:17:21

I'm trying to run a unit test on a celery task that I have set to run daily.
I have tried importing the function and calling it in my test but this doesn't work.

我试着在一个芹菜任务上运行一个单元测试,我每天都要运行这个测试。我已经尝试导入这个函数,并在测试中调用它,但这不起作用。

The task is:

的任务是:

@shared_task

def create_a_notification_if_a_product_is_in_or_out_of_season():
    """
    Send a notification if a product is now in or out of season
    """
    julian_date = date.today().timetuple().tm_yday + 1
    active_products = Product.objects.filter(status='ACTIVE')

    for products in active_products:
        in_season_prd = ProductDescription.objects.filter(
            product=products, 
            early_start_julian=julian_date
        )
        for prd in in_season_prd:
            notification = Notification()
            notification.type = notification_choices.PRODUCT_IN_SEASON
            notification.description = str(prd.product.name) + " will be in season from tomorrow."
            notification.save()

and here is an example of one of my tests:

这是我的一个测试的例子

def test_when_product_is_about_to_come_in_to_seasonality(self):
    """
    Make a notification when a product is due to come in to seasonality tomorrow
    """
    p = Product.objects.first()
    p.status = "ACTIVE"
    today = date.today().timetuple().tm_yday
    p.early_start_julian = today + 1
    create_a_notification_if_a_product_is_in_or_out_of_season()
    updated_notifications = Notification.objects.all().count()
    self.assertNotEqual(self.current_notifications, updated_notifications)

Any help would be appreciated!

任何帮助都将被感激!

Thanks

谢谢

2 个解决方案

#1


0  

You can apply() your celery task to execute it synchronously:

您可以应用()您的芹菜任务来同步执行它:

def test_when_product_is_about_to_come_in_to_seasonality(self):
    """
    Make a notification when a product is due to come in to seasonality tomorrow
    """
    p = Product.objects.first()
    p.status = "ACTIVE"
    today = date.today().timetuple().tm_yday
    p.early_start_julian = today + 1
    create_a_notification_if_a_product_is_in_or_out_of_season.apply()
    updated_notifications = Notification.objects.all().count()
    self.assertNotEqual(self.current_notifications, updated_notifications)

#2


0  

I think you're looking for CELERY_ALWAYS_EAGER setting. If set to True it will run your tasks synchronously. You can set it in your test settings or you can decorate only that test with @override_settings(CELERY_ALWAYS_EAGER=True)

我想你是在寻找CELERY_ALWAYS_EAGER设置。如果设置为真,它将同步运行您的任务。您可以将其设置在测试设置中,或者您可以只使用@override_settings来修饰该测试(CELERY_ALWAYS_EAGER=True)

#1


0  

You can apply() your celery task to execute it synchronously:

您可以应用()您的芹菜任务来同步执行它:

def test_when_product_is_about_to_come_in_to_seasonality(self):
    """
    Make a notification when a product is due to come in to seasonality tomorrow
    """
    p = Product.objects.first()
    p.status = "ACTIVE"
    today = date.today().timetuple().tm_yday
    p.early_start_julian = today + 1
    create_a_notification_if_a_product_is_in_or_out_of_season.apply()
    updated_notifications = Notification.objects.all().count()
    self.assertNotEqual(self.current_notifications, updated_notifications)

#2


0  

I think you're looking for CELERY_ALWAYS_EAGER setting. If set to True it will run your tasks synchronously. You can set it in your test settings or you can decorate only that test with @override_settings(CELERY_ALWAYS_EAGER=True)

我想你是在寻找CELERY_ALWAYS_EAGER设置。如果设置为真,它将同步运行您的任务。您可以将其设置在测试设置中,或者您可以只使用@override_settings来修饰该测试(CELERY_ALWAYS_EAGER=True)