Spring Boot集成Java DSL的实现代码

时间:2022-01-09 10:06:44

spring integration java dsl已经融合到spring integration core 5.0,这是一个聪明而明显的举动,因为:

  • 基于java config启动新spring项目的每个人都使用它
  • si java dsl使您可以使用lambdas等新的强大java 8功能
  • 您可以使用 基于integrationflowbuilderbuilder模式构建流

让我们看看基于activemq jms的示例如何使用它。

maven依赖:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<dependencies>
  <dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-activemq</artifactid>
  </dependency>
 
  <dependency>
    <groupid>org.springframework.integration</groupid>
    <artifactid>spring-integration-core</artifactid>
  </dependency>
 
  <dependency>
    <groupid>org.springframework.integration</groupid>
    <artifactid>spring-integration-jms</artifactid>
  </dependency>
 
  <dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-test</artifactid>
    <scope>test</scope>
  </dependency>
 
  <dependency>
    <groupid>org.apache.activemq</groupid>
    <artifactid>activemq-kahadb-store</artifactid>
  </dependency>
 
  <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
  <dependency>
    <groupid>org.springframework.integration</groupid>
    <artifactid>spring-integration-java-dsl</artifactid>
    <version>1.2.3.release</version>
  </dependency>
</dependencies>

示例1:jms入站网关

我们有以下serviceactivator

?
1
2
3
4
5
6
7
@service
public class activemqendpoint {
  @serviceactivator(inputchannel = "inboundchannel")
  public void processmessage(final string inboundpayload) {
    system.out.println("inbound message: "+inboundpayload);
  }
}

如果您想使用si java dsl 将inboundpayload从jms队列发送到gateway风格的激活器,那么请使用dsljms工厂:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@bean
public dynamicdestinationresolver dynamicdestinationresolver() {
  return new dynamicdestinationresolver();
}
 
@bean
public activemqconnectionfactory connectionfactory() {
  return new activemqconnectionfactory();
}
 
@bean
public defaultmessagelistenercontainer listenercontainer() {
  final defaultmessagelistenercontainer defaultmessagelistenercontainer = new defaultmessagelistenercontainer();
  defaultmessagelistenercontainer.setdestinationresolver(dynamicdestinationresolver());
  defaultmessagelistenercontainer.setconnectionfactory(connectionfactory());
  defaultmessagelistenercontainer.setdestinationname("jms.activemq.test");
  return defaultmessagelistenercontainer;
}
 
@bean
public messagechannel inboundchannel() {
  return messagechannels.direct("inboundchannel").get();
}
 
@bean
public jmsinboundgateway dataendpoint() {
  return jms.inboundgateway(listenercontainer())
      .requestchannel(inboundchannel()).get();
}

通过dataendpoint bean 返回jmsinboundgatewayspec,您还可以向si通道或jms目标发送回复。查看文档。

示例2:jms消息驱动的通道适配器

如果您正在寻找替换消息驱动通道适配器的xml jms配置,那么jmsmessagedrivenchanneladapter是一种适合您的方式:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@bean
public dynamicdestinationresolver dynamicdestinationresolver() {
  return new dynamicdestinationresolver();
}
 
@bean
public activemqconnectionfactory connectionfactory() {
  return new activemqconnectionfactory();
}
 
@bean
public defaultmessagelistenercontainer listenercontainer() {
  final defaultmessagelistenercontainer defaultmessagelistenercontainer = new defaultmessagelistenercontainer();
  defaultmessagelistenercontainer.setdestinationresolver(dynamicdestinationresolver());
  defaultmessagelistenercontainer.setconnectionfactory(connectionfactory());
  defaultmessagelistenercontainer.setdestinationname("jms.activemq.test");
  return defaultmessagelistenercontainer;
}
 
@bean
public messagechannel inboundchannel() {
  return messagechannels.direct("inboundchannel").get();
}
 
@bean
public jmsmessagedrivenchanneladapter dataendpoint() {
  final channelpublishingjmsmessagelistener channelpublishingjmsmessagelistener =
      new channelpublishingjmsmessagelistener();
  channelpublishingjmsmessagelistener.setexpectreply(false);
  final jmsmessagedrivenchanneladapter messagedrivenchanneladapter = new
      jmsmessagedrivenchanneladapter(listenercontainer(), channelpublishingjmsmessagelistener
      );
 
  messagedrivenchanneladapter.setoutputchannel(inboundchannel());
  return messagedrivenchanneladapter;
}

与前面的示例一样,入站有效负载如样本1中一样发送给激活器。

示例3:使用jaxb的jms消息驱动的通道适配器

