Go语言学习之11 日志收集系统kafka库实战

时间:2022-08-22 12:20:14

本节主要内容:

1. 日志收集系统设计
2. 日志客户端开发

1. 项目背景
    a. 每个系统都有日志,当系统出现问题时,需要通过日志解决问题
    b. 当系统机器比较少时,登陆到服务器上查看即可满足
    c. 当系统机器规模巨大,登陆到机器上查看几乎不现实

2. 解决方案
    a. 把机器上的日志实时收集,统一的存储到中心系统
    b. 然后再对这些日志建立索引,通过搜索即可以找到对应日志
    c. 通过提供界面友好的web界面,通过web即可以完成日志搜索

3. 面临的问题
    a. 实时日志量非常大,每天几十亿条
    b. 日志准实时收集,延迟控制在分钟级别
    c. 能够水平可扩展

4. 业界方案ELK

日志收集系统架构

Go语言学习之11 日志收集系统kafka库实战

该方案问题:

a. 运维成本高,每增加一个日志收集,都需要手动修改配置
    b. 监控缺失,无法准确获取logstash的状态
    c. 无法做定制化开发以及维护

6. 日志收集系统设计

Go语言学习之11 日志收集系统kafka库实战

各组件介绍:
    a. Log Agent,日志收集客户端,用来收集服务器上的日志
    b. Kafka,高吞吐量的分布式队列,linkin开发,apache*开源项目
    c. ES,elasticsearch,开源的搜索引擎,提供基于http restful的web接口
    d. Hadoop,分布式计算框架,能够对大量数据进行分布式处理的平台

7. kafka应用场景
    1. 异步处理, 把非关键流程异步化,提高系统的响应时间和健壮性

Go语言学习之11 日志收集系统kafka库实战

Go语言学习之11 日志收集系统kafka库实战

2. 应用解耦,通过消息队列

Go语言学习之11 日志收集系统kafka库实战

3. 流量削峰3. 流量削峰

Go语言学习之11 日志收集系统kafka库实战

 8. zookeeper应用场景

1. 服务注册&服务发现

Go语言学习之11 日志收集系统kafka库实战

2. 配置中心

Go语言学习之11 日志收集系统kafka库实战

3. 分布式锁

  • Zookeeper是强一致的
  • 多个客户端同时在Zookeeper上创建相同znode,只有一个创建成功

 9. 安装kafka

见博客:https://www.cnblogs.com/xuejiale/p/10505391.html

10. log agent设计

Go语言学习之11 日志收集系统kafka库实战

11. log agent流程

Go语言学习之11 日志收集系统kafka库实战

11. kafka示例

先导入第三方包:

github.com/Shopify/sarama

我的kafka和ZooKeeper都安装在Linux(Centos6.5,ip: 192.168.30.136)上:

 package main

 import (
"fmt"
"time"
"github.com/Shopify/sarama"
) func main() { config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true client, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
} defer client.Close()
for {
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a good test, my message is good") pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
} fmt.Printf("pid:%v offset:%v\n", pid, offset)
time.Sleep(time.Second)
}
}

kafka示例

注意:Shopify/sarama的同步/异步producer,https://www.jianshu.com/p/666d2604e8f8

Windows启动程序往Linux上的kafka发送数据:

Go语言学习之11 日志收集系统kafka库实战

Linux上的kafka接收数据:

Go语言学习之11 日志收集系统kafka库实战

再来看一个kafka生产和消费示例:

 package main

 import (
"fmt"
"github.com/Shopify/sarama"
) func main() {
// 新建一个arama配置实例
config := sarama.NewConfig()
// WaitForAll waits for all in-sync replicas to commit before responding.
config.Producer.RequiredAcks = sarama.WaitForAll
// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true // new producer
client, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
defer client.Close() // new message
msg := &sarama.ProducerMessage{}
msg.Topic = "food"
msg.Key = sarama.StringEncoder("fruit")
msg.Value = sarama.StringEncoder("apple") // send message
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid: %v, offset:%v\n", pid, offset) // new message
msg2 := &sarama.ProducerMessage{}
msg2.Topic = "food"
msg2.Key = sarama.StringEncoder("fruit")
msg2.Value = sarama.StringEncoder("orange") // send message
pid2, offset2, err := client.SendMessage(msg2)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid2: %v, offset2:%v\n", pid2, offset2) fmt.Println("Produce success.")
}

