Java kafka 设置从指定时间戳开始消费

时间:2024-03-06 22:01:07

包含的功能:

1.Java kafka 设置从指定时间戳开始消费

2.JdbcTemplate操作MySql

3.Java多线程消费kafka

4.Java获取kafka所有topic

 

pom.xml文件,引入kafka、mysql、jdbc:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.suncreate</groupId>
    <artifactId>kafka-collector</artifactId>
    <version>1.0</version>
    <name>kafka-collector</name>
    <description>华为HD平台升级测试-kafka测试数据源采集</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- jdbc -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.25</version>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
View Code

application.properties文件,kafka集群和mysql配置:

#kafka集群IP:Port端口
kafka.consumer.servers=x.x.x.115:21005,x.x.x.109:21005,x.x.x.116:21005,x.x.x.111:21005,x.x.x.110:21005,x.x.x.112:21005,x.x.x.113:21005,x.x.x.114:21005,x.x.x.117:21005,x.x.x.118:21005,x.x.x.119:21005,x.x.x.120:21005,x.x.x.121:21005
#消费者组
kafka.consumer.group.id=kafka-collector
kafka.consumer.auto.offset.reset=latest
#mysql
spring.datasource.mysql.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.mysql.jdbcurl=jdbc:mysql://localhost:3306/kafka-data?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.mysql.username=root
spring.datasource.mysql.password=123456
View Code

MySqlConfig:

package com.suncreat.kafkacollector.config;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;

@Configuration
public class MySqlConfig {

    @Bean(name = "MySqlDataSource")
    @Qualifier("MySqlDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.mysql")
    public DataSource MySqlDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "mysqlJdbcTemplate")
    public JdbcTemplate mysqlJdbcTemplate(@Qualifier("MySqlDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

}
View Code

SaveOffsetOnRebalanced,kafka设置从指定时间戳开始消费:

package com.suncreat.kafkacollector.kafka;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * kafka设置从指定时间戳开始消费
 */
public class SaveOffsetOnRebalanced implements ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(SaveOffsetOnRebalanced.class);

    private KafkaConsumer<String, String> consumer;
    private String topic;
    private long fallbackMilliseconds;

    public SaveOffsetOnRebalanced(KafkaConsumer<String, String> consumer, String topic, long fallbackMilliseconds) {
        this.consumer = consumer;
        this.topic = topic;
        this.fallbackMilliseconds = fallbackMilliseconds;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {

    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        try {
            long startTime = System.currentTimeMillis() - fallbackMilliseconds;

            Map<TopicPartition, Long> partitionLongMap = new HashMap<>();
            for (TopicPartition topicPartition : collection) {
                partitionLongMap.put(new TopicPartition(topic, topicPartition.partition()), startTime);
            }

            Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(partitionLongMap);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
                TopicPartition partition = entry.getKey();
                OffsetAndTimestamp value = entry.getValue();
                long offset = value.offset();
                consumer.seek(partition, offset);
            }
        } catch (Exception e) {
            log.error("kafka设置从指定时间戳开始消费 SaveOffsetOnRebalanced 出错", e);
        }
    }
}
View Code

SetOffset,kafka设置从指定时间戳开始消费:

package com.suncreat.kafkacollector.kafka;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;

/**
 * kafka设置从指定时间戳开始消费
 */
public class SetOffset {
    private static final Logger log = LoggerFactory.getLogger(SetOffset.class);

    private KafkaConsumer<String, String> consumer;
    private String topic;
    private String startTime;

    public SetOffset(KafkaConsumer<String, String> consumer, String topic, String startTime) {
        this.consumer = consumer;
        this.topic = topic;
        this.startTime = startTime;
    }

    public void submitOffset() {
        try {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
            Date date = simpleDateFormat.parse(startTime + "000000");
            long fallbackMilliseconds = System.currentTimeMillis() - date.getTime();

            SaveOffsetOnRebalanced saveOffsetOnRebalanced = new SaveOffsetOnRebalanced(consumer, topic, fallbackMilliseconds);
            consumer.subscribe(Collections.singleton(topic), saveOffsetOnRebalanced);
            consumer.poll(Duration.ofMillis(0));
        } catch (Exception e) {
            log.error("kafka设置从指定时间戳开始消费 SetOffset 出错", e);
        }
    }
}
View Code

ConsumerThread,kafka消费线程,消费kafka消息数据,并写入mysql:

package com.suncreat.kafkacollector.kafka;

