1. maven依赖包
1
2
3
4
5
|
<dependency>
<groupid>org.apache.kafka</groupid>
<artifactid>kafka-clients</artifactid>
<version> 0.9 . 0.1 </version>
</dependency>
|
2. 生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package com.lnho.example.kafka;
import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producer;
import org.apache.kafka.clients.producer.producerrecord;
import java.util.properties;
public class kafkaproducerexample {
public static void main(string[] args) {
properties props = new properties();
props.put( "bootstrap.servers" , "master:9092" );
props.put( "acks" , "all" );
props.put( "retries" , 0 );
props.put( "batch.size" , 16384 );
props.put( "linger.ms" , 1 );
props.put( "buffer.memory" , 33554432 );
props.put( "key.serializer" , "org.apache.kafka.common.serialization.stringserializer" );
props.put( "value.serializer" , "org.apache.kafka.common.serialization.stringserializer" );
producer<string, string> producer = new kafkaproducer<>(props);
for ( int i = 0 ; i < 100 ; i++)
producer.send( new producerrecord<>( "topic1" , integer.tostring(i), integer.tostring(i)));
producer.close();
}
}
|
3. 消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package com.lnho.example.kafka;
import org.apache.kafka.clients.consumer.consumerrecord;
import org.apache.kafka.clients.consumer.consumerrecords;
import org.apache.kafka.clients.consumer.kafkaconsumer;
import java.util.arrays;
import java.util.properties;
public class kafkaconsumerexample {
public static void main(string[] args) {
properties props = new properties();
props.put( "bootstrap.servers" , "master:9092" );
props.put( "group.id" , "test" );
props.put( "enable.auto.commit" , "true" );
props.put( "auto.commit.interval.ms" , "1000" );
props.put( "session.timeout.ms" , "30000" );
props.put( "key.deserializer" , "org.apache.kafka.common.serialization.stringdeserializer" );
props.put( "value.deserializer" , "org.apache.kafka.common.serialization.stringdeserializer" );
kafkaconsumer<string, string> consumer = new kafkaconsumer<>(props);
consumer.subscribe(arrays.aslist( "topic1" ));
while ( true ) {
consumerrecords<string, string> records = consumer.poll( 100 );
for (consumerrecord<string, string> record : records)
system.out.printf( "offset = %d, key = %s, value = %s\n" , record.offset(), record.key(), record.value());
}
}
}
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/u012129558/article/details/80065817