produce

 package main

 import (
"sync"
"github.com/Shopify/sarama"
"fmt"
) var wg sync.WaitGroup func main() {
consumer, err := sarama.NewConsumer([]string{"192.168.30.136:9092"}, nil)
if err != nil {
fmt.Println("consumer connect error:", err)
return
}
fmt.Println("connnect success...")
defer consumer.Close() partitions, err := consumer.Partitions("food")
if err != nil {
fmt.Println("geet partitions failed, err:", err)
return
} for _, p := range partitions {
partitionConsumer, err := consumer.ConsumePartition("food", p, sarama.OffsetOldest)
if err != nil {
fmt.Println("partitionConsumer err:", err)
continue
}
wg.Add()
go func(){
for m := range partitionConsumer.Messages() {
fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)
}
wg.Done()
}()
}
wg.Wait() fmt.Println("Consumer success.")
}

consumer

12. tailf组件使用

    先导入第三方包:

github.com/hpcloud/tail
 package main

 import (
"fmt"
"github.com/hpcloud/tail"
"time"
)
func main() {
filename := "F:\\Go\\project\\src\\go_dev\\logCollect\\tailf\\my.log"
tails, err := tail.TailFile(filename, tail.Config{
ReOpen: true,
Follow: true,
//Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
})
if err != nil {
fmt.Println("tail file err:", err)
return
}
var msg *tail.Line
var ok bool
for {
msg, ok = <-tails.Lines
if !ok {
fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
time.Sleep( * time.Millisecond)
continue
}
fmt.Println("msg:", msg)
}
}

tailf示例代码

my.log文件内容(unix格式):

Go语言学习之11 日志收集系统kafka库实战

在Windows上,当我的上面代码里日志文件(my.log)为Windows格式,代码执行结果如下:

Go语言学习之11 日志收集系统kafka库实战

当时用notepade++将文件格式转换为Unix格式,执行代码结果如下:

Go语言学习之11 日志收集系统kafka库实战

注意:最后一行必须有换行符,否则该行无法读取。

13. 配置文件库使用

先导入第三方包:

github.com/astaxie/beego/config

1) 初始化配置库

iniconf, err := NewConfig("ini", "testini.conf")
if err != nil {
log.Fatal(err)
}

2) 读取配置项

String(key string) string
Int(key string) (int, error)
Int64(key string) (int64, error)
Bool(key string) (bool, error)
Float(key string) (float64, error)

例如:

iniconf.String("server::listen_ip")
iniconf.Int("server::listen_port") [server]
listen_ip = "0.0.0.0"
listen_port = [logs]
log_level=debug
log_path=./logs/logagent.log [collect]
log_path=/home/work/logs/nginx/access.log
topic=nginx_log
 package main

 import (
"fmt"
"github.com/astaxie/beego/config"
) func main() {
conf, err := config.NewConfig("ini", "./logcollect.conf")
if err != nil {
fmt.Println("new config failed, err:", err)
return
} port, err := conf.Int("server::listen_port")
if err != nil {
fmt.Println("read server:port failed, err:", err)
return
} fmt.Println("Port:", port)
log_level := conf.String("log::log_level")
if err != nil {
fmt.Println("read log_level failed, ", err)
return
}
fmt.Println("log_level:", log_level) log_path := conf.String("log::log_path")
fmt.Println("log_path:", log_path)
}

config示例代码

配置文件内容:

[server]
listen_ip = "0.0.0.0"
listen_port = [log]
log_level=debug
log_path=./logs/logagent.log [collect]
log_path=/home/work/logs/nginx/access.log
topic=nginx_log

执行结果:

Go语言学习之11 日志收集系统kafka库实战

14. 日志库的使用

先导入第三方包:

github.com/astaxie/beego/logs

