Spring Integration的TCP 和 UDP 支持

时间:2022-12-14 19:07:27

Spring Integration的TCP 和 UDP 支持

Spring 集成提供了通道适配器,用于通过互联网协议接收和发送消息。 同时提供 UDP(用户数据报协议)和 TCP(传输控制协议)适配器。 每个适配器都通过基础协议提供单向通信。 此外,Spring 集成还提供了简单的入站和出站 TCP 网关。 当需要双向通信时,会使用这些。

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
<version>6.0.0</version>
</dependency>

介绍

提供了 UDP 入站和出站通道适配器的两种风格:

  • ​UnicastSendingMessageHandler​​将数据报数据包发送到单个目标。
  • ​UnicastReceivingChannelAdapter​​接收传入的数据报数据包。
  • ​MulticastSendingMessageHandler​​向组播地址发送(广播)数据报数据包。
  • ​MulticastReceivingChannelAdapter​​通过加入组播地址来接收传入的数据报数据包。

提供 TCP 入站和出站通道适配器:

  • ​TcpSendingMessageHandler​​通过 TCP 发送消息。
  • ​TcpReceivingChannelAdapter​​通过 TCP 接收消息。

提供入站 TCP 网关。 它允许简单的请求-响应处理。 虽然网关可以支持任意数量的连接,但每个连接只能串行处理。 从套接字读取的线程在再次读取之前等待并发送响应。 如果连接工厂配置为单机版连接,则连接将在套接字超时后关闭。

提供出站 TCP 网关。 它允许简单的请求-响应处理。 如果为一次性连接配置了关联的连接工厂,则会立即为每个新请求创建一个新连接。 否则,如果连接正在使用中,则调用线程将阻塞连接,直到收到响应或发生超时或 I/O 错误。

TCP 和 UDP 入站通道适配器以及 TCP 入站网关支持该属性。 这提供了与输入网关代理工厂Bean中所述相同的基本功能。​​error-channel​

UDP 适配器

本节介绍如何配置和使用 UDP 适配器。

出站 UDP 适配器(XML 配置)

以下示例配置 UDP 出站通道适配器:

<int-ip:udp-outbound-channel-adapter 
host="somehost"
port="11111"
multicast="false"
socket-customizer="udpCustomizer"
channel="exampleChannel"/>

设置为 时,还应在 host 属性中提供多播地址。​​multicast​​​​true​

UDP 是一种高效但不可靠的协议。 Spring 集成增加了两个属性以提高可靠性:和 。 设置为 时,适配器在消息数据前面加上一个长度字段(按网络字节顺序排列的四个字节)。 这使接收端能够验证收到的数据包的长度。 如果接收系统使用的缓冲区太短而无法包含数据包,则可以截断数据包。 标头提供了一种检测此情况的机制。​​check-length​​​​acknowledge​​​​check-length​​​​true​​​​length​

从版本 4.3 开始,您可以将 设置为 ,在这种情况下,操作系统会选择端口。 通过在适配器启动并返回 后调用,可以发现所选端口。​​port​​​​0​​​​getPort()​​​​isListening()​​​​true​

从版本 5.3.3 开始,您可以添加一个 Bean 来修改创建后的 (例如,调用 )。​​SocketCustomizer​​​​DatagramSocket​​​​setTrafficClass(0x10)​

以下示例显示向数据报数据包添加长度检查的出站通道适配器:

<int-ip:udp-outbound-channel-adapter 
host="somehost"
port="11111"
multicast="false"
check-length="true"
channel="exampleChannel"/>

数据包的接收方还必须配置为期望在实际数据之前有一个长度。 对于弹簧集成 UDP 入站通道适配器,请设置其属性。​​check-length​

第二个可靠性改进允许使用应用程序级确认协议。 接收方必须在指定时间内向发送方发送确认。

以下示例显示了一个出站通道适配器,该适配器向数据报数据包添加长度检查并等待确认:

<int-ip:udp-outbound-channel-adapter 
host="somehost"
port="11111"
multicast="false"
check-length="true"
acknowledge="true"
ack-host="thishost"
ack-port="22222"
ack-timeout="10000"
channel="exampleChannel"/>

设置为 意味着数据包的接收方可以解释添加到包含确认数据(主机和端口)的数据包的标头。 最有可能的是,收件人是 Spring 集成入站通道适配器。​​acknowledge​​​​true​

当多播为 true 时,附加属性 () 指定必须在 中接收多少个确认。​​min-acks-for-success​​​​ack-timeout​

从版本 4.3 开始,您可以将 设置为 ,在这种情况下,操作系统会选择端口。​​ackPort​​​​0​

出站 UDP 适配器(Java 配置)

以下示例演示如何使用 Java 配置出站 UDP 适配器:

@Bean
@ServiceActivator(inputChannel = "udpOut")
public UnicastSendingMessageHandler handler() {
return new UnicastSendingMessageHandler("localhost", 11111);
}

