20221124 kafka实时数据写入Redis

时间:2024-03-29 11:51:55

package com.dzj.kafka_streaming.listener;

import com.dzj.kafka_streaming.dto.TagNameTypeInfo;

import com.dzj.kafka_streaming.service.ContentTagRelationService;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import javax.annotation.Resource;

import java.util.ArrayList;

import java.util.Base64;

import java.util.List;

/**

 * "immersive_streaming_" + userId; 这是旧的key,需要清除

 */

@Component

public class MessageListener {

    @Autowired

    private ContentTagRelationService relationService;

    @Resource

    private RedisTemplate<String, Object> redisTemplate;

    private final String TOPIC_NAME = "event-trace-log";

    // @KafkaListener(topics = {TOPIC_NAME},groupId = "itmentuGroup")

    @KafkaListener(topics = {TOPIC_NAME})

    public void listener(ConsumerRecord<String,String> record)  {

        //获取消息

        String message = record.value();

        //消息偏移量

        long offset = record.offset();

        String redisKeyPrefix = "kafka:user_short_video_streaming:_";

        JSONObject dataJson = parseJson(message);

        String eventCode = dataJson.getString("eventCode");

        if ("145001".equals(eventCode)){

            // 测试环境------------------------------------------------------------------------------------------

            // 目前只关注沉浸式中得数据

            String resourceId = dataJson.getJSONObject("eventBody").getString("resourceId");

            String resourceType = dataJson.getJSONObject("eventBody").getString("resourceType");

            Integer duration = dataJson.getJSONObject("eventBody").getInteger("duration");

            String actionCode = dataJson.getJSONObject("eventBody").getString("actionCode");

            String userId = dataJson.getJSONObject("eventBody").getString("userId");

            String appType = dataJson.getJSONObject("eventBody").getString("appType");

            // System.out.println("________kafka msg: eventCode = " + eventCode + "eventBody = " + dataJson.getJSONObject("eventBody"));

            /**

             * 写入Redis

             * redis存储结构: key = List(5),是一个定长为5,右进左出的队列

             * 首先查询该key的list长度,如果长度超过5,就先左边出队列一个,再右边进一个,否则右边进一个

             */

            String key = redisKeyPrefix + userId;

    //        String key = "immersive_streaming_wyp0001";

            // 定义Redis队列写入的结构

            JSONObject redisListItem = new JSONObject();

            redisListItem.put("resourceId",resourceId);

            redisListItem.put("resourceType",resourceType);

            redisListItem.put("duration",duration);

            redisListItem.put("actionCode",actionCode);

            redisListItem.put("appType",appType);

            String redisListItemString = redisListItem.toJSONString();

            if (redisTemplate.opsForList().size(key) >= 100){

                Object leftPop = redisTemplate.opsForList().leftPop(key);

                redisTemplate.opsForList().rightPush(key, redisListItemString);

                System.out.println("[pop]redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));

            }else {

                if (!resourceId.isEmpty() && !resourceType.isEmpty()){

                    redisTemplate.opsForList().rightPush(key, redisListItemString);

                    Long size = redisTemplate.opsForList().size(key);

                    System.out.println("redis key : "+ redisKeyPrefix + userId + " pushed one:  "+ size + redisListItemString);

                    System.out.println("redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));

                }

            }

        }

    }

     

    /**

     * 解析json,解码功能

     */

    public JSONObject parseJson(String message) {

        JSONObject messageJson = JSONObject.parseObject(message);

        String dataString = messageJson.getString("data");

        // --------------------base64解码字符串--------------------

        String data_string = "";

        final Base64.Decoder decoder = Base64.getDecoder();

        try{

            data_string = new String(decoder.decode(dataString), "UTF-8");

        }catch (Exception e){

            System.out.println("【kafka parseJson ERROR】com.dzj.kafka_streaming.listener.MessageListener.parseJson" + e);

        }

        // string转换为json,只取eventCode = '145001'沉浸式的

        JSONObject dataJson = JSONObject.parseObject(data_string);

        return dataJson;

    }

    /**

     * 从数据库查询

     * @param resourceId

     * @param resourceType

     * @return

     */

    public List<TagNameTypeInfo>  queryByIdAndType(String resourceId, String resourceType ){

        List<TagNameTypeInfo> tagNameTypeInfos = new ArrayList<>();

        try {

            tagNameTypeInfos = relationService.queryTagNameTypeInfo(Long.valueOf(resourceId), resourceType);

        catch (Exception e){

            System.out.println("【ERROR】" + resourceId + "&" + resourceType + "在数据库中查询不到.......");

        }

        return tagNameTypeInfos;

    }

}