package
com.dzj.kafka_streaming.listener;
import
com.dzj.kafka_streaming.dto.TagNameTypeInfo;
import
com.dzj.kafka_streaming.service.ContentTagRelationService;
import
org.apache.kafka.clients.consumer.ConsumerRecord;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.data.redis.core.RedisTemplate;
import
org.springframework.kafka.annotation.KafkaListener;
import
org.springframework.stereotype.Component;
import
com.alibaba.fastjson.JSONObject;
import
javax.annotation.Resource;
import
java.util.ArrayList;
import
java.util.Base64;
import
java.util.List;
/**
* "immersive_streaming_" + userId; 这是旧的key,需要清除
*/
@Component
public
class
MessageListener {
@Autowired
private
ContentTagRelationService relationService;
@Resource
private
RedisTemplate<String, Object> redisTemplate;
private
final
String TOPIC_NAME =
"event-trace-log"
;
// @KafkaListener(topics = {TOPIC_NAME},groupId = "itmentuGroup")
@KafkaListener
(topics = {TOPIC_NAME})
public
void
listener(ConsumerRecord<String,String> record) {
//获取消息
String message = record.value();
//消息偏移量
long
offset = record.offset();
String redisKeyPrefix =
"kafka:user_short_video_streaming:_"
;
JSONObject dataJson = parseJson(message);
String eventCode = dataJson.getString(
"eventCode"
);
if
(
"145001"
.equals(eventCode)){
// 测试环境------------------------------------------------------------------------------------------
// 目前只关注沉浸式中得数据
String resourceId = dataJson.getJSONObject(
"eventBody"
).getString(
"resourceId"
);
String resourceType = dataJson.getJSONObject(
"eventBody"
).getString(
"resourceType"
);
Integer duration = dataJson.getJSONObject(
"eventBody"
).getInteger(
"duration"
);
String actionCode = dataJson.getJSONObject(
"eventBody"
).getString(
"actionCode"
);
String userId = dataJson.getJSONObject(
"eventBody"
).getString(
"userId"
);
String appType = dataJson.getJSONObject(
"eventBody"
).getString(
"appType"
);
// System.out.println("________kafka msg: eventCode = " + eventCode + "eventBody = " + dataJson.getJSONObject("eventBody"));
/**
* 写入Redis
* redis存储结构: key = List(5),是一个定长为5,右进左出的队列
* 首先查询该key的list长度,如果长度超过5,就先左边出队列一个,再右边进一个,否则右边进一个
*/
String key = redisKeyPrefix + userId;
// String key = "immersive_streaming_wyp0001";
// 定义Redis队列写入的结构
JSONObject redisListItem =
new
JSONObject();
redisListItem.put(
"resourceId"
,resourceId);
redisListItem.put(
"resourceType"
,resourceType);
redisListItem.put(
"duration"
,duration);
redisListItem.put(
"actionCode"
,actionCode);
redisListItem.put(
"appType"
,appType);
String redisListItemString = redisListItem.toJSONString();
if
(redisTemplate.opsForList().size(key) >=
100
){
Object leftPop = redisTemplate.opsForList().leftPop(key);
redisTemplate.opsForList().rightPush(key, redisListItemString);
System.out.println(
"[pop]redis key : "
+ redisKeyPrefix + userId +
" now contains: "
+ redisTemplate.opsForList().range(key,
0
, -
1
));
}
else
{
if
(!resourceId.isEmpty() && !resourceType.isEmpty()){
redisTemplate.opsForList().rightPush(key, redisListItemString);
Long size = redisTemplate.opsForList().size(key);
System.out.println(
"redis key : "
+ redisKeyPrefix + userId +
" pushed one: "
+ size + redisListItemString);
System.out.println(
"redis key : "
+ redisKeyPrefix + userId +
" now contains: "
+ redisTemplate.opsForList().range(key,
0
, -
1
));
}
}
}
}
/**
* 解析json,解码功能
*/
public
JSONObject parseJson(String message) {
JSONObject messageJson = JSONObject.parseObject(message);
String dataString = messageJson.getString(
"data"
);
// --------------------base64解码字符串--------------------
String data_string =
""
;
final
Base64.Decoder decoder = Base64.getDecoder();
try
{
data_string =
new
String(decoder.decode(dataString),
"UTF-8"
);
}
catch
(Exception e){
System.out.println(
"【kafka parseJson ERROR】com.dzj.kafka_streaming.listener.MessageListener.parseJson"
+ e);
}
// string转换为json,只取eventCode = '145001'沉浸式的
JSONObject dataJson = JSONObject.parseObject(data_string);
return
dataJson;
}
/**
* 从数据库查询
* @param resourceId
* @param resourceType
* @return
*/
public
List<TagNameTypeInfo> queryByIdAndType(String resourceId, String resourceType ){
List<TagNameTypeInfo> tagNameTypeInfos =
new
ArrayList<>();
try
{
tagNameTypeInfos = relationService.queryTagNameTypeInfo(Long.valueOf(resourceId), resourceType);
}
catch
(Exception e){
System.out.println(
"【ERROR】"
+ resourceId +
"&"
+ resourceType +
"在数据库中查询不到......."
);
}
return
tagNameTypeInfos;
}
}