上篇关于流程引擎的文章还是快两年以前的《Github。
一. 我们抽象的是什么?
二. 逻辑推进和业务单元解耦思路
三. Pipeline 管道的设计实现
四. 使用Pipeline完成示例
一. 我们抽象的是什么?
首先,这个不是一个有着标准答案的问答题,只是用来开始这篇文章的发散思考。当然每个有着一定开发经验的过来人也可能都有自己不同的见解,我也做开发多年,从早些年的单纯CRUD(ctrl+c,ctrl+v 也干了不少),到后来参与复杂的业务逻辑,一直到自己全程负责打造产品。个人总结来看,常规业务产品的开发工作包含两个方面:1. 单点业务的操作(依然是CRUD为主),处理数据的存储和展示。 2. 点与点的连接(变数的部分),处理数据的流向。
单点的操作非常的简单,就是诸如保存文件,保存数据库,第三方接口调用等等。很多项目的复杂,主要是集中在第二点,不同的业务有着不同的生命周期事件点组成,特别是再配合不同的公司管理流程加入,即便是相同的业务,系统数据的流向也各有不同,在我有限的经验里,基本是在处理和抽象这一层面。(当然业务单元的粒度大小也是相当重要的,只是这个难度更容易解决)
二. 逻辑推进和业务单元解耦思路
如何解耦,这个每个人的方式方法多种多样,我先以简单的订单支付成功并且需要发送短信和邮件为例,看下演进的过程:
在早期,消息队列还没有大规模使用的时候,处理方式如图
这个时候,功能完全是放在一块的,开发简单快速,但功能耦合且性能低下。随着消息队列这些基础解决方案的使用,我们进行第二版快速改造:
这个时候发送邮件和短信耗时的部分通过消息队列转移至独立的服务处理,暂时提升了性能可用问题,但因为邮件和短信因为参数内容不同,依然需要拼接消息去操作两个不同的队列,耦合度依然存在。我们再更新第三版:
通过这一版,我们回到一个队列,增添了订单支付成功Hook服务,订单支付更新的方法内部不需要关注任何其他逻辑细节,仅需添加成功事件队列,逻辑进一步解耦,同时为后续的扩展提供了空间(在hook服务和具体的短信邮件之间依然可以通过队列处理,这里不做进一步说明)。
此时我们梳理一下当前的数据流:
虽然我们在第三版已经有了很大改善,但是我们可以看到,所有信息流的流动,依然是依赖上一个节点的显示调用。如果这里我们再添加错误重试,操作日志等需求,还是会或多或少的侵入业务代码之中。所以我们能不能更进一步处理,比如订单支付更新只关注更新,无需关注事件是通过消息队列还是异步线程传递给下游。成功HooK方法只需要关注消息组装分发,无需关注是否失败重试。
按照设想,可以得出如下图所示:
如果我们能通过上边的图示将个业务单元完全独立出来,那么在每个业务单元之间可以随时插入新的模块而相互之间不受干扰,且能根据实际的情况进行异常的介入处理。这也是我设计OSS.Pipeline的初衷。
三. Pipeline 管道的设计实现
通过上边的订单支付演进过程,基本展示了我的基本思路,这里我们将示例再次简化,方便继续讲解具体的实现
OSS.Pipeline 将所有的业务单元抽象为一个个节点,这些节点负责业务的具体执行,通过将这些Pipe组合形成业务的生命周期的流水线,即Pipeline。同时Pipeline本身也可作为一个独立的Pipe参与更上一个层级的业务流程之中(即子流水线)。通过将业务输出和逻辑输出的拆解,借助.Net 的泛型每一个管道都能定义独立的业务输入输出,和逻辑输入输出参数(有时,逻辑输入输出和业务输入输出虽然相同,但代表的含义不同),因为OSS.Pipeline 是为了业务生命周期而设计,所以我参照了BPM中的组件命名方式,并扩展对应的组件基类供业务层选择使用,具体的可用组件实现请参照gitee代码介绍
下边我会用上边订单的示例,来搭建一个Pipeline示例。
四. 使用Pipeline完成示例
1. 定义支付更新活动
public class OrderPayReq { public long OrderId { get; set; } public decimal PayMoney { get; set; } } /// <summary> /// 订单支付管道 /// OrderPayReq - 业务输入参数, bool - 业务输出执行成功失败, long - 逻辑输出订单Id /// </summary> internal class OrderPay : BaseActivity<OrderPayReq, bool, long> { protected override async Task<TrafficSignal<bool, long>> Executing(OrderPayReq para) { LogHelper.Info($"支付订单({para.OrderId})金额:{para.PayMoney} 成功"); await Task.Delay(10); // 返回执行成功,并告诉下级管道 订单Id return new TrafficSignal<bool, long>(true, para.OrderId); } }
2. 定义支付成功后的Hook活动:
public class NotifyMsg { public string target { get; set; } public string content { get; set; } public bool is_sms { get; set; } // 假设不是短信就是邮件 } /// <summary> /// 支付Hook /// long-是上级管道传入的订单Id, bool - 业务输出执行成功失败, List<NotifyMsg> 需要发送的消息列表 /// </summary> internal class PayHook : BaseActivity<long, bool, List<NotifyMsg>> { protected override async Task<TrafficSignal<bool, List<NotifyMsg>>> Executing(long para) { LogHelper.Info($"执行订单({para})Hook"); await Task.Delay(10); var msgs = new List<NotifyMsg> { new NotifyMsg() {target = "管理员", content = $"订单({para})支付成功,请注意发货"}, new NotifyMsg() {target = "用户", content = $"订单({para})支付成功,已经入服务流程", is_sms = true} }; return new TrafficSignal<bool, List<NotifyMsg>>(true, msgs); } }
3. 定义发送活动
/// <summary> /// 发送服务 /// NotifyMsg - 上级管道传递的业务输入参数, bool - 当前业务执行成功失败 /// </summary> internal class Notify : BaseActivity<NotifyMsg, bool> { protected override async Task<TrafficSignal<bool>> Executing(NotifyMsg para) { LogHelper.Info($"发送{(para.is_sms?"短信":"邮件")}消息 :{para.target}:{para.content}"); await Task.Delay(10); return new TrafficSignal<bool>(true); } }
4,定义一个Pipeline,将上边的管道串联起来,同时定义一个Watcher,将管道执行过程中的事件记录下来
internal class OrderPayPipeline { private static readonly OrderPay _pay = new OrderPay(); private static readonly PayHook _payHook = new PayHook(); private static readonly Notify _notify = new Notify(); static OrderPayPipeline() { _pay .AppendMsgFlow("order_pay_event") // 添加默认实现的异步消息队列中 .Append(_payHook) // 消息队列数据流向hook管道 .AppendMsgEnumerator() // Hook处理后有多条消息,添加消息枚举器 .Append(_notify); // 枚举后的单个消息体流入发送节点 // 添加日志,通过创建流水线,给流水线添加Watcher,会自动给下边的所有Pipe添加Watcher _pay.AsPipeline(_notify, new PipeLineOption() { Watcher = new FlowWatcher() },"OrderPayPipeline"); } // 作为对外暴露接口 public Task<bool> PayOrder(OrderPayReq req) { return _pay.Execute(req); } } public class FlowWatcher : IPipeLineWatcher { public Task PreCall(string pipeCode, PipeType pipeType, object input) { LogHelper.Info($"进入 {pipeCode} 管道", "PipePreCall", "PipelineWatcher"); return Task.CompletedTask; } public Task Executed(string pipeCode, PipeType pipeType, object input, WatchResult watchResult) { LogHelper.Info($"管道 {pipeCode} 执行结束,结束信号:{watchResult.signal}", "PipeExecuted", "PipelineWatcher"); return Task.CompletedTask; } public Task Blocked(string pipeCode, PipeType pipeType, object input, WatchResult watchResult) { LogHelper.Info($"管道 {pipeCode} 阻塞", "PipeBlocked", "PipelineWatcher"); return Task.CompletedTask; } }
5. 添加业务实际调用,这里使用单元测试:
private static readonly OrderPayPipeline payLine = new OrderPayPipeline(); [TestMethod] public async Task TestOrder() { var payRes =await payLine.PayOrder(new OrderPayReq() {OrderId = 111, PayMoney = 1000.00m}); await Task.Delay(100); Assert.IsTrue(payRes); // 订单支付更新结果 }
最后这里业务执行的日志如下:
2022-09-13 Code: Key: Detail:支付订单(111)金额:1000.00 成功
2022-09-13 Code: Key: Detail:执行订单(111)Hook
2022-09-13 Code: Key: Detail:发送邮件消息 :管理员:订单(111)支付成功,请注意发货
2022-09-13 Code: Key: Detail:发送短信消息 :用户:订单(111)支付成功,已经入服务流程
通过Watcher记录操作日志如下:
2022-09-13 Code: Key:PipePreCall Detail:进入 SimpleMsgFlow`1 管道
2022-09-13 Code: Key:PipeExecuted Detail:管道 OrderPay 执行结束,结束信号:Green_Pass
2022-09-13 Code: Key:PipeExecuted Detail:管道 SimpleMsgFlow`1 执行结束,结束信号:Green_Pass
2022-09-13 Code: Key:PipePreCall Detail:进入 PayHook 管道
2022-09-13 Code: Key:PipeExecuted Detail:管道 PayHook 执行结束,结束信号:Green_Pass
2022-09-13 Code: Key:PipePreCall Detail:进入 MsgEnumerator`1 管道
2022-09-13 Code: Key:PipePreCall Detail:进入 Notify 管道
2022-09-13 Code: Key:PipePreCall Detail:进入 Notify 管道
2022-09-13 Code: Key:PipeExecuted Detail:管道 Notify 执行结束,结束信号:Green_Pass
2022-09-13 Code: Key:PipeExecuted Detail:管道 Notify 执行结束,结束信号:Green_Pass
2022-09-13 Code: Key:PipeExecuted Detail:管道 MsgEnumerator`1 执行结束,结束信号:Green_Pass
如果你已经看到这里,并且感觉还行的话可以在下方点个赞,或者也可以关注我的公总号(见二维码)