Spring 集成提供通道适配器和其他实用程序组件,以与内存数据网格 Hazelcast 进行交互。
您需要将此依赖项包含在项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-hazelcast</artifactId>
<version>6.0.0</version>
</dependency>
Hazelcast 组件的 XML 命名空间和架构位置定义包括:
xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"
Hazelcast 事件驱动的入站通道适配器
Hazelcast提供分布式数据结构,例如:
-
com.hazelcast.map.IMap
-
com.hazelcast.multimap.MultiMap
-
com.hazelcast.collection.IList
-
com.hazelcast.collection.ISet
-
com.hazelcast.collection.IQueue
-
com.hazelcast.topic.ITopic
-
com.hazelcast.replicatedmap.ReplicatedMap
它还提供事件侦听器,以便侦听对这些数据结构所做的修改。
-
com.hazelcast.core.EntryListener<K, V>
-
com.hazelcast.collection.ItemListener
-
com.hazelcast.topic.MessageListener
Hazelcast 事件驱动的入站通道适配器侦听相关的缓存事件,并将事件消息发送到定义的通道。 它支持XML和JavaConfig驱动的配置。
XML 配置:
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
Hazelcast 事件驱动的入站通道适配器需要以下属性:
-
channel
:指定消息发送到的通道;
-
cache
:指定侦听的分布式对象引用。 这是一个强制性属性;
-
cache-events
:指定侦听的缓存事件。 它是一个可选属性,其默认值为 。 其支持的值如下:ADDED
- 和 支持的缓存事件类型:、、、 和
IMap
MultiMap
ADDED
REMOVED
UPDATED
EVICTED
EVICT_ALL
CLEAR_ALL
;
- 支持的缓存事件类型:、、、
ReplicatedMap
ADDED
REMOVED
UPDATED
EVICTED
;
- 支持的缓存事件类型 和 : , 。 没有 的缓存事件类型。
IList
ISet
IQueue
ADDED
REMOVED
ITopic
-
cache-listening-policy
:将缓存侦听策略指定为 或 。 它是一个可选属性,其默认值为 。 侦听具有相同缓存事件属性的同一缓存对象的每个 Hazelcast 入站通道适配器都可以接收单个事件消息或所有事件消息。 如果是,则侦听具有相同缓存事件属性的同一缓存对象的所有 Hazelcast 入站通道适配器都将收到所有事件消息。 如果是,他们将收到唯一的事件消息。SINGLE
ALL
SINGLE
ALL
SINGLE
一些配置示例:
分布式地图
<int:channel />
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED" />
<bean factory-bean="instance" factory-method="getMap">
<constructor-arg value="map"/>
</bean>
<bean class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
分布式多地图
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED, REMOVED, CLEAR_ALL" />
<bean factory-bean="instance" factory-method="getMultiMap">
<constructor-arg value="multiMap"/>
</bean>
分布式列表
<int-hazelcast:inbound-channel-adapter channel="listChannel"
cache="list"
cache-events="ADDED, REMOVED"
cache-listening-policy="ALL" />
<bean factory-bean="instance" factory-method="getList">
<constructor-arg value="list"/>
</bean>
分布式集
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />
<bean factory-bean="instance" factory-method="getSet">
<constructor-arg value="set"/>
</bean>
分布式队列
<int-hazelcast:inbound-channel-adapter channel="queueChannel"
cache="queue"
cache-events="REMOVED"
cache-listening-policy="ALL" />
<bean factory-bean="instance" factory-method="getQueue">
<constructor-arg value="queue"/>
</bean>
分布式主题
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />
<bean factory-bean="instance" factory-method="getTopic">
<constructor-arg value="topic"/>
</bean>
复制的地图
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
cache="replicatedMap"
cache-events="ADDED, UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
<bean factory-bean="instance" factory-method="getReplicatedMap">
<constructor-arg value="replicatedMap"/>
</bean>
Java 配置示例:
以下示例显示了一个配置。 相同的配置可用于其他分布式数据结构(、、、、 和 ):DistributedMap
IMap
MultiMap
ReplicatedMap
IList
ISet
IQueue
ITopic
@Bean
public PollableChannel distributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hazelcastInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
producer.setOutputChannel(distributedMapChannel());
producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
return producer;
}
榛子连续查询入站通道适配器
Hazelcast 连续查询允许侦听对特定地图条目执行的修改。 Hazelcast 连续查询入站通道适配器是一个事件驱动的通道适配器,它根据定义的谓词侦听相关的分布式映射事件。
@Bean
public PollableChannel cqDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> cqDistributedMap() {
return hazelcastInstance().getMap("CQ_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
final HazelcastContinuousQueryMessageProducer producer =
new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
producer.setOutputChannel(cqDistributedMapChannel());
producer.setCacheEventTypes("UPDATED");
producer.setIncludeValue(false);
return producer;
}
它支持六个属性,如下所示:
-
channel
:指定消息发送到的通道;
-
cache
:指定侦听的分布式映射引用。 命令的;
-
cache-events
:指定侦听的缓存事件。 作为其默认值的可选属性。 支持的值为 、、 和ADDED
ADDED
REMOVED
UPDATED
EVICTED
EVICT_ALL
CLEAR_ALL
;
-
predicate
:指定用于侦听对特定映射条目执行的修改的谓词。 命令的;
-
include-value
:指定在连续查询结果中包含值和旧值。 可选为默认值;true
-
cache-listening-policy
:将缓存侦听策略指定为 或 。 可选,默认值为 。 每个侦听具有相同缓存事件属性的相同缓存对象的 Hazelcast CQ 入站通道适配器都可以接收单个事件消息或所有事件消息。 如果是,则侦听具有相同缓存事件属性的同一缓存对象的所有 Hazelcast CQ 入站通道适配器都将收到所有事件消息。 如果是,他们将收到唯一的事件消息。SINGLE
ALL
SINGLE
ALL
SINGLE
Hazelcast 群集监视器入站通道适配器
Hazelcast 群集监视器支持侦听在群集上执行的修改。 Hazelcast 群集监视器入站通道适配器是一个事件驱动的通道适配器,侦听相关的成员资格、分布式对象、迁移、生命周期和客户端事件:
@Bean
public PollableChannel eventChannel() {
return new QueueChannel();
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
producer.setOutputChannel(eventChannel());
producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");
return producer;
}
它支持以下三个属性:
-
channel
:指定消息发送到的通道;
-
hazelcast-instance
:指定用于侦听群集事件的 Hazelcast 实例引用。 这是一个强制性属性;
-
monitor-types
:指定侦听的监视器类型。 它是一个可选属性,是默认值。 支持的值为 、、、、。MEMBERSHIP
MEMBERSHIP
DISTRIBUTED_OBJECT
MIGRATION
LIFECYCLE
CLIENT
Hazelcast 分布式 SQL 入站通道适配器
Hazelcast允许在分布式地图上运行分布式查询。 Hazelcast 分布式 SQL 入站通道适配器是一个轮询入站通道适配器。 它运行定义的分布式 sql 命令,并根据迭代类型返回结果。
@Bean
public PollableChannel dsDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> dsDistributedMap() {
return hazelcastInstance().getMap("DS_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
final HazelcastDistributedSQLMessageSource messageSource =
new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
"name='TestName' AND surname='TestSurname'");
messageSource.setIterationType(DistributedSQLIterationType.ENTRY);
return messageSource;
}
它需要一个轮询器并支持四个属性:
-
channel
:指定消息发送到的通道。 这是一个强制性属性;
-
cache
:指定要查询的分布式引用。 它是必需属性;IMap
-
iteration-type
:指定结果类型。 分布式 SQL 可以在 、 或 上运行。 它是一个可选属性,是默认属性。 支持的值为 ,和EntrySet
KeySet
LocalKeySet
Values
VALUE
ENTRY, `KEY
LOCAL_KEY
VALUE
;
-
distributed-sql
:指定 sql 语句的 where 子句。 这是一个必需属性。
榛子出站通道适配器
Hazelcast 出站通道适配器侦听其定义的通道,并将传入消息写入相关的分布式缓存。 它需要 之一 或 用于分布式对象定义。 支持的分布式对象包括:、 和 。cache
cache-expression
HazelcastHeaders.CACHE_NAME
IMap
MultiMap
ReplicatedMap
IList
ISet
IQueue
ITopic
@Bean
public MessageChannel distributedMapChannel() {
return new DirectChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hzInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hzInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
handler.setDistributedObject(distributedMap());
handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
handler.setExtractPayload(true);
return handler;
}
它需要以下属性:
-
channel
:指定消息发送到的通道;
-
cache
:指定分布式对象引用。 自选;
-
cache-expression
:通过 Spring 表达式语言 (SpEL) 指定分布式对象。 自选;
-
key-expression
:通过 Spring 表达式语言 (SpEL) 指定键值对的键。 可选且仅对 和分布式数据结构是必需的。IMap
MultiMap
ReplicatedMap
-
extract-payload
:指定是发送整条消息还是仅发送有效负载。 默认属性为可选属性。 如果为 true,则只有有效负载将写入分布式对象。 否则,将通过转换消息头和有效负载来写入整个消息。true
通过在标头中设置分布式对象名称,可以通过同一通道将消息写入不同的分布式对象。 如果未定义 或 属性,则必须在请求中设置标头。cache
cache-expression
HazelcastHeaders.CACHE_NAME
Message
榛子领袖选举
如果需要领导者选举(例如,对于只有一个节点应该接收消息的高可用性消息使用者),可以使用基于 Hazelcast 的:LeaderInitiator
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LeaderInitiator initiator() {
return new LeaderInitiator(hazelcastInstance());
}
当一个节点被选为领导者时,它将向所有应用程序侦听器发送一个。OnGrantedEvent
榛子消息存储
对于分布式消息传递状态管理,例如对于持久或跟踪消息组,提供了以下实现:QueueChannel
Aggregator
HazelcastMessageStore
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MessageGroupStore messageStore() {
return new HazelcastMessageStore(hazelcastInstance());
}
默认情况下,用于将消息和组存储为键/值。 任何自定义都可以提供给 .SPRING_INTEGRATION_MESSAGE_STORE
IMap
IMap
HazelcastMessageStore
榛子广播元数据存储
可以使用后备 Hazelcast 来实现 。 默认地图是使用可自定义的名称创建的。ListenableMetadataStore
IMap
SPRING_INTEGRATION_METADATA_STORE
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MetadataStore metadataStore() {
return new HazelcastMetadataStore(hazelcastInstance());
}
该实现允许您注册自己的侦听器类型以通过 侦听事件。HazelcastMetadataStore
ListenableMetadataStore
MetadataStoreListener
addListener(MetadataStoreListener callback)
榛子锁注册表
可以使用后备 Hazelcast 分布式支持来实现 a:LockRegistry
ILock
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LockRegistry lockRegistry() {
return new HazelcastLockRegistry(hazelcastInstance());
}
当与共享(例如 存储管理),可用于跨多个应用程序实例提供此功能,以便一次只有一个实例可以操作组。MessageGroupStore
Aggregator
HazelcastLockRegistry
对于所有分布式操作,必须在 上启用 CP 子系统。HazelcastInstance |