【黑马头条】-day11热点文章实时计算-kafka-kafkaStream-Redis

时间:2024-04-27 07:18:39

文章目录

  • 今日内容
  • 1 实时流式计算
    • 1.1 应用场景
    • 1.2 技术方案选型
  • 2 Kafka Stream
    • 2.1 概述
    • 2.2 KafkaStream
    • 2.3 入门demo
      • 2.3.1 需求分析
      • 2.3.2 实现
        • 2.3.2.1 添加依赖
        • 2.3.2.2 创建快速启动,生成kafka流
        • 2.3.2.3 修改生产者
        • 2.3.2.4 修改消费者
        • 2.3.2.5 测试
    • 2.4 SpringBoot集合KafkaStream
      • 2.4.1 创建自定配置参数类
      • 2.4.2 修改配置文件
      • 2.4.3 创建配置类创建KStream对象
      • 2.4.4 测试
  • 3 热点文章实时计算
    • 3.1 思路说明
    • 3.2 实现步骤
    • 3.3 具体实现
      • 3.3.1 为行为微服务添加kafka配置
      • 3.3.2 行为微服务中发送消息的消息体实体类
      • 3.3.3 定义kafka流接收的topic
      • 3.3.4 修改用户行为后的逻辑(相当于生产者)
      • 3.3.5 Stream聚合
        • 3.3.5.1 创建自定配置参数类
        • 3.3.5.2 添加kafkaStream的配置
        • 3.3.5.3 定义kafka流转发的topic
        • 3.3.5.4 文章微服务中的发送消息的消息体实体类
        • 3.3.5.5 创建配置类创建KStream对象
      • 3.3.6 创建消费者,用于监听聚合后的消息
      • 3.3.7 在文章微服务中更新当前分值
      • 3.3.8 Service添加updateScore方法
    • 3.4 测试


今日内容

在这里插入图片描述

1 实时流式计算

在这里插入图片描述

1.1 应用场景

在这里插入图片描述

1.2 技术方案选型

在这里插入图片描述

2 Kafka Stream

2.1 概述

在这里插入图片描述

在这里插入图片描述

2.2 KafkaStream

在这里插入图片描述

2.3 入门demo

2.3.1 需求分析

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.3.2 实现

在这里插入图片描述

还是在kafka-demo的模块里实现

2.3.2.1 添加依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
2.3.2.2 创建快速启动,生成kafka流
public class KafkaStreamQuickStart {

    public static void main(String[] args) {

        //kafka的配置信息
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.129:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");

        //stream 构建器
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);

        //创建kafkaStream对象
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
        //开启流式计算
        kafkaStreams.start();
    }

    /**
     * 流式计算
     * 消息的内容:hello kafka  hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        /**
         * 处理消息的value
         */
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.split(" "));
                    }
                })
                //按照value进行聚合处理
                .groupBy((key,value)->value)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //统计单词的个数
                .count()
                //转换为kStream
                .toStream()
                .map((key,value)->{
                    System.out.println("key:"+key+",vlaue:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
    }
}
2.3.2.3 修改生产者

修改com.heima.kafka.sample.ProducerQuickStart的方法

public class ProducerQuickStart {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.129:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
 
        //2.生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        /**
         * 第一个参数:topic 第二个参数:key 第三个参数:value
         */
        //封装发送的消息
        //ProducerRecord<String,String> record = new ProducerRecord<String, String>("topic-first","key-001","hello kafka");
        for(int i=0;i<5;i++){
            ProducerRecord<String,String> record = new ProducerRecord<String, String>("itcast-topic-input","hello kafka"+" "+i);
            //3.发送消息
            producer.send(record);
        }
        producer.close();

2.3.2.4 修改消费者

修改com.heima.kafka.sample.ConsumerQuickStart的方法

public class ConsumerQuickStart {
 
    public static void main(String[] args) {
        //1.添加kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.129:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //手动提交偏移量
        //properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        //2.消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
 
        //3.订阅主题
        consumer.subscribe(Collections.singletonList("itcast-topic-out"));

 
        //当前线程一直处于监听状态
        while (true) {
            //4.获取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
                System.out.println(consumerRecord.offset());
                System.out.println(consumerRecord.partition());
            }
        }
 
    }
 
}
2.3.2.5 测试

先启动消费者,再启动kafkaStream,再启动生产者

发送消息为"hello kafka"+" "+i,一共五次

在这里插入图片描述

符合我们发的

2.4 SpringBoot集合KafkaStream

在这里插入图片描述

在这里插入图片描述

2.4.1 创建自定配置参数类

在kafka-demo中创建com.heima.kafka.config.KafkaStreamConfig类

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */
@Getter
@Setter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);//连接信息
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");//组
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");//应用名称
        props.put(StreamsConfig.RETRIES_CONFIG, 10);//重试次数
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key序列化器
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

2.4.2 修改配置文件

修改heima-leadnews-test/kafka-demo/src/main/resources/application.yaml

将其放到最底下

kafka:
  hosts: 192.168.204.129:9092
  group: ${spring.application.name}
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.204.129:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  hosts: 192.168.204.129:9092
  group: ${spring.application.name}

2.4.3 创建配置类创建KStream对象

创建com.heima.kafka.stream.KafkaStreamHelloListener

等于KStream放入spring容器中进行直接监听

@Configuration
@Slf4j
public class KafkaStreamHelloListener {
    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}

2.4.4 测试

启动kafka启动类,启动消费者和生产者

发送消息是"hello kafka",一共五次

在这里插入图片描述

3 热点文章实时计算

3.1 思路说明

在这里插入图片描述

3.2 实现步骤

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3.3 具体实现

3.3.1 为行为微服务添加kafka配置

在heima-leadnews-behavior微服务中集成kafka生产者配置

spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.204.129:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

3.3.2 行为微服务中发送消息的消息体实体类

定义消息发送封装类:UpdateArticleMess

在heima-leadnews-model中创建com.heima.model.message.UpdateArticleMess实体类

package com.heima.model.message;
 
import lombok.Data;
 
@Data
public class UpdateArticleMess {
 
    /**
     * 修改文章的字段类型
      */
    private UpdateArticleType type;
    /**
     * 文章ID
     */
    private Long articleId;
    /**
     * 修改数据的增量,可为正负
     */
    private Integer add;
 
    public enum UpdateArticleType{
        COLLECTION,COMMENT,LIKES,VIEWS;
    }
}

3.3.3 定义kafka流接收的topic

在heima-leadnews-common中创建com.heima.common.constants.HotArticleConstants常量类

package com.heima.common.constants;
public class HotArticleConstants {
    public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
}

3.3.4 修改用户行为后的逻辑(相当于生产者)

点赞之后就要发送消息了,所以去修改用户点赞的实现类com.heima.behavior.service.impl.ApLikesBehaviorServiceImpl

@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {

    @Autowired
    private CacheService cacheService;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public ResponseResult like(LikesBehaviorDto dto) {

        //1.检查参数
        if (dto == null || dto.getArticleId() == null || checkParam(dto))