spring integration java dsl已经融合到spring integration core 5.0,这是一个聪明而明显的举动,因为:
- 基于java config启动新spring项目的每个人都使用它
- si java dsl使您可以使用lambdas等新的强大java 8功能
- 您可以使用 基于integrationflowbuilder的builder模式构建流
让我们看看基于activemq jms的示例如何使用它。
maven依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
<dependencies>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-activemq</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.integration</groupid>
<artifactid>spring-integration-core</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.integration</groupid>
<artifactid>spring-integration-jms</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-test</artifactid>
<scope>test</scope>
</dependency>
<dependency>
<groupid>org.apache.activemq</groupid>
<artifactid>activemq-kahadb-store</artifactid>
</dependency>
<!-- https: //mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
<dependency>
<groupid>org.springframework.integration</groupid>
<artifactid>spring-integration-java-dsl</artifactid>
<version> 1.2 . 3 .release</version>
</dependency>
</dependencies>
|
示例1:jms入站网关
我们有以下serviceactivator:
1
2
3
4
5
6
7
|
@service
public class activemqendpoint {
@serviceactivator (inputchannel = "inboundchannel" )
public void processmessage( final string inboundpayload) {
system.out.println( "inbound message: " +inboundpayload);
}
}
|
如果您想使用si java dsl 将inboundpayload从jms队列发送到gateway风格的激活器,那么请使用dsljms工厂:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
@bean
public dynamicdestinationresolver dynamicdestinationresolver() {
return new dynamicdestinationresolver();
}
@bean
public activemqconnectionfactory connectionfactory() {
return new activemqconnectionfactory();
}
@bean
public defaultmessagelistenercontainer listenercontainer() {
final defaultmessagelistenercontainer defaultmessagelistenercontainer = new defaultmessagelistenercontainer();
defaultmessagelistenercontainer.setdestinationresolver(dynamicdestinationresolver());
defaultmessagelistenercontainer.setconnectionfactory(connectionfactory());
defaultmessagelistenercontainer.setdestinationname( "jms.activemq.test" );
return defaultmessagelistenercontainer;
}
@bean
public messagechannel inboundchannel() {
return messagechannels.direct( "inboundchannel" ).get();
}
@bean
public jmsinboundgateway dataendpoint() {
return jms.inboundgateway(listenercontainer())
.requestchannel(inboundchannel()).get();
}
|
通过dataendpoint bean 返回jmsinboundgatewayspec,您还可以向si通道或jms目标发送回复。查看文档。
示例2:jms消息驱动的通道适配器
如果您正在寻找替换消息驱动通道适配器的xml jms配置,那么jmsmessagedrivenchanneladapter是一种适合您的方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
@bean
public dynamicdestinationresolver dynamicdestinationresolver() {
return new dynamicdestinationresolver();
}
@bean
public activemqconnectionfactory connectionfactory() {
return new activemqconnectionfactory();
}
@bean
public defaultmessagelistenercontainer listenercontainer() {
final defaultmessagelistenercontainer defaultmessagelistenercontainer = new defaultmessagelistenercontainer();
defaultmessagelistenercontainer.setdestinationresolver(dynamicdestinationresolver());
defaultmessagelistenercontainer.setconnectionfactory(connectionfactory());
defaultmessagelistenercontainer.setdestinationname( "jms.activemq.test" );
return defaultmessagelistenercontainer;
}
@bean
public messagechannel inboundchannel() {
return messagechannels.direct( "inboundchannel" ).get();
}
@bean
public jmsmessagedrivenchanneladapter dataendpoint() {
final channelpublishingjmsmessagelistener channelpublishingjmsmessagelistener =
new channelpublishingjmsmessagelistener();
channelpublishingjmsmessagelistener.setexpectreply( false );
final jmsmessagedrivenchanneladapter messagedrivenchanneladapter = new
jmsmessagedrivenchanneladapter(listenercontainer(), channelpublishingjmsmessagelistener
);
messagedrivenchanneladapter.setoutputchannel(inboundchannel());
return messagedrivenchanneladapter;
}
|
与前面的示例一样,入站有效负载如样本1中一样发送给激活器。
示例3:使用jaxb的jms消息驱动的通道适配器
在典型的场景中,您希望通过jms接受xml作为文本消息,将其转换为jaxb存根并在服务激活器中处理它。我将向您展示如何使用si java dsl执行此操作,但首先让我们为xml处理添加两个依赖项:
1
2
3
4
5
6
7
8
9
|
<dependency>
<groupid>org.springframework.integration</groupid>
<artifactid>spring-integration-xml</artifactid>
</dependency>
<dependency>
<groupid>org.springframework</groupid>
<artifactid>spring-oxm</artifactid>
</dependency>
|
我们将通过jms接受shiporders ,所以首先xsd命名为shiporder.xsd:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
<?xml version= "1.0" encoding= "utf-8" ?>
<xs:schema xmlns:xs= "http://www.w3.org/2001/xmlschema" >
<xs:element name= "shiporder" >
<xs:complextype>
<xs:sequence>
<xs:element name= "orderperson" type= "xs:string" />
<xs:element name= "shipto" >
<xs:complextype>
<xs:sequence>
<xs:element name= "name" type= "xs:string" />
<xs:element name= "address" type= "xs:string" />
<xs:element name= "city" type= "xs:string" />
<xs:element name= "country" type= "xs:string" />
</xs:sequence>
</xs:complextype>
</xs:element>
<xs:element name= "item" maxoccurs= "unbounded" >
<xs:complextype>
<xs:sequence>
<xs:element name= "title" type= "xs:string" />
<xs:element name= "note" type= "xs:string" minoccurs= "0" />
<xs:element name= "quantity" type= "xs:positiveinteger" />
<xs:element name= "price" type= "xs:decimal" />
</xs:sequence>
</xs:complextype>
</xs:element>
</xs:sequence>
<xs:attribute name= "orderid" type= "xs:string" use= "required" />
</xs:complextype>
</xs:element>
</xs:schema>
|
新增jaxb maven plugin 生成jaxb存根:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
<plugin>
<groupid>org.codehaus.mojo</groupid>
<artifactid>jaxb2-maven-plugin</artifactid>
<version> 2.3 . 1 </version>
<executions>
<execution>
<id>xjc-schema1</id>
<goals>
<goal>xjc</goal>
</goals>
<configuration>
<!-- use all xsds under the west directory for sources here. -->
<sources>
<source>src/main/resources/xsds/shiporder.xsd</source>
</sources>
<!-- package name of the generated sources. -->
<packagename>com.example.stubs</packagename>
<outputdirectory>src/main/java</outputdirectory>
<clearoutputdir> false </clearoutputdir>
</configuration>
</execution>
</executions>
</plugin>
|
我们已经准备好了存根类和一切,现在使用jaxb magic的java dsl jms消息驱动适配器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
/**
* sample 3: jms message driven adapter with jaxb
*/
@bean
public jmsmessagedrivenchanneladapter dataendpoint() {
final channelpublishingjmsmessagelistener channelpublishingjmsmessagelistener =
new channelpublishingjmsmessagelistener();
channelpublishingjmsmessagelistener.setexpectreply( false );
channelpublishingjmsmessagelistener.setmessageconverter( new marshallingmessageconverter(shipordersmarshaller()));
final jmsmessagedrivenchanneladapter messagedrivenchanneladapter = new
jmsmessagedrivenchanneladapter(listenercontainer(), channelpublishingjmsmessagelistener
);
messagedrivenchanneladapter.setoutputchannel(inboundchannel());
return messagedrivenchanneladapter;
}
@bean
public jaxb2marshaller shipordersmarshaller() {
jaxb2marshaller marshaller = new jaxb2marshaller();
marshaller.setcontextpath( "com.example.stubs" );
return marshaller;
}
|
xml配置在java中使用它可以为您提供如此强大的功能和灵活性。要完成此示例,inboundchannel的服务激活器将如下所示:
1
2
3
4
5
6
7
8
9
|
/**
* sample 3
* @param shiporder
*/
@serviceactivator (inputchannel = "inboundchannel" )
public void processmessage( final shiporder shiporder) {
system.out.println(shiporder.getorderid());
system.out.println(shiporder.getorderperson());
}
|
要测试流,您可以使用以下xml通过jconsole发送到jms队列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
<?xml version= "1.0" encoding= "utf-8" ?>
<shiporder orderid= "889923"
xmlns:xsi= "http://www.w3.org/2001/xmlschema-instance"
xsi:nonamespaceschemalocation= "shiporder.xsd" >
<orderperson>john smith</orderperson>
<shipto>
<name>ola nordmann</name>
<address>langgt 23 </address>
<city> 4000 stavanger</city>
<country>norway</country>
</shipto>
<item>
<title>empire burlesque</title>
<note>special edition</note>
<quantity> 1 </quantity>
<price> 10.90 </price>
</item>
<item>
<title>hide your heart</title>
<quantity> 1 </quantity>
<price> 9.90 </price>
</item>
</shiporder>
|
示例4:具有jaxb和有效负载根路由的jms消息驱动的通道适配器
另一种典型情况是接受xml作为jms文本消息,将其转换为jaxb存根并根据有效负载根类型将有效负载路由到某个服务激活器。当然si java dsl支持所有类型的路由,我将向您展示如何根据有效载荷类型进行路由。
首先,将以下xsd添加到shiporder.xsd所在的文件夹中,并将其命名为purchaseorder.xsd:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
<xsd:schema xmlns:xsd= "http://www.w3.org/2001/xmlschema"
xmlns:tns= "http://tempuri.org/purchaseorderschema.xsd"
targetnamespace= "http://tempuri.org/purchaseorderschema.xsd"
elementformdefault= "qualified" >
<xsd:element name= "purchaseorder" >
<xsd:complextype>
<xsd:sequence>
<xsd:element name= "shipto" type= "tns:usaddress" maxoccurs= "2" />
<xsd:element name= "billto" type= "tns:usaddress" />
</xsd:sequence>
<xsd:attribute name= "orderdate" type= "xsd:date" />
</xsd:complextype>
</xsd:element>
<xsd:complextype name= "usaddress" >
<xsd:sequence>
<xsd:element name= "name" type= "xsd:string" />
<xsd:element name= "street" type= "xsd:string" />
<xsd:element name= "city" type= "xsd:string" />
<xsd:element name= "state" type= "xsd:string" />
<xsd:element name= "zip" type= "xsd:integer" />
</xsd:sequence>
<xsd:attribute name= "country" type= "xsd:nmtoken" fixed= "us" />
</xsd:complextype>
</xsd:schema>
|
然后添加到jaxb maven插件配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
<plugin>
<groupid>org.codehaus.mojo</groupid>
<artifactid>jaxb2-maven-plugin</artifactid>
<version> 2.3 . 1 </version>
<executions>
<execution>
<id>xjc-schema1</id>
<goals>
<goal>xjc</goal>
</goals>
<configuration>
<!-- use all xsds under the west directory for sources here. -->
<sources>
<source>src/main/resources/xsds/shiporder.xsd</source>
<source>src/main/resources/xsds/purchaseorder.xsd</source>
</sources>
<!-- package name of the generated sources. -->
<packagename>com.example.stubs</packagename>
<outputdirectory>src/main/java</outputdirectory>
<clearoutputdir> false </clearoutputdir>
</configuration>
</execution>
</executions>
</plugin>
|
运行mvn clean install以生成新xsd的jaxb存根。现在承诺有效负载根映射:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
@bean
public jaxb2marshaller ordersmarshaller() {
jaxb2marshaller marshaller = new jaxb2marshaller();
marshaller.setcontextpath( "com.example.stubs" );
return marshaller;
}
/**
* sample 4: jms message driven adapter with jaxb and payload routing.
* @return
*/
@bean
public jmsmessagedrivenchanneladapter dataendpoint() {
final channelpublishingjmsmessagelistener channelpublishingjmsmessagelistener =
new channelpublishingjmsmessagelistener();
channelpublishingjmsmessagelistener.setmessageconverter( new marshallingmessageconverter(ordersmarshaller()));
final jmsmessagedrivenchanneladapter messagedrivenchanneladapter = new
jmsmessagedrivenchanneladapter(listenercontainer(), channelpublishingjmsmessagelistener
);
messagedrivenchanneladapter.setoutputchannel(inboundchannel());
return messagedrivenchanneladapter;
}
@bean
public integrationflow payloadrootmapping() {
return integrationflows.from(inboundchannel()).<object, class <?>>route(object::getclass, m->m
.subflowmapping(shiporder. class , sf->sf.handle((messagehandler) message -> {
final shiporder shiporder = (shiporder) message.getpayload();
system.out.println(shiporder.getorderperson());
system.out.println(shiporder.getorderid());
}))
.subflowmapping(purchaseorder. class , sf->sf.handle((messagehandler) message -> {
final purchaseorder purchaseordertype = (purchaseorder) message.getpayload();
system.out.println(purchaseordertype.getbillto().getname());
}))
).get();
}
|
注意payloadrootmapping bean,让我们解释一下重要的部分:
- <object, class<?>> route - 表示来自inboundchannel的输入将是object,并且将根据class <?>执行路由
- subflowmapping(shiporder.class.. - shipoders的处理。
- subflowmapping(purchaseorder.class ... - 处理purchaseorders。
要测试shiporder有效负载,请使用示例3中的xml,以测试purchaseorder有效负载,使用以下xml:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
<?xml version= "1.0" encoding= "utf-8" ?>
<purchaseorder orderdate= "1900-01-01" xmlns= "http://tempuri.org/purchaseorderschema.xsd" >
<shipto country= "us" >
<name>name1</name>
<street>street1</street>
<city>city1</city>
<state>state1</state>
<zip> 1 </zip>
</shipto>
<shipto country= "us" >
<name>name2</name>
<street>street2</street>
<city>city2</city>
<state>state2</state>
<zip>- 79228162514264337593543950335 </zip>
</shipto>
<billto country= "us" >
<name>name1</name>
<street>street1</street>
<city>city1</city>
<state>state1</state>
<zip> 1 </zip>
</billto>
</purchaseorder>
|
应根据subflow 子流map路由两个有效载荷。
示例5:integrationflowadapter
除了企业集成模式的其他实现(check them out)),我需要提到integrationflowadapter。通过扩展此类并实现buildflow方法,如:
1
2
3
4
5
6
7
8
9
10
11
12
|
[url=https: //bitbucket.org/component/]@component[/url]
public class myflowadapter extends integrationflowadapter {
@autowired
private connectionfactory rabbitconnectionfactory;
@override
protected integrationflowdefinition<?> buildflow() {
return from(amqp.inboundadapter( this .rabbitconnectionfactory, "myqueue" ))
.<string, string>transform(string::tolowercase)
.channel(c -> c.queue( "myflowadapteroutput" ));
}
|
你可以将bean的重复声明包装成一个组件并给它们所需的流量。然后可以配置这样的组件并将其作为一个类实例提供给调用代码!
因此,让我们举例说明这个repo中的示例3更短一些,并为所有jmsendpoints定义基类,并在其中定义重复bean:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
public class jmsendpoint extends integrationflowadapter {
private string queuename;
private string channelname;
private string contextpath;
/**
* @param queuename
* @param channelname
* @param contextpath
*/
public jmsendpoint(string queuename, string channelname, string contextpath) {
this .queuename = queuename;
this .channelname = channelname;
this .contextpath = contextpath;
}
@override
protected integrationflowdefinition<?> buildflow() {
return from(jms.messagedrivenchanneladapter(listenercontainer())
.jmsmessageconverter( new marshallingmessageconverter(shipordersmarshaller()))
).channel(channelname);
}
@bean
public jaxb2marshaller shipordersmarshaller() {
jaxb2marshaller marshaller = new jaxb2marshaller();
marshaller.setcontextpath(contextpath);
return marshaller;
}
@bean
public dynamicdestinationresolver dynamicdestinationresolver() {
return new dynamicdestinationresolver();
}
@bean
public activemqconnectionfactory connectionfactory() {
return new activemqconnectionfactory();
}
@bean
public defaultmessagelistenercontainer listenercontainer() {
final defaultmessagelistenercontainer defaultmessagelistenercontainer = new defaultmessagelistenercontainer();
defaultmessagelistenercontainer.setdestinationresolver(dynamicdestinationresolver());
defaultmessagelistenercontainer.setconnectionfactory(connectionfactory());
defaultmessagelistenercontainer.setdestinationname(queuename);
return defaultmessagelistenercontainer;
}
@bean
public messagechannel inboundchannel() {
return messagechannels.direct(channelname).get();
}
}
|
现在声明特定队列的jms端点很容易:
1
2
3
4
|
@bean
public jmsendpoint jmsendpoint() {
return new jmsendpoint( "jms.activemq.test" , "inboundchannel" , "com.example.stubs" );
}
|
inboundchannel的服务激活器:
1
2
3
4
5
6
7
8
9
|
/**
* sample 3, 5
* @param shiporder
*/
@serviceactivator (inputchannel = "inboundchannel" )
public void processmessage( final shiporder shiporder) {
system.out.println(shiporder.getorderid());
system.out.println(shiporder.getorderperson());
}
|
您不应该错过在项目中使用integrationflowadapter。我喜欢它的概念。
我最近在embedit的新的基于spring boot的项目中开始使用spring integration java dsl 。即使有一些配置,我发现它非常有用。
- 它很容易调试。不添加像wiretap这样的配置。
- 阅读起来要容易得多。是的,即使是lambdas!
- 它很强大。在java配置中,您现在有很多选择。
源码地址: https://bitbucket.org/tomask79/spring-integration-java-dsl
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.jdon.com/51378