在典型的场景中,您希望通过jms接受xml作为文本消息,将其转换为jaxb存根并在服务激活器中处理它。我将向您展示如何使用si java dsl执行此操作,但首先让我们为xml处理添加两个依赖项:

?
1
2
3
4
5
6
7
8
9
<dependency>
    <groupid>org.springframework.integration</groupid>
    <artifactid>spring-integration-xml</artifactid>
  </dependency>
 
  <dependency>
    <groupid>org.springframework</groupid>
    <artifactid>spring-oxm</artifactid>
  </dependency>

我们将通过jms接受shiporders ,所以首先xsd命名为shiporder.xsd:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?xml version="1.0" encoding="utf-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/xmlschema">
 
  <xs:element name="shiporder">
    <xs:complextype>
      <xs:sequence>
        <xs:element name="orderperson" type="xs:string"/>
        <xs:element name="shipto">
          <xs:complextype>
            <xs:sequence>
              <xs:element name="name" type="xs:string"/>
              <xs:element name="address" type="xs:string"/>
              <xs:element name="city" type="xs:string"/>
              <xs:element name="country" type="xs:string"/>
            </xs:sequence>
          </xs:complextype>
        </xs:element>
        <xs:element name="item" maxoccurs="unbounded">
          <xs:complextype>
            <xs:sequence>
              <xs:element name="title" type="xs:string"/>
              <xs:element name="note" type="xs:string" minoccurs="0"/>
              <xs:element name="quantity" type="xs:positiveinteger"/>
              <xs:element name="price" type="xs:decimal"/>
            </xs:sequence>
          </xs:complextype>
        </xs:element>
      </xs:sequence>
      <xs:attribute name="orderid" type="xs:string" use="required"/>
    </xs:complextype>
  </xs:element>
 
</xs:schema>

新增jaxb maven plugin 生成jaxb存根:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<plugin>
     <groupid>org.codehaus.mojo</groupid>
     <artifactid>jaxb2-maven-plugin</artifactid>
     <version>2.3.1</version>
     <executions>
       <execution>
         <id>xjc-schema1</id>
         <goals>
           <goal>xjc</goal>
         </goals>
         <configuration>
           <!-- use all xsds under the west directory for sources here. -->
           <sources>
             <source>src/main/resources/xsds/shiporder.xsd</source>
           </sources>
 
           <!-- package name of the generated sources. -->
           <packagename>com.example.stubs</packagename>
           <outputdirectory>src/main/java</outputdirectory>
           <clearoutputdir>false</clearoutputdir>
         </configuration>
       </execution>
     </executions>
   </plugin>

我们已经准备好了存根类和一切,现在使用jaxb magic的java dsl jms消息驱动适配器:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * sample 3: jms message driven adapter with jaxb
 */
@bean
public jmsmessagedrivenchanneladapter dataendpoint() {
  final channelpublishingjmsmessagelistener channelpublishingjmsmessagelistener =
      new channelpublishingjmsmessagelistener();
  channelpublishingjmsmessagelistener.setexpectreply(false);
  channelpublishingjmsmessagelistener.setmessageconverter(new marshallingmessageconverter(shipordersmarshaller()));
  final jmsmessagedrivenchanneladapter messagedrivenchanneladapter = new
      jmsmessagedrivenchanneladapter(listenercontainer(), channelpublishingjmsmessagelistener
  );
 
  messagedrivenchanneladapter.setoutputchannel(inboundchannel());
  return messagedrivenchanneladapter;
}
 
@bean
public jaxb2marshaller shipordersmarshaller() {
  jaxb2marshaller marshaller = new jaxb2marshaller();
  marshaller.setcontextpath("com.example.stubs");
  return marshaller;
}

xml配置在java中使用它可以为您提供如此强大的功能和灵活性。要完成此示例,inboundchannel的服务激活器将如下所示:

?
1
2
3
4
5
6
7
8
9
/**
 * sample 3
 * @param shiporder
 */
@serviceactivator(inputchannel = "inboundchannel")
public void processmessage(final shiporder shiporder) {
  system.out.println(shiporder.getorderid());
  system.out.println(shiporder.getorderperson());
}

要测试流,您可以使用以下xml通过jconsole发送到jms队列:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?xml version="1.0" encoding="utf-8"?>   
 <shiporder orderid="889923"
   xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
   xsi:nonamespaceschemalocation="shiporder.xsd">
  <orderperson>john smith</orderperson>
   <shipto>
    <name>ola nordmann</name>
    <address>langgt 23</address>
    <city>4000 stavanger</city>
    <country>norway</country>
   </shipto>
   <item>
    <title>empire burlesque</title>
    <note>special edition</note>
    <quantity>1</quantity>
    <price>10.90</price>
    </item>
   <item>
    <title>hide your heart</title>
    <quantity>1</quantity>
    <price>9.90</price>
   </item>
 </shiporder>