(或多播)。​​MulticastSendingChannelAdapter​

出站 UDP 适配器(Java DSL 配置)

以下示例显示如何使用 Java DSL 配置出站 UDP 适配器:

@Bean
public IntegrationFlow udpOutFlow() {
return f -> f.handle(Udp.outboundAdapter("localhost", 1234)
.configureSocket(socket -> socket.setTrafficClass(0x10)))
.get();
}

入站 UDP 适配器(XML 配置)

以下示例演示如何配置基本的单播入站 udp 通道适配器。

<int-ip:udp-inbound-channel-adapter 
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="false"
socket-customizer="udpCustomizer"
check-length="true"/>

以下示例演示如何配置基本多播入站 udp 通道适配器:

<int-ip:udp-inbound-channel-adapter 
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="true"
multicast-address="225.6.7.8"
check-length="true"/>

默认情况下,不会对入站数据包执行反向 DNS 查找:在未配置 DNS 的环境中(例如 Docker 容器),这可能会导致连接延迟。 要将 IP 地址转换为主机名以在邮件头中使用,可以通过将该属性设置为 来覆盖默认行为。​​lookup-host​​​​true​

从版本 5.3.3 开始,您可以在创建 Bean 后添加 Bean 进行修改。 它被调用用于接收套接字和为发送确认而创建的任何套接字。​​SocketCustomizer​​​​DatagramSocket​

入站 UDP 适配器(Java 配置)

以下示例显示如何使用 Java 配置入站 UDP 适配器:

@Bean
public UnicastReceivingChannelAdapter udpIn() {
UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
adapter.setOutputChannelName("udpChannel");
return adapter;
}

以下示例显示如何使用 Java DSL 配置入站 UDP 适配器:

入站 UDP 适配器(Java DSL 配置)

@Bean
public IntegrationFlow udpIn() {
return IntegrationFlow.from(Udp.inboundAdapter(11111))
.channel("udpChannel")
.get();
}

服务器侦听事件

从版本 5.0.2 开始,当入站适配器启动并开始侦听时发出 。 当适配器配置为侦听端口时,这很有用,这意味着操作系统选择端口。 它也可以用来代替 轮询 ,如果您需要等待才能启动将连接到套接字的其他进程。​​UdpServerListeningEvent​​​​0​​​​isListening()​

高级出站配置

() 具有和选项。​​<int-ip:udp-outbound-channel-adapter>​​​​UnicastSendingMessageHandler​​​​destination-expression​​​​socket-expression​

您可以使用 作为硬编码 - 对的运行时替代方法,根据 (使用评估上下文的根对象)确定传出数据报数据包的目标地址。 表达式的计算结果必须为 、URI 样式中的 、 (请参阅 RFC-2396) 或 . 您还可以为此表达式使用入站标头。 在框架中,当我们在 中接收数据报并将其转换为消息时,会填充此标头。 标头值恰好是传入数据报的结果。​​destination-expression​​​​host​​​​port​​​​requestMessage​​​​URI​​​​String​​​​SocketAddress​​​​IpHeaders.PACKET_ADDRESS​​​​DatagramPacketMessageMapper​​​​UnicastReceivingChannelAdapter​​​​DatagramPacket.getSocketAddress()​

使用 ,出站通道适配器可以使用(例如)入站通道适配器套接字通过接收数据的同一端口发送数据报。 在我们的应用程序作为 UDP 服务器工作并且客户端在网络地址转换 (NAT) 后面运行的情况下,它很有用。 此表达式的计算结果必须为 . 用作评估上下文的根对象。 不能将参数与 和 参数一起使用。 以下示例演示如何使用转换为大写并使用套接字的转换器配置 UDP 入站通道适配器:​​socket-expression​​​​DatagramSocket​​​​requestMessage​​​​socket-expression​​​​multicast​​​​acknowledge​

<int-ip:udp-inbound-channel-adapter  port="0" channel="in" />

<int:channel />

<int:transformer expression="new String(payload).toUpperCase()"
input-channel="in" output-channel="out"/>

<int:channel />

<int-ip:udp-outbound-channel-adapter
socket-expression="@inbound.socket"
destination-expression="headers['ip_packetAddress']"
channel="out" />

以下示例显示了与 Java DSL 的等效配置:

@Bean
public IntegrationFlow udpEchoUpcaseServer() {
return IntegrationFlow.from(Udp.inboundAdapter(11111).id("udpIn"))
.<byte[], String>transform(p -> new String(p).toUpperCase())
.handle(Udp.outboundAdapter("headers['ip_packetAddress']")
.socketExpression("@udpIn.socket"))
.get();
}

TCP 连接工厂

概述

对于 TCP,基础连接的配置是使用连接工厂提供的。 提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。 客户端连接工厂建立传出连接。 服务器连接工厂侦听传入连接。

