序列文章:
Celery 源码解析一:Worker 启动流程概述
Celery 源码解析二:Worker 的执行引擎
Celery 源码解析三: Task 对象的实现
Celery 源码解析四: 定时任务的实现
Celery 源码解析五: 远程控制管理
Celery 源码解析六:Events 的实现
Celery 源码解析七:Worker 之间的交互
Celery 源码解析八:State 和 Result
Task 的实现在 Celery 中你会发现有两处,一处位于 celery/app/task.py,这是第一个;第二个位于 celery/task/base.py 中,这是第二个。他们之间是有关系的,你可以认为第一个是对外暴露的接口,而第二个是具体的实现!所以,我们由简入繁,先来看看对外的接口:
其实这就是个我们声明 Task 的对象,例如我们使用这么一段代码:

我们可以看看 add 对象是啥:
In [1]: add
Out[1]: <@task: worker.add of tasks:0x10c9b06d0>
你会发现其实他就是我们的一个 Task 对象,所以你就可以观察一下我们平时使用这个 add 的形式在里面是如何实现的了,例如我们最常使用的可能就两种方式了,分别是:
In [2]: add.delay()
In [3]: add.apply_async()
其他你看一下源码就会发现他们的实现是一样的,就像这样:

我们现在很清楚,调用 apply_async
是将我们的 Task 提交到 MQ 中,然后获得一个 celery.result.AsyncResult 对象,那么具体都做了哪些工作,还是需要进一步查看的。apply_async
的参数有很多,所以我们需要先给他归个类,这样就好看多了,概括着看,可以分为这么几类:
- AMQP 类:connection、queue、exchange、routing_key、producer、publisher、headers
- MQ 策略类:countdown、eta、expires、retry、retry_policy、priority、
- 管理类:shadow、serializer、compression、add_to_parent
- 其他:args、kwargs、link、link_error、
这样一看就感觉一目了然多了,AMQP 类的我们就不关注了,毕竟都看了这么多了,应该大家都熟悉了。这里的主要关注点还是在 MQ 策略类和管理类上,着重在 MQ 策略类上,因为管理类的功能稍微比较简单一些。
async 发送消息
在 apply_async
中,我们可以看到有两处执行逻辑,第一处是:

这里是直接调用 apply
,然后这里的条件 task_always_eager
是什么意思我们还没见过,可以看一下文档:

ok,了解,其实就是说这是个同步的接口,那么我们就可以对应着看到下面这处应该是异步的实现咯:

既然如此,我们一个个得来看。
同步发送消息实现
同步执行消息的一层函数比较简单,只是简单的构建了一个 tracer,然后就从 tracer 调用中拿到调用结果,我们看上去会比较舒服:

但是,这个 tracer 的内容就复杂啦,但是这个 build_tracer
的构建函数不需要太过关注,所以我们需要关注的是 build_tracer
返回的这个 tracer
函数,但是这个函数的内容很多,为了简约一下,所以给大家抽象了一番。同步调用过程中,可以分为几部分功能,分别是:
- 信号处理:执行前/后/成功这几个时刻需要释放一些信号给感兴趣的成员
- 失败处理:对于没有执行的情况需要进行细分处理,例如:reject/ignore/retry/exception 等
- 依赖处理:因为 Celery 支持一些简单的依赖,所以执行完成之后需要执行被依赖的 tasks
- 执行逻辑:这个就是正常的函数调用咯
- 其他:例如统计执行时间,出入栈之类的
我们就看下任务的执行逻辑是怎么样的,在代码里面是很简单的一个函数调用,其实就是看 Task 对象有没有实现 __call__
方法,如果没有就使用 run
方法:

那 task 的 __call__
实现也不是太复杂,其实最后调用的也是 run
方法,所以到最后都还是 run
方法的责任啦,但是,这里的基类是不实现 run
方法的,所以这个实现就落实到具体的实现类中了,那么你以为 run
方法会在 celery/app/base.py 中实现?我之前也是这么想的,但是,后来我发现不是的,这个实现其实就是我们在代码里面使用 @app.task
装饰的函数,其实就是讲我们自定义的函数封装成 run
,这样调用 run
不就执行的我们的函数了吗?有意思吧,这个封装的方式我们后面再说,也就是说同步的方式我们就到此吧,也差不多了。
异步发送消息实现
看完同步的我们再来看看异步的,在说异步的之前,我们先思考一下,异步的应该是怎样?之前看的时候我猜想异步不就是把 Task 对象塞进 MQ 中么,就应该是这么简单,但是,看完之后发现还是 too young 了,因为从同步中我们就可以看出,还是有很多功课要完成的,不管怎样,一起来再看一遍。
从前边我们说有同步和异步两种形式那里我们可以发现同步和异步的除了功能不一样之外,还有调用的对象也不一样,同步的是调用 Task 自己的方法,也就是说消息被 Task 自己消化了;而异步的确实使用的 Celery 对象的方法,也就是说还得依赖于 Celery 这个 Boss 来实现。这是为啥呢?很明显嘛,Task 自身没有关于 MQ 的任何消息,而只有一个绑定的 Celery 对象,所以从抽象层面就只能交给 Celery 了,而 Celery 却包含了所有你需要的信息,是可以完成这个任务的。
所以,异步的消息到了 Celery 是这么被发出去的:

这里出现了一个我们还没怎么接触过的 amqp
,但是没关系,随着等下的了解,我们会认识到它的,这里的几个关键步骤都是通过 amqp
来完成的,所以我们应该着重看看他们
异步消息体的创建
在 Celery 中,异步消息体是通过 create_task_message
来创建的,我们可以发现,这里是传了一大堆参数进去,但是,无妨,对于这些参数,我们大部分都在前面见过了,不怵,主要还是需要关注一下内部都为消息体做了什么工作:

这里可以发现两件事情
- 消息体的预处理都是在这里完成的,例如检验和转换参数格式
- 构建消息就用了四个属性:
headers
、properties
、body
和sent_event
这里其实就是所有的构建消息体的代码了,为什么呢,因为 task_message 是一个 nametuple:

异步消息的发送
异步消息的发送这里不是直接就调用的一个函数,而是动态得创建了一个 sender ,然后才调用这个 sender 发送的(没搞懂为啥,为了扩展?)。而创建 sender 的逻辑倒是比较简单,所以忽略了,直接来看真正的 send 操作是如何完成的,其实之前提过了,这里真正的 send 操作就像之前我们看同步的执行逻辑一样尿性,又臭又长,真的,又臭又长,而且作者自己都加注释承认了,他的理由是为了性能!
同样得,为了方便我们的理解,我还是采用抽丝剥茧的方式来给大家介绍一下,首先,我习惯性得分个类:
- MQ 的各项功能:routing_key/exchange/delivery_mode/retry
- 任务执行的前后处理:发送前/发送后
- 真正的发送逻辑
- 其他
其实重头戏应该在 MQ 的参数确定上,因为只要这些参数都确定了,消息的发送只是一个 producer.publish
就解决的事情,所以我们花些精力来看看 MQ 的参数都是怎么决定出来的:
- queue_name
- 调用
task.delay
的时候传的,没传并且也没传exchange
那就是default
了 - 不会出现传了
exchange
但是不传queue
- 调用
- routing_key
- 调用
task.delay
的时候传的,没传就看exchange
有没有,没有就是queue
的值了 - 如果参数传了
exchange
,那么就是配置中的默认routing_key
- 调用
- exchange:
- 调用
task.delay
的时候传的,没传但是exchange_type
类型是direct
,那么就是 "" - 如果类型不是
direct
,那么 queue 有 exchange 就用,没有就使用默认的
- 调用
- delivery_mode
- 调用
task.delay
的时候传的,没传就看 queue 里面有没有,有就用 - 没有就使用默认的
- 调用
- retry:
- 调用
task.delay
的时候传了就用,没传就用默认的
- 调用
等这些参数确认完之后,就使用这些参数发送了!
然后这样子就将消息发出去了,等待 Worker 的接收,而 worker 的接受逻辑我们之前已经看到了,其实还是注册的 Consumer 的 on_message
附加
在前面我们说如何构建异步消息体的时候,对于消息体只是简单的用几个 ...
忽略过,但是,对于整体理解来说,我们不应该忽略他们的实质内容,所以在最后我把他们都罗列出来,前后的会用到的。而且你会发现有点意思的是,对于我们的一个异步调用,task
名和 id
都是放在 headers
里头的,而参数什么的却是放在 body
里面,在我自己实现的异步 MQ 里面,这些都是放在 body
里面的,这点我倒是不太欣赏 Celery 的。
headers

properties

body

send_event

