windows下golang实现Kfaka消息发送及kafka环境搭建

时间:2021-08-31 09:33:04

  kafka环境搭建:

    一、安装配置java-jdk

    (1)kafka需要java环境,安装java-jdk,下载地址:https://www.oracle.com/technetwork/java/javase/downloads/index.html

windows下golang实现Kfaka消息发送及kafka环境搭建

   (2)安装目录如下:windows下golang实现Kfaka消息发送及kafka环境搭建

    (3)环境变量配置:

    windows下golang实现Kfaka消息发送及kafka环境搭建

     windows下golang实现Kfaka消息发送及kafka环境搭建

    二、下载kafka

    (1)下载kafka2.10-0.9.0.1版本,自带了zookeeper jar包,不用再次下载zookeeper。kafka代理无状态,zookeeper维持集群状态。下载地址:http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka-2.2.0-src.tgz

    (2)安装目录(不要带空格)如下:

windows下golang实现Kfaka消息发送及kafka环境搭建

    

    (3)修改zookeeper和kafka配置文件:

windows下golang实现Kfaka消息发送及kafka环境搭建

windows下golang实现Kfaka消息发送及kafka环境搭建windows下golang实现Kfaka消息发送及kafka环境搭建

    (4)按照配置创建这两个目录

    (5)cmd启动zookeeper:  

cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

windows下golang实现Kfaka消息发送及kafka环境搭建

    (6)再开cmd启动kafka:

cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1

bin\windows\kafka-server-start.bat config\server.properties

windows下golang实现Kfaka消息发送及kafka环境搭建

    (7)再开cmd创建topic发送消息:

cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1

# 创建topic
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kjTest

# 列出topic
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

# 创建生产者
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic kjTest

# 发送消息
this is a test
hello

windows下golang实现Kfaka消息发送及kafka环境搭建

    (8)再开cmd接收消息:

cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1

# 创建消费者
bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kjTest --from-beginning

# 消费
this is a test
hello

windows下golang实现Kfaka消息发送及kafka环境搭建


 

   golang实现Kfaka消息发送:

      创建main.go:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
	"time"
)

//消息写入kafka
func main() {
	//初始化配置
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true
	//生产者
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("producer close,err:", err)
		return
	}

	defer client.Close()
	var n int = 0

	for n < 20 {
		n++
		//创建消息
		msg := &sarama.ProducerMessage{}
		msg.Topic = "kjTest"
		msg.Value = sarama.StringEncoder("this is a good test,hello nola!")
		//发送消息
		pid, offset, err := client.SendMessage(msg)
		if err != nil {
			fmt.Println("send message failed,", err)
			return
		}
		fmt.Printf("pid:%v offset:%v\n,", pid, offset)
		time.Sleep(10 * time.Millisecond)

	}

}

      消费消息效果:

windows下golang实现Kfaka消息发送及kafka环境搭建

  参考博友:

    kafka环境搭建:https://www.cnblogs.com/UniqueColor/p/8657319.html

    golang发送消息到kafka:https://www.cnblogs.com/pyyu/p/8371649.html

    kafka入门,概念功能理解:https://blog.csdn.net/tflasd1157/article/details/81985722