出站通道适配器使用客户端连接工厂,但您也可以提供对入站通道适配器的客户端连接工厂的引用。 该适配器接收在出站适配器创建的连接上接收的任何传入消息。

入站通道适配器或网关使用服务器连接工厂。 (事实上,没有连接工厂就无法运行)。 还可以提供对出站适配器的服务器连接工厂的引用。 然后,您可以使用该适配器向同一连接上的传入消息发送答复。

仅当回复包含由连接工厂插入到原始消息中的标头时,回复消息才会路由到连接。​​ip_connectionId​

这是在入站和出站适配器之间共享连接工厂时执行的消息关联程度。 这种共享允许通过 TCP 进行异步双向通信。 默认情况下,仅使用 TCP 传输有效负载信息。 因此,任何消息关联都必须由下游组件(如聚合器或其他终结点)执行。 版本 3.0 中引入了对传输选定标头的支持。 有关详细信息,请参阅 TCP 消息关联。

您可以为每种类型最多一个适配器提供对连接工厂的引用。

Spring 集成提供了使用 和 的连接工厂。​​java.net.Socket​​​​java.nio.channel.SocketChannel​

下面的示例演示使用连接的简单服务器连接工厂:​​java.net.Socket​

<int-ip:tcp-connection-factory 
type="server"
port="1234"/>

下面的示例演示使用连接的简单服务器连接工厂:​​java.nio.channel.SocketChannel​

<int-ip:tcp-connection-factory 
type="server"
port="1234"
using-nio="true"/>

从 Spring Integration 版本 4.2 开始,如果服务器配置为侦听随机端口(通过将端口设置为 ),则可以使用 获取操作系统选择的实际端口。 此外,让您获得完整的. 有关更多信息,请参阅 TcpServerConnectionFactory 接口的 Javadoc​。​​0​​​​getPort()​​​​getServerSocketAddress()​​​​SocketAddress​

<int-ip:tcp-connection-factory 
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"/>

以下示例显示使用连接并为每条消息创建新连接的客户端连接工厂:​​java.net.Socket​

<int-ip:tcp-connection-factory 
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"
using-nio=true/>

从版本 5.2 开始,客户端连接工厂支持 属性 ,以秒为单位指定,默认为 60。​​connectTimeout​

另请参阅基于注释的配置和使用 Java DSL for TCP 组件。

消息划分(序列化程序和解序列化程序)

TCP 是一种流式处理协议。 这意味着必须为通过TCP传输的数据提供一些结构,以便接收方可以将数据划分为离散消息。 连接工厂配置为使用序列化程序和反序列化程序在消息有效负载和通过 TCP 发送的位之间进行转换。 这是通过分别为入站和出站消息提供反序列化程序和序列化程序来实现的。 Spring Integration提供了许多标准的序列化程序和解串器。

​ByteArrayCrlfSerializer​*将字节数组转换为字节流,后跟回车符和换行符 ()。 这是默认的序列化程序(和解序列化程序),可以(例如)与 telnet 一起使用作为客户端。​​\r\n​

* 将字节数组转换为字节流,后跟单个终止字符(默认值为 )。​​ByteArraySingleTerminatorSerializer​​​​0x00​

* 将字节数组转换为字节流,后跟单个换行字符 ()。​​ByteArrayLfSerializer​​​​0x0a​

* 将字节数组转换为字节流,前面是 STX (),后跟 ETX ()。​​ByteArrayStxEtxSerializer​​​​0x02​​​​0x03​

将字节数组转换为字节流,前面是网络字节顺序(大字节序)的二进制长度。 这是一个高效的反序列化程序,因为它不必解析每个字节来查找终止字符序列。 它还可用于包含二进制数据的有效负载。 上述序列化程序仅支持有效负载中的文本。 长度标头的默认大小为 4 个字节(整数),允许消息最多为 (2^31 - 1) 字节。 但是,对于最多 255 字节的消息,标头可以是单个字节(无符号),对于最多 (2^16 - 1) 个字节的消息,标头可以是无符号短消息(2 个字节)。 如果标头需要任何其他格式,则可以对 and 方法进行子类化并提供实现。 绝对最大数据大小为 (2^31 - 1) 字节。 从版本 5.2 开始,标头值除了有效负载之外,还可以包含标头的长度。 设置属性以启用该机制(对于生产者和使用者,必须将其设置为相同)。​​ByteArrayLengthHeaderSerializer​​​​length​​​​ByteArrayLengthHeaderSerializer​​​​readHeader​​​​writeHeader​​​​inclusive​