示例4:具有jaxb和有效负载根路由的jms消息驱动的通道适配器

另一种典型情况是接受xml作为jms文本消息,将其转换为jaxb存根并根据有效负载根类型将有效负载路由到某个服务激活器。当然si java dsl支持所有类型的路由,我将向您展示如何根据有效载荷类型进行路由。

首先,将以下xsd添加到shiporder.xsd所在的文件夹中,并将其命名为purchaseorder.xsd:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<xsd:schema xmlns:xsd="http://www.w3.org/2001/xmlschema"
      xmlns:tns="http://tempuri.org/purchaseorderschema.xsd"
      targetnamespace="http://tempuri.org/purchaseorderschema.xsd"
      elementformdefault="qualified">
  <xsd:element name="purchaseorder">
    <xsd:complextype>
      <xsd:sequence>
        <xsd:element name="shipto" type="tns:usaddress" maxoccurs="2"/>
        <xsd:element name="billto" type="tns:usaddress"/>
      </xsd:sequence>
      <xsd:attribute name="orderdate" type="xsd:date"/>
    </xsd:complextype>
  </xsd:element>
 
  <xsd:complextype name="usaddress">
    <xsd:sequence>
      <xsd:element name="name"  type="xsd:string"/>
      <xsd:element name="street" type="xsd:string"/>
      <xsd:element name="city"  type="xsd:string"/>
      <xsd:element name="state" type="xsd:string"/>
      <xsd:element name="zip"  type="xsd:integer"/>
    </xsd:sequence>
    <xsd:attribute name="country" type="xsd:nmtoken" fixed="us"/>
  </xsd:complextype>
</xsd:schema>

然后添加到jaxb maven插件配置:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<plugin>
     <groupid>org.codehaus.mojo</groupid>
     <artifactid>jaxb2-maven-plugin</artifactid>
     <version>2.3.1</version>
     <executions>
       <execution>
         <id>xjc-schema1</id>
         <goals>
           <goal>xjc</goal>
         </goals>
         <configuration>
           <!-- use all xsds under the west directory for sources here. -->
           <sources>
             <source>src/main/resources/xsds/shiporder.xsd</source>
             <source>src/main/resources/xsds/purchaseorder.xsd</source>
           </sources>
 
           <!-- package name of the generated sources. -->
           <packagename>com.example.stubs</packagename>
           <outputdirectory>src/main/java</outputdirectory>
           <clearoutputdir>false</clearoutputdir>
         </configuration>
       </execution>
     </executions>
   </plugin>

运行mvn clean install以生成新xsd的jaxb存根。现在承诺有效负载根映射:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@bean
public jaxb2marshaller ordersmarshaller() {
  jaxb2marshaller marshaller = new jaxb2marshaller();
  marshaller.setcontextpath("com.example.stubs");
  return marshaller;
}
 
/**
 * sample 4: jms message driven adapter with jaxb and payload routing.
 * @return
 */
@bean
public jmsmessagedrivenchanneladapter dataendpoint() {
  final channelpublishingjmsmessagelistener channelpublishingjmsmessagelistener =
      new channelpublishingjmsmessagelistener();
  channelpublishingjmsmessagelistener.setmessageconverter(new marshallingmessageconverter(ordersmarshaller()));
  final jmsmessagedrivenchanneladapter messagedrivenchanneladapter = new
      jmsmessagedrivenchanneladapter(listenercontainer(), channelpublishingjmsmessagelistener
  );
 
  messagedrivenchanneladapter.setoutputchannel(inboundchannel());
  return messagedrivenchanneladapter;
}
 
@bean
public integrationflow payloadrootmapping() {
  return integrationflows.from(inboundchannel()).<object, class<?>>route(object::getclass, m->m
      .subflowmapping(shiporder.class, sf->sf.handle((messagehandler) message -> {
        final shiporder shiporder = (shiporder) message.getpayload();
        system.out.println(shiporder.getorderperson());
        system.out.println(shiporder.getorderid());
      }))
      .subflowmapping(purchaseorder.class, sf->sf.handle((messagehandler) message -> {
        final purchaseorder purchaseordertype = (purchaseorder) message.getpayload();
        system.out.println(purchaseordertype.getbillto().getname());
      }))
  ).get();
}

