[置顶] 太极分布式事务处理框架MOONWATER使用指南

时间:2021-07-24 06:23:56

太极分布式事务处理框架MOONWATER使用指南

                                        李万鸿2017-5-18 

 

结合目前事务处理的精华,我开发了太极分布式事务处理框架MOONWATER,采用可靠消息服务和重试、补偿处理机制,使用事件驱动、最终一致的事务模型,巧妙地运用数据库的事务处理能力,对服务操作结果进行判断,调用应用系统自身的事务处理功能,自动进行事务处理,从而有效地解决微服务的分布式事务处理问题。框架采用消息机制调用服务,速度快、灵活,通过使用缓存,解决服务调用的冥等性和消息的冥等性,在事务处理时,采用异步并行调用对应的服务,提高了性能。MOONWATER是一个非常优秀的框架,优势在于提高了应用的成功率,自动进行分布式事务处理,事务处理速度快,提高了数据的一致性,把对事务的处理由不可控变为可控,需要人工处理的故障可一键完成,简单快捷,实现事务处理的自动化,框架提供SDK,开发使用方便,高效实用,可以支持任何微服务架构的项目,而且可以运用于任何其他项目,是一个业界领先的世界级成果,可以简单有效地实现CQRS+Event Sourcing领域模型DDD架构开发,及其他方式的微服务开发,实现一个路由灵活、数据可靠传输、高可用、高性能、易扩展的消息服务架构。开源地址:https://github.com/moonufo/galaxyLight

一.框架原理

  

微服务具有独立的数据库,因此必然需要进行分布式事务处理,其他类似的系统也会遇到这个问题。太极框架可以简单轻松地解决分布式事务,可以实现数据的最终一致性(BASE),也可提供接近tcc事务处理的功能。框架采用消息调用和获取服务处理的结果,各服务提供补偿服务模块,业务系统提供一个事务处理模块,专门用于处理本业务系统各个服务完成的情况,提供重试、补偿等操作,保证事务的最终一致性。框架提供故障处理界面,对未成功处理的事务,一键搞定。tcc是强一致性,太极框架是弱一致性base,适用于不同场景,95%可用太极框架

 

 

二.框架架构 

   太极分布式事务处理框架(MOONWATER)提供消息服务和事务处理。应用系统调用微服务时可靠发送消息,消息数据存入数据库,应用处理结束,根据各个微服务的成功情况决定是否需要事务处理,如果需要,则发送对应的消息。MOONWATER根据消息调用应用系统的事务处理服务进行事务处理,对需要处理的服务进行重试或补偿操作,如果操作不成功,则发人工处理消息,保存此操作,以后管理员可一键处理事务。在事务处理时,采用异步并行调用服务,可以提高效率。

   框架使用开源的RabbitMq提供消息通道、redis实现缓存、mysql保存消息数据数据,0成本,高效益,性价比高。

 

三.消息架构优势

    太极框架采用消息调用服务,应用模块向发布-订阅channel写入消息(message)来通知框架调用其他服务,调用的服务完成后向框架发消息返回结果,整个过程通过消息完成。消息机制具有诸多优势,在大型产品上得到广泛应用,已经成为目前架构设计的核心元素,微服务架构采用消息机制将如虎添翼,更能发挥其优势。

   消息传递功能是应用程序开发领域的一个关键组成部分,它使开发人员能够将应用程序分解为组件,并将它们松散耦合在一起。消息传递功能支持在组件之间异步耦合,使用消息队列或主题来发送消息。

消息发布/订阅模式减少了应用程序之间的不必要链接和网络流量。无需频繁使用 REST 轮询调用,在将消息发送到所有关注方(订阅方)之前,应用程序可以保持空闲状态。在消息到达时,订阅的应用程序就会收到消息。基本上讲,无需使用 REST 调用询问“我们到了没”,订阅者就可以获得一个“拍肩提醒”来表明他们想要的消息已经到达。

