Table of Contents
- Today's Topics
- 1 Real-Time Stream Processing
- 1.1 Application Scenarios
- 1.2 Technology Selection
- 2 Kafka Stream
- 2.1 Overview
- 2.2 KafkaStream
- 2.3 Getting-Started Demo
- 2.3.1 Requirements Analysis
- 2.3.2 Implementation
- 2.3.2.1 Add the Dependency
- 2.3.2.2 Create a Quick-Start Class to Build the Kafka Stream
- 2.3.2.3 Modify the Producer
- 2.3.2.4 Modify the Consumer
- 2.3.2.5 Test
- 2.4 Integrating KafkaStream with Spring Boot
- 2.4.1 Create a Custom Configuration Properties Class
- 2.4.2 Modify the Configuration File
- 2.4.3 Create a Configuration Class That Builds the KStream Object
- 2.4.4 Test
- 3 Real-Time Hot Article Computation
- 3.1 Approach
- 3.2 Implementation Steps
- 3.3 Implementation Details
- 3.3.1 Add Kafka Configuration to the Behavior Microservice
- 3.3.2 Message Body Entity Class Sent by the Behavior Microservice
- 3.3.3 Define the Topic the Kafka Stream Receives From
- 3.3.4 Modify the Post-User-Behavior Logic (the Producer Side)
- 3.3.5 Stream Aggregation
- 3.3.5.1 Create a Custom Configuration Properties Class
- 3.3.5.2 Add the KafkaStream Configuration
- 3.3.5.3 Define the Topic the Kafka Stream Forwards To
- 3.3.5.4 Message Body Entity Class Sent by the Article Microservice
- 3.3.5.5 Create a Configuration Class That Builds the KStream Object
- 3.3.6 Create a Consumer That Listens for the Aggregated Messages
- 3.3.7 Update the Current Score in the Article Microservice
- 3.3.8 Add an updateScore Method to the Service
- 3.4 Test
Today's Topics
1 Real-Time Stream Processing
1.1 Application Scenarios
1.2 Technology Selection
2 Kafka Stream
2.1 Overview
2.2 KafkaStream
2.3 Getting-Started Demo
2.3.1 Requirements Analysis
2.3.2 Implementation
This is still implemented in the kafka-demo module.
2.3.2.1 Add the Dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
2.3.2.2 Create a Quick-Start Class to Build the Kafka Stream
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaStreamQuickStart {
    public static void main(String[] args) {
        // Kafka configuration
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.129:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");
        // Stream builder
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        // Define the stream processing topology
        streamProcessor(streamsBuilder);
        // Create the KafkaStreams object
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
        // Start stream processing
        kafkaStreams.start();
    }

    /**
     * Stream processing.
     * Message content: hello kafka hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        // Create the KStream object and specify which topic to receive messages from
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        // Process the message value: split it into individual words
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
        // Group by value
        .groupBy((key, value) -> value)
        // Time window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        // Count occurrences of each word
        .count()
        // Convert back to a KStream
        .toStream()
        .map((key, value) -> {
            System.out.println("key:" + key + ",value:" + value);
            return new KeyValue<>(key.key().toString(), value.toString());
        })
        // Send the result
        .to("itcast-topic-out");
    }
}
2.3.2.3 Modify the Producer
Modify the com.heima.kafka.sample.ProducerQuickStart class:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerQuickStart {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Kafka configuration
        Properties properties = new Properties();
        // Kafka connection address
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.129:9092");
        // Number of retries on send failure
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        // Serializer for the message key
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer for the message value
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Producer object
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        /**
         * First argument: topic; second argument: key; third argument: value
         */
        // Build the messages to send
        //ProducerRecord<String,String> record = new ProducerRecord<String, String>("topic-first","key-001","hello kafka");
        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("itcast-topic-input", "hello kafka" + " " + i);
            // 3. Send the message
            producer.send(record);
        }
        producer.close();
    }
}
2.3.2.4 Modify the Consumer
Modify the com.heima.kafka.sample.ConsumerQuickStart class:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerQuickStart {
    public static void main(String[] args) {
        // 1. Kafka configuration
        Properties properties = new Properties();
        // Kafka connection address
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.129:9092");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // Deserializers for the message key and value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Manual offset commit
        //properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // 2. Consumer object
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 3. Subscribe to the topic
        consumer.subscribe(Collections.singletonList("itcast-topic-out"));
        // Keep the current thread listening
        while (true) {
            // 4. Poll for messages
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
                System.out.println(consumerRecord.offset());
                System.out.println(consumerRecord.partition());
            }
        }
    }
}
2.3.2.5 Test
Start the consumer first, then the KafkaStream application, and finally the producer.
The producer sends "hello kafka" + " " + i, five times in total, and the consumer output matches what was sent.
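Assuming all five messages fall inside the same 10-second window, the counts the stream emits to itcast-topic-out can be previewed with a plain-Java sketch of the same split/group/count logic. No broker is required; the class and method names here are illustrative only, not part of the project:

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {
    // Mirrors the stream's flatMapValues(split) + groupBy(value) + count()
    static Map<String, Long> count(String... messages) {
        Map<String, Long> counts = new TreeMap<>();
        for (String msg : messages) {
            for (String word : msg.split(" ")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] msgs = new String[5];
        for (int i = 0; i < 5; i++) {
            msgs[i] = "hello kafka" + " " + i;
        }
        // "hello" and "kafka" each appear 5 times; each digit appears once
        System.out.println(count(msgs));
    }
}
```

This is why the consumer prints counts of 5 for "hello" and "kafka" but 1 for each numeric suffix; in the real topology the totals also depend on which window each message lands in.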
2.4 Integrating KafkaStream with Spring Boot
2.4.1 Create a Custom Configuration Properties Class
Create the com.heima.kafka.config.KafkaStreamConfig class in kafka-demo:
/**
 * Re-register the KafkaStreamsConfiguration bean to apply custom configuration parameters
 */
@Getter
@Setter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
    private String hosts;
    private String group;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); // connection address
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup() + "_stream_aid"); // application id
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup() + "_stream_cid"); // client id
        props.put(StreamsConfig.RETRIES_CONFIG, 10); // retry count
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // key serde
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // value serde
        return new KafkaStreamsConfiguration(props);
    }
}
2.4.2 Modify the Configuration File
Modify heima-leadnews-test/kafka-demo/src/main/resources/application.yaml, appending the snippet below to the bottom of the file:
kafka:
  hosts: 192.168.204.129:9092
  group: ${spring.application.name}
The complete file then looks like this:
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.204.129:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  hosts: 192.168.204.129:9092
  group: ${spring.application.name}
2.4.3 Create a Configuration Class That Builds the KStream Object
Create com.heima.kafka.stream.KafkaStreamHelloListener.
This registers the KStream as a bean in the Spring container, so it listens directly:
@Configuration
@Slf4j
public class KafkaStreamHelloListener {
    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        // Create the KStream object and specify which topic to receive messages from
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
        // Group by value
        .groupBy((key, value) -> value)
        // Aggregation time window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        // Count occurrences of each word
        .count()
        .toStream()
        // Convert the processed result to strings
        .map((key, value) -> {
            System.out.println("key:" + key + ",value:" + value);
            return new KeyValue<>(key.key().toString(), value.toString());
        })
        // Send the result
        .to("itcast-topic-out");
        return stream;
    }
}
2.4.4 Test
Start the kafka-demo application class, then start the consumer and producer.
The message sent is "hello kafka", five times in total.
3 Real-Time Hot Article Computation
3.1 Approach
3.2 Implementation Steps
3.3 Implementation Details
3.3.1 Add Kafka Configuration to the Behavior Microservice
Add the Kafka producer configuration to the heima-leadnews-behavior microservice:
spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.204.129:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
3.3.2 Message Body Entity Class Sent by the Behavior Microservice
Define the message wrapper class UpdateArticleMess.
Create the com.heima.model.message.UpdateArticleMess entity class in heima-leadnews-model:
package com.heima.model.message;

import lombok.Data;

@Data
public class UpdateArticleMess {
    /**
     * The type of article field being modified
     */
    private UpdateArticleType type;
    /**
     * Article ID
     */
    private Long articleId;
    /**
     * Data increment; may be positive or negative
     */
    private Integer add;

    public enum UpdateArticleType {
        COLLECTION, COMMENT, LIKES, VIEWS;
    }
}
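The behavior service will serialize this object to JSON before sending it through KafkaTemplate&lt;String, String&gt;. As a rough preview of the payload shape, here is a hand-rolled, self-contained sketch; the class name, helper method, and exact field order are assumptions for illustration, and the real project may use a JSON library instead:

```java
public class UpdateArticleMessSketch {
    enum UpdateArticleType { COLLECTION, COMMENT, LIKES, VIEWS }

    // Hypothetical helper: builds the JSON payload a downstream consumer would parse
    static String toJson(UpdateArticleType type, long articleId, int add) {
        return String.format("{\"type\":\"%s\",\"articleId\":%d,\"add\":%d}",
                type, articleId, add);
    }

    public static void main(String[] args) {
        // A "like" event on article 123 adds 1 to its like count
        System.out.println(toJson(UpdateArticleType.LIKES, 123L, 1));
    }
}
```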
3.3.3 Define the Topic the Kafka Stream Receives From
Create the com.heima.common.constants.HotArticleConstants constants class in heima-leadnews-common:
package com.heima.common.constants;

public class HotArticleConstants {
    public static final String HOT_ARTICLE_SCORE_TOPIC = "hot.article.score.topic";
}
3.3.4 Modify the Post-User-Behavior Logic (the Producer Side)
After a like, a message must be sent, so modify the like implementation class com.heima.behavior.service.impl.ApLikesBehaviorServiceImpl:
@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {
    @Autowired
    private CacheService cacheService;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public ResponseResult like(LikesBehaviorDto dto) {
        // 1. Validate parameters
        if (dto == null || dto.getArticleId() == null || checkParam(dto))