注意payloadrootmapping bean,让我们解释一下重要的部分:

  • <object, class<?>> route - 表示来自inboundchannel的输入将是object,并且将根据class <?>执行路由
  • subflowmapping(shiporder.class.. - shipoders的处理。
  • subflowmapping(purchaseorder.class ... - 处理purchaseorders。

要测试shiporder有效负载,请使用示例3中的xml,以测试purchaseorder有效负载,使用以下xml:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="utf-8"?>
<purchaseorder orderdate="1900-01-01" xmlns="http://tempuri.org/purchaseorderschema.xsd">
 <shipto country="us">
  <name>name1</name>
  <street>street1</street>
  <city>city1</city>
  <state>state1</state>
  <zip>1</zip>
 </shipto>
 <shipto country="us">
  <name>name2</name>
  <street>street2</street>
  <city>city2</city>
  <state>state2</state>
  <zip>-79228162514264337593543950335</zip>
 </shipto>
 <billto country="us">
  <name>name1</name>
  <street>street1</street>
  <city>city1</city>
  <state>state1</state>
  <zip>1</zip>
 </billto>
</purchaseorder>

应根据subflow 子流map路由两个有效载荷。

示例5:integrationflowadapter

除了企业集成模式的其他实现(check them out)),我需要提到integrationflowadapter。通过扩展此类并实现buildflow方法,如:

?
1
2
3
4
5
6
7
8
9
10
11
12
[url=https://bitbucket.org/component/]@component[/url]
public class myflowadapter extends integrationflowadapter {
 
@autowired
 private connectionfactory rabbitconnectionfactory;
 
 @override
 protected integrationflowdefinition<?> buildflow() {
   return from(amqp.inboundadapter(this.rabbitconnectionfactory, "myqueue"))
        .<string, string>transform(string::tolowercase)
        .channel(c -> c.queue("myflowadapteroutput"));
 }

你可以将bean的重复声明包装成一个组件并给它们所需的流量。然后可以配置这样的组件并将其作为一个类实例提供给调用代码!

因此,让我们举例说明这个repo中的示例3更短一些,并为所有jmsendpoints定义基类,并在其中定义重复bean:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class jmsendpoint extends integrationflowadapter {
 
  private string queuename;
 
  private string channelname;
 
  private string contextpath;
 
  /**
   * @param queuename
   * @param channelname
   * @param contextpath
   */
  public jmsendpoint(string queuename, string channelname, string contextpath) {
    this.queuename = queuename;
    this.channelname = channelname;
    this.contextpath = contextpath;
  }
 
  @override
  protected integrationflowdefinition<?> buildflow() {
    return from(jms.messagedrivenchanneladapter(listenercontainer())
      .jmsmessageconverter(new marshallingmessageconverter(shipordersmarshaller()))
    ).channel(channelname);
  }
 
  @bean
  public jaxb2marshaller shipordersmarshaller() {
    jaxb2marshaller marshaller = new jaxb2marshaller();
    marshaller.setcontextpath(contextpath);
    return marshaller;
  }
 
  @bean
  public dynamicdestinationresolver dynamicdestinationresolver() {
    return new dynamicdestinationresolver();
  }
 
  @bean
  public activemqconnectionfactory connectionfactory() {
    return new activemqconnectionfactory();
  }
 
  @bean
  public defaultmessagelistenercontainer listenercontainer() {
    final defaultmessagelistenercontainer defaultmessagelistenercontainer = new defaultmessagelistenercontainer();
    defaultmessagelistenercontainer.setdestinationresolver(dynamicdestinationresolver());
    defaultmessagelistenercontainer.setconnectionfactory(connectionfactory());
    defaultmessagelistenercontainer.setdestinationname(queuename);
    return defaultmessagelistenercontainer;
  }
 
  @bean
  public messagechannel inboundchannel() {
    return messagechannels.direct(channelname).get();
  }
}

现在声明特定队列的jms端点很容易:

?
1
2
3
4
@bean
public jmsendpoint jmsendpoint() {
  return new jmsendpoint("jms.activemq.test", "inboundchannel", "com.example.stubs");
}

inboundchannel的服务激活器:

?
1
2
3
4
5
6
7
8
9
/**
 * sample 3, 5
 * @param shiporder
 */
@serviceactivator(inputchannel = "inboundchannel")
public void processmessage(final shiporder shiporder) {
  system.out.println(shiporder.getorderid());
  system.out.println(shiporder.getorderperson());
}

您不应该错过在项目中使用integrationflowadapter。我喜欢它的概念。

我最近在embedit的新的基于spring boot的项目中开始使用spring integration java dsl 。即使有一些配置,我发现它非常有用。

  • 它很容易调试。不添加像wiretap这样的配置。
  • 阅读起来要容易得多。是的,即使是lambdas!
  • 它很强大。在java配置中,您现在有很多选择。

源码地址: https://bitbucket.org/tomask79/spring-integration-java-dsl

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://www.jdon.com/51378