Celery/RabbitMQ messages being sent non-stop

Date: 2021-04-11 09:49:13

I'm new to RabbitMQ and Celery, so please bear with me.

So here is the setup: a Django project with Celery, using a CloudAMQP-hosted RabbitMQ instance as the message broker.

My Celery/RabbitMQ settings:

# RabbitMQ & Celery settings
BROKER_URL = 'amqp://guest:guest@localhost:5672/' # Understandably fake
BROKER_POOL_LIMIT = 1
BROKER_CONNECTION_TIMEOUT = 30
BROKER_HEARTBEAT = 30
CELERY_SEND_EVENTS = False
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
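
For context, these settings only take effect if the Celery app actually loads them from Django. A minimal sketch of the app wiring for a Celery 3.x-era Django project (the module name `pkm_main` is taken from the worker command below; adjust to your layout):

```python
# pkm_main/celery.py -- a minimal sketch, assuming the project and settings
# module are both named pkm_main (not shown in the question).
from __future__ import absolute_import

import os

from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pkm_main.settings')

app = Celery('pkm_main')
# Pick up BROKER_URL, BROKER_HEARTBEAT, CELERY_* options from Django settings.
app.config_from_object('django.conf:settings')
# Find tasks.py modules in installed apps (e.g. notifications.tasks).
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
```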

A Docker container runs Celery with the following command:

bash -c 'cd django && celery -A pkm_main worker -E -l info --concurrency=3'

The shared_task definition:

from __future__ import absolute_import

import logging

from celery import shared_task

@shared_task
def push_notification(user_id, message):
    logging.critical('Push notifications sent')
    return {'status': 'success'}

And here is where I actually call it when something happens (I have omitted some of the code because it does not seem relevant):

from notifications.tasks import push_notification

def like_this(self, **args):
    # Do like stuff and then call .delay()
    push_notification.delay(media.user.id, request.user.username + ' has liked your item')

So when this is run, everything seems fine and dandy; the output looks like so:

worker_1 | [2016-03-25 09:03:34,888: INFO/MainProcess] Received task: notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516]
worker_1 | [2016-03-25 09:03:35,333: CRITICAL/Worker-1] Push notifications sent
worker_1 | [2016-03-25 09:03:35,336: INFO/MainProcess] Task notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516] succeeded in 0.444933412999s: {'status': 'success'}

So from what I gather, the task has been received and executed properly; the messages should stop and RabbitMQ should go quiet.

But in my RabbitMQ Management I see messages getting published and delivered non-stop:

[Screenshot: RabbitMQ Management showing messages being published and delivered non-stop]

So what I'm gathering from this is that RabbitMQ is trying to send some sort of confirmation, failing, and retrying? Is there a way to actually turn this behavior off?

All help and advice is warmly welcomed.

EDIT: Forgot to mention something important: until I call push_notification.delay(), the message tab is empty save for the heartbeat that comes and goes every 30 seconds. Only after I have called .delay() does this happen.

EDIT 2: CELERYBEAT_SCHEDULE settings (I've tried running with and without them - there was no difference but adding them just in case)

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    "minutely_process_all_notifications": {
        'task': 'transmissions.tasks.process_all_notifications',
        'schedule': crontab(minute='*')
    }
}

EDIT 3: Added view code. Also, I'm not using the CELERYBEAT_SCHEDULE; I'm just keeping the config in the code for future scheduled tasks.

from notifications.tasks import push_notification

class MediaLikesView(BaseView):
    def post(self, request, media_id):
        media = self.get_object(media_id)
        data = {}
        data['media'] = media.id
        data['user'] = request.user.id
        serializer = MediaLikeSerializer(data=data)
        if serializer.is_valid():
            like = serializer.save()
            push_notification.delay(media.user.id, request.user.username + ' has liked your item')
            serializer = MediaGetLikeSerializer(like)
            return self.get_mocked_pagination_response(status=status.HTTP_204_NO_CONTENT)
        return self.get_mocked_pagination_response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

1 solution

#1


That's Celery's mingle and gossip traffic. Disable it by adding --without-gossip --without-mingle --without-heartbeat to the command line arguments.

Also don't forget to set BROKER_HEARTBEAT = None when you've disabled heartbeats on the command line, otherwise you'll be disconnected after 30s. It's most often better to rely on TCP keepalive than on AMQP heartbeats, or, even worse, Celery's own heartbeats.
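
Putting that together with the command from the question, a sketch of the adjusted worker invocation (paths and project name taken from the question; verify the flags against your Celery version):

```shell
# Worker command with gossip, mingle and Celery/AMQP heartbeats disabled
bash -c 'cd django && celery -A pkm_main worker -E -l info --concurrency=3 \
    --without-gossip --without-mingle --without-heartbeat'

# And the matching change in the Django settings file:
#   BROKER_HEARTBEAT = None
```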