1) 配置log组件

config := make(map[string]interface{})
config["filename"] = "./logs/logcollect.log"
config["level"] = logs.LevelDebug configStr, err := json.Marshal(config)
if err != nil {
fmt.Println("marshal failed, err:", err)
return
}

2) 初始化日志组件

logs.SetLogger(“file”, string(configStr))
 package main

 import (
"encoding/json"
"fmt"
"github.com/astaxie/beego/logs"
) func main() {
config := make(map[string]interface{})
config["filename"] = "./logcollect.log"
config["level"] = logs.LevelDebug configStr, err := json.Marshal(config)
if err != nil {
fmt.Println("marshal failed, err:", err)
return
} logs.SetLogger(logs.AdapterFile, string(configStr)) logs.Debug("this is a test, my name is %s", "stu01")
logs.Trace("this is a trace, my name is %s", "stu02")
logs.Warn("this is a warn, my name is %s", "stu03")
}

logs示例

15. 日志收集项目整体实现

    开发环境为Windows系统,go version go1.12.1 windows/amd64, kafka_2.11-2.0.0,zookeeper-3.4.12。

   先实现了一个demo,V1版本:

(1)代码结构图

Go语言学习之11 日志收集系统kafka库实战

(2)代码地址见本人github:https://github.com/XJL635438451/logCollectProject/tree/master

(3)如何运行

1)先安装 go, kafka,zookeeper;

2)先启动 zookeeper,然后启动kafka,下面是启动的命令;

启动ZK
.\zkServer.cmd 启动kafka
F:\Go\project\src\module\kafka_2.-2.0.>.\bin\windows\kafka-server-start.bat .\config\server.properties 创建topic
F:\Go\project\src\module\kafka_2.-2.0.>.\bin\windows\kafka-topics.bat --create --zookeeper localhost: --replication-factor --partitions --topic kafkaTest 启动生产者:
F:\Go\project\src\module\kafka_2.-2.0.>.\bin\windows\kafka-console-producer.bat --broker-list localhost: --topic kafkaTest 启动消费者:
F:\Go\project\src\module\kafka_2.-2.0.>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost: --topic nginx_log --from-beginning

3)如果自己不行写日志文件,可以运行代码中的 writeLogTest/log.go,然后运行 main.exe (如果自己修改了代码还需要重新编译);

4)可以起一个kafka的consumer来查看日志是否写入到了kafka,方法就是上面的启动生产者命令,如果正常就可以看到日志一直在kafka中刷新。

Go语言学习之11 日志收集系统kafka库实战

Go语言学习之11 日志收集系统kafka库实战的更多相关文章

  1. GO学习-&lpar;35&rpar; Go实现日志收集系统4

    Go实现日志收集系统4   到这一步,我的收集系统就已经完成很大一部分工作,我们重新看一下我们之前画的图: 我们已经完成前面的部分,剩下是要完成后半部分,将kafka中的数据扔到ElasticSear ...

  2. GO学习-&lpar;34&rpar; Go实现日志收集系统3

    Go实现日志收集系统3   再次整理了一下这个日志收集系统的框,如下图 这次要实现的代码的整体逻辑为: 完整代码地址为: etcd介绍 高可用的分布式key-value存储,可以用于配置共享和服务发现 ...

  3. GO学习-&lpar;32&rpar; Go实现日志收集系统1

    Go实现日志收集系统1 项目背景 每个系统都有日志,当系统出现问题时,需要通过日志解决问题 当系统机器比较少时,登陆到服务器上查看即可满足 当系统机器规模巨大,登陆到机器上查看几乎不现实 当然即使是机 ...

  4. Go语言之高级篇beego框架之日志收集系统

    一.日志收集系统架构设计 图1 图2 二.开发环境 1.安装jdk jdk-8u51-windows-x64.exe 安装目录:C:\Program Files\jdk8 2.安装zookeeper ...

  5. GO学习-&lpar;33&rpar; Go实现日志收集系统2

    Go实现日志收集系统2   一篇文章主要是关于整体架构以及用到的软件的一些介绍,这一篇文章是对各个软件的使用介绍,当然这里主要是关于架构中我们agent的实现用到的内容 关于zookeeper+kaf ...

  6. &lbrack;转载&rsqb; 一共81个,开源大数据处理工具汇总(下),包括日志收集系统&sol;集群管理&sol;RPC等

    原文: http://www.36dsj.com/archives/25042 接上一部分:一共81个,开源大数据处理工具汇总(上),第二部分主要收集整理的内容主要有日志收集系统.消息系统.分布式服务 ...

  7. 一共81个,开源大数据处理工具汇总(下),包括日志收集系统&sol;集群管理&sol;RPC等

    作者:大数据女神-诺蓝(微信公号:dashujunvshen).本文是36大数据专稿,转载必须标明来源36大数据. 接上一部分:一共81个,开源大数据处理工具汇总(上),第二部分主要收集整理的内容主要 ...

  8. ELK&plus;kafka构建日志收集系统

    ELK+kafka构建日志收集系统   原文  http://lx.wxqrcode.com/index.php/post/101.html   背景: 最近线上上了ELK,但是只用了一台Redis在 ...

  9. Flume -- 开源分布式日志收集系统

    Flume是Cloudera提供的一个高可用的.高可靠的开源分布式海量日志收集系统,日志数据可以经过Flume流向需要存储终端目的地.这里的日志是一个统称,泛指文件.操作记录等许多数据. 一.Flum ...

