你听说过任督二脉吗?像这样~
咳咳~今天不讲武功,讲电商平台设计的功夫~
背景
当今的电商可不仅仅是B2C商城,接下来还会有O2O,往后可能还会有商超、奥莱、二手交易。。。且称之为业务模式~而每个业务模式下还会有预售、竞拍、拼团等不同组合的子模式。
可是我商城的商品列表页不想展示O2O的商品啊,商品列表的数据希望按一定规则相互隔离。其他模块,有的出于操作习惯的考虑不隔离,有的出于用户行为的考虑需要隔离。
各模块数据隔离需求如下
|
列表页 |
商详页 |
商品组 |
优惠券 |
活动 |
订单 |
... |
原商城 |
隔离 |
隔离 |
隔离 |
暂时不隔离 |
暂时不隔离 |
隔离 |
|
O2O |
隔离 |
隔离 |
隔离 |
暂时不隔离 |
暂时不隔离 |
隔离 |
|
各模块流程差异
|
新建商品 |
列表页 |
购物车 |
订单 |
... |
原商城 |
店铺创建,门店设置库存 |
基于item建es文档 |
跨门店 |
状态流转走快递 |
|
O2O |
门店创建(沿用原模型但弱化店铺的概念) |
基于item建es文档 |
单个门店 |
状态流转走配送 |
|
于是我们就会面临不同的改造的场景。
场景1,新建商品就是新建商品啊!!!
例如商品的新建保存,是基础服务,已经具备通用存储模型。为了支持新模式我还得改服务接口、发布二方包?咱可不可以这样?
商品服务
Integer bizMode = BizModeContext.getBizMode(); itemDO.setBizMode(bizMode); // ... itemDAO.save(itemDO);
场景2,下单就是下单啊!!!
例如创建订单,虽然商品维度、订单类型、优惠方式有很多,但我修改一下B2C下单的字段计算,还要引发O2O模式的回归测试?咱可不可以这样?
甚至这样~
实现类路由
@BizModeService( bizMode=BizMode.B2C, srvClz=OrderTradeService.class ) public class MallOrderTradeServiceImpl extends AbstractOrderTradeService { } //使用时 Integer bizMode = BizModeContext.getBizMode(); OrderTradeService srv = BizModeRouter.routeBean(bizMode, OrderTradeService.class);
眼尖的小哥哥可能已经发现,要是能再搭配个热加载的bean容器,都可以做成插件了!emmm...那是远景~
如何打通任督二脉?
首先要舌尖抵住上颚,再来三个深呼吸~然后拿起一本《Thinking In Java》或《Core Java》假装在修炼。。。等等。。。什么是任督二脉?
Java老司机都知道,我们通常会把ApplicationContext比作Spring的任督二脉,它贯穿始终,管理着bean的生命周期和传递。
所以电商平台的任督二脉就是BizModeContext啦!它的经脉图大概长这样~
所以我们通过下面一二三四,入口处打标、dubbo服务间传递、RocketMQ传递、本机线程池内传递,一步一步打通整个标的透传。
步骤1-打标
aop按包路径切面+注解覆盖,满足你不同的定制需求~于是,在用户点击页面操作的那一刻,每个接口都被打上了“模式标”。
注解打标
@Configuration public class ControllerConfig { @Aspect @Component public static class CxcAdvice implements BizModeControllerAspect { @Override public Integer getBizMode() { return 300; } @Override @Pointcut("execution(* com.mall.web.controller..*(..))") public void pointcut() { } } } @Slf4j @RestController @MarkBizMode(bizMode = 200) public class AdminOldController2 { @RequestMapping("/admin_anno_byclass") public String annoByClass() { log.info("annoByClass got bizmode: " + BizModeContext.getBizMode()); return "this is " + this.getClass().toString(); } @RequestMapping("/admin_anno_bymethod") @MarkBizMode(bizMode = 100) public String annoByMethod() { log.info("annoByMethod got bizmode: " + BizModeContext.getBizMode()); return "this is " + this.getClass().toString(); } }
步骤2-dubbo服务传递
借助dubbo自带的Filter和RpcContext可以轻松实现。那是因为dubbo的设计中已经充分考虑了。
Filter的使用
filter定义
@Activate(group = Constants.CONSUMER) public class BizModeDubboConsumerFilter implements Filter { }
filter配置扫描发现: /src/main/resources/META-INF/dubbo/com.alibaba.dubbo.rpc.Filter
filter的装配原理: List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
dubbo的SPI扩展机制就不具体展开啦~
RpcContext的生命周期
RpcContext -> RpcInvocation ---服务调用--- RpcInvocation -> RpcContext
业务扩展的调用:RpcContext.getContext().setAttachment("bizMode", (bizMode.toString()));
RpcContext.java
//创建一个线程隔离的上下文实例 private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() { @Override protected RpcContext initialValue() { return new RpcContext(); } }; public static RpcContext getContext() { return LOCAL.get(); }
dubbo对attachment的传递:
- 本机(当前线程)的保存:RpcContext
- 远程调用的保存和传递:RpcInvocation
- 将RpcContext存入RpcInvocation:AbstractInvoker
public abstract class AbstractInvoker<T> implements Invoker<T> { @Override public Result invoke(Invocation inv) throws RpcException { //节选。。。 Map<String, String> context = RpcContext.getContext().getAttachments(); if (context != null) { invocation.addAttachmentsIfAbsent(context); } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){ invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); //节选。。。 // return ... } protected abstract Result doInvoke(Invocation invocation) throws Throwable; }
- 序列化与反序列化:DubboCodec (此处不展开)
- 从RpcInvocation取出,存入提供方的RpcContext:ContextFilter
@Activate(group = Constants.PROVIDER, order = -10000) public class ContextFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { Map<String, String> attachments = invocation.getAttachments(); //节选。。。 RpcContext.getContext().getAttachments().putAll(attachments); //节选。。。 try { RpcResult result = (RpcResult) invoker.invoke(invocation); // pass attachments to result result.addAttachments(RpcContext.getServerContext().getAttachments()); return result; } finally { RpcContext.removeContext(); RpcContext.getServerContext().clearAttachments(); } } }
步骤3-RocketMQ传递
RocketMQ设计时也预留了扩展打标的能力,只需要把模式标存入属性字段,就能跟随MQ把标传递到消费方。
消息体数据结构
org.apache.rocketmq.common.message.Message |
private String topic;
//填入属性,仅包可见 //填入自定义属性,与其他属性共享map,但对key过滤保留字 |
org.apache.rocketmq.common.message.MessageExt 是Message的子类 |
private int queueId; private int storeSize; private long queueOffset; private long storeTimestamp; private long preparedTransactionOffset; |
因此,可以在消息体的 Map<String, String> properties 属性上附加打标信息。
发消息的扩展钩子
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.registerSendMessageHook(SendMessageHook)
收消息的扩展钩子
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.registerConsumeMessageHook(ConsumeMessageHook)
但由于收消息是一批一批收的,收到的是消息列表 List<MessageExt>,默认配置下只有一个元素,但允许配置多个,因此不能在这个钩子上做扩展。
因此,对starter做改造,在单个消息消费的位置增加了类似的hook扩展点。
ConsumerHook
public interface ConsumeOneMessageAdvice { String hookName(); void consumeMessageBefore(final MessageExt msg); void consumeMessageAfter(final MessageExt msg); }
步骤4-线程池子线程传递
BizModeContext的原理是用ThreadLocal存储线程范围的上下文,可是实际场景中,总会有些异步和并发的问题,需要使用到线程池。那么问题来了。
父线程context如何传递给子线程
jdk自带InheritableThreadLocal类解决了父子线程传递的问题。
Thread.init()
public class Thread implements Runnable { private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { //节选。。。 Thread parent = currentThread(); if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); //节选。。。 } } //子线程创建时会把父线程的ThreadLocalMap复制到子线程中 public class ThreadLocal<T> { private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } } } }
线程池中子线程复用时怎样维护context
但如果使用了线程池,子线程运行完并不会销毁,被另一个父线程复用时不会重新初始化。
这时候我们需要借助一个开源框架 TransmittableThreadLocal https://github.com/alibaba/transmittable-thread-local
(图片来自官网)
在获取子线程时重新读取父线程的上下文,子线程run()执行结束时清理子线程的上下文。
打通任督二脉后可以练什么武功?
打通模式标的透传后,能怎么使用呢?大家可以尽情发挥下想象力~何时何地只需要 BizModeContext.getBizMode()
- 日志MDC打标:可以统一给日志记录加入模式标。
- sql自动追加查询条件:通过mybatis插件扩展或甚至是数据源代理,可以给sql自动追加隔离标条件(虽然具体业务中并不那么好用)。
- 全链路监控或压测:是的,如果打标的不是bizMode,而是traceId或影子标,就可以通过这个“任督二脉”透传整个系统!
- 新模式插件化接入:各业务板块逐渐模块化后,可以通过给扩展点开发实现类的形式接入新模式。
远景-多模式插件化部署
我们期望,未来新的业务模式接入,就像安装插件一样无痛无感知。
新模式接入,只需要增加部署新的bizmodeX节点,其他业务不需要回归测试。
某个业务,例如bizmode100,部署重启时,其他业务不受影响。
这还需要一步一步来,目前我们先实现了“任督二脉”的打通,后面的故事,敬请期待哦~