application-test.properties
1 #kafka 2 kafka.consumer.zookeeper.connect=*:2181 3 kafka.consumer.servers=*:9092 4 kafka.consumer.enable.auto.commit=true 5 kafka.consumer.session.timeout=6000 6 kafka.consumer.auto.commit.interval=1000 7 #保证每个组一个消费者消费同一条消息,若设置为earliest,那么会从头开始读partition(none) 8 kafka.consumer.auto.offset.reset=latest 9 kafka.consumer.concurrency=10 10 11 kafka.producer.servers=*:9092 12 kafka.producer.retries=0 13 kafka.producer.batch.size=4096 14 #//往kafka服务器提交消息间隔时间,0则立即提交不等待 15 kafka.producer.linger=1 16 kafka.producer.buffer.memory=40960
启动类
@SpringBootApplication @EnableScheduling public class Application { @Autowired private KafkaSender kafkaSender; public static void main(String[] args) { SpringApplication.run(Application .class, args); } //然后每隔1分钟执行一次 @Scheduled(fixedRate = 1000 * 60) public void testKafka() throws Exception { kafkaSender.sendTest(); } }
生产者:
1 @Component 2 public class KafkaSender { 3 4 @Resource 5 KafkaConsumerPool consumerPool; 6 7 /** 8 * 这里需要放到程序启动完成之后执行 TODO 9 */ 10 @PostConstruct 11 void d(){ 12 13 ConsumerGroup consumerThread = new ConsumerGroup("gropu-1","access_data",consumerConfig); 14 ConsumerGroup consumerThread2 = new ConsumerGroup("gropu-2","access_data", consumerConfig); 15 16 /** 17 * 各起两个消费者 ,Kafka consumer是非线程安全的 Consumer 需要一个new 的 18 */ 19 consumerPool.SubmitConsumerPool(new Consumer(consumerThread)); 20 consumerPool.SubmitConsumerPool(new Consumer(consumerThread)); 21 22 consumerPool.SubmitConsumerPool(new Consumer(consumerThread2)); 23 consumerPool.SubmitConsumerPool(new Consumer(consumerThread2)); 24 } 25 26 27 @Resource 28 KafkaConsumerConfig consumerConfig; 29 30 @Autowired 31 private KafkaTemplate kafkaTemplate; 32 33 @Autowired 34 private KafkaTopics kafkaTopics; 35 36 /** 37 * 发送消息到kafka 38 * 39 */ 40 public void sendTest() throws InterruptedException, IOException, KeeperException { 41 42 /** 43 * topic='access_data' 44 */ 45 kafkaTemplate.send("access_data",""+ System.currentTimeMillis()); 46 kafkaTemplate.send("access_data",""+System.currentTimeMillis()); 47 kafkaTemplate.send("access_data",""+System.currentTimeMillis()); 48 kafkaTemplate.send("access_data",""+System.currentTimeMillis()); 49 kafkaTemplate.send("access_data",""+System.currentTimeMillis()); 50 kafkaTemplate.send("access_data",""+System.currentTimeMillis()); 51 } 52 53 54 }
KafkaProducerConfig
@Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.producer.servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch.size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; @Value("${kafka.producer.buffer.memory}") private int bufferMemory; public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } }
KafkaConsumerConfig
1 @Configuration 2 @EnableKafka 3 public class KafkaConsumerConfig { 4 5 @Value("${kafka.consumer.zookeeper.connect}") 6 public String zookeeperConnect; 7 @Value("${kafka.consumer.servers}") 8 public String servers; 9 @Value("${kafka.consumer.enable.auto.commit}") 10 public boolean enableAutoCommit; 11 @Value("${kafka.consumer.session.timeout}") 12 public String sessionTimeout; 13 @Value("${kafka.consumer.auto.commit.interval}") 14 public String autoCommitInterval; 15 @Value("${kafka.consumer.auto.offset.reset}") 16 public String autoOffsetReset; 17 @Value("${kafka.consumer.concurrency}") 18 public int concurrency; 19 20 21 public String getZookeeperConnect() { 22 return zookeeperConnect; 23 } 24 25 public void setZookeeperConnect(String zookeeperConnect) { 26 this.zookeeperConnect = zookeeperConnect; 27 } 28 29 public String getServers() { 30 return servers; 31 } 32 33 public void setServers(String servers) { 34 this.servers = servers; 35 } 36 37 public boolean isEnableAutoCommit() { 38 return enableAutoCommit; 39 } 40 41 public void setEnableAutoCommit(boolean enableAutoCommit) { 42 this.enableAutoCommit = enableAutoCommit; 43 } 44 45 public String getSessionTimeout() { 46 return sessionTimeout; 47 } 48 49 public void setSessionTimeout(String sessionTimeout) { 50 this.sessionTimeout = sessionTimeout; 51 } 52 53 public String getAutoCommitInterval() { 54 return autoCommitInterval; 55 } 56 57 public void setAutoCommitInterval(String autoCommitInterval) { 58 this.autoCommitInterval = autoCommitInterval; 59 } 60 61 public String getAutoOffsetReset() { 62 return autoOffsetReset; 63 } 64 65 public void setAutoOffsetReset(String autoOffsetReset) { 66 this.autoOffsetReset = autoOffsetReset; 67 } 68 69 public int getConcurrency() { 70 return concurrency; 71 } 72 73 public void setConcurrency(int concurrency) { 74 this.concurrency = concurrency; 75 } 76 }
Consumer
/** * 实际消费者,继承了ShutdownableThread ,要多加几个消费者直接继承实现即可 * * @create 2017-11-06 12:42 * @update 2017-11-06 12:42 **/ public class Consumer extends ShutdownableThread { /** * kafka 消费者 */ private KafkaConsumer consumer; /** * topic */ private String topic; /** * 组id */ private String groupId; public Consumer(ConsumerGroup consumerGroup) { super("",false); this.consumer = consumerGroup.getConsumer(); this.topic = consumerGroup.getTopic(); this.groupId = consumerGroup.getA_groupId(); } /** * * 监听主题,有消息就读取 * 从kafka里面得到数据后,具体怎么去处理. 如果需要开启kafka处理消息的广播模式,多个监听要监听不同的group, * 即方法上的注解@KafkaListener里的group一定要不一样.如果多个监听里的group写的一样,就会造成只有一个监听能处理其中的消息, * 另外监听就不能处理消息了.也即是kafka的分布式消息处理方式. * 在同一个group里的监听,共同处理接收到的消息,会根据一定的算法来处理.如果不在一个组,但是监听的是同一个topic的话,就会形成广播模式 */ @Override public void doWork() { consumer.subscribe(Collections.singletonList(this.topic)); ConsumerRecords<Integer, String> records = consumer.poll(1000); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Thread: "+Thread.currentThread().getName() +"Received message: (" + this.groupId + ", " + record.value() + ") at offset " + record.offset()+" partition : "+records.partitions()); } } }
ConsumerGroup 设置消费组
1 public class ConsumerGroup { 2 3 /** 4 * 日志处理 5 */ 6 private static final Log log = LogFactory.getLog(ConsumerGroup.class); 7 8 /** 9 * topic 10 */ 11 private final String topic; 12 13 /** 14 * 公共连接属性 15 */ 16 private Properties props ; 17 18 /** 19 * 消费者组 20 */ 21 private final String groupId; 22 23 24 public ConsumerGroup(String groupId, String topic, KafkaConsumerConfig consumerConfig) { 25 createConsumerConfig(groupId,consumerConfig); 26 this.topic = topic; 27 this.groupId = groupId; 28 } 29 30 31 private Properties createConsumerConfig(String groupId, KafkaConsumerConfig consumerConfig) { 32 props = new Properties(); 33 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,consumerConfig.servers); 34 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 35 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerConfig.enableAutoCommit); 36 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerConfig.autoCommitInterval); 37 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerConfig.sessionTimeout); 38 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); 39 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 40 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerConfig.autoOffsetReset); 41 // 其他配置再配置 42 return props; 43 } 44 45 public KafkaConsumer getConsumer() { 46 return new KafkaConsumer(props); 47 } 48 49 /** 50 * 其他类获取topic 51 * @return 52 */ 53 public String getTopic() { 54 return topic; 55 } 56 57 public String getA_groupId() { 58 return groupId; 59 } 60 }
1 @Component 2 public class KafkaConsumerPool { 3 4 /** 5 * 日志处理 6 */ 7 private static final Log log = LogFactory.getLog(KafkaConsumerPool.class); 8 9 /** 10 * 线程池 11 */ 12 private ExecutorService executor; 13 14 /** 15 * 初始化10个线程 16 */ 17 @PostConstruct 18 void init(){ 19 executor = Executors.newFixedThreadPool(10); 20 } 21 22 /** 23 * 提交新的消费者 24 * 25 * @param shutdownableThread 26 */ 27 public void SubmitConsumerPool(ShutdownableThread shutdownableThread) { 28 executor.execute(shutdownableThread); 29 } 30 31 /** 32 * 程序关闭,关闭线程池 33 * 34 */ 35 @PreDestroy 36 void fin(){ 37 shutdown(); 38 } 39 40 public void shutdown() { 41 if (executor != null) executor.shutdown(); 42 try { 43 if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { 44 log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 45 } 46 } catch (InterruptedException e) { 47 log.info("Interrupted during shutdown, exiting uncleanly"); 48 } 49 } 50 }