Kafka-spark streaming
1、安装包
kafka安装需要zookeeper、jdk。
官网下载最新的:
https://kafka.apache.org/downloads
http://mirrors.hust.edu.cn/apache/zookeeper/
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
本例用到的服务版本:kafka_2.11.0.11.tar.gz,jdk1.8
Zookeeper可以自己安装,也可以使用kafka自带的zk。本例使用自带的zookeeper。
2、安装kafka(standalone)
1)# tar xf kafka_2.11-0.11.0.0.tgz
2)# mv kafka_2.11-0.11.0.0 kafka
3)# cd kafka ;ls
bin config kafka-logs libs LICENSE logs NOTICE site-docs
3、修改配置文件
主要是修改server.properties、zookeeper.properties、producer.properties、consumer.properties
1) # vim server.properties
broker.id=0 #每个kafka的broker是唯一的
delete.topic.enable=true
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs
num.partitions=4 #4个分区,对应spark4个RDD
num.recovery.threads.per.data.dir=1
- offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
2)vim zookeeper.properties
dataDir=/data/zookeeper
clientPort=2181
maxClientCnxns=0
3)vim producer.properties
bootstrap.servers=localhost:9092
compression.type=none
4)vim consumer.properties
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.id=consumer
#如果zk是集群,则用,隔开。
#kafka是集群,则group.id相同
创建相应的目录。
4、启动服务
1)启动zk
# nohup ./zookeeper-server-start.sh ../config/zookeeper.properties 2>&1 &
2)启动kafka
# nohup ./kafka-server-start.sh ../config/server.properties 2>&1 &
3)创建一个topic
# ./kafka-topics.sh --create --topic kafka-test --replication-factor 1 --partitions 4 --zookeeper localhost:2181
修改partition的数量:
# ./kafka-topics.sh --alter --zookeeper localhost:2181 --topic kafka-test --partitions 20
4)查看创建的所有topic
# ./kafka-topics.sh --list --zookeeper localhost:2181
5)删除某个topic
# ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic kafka-test
# 需要在server.properties里面设置delete.topic.enable=true。
6)模拟producer
# ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic userlog
7)模拟consumer
# ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic userlog --from-beginning
#如果是分布式的,还需要设置listeners=PLAINTEXT://:9092,默认是127.0.0.1;需要修改为:listeners=PLAINTEXT://服务器ip:9092
在其他服务器上调用的时候,也是需要将地址改为服务器地址的,还有就是 bootstrap.servers=localhost:9092
,改为服务器的地址。