(1) Install ZooKeeper (Windows)
Kafka depends on ZooKeeper, so install ZooKeeper first.
1. Download the latest ZooKeeper release from the official site: http://www.apache.org/dyn/closer.cgi/zookeeper/
2. Extract it to a path of your choice; here it is E:\zookeeper\zookeeper-3.4.10
3. Copy zoo_sample.cfg in the conf directory, rename the copy to zoo.cfg, set dataDir in zoo.cfg to E:/data/zookeeper, and add the line dataLogDir=E:/log/zookeeper
4. Edit the system environment variables and append ;E:\zookeeper\zookeeper-3.4.10\bin to Path
5. Open a cmd window and run zkServer. If the console shows ZooKeeper's startup log without errors, ZooKeeper is running and the installation succeeded.
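After that edit, the relevant lines of zoo.cfg look like the fragment below; every other setting keeps its zoo_sample.cfg default (clientPort=2181 is the stock default, shown here only for orientation):

```
# where ZooKeeper stores its in-memory database snapshots
dataDir=E:/data/zookeeper
# separate directory for transaction logs
dataLogDir=E:/log/zookeeper
# client connection port (the default)
clientPort=2181
```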
Install ZooKeeper (Linux)
1. Connect to the Linux server with Xshell or a similar tool, change to any directory, and download the latest stable ZooKeeper from http://mirrors.hust.edu.cn/apache/zookeeper/stable/, for example:
cd /usr/soft
wget http://mirrors.hust.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz
2. Extract the archive
tar -xzvf zookeeper-3.4.10.tar.gz
3. Change to the conf directory, copy zoo_sample.cfg to zoo.cfg (cp zoo_sample.cfg zoo.cfg), and adjust the configuration as needed
4. Change to the bin directory and start ZooKeeper; the line "Starting zookeeper ... STARTED" indicates a successful start (you can confirm later with ./zkServer.sh status)
./zkServer.sh start
(2) Install Kafka (Windows)
1. Download the latest Kafka release from http://kafka.apache.org/downloads
2. Extract it to a path of your choice; here it is E:\kafka_2.12-0.10.2.0
3. In the E:\kafka_2.12-0.10.2.0\config directory, set log.dirs in server.properties to E:/log/kafka
4. Append ;E:\kafka_2.12-0.10.2.0\bin\windows to the Path environment variable
5. Start Kafka: in a cmd window, cd to the Kafka root directory E:\kafka_2.12-0.10.2.0 and run
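The edited section of server.properties then looks roughly like this; broker.id and zookeeper.connect are shown with their stock defaults and only log.dirs actually changes:

```
# unique id of this broker within the cluster
broker.id=0
# where Kafka stores its log segments (the message data)
log.dirs=E:/log/kafka
# ZooKeeper connection string
zookeeper.connect=localhost:2181
```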
.\bin\windows\kafka-server-start.bat .\config\server.properties
The line "started (kafka.server.KafkaServer)" in the output indicates a successful start.
6. Open a cmd window and create a topic.
If startup instead fails with "'wmic' is not recognized as an internal or external command ...", append the following to the Path environment variable:
;C:\Windows\System32\wbem
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
7. Open another cmd window and start a producer
kafka-console-producer.bat --broker-list localhost:9092 --topic test
8. Open another cmd window and start a consumer
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
9. Type messages in the producer window to test: each line you enter appears in the consumer window as soon as you press Enter, which confirms Kafka is installed and working.
Install Kafka (Linux)
1. Download the latest Kafka release from https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
2. Extract the archive and rename the directory
tar -xzvf kafka_2.12-0.10.2.0.tgz
mv kafka_2.12-0.10.2.0 kafka
3. In the bin directory under kafka, edit the JVM heap size in kafka-server-start.sh with vi: change
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" to
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" (skip this step if the machine has plenty of memory).
4. Change back to the kafka root directory and start Kafka; as on Windows, "started (kafka.server.KafkaServer)" indicates success
bin/kafka-server-start.sh config/server.properties
5. Create a topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
This creates a topic named test with one replica and one partition.
List the topics just created with the list command:
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
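The partition count chosen at topic creation matters because a keyed record is always routed to hash(key) mod numPartitions, so all records with the same key land on the same partition. Kafka's default partitioner actually hashes the serialized key with murmur2; the sketch below only illustrates the idea, using Java's own hashCode as a stand-in:

```java
public class PartitionSketch {
    // Toy stand-in for Kafka's default partitioner (which really uses murmur2 on the key bytes).
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is always non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With a single partition (as in the topic "test" above), every key maps to partition 0.
        System.out.println(partitionFor("42", 1));
        // With more partitions, the same key still always lands on the same partition.
        System.out.println(partitionFor("42", 4) == partitionFor("42", 4));
    }
}
```

This is why a one-partition topic preserves total message order, while more partitions trade ordering for parallelism.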
6. Start a producer and send messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Once it is running, type messages to send them.
Press Ctrl+C to stop sending.
7. Start a consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
After the consumer starts, the messages sent by the producer appear in its console.
You can keep two terminals open, one sending messages and one receiving them.
(3) Kafka programming: the Java API
1. Create a Maven project (Eclipse is used here) and add the Kafka dependency to the pom:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
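For reference, a minimal pom.xml around that dependency could look like the fragment below; the project's own coordinates (com.example / kafka-demo) are placeholders. Note that kafka_2.11 pulls in the whole broker plus its Scala runtime; for a pure Java client the smaller org.apache.kafka:kafka-clients:0.10.2.0 artifact would also be enough.

```
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <!-- placeholder coordinates for the demo project -->
    <groupId>com.example</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.0</version>
        </dependency>
    </dependencies>
</project>
```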
2. Create a producer test class, TestProducer.java

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // "all" blocks until the record is fully committed: the slowest but most durable setting.
        props.put("acks", "all");
        // Number of automatic retries on a failed request; 0 disables retrying.
        props.put("retries", 0);
        // The producer keeps a buffer of unsent records per partition; batch size in bytes.
        props.put("batch.size", 16384);
        // Records are sent immediately by default; this adds a small delay (ms) to allow batching.
        props.put("linger.ms", 1);
        // Total memory for buffering. When the buffer is exhausted, send() blocks;
        // after max.block.ms it throws a TimeoutException.
        props.put("buffer.memory", 33554432);
        // key.serializer and value.serializer turn the key and value of a ProducerRecord into bytes.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the Kafka producer. Other useful methods:
        //   close()                  - close this producer
        //   close(timeout, unit)     - wait up to timeout for outstanding sends to complete
        //   flush()                  - send all buffered records immediately
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("test", 0, Integer.toString(i), Integer.toString(i)));
        producer.close();
    }
}
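Rather than hard-coding every setting, the same configuration can be kept in a properties file and loaded with java.util.Properties, which KafkaProducer accepts directly. A stdlib-only sketch (the file name producer.properties is an assumption; a string reader stands in for the file so the snippet is self-contained):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ProducerConfigSketch {
    public static void main(String[] args) throws IOException {
        // In a real project this text would come from e.g. new FileReader("producer.properties").
        String fileContents =
                "bootstrap.servers=localhost:9092\n" +
                "acks=all\n" +
                "retries=0\n";
        Properties props = new Properties();
        props.load(new StringReader(fileContents));
        // These props could then be passed straight to new KafkaProducer<String, String>(props).
        System.out.println(props.getProperty("bootstrap.servers"));
        System.out.println(props.getProperty("acks"));
    }
}
```

Keeping broker addresses out of the source makes it easier to switch between the local test broker and a real cluster.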
3. Create a consumer test class, TestConsumer.java

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Consumer group id: consumers sharing a group id split the topic's partitions between them.
        props.put("group.id", "GroupA"); // use GroupA or GroupB here
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout for group membership; a consumer that stops heartbeating within
        // this window is removed from the group.
        props.put("session.timeout.ms", "30000");
        // Cap on the number of records returned by a single poll()
        //props.put("max.poll.records", "100");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to a list of topics
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                // In real code, hand records off to a worker thread pool instead of processing them here.
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
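The group.id setting is what ties consumers together: Kafka divides a topic's partitions among all consumers that share a group id, so two members of GroupA each receive a subset of the messages, while a consumer in GroupB independently sees every partition. A toy assignment in plain Java (dealing partitions out round-robin; this is an illustration, not Kafka's actual assignor, whose default is range-based):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {
    // Toy stand-in for Kafka's partition assignor: deal partitions out round-robin.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        for (String c : consumers)
            assignment.put(c, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++)
            assignment.get(consumers.get(p % consumers.size())).add(p);
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers in the same group split four partitions between them...
        System.out.println(assign(List.of("A-1", "A-2"), 4));
        // ...while a lone consumer in another group gets all of them.
        System.out.println(assign(List.of("B-1"), 4));
    }
}
```

This is also why the topic created above, with a single partition, can feed at most one active consumer per group.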
4. Run (run/debug) TestConsumer first, then TestProducer. When the sent messages appear in the TestConsumer console, producing and consuming both work.
The console consumer started earlier on the command line also shows the received data.