包含的功能:
1.Java kafka 设置从指定时间戳开始消费
2.JdbcTemplate操作MySql
3.Java多线程消费kafka
4.Java获取kafka所有topic
pom.xml文件,引入kafka、mysql、jdbc:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.suncreate</groupId> <artifactId>kafka-collector</artifactId> <version>1.0</version> <name>kafka-collector</name> <description>华为HD平台升级测试-kafka测试数据源采集</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- jdbc --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <!-- mysql --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.25</version> </dependency> <!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies> <repositories> <repository> <id>alimaven</id> <name>aliyun maven</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
application.properties文件,kafka集群和mysql配置:
# Kafka cluster brokers (IP:port list)
kafka.consumer.servers=x.x.x.115:21005,x.x.x.109:21005,x.x.x.116:21005,x.x.x.111:21005,x.x.x.110:21005,x.x.x.112:21005,x.x.x.113:21005,x.x.x.114:21005,x.x.x.117:21005,x.x.x.118:21005,x.x.x.119:21005,x.x.x.120:21005,x.x.x.121:21005
# Consumer group id
kafka.consumer.group.id=kafka-collector
kafka.consumer.auto.offset.reset=latest
# MySQL datasource (bound via @ConfigurationProperties prefix spring.datasource.mysql)
spring.datasource.mysql.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.mysql.jdbcurl=jdbc:mysql://localhost:3306/kafka-data?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.mysql.username=root
spring.datasource.mysql.password=123456
MySqlConfig:
package com.suncreat.kafkacollector.config;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;

/**
 * Declares the MySQL {@link DataSource} (bound from the
 * {@code spring.datasource.mysql.*} properties) and a {@link JdbcTemplate}
 * built on top of it.
 */
@Configuration
public class MySqlConfig {

    /**
     * MySQL data source; connection settings come from
     * {@code spring.datasource.mysql.*} in application.properties.
     */
    @Bean(name = "MySqlDataSource")
    @Qualifier("MySqlDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.mysql")
    public DataSource MySqlDataSource() {
        DataSourceBuilder<?> builder = DataSourceBuilder.create();
        return builder.build();
    }

    /**
     * JdbcTemplate wired to the MySQL data source declared above.
     */
    @Bean(name = "mysqlJdbcTemplate")
    public JdbcTemplate mysqlJdbcTemplate(@Qualifier("MySqlDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}
SaveOffsetOnRebalanced,kafka设置从指定时间戳开始消费:
package com.suncreat.kafkacollector.kafka;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * Rebalance listener that, on partition assignment, seeks every assigned
 * partition to the first offset whose record timestamp is at or after
 * (now - fallbackMilliseconds), i.e. "start consuming from this timestamp".
 */
public class SaveOffsetOnRebalanced implements ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(SaveOffsetOnRebalanced.class);

    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    /** How far back from "now" (in ms) the consumer should start. */
    private final long fallbackMilliseconds;

    public SaveOffsetOnRebalanced(KafkaConsumer<String, String> consumer, String topic, long fallbackMilliseconds) {
        this.consumer = consumer;
        this.topic = topic;
        this.fallbackMilliseconds = fallbackMilliseconds;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        // Nothing to save: offsets are committed by the consuming thread.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        try {
            long startTime = System.currentTimeMillis() - fallbackMilliseconds;

            // Ask the broker for the earliest offset at/after startTime on
            // each assigned partition (reuse the assigned TopicPartition
            // objects; they already carry the right topic).
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            for (TopicPartition topicPartition : collection) {
                timestampsToSearch.put(topicPartition, startTime);
            }

            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
                TopicPartition partition = entry.getKey();
                OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
                if (offsetAndTimestamp != null) {
                    consumer.seek(partition, offsetAndTimestamp.offset());
                } else {
                    // offsetsForTimes maps a partition to null when it has no
                    // message with timestamp >= startTime; start at the end
                    // instead of throwing an NPE here.
                    consumer.seekToEnd(Collections.singleton(partition));
                }
            }
        } catch (Exception e) {
            log.error("kafka设置从指定时间戳开始消费 SaveOffsetOnRebalanced 出错", e);
        }
    }
}
SetOffset,kafka设置从指定时间戳开始消费:
package com.suncreat.kafkacollector.kafka;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;

/**
 * Subscribes the consumer to a topic and arranges (via
 * {@link SaveOffsetOnRebalanced}) for consumption to start at the beginning
 * of a given calendar day.
 */