Celery 源码解析三: Task 对象的实现的更多相关文章
-
Celery 源码解析五: 远程控制管理
今天要聊的话题可能被大家关注得不过,但是对于 Celery 来说确实很有用的功能,曾经我在工作中遇到这类情况,就是我们将所有的任务都放在同一个队列里面,然后有一天突然某个同学的代码写得不对,导致大量的 ...
-
Celery 源码解析六:Events 的实现
在 Celery 中,除了远程控制之外,还有一个元素可以让我们对分布式中的任务的状态有所掌控,而且从实际意义上来说,这个元素对 Celery 更为重要,这就是在本文中将要说到的 Event. 在 Ce ...
-
Mybatis源码解析(三) —— Mapper代理类的生成
Mybatis源码解析(三) -- Mapper代理类的生成 在本系列第一篇文章已经讲述过在Mybatis-Spring项目中,是通过 MapperFactoryBean 的 getObject( ...
-
AspNetCore3.1_Secutiry源码解析_2_Authentication_核心对象
系列文章目录 AspNetCore3.1_Secutiry源码解析_1_目录 AspNetCore3.1_Secutiry源码解析_2_Authentication_核心项目 AspNetCore3. ...
-
ReactiveCocoa源码解析(三) Signal代码的基本实现
上篇博客我们详细的聊了ReactiveSwift源码中的Bag容器,详情请参见<ReactiveSwift源码解析之Bag容器>.本篇博客我们就来聊一下信号量,也就是Signal的的几种状 ...
-
ReactiveSwift源码解析(三) Signal代码的基本实现
上篇博客我们详细的聊了ReactiveSwift源码中的Bag容器,详情请参见<ReactiveSwift源码解析之Bag容器>.本篇博客我们就来聊一下信号量,也就是Signal的的几种状 ...
-
React的React.createRef()/forwardRef()源码解析(三)
1.refs三种使用用法 1.字符串 1.1 dom节点上使用 获取真实的dom节点 //使用步骤: 1. <input ref="stringRef" /> 2. t ...
-
Celery 源码解析四: 定时任务的实现
在系列中的第二篇我们已经看过了 Celery 中的执行引擎是如何执行任务的,并且在第三篇中也介绍了任务的对象,但是,目前我们看到的都是被动的任务执行,也就是说目前执行的任务都是第三方调用发送过来的.可 ...
-
Spring源码解析三:IOC容器的依赖注入
一般情况下,依赖注入的过程是发生在用户第一次向容器索要Bean是触发的,而触发依赖注入的地方就是BeanFactory的getBean方法. 这里以DefaultListableBeanFactory ...
随机推荐
-
CDC的StretchBlt函数载入位图时图片失真问题
最近遇到加载的bmp图片出现失真问题,查找得知需要用SetStretchBltMode函数设置拉伸模式. 函数原型:int SetSTretchBltMode(HDC hdc, int iStretc ...
-
Getting Started With Hazelcast 读书笔记(第二章、第三章)
第二章 起步 本章就相当简单粗暴了,用一个个例子说明hazelcast怎么用. 1.map,set,list这些集合类都是开箱即用的,只要从Hazelcast的实例中获取一份就行. 2.增加了Mult ...
-
解决内网主机ping不通网关能ping内网
有一台笔记本电脑可以自动获取IP,可以和内网其他主机互相PING通,就是PING 不通网关,只能上内网,不能上外网,IP换到其他主机上也可以上外网,说明路由器上没什么限制.路由器也查了,电脑也重装了, ...
-
js使用技巧大全
1.防止重新构建 var constructedHtml = ""; for(var i = 0,len = arr.length;i < len;i++){ constru ...
-
UVa1628 UVaLive5847 Pizza Delivery
填坑系列(p.302) 既然不知道后面还要卖多少个就加一维状态嘛.. lrj写的O(n)转移?其实转移可以O(1) 貌似按x排序有奇效? #include<cstdio> #include ...
-
poj2228
这显然是一道环形dp的题目 处理环形我们都是要转化为线性来做 一般有这么两种方法处理 复制一段到最后 (比如说noip的能量项链) 考查环形对dp的影响然后分类讨论(比如bzoj1040) 这道题我们 ...
-
SQL Server 跨库复制表方法小笔记
insert into tableA (column1,column2.....) SELECT * FROM OPENDATASOURCE('SQLOLEDB', 'Data Source=127. ...
-
使用WebGL加载Google街景图
我们要实现的功能比较简单:首先通过坐标定位.我的位置.地址搜索等方式,调用google map api获取地址信息.然后根据地址信息中的全景信息获取当前缩放级别的全景信息.最终把这些全景信息通过Web ...
-
Switch 语句
如果您希望有选择地执行若干代码块之一,请使用 Switch 语句. 使用 Switch 语句可以避免冗长的 if..elseif..else 代码块. 语法 工作原理: 对表达式(通常是变量)进行一次 ...
-
kaggle入门项目:Titanic存亡预测 (一)比赛简介
自从入了数据挖掘的坑,就在不停的看视频刷书,但是总觉得实在太过抽象,在结束了coursera上Andrew Ng 教授的机器学习课程还有刷完一整本集体智慧编程后更加迷茫了,所以需要一个实践项目来扎实之 ...