在发布/订阅拓扑结构中,发布应用程序将消息(通常是一个主题字符串)发送到目标。发布者发出消息的能力并不依赖于订阅者应用程序是否可供访问。消息代理充当着中介,而且负责将消息传送到每个订阅者。

在任何时间点,某一订阅者都有可能是不可用的。消息代理可将消息保留到订阅者可以使用它的时候。消息传送保证的水平是通过服务质量(Quality of Service,QoS)来衡量的,比如最多传一次或至少传一次。

在项目中使用消息队列的一个重要优势是支持特性增长。最初的简单应用程序很容易发展成为一个不断加入新特性的庞大应用程序。在出现新需求时,简单的应用程序就会变得更加复杂,而且您需要采用某种简单方法来添加这些特性,并将对当前应用程序的影响降至最低。

因为消息传递功能有助于将应用程序的组件解耦,所以一个新应用程序特性可编写为使用消息传递功能作为与现有应用程序进行通信的一种途径,而无需在部署新特性时更改或中断主要应用程序。

系统中也可能有一个大型的整体式应用程序,而且您希望扩大企业规模,致力于集成互动协作系统(Systems of Engagement,SoE)。消息传递功能对将现有系统与新的前端特性相集成很有帮助,它支持在二者之间进行通信,不要求两种应用程序在同一个平台上运行或使用同一种语言编写,便于扩展内部部署的企业应用程序

许多现有系统可能已部署了某种类型的企业消息传递系统。因此,新微服务的最佳设计将是采用消息传递功能,该消息传递功能能够与现有系统相关联,并与现有的企业消息传递系统进行通信。

 1. 解耦

在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2. 冗余

有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。

3. 扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

4. 灵活性 & 峰值处理能力

当你的应用上了Hacker News的首页,你将发现访问流量攀升到一个不同寻常的水平。在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃,实现请求的削峰填谷,保持稳定的响应处理。

5. 可恢复性

当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧的用户之间的区别。

6. 送达保证

消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,RabbitMQ提供了一个"只送达一次"保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是"预定"了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。

7.排序保证

在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。RabbitMQ保证消息浆糊通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。

8.缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行--写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。

 9. 理解数据流

在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。

10. 异步通信

很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。

四. 框架使用 

 太极框架的使用包括4部分,开发微服务和应用系统时需要编写相关代码:

微服务模块需要保存处理前的数据和写log,并实现冥等。

应用系统需要发消息调用服务。

应用事务处理模块保存数据,根据服务处理结果处理事务。

太极框架接受并处理消息,提供人工处理功能。

1.业务应用模块说明

在controller层,每个业务单元需要写对应的事务处理代码,比如对ComputeControllers写ComputeControllersBEH,在ComputeControllersBEH中处理所有对应的服务,根据各个服务返回的情况决定事务的处理,根据业务判断,如果关键服务成功,其余服务可以重试的则调用对应的服务进行重试;如果关键服务失败,则补偿其余已成功的服务。

2.业务调用sdk说明

在业务比如ComputeControllers中,首先生成业务bid,调用服务之前发业务开始的消息:thelp.send(restTemplate, serviceNumber, 0, pathParematerb, bid, 0, “”,urib, bapp, null, etype,

                  methodb,null, edate);

etype:1:业务开发bapp start ,2:业务成功结束ok,3:业务部分失败,可以进行事务处理而成功,6:业务不能成功bapp fatalfail;4:业务调用的服务成功service ok,5: 业务调用的服务失败fail

serviceNumber:业务调用的服务个数;

pathParematerb:业务的调用参数;

urib:业务的调用uri;

bapp:业务单元名字,比如增加订单

methodb:业务调用的方式:1.get,2:post;

edate:当前时间。

各服务调用完成后再发上面的消息。

业务处理结束,再发消息。

调用服务前生产服务id----sid,需要把服务的参数转化为json发送消息。

太极框架收到消息后把以上数据save db eventprocess表,并根据etype的结果判定,如果为3,则调用ComputeControllersBEH进行事务处理。

