本文环境如下:
操作系统:CentOS 6 32位
JDK版本:1.8.0_77 32位
Kafka版本:0.9.0.1(Scala 2.11)
1. maven依赖包
1
2
3
4
5
|
< dependency >
< groupId >org.apache.kafka</ groupId >
< artifactId >kafka-clients</ artifactId >
< version >0.9.0.1</ version >
</ dependency >
|
2. 生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
package com.lnho.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put( "bootstrap.servers" , "master:9092" );
props.put( "acks" , "all" );
props.put( "retries" , 0 );
props.put( "batch.size" , 16384 );
props.put( "linger.ms" , 1 );
props.put( "buffer.memory" , 33554432 );
props.put( "key.serializer" , "org.apache.kafka.common.serialization.StringSerializer" );
props.put( "value.serializer" , "org.apache.kafka.common.serialization.StringSerializer" );
Producer<String, String> producer = new KafkaProducer<>(props);
for ( int i = 0 ; i < 100 ; i++)
producer.send( new ProducerRecord<>( "topic1" , Integer.toString(i), Integer.toString(i)));
producer.close();
}
}
|
3. 消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
package com.lnho.example.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put( "bootstrap.servers" , "master:9092" );
props.put( "group.id" , "test" );
props.put( "enable.auto.commit" , "true" );
props.put( "auto.commit.interval.ms" , "1000" );
props.put( "session.timeout.ms" , "30000" );
props.put( "key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList( "topic1" ));
while ( true ) {
ConsumerRecords<String, String> records = consumer.poll( 100 );
for (ConsumerRecord<String, String> record : records)
System.out.printf( "offset = %d, key = %s, value = %s\n" , record.offset(), record.key(), record.value());
}
}
}
|
4. 执行程序
lib底下需要有:kafka-clients-0.9.0.1.jar log4j-1.2.17.jar slf4j-api-1.7.6.jar slf4j-log4j12-1.7.6.jar
生产者:
复制代码 代码如下:
java -classpath kafka-example-1.0-SNAPSHOT.jar:lib/* com.lnho.example.kafka.KafkaProducerExample
消费者:
复制代码 代码如下:
java -classpath kafka-example-1.0-SNAPSHOT.jar:lib/* com.lnho.example.kafka.KafkaConsumerExample
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://blog.csdn.net/lnho2015/article/details/51353936