起因
最近打算实现异步任务,回想起当年看celery的场景,重新整理下celery的机制
1. 任务入队列
假定一个函数定义如下
def add(a, b, c=0):
print a + b + c
任务被序列化后,以字符串的形式入队列
{"body": "gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4lVBGFyZ3NxBF1xBShLD0sUZVUFY2hvcmRxBk5VCWNhbGxiYWNrc3EHTlUIZXJyYmFja3NxCE5VB3Rhc2tzZXRxCU5VAmlkcQpVJDA2ZjMzMWQ1LWFhZTktNGVmNy05NDVmLTNhNzM3NThlNmI2MnELVQdyZXRyaWVzcQxLAFUEdGFza3ENWAkAAAB0YXNrcy5hZGRxDlUDZXRhcQ9OVQZrd2FyZ3NxEH1xEVgBAAAAY0sKc3Uu", "headers": {}, "content-type": "application/x-python-serialize", "properties": {"body_encoding": "base64", "delivery_info": {"priority": 0, "routing_key": "xxxxxxxx", "exchange": "xxxxxxxx"}, "delivery_mode": 2, "delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3"}, "content-encoding": "binary"}
展开
{
"body": "gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4lVBGFyZ3NxBF1xBShLD0sUZVUFY2hvcmRxBk5VCWNhbGxiYWNrc3EHTlUIZXJyYmFja3NxCE5VB3Rhc2tzZXRxCU5VAmlkcQpVJDA2ZjMzMWQ1LWFhZTktNGVmNy05NDVmLTNhNzM3NThlNmI2MnELVQdyZXRyaWVzcQxLAFUEdGFza3ENWAkAAAB0YXNrcy5hZGRxDlUDZXRhcQ9OVQZrd2FyZ3NxEH1xEVgBAAAAY0sKc3Uu",
"headers": {},
"content-type": "application/x-python-serialize",
"properties": {
"body_encoding": "base64",
"delivery_info": {
"priority": 0,
"routing_key": "xxxxxxxx",
"exchange": "xxxxxxxx"
},
"delivery_mode": 2,
"delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3"
},
"content-encoding": "binary"
}
body 中存储有task需要执行的所有信息,默认情况下, 它的编码方式是
dict –> pickle 编码 –> base64编码 –> 字符串
解码后的body 形如
{
'utc': False,
'chord': None,
'args': [
15,
20
],
'retries': 0,
'expires': None,
'task': u'tasks.add',
'callbacks': None,
'errbacks': None,
'taskset': None,
'kwargs': {
u'c': 10
},
'eta': None,
'id': '06f331d5-aae9-4ef7-945f-3a73758e6b62'
}
其中重要的字段是 task
u’tasks.add’ 是函数的全路径,包含包路径 args
[15, 20] 函数参数 kwargs
{ u’c’: 10} 函数参数
2. 任务执行
函数的执行可以直接简化成
def add(a, b, c=0):
print a + b + c
globals()['add'](*[15, 20], **{'c':10})
3. 后记
当然celery本身比这复杂的多,任务编码的方式可以自己指定,worker要能执行某个任务,任务的信息是需要提前注册的。