* 将字节数组转换为字节流,并且不添加其他消息划分数据。 使用此序列化程序(和反序列化程序),消息的结尾由客户端以有序方式关闭套接字来指示。 使用此序列化程序时,消息接收将挂起,直到客户端关闭套接字或发生超时。 超时不会导致消息。 当使用此序列化程序并且客户端是 Spring 集成应用程序时,客户端必须使用配置了 的连接工厂。 这样做会导致适配器在发送消息后关闭套接字。 序列化程序本身不会关闭连接。 应仅将此序列化程序与通道适配器(而不是网关)使用的连接工厂一起使用,并且连接工厂应由入站或出站适配器使用,但不能同时由两者使用。 另请参阅本节后面的 。 但是,从版本 5.2 开始,出站网关具有新属性;这允许使用原始序列化程序/反序列化程序,因为 EOF 会向服务器发出信号,同时保持连接打开以接收回复。​​ByteArrayRawSerializer​​​​single-use="true"​​​​ByteArrayElasticRawDeserializer​​​​closeStreamAfterSend​

在版本 4.2.2 之前,当使用非阻塞 I/O (NIO) 时,此序列化程序将超时(读取期间)视为文件结束,并且到目前为止读取的数据作为消息发出。 这是不可靠的,不应用于分隔消息。 它现在将此类条件视为例外。 万一以这种方式使用它,可以通过将构造函数参数设置为 来恢复以前的行为。​​treatTimeoutAsEndOfMessage​​​​true​

其中每个都是 的子类,它同时实现 和 。 为了向后兼容,使用 的任何子类进行序列化的连接也接受首先转换为字节数组的 。 这些序列化程序和反序列化程序中的每一个都将包含相应格式的输入流转换为字节数组有效负载。​​AbstractByteArraySerializer​​​​org.springframework.core.serializer.Serializer​​​​org.springframework.core.serializer.Deserializer​​​​AbstractByteArraySerializer​​​​String​

为了避免由于客户端行为不良(不遵守配置的序列化程序的协议)而导致内存耗尽,这些序列化程序施加了最大消息大小。 如果传入消息超过此大小,则会引发异常。 默认的最大邮件大小为 2048 字节。 您可以通过设置属性来增加它。 如果使用缺省序列化程序或反序列化程序并希望增加最大消息大小,则必须将最大消息大小声明为具有属性集的显式 Bean,并将连接工厂配置为使用该 Bean。​​maxMessageSize​​​​maxMessageSize​

本节前面标有 * 的类使用中间缓冲区,并将解码的数据复制到正确大小的最终缓冲区。 从版本 4.3 开始,可以通过设置一个属性来配置这些缓冲区,以允许重用这些原始缓冲区,而不是为每条消息分配和丢弃这些原始缓冲区,这是默认行为。 将属性设置为负值将创建一个没有边界的池。 如果池是有界的,则还可以设置该属性(以毫秒为单位),之后如果没有可用的缓冲区,则会引发异常。 它默认为无穷大。 此类异常会导致套接字关闭。​​poolSize​​​​poolWaitTimeout​

如果您希望在自定义反序列化程序中使用相同的机制,则可以扩展(而不是其超类)并实现而不是 . 缓冲区将自动返回到池中。 还提供了方便的实用工具方法:。​​AbstractPooledBufferByteArraySerializer​​​​AbstractByteArraySerializer​​​​doDeserialize()​​​​deserialize()​​​​AbstractPooledBufferByteArraySerializer​​​​copyToSizedArray()​

版本 5.0 添加了 . 这类似于上面的反序列化程序端,只是不需要设置 . 在内部,它使用 让缓冲区根据需要增长。 客户端必须有序地关闭套接字以发出消息结束的信号。​​ByteArrayElasticRawDeserializer​​​​ByteArrayRawSerializer​​​​maxMessageSize​​​​ByteArrayOutputStream​

此反序列化程序应仅在对等方受信任时使用;由于内存不足的情况,它容易受到 DoS 连接的影响。

使用 Jackson 在 a 和 JSON 之间进行转换。 可以将此序列化程序与 a 和 a 结合使用,以 JSON 格式传输选定的标头和有效负载。​​MapJsonSerializer​​​​ObjectMapper​​​​Map​​​​MessageConvertingTcpMessageMapper​​​​MapMessageConverter​

