概述
Apache Kafka是一个分布式发布-订阅消息系统和强大的队列,可以处理大量的数据,将消息从一个端点传递到另一个端点。Kafka适合离线和在线消息消费,Kafka消息保存在磁盘上,并在集群内复制以防止数据丢失。Kafka构建在Zookeeper同步服务之上。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka的核心功能总结起来就是高性能的消息发送和高性能的消息消费。现在我们首先跑通一个Kafka的简单示例,切身感受一下Kafka消息是什么样子的
Kafka的几个好处:
1.可靠性:Kafka是分布式分区容错的
2.可扩展性:Kafka消息传递系统轻松缩放,无需停机。
3.耐用性:Kafka使用分布式提交日志。这意味着消息会尽可能快的保留在磁盘上,因此它是持久的。
4.性能:Kafka对于发布和订阅消息都具有高吞吐量,即使存储了大量的消息也能保持稳定的性能,Kafka非常快,并保证零停机和零数据丢失。
所以在大型分布式系统中都使用Kafka作为消息系统。
安装
Kafka官网的下载地址是 http://kafka.apache.org/downloads,在下载页面我们可以看到不同版本的Kafka的二进制代码压缩包的下载链接。最新的稳定版本是2.1.0(The current stable version is 2.1.0.)
官网提供两个二进制压缩包以供下载
[We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.12 is recommended)].
上面两个文件中的2.11/2.12分别表示编译Kafka的Scala语言版本,后面的2.1.0是Kafka的版本。
下载完毕后,将解压后的目录放到你机器上的某个文件夹下即可!
$ wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz $ tar -zxvf kafka_2.-.tgz $ cd kafka_2.- $ ll total drwxr-xr-x root root Nov : bin drwxr-xr-x root root Nov : config drwxr-xr-x root root Jan : libs -rw-r--r-- root root Nov : LICENSE -rw-r--r-- root root Nov : NOTICE drwxr-xr-x root root Nov : site-docs
(wget还挺好用,不用下载后再rz上传)
阿里云kafka配置
############ Server Basics ########## # The id of the broker. This must be set to a unique integer for each broker. broker.id= port= #阿里云内网地址 host.name= #阿里云外网映射地址 advertised.host.name=
启动服务器
启动 zookeeper
Kafka依赖于Zookeeper,Zookeeper是为Kafka提供协调服务的工具!Kafka内置提供了一个Zookeeper服务器以及一组相关的管理脚本,我们直接使用这个内置的Zookeeper即可。
$ bin/zookeeper-server-start.sh config/zookeeper.properties......INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
最后的输出信息表明Zookeeper已经成功的在端口2181上启动了,此时不能关闭窗口,不然Zookeeper也会关闭,挺麻烦是不是!
我们可以使用后台启动的方式来启动Zookeeper!
$nohup bin/zookeeper-server-start.sh config/zookeeper.properties >> zk.out 2>&1 &
# 查看输出 $tail -f zk.out $cat zk.out
唯一不好的地方就在于会额外生成一个zk.out文件!
启动 kafka server
$ bin/kafka-server-start.sh config/server.properties ... INFO Awaiting socket connections on . (kafka.network.Acceptor) ... INFO [KafkaServer id=] started (kafka.server.KafkaServer)
最后的输出信息标志着 server 启动成功了,上面那条信息说明 server 的监听端口是 9092,同样,窗口也不能关,不然Kakfa服务器也关闭了。
同样,我们以后台启动的方式来启动Kakfa Server
$nohup bin/kafka-server-start.sh config/server.properties >> kafka.out 2>&1 & # 查看输出 $tail -f kafka.out $cat kafka.out
Caused by: java.net.UnknownHostException: iZuf68tztea6l5ccdz7wemZ: iZuf68tztea6l5ccdz7wemZ: Name or service not known
解决方案
vi /etc/hosts 127.0.0.1 izwz920j4zsv1qvh2kt0efz
创建 topic
服务器启动后,我们需要创建一个主题(topic),用于消息的发送与接收。
下面将创建一个名为test-topic的topic,该topic只有一个分区(partition),且该partition也只有一个副本(replication)处理消息。
执行如下命令创建topic
$ bin/kafka-topics.sh --topic test --partitions --replication-factor
# 输出信息 Created topic "test-topic".
这时topic就 创建成功了,接下来我们还使用这个命令来查看这个topic的状态
$ bin/kafka-topics.sh --topic test-topic
# 输出信息 Topic:test-topic PartitionCount: ReplicationFactor: Configs: Topic: test-topic Partition: Leader: Replicas: Isr:
发送消息
kafka默认提供了脚本工具可以不断的接受标准输入并将他们发送到kafka的某个topic上面,用户在控制台终端下启动该命令,输入一行文本数据,然后该脚本将该行文本封装成一条kafka消息发送给指定的topic。打开新的终端,执行命令。
$ bin/kafka-console-producer.sh --broker-list localhost: --topic test-topic
执行这个命令行之后就等待用户输入
>Hello Kafka >This is my first Kafka Message
用户可以不断的输入字符形成消息,每当按下回车键的时候该文本就会被发送。
然后就可以 ctrl+c 退出了。
消费消息
kafka提供了一对应的脚本用于消费某些topic下的消息并打印到标准输出。这对于我们测试调试消息消费就很方便了。打开新的终端。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost: --topic test-topic --from-beginning
执行之后我们看到了上一步中发送的两条消息
Hello Kafka This is my first Kafka Message
参考:
http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/