背景
在某实验项目中,需要部署一个日志采集系统,其中需要用到Java环境、kafka、zookeeper中间件,其中kafka、zookeeper均部署到本地,本文主要记录环境配置过程。
- 版本信息:
- Java:jdk1.8.0_151
- kafka:kafka_2.12-0.11.0.0
- zookeeper:zookeeper-3.3.6
Java环境准备
-
为提高下载速度,推挤国内镜像站下载,本文使用华为云镜像站:
- java 镜像地址:/java/jdk/
-
配置系统环境变量
- 建议不要装默认地址
C:\Program Files\Java\jdk1.8.0_171
,因为其带有空格- (不过也有博文说不用care这一条)
- 配置JAVA_HOME:C:\Java\jdk1.8.0_151
- 配置ClassPath环境变量:
.;%JAVA_HOME%\lib\;%JAVA_HOME%\lib\;
- 注意前面有“.”和“;”一定是英文状态下的符号
- 在Path中添加:
- C:\Java\jdk1.8.0_151\bin
- C:\Java\jre1.8.0_151\bin
- 建议不要装默认地址
-
Java环境测试
- 点击键盘Windows+r键,打开运行提示框输入cmd按回车键打开命令框,输入 java -version
- 若能正常显示,则表示安装成功。
安装 Zookeeper
历史版本检索:/dist/zookeeper/
注意,此处下载的均是免安装版的,windows 和 Linux 均有编译好的程序,可直接运行
本文下载的是:zookeeper-3.3.
下载好后,还不能直接使用,需要配置一下配置文件
-
先将
zoo_sample.cfg
拷贝一份,并命名为,修改:
# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # dataDir=/tmp/zookeeper # 添加本地日志文件目录 <--------------------------- 只用修改这里 dataDir=C:/ProgramData/ToolLog/zookeeper/data dataLogDir=C:/ProgramData/ToolLog/zookeeper/log # the port at which the clients will connect clientPort=2181
-
修改保存后,在
zookeeper\bin
目录下,双击即可运行
安装 Kafka_2.12-0.11.0.0
下载链接:/
下载好后,修改配置文件:config目录下的
修改为
D:\kafka_logs
(自定义目录完整路径)
返回到解压文件夹下,执行:
.\bin\windows\ .\config\
至此,环境配置完毕。
测试程序
本测试程序基于Go语言开发,程序功能为向kafka中的某个话题发送数据消息。
需要说明的是:在创建生产者对象时,需要指明环境中Kafka版本号,否则会导致创建失败!!!!!!!
-
关键语句
config := () = = = true /* 明确环境中kafka版本号 */ = sarama.V0_11_0_0
-
错误提示
producer close, err: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
-
完整测试程序
package main import ( "fmt" "/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true config.Version = sarama.V0_11_0_0 // <------- 注意这里需要选择自己环境中kafka版本,否则程序会运行失败 msg := &sarama.ProducerMessage{} msg.Topic = "nginx_log" msg.Value = sarama.StringEncoder("this is a good test, my message is good") client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { fmt.Println("producer close, err:", err) return } defer client.Close() pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send message failed,", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) }
参考文献:
/weixin_41402352/article/details/84325136
/wangzhaobo/p/