杰克逊无法在流中划分消息。 因此,需要委托给另一个序列化程序或反序列化程序来处理消息划分。 默认情况下,使用 a,导致消息的格式为 on the wire,但您可以将其配置为改用其他格式。 (下一个示例演示如何执行此操作。​​ObjectMapper​​​​MapJsonSerializer​​​​ByteArrayLfSerializer​​​​<json><LF>​

最终的标准序列化程序是 ,您可以使用它通过 Java 序列化转换可序列化的对象。 为包含可序列化对象的流的入站反序列化提供。​​org.springframework.core.serializer.DefaultSerializer​​​​org.springframework.core.serializer.DefaultDeserializer​

如果不希望使用默认序列化程序和反序列化程序 (),则必须在连接工厂上设置 and 属性。 以下示例演示如何执行此操作:​​ByteArrayCrLfSerializer​​​​serializer​​​​deserializer​

<bean 
class="org.springframework.core.serializer.DefaultSerializer" />
<bean
class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"/>

使用连接并在网络上使用 Java 序列化的服务器连接工厂。​​java.net.Socket​

有关连接工厂上可用属性的完整详细信息,请参阅本节末尾的参考。

默认情况下,不会对入站数据包执行反向 DNS 查找:在未配置 DNS 的环境中(例如 Docker 容器),这可能会导致连接延迟。 要将 IP 地址转换为主机名以在邮件头中使用,可以通过将该属性设置为 来覆盖默认行为。​​lookup-host​​​​true​

您还可以修改套接字和套接字工厂的属性。 有关详细信息,请参阅 SSL/TLS 支持。 如前所述,如果是否使用 SSL,则可以进行此类修改。

另请参阅基于注释的配置和使用 Java DSL for TCP 组件。

自定义序列化程序和解串程序

如果数据不是标准反序列化程序之一支持的格式,则可以实现自己的格式;还可以实现自定义序列化程序。

若要实现自定义序列化程序和解序列化程序对,请实现 和 接口。​​org.springframework.core.serializer.Deserializer​​​​org.springframework.core.serializer.Serializer​

当反序列化程序检测到消息之间的关闭输入流时,它必须抛出 ;这是向框架发出的信号,表明收盘价是“正常的”。 如果在解码消息时关闭流,则应引发其他一些异常。​​SoftEndOfStreamException​

从 5.2 版开始,现在是 而不是扩展 .​​SoftEndOfStreamException​​​​RuntimeException​​​​IOException​

TCP 缓存客户端连接工厂

如前所述,TCP 套接字可以是“一次性”(一个请求或响应)或共享。 共享套接字在大容量环境中与出站网关的性能不佳,因为套接字一次只能处理一个请求或响应。

若要提高性能,可以使用协作通道适配器而不是网关,但这需要应用程序级消息关联。 有关详细信息,请参阅 TCP 消息关联。

Spring Integration 2.2引入了缓存客户端连接工厂,它使用共享套接字池,允许网关使用共享连接池处理多个并发请求。

TCP 故障转移客户端连接工厂

可以配置支持故障转移到一个或多个其他服务器的 TCP 连接工厂。 发送消息时,工厂将遍历其所有已配置的工厂,直到可以发送消息或找不到连接。 最初,使用配置列表中的第一个工厂。 如果连接随后失败,则下一个工厂将成为当前工厂。 下面的示例演示如何配置故障转移客户端连接工厂:

<bean  class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
<constructor-arg>
<list>
<ref bean="clientFactory1"/>
<ref bean="clientFactory2"/>
</list>
</constructor-arg>
</bean>


使用故障转移连接工厂时,该属性必须在工厂本身与其配置为使用的工厂列表之间保持一致。​​singleUse​

与共享连接一起使用时,连接工厂具有两个与故障回复相关的属性 ():​​singleUse=false​

  • ​refreshSharedInterval​
  • ​closeOnRefresh​

根据上述配置,请考虑以下方案: 假设无法建立连接,但可以。 当该方法在 通过后调用时,我们将再次尝试使用 ;如果成功,将关闭 的连接。 如果是,则“旧”连接将保持打开状态,如果第一个工厂再次失败,将来可以重复使用。​​clientFactory1​​​​clientFactory2​​​​failCF​​​​getConnection()​​​​refreshSharedInterval​​​​clientFactory1​​​​clientFactory2​​​​closeOnRefresh​​​​false​

设置为仅在该时间到期后尝试与第一个工厂重新连接;如果只想在当前连接失败时故障回复到第一个出厂状态,请将其设置为(默认值)。​​refreshSharedInterval​​​​Long.MAX_VALUE​

设置为在刷新后关闭“旧”连接,实际创建新连接。​​closeOnRefresh​

如果任何委托工厂是 ,则这些属性不适用,因为连接缓存是在那里处理的;在这种情况下,将始终参考连接工厂列表以获取连接。​​CachingClientConnectionFactory​

从版本 5.3 开始,这些默认为 ,因此工厂仅在当前连接失败时尝试故障回复。 要恢复为以前版本的默认行为,请将其设置为 和 。​​Long.MAX_VALUE​​​​true​​​​0​​​​false​

另请参阅测试连接。

TCP 线程相关性连接工厂

Spring 集成版本 5.0 引入了此连接工厂。 它将连接绑定到调用线程,并且每次线程发送消息时都会重复使用相同的连接。 这一直持续到连接关闭(由服务器或网络)或直到线程调用该方法。 连接本身由另一个客户端工厂实现提供,必须将其配置为提供非共享(一次性)连接,以便每个线程都获得一个连接。​​releaseConnection()​

以下示例演示如何配置 TCP 线程相关性连接工厂:

@Bean
public TcpNetClientConnectionFactory cf() {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
Integer.parseInt(System.getProperty(PORT)));
cf.setSingleUse(true);
return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(tacf());
outGate.setReplyChannelName("toString");
return outGate;
}