public class SetOffset {
    private static final Logger log = LoggerFactory.getLogger(SetOffset.class);

    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    /** Start day in yyyyMMdd form, e.g. "20210331". */
    private final String startTime;

    public SetOffset(KafkaConsumer<String, String> consumer, String topic, String startTime) {
        this.consumer = consumer;
        this.topic = topic;
        this.startTime = startTime;
    }

    /**
     * Subscribes to the topic with a rebalance listener that seeks to the
     * first offset at/after midnight (system zone) of {@code startTime}.
     * The zero-timeout poll just joins the group; the listener does the seek
     * once partitions are actually assigned.
     */
    public void submitOffset() {
        try {
            // java.time replaces the legacy SimpleDateFormat/Date pair;
            // BASIC_ISO_DATE parses yyyyMMdd, and atStartOfDay() is the old
            // "append 000000" trick.
            LocalDate startDay = LocalDate.parse(startTime, DateTimeFormatter.BASIC_ISO_DATE);
            long startMillis = startDay.atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
            long fallbackMilliseconds = System.currentTimeMillis() - startMillis;

            SaveOffsetOnRebalanced saveOffsetOnRebalanced =
                    new SaveOffsetOnRebalanced(consumer, topic, fallbackMilliseconds);
            consumer.subscribe(Collections.singleton(topic), saveOffsetOnRebalanced);
            consumer.poll(Duration.ofMillis(0));
        } catch (Exception e) {
            log.error("kafka设置从指定时间戳开始消费 SetOffset 出错", e);
        }
    }
}
ConsumerThread,kafka消费线程,消费kafka消息数据,并写入mysql:
package com.suncreat.kafkacollector.kafka;

import com.suncreat.kafkacollector.utils.CalcSpeed;
import org.apache.kafka.clients.consumer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;

import java.time.Duration;

/**
 * Kafka consumer thread: polls one topic forever, writes each record's raw
 * value into MySQL, and tracks throughput via {@link CalcSpeed}.
 */
public class ConsumerThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class);

    /** JSON key whose value (a 14-char yyyyMMddHHmmss stamp) is sampled for progress logging. */
    private static final String PASS_TIME_KEY = "pass_time";
    // Offsets relative to indexOf(PASS_TIME_KEY): skip `pass_time":"` (12 chars),
    // then take the 14-character timestamp.
    private static final int PASS_TIME_VALUE_START = 12;
    private static final int PASS_TIME_VALUE_END = 26;

    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private final JdbcTemplate jdbcTemplate;

    public ConsumerThread(KafkaConsumer<String, String> consumer, String topic, JdbcTemplate jdbcTemplate) {
        this.consumer = consumer;
        this.topic = topic;
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.count() > 0) {
                    for (ConsumerRecord<String, String> record : records) {
                        // A bad record must not kill the whole consumer loop.
                        try {
                            handleRecord(record);
                        } catch (Exception e) {
                            log.error("ConsumerThread 处理消息出错", e);
                        }
                    }
                    // Offsets are committed only after the batch was handled
                    // (ENABLE_AUTO_COMMIT is false).
                    consumer.commitAsync();
                }
                Thread.sleep(1);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the thread end.
            Thread.currentThread().interrupt();
            log.error("ConsumerThread 被中断", e);
        } catch (Exception e) {
            log.error("ConsumerThread 出错", e);
        }
    }

    /** Persists one record and prints a sampled pass_time every 10000 records. */
    private void handleRecord(ConsumerRecord<String, String> record) {
        String value = record.value();

        // Guard the substring: indexOf returns -1 when the key is absent, and
        // the value may be truncated right after the key.
        String passTime = null;
        int pos = value.indexOf(PASS_TIME_KEY);
        if (pos >= 0 && value.length() >= pos + PASS_TIME_VALUE_END) {
            passTime = value.substring(pos + PASS_TIME_VALUE_START, pos + PASS_TIME_VALUE_END);
        }

        insertMySql(topic, value);

        if (passTime != null && CalcSpeed.getCount() % 10000 == 0) {
            System.out.println(passTime);
        }
        CalcSpeed.addCount();
    }

    /** Inserts one raw message into KafkaData via a parameterized statement. */
    private void insertMySql(String topic, String value) {
        try {
            jdbcTemplate.update("insert into KafkaData(topic, content) values(?, ?)", topic, value);
        } catch (Exception e) {
            log.error("insertMySql 出错", e);
        }
    }
}
KafkaCollectService,kafka消费服务,多线程消费kafka消息数据:
package com.suncreat.kafkacollector.service;

