SpringCloud Stream基本使用

时间:2022-10-01 21:53:29

介绍

Spring Cloud Stream是一个用于构建与共享消息系统连接的高度可伸缩的事件驱动微服务框架。

该框架提供了一个基于已建立且熟悉的Spring习惯用法和最佳实践的灵活编程模型,包括对持久发布/订阅语义、消费者组和有状态分区的支持。

主要应用场景

可能我们会遇到不同的系统在用不同的消息队列,比如系统A用的Kafka、系统B用的RabbitMQ,但是我们现在又没有学习过Kafka,那么怎么办呢?有没有一种方式像JDBC一样,我们只需要关心SQL和业务本身,而不用关心数据库的具体实现呢?

SpringCloud Stream能够做到,它能够屏蔽底层实现,我们使用统一的消息队列操作方式就能操作多种不同类型的消息队列。

SpringCloud Stream基本使用

它屏蔽了不同消息队列底层操作,让我们使用统一的Input和Output形式,以Binder为中间件,这样就算我们切换了不同的消息队列,也无需修改代码,而具体某种消息队列的底层实现是交给Stream在做的。

Binder
Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,目前SpringCloud Stream实现了Kafka、RabbitMQ和RocketMQ等的binder

通过binder,可以很方便的连接中间件,可以动态的改变消息的destinations(对应于 Kafka的topic,RabbitMQ的exchange),这些都可以通过外部配置项来做到,甚至可以任意的改变中间件的类型但是不需要修改一行业务代码

实践

Linux安装RabbitMQ

接下来创建一个springboot父子项目来演示一下其基本使用(以rabbitmq为例):

SpringCloud Stream基本使用

父工程依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>2021.0.1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

SpringCloud Stream基本使用

子模块依赖:

<dependencies>
    <!--  RabbitMQ的Stream实现  -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

SpringCloud Stream基本使用

生产者

生产者配置文件:

server:
  port: 8801

spring:
  application:
    name: stream-publisher
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道channel的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: text/plain  # 设置消息类型,application/json -> json格式,本文要设置为 text/plain -> 文本类型

定义发送消息接口:

public interface IMessagePublisher {
    void publish(String message);
}

接口实现类:

@EnableBinding(Source.class)    // 定义消息的推送管道(Source是spring的)
@Slf4j
public class MessagePublishImpl implements IMessagePublisher {

    @Resource
    private MessageChannel output;  // 消息发送管道

    @Override
    public void publish(String message) {

        log.info("发送消息:{}", message);
        // MessageBuilder是spring的integration.support.MessageBuilder
        output.send(MessageBuilder.withPayload(message).build());

    }

}

Source接口

SpringCloud Stream基本使用

新建controller.PublishController

@RestController
public class PublishController {

    @Resource
    IMessagePublisher publisher;

    @RequestMapping("/publish")
    public String publish(String message) {
        publisher.publish(message);
        return "消息发送成功!" + new Date();
    }
}

消费者

配置文件:

server:
  port: 8802

spring:
  application:
    name: stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道channel的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: text/plain  # 设置消息类型,application/json -> json格式,本文要设置为 text/plain -> 文本类型

监听消息:

@EnableBinding(Sink.class) // (Sink也是spring的)
public class ReceiveMessageListener {

    @StreamListener(Sink.INPUT) // 监听
    public void input(Message<String> message) {
        System.out.println("消费者1号------>收到的消息:" + message.getPayload());
    }
}

Sink接口

SpringCloud Stream基本使用

注意: output输入信道是stream自带的,还自带了一个输出信道input,上述两个接口。

启动两个项目进行测试,项目启动完成发现studyExchange交换机已经创建好了。

SpringCloud Stream基本使用

调用发送消息接口,消息发送成功

SpringCloud Stream基本使用

消费者也成功消费消息

SpringCloud Stream基本使用

使用自定义信道实现消息传递

上述代码实现是通过stream默认的信道完成的,本部分实现通过自定义信道实现

类比stream默认信道,创建两个自定义信道MySource、MySink

public interface MySource {
    /**
     * Name of the output channel.
     */
    String OUTPUT1 = "output1";

    /**
     * @return output channel
     */
    @Output(OUTPUT1)
    MessageChannel output();
}
public interface MySink {
    /**
     * Input channel name.
     */
    String INPUT1 = "input1";

    /**
     * @return input channel.
     */
    @Input(INPUT1)
    SubscribableChannel input();
}

生产者新增配置项:

SpringCloud Stream基本使用

同理消费者新增配置项:

SpringCloud Stream基本使用

接下来改造发送消息实现类

SpringCloud Stream基本使用

修改该类,EnableBinding注解的值改为绑定多个传入信道接口,然后使用我们自定义信道发送消息。

消费者消费消息

SpringCloud Stream基本使用

同理,EnableBinding注解的值改为绑定多个信道接口,新建一个方法监听即可。

启动项目,进行测试:

SpringCloud Stream基本使用

发现我们自定义信道消息也能正常被消费到。

SpringCloud Stream基本使用

@EnableBinding注解过时

由上图可以看到 @EnableBinding 注解貌似已经过时了。

@EnableBinding源码中明确声明,该注解在从3.1版本开始被弃用,推荐我们使用函数编程的方式

接下来会演示下这种方式基本使用:

生产者案例:

yml配置:

server:
  port: 8801

spring:
  application:
    name: stream-publisher
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        myChannel-out-0:
          destination: demo #表示要使用Exchange名称定义
          contentType: text/plain

注意使用这种方式,bingdings 集合中的key由 通道名-out/in-数字组成

新版发送消息:

@RestController
public class PublishController {

    @Resource
    StreamBridge bridge;

    @RequestMapping("/publish")
    public String publish(String message) {
        bridge.send("myChannel-out-0", message);
        return "消息发送成功!" + new Date();
    }
}

@Autowire注解自动注入StreamBridge的实例,直接使用StreamBridge发送消息,StreamBridge的send方法第一个参数是binding的名字,第二个参数是想要发送的消息。

消费者案例:

yml配置:

server:
  port: 8802

spring:
  application:
    name: stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        myChannel-in-0:
          destination: demo
          contentType: text/plain

消费者消费消息:

@Component
public class ConsumerComponent {

    @Bean("myChannel")  
    public Consumer<String> consumer() {
        return message -> System.out.println("新版本消费消息:" + message);
    }
    
    //@Bean
    //public Consumer<String> myChannel() {
    //    return message -> System.out.println("新版本消费消息:" + message);
    //}
}

注意:@Bean里是yml配置文件中通道名称,这样生产者发送的数据才会正确到达,应用程序启动后会自动接收生产者发送的消息;

或者是方法名为yml配置文件中通道名称,两种方式都能正常消费消息。

启动项目进行测试:

交换机正常创建:

SpringCloud Stream基本使用

消息发送成功:

SpringCloud Stream基本使用

消息正常被消费:

SpringCloud Stream基本使用

完毕!

参考博客:

SpringCloud Stream @EnableBinding注解过时