[Source Code Analysis] Celery, a Parallel Distributed Task Queue: The Multi-Process Model

Date: 2023-07-21 19:14:56

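0x00 Abstract

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling. Because Celery relies on multiple processes to improve throughput, this article gives a first look at Celery's multi-process architecture and model.

By the end you should understand what Celery had to consider, and which abstractions it introduced, in order to build a multi-process architecture. For example:

  • How Celery, as a whole system, integrates the multi-process model to obtain a process pool;
  • How it instantiates a different multi-process model depending on the OS;
  • How it sets up communication between parent and child processes, and how reads and writes are separated;
  • How child processes are created, what their work logic is, and how a child process is abstracted;
  • How child processes are supervised;
  • How tasks are assigned to child processes;
  • How results returned by child processes are handled.

We start with a rough sketch of the logic so that the later sections are easier to follow. In the diagram below, note that:

  • TaskHandler is the logic by which the parent process assigns tasks to child processes;
  • ResultHandler is the logic by which the parent process handles what child processes return;
  • Supervisor is a helper handler that manages the pool;
  • Worker is the business logic that runs in a child process, _pool is the process pool, and ForkProcess (that is, WorkerProcess) is the abstraction of a child process. Every child-process abstraction runs a Worker.

Keep these concepts clearly separated.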

+--------------------------+
|   AsynPool               |
|                          |
|                          |
|   ResultHandler          +-------> celery.concurrency.asynpool.ResultHandler
|                          |
|   Supervisor             +-------> billiard.pool.Supervisor
|                          |
|   TaskHandler            +-------> billiard.pool.TaskHandler
|                          |
|   TimeoutHandler         +-------> billiard.pool.TimeoutHandler
|                          |
|   Worker                 +-------> celery.concurrency.asynpool.Worker
|                          |
|   _pool                  +-----------------+---> <ForkProcess(ForkPoolWorker-1, started daemon)>
+--------------------------+                 |
                                             +---> <ForkProcess(ForkPoolWorker-2, started daemon)>
                                             |
                                             +---> <ForkProcess(ForkPoolWorker-3, started daemon)>
                                             |
                                             +---> <ForkProcess(ForkPoolWorker-4, started daemon)>

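The multi-process entry point is the Pool step of the Consumer, so we start from how the Consumer components are launched.

0x01 Consumer Component: Pool bootstep

First, the Consumer Pool is started from bootsteps. This bootstep is the worker's actual execution engine.

The Pool bootstep adds an extra layer of wrapping because it needs to set a scaling range, the so-called autoscaler. Since the various pools are built here, later tasks can simply be handed to the Pool.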

1.1 bootsteps

The code lives in celery/worker/components.py. It is just an entry point whose purpose is to:

  • apply the various configuration settings;
  • bring in TaskPool, which is the entry point for the worker's multi-processing.

class Pool(bootsteps.StartStopStep):

    def __init__(self, w, autoscale=None, **kwargs):
        w.pool = None
        if isinstance(autoscale, str):
            max_c, _, min_c = autoscale.partition(',')
            autoscale = [int(max_c), min_c and int(min_c) or 0]
        w.autoscale = autoscale
        if w.autoscale:
            w.max_concurrency, w.min_concurrency = w.autoscale
        super().__init__(w, **kwargs)

    def create(self, w):
        semaphore = None
        max_restarts = None
        # Checks whether worker_pool is 'eventlet' or 'gevent'; the default is prefork.
        if w.app.conf.worker_pool in GREEN_POOLS:  # pragma: no cover
            ...  # (body not shown in this excerpt)
        # Threads are used when use_eventloop is false or the platform is Windows.
        threaded = not w.use_eventloop or IS_WINDOWS
        procs = w.min_concurrency  # minimum number of pool processes (4 in this walkthrough)
        w.process_task = w._process_task  # bind the worker's _process_task to process_task
        if not threaded:  # not using threads
            # LaxBoundedSemaphore provides atomic acquire/release; it is built on a queue.
            semaphore = w.semaphore = LaxBoundedSemaphore(procs)
            w._quick_acquire = w.semaphore.acquire  # expose the semaphore operations on the worker
            w._quick_release = w.semaphore.release
            max_restarts = 100  # maximum number of restarts
            # Depending on the pool class settings, swap in a different process_task.
            if w.pool_putlocks and w.pool_cls.uses_semaphore:
                w.process_task = w._process_task_sem  # with the default settings this replaces process_task
        allow_restart = w.pool_restarts  # whether restarts are allowed
        pool = w.pool = self.instantiate(
            w.pool_cls, w.min_concurrency,  # w.pool_cls defaults to prefork.TaskPool
            initargs=(w.app, w.hostname),
            maxtasksperchild=w.max_tasks_per_child,
            max_memory_per_child=w.max_memory_per_child,
            timeout=w.time_limit,
            soft_timeout=w.soft_time_limit,
            putlocks=w.pool_putlocks and threaded,
            lost_worker_timeout=w.worker_lost_wait,
            threads=threaded,
            max_restarts=max_restarts,
            allow_restart=allow_restart,
            forking_enable=True,
            semaphore=semaphore,
            sched_strategy=self.optimization,
            app=w.app,
        )
        _set_task_join_will_block(pool.task_join_will_block)
        return pool
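
The autoscale handling in __init__ can be tried in isolation. The helper below is a minimal sketch, not Celery code: parse_autoscale is a hypothetical name, and it only mirrors the 'max,min' string parsing shown above.

```python
def parse_autoscale(value):
    """Turn 'max,min' (or just 'max') into [max_concurrency, min_concurrency]."""
    # partition() always returns three parts, so a missing ',min' yields ''.
    max_c, _, min_c = value.partition(',')
    return [int(max_c), int(min_c) if min_c else 0]


print(parse_autoscale("10,3"))  # [10, 3]
print(parse_autoscale("8"))     # [8, 0]
```

When only one number is given, the minimum falls back to 0, which matches the `min_c and int(min_c) or 0` expression in the excerpt.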

Here w.pool_cls is <class 'celery.concurrency.prefork.TaskPool'>. The logic is as follows:

+-------------------------------+
| Pool(bootsteps.StartStopStep) |
|                               |
|                               |
|  celery/worker/components.py  |
+---------------+---------------+
                |
                |
                |
                v
            __init__
                +
                |
                |
                |
                v
             create
                +
                |
                |
                v
       +--------+----------+
       |      TaskPool     |
       |                   |
       |      Pool         +-------> celery.concurrency.asynpool.AsynPool
       |                   |
       |      app          +-------> Celery
       |                   |
       +-------------------+

0x02 Process Pool Entry Point: TaskPool

TaskPool is the entry point for multi-processing, and it is the same on every operating system.

Because w.pool_cls is <class 'celery.concurrency.prefork.TaskPool'>, execution arrives at TaskPool. During initialization, instantiate first reaches the base class BasePool, located in celery/concurrency/base.py.

2.1 Process pool initialization

The thing to note in __init__ is self.app = app: the Celery application itself is passed in at initialization time.

class BasePool:
    """Task pool."""

    Timer = timer2.Timer
    _state = None
    _pool = None

    def __init__(self, limit=None, putlocks=True, forking_enable=True,
                 callbacks_propagate=(), app=None, **options):
        self.limit = limit
        self.putlocks = putlocks
        self.options = options
        self.forking_enable = forking_enable
        self.callbacks_propagate = callbacks_propagate
        self.app = app

2.2 Starting the process pool: start

The Blueprint calls start.

class Blueprint:

    def start(self, parent):
        self.state = RUN
        if self.on_start:
            self.on_start()
        for i, step in enumerate(s for s in parent.steps if s is not None):
            self.started = i + 1
            step.start(parent)
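
The start loop above can be reproduced with stand-in classes. This is a minimal sketch under the assumption that a step only needs a start(parent) method; MiniBlueprint, Step and Parent are hypothetical names, not Celery classes.

```python
class Step:
    """Hypothetical bootstep that records when it is started."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def start(self, parent):
        self.log.append(self.name)


class Parent:
    """Hypothetical owner of the steps (the worker, in Celery's case)."""

    def __init__(self, steps):
        self.steps = steps


class MiniBlueprint:
    def __init__(self):
        self.started = 0

    def start(self, parent):
        # Skip empty slots, start the remaining steps in order, and
        # remember how many have been started so far.
        for i, step in enumerate(s for s in parent.steps if s is not None):
            self.started = i + 1
            step.start(parent)


log = []
blueprint = MiniBlueprint()
blueprint.start(Parent([Step("Timer", log), None, Step("Pool", log)]))
print(log, blueprint.started)
```

Tracking `started` as the loop advances means a later shutdown only has to stop the steps that actually came up.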

TaskPool does not define start, so the call resolves to the one defined in its parent class BasePool:

class BasePool(object):
    """Task pool."""

    def start(self):
        self._does_debug = logger.isEnabledFor(logging.DEBUG)
        self.on_start()
        self._state = self.RUN

start calls on_start. Each subclass overrides that method, so the subclass's on_start runs. Taking TaskPool as the example again, its on_start is defined as follows:

class TaskPool(BasePool):
    """Multiprocessing Pool implementation."""

    Pool = AsynPool
    BlockingPool = BlockingPool

    uses_semaphore = True
    write_stats = None

    def on_start(self):
        forking_enable(self.forking_enable)
        # Use BlockingPool when running with threads, otherwise AsynPool.
        Pool = (self.BlockingPool if self.options.get('threads', True)
                else self.Pool)
        # Create the pool.
        P = self._pool = Pool(processes=self.limit,
                              initializer=process_initializer,
                              on_process_exit=process_destructor,
                              enable_timeouts=True,
                              synack=False,
                              **self.options)

        # Create proxy methods: expose the pool's methods on this TaskPool.
        self.on_apply = P.apply_async
        self.maintain_pool = P.maintain_pool
        self.terminate_job = P.terminate_job
        self.grow = P.grow
        self.shrink = P.shrink
        self.flush = getattr(P, 'flush', None)  # FIXME add to billiard
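
The way BasePool.start defers to a subclass's on_start, and the way on_start picks a backend and exposes proxy methods, can be sketched with toy classes. All names below (MiniBasePool, MiniTaskPool, FakeAsyncBackend, FakeBlockingBackend) are hypothetical stand-ins rather than Celery or billiard APIs.

```python
class FakeAsyncBackend:
    """Hypothetical stand-in for the event-loop driven pool."""
    kind = "async"

    def __init__(self, processes=None):
        self.processes = processes

    def apply_async(self, job):
        return (self.kind, self.processes, job)


class FakeBlockingBackend(FakeAsyncBackend):
    """Hypothetical stand-in for the thread-assisted pool."""
    kind = "blocking"


class MiniBasePool:
    def __init__(self, limit=None, **options):
        self.limit = limit
        self.options = options
        self._state = None

    def on_start(self):
        pass  # subclasses override this hook

    def start(self):
        # Template method: the base class fixes the order, the subclass
        # supplies the behaviour through on_start().
        self.on_start()
        self._state = "RUN"


class MiniTaskPool(MiniBasePool):
    Pool = FakeAsyncBackend
    BlockingPool = FakeBlockingBackend

    def on_start(self):
        backend_cls = (self.BlockingPool if self.options.get('threads', True)
                       else self.Pool)
        backend = self._pool = backend_cls(processes=self.limit)
        # Proxy method: callers use the pool object without knowing
        # which backend sits behind it.
        self.on_apply = backend.apply_async


pool = MiniTaskPool(limit=4, threads=False)
pool.start()
print(pool.on_apply("job-1"))
```

Binding the backend's bound methods onto the wrapper keeps the rest of the worker independent of which pool implementation was chosen.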

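As you can see, on_start does three things:

  • decides, from the options, whether to use BlockingPool or AsynPool (billiard.pool.Pool and celery.concurrency.asynpool.AsynPool respectively);
  • creates the Pool;
  • creates the proxy methods.

On Windows the resulting _pool is <class 'billiard.pool.Pool'>; on macOS it is AsynPool.

The logic at this point is: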

+------------------------------+
| Pool(bootsteps.StartStopStep)|
+--------------+---------------+
               |
               |
               |
             1 | instantiate
               |                    2 on_start
               |        +--------------------------------+
               v        |                                |
       +-------+--------+--+                             |
       |   TaskPool        |                             |
       |                   |            +------+         |
       |   app             +----------> |celery|         |
       |                   |            +------+         |
       |                   |                             |
       |                   |            +-----------+    |
       |   _pool           +----------> | AsynPool  |    |
       |                   |            +-----------+    |
       +----------------+--+                             |
                        ^                                |
                        |                                |
                        +--------------------------------+

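0x03 Process Pool Implementation: AsynPool

_pool is implemented differently depending on the operating system. Here we can see that it sets up pipes, files, queues, and the actual process pool.

Assume the system is macOS, so we arrive at the AsynPool process pool, located in celery/concurrency/asynpool.py.

3.1 Instantiation

This step mainly instantiates the process pool Pool. The instantiation is the concrete implementation of prefork, and the Pool here is in fact AsynPool.

The work done is:

  • Configure the scheduling strategy;

  • Determine the number of processes from the local configuration, that is, the number of child processes to fork. The default is the number of CPU cores; if -c is given on the command line, the value of -c is used;

  • Create a set of read and write pipes. Depending on the direction of flow and on whether we are in the main process or a child process, the unneeded end of each pipe is closed later: for example, where the parent closes the write end, the child closes the read end. The pipes are wrapped in an abstract data structure to make them easier to manage. Instances of this structure provide bidirectional data transfer between the main process and the child processes about to be forked. One such pipe instance is created per child process;

  • Call the base-class constructor. This is where the fork actually happens;

  • Based on the child processes that were created, set up the mapping from file descriptors to queues.

The code is as follows: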

class AsynPool(_pool.Pool):
    """AsyncIO Pool (no threads)."""

    ResultHandler = ResultHandler
    Worker = Worker

    def WorkerProcess(self, worker):
        worker = super().WorkerProcess(worker)
        worker.dead = False
        return worker

    def __init__(self, processes=None, synack=False,
                 sched_strategy=None, proc_alive_timeout=None,
                 *args, **kwargs):
        self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
                                                   sched_strategy)
        # Number of child processes to fork: defaults to the CPU core count,
        # or the value of -c if it was given on the command line.
        processes = self.cpu_count() if processes is None else processes
        self.synack = synack
        # create queue-pairs for all our processes in advance.
        # (This is where the set of read and write pipes is created.)
        self._queues = {
            self.create_process_queues(): None for _ in range(processes)
        }

        # inqueue fileno -> process mapping
        self._fileno_to_inq = {}
        # outqueue fileno -> process mapping
        self._fileno_to_outq = {}
        # synqueue fileno -> process mapping
        self._fileno_to_synq = {}

        # We keep track of processes that haven't yet
        # sent a WORKER_UP message. If a process fails to send
        # this message within _proc_alive_timeout we terminate it
        # and hope the next process will recover.
        self._proc_alive_timeout = (
            PROC_ALIVE_TIMEOUT if proc_alive_timeout is None
            else proc_alive_timeout
        )
        self._waiting_to_start = set()

        # denormalized set of all inqueues.
        self._all_inqueues = set()

        # Set of fds being written to (busy)
        self._active_writes = set()

        # Set of active co-routines currently writing jobs.
        self._active_writers = set()

        # Set of fds that are busy (executing task)
        self._busy_workers = set()
        self._mark_worker_as_available = self._busy_workers.discard

        # Holds jobs waiting to be written to child processes.
        self.outbound_buffer = deque()

        self.write_stats = Counter()

        super().__init__(processes, *args, **kwargs)  # call the base-class constructor

        for proc in self._pool:
            # create initial mappings, these will be updated
            # as processes are recycled, or found lost elsewhere.
            self._fileno_to_outq[proc.outqR_fd] = proc
            self._fileno_to_synq[proc.synqW_fd] = proc

        self.on_soft_timeout = getattr(
            self._timeout_handler, 'on_soft_timeout', noop,
        )
        self.on_hard_timeout = getattr(
            self._timeout_handler, 'on_hard_timeout', noop,
        )

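3.2 Setting up the communication mechanism: queues

The creation of the queues in the instantiation code deserves special attention, because the parent and child processes communicate through them.

The code is:

self._queues = {
    self.create_process_queues(): None for _ in range(processes)
}

This creates a set of read and write pipes. The process count here is 4, so 4 groups of pipes are created. Each group contains two _SimpleQueue objects, as shown below.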

self._queues = {dict: 4}
(<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
(<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
(<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
(<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
__len__ = {int} 4
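
A dictionary keyed by queue tuples with None values works as a small allocation table: None marks a queue group that no process owns yet. The sketch below assumes that lookup rule; MiniQueueRegistry and its methods are hypothetical, and plain object() placeholders stand in for _SimpleQueue.

```python
class MiniQueueRegistry:
    """Hypothetical allocation table: queue tuple -> owning process (or None)."""

    def __init__(self, processes):
        self._queues = {
            self.create_process_queues(): None for _ in range(processes)
        }

    def create_process_queues(self):
        # Placeholders for (inq, outq, synq); synq stays None in this sketch.
        return (object(), object(), None)

    def get_process_queues(self):
        # Pick the first queue group that has no owner yet.
        return next(q for q, owner in self._queues.items() if owner is None)

    def register(self, proc, queues):
        self._queues[queues] = proc


registry = MiniQueueRegistry(2)
first = registry.get_process_queues()
registry.register("worker-1", first)
second = registry.get_process_queues()
print(first is second)  # False: the first group is already owned
```

Creating the queue groups before any process exists means a replacement process can later reuse a freed group instead of opening new pipes.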

The method that creates the queues is shown below. It sets up inq, outq, and synq:

def create_process_queues(self):
    """Create new in, out, etc. queues, returned as a tuple."""
    # NOTE: Pipes must be set O_NONBLOCK at creation time (the original
    # fd), otherwise it won't be possible to change the flags until
    # there's an actual reader/writer on the other side.
    inq = _SimpleQueue(wnonblock=True)
    outq = _SimpleQueue(rnonblock=True)
    synq = None
    if self.synack:
        synq = _SimpleQueue(wnonblock=True)
    return inq, outq, synq
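
The NOTE in the code is about the O_NONBLOCK flag on pipe file descriptors. The following standard-library sketch (Unix; make_pipe and try_read are hypothetical helpers, not billiard's API) shows what a non-blocking read end does when the pipe is empty.

```python
import os


def make_pipe(rnonblock=False, wnonblock=False):
    """Create a pipe and optionally mark either end as non-blocking."""
    r, w = os.pipe()
    if rnonblock:
        os.set_blocking(r, False)
    if wnonblock:
        os.set_blocking(w, False)
    return r, w


def try_read(fd, size=1024):
    """Return available bytes, or None if a non-blocking fd has no data yet."""
    try:
        return os.read(fd, size)
    except BlockingIOError:
        return None


r, w = make_pipe(rnonblock=True)
print(try_read(r))        # nothing written yet
os.write(w, b"task")
print(try_read(r))        # the bytes that were just written
os.close(r)
os.close(w)
```

A non-blocking read end lets an event loop poll many workers without stalling on one that has nothing to report, which is why outq is created with rnonblock=True and inq with wnonblock=True.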

3.2.1 _SimpleQueue

_SimpleQueue is a locked pipe. It is defined as follows:

class _SimpleQueue(object):
    '''
    Simplified Queue type -- really just a locked pipe
    '''
    def __init__(self, rnonblock=False, wnonblock=False, ctx=None):
        self._reader, self._writer = connection.Pipe(
            duplex=False, rnonblock=rnonblock, wnonblock=wnonblock,
        )
        self._poll = self._reader.poll
        self._rlock = self._wlock = None

Example variable values:

self._poll = {method} <bound method _ConnectionBase.poll of <billiard.connection.Connection
self = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7fc46ae049e8>
 _reader = {Connection} <billiard.connection.Connection object at 0x7fc46ae68c18>
 _writer = {Connection} <billiard.connection.Connection object at 0x7fc46ae726a0>

3.2.2 Pipe

As shown above, self._reader and self._writer of _SimpleQueue come from Pipe, so it is worth a look.

Pipe is defined as follows. It simply creates two Connection objects and returns them to _SimpleQueue; one is the read abstraction and the other is the write abstraction.

if sys.platform != 'win32':

    def Pipe(duplex=True, rnonblock=False, wnonblock=False):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(not rnonblock)
            s2.setblocking(not wnonblock)
            c1 = Connection(detach(s1))
            c2 = Connection(detach(s2))
        else:
            fd1, fd2 = os.pipe()
            if rnonblock:
                setblocking(fd1, 0)
            if wnonblock:
                setblocking(fd2, 0)
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2
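
Python's standard multiprocessing module exposes a similar one-way pipe, which makes the reader/writer split easy to try out. This sketch uses multiprocessing.Pipe(duplex=False) as a stand-in for billiard's Pipe; roundtrip is a hypothetical helper.

```python
from multiprocessing import Pipe


def roundtrip(obj):
    """Send obj through a one-way pipe and read it back in the same process."""
    reader, writer = Pipe(duplex=False)
    try:
        # Each end is a Connection wrapping one file descriptor.
        flags = (reader.readable, reader.writable,
                 writer.readable, writer.writable)
        writer.send(obj)              # pickles obj and writes it to the pipe
        has_data = reader.poll(1.0)   # wait up to one second for data
        received = reader.recv() if has_data else None
        return flags, received
    finally:
        reader.close()
        writer.close()


flags, received = roundtrip({"task": "add", "args": (1, 2)})
print(flags)
print(received)
```

With duplex=False the first connection can only receive and the second can only send, which is the same read/write separation that _SimpleQueue relies on.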

3.2.3 Connection

Connection comes up again above. Note that this is not Kombu's connection; it is the multi-processing library's own Connection definition.

class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

Connection is a connection class based on a file descriptor.

class _ConnectionBase(object):
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        if isinstance(handle, _SocketContainer):
            self._socket = handle.sock  # keep ref so not collected
            handle = handle.sock.fileno()
        handle = handle.__index__()
        self._handle = handle
        self._readable = readable
        self._writable = writable

The variables are now:

c1 = {Connection} <billiard.connection.Connection object at 0x7fc46ae68c18>
c2 = {Connection} <billiard.connection.Connection object at 0x7fc46ae726a0>

So AsynPool ends up looking like this:

    +------------------------------+                                                                 +----------------+
    | Pool(bootsteps.StartStopStep)|                                     +-----------------+         |   Connection   |
    +-------------+----------------+                                     |  _SimpleQueue   |         |                |
                  |                                                      |                 |         |        _write  |
                  |                                                      |      _reader    +-------> |        _read   |
                  |                                                      |                 |         |        _send   |
                1 | instantiate                                          |                 |         |        _recv   |
                  |                                                      |                 |         |        _handle |
  2 on_start      |                                                      |                 |         +----------------+
                  |                                                      |      _poll      +-------> _ConnectionBase.poll
+-------------+   |                                                      |                 |
|             |   |                                                      |                 |         +------------+
|             |   v                                                      |      _writer    +-------> | Connection |
|         +---+---+-----------+                                          |                 |         +------------+
|         |      TaskPool     |                                          +-------+---------+
|         |                   |       +------+                                   ^
|         |      app          +-----> |celery|                                   |
|         |                   |       +------+                                   |
|         |                   |                                                  +
|         |                   |       +--------------------------+   +---> (<_SimpleQueue>, <_SimpleQueue>)
|         |      _pool        +-----> | AsynPool                 |   |
|         |                   |       |                          |   |
|         +---+---------------+       |         _queues          +--->---> (<_SimpleQueue>, <_SimpleQueue>)
|             ^                       |                          |   |
|             |                       |                          |   |
|             |                       +--------------------------+   +---> (<_SimpleQueue>, <_SimpleQueue>)
+-------------+                                                      |
                                                                     |
                                                                     +---> (<_SimpleQueue>, <_SimpleQueue>)

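3.3 The base-class constructor of the process pool

Now for AsynPool's base class. It is the Celery author's modified and wrapped version of Python's multiprocessing. It sets up the various message handlers and creates the child processes.

Location: billiard/pool.py

The key steps here are:

  • self._Process = self._ctx.Process sets it to <class 'billiard.context.ForkProcess'>;

  • child processes are created with _create_worker_process(i), one per configured process;

  • self._worker_handler = self.Supervisor(self) is created;

  • the TaskHandler, which assigns tasks, is created;

  • the TimeoutHandler is created;

  • the ResultHandler is created.

The code is: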

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''

    def __init__(self, processes=None, initializer=None, initargs=(),..., **kwargs):
        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = Queue()
        self._cache = {}
        self._state = RUN
        .....
        self.readers = {}

        self._processes = self.cpu_count() if processes is None else processes
        self.max_restarts = max_restarts or round(self._processes * 100)
        self.restart_state = restart_state(max_restarts, max_restart_freq or 1)
        self._Process = self._ctx.Process
        self._pool = []
        self._poolctrl = {}
        self._on_ready_counters = {}
        self.putlocks = putlocks
        self._putlock = semaphore or LaxBoundedSemaphore(self._processes)
        for i in range(self._processes):
            self._create_worker_process(i)

        self._worker_handler = self.Supervisor(self)
        if threads:
            self._worker_handler.start()

        self._task_handler = self.TaskHandler(self._taskqueue,
                                              self._quick_put,
                                              self._outqueue,
                                              self._pool,
                                              self._cache)
        if threads:
            self._task_handler.start()

        # Thread killing timedout jobs.
        if self.enable_timeouts:
            self._timeout_handler = self.TimeoutHandler(
                self._pool, self._cache,
                self.soft_timeout, self.timeout,
            )
            self._timeout_handler_mutex = Lock()
            self._timeout_handler_started = False
            self._start_timeout_handler()
            # If running without threads, we need to check for timeouts
            # while waiting for unfinished work at shutdown.
            if not threads:
                self.check_timeouts = self._timeout_handler.handle_event

        # Thread processing results in the outqueue.
        self._result_handler = self.create_result_handler()
        self.handle_result_event = self._result_handler.handle_event

        if threads:
            self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue,
                  self._pool, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache,
                  self._timeout_handler,
                  self._help_stuff_finish_args()),
            exitpriority=15,
        )

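Let us go through these one by one.

3.3.1 Creating child processes

The following code creates the child processes.

for i in range(self._processes):
    self._create_worker_process(i)

_create_worker_process mainly does the following:

  • inq, outq, synq = self.get_process_queues() obtains an abstraction over a read pipe and a write pipe. The pipes were created in advance (by self.create_process_queues() above). They are meant for the child process about to be forked: the child listens for read events on this pipe abstraction and can also write data through the write pipe;

  • w, an instance of self.WorkerProcess, is an abstraction that wraps the forked child process. It makes child processes quick and convenient to manage and lets them be treated as a process pool. w records meta information about the forked child, such as its pid and the fds of the pipe's read and write ends, and is registered in the main process, which uses it to dispatch tasks;

  • the WorkerProcess instance is recorded in self._pool. This is important: it is how the parent process knows which child processes exist;

  • w.start() contains the actual fork.

The code is: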

def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')

    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)
    self._process_register_queues(w, (inq, outq, synq))
    w.name = w.name.replace('Process', 'PoolWorker')
    w.daemon = True
    w.index = i
    w.start()
    self._poolctrl[w.pid] = sentinel
    self._on_ready_counters[w.pid] = on_ready_counter
    if self.on_process_up:
        self.on_process_up(w)
    return w
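
The w.name.replace(...) line explains the process names that appear in the diagrams and variable dumps. A quick check with plain strings (pool_worker_name is a hypothetical helper; the input names are the default process names seen in this article):

```python
def pool_worker_name(process_name):
    """Apply the same rename that _create_worker_process performs."""
    return process_name.replace('Process', 'PoolWorker')


print(pool_worker_name('ForkProcess-1'))   # ForkPoolWorker-1
print(pool_worker_name('SpawnProcess-1'))  # SpawnPoolWorker-1
```

The prefix (Fork or Spawn) comes from the process class chosen for the platform, so the worker names also reveal which start method is in use.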

Because self.WorkerProcess(self.Worker(...)) is involved, we introduce WorkerProcess and Worker in turn.

In brief, the logic so far is:

    +----------------+
    |  StartStopStep |
    +-------+--------+
            |
            |   start
            |
            v
+-----------+-------------------+
|        BasePool               |
|   celery/concurrency/base.py  |
+-----------+-------------------+
            |
            |   start
            |
            v
+-----------+-------------------+
|        TaskPool               |
| celery/concurrency/prefork.py |
+-----------+-------------------+
            |
            |   on_start
            |
            v
+-----------+--------------------+
|        AsynPool                |
| celery/concurrency/asynpool.py |
+-----------+--------------------+
            |
            |
            v
   +--------+------------+
   |  class Pool(object) |
   |   billiard/pool.py  |
   +--------+------------+
            |
       +----+------+
       |           |
       v           v                         +----------------------+
   __init__    _create_worker_process  +---> | class Worker(object) |
                                             +----------------------+

3.3.1.1 Child-process work code

Worker is the code a child process runs. There are several implementations, for example:

celery.concurrency.asynpool.Worker and billiard/pool.Worker are both child-process work loops.

We take billiard/pool.Worker as the example.

The main job of Worker.__init__ is to set up the various fds.

Here obj.inqW_fd = self.inq._writer.fileno() obtains the fd from the corresponding Connection of the queues:

class _ConnectionBase(object):
    _handle = None

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

Worker itself is defined as follows:

class Worker(object):

    def __init__(self, inq, outq, synq=None, initializer=None, initargs=(),...):
        ......
        self.max_memory_per_child = max_memory_per_child
        self._shutdown = sentinel
        self.inq, self.outq, self.synq = inq, outq, synq
        self.contribute_to_object(self)

    def contribute_to_object(self, obj):
        obj.inq, obj.outq, obj.synq = self.inq, self.outq, self.synq
        obj.inqW_fd = self.inq._writer.fileno()    # inqueue write fd
        obj.outqR_fd = self.outq._reader.fileno()  # outqueue read fd
        if self.synq:
            obj.synqR_fd = self.synq._reader.fileno()  # synqueue read fd
            obj.synqW_fd = self.synq._writer.fileno()  # synqueue write fd
            obj.send_syn_offset = _get_send_offset(self.synq._writer)
        else:
            obj.synqR_fd = obj.synqW_fd = obj._send_syn_offset = None
        obj._quick_put = self.inq._writer.send
        obj._quick_get = self.outq._reader.recv
        obj.send_job_offset = _get_send_offset(self.inq._writer)
        return obj
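
Recording raw fds such as outqR_fd pays off when the parent has to find out which worker has something to say. The following is a minimal sketch of that idea using only os.pipe and select.select (Unix); build_workers and ready_workers are hypothetical helpers, and the dictionary plays the role of a fileno-to-process mapping.

```python
import os
import select


def build_workers(n):
    """Create one (read_fd, write_fd) pipe per hypothetical worker name."""
    return {"worker-%d" % (i + 1): os.pipe() for i in range(n)}


def ready_workers(fileno_to_worker, timeout=1.0):
    """Return the names of workers whose read fd currently has data."""
    readable, _, _ = select.select(list(fileno_to_worker), [], [], timeout)
    return sorted(fileno_to_worker[fd] for fd in readable)


workers = build_workers(3)
# Map each read fd back to its owner, like an outqueue fileno -> process table.
fileno_to_worker = {r: name for name, (r, w) in workers.items()}

os.write(workers["worker-2"][1], b"done")   # only worker-2 reports a result
print(ready_workers(fileno_to_worker))

for r, w in workers.values():
    os.close(r)
    os.close(w)
```

The event loop only ever sees integers, so a mapping from fd back to the owning process object is what connects a readiness event to the right worker.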

The variables are:

self = {Worker}
initargs = {tuple: 2} (<Celery tasks at 0x7f8a0a70dd30>, )
inq = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7f8a0b66aba8>
inqW_fd = {int} 7
max_memory_per_child = {NoneType} None
maxtasks = {NoneType} None
on_ready_counter = {Synchronized} <Synchronized wrapper for c_int(0)>
outq = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7f8a0b6844a8>
outqR_fd = {int} 8
sigprotection = {bool} False
synq = {NoneType} None
synqR_fd = {NoneType} None
synqW_fd = {NoneType} None
wrap_exception = {bool} True

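A simplified view of AsynPool's logic:

Note in the diagram below:

Worker is the business logic that runs in a child process, _pool is the process pool, and ForkProcess (that is, WorkerProcess) is the abstraction of a child process. Every child-process abstraction runs a Worker. Keep these concepts clearly separated.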

+--------------------------+
|   AsynPool               |
|                          |
|                          |
|   ResultHandler          +-------> celery.concurrency.asynpool.ResultHandler
|                          |
|   Supervisor             +-------> billiard.pool.Supervisor
|                          |
|   TaskHandler            +-------> billiard.pool.TaskHandler
|                          |
|   TimeoutHandler         +-------> billiard.pool.TimeoutHandler
|                          |
|   Worker                 +-------> celery.concurrency.asynpool.Worker
|                          |
|   _pool                  +-----------------+---> <ForkProcess(ForkPoolWorker-1, started daemon)>
+--------------------------+                 |
                                             +---> <ForkProcess(ForkPoolWorker-2, started daemon)>
                                             |
                                             +---> <ForkProcess(ForkPoolWorker-3, started daemon)>
                                             |
                                             +---> <ForkProcess(ForkPoolWorker-4, started daemon)>

The detailed logic is:

    +------------------------------+                                                                 +----------------+
    | Pool(bootsteps.StartStopStep)|                                     +-----------------+         |   Connection   |
    +-------------+----------------+                                     |  _SimpleQueue   |         |                |
                  |                                                      |                 |         |        _write  |
                  |                                                      |      _reader    +-------> |        _read   |
                  |                                                      |                 |         |        _send   |
                1 | instantiate                                          |                 |         |        _recv   |
                  |                                                      |                 |         |        _handle +---> {int} 8 <-+
  2 on_start      |                                                      |                 |         +----------------+               |
                  |                                                      |      _poll      +-------> _ConnectionBase.poll             |
+-------------+   |                                                      |                 |                                          |
|             |   |                                                      |                 |         +----------------+               |
|             |   v                                                      |      _writer    +-------> |   Connection   |               |
|         +---+---+-----------+                                          |                 |         |                |               |
|         |      TaskPool     |                                          +-------+---------+         |        _handle +---> {int} 7   |
|         |                   |       +------+                                   ^                   |                |               |
|         |      app          +-----> |celery|                                   |                   +----------------+        ^      |
|         |                   |       +------+                                   |                                             |      |
|         |                   |                                                  +                                             |      |
|         |                   |       +--------------------------+   +---> (<_SimpleQueue>, <_SimpleQueue>)                    |      |
|         |      _pool        +-----> | AsynPool                 |   |                                                         |      |
|         |                   |       |                          |   |                                                         |      |
|         +---+---------------+       |         _queues          +--->---> (<_SimpleQueue>, <_SimpleQueue>)                    |      |
|             ^                       |                          |   |                                                         |      |
|             |                       |                          |   |                                                         |      |
|             |                       |                          |   +---> (<_SimpleQueue>, <_SimpleQueue>)                    |      |
+-------------+                       |                          |   |                                                         |      |
                                      |                          |   |                                                         |      |
                                      +--------------------------+   +---> (<_SimpleQueue>, <_SimpleQueue>)                    |      |
                                                                                                                               |      |
                                      +----------------------+                                                                 |      |
                                      |                      |                                                                 |      |
                                      | Worker   inq         |                                                                 |      |
                                      |                      |                                                                 |      |
                                      |          outq        |                                                                 |      |
                                      |                      |                                                                 |      |
                                      |          synq        |                                                                 |      |
                                      |                      |                                                                 |      |
                                      |          inqW_fd     +-----------------------------------------------------------------+      |
                                      |                      |                                                                        |
                                      |          outqR_fd    +------------------------------------------------------------------------+
                                      |                      |
                                      |          workloop    |
                                      |                      |
                                      |          after_fork  |
                                      |                      |
                                      +----------------------+

3.3.1.2 Child-process abstraction: WorkerProcess

WorkerProcess is an abstraction that wraps the forked child process. It makes child processes quick and convenient to manage and lets them be treated as a process pool. The instance w records meta information about the forked child, such as its pid and the fds of the pipe's read and write ends, and is registered in the main process, which uses it to dispatch tasks.

WorkerProcess wraps ForkProcess, which is defined as follows:

class ForkProcess(process.BaseProcess):
    _start_method = 'fork'

    @staticmethod
    def _Popen(process_obj):
        from .popen_fork import Popen
        return Popen(process_obj)

3.3.1.2.1 How WorkerProcess executes

WorkerProcess runs as follows:

def WorkerProcess(self, worker):
    worker = super().WorkerProcess(worker)
    worker.dead = False
    return worker

The base-class code runs first, so what is finally returned is a ForkProcess:

def Process(self, *args, **kwds):
    return self._Process(*args, **kwds)

def WorkerProcess(self, worker):
    return worker.contribute_to_object(self.Process(target=worker))

During the self._Process(*args, **kwds) call, the relevant variables are:

self._Process = {type} <class 'billiard.context.ForkProcess'>
args = {tuple: 0} ()
kwds = {dict: 1} {'target': <celery.concurrency.asynpool.Worker object at 0x7f9c306326a0>}
self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7f9c30604da0>

This calls into the ForkProcess(process.BaseProcess) base class.

3.3.1.2.2 The base class BaseProcess

BaseProcess is shown below. Note that run is the child process's loop, and _target is the code the child process runs.

 _target = {Worker} <celery.concurrency.asynpool.Worker object at 0x7f9ad358b240>

It is defined as follows:

class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analagous to `threading.Thread`
    '''

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, daemon=None, **_kw):

        count = next(_process_counter)
        self._identity = _current_process._identity + (count, )
        self._config = _current_process._config.copy()
        self._parent_pid = os.getpid()
        self._popen = None
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = (
            name or type(self).__name__ + '-' +
            ':'.join(str(i) for i in self._identity)
        )
        if daemon is not None:
            self.daemon = daemon
        if _dangling is not None:
            _dangling.add(self)

        self._controlled_termination = False

    def run(self):
        '''
        Method to be run in sub-process; can be overridden in sub-class
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)

After the base class finishes, we have a ForkProcess:

self = {ForkProcess} <ForkProcess(ForkProcess-1, initial)>
authkey = {AuthenticationString: 32} b''
daemon = {bool} False
exitcode = {NoneType} None
ident = {NoneType} None
name = {str} 'ForkProcess-1'
pid = {NoneType} None
_args = {tuple: 0} ()
_authkey = {AuthenticationString: 32}
_children = {set: 0} set()
_config = {dict: 2} {'authkey': b'', 'semprefix': '/mp'}
_counter = {count} count(2)
_daemonic = {bool} False
_identity = {tuple: 1} 1
_kwargs = {dict: 0} {}
_name = {str} 'ForkProcess-1'
_parent_pid = {int} 14747
_popen = {NoneType} None
_start_method = {str} 'fork'
_target = {Worker} <celery.concurrency.asynpool.Worker object at 0x7f9ad358b240>
_tempdir = {NoneType} None

3.3.1.2.3 Adding to the process list

Once the child-process object has been created, self._pool.append(w) adds it to the parent's list of child processes, and the queues are registered for it.

def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')

    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))

    self._pool.append(w)  # execution has reached this line
    self._process_register_queues(w, (inq, outq, synq))  # and then this one

The variables at this point are:

self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7f9ad36680f0>
ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
Supervisor = {type} <class 'billiard.pool.Supervisor'>
TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
Worker = {type} <class 'celery.concurrency.asynpool.Worker'>
......
outbound_buffer = {deque: 0} deque([])
readers = {dict: 0} {}
restart_state = {restart_state} <billiard.common.restart_state object at 0x7f9ad3668e80>
sched_strategy = {int} 4
timers = {dict: 1} {<bound method Pool.maintain_pool of <celery.concurrency.asynpool.AsynPool object at 0x7f9ad36680f0>>: 5.0}
write_stats = {Counter: 0} Counter()
_Process = {type} <class 'billiard.context.ForkProcess'>
_active_writers = {set: 0} set()
_active_writes = {set: 0} set()
_all_inqueues = {set: 0} set()
_busy_workers = {set: 0} set()
_cache = {dict: 0} {}
_ctx = {ForkContext} <billiard.context.ForkContext object at 0x7f9ad27ad828>
_fileno_to_inq = {dict: 0} {}
_fileno_to_outq = {dict: 0} {}
_fileno_to_synq = {dict: 0} {}
_initargs = {tuple: 2} (<Celery myTest at 0x7f9ad270c128>, 'celery@me2koreademini')
_inqueue = {NoneType} None
_max_memory_per_child = {NoneType} None
_maxtasksperchild = {NoneType} None
_on_ready_counters = {dict: 0} {}
_outqueue = {NoneType} None
_poll_result = {NoneType} None
_pool = {list: 1} [<ForkProcess(ForkPoolWorker-1, initial daemon)>]
_poolctrl = {dict: 0} {}
_proc_alive_timeout = {float} 4.0
_processes = {int} 4
_putlock = {LaxBoundedSemaphore} <LaxBoundedSemaphore at 0x7f9ad354db70 value:4 waiting:0>
_queues = {dict: 4} {(<billiard.queues._SimpleQueue object at 0x7f9ad35acef0>, <billiard.queues._SimpleQueue object at 0x7f9ad3668160>, None): <ForkProcess(ForkPoolWorker-1, initial daemon)>, (<billiard.queues._SimpleQueue object at 0x7f9ad36684a8>, <billiard.queues._SimpleQu
_quick_get = {NoneType} None
_quick_put = {NoneType} None
_state = {int} 0
_taskqueue = {Queue} <queue.Queue object at 0x7f9ad2a30908>
_waiting_to_start = {set: 0} set()
_wrap_exception = {bool} True
sentinel = {NoneType} None
synq = {NoneType} None

3.3.1.3 The fork

w.start() contains the actual fork.

def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')

    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)
    self._process_register_queues(w, (inq, outq, synq))
    w.name = w.name.replace('Process', 'PoolWorker')
    w.daemon = True
    w.index = i
    w.start()  # execution is here now, about to fork
    self._poolctrl[w.pid] = sentinel
    self._on_ready_counters[w.pid] = on_ready_counter
    if self.on_process_up:
        self.on_process_up(w)
    return w

The code is:

class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analagous to `threading.Thread`
    '''

    def start(self):
        '''
        Start child process
        '''
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
            'can only start a process object created by current process'
        _cleanup()
        self._popen = self._Popen(self)
        self._sentinel = self._popen.sentinel
        _children.add(self)

The important line is self._popen = self._Popen(self). Let us look at the Popen source, in particular _launch:

class ForkProcess(process.BaseProcess):
    _start_method = 'fork'

    @staticmethod
    def _Popen(process_obj):
        from .popen_fork import Popen
        return Popen(process_obj)

The code is in billiard/popen_fork.py.

This makes things clear. When _launch runs, it uses os.pipe() to create a read/write pipe pair and os.fork() to spawn a child process, then checks whether self.pid is 0 in order to run different logic in the parent and in the child:

  • the child closes the read end of the pipe and then runs process_obj._bootstrap();
  • the parent closes the write end of the pipe and records the fd of the read end.

class Popen(object):
    method = 'fork'
    sentinel = None

    def __init__(self, process_obj):
        sys.stdout.flush()
        sys.stderr.flush()
        self.returncode = None
        self._launch(process_obj)

    def duplicate_for_child(self, fd):
        return fd

    def poll(self, flag=os.WNOHANG):
        if self.returncode is None:
            while True:
                try:
                    pid, sts = os.waitpid(self.pid, flag)
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    # Child process not yet created. See #1731717
                    # e.errno == errno.ECHILD == 10
                    return None
                else:
                    break
            if pid == self.pid:
                if os.WIFSIGNALED(sts):
                    self.returncode = -os.WTERMSIG(sts)
                else:
                    assert os.WIFEXITED(sts)
                    self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def _launch(self, process_obj):
        code = 1
        parent_r, child_w = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            try:
                os.close(parent_r)
                if 'random' in sys.modules:
                    import random
                    random.seed()
                code = process_obj._bootstrap()
            finally:
                os._exit(code)
        else:
            os.close(child_w)
            self.sentinel = parent_r
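
The pipe-plus-fork pattern in _launch can be tried with a few lines of standard-library code. This is a sketch for Unix only, with hypothetical helper names (launch, wait_for_exit); it is not billiard's implementation, but it follows the same steps: create the pipe, fork, close the unused end on each side, and keep the read end as a sentinel.

```python
import os


def launch(target):
    """Fork a child that runs target(); return (pid, sentinel_fd) in the parent."""
    parent_r, child_w = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: drop the read end, run the work, and exit without
        # returning into the parent's code path.
        code = 1
        try:
            os.close(parent_r)
            code = int(target() or 0)
        finally:
            os._exit(code)
    # Parent: drop the write end and keep the read end as the sentinel.
    os.close(child_w)
    return pid, parent_r


def wait_for_exit(pid, sentinel):
    """Block until the child exits, then return its exit status."""
    # The child holds the only write end; when it exits the read
    # returns b'' (EOF), which is how the sentinel signals termination.
    os.read(sentinel, 1)
    os.close(sentinel)
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status)


pid, sentinel = launch(lambda: 3)
print(wait_for_exit(pid, sentinel))
```

Because the sentinel is an ordinary file descriptor, the parent can hand it to select or an event loop and learn about a child's death without polling waitpid.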

3.3.2 Supervision helper: Supervisor

The Supervisor class periodically maintains the process pool, for example checking whether it needs to be scaled dynamically.

class Supervisor(PoolThread):

    def __init__(self, pool):
        self.pool = pool
        super(Supervisor, self).__init__()

    def body(self):

        time.sleep(0.8)
        pool = self.pool

        try:
            # do a burst at startup to verify that we can start
            # our pool processes, and in that time we lower
            # the max restart frequency.
            prev_state = pool.restart_state
            pool.restart_state = restart_state(10 * pool._processes, 1)
            for _ in range(10):
                if self._state == RUN and pool._state == RUN:
                    pool._maintain_pool()
                    time.sleep(0.1)

            # Keep maintaining workers until the cache gets drained, unless
            # the pool is terminated
            pool.restart_state = prev_state
            while self._state == RUN and pool._state == RUN:
                pool._maintain_pool()
                time.sleep(0.8)
        except RestartFreqExceeded:
            pool.close()
            pool.join()
            raise
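
The body above relies on a restart-frequency limit that raises RestartFreqExceeded when workers die too often. The class below is a simplified sketch of such a limiter and is not billiard's restart_state: RestartBudget and RestartBudgetExceeded are hypothetical names, and the caller passes the clock value explicitly so the behaviour is easy to follow.

```python
class RestartBudgetExceeded(Exception):
    """Raised when too many restarts happen inside one time window."""


class RestartBudget:
    def __init__(self, max_restarts, window):
        self.max_restarts = max_restarts
        self.window = window          # window length in seconds
        self.count = 0
        self.window_start = None

    def step(self, now):
        """Record one restart at time `now` (seconds)."""
        # Start a fresh window on first use or once the old one has elapsed.
        if self.window_start is None or now - self.window_start >= self.window:
            self.window_start, self.count = now, 0
        if self.count >= self.max_restarts:
            raise RestartBudgetExceeded(
                "%d restarts within %ss" % (self.count, self.window))
        self.count += 1


budget = RestartBudget(max_restarts=2, window=1.0)
budget.step(0.0)
budget.step(0.1)
try:
    budget.step(0.2)
except RestartBudgetExceeded as exc:
    print("giving up:", exc)
budget.step(1.5)  # a new window has started, so this restart is allowed
```

A limit like this is what lets a supervisor distinguish an occasional crashed worker, which it simply replaces, from a pool that cannot start at all.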

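3.3.3 Assigning tasks to child processes: TaskHandler

This class carries the actual business logic: it is where task messages are passed from the parent process to the child processes.

When the TaskHandler was built earlier, the key points were:

  • self._taskqueue is passed in, and from then on task messages are handed over through it. _taskqueue is a simple data structure that buffers messages between the Celery Consumer worker and the pool.

  • self._quick_put is passed in and assigned to put, so put points to self._inqueue.put.

  • TaskHandler therefore uses put(task) to send messages into _inqueue, the pipe between the parent and child processes. In other words, when the parent process receives a message inside TaskHandler, it sends it to its child processes through the pipe function self._inqueue.put; self._taskqueue is just an intermediate buffer.

At this point the queues come from: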

self._taskqueue = Queue()

def _setup_queues(self):
    self._inqueue = Queue()
    self._outqueue = Queue()
    self._quick_put = self._inqueue.put
    self._quick_get = self._outqueue.get

self._task_handler = self.TaskHandler(self._taskqueue,
                                      self._quick_put,
                                      self._outqueue,
                                      self._pool,
                                      self._cache)

So at initialization the variables are:

outqueue = {SimpleQueue} <billiard.queues.SimpleQueue object at 0x000001B55131AE88>

pool = {list: 8} [<SpawnProcess(SpawnPoolWorker-1, started daemon)>, <SpawnProcess(SpawnPoolWorker-2, started daemon)>, <SpawnProcess(SpawnPoolWorker-3, started daemon)>, <SpawnProcess(SpawnPoolWorker-4, started daemon)>, <SpawnProcess(SpawnPoolWorker-5, started daemon)>, <SpawnProcess(SpawnPoolWorker-6, started daemon)>, <SpawnProcess(SpawnPoolWorker-7, started daemon)>, <SpawnProcess(SpawnPoolWorker-8, started daemon)>]

put = {method} <bound method _ConnectionBase.send of <billiard.connection.PipeConnection object at 0x000001B55131AF08>>

self = {TaskHandler} Unable to get repr for <class 'billiard.pool.TaskHandler'>
taskqueue = {Queue} <queue.Queue object at 0x000001B551334308>

A simplified version of the TaskHandler code:

class TaskHandler(PoolThread):

    def __init__(self, taskqueue, put, outqueue, pool, cache):
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.pool = pool
        self.cache = cache
        super(TaskHandler, self).__init__()

    def body(self):
        cache = self.cache
        taskqueue = self.taskqueue
        put = self.put

        # iter(callable, sentinel): keep calling taskqueue.get() until it returns None
        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            i = -1
            try:
                for i, task in enumerate(taskseq):
                    if self._state:
                        break
                    put(task)
                else:
                    if set_length:
                        set_length(i + 1)
                    continue
                break
            # (exception handling is omitted in this simplified listing)

        self.tell_others()

    def tell_others(self):
        outqueue = self.outqueue
        put = self.put
        pool = self.pool

        try:
            # tell result handler to finish when cache is empty
            outqueue.put(None)

            # tell workers there is no more work
            for p in pool:
                put(None)
        # (exception handling is omitted in this simplified listing)

    def on_stop_not_started(self):
        self.tell_others()
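
The core of TaskHandler.body is a relay loop: it drains a thread-safe queue until a None sentinel arrives, forwards each task through put, and finally sends one None per worker. The sketch below reproduces just that flow with the standard library. It assumes a single pipe from multiprocessing.Pipe in place of the pool's _inqueue, and relay_tasks is a hypothetical function name.

```python
import multiprocessing
import queue
import threading


def relay_tasks(taskqueue, put, n_workers):
    """Move task batches from taskqueue to put() until a None sentinel arrives."""
    # iter(callable, sentinel) keeps calling taskqueue.get() until it returns None.
    for taskseq, set_length in iter(taskqueue.get, None):
        count = 0
        for task in taskseq:
            put(task)
            count += 1
        if set_length:
            set_length(count)
    # Tell every worker that there is no more work.
    for _ in range(n_workers):
        put(None)


# One-way pipe: `reader` stands in for the child's end, `writer` for the parent's.
reader, writer = multiprocessing.Pipe(duplex=False)
taskqueue = queue.Queue()
lengths = []

handler = threading.Thread(
    target=relay_tasks, args=(taskqueue, writer.send, 2), daemon=True)
handler.start()

taskqueue.put((["add(1, 2)", "add(3, 4)"], lengths.append))
taskqueue.put(None)  # sentinel: stop the handler thread
handler.join(timeout=5)

# Two tasks followed by one None per (pretend) worker.
received = [reader.recv() for _ in range(4)]
```

The in-process queue decouples the producer from the pipe: the consumer side can enqueue work without blocking on a slow pipe write, which matches the buffering role the article assigns to _taskqueue.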

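The logic at this point is:

Note: the Worker scope in the diagram is celery/apps/worker.py. It belongs to Celery's own logic and is not a child-process concept (the same applies to the diagrams below). Celery has several classes with the same name, which is easy to get confused by.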

                           +
    Consumer               |
                   message |
                           v         strategy  +------------------------------------+
              +------------+------+            | strategies                         |
              | on_task_received  | <--------+ |                                    |
              |                   |            |[myTest.add : task_message_handler] |
              +------------+------+            +------------------------------------+
                           |
                           |
 +------------------------------------------------------------------------------------+
 strategy                  |
                           |
                           |
                           v                Request [myTest.add]
              +------------+-------------+                       +---------------------+
              | task_message_handler     | <-------------------+ | create_request_cls  |
              |                          |                       |                     |
              +------------+-------------+                       +---------------------+
                           | _process_task_sem
                           |
 +------------------------------------------------------------------------------------+
 Worker                    | req[{Request} myTest.add]
                           v
                  +--------+-----------+
                  | WorkController     |
                  |                    |
                  |               pool +-------------------------+
                  +--------+-----------+                         |
                           |                                     |
                           |                 apply_async         v
               +-----------+----------+                      +---+-------------------+
               |{Request} myTest.add  | +--------------->    | TaskPool              |
               +----------------------+                      +----+------------------+
                                           myTest.add             |
                                                                  |
+--------------------------------------------------------------------------------------+
                                                                  |
                                                                  v
                                                             +----+------------------+
                                                             | billiard.pool.Pool    |
                                                             +-------+---------------+
                                                                     |
                                                                     |
 Pool              +---------------------------+                     |
                   | TaskHandler               |                     |
                   |                           |                     |  self._taskqueue.put
                   |              _taskqueue   |     <---------------+
                   |                           |
                   +---------------------------+

3.4.3 Handling child-process results: ResultHandler

The parent process uses ResultHandler to handle the results returned by the child processes.

def create_result_handler(self):
    return super().create_result_handler(
        fileno_to_outq=self._fileno_to_outq,
        on_process_alive=self.on_process_alive,
    )

class ResultHandler(_pool.ResultHandler):
    """Handles messages from the pool processes."""

    def __init__(self, *args, **kwargs):
        self.fileno_to_outq = kwargs.pop('fileno_to_outq')
        self.on_process_alive = kwargs.pop('on_process_alive')
        super().__init__(*args, **kwargs)
        # add our custom message handler
        self.state_handlers[WORKER_UP] = self.on_process_alive
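
The constructor above pops its own keyword arguments before delegating to the base class, then registers one extra entry in state_handlers. The sketch below shows this dispatch-table pattern in isolation. The class names, the numeric values of ACK, READY and WORKER_UP, and the (state, args) message shape are all assumptions chosen for the illustration.

```python
# Hypothetical message-type constants for this sketch.
ACK, READY = 0, 1
WORKER_UP = 15


class BaseResultHandler:
    def __init__(self):
        self.log = []
        # Dispatch table: message type -> bound method.
        self.state_handlers = {ACK: self.on_ack, READY: self.on_ready}

    def on_ack(self, *args):
        self.log.append(("ack", args))

    def on_ready(self, *args):
        self.log.append(("ready", args))

    def on_state_change(self, task):
        state, args = task
        try:
            handler = self.state_handlers[state]
        except KeyError:
            self.log.append(("unknown", state))
        else:
            handler(*args)


class PoolResultHandler(BaseResultHandler):
    def __init__(self, *args, **kwargs):
        # Pop our own keyword before delegating, as the listing above does.
        self.on_process_alive = kwargs.pop("on_process_alive")
        super().__init__(*args, **kwargs)
        # Register an additional message type without touching the base class.
        self.state_handlers[WORKER_UP] = self.on_process_alive


alive = []
handler = PoolResultHandler(on_process_alive=alive.append)
handler.on_state_change((WORKER_UP, (4242,)))      # routed to alive.append
handler.on_state_change((READY, ("job-1", True)))  # routed to on_ready
```

Extending the table in the subclass keeps the base dispatch loop unchanged, which is presumably why the custom handler is added after super().__init__() has built the default table.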

The variables are:

ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
daemon = {property} <property object at 0x7f847454d638>
fdel = {NoneType} None
exitcode = {property} <property object at 0x7f8475c9e8b8>
fdel = {NoneType} None
fset = {NoneType} None
ident = {property} <property object at 0x7f847454d4f8>
fdel = {NoneType} None
fset = {NoneType} None
name = {property} <property object at 0x7f847454d598>
fdel = {NoneType} None
_initialized = {bool} False

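The code is as follows. It uses poll to wait for messages: with a non-zero timeout the wait blocks, while handle_event passes 0 so that the poll does not block.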

    def _process_result(self, timeout=1.0):
        poll = self.poll
        on_state_change = self.on_state_change

        while 1:
            try:
                ready, task = poll(timeout)
            # (exception handling is omitted in this simplified listing)

            if ready:
                on_state_change(task)
                if timeout != 0:  # blocking
                    break
            else:
                break
            yield

    def handle_event(self, fileno=None, events=None):
        if self._state == RUN:
            if self._it is None:
                self._it = self._process_result(0)  # non-blocking
            try:
                next(self._it)
            except (StopIteration, CoroStop):
                self._it = None
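
handle_event lazily creates a generator from _process_result(0) and advances it by one step per event, discarding it once it is exhausted. The following sketch isolates that pattern. ResultPump is a hypothetical class, and a deque stands in for the result pipe so the example stays deterministic.

```python
from collections import deque


class ResultPump:
    """Drain a result source one message per event-loop callback."""

    def __init__(self, source):
        self.source = source  # deque standing in for the result pipe
        self.seen = []
        self._it = None

    def poll(self, timeout):
        # Non-blocking stand-in: report whether a message is available.
        if self.source:
            return True, self.source.popleft()
        return False, None

    def _process_result(self, timeout=1.0):
        while True:
            ready, task = self.poll(timeout)
            if ready:
                self.seen.append(task)
                if timeout != 0:  # blocking mode handles one message per call
                    break
            else:
                break
            yield  # hand control back to the event loop

    def handle_event(self):
        if self._it is None:
            self._it = self._process_result(0)  # non-blocking
        try:
            next(self._it)
        except StopIteration:
            self._it = None  # drained; a new generator is created next time


pump = ResultPump(deque(["r1", "r2"]))
pump.handle_event()  # reads r1, then yields
after_first = list(pump.seen)
pump.handle_event()  # reads r2, then yields
pump.handle_event()  # nothing left: the generator ends and is discarded
```

Yielding after each message means a burst of results cannot monopolize the event loop; other callbacks get a turn between messages.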

Here poll corresponds to _poll_result, which is self._outqueue._reader.poll(timeout).

So it waits on outqueue, the pipe through which the child processes send their output.

def _setup_queues(self):
    self._inqueue = self._ctx.SimpleQueue()
    self._outqueue = self._ctx.SimpleQueue()
    self._quick_put = self._inqueue._writer.send
    self._quick_get = self._outqueue._reader.recv

    def _poll_result(timeout):
        if self._outqueue._reader.poll(timeout):
            return True, self._quick_get()
        return False, None
    self._poll_result = _poll_result
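
The _poll_result closure pairs Connection.poll(timeout) with recv so that callers get a (ready, message) tuple instead of blocking indefinitely. A small sketch with the standard multiprocessing.Pipe shows the same behaviour; make_poll_result is a hypothetical helper name, and both pipe ends live in one process here purely to keep the example short.

```python
import multiprocessing


def make_poll_result(reader):
    """Return a poll function in the (ready, message) style used above."""
    def poll_result(timeout):
        if reader.poll(timeout):  # wait up to `timeout` seconds for data
            return True, reader.recv()
        return False, None
    return poll_result


reader, writer = multiprocessing.Pipe(duplex=False)
poll_result = make_poll_result(reader)

empty = poll_result(0)       # nothing written yet: returns immediately
writer.send(("READY", 42))
filled = poll_result(1.0)    # data is already waiting: returns without delay
```

A timeout of 0 turns the call into a pure readiness check, which is the mode handle_event relies on when it is driven by an event loop.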

So the logic is now:

                           +
    Consumer               |
                   message |
                           v         strategy  +------------------------------------+
              +------------+------+            | strategies                         |
              | on_task_received  | <--------+ |                                    |
              |                   |            |[myTest.add : task_message_handler] |
              +------------+------+            +------------------------------------+
                           |
                           |
 +------------------------------------------------------------------------------------+
 strategy                  |
                           |
                           |
                           v                Request [myTest.add]
              +------------+-------------+                       +---------------------+
              | task_message_handler     | <-------------------+ | create_request_cls  |
              |                          |                       |                     |
              +------------+-------------+                       +---------------------+
                           | _process_task_sem
                           |
 +------------------------------------------------------------------------------------+
 Worker                    | req[{Request} myTest.add]
                           v
                  +--------+-----------+
                  | WorkController     |
                  |                    |
                  |               pool +-------------------------+
                  +--------+-----------+                         |
                           |                                     |
                           |                 apply_async         v
               +-----------+----------+                      +---+-------------------+
               |{Request} myTest.add  | +--------------->    | TaskPool              |
               +----------------------+                      +----+------------------+
                                           myTest.add             |
                                                                  |
+--------------------------------------------------------------------------------------+
                                                                  |
                                                                  v
                                                             +----+------------------+
                                                             | billiard.pool.Pool    |
                                                             +-------+---------------+
                                                                     |
                                                                     |
 Pool              +---------------------------+                     |
                   | TaskHandler               |                     |
                   |                           |                     |  self._taskqueue.put
                   |              _taskqueue   |     <---------------+
                   |                           |
                   +------------+--------------+
                                |
                                |  put(task)
                                |
                                |                           +------------------+
                                |                           | ResultHandler    |
                                |                           +------------------+
                                |
                                |                                    ^
                                |                                    |
                                |                                    |
+--------------------------------------------------------------------------------------+
                                |                                    |
 Sub process                    |                                    |
                                v                                    +
                          self._inqueue                       self._outqueue

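3.5 Mapping file descriptors to queues

Finally, based on the child processes that were created, the relationships between file descriptors and queues are configured.

As the code shows, this sets up the outq and synq mappings, i.e. which child process each of these queues belongs to.

The code: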

class AsynPool(_pool.Pool):
    """AsyncIO Pool (no threads)."""

    def __init__(self, processes=None, synack=False,
                 sched_strategy=None, proc_alive_timeout=None,
                 *args, **kwargs):
        ......
        super().__init__(processes, *args, **kwargs)

        for proc in self._pool:
            # create initial mappings, these will be updated
            # as processes are recycled, or found lost elsewhere.
            self._fileno_to_outq[proc.outqR_fd] = proc
            self._fileno_to_synq[proc.synqW_fd] = proc

After the fds are configured:

self._fileno_to_outq = {dict: 4}
 8 = {ForkProcess} <ForkProcess(ForkPoolWorker-1, started daemon)>
 12 = {ForkProcess} <ForkProcess(ForkPoolWorker-2, started daemon)>
 16 = {ForkProcess} <ForkProcess(ForkPoolWorker-3, started daemon)>
 20 = {ForkProcess} <ForkProcess(ForkPoolWorker-4, started daemon)>
 __len__ = {int} 4

self._fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, started daemon)>}
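
A dictionary keyed by file descriptor lets the parent go from "this fd is readable" straight to the process that owns it. The sketch below illustrates that lookup with standard-library pipes. FakeProc is a hypothetical stand-in for a pool process, and multiprocessing.connection.wait is used here in place of the event loop that Celery actually registers the fds with.

```python
import multiprocessing
from multiprocessing.connection import wait


class FakeProc:
    """Stand-in for a pool process that owns one result pipe."""

    def __init__(self, name):
        self.name = name
        self.outq_reader, self.outq_writer = multiprocessing.Pipe(duplex=False)
        # The parent-side read fd, analogous to proc.outqR_fd above.
        self.outqR_fd = self.outq_reader.fileno()


procs = [FakeProc(f"ForkPoolWorker-{i}") for i in range(1, 4)]

# fd -> process, built the same way as the loop in AsynPool.__init__.
fileno_to_outq = {proc.outqR_fd: proc for proc in procs}

# Pretend worker 2 has produced a result.
procs[1].outq_writer.send("result from worker 2")

# Find the readable pipes, then map each one back to its owner.
ready = wait([p.outq_reader for p in procs], timeout=1.0)
owners = [fileno_to_outq[conn.fileno()].name for conn in ready]
messages = [conn.recv() for conn in ready]
```

As the comment in the listing notes, such a mapping has to be updated whenever a process is recycled, because the replacement process gets new pipes and therefore new fds.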

3.6 Overall AsynPool result

The final AsynPool looks like this. Its internal variables can be matched against the earlier sections:

self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7fe44f664128>
ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
Supervisor = {type} <class 'billiard.pool.Supervisor'>
TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
Worker = {type} <class 'celery.concurrency.asynpool.Worker'>
allow_restart = {bool} False
enable_timeouts = {bool} True
lost_worker_timeout = {float} 10.0
max_restarts = {int} 100
on_process_down = {NoneType} None
on_process_up = {NoneType} None
on_timeout_cancel = {NoneType} None
on_timeout_set = {NoneType} None
outbound_buffer = {deque: 0} deque([])
process_sentinels = {list: 4} [25, 27, 29, 31]
putlocks = {bool} False
readers = {dict: 0} {}
restart_state = {restart_state} <billiard.common.restart_state object at 0x7fe44f6644a8>
sched_strategy = {int} 4
soft_timeout = {NoneType} None
synack = {bool} False
threads = {bool} False
timeout = {NoneType} None
timers = {dict: 1} {<bound method Pool.maintain_pool of <celery.concurrency.asynpool.AsynPool object at 0x7fe44f664128>>: 5.0}
write_stats = {Counter: 0} Counter()
_Process = {type} <class 'billiard.context.ForkProcess'>
_active_writers = {set: 0} set()
_active_writes = {set: 0} set()
_all_inqueues = {set: 0} set()
_busy_workers = {set: 0} set()
_cache = {dict: 0} {}
_ctx = {ForkContext} <billiard.context.ForkContext object at 0x7fe44e7ac7f0>
_fileno_to_inq = {dict: 0} {}
_fileno_to_outq = {dict: 4} {8: <ForkProcess(ForkPoolWorker-1, started daemon)>, 12: <ForkProcess(ForkPoolWorker-2, started daemon)>, 16: <ForkProcess(ForkPoolWorker-3, started daemon)>, 20: <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>}
_fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>}
_initargs = {tuple: 2} (<Celery myTest at 0x7fe44e61cb38>, 'celery@me2koreademini')
_inqueue = {NoneType} None
_max_memory_per_child = {NoneType} None
_maxtasksperchild = {NoneType} None
_on_ready_counters = {dict: 4} {14802: <Synchronized wrapper for c_int(0)>, 14803: <Synchronized wrapper for c_int(0)>, 14804: <Synchronized wrapper for c_int(0)>, 14806: <Synchronized wrapper for c_int(0)>}
_outqueue = {NoneType} None
_poll_result = {NoneType} None
_pool = {list: 4} [<ForkProcess(ForkPoolWorker-1, started daemon)>, <ForkProcess(ForkPoolWorker-2, started daemon)>, <ForkProcess(ForkPoolWorker-3, started daemon)>, <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>]
_poolctrl = {dict: 4} {14802: None, 14803: None, 14804: None, 14806: None}
_proc_alive_timeout = {float} 4.0
_processes = {int} 4
_putlock = {LaxBoundedSemaphore} <LaxBoundedSemaphore at 0x7fe44f54bf98 value:4 waiting:0>
_queues = {dict: 4} {(<billiard.queues._SimpleQueue object at 0x7fe44f664160>, <billiard.queues._SimpleQueue object at 0x7fe44f664240>, None): <ForkProcess(ForkPoolWorker-1, started daemon)>, (<billiard.queues._SimpleQueue object at 0x7fe44f664550>, <billiard.queues._SimpleQu
_quick_get = {NoneType} None
_quick_put = {NoneType} None
_result_handler = {ResultHandler} <ResultHandler(Thread-170, initial daemon)>
_state = {int} 0
_task_handler = {TaskHandler} <TaskHandler(Thread-168, initial daemon)>
_taskqueue = {Queue} <queue.Queue object at 0x7fe44f664978>
_terminate = {Finalize} <Finalize object, callback=_terminate_pool, args=(<queue.Queue object at 0x7fe44f664978>, None, None, [<ForkProcess(ForkPoolWorker-1, started daemon)>, <ForkProcess(ForkPoolWorker-2, started daemon)>, <ForkProcess(ForkPoolWorker-3, started daemon)>, <ForkP
_timeout_handler = {TimeoutHandler} <TimeoutHandler(Thread-169, initial daemon)>
_timeout_handler_mutex = {DummyLock} <kombu.asynchronous.semaphore.DummyLock object at 0x7fe44f6cb7b8>
_timeout_handler_started = {bool} False
_waiting_to_start = {set: 0} set()
_worker_handler = {Supervisor} <Supervisor(Thread-151, initial daemon)>
_wrap_exception = {bool} True

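So the final diagram for this article is as follows, where Worker is the code the child process runs and ForkProcess is the child-process abstraction (only one is shown here).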

    +------------------------------+                                                                     +----------------+
    | Pool(bootsteps.StartStopStep)|                                      +-----------------+            | Connection     |
    +-------------+--------------+                                        |  _SimpleQueue   |            |                |
                  |                                                       |                 |            |        _write  |
                  |                                                       |       _reader   +--------->  |        _read   |
                  |                                                       |                 |            |        _send   |
                1 | instantiate                                           |                 |            |        _recv   |
                  |                                                       |                 |            |         _handle+---> {int} 8   <-+
       2 on_start |                                                       |                 |            +----------------+                 |
                  |                                                       |       _poll     +---------> _ConnectionBase.poll                |
    +-------------+   |                                                   |                 |                                               |
    |             |   |                                                   |                 |            +----------------+                 |
    |             |   v                                                   |       _writer   +--------->  | Connection     |                 |
    |         +---+---+-----------+                                       |                 |            |                |                 |
    |         | TaskPool          |                                       +-------+---------+            |         _handle+----> {int} 7    |
    |         |                   |            +------+                           ^                      |                |                 |
    |         | app               +----------> |celery|                           |                      +----------------+          ^      |
    |         |                   |            +------+                           |                                                  |      |
    |         |                   |                                               +                                                  |      |
    |         |                   |            +--------------------------+       +---->  (<_SimpleQueue>, <_SimpleQueue>)           |      |
    |         | _pool             +----------> | AsynPool                 |       |                                                  |      |
    |         |                   |            |                          |       |                                                  |      |
    |         +---+---------------+            |                  _queues +------->-----> (<_SimpleQueue>, <_SimpleQueue>)           |      |
    |             ^                            |                          |       |                                                  |      |
    |             |                            | _fileno_to_inq           |       |                                                  |      |
    |             |                            |                          |       +---->  (<_SimpleQueue>, <_SimpleQueue>)           |      |
    +-------------+                            | _fileno_to_outq          +--+    |                                                  |      |
                                               |                          |  |    |                                                  |      |
                                               |          _queues[queues] |  |    +---->  (<_SimpleQueue>, <_SimpleQueue>)           |      |
                                               |                 +        |  |                                                       |      |
                                               |               _pool      |  |    |  +----------------------+                        |      |
                                               |                 +        |  |    |  |                      |                        |      |
                                               +--------------------------+  |    |  | Worker       inq     |                        |      |
                                                                 |           |    |  |                      |                        |      |
                                                                 |           |    |  |              outq    |                        |      |
                                                  2.1 append(w)  |           |    |  |                      |                        |      |
                                                                 |           |    |  |              synq    |                        |      |
                                                                 v           |    |  |                      |                        |      |
                                                   +-------------+--+        |    |  |              inqW_fd +------------------------+      |
                                                   |                |      <-+    |  |                      |                               |
                                                   | ForkProcess    |             |  |             outqR_fd +-------------------------------+
                                                   |                |      <------+  |                      |
                                                   |                |                |             workloop |
                                                   | _target        +------------>   |                      |
                                                   |                |                |           after_fork |
                                                   |                |                |                      |
                                                   +----------------+                +----------------------+
