可编排策略在交易系统的应用

时间:2021-06-25 00:38:39


背景

在2019年的时候,由于组织架构变更,我接手了交易板块。接手后开始找之前的产品、开发沟通了解下这块有什么问题。

综合了下有以下问题:

  • 需求积压严重,开发列了满满一小本子(开发);
  • 需求提出方多,经常功能打架,然后就复制出来了一套(涉及的业务线业比较多)(开发);
  • 需求处理慢(产品);
  • 进入交易系统太慢(1~3秒),好多用户跳出(产品);
  • 生产经常出现各种问题,开发解决速度慢(产品);
  • 产品链路数据无法统计,只能分析出最终结果,具体哪块转化率低,无法统计(产品);
  • 用户转化成本过高(风控);
  • 动一块功能,所有的业务流程都需要回归,测试成本高(测试);

本着,耳听为虚,眼见为实的原则,我开始和产品对交易结构和扒拉代码和数据;

发现的问题:

  • 刚开始底层设计的还可以,
  • 随着业务线的增多,对接产品的增多,每对接一条业务线,复制一条(这里有产品的原因,也有技术的原因,产品提出,我想要什么什么,技术不做思考,那我就实现什么,压根不思考需求的合理性);
  • 整个流程产生异常较多,开发投入大量的精力排查问题,需求经常要加班搞;
  • 交易入口,调用了很多预用参数,用户不往下走不一定会用,规则罗列了太多,有些都不知道干什么;
  • 对接产品源头多,无主次,哪个业务线都需要提需求;
  • 整个上下文串不起来,依赖其他的系统模块,跳出去再跳回来,无法串起来,统计不出来用户整个流程的漏斗数据;
  • 入口处就开始调用各种数据源,性能上不去,成本还高;

梳理与设计

梳理了下各业务线和各机构的元素,大致是这个样子的(真实的业务逻辑比这个复杂多了,因为业务问题不方便透露)。

可编排策略在交易系统的应用

梳理完以后脑子里有两个问题:

  • 交易的业务结构应该是什么?
  • 我先画了一版,我理解的交易流程;
  • 然后和产品讨论,抛开我们现有的,真正的业务结构应该是什么?
  • 这么多类似的规则啥的,怎么给抽取出来?
  • 先安排人梳理所有的规则,然后找共性;

花了几天的时间和产品碰,以及细化规则点。确认如下流程。


可编排策略在交易系统的应用

业务结构清晰了,剩下的问题一个个的解决呗:

  • 和业务沟通利弊,让业务指定了一个主产品(要不然各提各的需求,互不关心,底层一套,容易打架)
  • 其他产品提的需求必须在大的交易结构里,可以定制自己的功能点;
  • 如果交易结构不满足业务的发展,就做大的调整(事实上3年了,我们都没有变动主体结构)
  • 准入哪些是必须? 不看过去,只看业务到底想要什么,然后再把过去的拿出来,对比,防止有遗漏;
  • 差异化的额度加载应该如何处理?通过策略模式来适配各业务场景;
  • 链路跟踪转化怎么处理?漏斗转化怎么实现?入口生成唯一标识,前端缓存,调用所有的交易接口,必须携带这个唯一标识;
  • 哪些规则可以延后?
  • 怎么认定是通用规则?怎么认定是特有规则?
  • 问题流程怎么方便排查?

其实,在这个过程中,我们分析发现,一次交易发起基本99.99%上10分钟内能全部结束。

  • 所以又和业务确认,除了额度以及合规相关的,其他的用户数据,是否在整个流程内一致(缓存以后,后台调整什么不用关注,比如,路由、试算)

准入与分发

在这个节点,我们只保留了最核心的校验,

  • 进入这个节点,最基本的授信得做了吧,不做你进来干啥?
  • 进入这个节点,总得有额度的吧,不管你激活与否。
  • 之前那些预处理的调用,哪些免费?哪些付费?免费的数据能否异步调用缓存?
  • 根据业务场景,走什么样的页面流程?
  • 为了方便测试人员测试,以及生产灰度,我得能控制用户走什么样的流程吧。
  • 一旦在交易系统里分发了,页面流程,我得有一个唯一标识能将交易整个串起来吧

可编排策略在交易系统的应用