测试连接

在某些情况下,在首次打开连接时发送某种运行状况检查请求可能很有用。 其中一种情况可能是在使用 TCP 故障转移客户端连接工厂时,以便在所选服务器允许打开连接但报告其不正常时进行故障转移。

为了支持此功能,请向客户端连接工厂添加 。​​connectionTest​

/**
* Set a {@link Predicate} that will be invoked to test a new connection; return true
* to accept the connection, false the reject.
* @param connectionTest the predicate.
* @since 5.3
*/
public void setConnectionTest(@Nullable Predicate<TcpConnectionSupport> connectionTest) {
this.connectionTest = connectionTest;
}

若要测试连接,请将临时侦听器附加到测试中的连接。 如果测试失败,连接将关闭并引发异常。 与 TCP 故障转移客户端连接工厂一起使用时,这会触发尝试下一台服务器。

只有来自服务器的第一个回复才会转到测试侦听器。

在下面的示例中,如果服务器在我们发送时回复,则认为服务器健康。​​PONG​​​​PING​

Message<String> ping = new GenericMessage<>("PING");
byte[] pong = "PONG".getBytes();
clientFactory.setConnectionTest(conn -> {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean result = new AtomicBoolean();
conn.registerTestListener(msg -> {
if (Arrays.equals(pong, (byte[]) msg.getPayload())) {
result.set(true);
}
latch.countDown();
return false;
});
conn.send(ping);
try {
latch.await(10, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return result.get();
});

TCP 连接拦截器

可以使用引用 来配置连接工厂。 可以使用侦听器向连接添加行为,例如协商、安全性和其他选项。 框架当前未提供拦截器,但有关示例,请参阅源存储库中的 InterceptedSharedConnectionTests。​​TcpConnectionInterceptorFactoryChain​

测试用例中使用的方法如下:​​HelloWorldInterceptor​

拦截器首先配置客户端连接工厂。 当第一条消息通过截获的连接发送时,拦截器通过连接发送“Hello”,并期望接收“world! 发生这种情况时,协商完成并发送原始消息。 此外,使用相同连接的消息无需任何其他协商即可发送。

配置了服务器连接工厂时,拦截器要求第一条消息为“Hello”,如果是,则返回“world! 否则,它将引发导致连接关闭的异常。

截获所有方法。 拦截器实例由拦截器工厂为每个连接创建。 如果拦截器是有状态的,则工厂应为每个连接创建一个新实例。 如果没有状态,则相同的侦听器可以包装每个连接。 侦听器工厂将添加到侦听器工厂链的配置中,您可以通过设置属性将其提供给连接工厂。 拦截器必须扩展 。 工厂必须实现该接口。 具有直通方法。 通过扩展此类,您只需要实现您希望拦截的那些方法。​​TcpConnection​​​​interceptor-factory​​​​TcpConnectionInterceptorSupport​​​​TcpConnectionInterceptorFactory​​​​TcpConnectionInterceptorSupport​

以下示例演示如何配置连接侦听器工厂链:

<bean 
class="o.s.i.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
<property name="interceptors">
<array>
<bean class="o.s.i.ip.tcp.connection.HelloWorldInterceptorFactory"/>
</array>
</property>
</bean>

<int-ip:tcp-connection-factory
type="server"
port="12345"
using-nio="true"
single-use="true"
interceptor-factory-chain="helloWorldInterceptorFactory"/>

<int-ip:tcp-connection-factory
type="client"
host="localhost"
port="12345"
single-use="true"
so-timeout="100000"
using-nio="true"
interceptor-factory-chain="helloWorldInterceptorFactory"/>

TCP 连接事件

从版本 3.0 开始,实例会报告对实例的更改。 是 的子类,因此可以由 — 例如事件入站通道适配器中的任何定义接收。​​TcpConnection​​​​TcpConnectionEvent​​​​TcpConnectionEvent​​​​ApplicationEvent​​​​ApplicationListener​​​​ApplicationContext​

​TcpConnectionEvents​​具有以下属性:

  • ​connectionId​​:连接标识符,可以在消息标头中将其用于将数据发送到连接。
  • ​connectionFactoryName​​:连接所属的连接工厂的 Bean 名称。
  • ​throwable​​:(仅适用于事件)。ThrowableTcpConnectionExceptionEvent
  • ​source​​:这。 例如,您可以使用它来确定远程 IP 地址(需要强制转换)。TcpConnectiongetHostAddress()

此外,从版本 4.0 开始,TCP 连接工厂中讨论的标准反序列化程序现在在解码数据流时遇到问题时会发出实例。 这些事件包含异常、正在构建的缓冲区,以及在发生异常时到缓冲区的偏移量(如果可用)。 应用程序可以使用正常或(请参阅接收 Spring 应用程序事件)来捕获这些事件,从而允许分析问题。​​TcpDeserializationExceptionEvent​​​​ApplicationListener​​​​ApplicationEventListeningMessageProducer​

从版本 4.0.7 和 4.1.3 开始,每当服务器套接字上发生意外异常(例如,使用服务器套接字时)就会发布实例。 这些事件具有对连接工厂和原因的引用。​​TcpConnectionServerExceptionEvent​​​​BindException​

从版本 4.2 开始,每当终端节点(入站网关或协作出站通道适配器)收到由于标头无效而无法路由到连接的消息时,就会发布实例。 出站网关也会在收到延迟回复(发送方线程超时)时发布此事件。 该事件包含连接 ID 以及属性中的异常,其中包含失败的消息。​​TcpConnectionFailedCorrelationEvent​​​​ip_connectionId​​​​cause​

从版本 4.3 开始,启动服务器连接工厂时会发出 。 当工厂配置为侦听端口时,这很有用,这意味着操作系统选择端口。 它也可以用来代替 轮询 ,如果您需要在启动连接到套接字的其他进程之前等待。​​TcpConnectionServerListeningEvent​​​​0​​​​isListening()​

为了避免延迟侦听线程接受连接,事件发布在单独的线程上。

从版本 4.3.2 开始,只要无法创建客户端连接,就会发出 a。 事件的源是连接工厂,可用于确定无法建立连接的主机和端口。​​TcpConnectionFailedEvent​

TCP 适配器

提供了使用前面提到的连接工厂的 TCP 入站和出站通道适配器。 这些适配器具有两个相关属性:和 。 该属性指示将用于管理适配器的连接的连接工厂。 该属性指定消息到达出站适配器的通道以及入站适配器放置消息的通道。 虽然入站和出站适配器都可以共享一个连接工厂,但服务器连接工厂始终由入站适配器“拥有”。 客户端连接工厂始终由出站适配器“拥有”。 每种类型只有一个适配器可以获取对连接工厂的引用。 下面的示例演示如何定义客户端和服务器 TCP 连接工厂:​​connection-factory​​​​channel​​​​connection-factory​​​​channel​

<bean 
class="org.springframework.core.serializer.DefaultSerializer"/>
<bean
class="org.springframework.core.serializer.DefaultDeserializer"/>

<int-ip:tcp-connection-factory
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"
using-nio="true"
single-use="true"/>

<int-ip:tcp-connection-factory
type="client"
host="localhost"
port="#{server.port}"
single-use="true"
so-timeout="10000"
deserializer="javaDeserializer"
serializer="javaSerializer"/>

<int:channel />

<int:channel >
<int:queue/>
</int:channel>

<int-ip:tcp-outbound-channel-adapter
channel="input"
connection-factory="client"/>

<int-ip:tcp-inbound-channel-adapter
channel="replies"
connection-factory="client"/>

<int-ip:tcp-inbound-channel-adapter
channel="loop"
connection-factory="server"/>

<int-ip:tcp-outbound-channel-adapter
channel="loop"
connection-factory="server"/>

<int:channel />

另请参阅基于注释的配置和使用 Java DSL for TCP 组件。

在上述配置中,到达通道的消息通过连接工厂创建的连接进行序列化,在服务器上接收,并放置在通道上。 由于是 的输入通道,因此消息通过同一连接环回,由 接收并存放在该通道中。 Java 序列化用于网络。​​input​​​​client​​​​loop​​​​loop​​​​outboundServer​​​​inboundClient​​​​replies​

通常,入站适配器使用连接工厂,该工厂侦听传入的连接请求。 在某些情况下,您可能希望反向建立连接,以便入站适配器连接到外部服务器,然后等待该连接上的入站消息。​​type="server"​

入站适配器上的设置支持此拓扑。 在这种情况下,连接工厂的类型必须为 ,并且必须设置为 。​​client-mode="true"​​​​client​​​​single-use​​​​false​

另外两个属性支持此机制。 指定(以毫秒为单位)框架在连接失败后尝试重新连接的频率。 提供 来安排连接尝试并测试连接是否仍处于活动状态。​​retry-interval​​​​scheduler​​​​TaskScheduler​

如果不提供调度程序,则使用框架的默认 taskScheduler bean。

对于出站适配器,通常在发送第一条消息时建立连接。 在出站适配器上会导致在启动适配器时建立连接。 默认情况下,适配器会自动启动。 同样,连接工厂必须是类型并具有 。 A 和也受支持。 如果连接失败,调度程序将在发送下一条消息时重新建立连接。​​client-mode="true"​​​​client​​​​single-use="false"​​​​retry-interval​​​​scheduler​

对于入站和出站,如果适配器已启动,则可以通过发送以下命令来强制适配器建立连接:。 然后,您可以使用 检查当前状态。​​<control-bus />​​​​@adapter_id.retryConnection()​​​​@adapter_id.isClientModeConnected()​

TCP 网关

入站 TCP 网关和出站 TCP 网关分别使用服务器和客户端连接工厂。 每个连接一次可以处理一个请求或响应。​​TcpInboundGateway​​​​TcpOutboundGateway​

入站网关在使用传入有效负载构造消息并将其发送到 后,等待响应,并通过将响应消息中的有效负载写入连接来发送该有效负载。​​requestChannel​

对于入站网关,您必须保留或填充标头,因为它用于将消息与连接相关联。 源自网关的邮件会自动设置标头。 如果回复被构造为新消息,则需要设置标头。 可以从传入消息中捕获标头值。​​ip_connectionId​

与入站适配器一样,入站网关通常使用连接工厂,该工厂侦听传入的连接请求。 在某些情况下,您可能希望反向建立连接,以便入站网关连接到外部服务器,然后等待并回复该连接上的入站邮件。​​type="server"​

在入站网关上使用支持此拓扑。 在这种情况下,连接工厂的类型必须为 ,并且必须设置为 。​​client-mode="true"​​​​client​​​​single-use​​​​false​

另外两个属性支持此机制。 指定(以毫秒为单位)框架在连接失败后尝试重新连接的频率。 提供 来安排连接尝试并测试连接是否仍处于活动状态。​​retry-interval​​​​scheduler​​​​TaskScheduler​

如果网关已启动,您可以通过发送命令来强制网关建立连接: 并使用 检查当前状态。​​<control-bus/>​​​​@adapter_id.retryConnection()​​​​@adapter_id.isClientModeConnected()​

出站网关通过连接发送消息后,等待响应,构造响应消息,并将其放在回复通道上。 通过连接进行的通信是单线程的。 一次只能处理一条消息。 如果另一个线程在收到当前响应之前尝试发送消息,它将阻塞,直到之前的任何请求完成(或超时)。 但是,如果客户端连接工厂配置为一次性连接,则每个新请求都会获得自己的连接并立即处理。 以下示例配置入站 TCP 网关:

<int-ip:tcp-inbound-gateway 
request-channel="tcpChannel"
reply-channel="replyChannel"
connection-factory="cfServer"
reply-timeout="10000"/>

如果使用配置了默认序列化程序或反序列化程序的连接工厂,则消息是分隔数据,网关可由简单客户端(如 telnet)使用。​​\r\n​

以下示例显示了一个出站 TCP 网关:

<int-ip:tcp-outbound-gateway 
request-channel="tcpChannel"
reply-channel="replyChannel"
connection-factory="cfClient"
request-timeout="10000"
remote-timeout="10000"/> <!-- or e.g. remote-timeout-expression="headers['timeout']" -->

当前不适用于出站网关。​​client-mode​

从版本 5.2 开始,可以使用属性 配置出站网关。 如果连接工厂配置为(每个请求/回复的新连接),网关将关闭输出流;这会向服务器发出 EOF 信号。 如果服务器使用 EOF 来确定消息的结尾,而不是流中的某个分隔符,但保持连接打开以接收回复,这将非常有用。​​closeStreamAfterSend​​​​single-use​

通常,调用线程将在网关中阻塞,等待回复(或超时)。 从版本 5.3 开始,可以在网关上设置属性,并释放发送线程以执行其他工作。 回复(或错误)将在接收线程上发送。 这仅适用于使用 ,在使用 NIO 时会忽略它,因为存在争用条件,即收到回复后发生的套接字错误可以在回复之前传递到网关。​​async​​​​TcpNetClientConnectionFactory​

使用共享连接 () 时,新请求(而另一个请求正在处理)将被阻止,直到收到当前回复。 如果您希望在长期连接池上支持并发请求,请考虑使用 。​​singleUse=false​​​​CachingClientConnectionFactory​

从版本 5.4 开始,可以使用 . 未经请求的入站消息以及延迟回复(客户端超时)将发送到此通道。 若要在服务器端支持此功能,现在可以向连接工厂注册多个。 网关和通道适配器会自动注册自身。 从服务器发送未经请求的消息时,必须将相应的消息添加到发送的消息中。​​unsolicitedMessageChannel​​​​TcpSender​​​​IpHeaders.CONNECTION_ID​

另请参阅基于注释的配置和使用 Java DSL for TCP 组件。

TCP 消息关联

IP端点的一个目标是提供与Spring Integration应用程序以外的系统的通信。 因此,默认情况下仅发送和接收消息负载。 从 3.0 开始,您可以使用 JSON、Java 序列化或自定义序列化程序和反序列化程序传输标头。 有关详细信息,请参阅传输标头。 服务器端的框架(使用网关时除外)或协作通道适配器不提供消息关联。在本文档的后面部分,我们将讨论可用于应用程序的各种关联技术。 在大多数情况下,这需要消息的特定应用程序级关联,即使消息负载包含一些自然关联数据(如订单号)。