谈长耗时任务的优化

时间:2022-06-03 18:28:57

任何一个业务系统都有它的核心业务逻辑,在很多情况下核心业务逻辑通常都有一些特点:牵扯面广,依赖关系多,流程复杂等。

接下来,我就最近公司内一个真实的案例来简单谈谈对于一个长耗时的任务或者业务逻辑有哪些常用的优化手段。

案例简述

要说的这个系统案例是一个统一通信平台,它给客户提供收发短信、email、RTX消息等功能。不难想象最核心的业务当然是借助这个平台间接通过这些通讯 (sms/email/rtx)网关来收发消息。当然现实的场景远没这么简单,还有一些外围业务也围绕此展开:

  • 权限管理
  • 流量控制
  • 黑名单策略
  • 成员管理
  • 网关对接
  • ...
下面是一个用户反映的在单批次群发大概3000条左右的信息时,系统出现的一些故障:
1 、信息员在发送的时候是通过批量发送进行处理的,对于这 2913 人进行发送的时候按钮一按,系统一直处于发送状态,没有 发送成功 提示!等了很长时间也没出来,但运营商网关那里已经开始发送了。
2、管理员通过管理帐号登录查看日志信息信息提示2913条信息发送成功但实际上是很多不成功的,但我们短信系统内的日志却提示全部发送成功,我们也问了运营商的技术人员,他们答复网关反馈的代码只有一次,但是时间上有长有短,所以还需要后台支持这边看一下。
3、针对长短信的计数不准确,我们实际算2913人每人收到3条应该是8739条,而计数只扣除了2913条!
以上问题初步分析,大概可以总结出三个问题:
  • 请求处理时间超长,客户端长时间阻塞处于假死状态
  • 短信网关回执的异步写入产生混乱,不是时序问题就是出现处理异常
  • 长短信计算错误

梳理业务&模块拆分

业务梳理以及模块拆分是减少项目复杂度的手段之一。由于这是一个老项目,经手过的维护人员非常多,加之没有规范而且维护人员技术水准以及对项目的了解程度各不相同,导致项目中的业务流程非常混乱,到处都是if/else、相似逻辑的方法重复定义、特例配置遍布整个项目。只有拆分清楚了,剥离与独立部署才能成为可能。梳理的方式有很大,应该是多重手段并用的结合,常用的手段有:
  • 大方法化小
  • 提取重复逻辑
  • 重新判断方法的归属对象
  • 重新判断逻辑的归属层次
  • 避免逻辑层次的反向依赖
  • ....
当然,我个人认为领域驱动设计(DDD)比通常情况下单纯的分层架构更符合面向组件划分的理念(请注意这里所说的是单纯的分层架构,因为DDD里也可能按逻辑分层)。因为领域对象(Domain Object)的视角跟面向组件在职责单一上都非常契合。

分布式组件

分离关键业务形成分布式组件相对于all-in-one的web系统而言,有助于提升整个系统的可靠性、稳定性以及吞吐量。这里将系统中相对耗时的发送消息业务从web系统中剥离出来,放到网络上一个独立的节点上排队处理,可以充分利用新节点的计算能力来实现并发处理。这个案例中,另一个可以单独实现的组件是网关对接器:gateway-adapter。它的作用是为了适配网关接口,以及处理网关回执。这里所谓的分布式组件,可以是物理上的分布式(比如独立的物理节点),也可以是逻辑上的分布式(比如只是一个独立的JVM进程)。独立节点与否,可以参照节点资源的利用率,但只要跑在独立的JVM进程上,就可以保证单个服务的稳定性。分布式的组件通常都是基于事件驱动的,它们之间的通信可以基于消息中间件。
对于这个业务场景,可以将它大致拆分为三个服务组件:
  • 文件解析、验证组件
  • 发送消息业务逻辑处理组件
  • 消息发送网关适配组件

缓存查询数据

