Django, Celery, Redis, RabbitMQ: chained tasks for fanout-on-write

Date: 2021-03-09 19:17:23

I've been watching Rick Branson's PyCon video: Messaging at Scale at Instagram. You might want to watch the video in order to answer this question. Rick Branson uses Celery, Redis and RabbitMQ. To get you up to speed: each user has a Redis list for their homefeed, and each list contains the media IDs of photos posted by the people they follow.

Justin Bieber, for example, has 1.5 million followers. When he posts a photo, the ID of that photo needs to be inserted into each individual Redis list for each of his followers. This is called the fanout-on-write approach. However, there are a few reliability problems with it. It can work, but for someone like Justin Bieber or Lady Gaga, who have millions of followers, doing this inside the web request (where you have 0-500ms to complete the request) is a problem: the request will time out.

So Rick Branson decided to use Celery, an asynchronous task queue/job queue based on distributed message passing. Any heavy lifting, such as inserting media IDs into followers' lists, can be done asynchronously, outside of the web request. The request completes quickly, and Celery continues to insert the IDs into all of the lists.
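
For context, here is a minimal sketch of the Celery wiring I'm assuming throughout; the project name, broker URL, and file layout below are placeholders I'm adding for illustration, not details from the talk or from my actual config:

# proj/celery.py -- a minimal sketch; broker URL and names are placeholders.
from __future__ import absolute_import
from celery import Celery

app = Celery('proj', broker='amqp://guest:guest@localhost:5672//')

# Find task modules (like mydjangoapp/tasks.py) in the listed packages.
app.autodiscover_tasks(['mydjangoapp'])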

This approach works wonders. But again, you don't want to hand all of Justin's followers to Celery in one huge chunk, because it would tie up a single Celery worker. Why not have multiple workers work on it at the same time so it finishes faster? Brilliant idea! You'd want to break this chunk up into smaller batches and have different workers working on each batch. Rick Branson uses a batch size of 10,000 followers, and he uses something called a cursor to keep inserting media IDs for all of Justin Bieber's followers until it is completed. In the video, he talks about this at 3:56.
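
Just to make the batching idea concrete (this is my own illustration, not code from the talk), splitting 1.5 million list indices into inclusive LRANGE windows of 10,000 looks like this:

# Illustration only: computing inclusive LRANGE index windows of 10,000.
BATCH = 10000
total_followers = 1500000  # e.g. Justin Bieber's follower count

windows = [(start, start + BATCH - 1) for start in range(0, total_followers, BATCH)]
# windows[0] == (0, 9999), windows[1] == (10000, 19999), ...
# Each window could be handed to a different Celery worker.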

I was wondering if anyone could explain this in more detail and show examples of how it can be done. I'm currently trying to attempt the same setup. I use Andy McCurdy's redis-py Python client library to communicate with my Redis server. For every user on my service, I create a Redis followers list.

So a user with an ID of 343 would have a list at the following key:

followers:343

I also create a homefeed list for each user. Every user has their own list. So a user with an ID of 1990 would have a list at the following key:

homefeed:1990
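
To make the key layout concrete, a hypothetical helper (not part of my actual code) that records a new follow under this scheme might look like:

# Hypothetical sketch: record that follower_id now follows followee_id.
def follow(r_server, follower_id, followee_id):
    r_server.lpush("followers:%s" % followee_id, follower_id)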

In the "followers:343" redis list, it contains all the IDs of the people who follow user 343. user 343 has 20,007 followers. Below, I am retrieving all the IDs in the list starting from index 0 all the way to the end -1 just to show you what it looks like.

>>> r_server.lrange("followers:343", 0, -1)
['8', '7', '5', '3', '65', '342', '42', ...] ---> for the sake of example, assume this list has another 20,000 IDs.

What you see is a list of all the IDs of users who follow user 343.

Here is my proj/mydjangoapp/tasks.py, which contains my insert_into_homefeed function:

from __future__ import absolute_import
from celery import shared_task
import redis
pool = redis.ConnectionPool(host='XX.XXX.XXX.X', port=6379, db=0, password='XXXXX')

@shared_task
def insert_into_homefeed(photo_id, user_id):
    # Grab the list of all follower IDs from Redis for user_id.
    r_server = redis.Redis(connection_pool=pool)

    followers_list = r_server.lrange("followers:%s" % (user_id), 0, -1)

    # Now for each follower_id in followers_list, find their homefeed key 
    # in Redis and insert the photo_id into that homefeed list.

    for follower_id in followers_list:
        r_server.lpush("homefeed:%s" % (follower_id), photo_id)
    return "Fan Out Completed for %s" % (user_id)

When this task is called from the Django view, it grabs all the IDs of the people who follow user 343 and then inserts the photo ID into each of their homefeed lists.

Here is my upload view in my proj/mydjangoapp/views.py. I basically call Celery's delay method and pass on the necessary variables so that the request ends quickly:

import json

from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt

# Import the Celery task here, plus the Photo model
# (assuming it lives in mydjangoapp.models).
from mydjangoapp.models import Photo
from mydjangoapp.tasks import insert_into_homefeed


@csrf_exempt
def Upload(request):
    if request.method == 'POST':
        data = json.loads(request.body)
        newPhoto = Photo.objects.create(user_id=data['user_id'],
                                        description=data['description'],
                                        photo_url=data['photo_url'])
        newPhoto_ID = newPhoto.pk
        # Hand the fanout off to Celery so the request returns quickly.
        insert_into_homefeed.delay(newPhoto_ID, data['user_id'])
        return HttpResponse("Request Completed")
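
For what it's worth, a request like the following exercises this view (a sketch; the URL and field values are made up):

# Sketch of hitting the Upload view; URL and field values are made up.
import json
import requests

payload = {
    "user_id": 343,
    "description": "sunset",
    "photo_url": "http://example.com/sunset.jpg",
}
requests.post("http://localhost:8000/upload/", data=json.dumps(payload))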

How can I do this in such a way that the work is batched in groups of 10,000?

1 Answer

#1

The approach described in the video is task "chaining".

To get your task method up and running as a chain, you want to add an extra parameter that represents the index into the list of followers. Instead of working on the full list of followers, the task only works on a fixed batch size, starting from the index argument it was handed. At completion, the task should create a new task and pass the new index.

INSERT_INTO_HOMEFEED_BATCH = 10000

@shared_task
def insert_into_homefeed(photo_id, user_id, index=0):
    # Grab one batch of follower IDs from Redis for user_id.
    r_server = redis.Redis(connection_pool=pool)

    range_limit = index + INSERT_INTO_HOMEFEED_BATCH - 1  # adjust for zero-index; LRANGE ends are inclusive

    followers_list_batch = r_server.lrange("followers:%s" % (user_id), index, range_limit)

    if not followers_list_batch:
        return  # zero followers or no more batches

    # For each follower_id in this batch, push the photo_id onto that
    # follower's homefeed list.
    for follower_id in followers_list_batch:
        r_server.lpush("homefeed:%s" % (follower_id), photo_id)

    # Chain: queue a new task that picks up where this batch ended.
    insert_into_homefeed.delay(photo_id, user_id, range_limit + 1)
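
Note that the call site doesn't need to change: the view still calls insert_into_homefeed.delay(photo_id, user_id), the index defaults to 0, and each task queues the next batch until lrange comes back empty.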

This works well because Redis lists are ordered and the lrange command doesn't raise an error on out-of-range inputs; once the start index runs past the end of the list, it simply returns an empty list, which terminates the chain.
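
You can see that termination condition directly in redis-py (a quick sketch; the key name is made up):

>>> r_server.rpush("demo:list", "a", "b", "c")
3
>>> r_server.lrange("demo:list", 0, 9999)   # end index past the list is fine
['a', 'b', 'c']
>>> r_server.lrange("demo:list", 3, 9999)   # start index past the end
[]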
