Spring Cloud Stream集成Kafka

时间:2022-04-07 16:17:04

这里演示如何使用Spring Boot和Spring Cloud Stream集成Kafka,实现一个简单的实时流系统。

添加依赖

可以在https://start.spring.io创建一个基于spring boot的maven项目。

需要添加的主要依赖:spring-cloud-stream以及spring-cloud-starter-stream-kafka,如下:

<!-- Spring Boot Actuator starter -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Spring Cloud Stream core -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- Kafka binder starter for Spring Cloud Stream -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

添加dependencyManagement

<!--
  Imports the Spring Boot and Spring Cloud Stream BOMs so the dependencies
  declared elsewhere in the POM can omit explicit versions.
  NOTE(review): ${spring-boot.version} and ${spring-cloud-stream.version} are
  not defined in this snippet; they must be declared under <properties>.
-->
<dependencyManagement>
  <dependencies>
    <!-- Dependency management imported from Spring Boot -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-dependencies</artifactId>
      <version>${spring-boot.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <!-- Dependency management imported from Spring Cloud Stream -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-dependencies</artifactId>
      <version>${spring-cloud-stream.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

在<repositories>节点中添加:

<!--
  Spring milestone repository (release builds only; snapshots disabled).
  The URL uses HTTPS because Maven 3.8.1 and later block plain-http
  repositories by default.
-->
<repository>
  <id>spring-milestones</id>
  <name>Spring Milestones</name>
  <url>https://repo.spring.io/milestone</url>
  <snapshots>
    <enabled>false</enabled>
  </snapshots>
</repository>

定义kafka stream

package demo.streamkafka.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;  

public interface GreetingsStreams {
    String INPUT = "input";
    String OUTPUT = "output";

    @Input(INPUT)
    SubscribableChannel inboundGreetings();

    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

这里分别定义了一个输出流和一个输入流:

  • outboundGreetings:输出流,它用于把消息写入Kafka的topic里
  • inboundGreetings:输入流,它用于消费kafka的消息。

Spring会用一个Java代理来实现GreetingsStreams接口。

配置Spring Cloud Stream

接着我们需要配置Spring Cloud Stream来绑定stream到GreetingsStreams:

package demo.streamkafka.config;

import demo.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}

使用注解@EnableBinding启用绑定。

配置Kafka属性

spring boot的默认配置文件为src/main/resources/application.properties,或者是一个application.yaml。

使用application.yaml配置如下:

# Spring Cloud Stream settings for the Kafka binder and the greetings bindings.
spring:
  cloud:
    stream:
      kafka:
        binder:
          # Address of the Kafka broker to connect to
          brokers: localhost:9092
      bindings:
        # NOTE(review): the binding keys below are expected to equal the channel
        # names passed to @Input/@Output in GreetingsStreams - confirm they match.
        greetings-in:
          # Kafka topic read by the inbound binding
          destination: greetings
          contentType: application/json
        greetings-out:
          # Kafka topic written by the outbound binding (same topic as above)
          destination: greetings
          contentType: application/json

配置文件里设置了:

  • kafka服务器的连接地址
  • kafka的topic为greetings
  • 发送消息的contentType为json

新建消息模型

package demo.streamkafka.model;

/**
 * Message payload exchanged over the greetings stream.
 *
 * <p>This is a plain mutable bean: a public no-argument constructor plus
 * getters and setters. A small fluent {@link Builder} is also provided so that
 * callers can write {@code Greetings.builder().message(...).timestamp(...).build()}.
 */
public class Greetings {
    private long timestamp;
    private String message;

    /** Creates an empty instance ({@code timestamp == 0}, {@code message == null}). */
    public Greetings() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param timestamp creation time in epoch milliseconds
     * @param message   greeting text, may be {@code null}
     */
    public Greetings(long timestamp, String message) {
        this.timestamp = timestamp;
        this.message = message;
    }

    /**
     * Starts a fluent builder.
     *
     * @return a new, empty builder
     */
    public static Builder builder() {
        return new Builder();
    }

    /** @return creation time in epoch milliseconds */
    public long getTimestamp() {
        return timestamp;
    }

    /** @param timestamp creation time in epoch milliseconds */
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    /** @return greeting text, possibly {@code null} */
    public String getMessage() {
        return message;
    }

    /** @param message greeting text, may be {@code null} */
    public void setMessage(String message) {
        this.message = message;
    }

    /** Readable form so that logging the object shows its content rather than a hash. */
    @Override
    public String toString() {
        return "Greetings(timestamp=" + timestamp + ", message=" + message + ")";
    }

    /** Fluent builder for {@link Greetings}. */
    public static final class Builder {
        private long timestamp;
        private String message;

        private Builder() {
        }

        /**
         * @param timestamp creation time in epoch milliseconds
         * @return this builder
         */
        public Builder timestamp(long timestamp) {
            this.timestamp = timestamp;
            return this;
        }

        /**
         * @param message greeting text, may be {@code null}
         * @return this builder
         */
        public Builder message(String message) {
            this.message = message;
            return this;
        }

        /** @return a new {@link Greetings} carrying the values set on this builder */
        public Greetings build() {
            return new Greetings(timestamp, message);
        }
    }
}

在Service层写消息到Kafka

package demo.streamkafka.service;

import demo.streamkafka.model.Greetings;
import demo.streamkafka.stream.GreetingsStreams;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

@Service
public class GreetingsService {
    private final GreetingsStreams greetingsStreams;

    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }

    public void sendGreeting(final Greetings greetings) {

        MessageChannel messageChannel = greetingsStreams.outboundGreetings();
        messageChannel.send(MessageBuilder
                .withPayload(greetings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }

sendGreeting()方法接收消息对象Greetings,并使用messageChannel来发送消息。

触发写消息Rest API

使用spring mvc来创建一个RestController,它提供一个Rest API让我们触发写消息到Kafka。GreetingsController会调用GreetingsService来发送消息。

package demo.streamkafka.web;

import demo.streamkafka.model.Greetings;
import demo.streamkafka.service.GreetingsService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController; 

@RestController
public class GreetingsController {
    private final GreetingsService greetingsService;

    public GreetingsController(GreetingsService greetingsService) {
        this.greetingsService = greetingsService;
    }

    @GetMapping("/greetings")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void greetings(@RequestParam("message") String message) {
        Greetings greetings = Greetings.builder()
            .message(message)
            .timestamp(System.currentTimeMillis())
            .build();

        greetingsService.sendGreeting(greetings);
    }
}

监听kafka的topic

GreetingsListener 是一个监听器,它用来监听kafka的主题是否有消息。

package demo.streamkafka.service;

import demo.streamkafka.model.Greetings;
import demo.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class GreetingsListener {
    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greetings greetings) {
        System.out.println("Received greetings: " +  greetings);
    }
}

@StreamListener监听的通道为GreetingsStreams.INPUT。如果有消息进入kafka的topic,就会被GreetingsListener监听到,并在handleGreetings()中处理消费到的消息。

启动应用

最后就是要实现一个Spring Boot的Application用于启动应用。

package demo.streamkafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamKafkaApplication.class, args);
    }
}

到此就完成了一个简单的kafka消息流处理。使用rest api可以触发消息的发送。