import com.suncreat.kafkacollector.utils.CalcSpeed;
import org.apache.kafka.clients.consumer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Semaphore;

/**
 * kafka消费线程
 */
public class ConsumerThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class);

    private KafkaConsumer<String, String> consumer;
    private String topic;
    private JdbcTemplate jdbcTemplate;

    public ConsumerThread(KafkaConsumer<String, String> consumer, String topic, JdbcTemplate jdbcTemplate) {
        this.consumer = consumer;
        this.topic = topic;
        this.jdbcTemplate = jdbcTemplate;
    }

    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                if (records.count() > 0) {
                    for (ConsumerRecord<String, String> record : records) {
                        int pos = record.value().indexOf("pass_time");
                        String passTime = record.value().substring(pos + 12, pos + 26);

                        insertMySql(topic, record.value());

                        if (CalcSpeed.getCount() % 10000 == 0) {
                            System.out.println(passTime);
                        }

                        CalcSpeed.addCount();
                    }

                    consumer.commitAsync();
                }

                Thread.sleep(1);
            }
        } catch (Exception e) {
            log.error("ConsumerThread 出错", e);
        }
    }

    private void insertMySql(String topic, String value) {
        try {
            jdbcTemplate.update("insert into KafkaData(topic, content) values(?, ?)", topic, value);
        } catch (Exception e) {
            log.error("insertMySql 出错", e);
        }
    }
}
View Code

KafkaCollectService,kafka消费服务,多线程消费kafka消息数据:

package com.suncreat.kafkacollector.service;

import com.suncreat.kafkacollector.kafka.ConsumerThread;
import com.suncreat.kafkacollector.kafka.SetOffset;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ExecutionException;

@Service
public class KafkaCollectService {
    private static final Logger log = LoggerFactory.getLogger(KafkaCollectService.class);

    @Value("${kafka.consumer.servers}")
    private String servers;

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Autowired
    @Qualifier("mysqlJdbcTemplate")
    private JdbcTemplate mysqlJdbcTemplate;

    @PostConstruct
    public void start() {
        try {
            //List<Map<String, Object>> maps = mysqlJdbcTemplate.queryForList("select * from KafkaData");

            List<String> topics = listTopics();
            topics.sort(Comparator.naturalOrder());
            topics.forEach(topic -> {
                System.out.println(topic);
            });

            subscribe("ST-FEATURE-LOG"); //人 topic
            subscribe("motorVehicle"); //车 topic
            subscribe("wifiData"); //WIFI topic
            subscribe("KK_PASS_INFO_TYCC"); //卡口 topic

        } catch (Exception e) {
            log.error("出错", e);
        }
    }

    public List<String> listTopics() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        ListTopicsResult result = KafkaAdminClient.create(properties).listTopics();
        KafkaFuture<Set<String>> set = result.names();

        List<String> topics = new ArrayList<>();
        for (String topic : set.get()) {
            topics.add(topic);
        }

        return topics;
    }

    public void subscribe(String topic) {
        for (int i = 0; i < 2; i++) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            SetOffset setOffset = new SetOffset(consumer, topic, "20210331");
            setOffset.submitOffset();

            ConsumerThread consumerThread = new ConsumerThread(consumer, topic, mysqlJdbcTemplate);
            consumerThread.start();
        }
    }

}
View Code

CalcSpeedConfig,计算数据处理速度:

package com.suncreat.kafkacollector.task;

import com.suncreat.kafkacollector.utils.CalcSpeed;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 计算数据处理速度
 */
@Configuration
@EnableScheduling
public class CalcSpeedConfig implements SchedulingConfigurer {

    private final int timeRange = 2;

    private String calcQuarter = "0/" + timeRange + " * * * * ?";

    private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.addTriggerTask(() -> {

            long currentTime = System.currentTimeMillis();
            double speed = CalcSpeed.getCount() / (double) timeRange;
            System.out.println(simpleDateFormat.format(new Date(currentTime)) + " 处理速度:" + (int) speed + " 条/秒");

            CalcSpeed.resetCount();

        }, triggerContext -> new CronTrigger(calcQuarter).nextExecutionTime(triggerContext));
    }
}
View Code

CalcSpeed,用于记录数据处理速度:

package com.suncreat.kafkacollector.utils;

/**
 * 数据处理速度
 */
public class CalcSpeed {
    private static int count = 0;

    public static synchronized void addCount() {
        count++;
    }

    public static synchronized void resetCount() {
        count = 0;
    }

    public static synchronized int getCount() {
        return count;
    }
}
View Code