go kafka

时间:2024-10-06 16:36:56

安装导入

go get github.com/Shopify/sarama
import "github.com/Shopify/sarama"

使用

1、同步生产者

package main

import (
"fmt"
"time"
"github.com/Shopify/sarama"
) func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // ACK
config.Producer.Partitioner = sarama.NewRandomPartitioner // 随机分区
config.Producer.Return.Successes = true // 返回true client, err := sarama.NewSyncProducer([]string{"192.168.14.4:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
} defer client.Close()
for {
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a good test, my message is good") pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
} fmt.Printf("pid:%v offset:%v\n", pid, offset)
time.Sleep(10 * time.Millisecond)
}
}

  

2、消费者

package main

import (
"fmt"
"strings"
"sync"
"github.com/Shopify/sarama"
) var (
wg sync.WaitGroup
) func main() { consumer, err := sarama.NewConsumer(strings.Split("192.168.14.4:9092", ","), nil)
if err != nil {
fmt.Println("Failed to start consumer: %s", err)
return
}
partitionList, err := consumer.Partitions("nginx_log")
if err != nil {
fmt.Println("Failed to get the list of partitions: ", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList {
pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
return
}
defer pc.AsyncClose()
go func(pc sarama.PartitionConsumer) {
wg.Add(1)
for msg := range pc.Messages() {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
fmt.Println()
}
wg.Done()
}(pc)
}
time.Sleep(time.Hour)
wg.Wait()
consumer.Close()
}

相关文章