在准入分发里:

  • 业务不关注的,做一个日志埋点,直接通过kafka(log4j2中批量把日志推入kafka中,然后进入es)流入es,在kibana中展现;
  • 业务关注的,真正进入了我这个流程,进行持久化埋点
  • 流程标识flowId:
  • 全局串联交易业务,以及分析交易转化
  • 在整个交易里进行校验,没有流程标识,认为是非法请求,前端直接重新调用准入分发接口;
  • 流程里还隐含了一个场景标识
  • 通过剥离非核心业务,以及数据获取后置
  • 这块接口响应,由之前的1~3秒,优化了到了100毫秒以下(这里还调用了一个系统的接口,我不能说)

通用处理

可编排策略在交易系统的应用

  • 针对交易的所有请求,进行流程验证,如果流程标识非法,前端会重新进入准入分发;
  • 统一请求日志埋点,通过kafka进入es中;
  • 然后才是各个业务的处理;

同时,针对每一步,进行接口抽取,然后进行默认实现,差异性的根据策略实现

  • 在没有确定机构之前,都是根据场景适配,有差异性,根据场景实现,没有就直接使用默认方案(默认方案是一定能通过的,最严格的);
  • 在确定了机构之后,都是根据场景+机构进行适配;

规则校验(重点)

用户输入金额,试算,对相关结果无异议,就点击同意协议进行下一步了;

在这里,把所有的规则都集中到这一处:

  • 到这里用户的意愿已经很强了
  • 不过不同的机构对个人信息要求都不一样;
  • 我们把分散在各个节点的机构校验规则,都放到了这里;
  • 哪些是强规则?哪些是弱规则?哪些是临时规则?
  • 技术这块没法判定
  • 强规则可能会随时间,或监管变成弱规则
  • 反过来,弱规则,也可能会根据监管变成强规则;
  • 既然这样,那我就让业务自己配置;
  • 同时,我们分析出来规则,有阻塞性的,还有是引导性的
  • 大大小小的规则,拆分出来后总共也就十来条

所以这块重点有两块,一块是规则的组装,一块是规则的抽取。

规则组装:

CREATE TABLE `t_flow_strategy_config` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键Id',
`capital_name` varchar(40) DEFAULT NULL COMMENT '机构:all,等',
`flow` varchar(40) DEFAULT 'all' COMMENT '流程标识:all, 其他为具体流程',
`scene` varchar(10) default null comment '场景,all 全场景,其他为指定场景',
`position` varchar(10) default null comment '位置,具体在哪执行',
`category` varchar(20) DEFAULT NULL COMMENT '策略类型block,guide',
`strategy_name` varchar(20) DEFAULT NULL COMMENT '策略名称',
`start_time` datetime DEFAULT NULL COMMENT '开始时间,临时策略生效时间',
`end_time` datetime DEFAULT NULL COMMENT '结束时间,临时策略失效时间',
`sort` int(2) DEFAULT NULL COMMENT '策略排序',
`enable_state` varchar(10) DEFAULT NULL COMMENT '策略可用状态:1:启用,0:禁用',
`creator` varchar(20) DEFAULT NULL COMMENT '创建人',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updator` varchar(20) DEFAULT NULL COMMENT '更新人',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='机构对应的策略配置';

规则抽取,这里抽出两种类型,一种是阻塞型的规则,一种是引导型的规则,类关系如下:

可编排策略在交易系统的应用

可编排策略在交易系统的应用

BaseStrategy 只最基本的

/**
* 〈公共策略〉
*
* @author yxkong
* @create 2019-05-12
* @since 1.0.0
*/
public interface BaseStrategy {
/**
* 服务调用走这个方法
*
* @param context 策略上下文对象
* @return
*/
boolean execute(StrategyContext context);

/**
* 策略执行前,返回true代表成功,无需再次引导
*
* @param context 策略上下文对象
* @return
*/
boolean before(StrategyContext context);

/**
* 策略执行
*
* @param strategyRuleDto 策略上下文对象
* @return true 代表策略执行成功,false代表策略执行失败
*/
boolean strategy(StrategyContext context);

/**
* 策略执行后
*
* @param context 策略上下文对象
* @param result 策略执行结果
* @return
*/
void after(StrategyContext context, boolean result);

/**
* 返回策略失败的特定标识,以及提示
*/
ResultBean<ValidateVo> getFailResult();
}
/**
* 〈基础策略抽象实现〉
*
* @author yxkong
* @create 2019-05-12
* @since 1.0.0
*/
public abstract class BaseStrategyImpl implements BaseStrategy {
protected Logger logger = LoggerFactory.getLogger(BaseStrategyImpl.class);
/**
* 执行策略抽象实现</br>
* 1,before 策略执行前的通用处理,
* 如果缓存中有,可以直接通过,防止重复验证
* 不同的策略类型,实现不一样
* 2,strategy 真实策略执行
* 3,after 策略执行后的通用处理,
* 包含日志的记录,以及缓存的时效
* @param context 策略上下文对象
* @return
*/
@Override
public boolean execute(StrategyContext context) {

if (this.before(context)) {
return Boolean.TRUE;
}
boolean result = false;
try {
result = this.strategy(context);
} catch (Exception e) {
// 设置异常状态
context.setStatus(StrategyResultStatusEnum.EXCEPTION.status);
String message = e.getMessage();
if (StringUtils.isNotNull(message) && message.length() > 500) {
message = message.substring(0, 500);
}
// 设置异常信息
context.setMessage(message);
logger.error("执行strategy:{} 失败", e);
}
this.after(context, result);
return result;
}
@Override
public void after(StrategyContext context, boolean result) {
CapitalStrategyConfigEntity capitalStrategyConfig = context.getFlowStrategyConfigEntity();
String strategyName = capitalStrategyConfig.getStrategyName();
String category = capitalStrategyConfig.getCategory();
if (logger.isInfoEnabled()) {
logger.info("执行{}:{}的结果:{} after ", category, strategyName, result);
}
if (result) {
//这里相对简单,以人为维护,缓存规则通过,1个小时 ,这也会记录入库记录log
successHandle(context);
} else {
//这里会记录埋点日志,并会入库记录log,哪一条规则,未过
failHandle(context);
}
}
}

阻塞策略抽象实现

/**
* 〈阻塞规则策略抽象实现〉
*
* @author yxkong
* @create 2019-05-14
* @since 1.0.0
*/
public abstract class BaseBlockStrategyImpl extends BaseStrategyImpl implements BlockStrategy {

@Override
public boolean before(StrategyContext context) {
// 阻塞策略在此不做校验
if (logger.isInfoEnabled()) {
logger.info("执行blockStrategy :{} before",context.getFlowStrategyConfigEntity().getStrategyName());
}
return false;
}
}

引导策略抽象实现:

/**
* 〈引导策略抽象实现〉
*
* @author yxkong
* @create 2019-05-14
* @since 1.0.0
*/
public abstract class BaseGuideStrategyImpl extends BaseStrategyImpl implements GuideStrategy {

public static final String GUIDE_CAPITAL_KEY_FORMAT = "%s";

@Override
public boolean before(StrategyContext context) {
/***
* 查询缓存是否有通过的数据
* 只缓存成功的,因为规则依赖外部查询,失败的重新去查,看是否成功
*/
return true;
}
@Override
public boolean after(StrategyContext context) {
/***
* 查询缓存是否有通过的数据
* 只缓存成功的,因为规则依赖外部查询,失败的重新去查,看是否成功
*/
return true;
}
}

一个具体的策略实现

/**
* 〈个人信息引导规则实现〉
*
* @author yxkong
* @create 2019-05-12
* @since 1.0.0
*/
@Service("personGuide")
public class PersonGuideStrategyImpl extends BaseGuideStrategyImpl {

@Autowired
private UserSystemService userSystemService;

@Override
public boolean strategy(StrategyContext context) {
ValidateDto validateDto = context.getValidateDto();
String capitalName = validateDto.getCapitalName();
capitalName = handleCapitalName(validateDto, capitalName);
// 查询用户信息,用户信息,在底层通过feign拦截器做了处理
ResultBean<PersonGuideVo> resultBean = userSystemService.verifyOCRPersonlInfo( capitalName);
if (ResultBeanUtil.isSuccess(resultBean)) {
return Boolean.TRUE;
}
context.getValidateVo().setPerson(resultBean.getData());
return false;
}

@Override
public ResultBean getFailResult() {
//自定义返回状态码和message
return ResultBeanUtil.result(StrategyStatusMessageEnum.PERSON_GUIDE);
}
}

策略执行

/**
* 执行接口定义
*/
public interface StrategyService {
ResultBean validate(StrategyContext context,String strategyName) ;
}

阻塞策略执行

/**
* 〈阻塞策略路由实现〉
*
* @author yxkong
* @create 2019-05-13
* @since 1.0.0
*/
@Service
public class BlockStrategyService implements StrategyService{
//利用spring的特性,省去了工厂和if的判断
@Autowired
private Map<String, BlockStrategy> blockStrategyMap;

/**
* 阻塞策略执行
*
* @param strategyRuleDto 策略执行时的上下文对象
*/
@Override
public ResultBean validate(StrategyContext context,String strategyName) {
BlockStrategy strategy= blockStrategyMap.get(strategyName);
boolean rst =strategy.execute( context);
if(rst){
//阻塞策略,拦截,表示失败
return strategy.getFailResult();
}
return ResultBeanUtil.sucess();
}
}

引导策略执行:

/**
* 〈引导策略路由实现〉
*
* @author ducongcong
* @create 2019-05-13
* @since 1.0.0
*/
@Service
public class GuideStrategyService implements StrategyService{

@Autowired
private Map<String, GuideStrategy> guideStrategyMap;

/**
* 引导策略执行
*
* @param strategyRuleDto 策略执行时的上下文对象
* @return true 表示策略通过,false表示策略失败
*/
@Override
public ResultBean validate(StrategyContext context,String strategyName) {
GuideStrategy strategy= guideStrategyMap.get(strategyName);
boolean rst =strategy.execute(context);
if(rst){
return ResultBeanUtil.sucess();

}
//引导用户去做对应的事
return strategy.getFailResult();
}
}

策略编排

  • 开发将业务规则实现,定义好对应的beanName;
  • 业务规则机构的需要,将规则通过后台配置到​​t_flow_strategy_config​​ 表中
  • 规则什么时候生效、什么时候时效,怎么排序都有(终于可以不加班熬夜等业务上线了)
  • 在流程过程中通过读取​​t_flow_strategy_config​​ 表获取所有的执行策略(原子方法)
public interface StrategyBuilder {
ResultBean validate(LoginContext loginContext, BizContext bizContext);
}


@Service
public class StrategyBuilderImpl implements StrategyBuilder{
@Resource
private BlockStrategyService blockStrategyService;
@Resource
private GuideStrategyService guideStrategyService;
@Override
public ResultBean validate(LoginContext loginContext, BizContext bizContext) {
/**
* 根据场景+机构+业务流程+位置 获取所有的策略
* 旁路缓存查的,基本上不变动,变动以后对一致性的要求没那么高,缓存了10分钟
*/
List<FlowStrategyConfigEntity> strategies = getScenePosition(bizContext.getScene(),bizContext.getPosition(),bizContext.getFlow(),bizContext.getCapitalName());
List<FlowStrategyConfigEntity> blockStrategies = getBlockStrategy(strategies);
StrategyContext strategyContext = buildContenxt(loginContext,bizContext);
for (FlowStrategyConfigEntity entity:blockStrategies){
ResultBean validate = blockStrategyService.validate(strategyContext,entity.getStrategyName());
if(!validate.isSuc()){
return validate;
}
}

List<FlowStrategyConfigEntity> guideStrategies = getBlockStrategy(strategies);
for (FlowStrategyConfigEntity entity:guideStrategies){
ResultBean validate = guideStrategyService.validate(strategyContext,entity.getStrategyName());
if(!validate.isSuc()){
return validate;
}
}
return ResultBeanUtil.success();
}
}

总结

  • 整个流程通过了流程标识串联了起来,没有流程标识,业务走不下;
  • 通过异步+数据后置,将准入分发的接口性能提升了将近20多倍,数据成本也降低了40%;
  • 整个流程重构后
  • 资金对接需求评审提升了70%以上
  • 对接效率提升了90%(基本上不动,动了以后后端都可以复用,也就是配合前端联调下,或者扩展些功能)
  • 测试效率提升至少60%,只是某个点改动,可以只测对应的点位;
  • 通过kibana配置报表,能实时查看各个规则节点跳出率,以及用户完成的意向;
  • 如果因为某个规则无法完成,导致用户中断,10分钟内就可以报警;
  • 实现了交易环节的整体监控
  • 通过不断地尝试,调整规则
  • 最终交易通过率提升了将近1倍

在整个重构的过程中,这里就用到以下一些方法:

  • 规则原子化以及标准化(最难的是在这里,入参标准,出参标准,流程标准)
  • 通过模板方法把相同的类型的规则按模板编排;
  • 通过表配置,将实现规则链式编排,规则通过链式(你可以理解为责任链的变种)执行每一条规则
  • 同时,这里利用了缓存,异步等思想来提升效率;

我觉的最重重重要的是对业务的理解,怎么通过系统引导用户,怎么通过体系化的建设来将整个业务串联起来。