import com.suncreat.kafkacollector.kafka.ConsumerThread;
import com.suncreat.kafkacollector.kafka.SetOffset;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * Startup service: lists all topics on the cluster, then starts two consumer
 * threads per collected topic, each consuming from a fixed start date and
 * writing raw messages into MySQL.
 */
@Service
public class KafkaCollectService {
    private static final Logger log = LoggerFactory.getLogger(KafkaCollectService.class);

    /** Number of consumer threads started per topic. */
    private static final int THREADS_PER_TOPIC = 2;
    /** Day (yyyyMMdd) from which consumption starts. */
    private static final String START_DAY = "20210331";

    @Value("${kafka.consumer.servers}")
    private String servers;

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Autowired
    @Qualifier("mysqlJdbcTemplate")
    private JdbcTemplate mysqlJdbcTemplate;

    @PostConstruct
    public void start() {
        try {
            // Print the available topics, sorted, for operator visibility.
            List<String> topics = listTopics();
            topics.sort(Comparator.naturalOrder());
            topics.forEach(System.out::println);

            subscribe("ST-FEATURE-LOG");    //人 topic
            subscribe("motorVehicle");      //车 topic
            subscribe("wifiData");          //WIFI topic
            subscribe("KK_PASS_INFO_TYCC"); //卡口 topic
        } catch (Exception e) {
            log.error("出错", e);
        }
    }

    /**
     * Fetches all topic names from the cluster.
     * The AdminClient is closed via try-with-resources (the original leaked it).
     */
    public List<String> listTopics() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        try (AdminClient adminClient = AdminClient.create(properties)) {
            Set<String> names = adminClient.listTopics().names().get();
            return new ArrayList<>(names);
        }
    }

    /**
     * Starts {@link #THREADS_PER_TOPIC} consumer threads for one topic. All
     * threads share the configured group id, so partitions are balanced
     * between them; auto-commit is disabled (ConsumerThread commits manually).
     */
    public void subscribe(String topic) {
        for (int i = 0; i < THREADS_PER_TOPIC; i++) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

            SetOffset setOffset = new SetOffset(consumer, topic, START_DAY);
            setOffset.submitOffset();

            ConsumerThread consumerThread = new ConsumerThread(consumer, topic, mysqlJdbcTemplate);
            consumerThread.start();
        }
    }
}
CalcSpeedConfig,计算数据处理速度:
package com.suncreat.kafkacollector.task;

import com.suncreat.kafkacollector.utils.CalcSpeed;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * 计算数据处理速度
 * Every {@code timeRange} seconds, prints the average records/second since
 * the previous tick and resets the shared counter.
 */
@Configuration
@EnableScheduling
public class CalcSpeedConfig implements SchedulingConfigurer {
    /** Reporting window in seconds (also the cron step). */
    private final int timeRange = 2;
    private final String calcQuarter = "0/" + timeRange + " * * * * ?";

    // DateTimeFormatter is immutable and thread-safe, unlike the
    // SimpleDateFormat field it replaces.
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.addTriggerTask(() -> {
            long currentTime = System.currentTimeMillis();
            // Records accumulated during the last window, averaged per second.
            double speed = CalcSpeed.getCount() / (double) timeRange;
            String timestamp = TIMESTAMP_FORMAT.format(
                    Instant.ofEpochMilli(currentTime).atZone(ZoneId.systemDefault()));
            System.out.println(timestamp + " 处理速度:" + (int) speed + " 条/秒");
            CalcSpeed.resetCount();
        }, triggerContext -> new CronTrigger(calcQuarter).nextExecutionTime(triggerContext));
    }
}
CalcSpeed,用于记录数据处理速度:
package com.suncreat.kafkacollector.utils;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 数据处理速度
 * Shared thread-safe record counter: incremented by consumer threads and
 * read/reset by the scheduled speed reporter.
 */
public class CalcSpeed {
    // AtomicInteger gives the same thread-safety as the previous
    // synchronized static methods without contending on the class lock.
    private static final AtomicInteger count = new AtomicInteger();

    /** Utility class; no instances. */
    private CalcSpeed() {
    }

    /** Increments the counter by one. */
    public static void addCount() {
        count.incrementAndGet();
    }

    /** Resets the counter to zero (called once per reporting window). */
    public static void resetCount() {
        count.set(0);
    }

    /** Returns the current count. */
    public static int getCount() {
        return count.get();
    }
}