spring integration使用:消息转换器

时间:2023-01-03 00:37:00

系列文章目录

…TODO
spring integration开篇:说明
…TODO
spring integration使用:消息路由
spring integration使用:消息转换器



前言

本系列文章主要是通过一些实际项目场景举例,展开讲解spring integration对enterprise integration patterns的实现。个人能力所限,文中有不妥当或者错误的点还希望大家担待和指正。

文章的示例使用的都是java DSL风格代码,很多上下文都是通过使用Spring Expression Language (SpEL)来做的动作和内容,所以需要你对SpEL有一些了解,这个过程应该不会太长。

关于文章中使用的一些环境依赖和代码风格、约定,请看系列文章的开篇说明。


消息转换器(或者叫翻译器)的概念

在许多情况下,企业集成解决方案在现有应用程序(如遗留系统、打包应用程序、自行开发的自定义应用程序或由外部合作伙伴运营的应用程序)之间路由消息。这些应用程序中的每一个通常都是围绕专有数据模型构建的。每个应用程序对客户实体的概念可能略有不同,定义客户的属性以及客户与哪些其他实体相关。例如,会计系统可能对客户的纳税人 ID 号更感兴趣,而客户关系管理 (CRM) 系统存储电话号码和地址。应用程序的基础数据模型通常驱动物理数据库模式、接口文件格式或编程接口 (API) 的设计,这些实体是集成解决方案必须与之交互的。因此,应用程序期望接收模仿应用程序内部数据格式的消息。

除了各种应用程序中包含的专有数据模型和数据格式之外,集成解决方案通常还与寻求独立于特定应用程序的标准化数据格式进行交互。有许多联盟和标准机构定义了这些协议,例如RosettaNet,ebXML,OAGIS和许多其他行业特定的联盟。在许多情况下,集成解决方案需要能够使用“官方”数据格式与外部各方进行通信,而内部系统则基于专有格式。

使用不同数据格式的系统如何使用消息传递相互通信?

spring integration使用:消息转换器

在其他过滤器或应用程序之间使用特殊筛选器(消息转换器)将一种数据格式转换为另一种数据格式。

消息转换器是 [GoF] 中描述的适配器模式的消息传递等效项。适配器将组件的接口转换为另一个接口,以便可以在不同的上下文中使用。

在EIP中叫translator。

二、translator在spring integration中的实现分为4个组件

transformer:将源消息转换(翻译)为你指定的任意格式或者类型(比如XML转换为JSON)。
content enricher:动态扩充源消息的header或者payload内容,加字段之类的操作。
claim check:是一种消息传递机制,它可以解决消息体过大的问题,提高系统的可靠性和稳定性。
codec:编解码器对对象进行编码和解码。

transformer

content enricher

  • header enricher
  • payload enricher

claim check

Claim Check是一种消息传递机制,它可以解决消息体过大的问题,提高系统的可靠性和稳定性。

当消息体过大时,传输和处理这些消息会导致系统的性能下降。为了解决这个问题,Claim Check机制可以将消息体抽离出来,只传递消息的引用,而不是整个消息体。这样可以减少消息传输的数据量,提高传输效率。

在Claim Check机制中,消息的发送方将消息体存储到一个*存储区域,然后只传递消息体的引用给接收方。当接收方需要处理消息时,它可以使用引用来检索消息体,然后对消息进行处理。

Claim Check机制的优点是可以降低系统的开销,同时可以提高系统的可靠性和稳定性。通过使用Claim Check机制,可以避免因为消息体过大导致的系统错误和性能下降的问题,从而提高系统的可维护性和可扩展性。

codec

目标

通过对消息内容做判断将消息分流到不同的渠道中进行后续处理。

1.引入库

gradle

    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-http'
    implementation 'org.springframework.integration:spring-integration-file'

2.码代码

2.1.消息源

    public String getFeed() {
        RestTemplate restTemplate = new RestTemplate();
        String forObject = restTemplate.getForObject("https://spring.io/blog.atom", String.class);
//        String forObject = restTemplate.getForObject("https://tuna.moe/feed.xml", String.class);
//        System.out.println(forObject);
        return forObject;
    }

2.2.定义渠道

    @Bean
    public MessageChannel prefixa(){
        return new DirectChannel();
    }

2.3.定义集成流

    @Bean
    public IntegrationFlow httpOutboundFlow() {
        return IntegrationFlows.fromSupplier(this::getFeed, c -> c.poller(Pollers.fixedRate(10000)))
                .channel(MessageChannels.direct())
                .transform(Transformers.objectToString("UTF-8"))
                .split(s -> s.applySequence(false).delimiters(" "))
                .<String>filter((p) -> p.length() < 10 && p.matches("\\b[\\w]{3,}\\b"))
                .channel(MessageChannels.direct())
                .routeToRecipients(r->r
                        .applySequence(true)
                        .ignoreSendFailures(true)
                        .defaultOutputChannel("nullChannel")
                        .recipient("prefixa", "payload.startsWith('a')")

                )
                .get();
    }

2.4.定义用于处理分流过来消息(前缀为a的消息)集成流

    @Bean
    public IntegrationFlow printAFlow(){
        return IntegrationFlows.from("prefixa")
                .handle(p->{
                    System.out.println("^^^^^^^^^^^^^^^" + p.getPayload());
                })
                .get();
    }

总结

…TODO。