3.服务补偿说明

各服务需要写对应的补偿服务,比如:

ComputeControllersa包含一个add方法:

    @RequestMapping(value= "/add1/{a}", method = RequestMethod.GET)

    publicReturnResult add(@PathVariable String a, @RequestParam String pevh) {

则写一个ComputeControllersaP class,包含:

@RequestMapping(value ="/add1_comp/{a}", method = RequestMethod.GET)

    publicString add(@PathVariable String a, @RequestParam String pevh) {

,补偿服务完成服务的对冲补偿操作,比如给数据+8,则-8,insert则delete,updtae为x,则update为以前的数据y、delete则insert。

补偿服务从redis获取数据,进行处理:

if (!pevh.isEmpty())

           bsn= tjshelper.getBsn(pevh) + "add";

 

       stringRedisTemplate.opsForValue().get(bsn+ "Insert");

 

       //delete updtae id

       stringRedisTemplate.opsForValue().get(bsn+ "delete" );

 

       //update   data

       stringRedisTemplate.opsForValue().get(bsn+ "update" );

 

4.服务模块说明

各服务根据需要在redis用json保存操作前的数据,以便补偿,在服务开始时获取服务的key 即bsn:

    TJshelpertjshelper = new TJshelper();

       if(!pevh.isEmpty())

           bsn= tjshelper.getBsn(pevh) + "add";

再保存操作以前的数据:

Insert 数据的id

stringRedisTemplate.opsForValue().set(bsn +"Insert", id);

           stringRedisTemplate.expire(bsn,8, TimeUnit.HOURS);

 

           //删除的id  delete   id

           stringRedisTemplate.opsForValue().set(bsn+ "delete", id);

           stringRedisTemplate.expire(bsn,8, TimeUnit.HOURS);

 

           //修改前的数据用json保存 update one data

           stringRedisTemplate.opsForValue().set(bsn+ "updateOne", y);

           stringRedisTemplate.expire(bsn,8, TimeUnit.HOURS);

            

服务还需要把操作的数据写入db eventprocesslog:

           bsn= tjshelper.saveLog(eventMapper, pevh, etype);

服务处理需要保证冥等通过在:

// do only once ok 冥等

if (!stringRedisTemplate.hasKey(bsn)) {

服务返回类型为ReturnResult,根据运行情况设置不同StatusCode :

// do Service if ok etype=4 else 5 if fatalerror 6        

           booleanret = true;

           if(ret) {

              stringRedisTemplate.opsForValue().set(bsn,bsn);

              stringRedisTemplate.expire(bsn,5, TimeUnit.MINUTES);

              res.setSresult(a);

 

           }

           res.setStatusCode(etype);

5.太极框架处理功能

框架接受消息,保存数据到数据库,然后调用需要事务处理的业务,处理事务;每2分钟查找调用没有完成的业务,确保事务的正确性和数据的一致性。事务处理不成功的服务存入数据库,框架提供界面进行人工处理。管理员排除故障后,一键完成事务处理。

 

6.查询难点破解

同样由于微服务或系统的数据库独立,查询变得困难,多表的join无法完成。一个好办法是建立一个汇总库,把各个微服务的数据库数据同步进来,查询在总库完成,从而可以轻松简单解决。对于oracle数据库,同步变得简单,使用rac、adg可以轻松完成。对于mysql数据库,有2个办法。

 一是采用消息机制,当代码里数据库发生cud数据,则发送消息,然后调用线程把数据同步到总库。

二是根据数据库的log,把变化的数据同步到总库。阿里的mysql同步技术天下第一,开发了oceanbase分布式全球数据库,实现数据同步,为满足双11的天量访问并发提供了坚实的基础。

 

总之,太极框架通过消息和重试、补偿机制,高效、可靠地完成分布式事务处理,可以有效解决微服务等系统的分布式事务处理,是一个世界领先的成果,值得使用。