随机推荐

  1. IOS 杂笔-9 (MD5 加密)

    首先是一段对MD5的简介 *出自一位大牛之手* Message Digest Algorithm MD5(中文名为消息摘要算法第五版)为计算机安全领域广泛使用的一种散列函数,用以提供消息的完整性保护 ...

  2. &lbrack;ES&rsqb; 基础概念

    Elasticsearch是基于Luence实现的一款搜索引擎,支持分布式和集群,并且搜索近实时,主要用于搜索和数据分析 索引 index 可以理解为数据库中的database,存储的是实际数据,因为 ...

  3. img的绝对路径转为相对路径

    $('#add_img').on('change', function(){ var objUrl = getObjectURL(this.files[0]) ; if (objUrl) { $(th ...

  4. QIBO &sol;do&sol;jf&period;php EvilCode Execution Injected By &sol;hack&sol;jfadmin&sol;admin&period;php

    catalog . 漏洞描述 . 漏洞触发条件 . 漏洞影响范围 . 漏洞代码分析 . 防御方法 . 攻防思考 1. 漏洞描述 这个漏洞的成因简单来说可以归纳为如下几点 . 类似于ECSHOP的的模版 ...

  5. Java基础系列--06&lowbar;抽象类与接口概述

    抽象类 (1)如果多个类中存在相同的方法声明,而方法体不一样,我们就可以只提取方法声明. 如果一个方法只有方法声明,没有方法体,那么这个方法必须用抽象修饰. 而一个类中如果有抽象方法,这个类必须定义为 ...

  6. linux防火墙开放和禁用指定端口

    一.例如:开放8080端口 firewall-cmd --permanent --add-port=8080/tcp 二.重启使设置生效 systemctl restart firewalld.ser ...

  7. Wannafly挑战赛29-A&sol;B

    链接:https://ac.nowcoder.com/acm/contest/271/A来源:牛客网 御坂美琴 时间限制:C/C++ 1秒,其他语言2秒 空间限制:C/C++ 131072K,其他语言 ...

  8. mongodb 3&period;2&period;x 启动 Warning 错误处理

    [root@restore1 data1]# mongod --dbpath=/data/data1/mongodb_data/ --directoryperdb ** WARNING: You ar ...

  9. &lbrack;CLPR&rsqb; 卷积还是相关&quest; - Opencv之filter2D探究

    I am doing something about convolving images in Python and for sake of speed I chose opencv 2.4.9. O ...

  10. python实践报错:SyntaxError&colon; Non-ASCII character

    报错: File "C:\001myWorkspace\001myWork\workspace2\MyFirstPython\src\demo4\demo4-2.py", line ...