Darwin Streaming server 的 Task 类

时间:2021-11-15 11:57:47

Darwin Streaming Server 是一个开放源代码的streaming server,对于streaming server的编程和软件结构有着一定的参考价值,它是使用C++写的,其中的并发模式的核心就是Task类,下面写一下我的理解:

多任务的程序常常采用线程+同步阻塞IO的模式, 每个线程/进程服务于一个client,使用阻塞式的IO:

Darwin Streaming server 的 Task 类

这种模式对于交互式的长连接应用也是常见的选择(比如Telnet)。好处是实现极其简单,容易嵌入复杂的交互逻辑。Apache、ftpd 等都是这种工作模式。但是这种策略很能难足高性能程序的需求。

在handle大量用户的情况下,为了避免创建过多的线程导致context switch开销,常常采用select I/O复用的方法(你可能说select过时了,不过Darwin QTSS就是用的这个):

Darwin Streaming server 的 Task 类

上面是经典的select IO复用的过程, 以上的过程可以由3步来描述:

  1. 应用注册事件

  2. 事件触发通知应用

  3. 应用运行处理事件

注意在SELECT THREAD有3个任务:接受事件注册,等待事件触发,驱动SESSION处理事件,这些任务可以分解为不同的角色,我们可以在这种模式里定义4种角色:

1. EventHandler : EventHandler 向 EventGenerator注册事件,并对注册事件进行处理

2.EventGenerator :EventGenerator接受事件注册,当事件触发的时候,通知 EventHandler

3.EventHandler Driver : EventHandler Driver 驱动EventHandler 的运行,当它发现有EventHandler 受到事件触发以后,调度运行 EventHandler 的事件处理函数。一个EventHandler Driver可以驱动多个EventHandler。多个EventHandler Driver可以组成 Pool 进行驱动EventHandler。

4. Event Driver : Event Driver 驱动 EventGenerator 的事件触发。EventGenerator本身不包含执行线程,需要Event Driver的驱动。多个EventGenerator 在 Event Driver 中等待事件,当事件发生时,Event Driver调度 EventGenerator的来触发事件。

下面就是Darwin对各个对象和上述角色的对应:

  1. Task 就是对EventHandler的对象化封装。每个Task对象有两个主要的方法:Signal和Run。当服务器希望发送一个事件给某个Task对象时,就会调用Signal()方法;而Run()方法是在Task对象获得处理该事件的时间片后运行的,服务器中的大部分工作都是在不同Task对象的Run()函数中进行的。每个Task对象的目标就是利用很小的且不会阻塞的时间片完成服务器指定某个工作。应用可以通过继承Task 并重写Run()方法实现自己的任务。

  2. EventContext 对应EventGenerator的角色,事件的触发者,当事件发生时,调用Task::signal().

  3. TaskThread 对应EventHandler Driver的角色。任务的驱动线程,对一个或者多个Task进行调度,通过调用 Task::run() 处理事件

  4. EventThread对应Event Driver的角色。EventContext的驱动线程,可以处理多个EventContext, 发生事件时调用EventContext::process_event(),后者将调用Task::Signal()

Darwin Streaming server 的 Task 类

流程:

    1. Client或者 Task的子类向Event Context注册事件。

    2. Event Context将事件放入EventThread的Pool内。

    3. EventThread 调用select 等待多个事件中任一个触发。

    4. 事件触发以后,EventThread调用 Event Context::process_event()。

    5. 调用 Task::signal()。

    6. Task::signal()将task放入TaskThread的队列。

    7. TaskThread调度相应的Task, 执行其Run()方法。