系统中有些业务数据的更新频次比较低,但读取的频次却非常高。对于这些读写频次差别比较大的业务数据。通常的优化手段有:分表、分库、读写分离、数据内存化(缓存)。
目前一些基于内存的缓存/数据库(如redis及memcached)的使用已经非常流行,它们的使用对于系统性能的提升是立竿见影的。使用它们的另一好处是:不会破坏原有系统的数据结构。对于原来的某个表,大部分字段都是只读的,但有个别字段写的频次却非常高。这时你提取字段拆分表,做读写分离的话,会破坏系统的表结构设计,进而会大改应用程序。而使用基于Key-Value的内存缓存,则会对于数据库以及应用程序作尽量少的改动,只需要对关键业务加入一些对缓存访问/处理的代码。但缓存的使用也会带来数据的可见性、一致性问题。这需要很好的刷新、同步机制。

并发&多线程

将流程拆分为分布式组件的一大优势就是可以利用更多节点的计算能力来提升业务处理的吞吐量。将原来串行处理流程修改为并行处理的方式,将能够加快对消息处理的速度。按组件拆分后,这些组件都是完备的“服务”,它们之间由消息中间件来传递消息以串联时序关系。这些组件共享数据库,彼此之间并不产生依赖,这些服务也可以看作是任务处理器。
这里多线程的主要应用场景在business-filter这个主业务逻辑组件上。在客户端有大批量发送的请求到来时,会在file-parser之后对待发送的目标进行拆分处理,拆分后的每个组将由一个线程来处理。至于分组的参考值,这是个权衡值,它既需要保证吞吐量,还要保证单个处理线程在时间允许的范围内重回线程池,从而避免线程饥饿。

context&pipeline&filter

pipeline&filter是处理同一数据对象的常用方式。它可以用于拆分&重组&串联业务流程。拆分流程的好处显而易见,你甚至可以基于一定的策略,动态加载或卸载一个filter(利用classloader),你也可以对后来修改某个filter的逻辑带来的影响最小化(比如对于根据号码反查不到工号,有些用户选择的处理方案是中止发送,有些用户选择的方案是继续发送,这样的改动将会被限制到某个特定的filter内,对外部完全透明)。
context在很多模式中都能起到一个粘合剂的作用。因为很多模式需要在编程接口上保持一致性,而因为处理的业务不同,它们需要处理的数据也不尽相同,所以需要一个Transfer Object对象来将它们封装起来,同时保持编程接口的一致性。

同步请求异步化

上面案例引发的第一个问题就是页面持续在parsing状态,后台因为阻塞式的同步执行,http请求迟迟无法得到响应,导致页面假死,体验非常不好。这也是另外一个优化点,而且对于这一点的优化,其价值会比后端优化大得多,因为它是直面终端用户的。
对于一个请求,后端如果耗费极长的时间,可以将同步请求异步处理。将请求快速响应给前端,然后以ajax的方式,轮询后端的请求进度。比如,就这个业务而言,我们就可以定义如下用于反映处理进度的数据结构:
谈长耗时任务的优化
然后在模板页的某个位置给出进度显示。

组件交互

组件之间交互关系如下示意图:
谈长耗时任务的优化

优化后处理流程

我们可以以一个请求作为触发,来梳理一下经过优化后的业务处理流程:

(1)web服务端接到“excel自定义批量发送”请求后,简单得校验一下格式,将存放发送目标的excel直接保存到服务器的磁盘上,同时给同样宿主在该server上的file-parser 组件发送一条表示“有任务到达”的消息,并附带有要发送的消息内容以及发起本次请求的员工信息,在数据库中创建一条请求记录并生成一个唯一的requestID作为处理的批次号(它用于备案以及作分布式处理的日志追踪,字段大致如下所示),然后该http请求便迅速响应给客户端,但只给出一个状态码:表示请求已被受理,正在处理中。
(2)这时客户端首先需要有一点小改进:在页面的模板或者iframe(如果使用的话)区域,展示一个通知提示区(为的是不断跟踪处理状态),需要这个展示区的目的是它们是全局的,不会影响内容页切换。
(3)上面的请求一旦被返回,就可以在模板中采用ajax请求来以poll或push的方式获取处理状态,采用哪种方式根据需要获知发送状态的精确程度而定,如果需要精确的话,可采用poll模式。
(4)回到服务端,excel解析服务会对文件进行解析,提取出里面发送目标的email或手机号。
(5)与此同时,它会在消息内容表中插入要发送的消息内容,并提取出该记录的ID作为对消息内容的引用
(6)接下来它会将要发送的号码拆分为若干组,并封装入若干个传输消息内(此处的消息指的是组件之间传输的消息,而非发送消息)。同时会附带发送消息内容的ID以及批次号。将这些传输消息发送到专门用于处理发送服务的business filter所listen的队列上
(7)business filter是一个分布式组件,用于接收文件解析服务发来的分组传输消息。收到传输消息后解析。然后对每个传输消息(内部包含已分好组的若干个发送目标),启用一个独立的线程进行并行处理。
(8)对每个线程而言(也可以说对每个传输消息而言),将这个传输消息按发送目标拆分成一个个的子发送消息(下面提及的发送消息都指代这种子发送消息),将发送消息对象包装进一个上下文对象中(Context此处的意义跟Transfer Object类似,在J2EE中也称之为Value Object),然后这个发送消息将要流过一个pipeline的filter-chain,每个filter都是一个子业务的封装,其中包括比如:

  1. 成员验证:比如要发送的号码是不是本系统的用户
  2. 流量控制:当前是否发送部门的流量已经超标
  3. 策略校验:是否满足用户的时间策略以及接受策略
总之这些filter大致都是两种类型:
  • 内容筛选器
  • 内容扩充器

(9)因为这些用filter包装起来的业务,大都在执行——查询&校验的动作,所以对这些业务需要查询的数据集构建合适的缓存,将能够有效得提升处理速度,当然这其中也不可避免会有部分的写操作。因为这里采用多线程并发处理,因此对竞争资源的写需要保护机制,通常通过同步来保证数据的一致性。

(10)filter进行到chain的最后一步,会根据中间filter处理的状态来判断最终这条消息是否满足发送条件:

  • 如果不满足:
    • 记录错误/失败原因;
    • 更新批次号为当前RequestID的那条记录的相关字段;处理结束
  • 如果满足:
    • 记录相关信息到消息对应关系表,这里需要的很多信息都是通过上面的内容扩充器filter进行填充的
    • 在数据库的下行队列表中创建一条该消息的发送记录,便于收到回执后改写消息的发送状态,并将这条记录的ID追加到消息中
    • 将消息发送到gateway-adapter对应的队列
    • 更新批次号为当前RequestID的那条记录的已投递数字段;处理结束

(11)上面将消息发送到gateway-adapter服务对应的队列而不是将其直接发送到网关对应的队列是因为在消息发送给网关之前还有一些问题需要处理:

  • 通常网关会是一个独立并且“标准”的service provider,它会对接很多系统,这时为了不让消息的格式过于混乱,它定义了满足它需求的标准格式;让所有服务使用者来适配它。而这个网关对接服务存在的目的就是为了适配网关的消息格式
  • 而对接服务存在的另一个理由是:根据网关发回的回执来更新数据库中消息的状态:
    • 对应于下行消息表中当前messageId的消息状态更新
    • 对应于当前requestID的发送成功数/发送失败数字段的更新

(12)对每一条消息进行处理,从而使得不同状态的累加数等于总记录数
以上通过将长耗时的同步任务异步化处理的优化,不仅使得消息从串行同步处理变成了并行异步处理,而且改善了用户体验,在发送的过程中,点击错误日志或者发送过的消息状态就可以实时看到部分处理的结果。