logrus is a logging framework for Go, and its most exciting feature is the hook mechanism: by registering hooks at initialization time, logrus can be extended in all kinds of ways, shipping logs to middleware such as Elasticsearch or ActiveMQ, or even to your email or DingTalk. Don't ask how I found out it can post to DingTalk; tears were involved.
Back to the topic: here we simply use the hook mechanism to write logs to local disk.
First, install the package:
go get github.com/sirupsen/logrus
logrus provides six log levels, which can be called directly:
logrus.Debug("Useful debugging information.")
logrus.Info("Something noteworthy happened!")
logrus.Warn("You should probably take a look at this.")
logrus.Error("Something failed but I'm not quitting.")
logrus.Fatal("Bye.")         // calls os.Exit(1) after logging
logrus.Panic("I'm bailing.") // calls panic() after logging
Example project structure
main.go
package main

import (
	"github.com/sirupsen/logrus"

	"logT/logS"
)

func main() {
	// create a hook, passing in the path where log files are stored
	hook := logS.NewHook("d:/log/golog.log")
	// log before the hook is added
	logrus.WithField("file", "d:/log/golog.log").Info("New logrus hook err.")
	logrus.AddHook(hook)
	// log after the hook is added
	logrus.WithFields(logrus.Fields{
		"animal": "walrus",
	}).Info("A walrus appears")
}
hook.go
Don't be put off by the length of the three Go files below; most of it is boilerplate. Only the NewHook function needs your own customization.
package logS
import (
	"fmt"
	"os"
	"strings"

	"github.com/sirupsen/logrus"
)

// Hook is a logrus hook that writes entries to a file.
type Hook struct {
	W LoggerInterface
}

// NewHook creates a file hook for the given path.
func NewHook(file string) (f *Hook) {
	w := NewFileWriter()
	config := fmt.Sprintf(`{"filename":"%s","maxdays":7}`, file)
	err := w.Init(config)
	if err != nil {
		return nil
	}
	return &Hook{w}
}

// Fire implements the logrus Hook interface.
func (hook *Hook) Fire(entry *logrus.Entry) (err error) {
	message, err := getMessage(entry)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to read entry, %v", err)
		return err
	}
	switch entry.Level {
	case logrus.PanicLevel:
		fallthrough
	case logrus.FatalLevel:
		fallthrough
	case logrus.ErrorLevel:
		return hook.W.WriteMsg(fmt.Sprintf("[ERROR] %s", message), LevelError)
	case logrus.WarnLevel:
		return hook.W.WriteMsg(fmt.Sprintf("[WARN] %s", message), LevelWarn)
	case logrus.InfoLevel:
		return hook.W.WriteMsg(fmt.Sprintf("[INFO] %s", message), LevelInfo)
	case logrus.DebugLevel:
		return hook.W.WriteMsg(fmt.Sprintf("[DEBUG] %s", message), LevelDebug)
	default:
		return nil
	}
}

// Levels implements the logrus Hook interface.
func (hook *Hook) Levels() []logrus.Level {
	return []logrus.Level{
		logrus.PanicLevel,
		logrus.FatalLevel,
		logrus.ErrorLevel,
		logrus.WarnLevel,
		logrus.InfoLevel,
		logrus.DebugLevel,
	}
}

// getMessage prefixes the entry message with the caller's file:line and
// appends the entry's fields.
func getMessage(entry *logrus.Entry) (message string, err error) {
	message = message + fmt.Sprintf("%s ", entry.Message)
	file, lineNumber := GetCallerIgnoringLogMulti(2)
	if file != "" {
		sep := fmt.Sprintf("%s/src/", os.Getenv("GOPATH"))
		fileName := strings.Split(file, sep)
		if len(fileName) >= 2 {
			file = fileName[1]
		}
	}
	message = fmt.Sprintf("%s:%d ", file, lineNumber) + message
	for k, v := range entry.Data {
		message = message + fmt.Sprintf("%v:%v ", k, v)
	}
	return
}
caller.go
package logS

import (
	"runtime"
	"strings"
)

// GetCaller walks the stack starting at callDepth, skipping frames whose file
// path ends with any of the given suffixes, and returns the first remaining
// caller's file and line.
func GetCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
	// bump by 1 to ignore the GetCaller (this) stack frame
	callDepth++
outer:
	for {
		var ok bool
		_, file, line, ok = runtime.Caller(callDepth)
		if !ok {
			file = "???"
			line = 0
			break
		}
		for _, s := range suffixesToIgnore {
			if strings.HasSuffix(file, s) {
				callDepth++
				continue outer
			}
		}
		break
	}
	return
}

// GetCallerIgnoringLogMulti returns the first caller outside logrus itself.
func GetCallerIgnoringLogMulti(callDepth int) (string, int) {
	// the +1 is to ignore this (GetCallerIgnoringLogMulti) frame
	return GetCaller(callDepth+1, "logrus/hooks.go", "logrus/entry.go", "logrus/logger.go", "logrus/exported.go", "asm_amd64.s")
}
file.go
package logS

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// RFC5424 log message levels.
const (
	LevelError = iota
	LevelWarn
	LevelInfo
	LevelDebug
)

// LoggerInterface is the interface a log writer must implement.
type LoggerInterface interface {
	Init(config string) error
	WriteMsg(msg string, level int) error
	Destroy()
	Flush()
}

// LogWriter implements LoggerInterface.
// It rotates files by line limit, file size limit, or time frequency.
type LogWriter struct {
	*log.Logger
	mw *MuxWriter
	// The opened file
	Filename         string `json:"filename"`
	Maxlines         int    `json:"maxlines"`
	maxlinesCurlines int
	// Rotate at size
	Maxsize        int `json:"maxsize"`
	maxsizeCursize int
	// Rotate daily
	Daily         bool  `json:"daily"`
	Maxdays       int64 `json:"maxdays"`
	dailyOpendate int
	Rotate        bool       `json:"rotate"`
	startLock     sync.Mutex // Only one log can write to the file
	Level         int        `json:"level"`
}

// MuxWriter is an *os.File writer with a lock.
type MuxWriter struct {
	sync.Mutex
	fd *os.File
}

// Write writes to the underlying os.File under the lock.
func (l *MuxWriter) Write(b []byte) (int, error) {
	l.Lock()
	defer l.Unlock()
	return l.fd.Write(b)
}

// SetFd sets the os.File in the writer, closing any previous one.
func (l *MuxWriter) SetFd(fd *os.File) {
	if l.fd != nil {
		_ = l.fd.Close()
	}
	l.fd = fd
}

// NewFileWriter creates a file LogWriter returned as a LoggerInterface.
func NewFileWriter() LoggerInterface {
	w := &LogWriter{
		Filename: "",
		Maxlines: 1000000,
		Maxsize:  1 << 28, // 256 MB
		Daily:    true,
		Maxdays:  7,
		Rotate:   true,
		Level:    LevelDebug,
	}
	// use MuxWriter instead of os.File directly so writes are locked during rotation
	w.mw = new(MuxWriter)
	// set MuxWriter as the Logger's io.Writer
	w.Logger = log.New(w.mw, "", log.Ldate|log.Ltime)
	return w
}

// Init configures the file logger with a JSON config, e.g.:
// {
//   "filename":"logs/sample.log",
//   "maxlines":10000,
//   "maxsize":1<<30,
//   "daily":true,
//   "maxdays":15,
//   "rotate":true
// }
func (w *LogWriter) Init(jsonconfig string) error {
	err := json.Unmarshal([]byte(jsonconfig), w)
	if err != nil {
		return err
	}
	if len(w.Filename) == 0 {
		return errors.New("jsonconfig must have filename")
	}
	err = w.startLogger()
	return err
}

// startLogger creates the log file and hands it to the locked file writer.
func (w *LogWriter) startLogger() error {
	fd, err := w.createLogFile()
	if err != nil {
		return err
	}
	w.mw.SetFd(fd)
	err = w.initFd()
	if err != nil {
		return err
	}
	return nil
}

func (w *LogWriter) docheck(size int) {
	w.startLock.Lock()
	defer w.startLock.Unlock()
	if w.Rotate && ((w.Maxlines > 0 && w.maxlinesCurlines >= w.Maxlines) ||
		(w.Maxsize > 0 && w.maxsizeCursize >= w.Maxsize) ||
		(w.Daily && time.Now().Day() != w.dailyOpendate)) {
		if err := w.DoRotate(); err != nil {
			fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err)
			return
		}
	}
	w.maxlinesCurlines++
	w.maxsizeCursize += size
}

// WriteMsg writes a logger message into the file.
func (w *LogWriter) WriteMsg(msg string, level int) error {
	if level > w.Level {
		return nil
	}
	n := 24 + len(msg) // 24 is the length of the prefix "2013/06/23 21:00:22 [T] "
	w.docheck(n)
	w.Logger.Print(msg)
	return nil
}

func (w *LogWriter) createLogFile() (*os.File, error) {
	// Open the log file
	fd, err := os.OpenFile(w.Filename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0660)
	return fd, err
}

func (w *LogWriter) initFd() error {
	fd := w.mw.fd
	finfo, err := fd.Stat()
	if err != nil {
		return fmt.Errorf("get stat err: %s", err)
	}
	w.maxsizeCursize = int(finfo.Size())
	w.dailyOpendate = time.Now().Day()
	if finfo.Size() > 0 {
		content, err := ioutil.ReadFile(w.Filename)
		if err != nil {
			return err
		}
		w.maxlinesCurlines = len(strings.Split(string(content), "\n"))
	} else {
		w.maxlinesCurlines = 0
	}
	return nil
}

// DoRotate renames the current file and starts writing to a fresh one.
// The new file name looks like xx.log.2013-01-01.002.
func (w *LogWriter) DoRotate() error {
	_, err := os.Lstat(w.Filename)
	if err == nil { // file exists
		// Find the next available number
		num := 1
		fname := ""
		for ; err == nil && num <= 999; num++ {
			fname = w.Filename + fmt.Sprintf(".%s.%03d", time.Now().Format("2006-01-02"), num)
			_, err = os.Lstat(fname)
		}
		// return error if the last file checked still existed
		if err == nil {
			return fmt.Errorf("Rotate: Cannot find free log number to rename %s", w.Filename)
		}
		// block Logger's io.Writer
		w.mw.Lock()
		defer w.mw.Unlock()
		fd := w.mw.fd
		_ = fd.Close() // close fd before rename
		// Rename the file to its newfound home
		err = os.Rename(w.Filename, fname)
		if err != nil {
			return fmt.Errorf("Rotate: %s", err)
		}
		// re-start logger
		err = w.startLogger()
		if err != nil {
			return fmt.Errorf("Rotate StartLogger: %s", err)
		}
		go w.deleteOldLog()
	}
	return nil
}

func (w *LogWriter) deleteOldLog() {
	dir := filepath.Dir(w.Filename)
	_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) (returnErr error) {
		defer func() {
			if r := recover(); r != nil {
				returnErr = fmt.Errorf("Unable to delete old log '%s', error: %+v", path, r)
				fmt.Println(returnErr)
			}
		}()
		if !info.IsDir() && info.ModTime().Unix() < (time.Now().Unix()-60*60*24*w.Maxdays) {
			if strings.HasPrefix(filepath.Base(path), filepath.Base(w.Filename)) {
				_ = os.Remove(path)
			}
		}
		return
	})
}

// Destroy closes the file writer.
func (w *LogWriter) Destroy() {
	_ = w.mw.fd.Close()
}

// Flush syncs the file to disk; the file logger buffers no messages in memory.
func (w *LogWriter) Flush() {
	_ = w.mw.fd.Sync()
}
Bonus: custom logrus hooks in Go (a log-slicing hook, an email-alert hook, and a Kafka hook)
Analyzing the logrus Hook mechanism
The logrus hook interface is very simple, as follows:
package logrus

// A hook to be fired when logging on the logging levels returned from
// `Levels()` on your implementation of the interface. Note that this is not
// fired in a goroutine or a channel with workers, you should handle such
// functionality yourself if your call is non-blocking and you don't wish for
// the logging calls for levels returned from `Levels()` to block.
type Hook interface {
	Levels() []Level
	Fire(*Entry) error
}

// Internal type for storing the hooks on a logger instance.
type LevelHooks map[Level][]Hook

// Add a hook to an instance of logger. This is called with
// `log.Hooks.Add(new(MyHook))` where `MyHook` implements the `Hook` interface.
func (hooks LevelHooks) Add(hook Hook) {
	for _, level := range hook.Levels() {
		hooks[level] = append(hooks[level], hook)
	}
}

// Fire all the hooks for the passed level. Used by `entry.log` to fire
// appropriate hooks for a log entry.
func (hooks LevelHooks) Fire(level Level, entry *Entry) error {
	for _, hook := range hooks[level] {
		if err := hook.Fire(entry); err != nil {
			return err
		}
	}
	return nil
}
You only need to implement this interface:
type Hook interface {
	Levels() []Level
	Fire(*Entry) error
}
and the logrus framework will iterate over the registered hooks and call each one's Fire method, as the minimal example below shows.
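For instance, here is a minimal self-contained hook that mirrors error-and-above entries to stderr. This is only a sketch; the type name StderrAlertHook and the "[ALERT]" format are invented for illustration.

package main

import (
	"fmt"
	"os"

	"github.com/sirupsen/logrus"
)

// StderrAlertHook mirrors high-severity entries to stderr.
type StderrAlertHook struct{}

// Levels reports the levels this hook fires on.
func (h *StderrAlertHook) Levels() []logrus.Level {
	return []logrus.Level{logrus.ErrorLevel, logrus.FatalLevel, logrus.PanicLevel}
}

// Fire is called by logrus for every entry at one of those levels.
func (h *StderrAlertHook) Fire(entry *logrus.Entry) error {
	_, err := fmt.Fprintf(os.Stderr, "[ALERT] %s\n", entry.Message)
	return err
}

func main() {
	logrus.AddHook(&StderrAlertHook{})
	logrus.Error("disk almost full") // the hook mirrors this line to stderr
}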
Getting a logger instance
// log_hook.go
package logger

import (
	"fmt"
	"os"

	"github.com/sirupsen/logrus"

	"library/util/constant"
)

// getLogger builds a logger wired with our self-implemented logrus hook.
func getLogger(module string) *logrus.Logger {
	// instantiate
	logger := logrus.New()
	// set the output
	logger.Out = os.Stdout
	// set the log level
	logger.SetLevel(logrus.DebugLevel)
	// set the log format; we only supply a custom writer, the hook itself is handled by lfshook
	logger.AddHook(newLogrusHook(constant.GetLogPath(), module))
	logger.SetFormatter(&logrus.JSONFormatter{
		TimestampFormat: "2006-01-02 15:04:05",
	})
	return logger
}

// GetNewFieldLoggerContext ensures each call gets its own logger (and file).
func GetNewFieldLoggerContext(module, appField string) *logrus.Entry {
	logger := getLogger(module)
	return logger.WithFields(logrus.Fields{
		"app": appField,
	})
}

// SubscribeLog subscribes receivers to alert-level logs.
func SubscribeLog(entry *logrus.Entry, subMap SubscribeMap) {
	logger := entry.Logger
	logger.AddHook(newSubScribeHook(subMap))
	fmt.Println("log subscription registered")
}
constant.GetLogPath() can be replaced with your own log output directory; on my Mac, for instance, it is /usr/local/log. Just substitute it directly. A hypothetical stand-in for the package is sketched below.
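The constant package itself is never shown in the article; a minimal stand-in, with names taken from the calls that appear in the code (GetLogPath here, KafkaLogElkTopic in the Kafka hook further below) and values assumed, could be:

// constant/constant.go (hypothetical stand-in, not part of the original article)
package constant

// KafkaLogElkTopic is the topic name assumed by the Kafka hook below.
const KafkaLogElkTopic = "log-elk-topic"

// GetLogPath returns the root directory for log files.
func GetLogPath() string {
	return "/usr/local/log" // e.g. "d:/log" on Windows
}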
Log-slicing hook
Code
// writer.go
package logger

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/pkg/errors"
)

type LogWriter struct {
	logDir           string // log root directory
	module           string // module name
	curFileName      string // currently assigned file name
	curBaseFileName  string // file in use
	turnCateDuration time.Duration
	mutex            sync.RWMutex
	outFh            *os.File
}

func (w *LogWriter) Write(p []byte) (n int, err error) {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	if out, err := w.getWriter(); err != nil {
		return 0, errors.New("failed to fetch target io.Writer")
	} else {
		return out.Write(p)
	}
}

func (w *LogWriter) getFileName() string {
	base := time.Now().Truncate(w.turnCateDuration)
	return fmt.Sprintf("%s/%s/%s_%s", w.logDir, base.Format("2006-01-02"), w.module, base.Format("15"))
}

func (w *LogWriter) getWriter() (io.Writer, error) {
	// check whether the time slice has rolled over to a new file name;
	// if not, keep writing to the already opened file
	fileName := w.getFileName()
	if w.outFh != nil && fileName == w.curBaseFileName {
		return w.outFh, nil
	}
	dirname := filepath.Dir(fileName)
	if err := os.MkdirAll(dirname, 0755); err != nil {
		return nil, errors.Wrapf(err, "failed to create directory %s", dirname)
	}
	fileHandler, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return nil, errors.Errorf("failed to open file %s", err)
	}
	if w.outFh != nil {
		w.outFh.Close()
	}
	w.outFh = fileHandler
	w.curBaseFileName = fileName
	w.curFileName = fileName
	return fileHandler, nil
}

func New(logPath, module string, duration time.Duration) *LogWriter {
	return &LogWriter{
		logDir:           logPath,
		module:           module,
		turnCateDuration: duration,
		curFileName:      "",
		curBaseFileName:  "",
	}
}
// hook.go
package logger

import (
	"time"

	"github.com/rifflock/lfshook"
	"github.com/sirupsen/logrus"
)

func newLogrusHook(logPath, module string) logrus.Hook {
	logrus.SetLevel(logrus.WarnLevel)
	writer := New(logPath, module, time.Hour*2)
	lfsHook := lfshook.NewHook(lfshook.WriterMap{
		logrus.DebugLevel: writer,
		logrus.InfoLevel:  writer,
		logrus.WarnLevel:  writer,
		logrus.ErrorLevel: writer,
		logrus.FatalLevel: writer,
		logrus.PanicLevel: writer,
	}, &logrus.TextFormatter{DisableColors: true})
	// the writer creates the log files; lfshook consumes it from its Fire function
	// writer implements io.Writer, so it can pre-process on every log write
	return lfsHook
}
Test code
func TestGetLogger(t *testing.T) {
	lg := GetNewFieldLoggerContext("test", "d")
	lg.Logger.Info("????")
}
How it works
The logger instance holds the custom io.Writer struct. When Fire is consumed, Write is called, and the Truncate-based time-slicing logic decides which file to write to, creating a new file when needed.
Note: the code above splits directories by day, with module logs inside each directory sliced every 2 hours; you can adapt it to slice by module instead. The sketch below shows how the 2-hour windows are computed.
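To see why the slicing works, recall that time.Truncate rounds a timestamp down to the nearest multiple of the duration (counted from the zero time, so in UTC a 2-hour duration lands on even hours). Every Write inside the same window therefore resolves to the same file name. A quick standalone sketch:

package main

import (
	"fmt"
	"time"
)

func main() {
	d := 2 * time.Hour
	t1 := time.Date(2020, 5, 1, 13, 5, 0, 0, time.UTC)
	t2 := time.Date(2020, 5, 1, 13, 55, 0, 0, time.UTC)
	// both fall in the 12:00-14:00 window, so both print "2020-05-01 12",
	// i.e. both writes map to the same module_12 file
	fmt.Println(t1.Truncate(d).Format("2006-01-02 15"))
	fmt.Println(t2.Truncate(d).Format("2006-01-02 15"))
}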
Email-alert hook
Code
// subscribeHook.go
package logger

import (
	"fmt"
	"strings"

	"github.com/sirupsen/logrus"

	"library/email"
)

type SubscribeMap map[logrus.Level][]*email.Receiver

type SubscribeHook struct {
	subMap SubscribeMap
}

// A hand-rolled hook; a third-party hook could be used here instead.
func (h *SubscribeHook) Levels() []logrus.Level {
	return logrus.AllLevels
}

func (h *SubscribeHook) Fire(entry *logrus.Entry) error {
	for level, receivers := range h.subMap {
		// level matched: consume the entry
		if level == entry.Level {
			if len(receivers) > 0 {
				email.SendEmail(receivers, fmt.Sprintf("%s:[system log alert]", entry.Level.String()),
					fmt.Sprintf("error detail: %s", entry.Message))
			}
		}
	}
	return nil
}

func NewSubscribeMap(level logrus.Level, receiverStr string) SubscribeMap {
	subMap := SubscribeMap{}
	addressList := strings.Split(receiverStr, ";")
	var receivers []*email.Receiver
	for _, address := range addressList {
		receivers = append(receivers, &email.Receiver{Email: address})
	}
	subMap[level] = receivers
	return subMap
}

func newSubScribeHook(subMap SubscribeMap) *SubscribeHook {
	return &SubscribeHook{subMap}
}
// email.go
package email

import (
	"fmt"
	"regexp"

	"gopkg.in/gomail.v2"
)

type Sender struct {
	User     string
	Password string
	Host     string
	Port     int
	MailTo   []string
	Subject  string
	Content  string
}

type Receiver struct {
	Email string
}

// Check validates the e-mail address format.
func (r *Receiver) Check() bool {
	pattern := `\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*` // matches an e-mail address
	reg := regexp.MustCompile(pattern)
	return reg.MatchString(r.Email)
}

func (s *Sender) clean() {
}

// NewReceiver checks the address and registers it as the sole recipient.
func (s *Sender) NewReceiver(email string) *Receiver {
	rec := &Receiver{Email: email}
	if rec.Check() {
		m.MailTo = []string{email}
		return rec
	} else {
		fmt.Printf("email check fail 【%s】\n", email)
		return nil
	}
}

// NewReceivers checks and appends all valid recipient addresses.
func (s *Sender) NewReceivers(receivers []*Receiver) {
	for _, rec := range receivers {
		if rec.Check() {
			m.MailTo = append(m.MailTo, rec.Email)
		} else {
			fmt.Printf("email check fail 【%s】\n", rec.Email)
		}
	}
}

// For a 163 mailbox, Password is the secret issued when SMTP is enabled.
var m = Sender{User: "6666666@163.com", Password: "666666666", Host: "smtp.163.com", Port: 465}

func SendEmail(receivers []*Receiver, subject, content string) {
	m.NewReceivers(receivers)
	m.Subject = subject
	m.Content = content
	e := gomail.NewMessage()
	e.SetHeader("From", e.FormatAddress(m.User, "hengsheng"))
	e.SetHeader("To", m.MailTo...)    // send to multiple recipients
	e.SetHeader("Subject", m.Subject) // mail subject
	e.SetBody("text/html", m.Content) // mail body
	d := gomail.NewDialer(m.Host, m.Port, m.User, m.Password)
	err := d.DialAndSend(e)
	if err != nil {
		fmt.Printf("error sending mail! %s \n", err.Error())
	}
}
Usage
In the same spirit as the writer: when an error-level entry is logged, an email is sent.
o.Logger = logger.GetNewFieldLoggerContext("test", "666")
if subscribeSocket {
	logger.SubscribeLog(o.Logger, logger.NewSubscribeMap(logrus.ErrorLevel, "a@163.com;b@163.com"))
}
// o is the actual struct instance
Kafka hook
// kafka hook
package logger

import (
	"github.com/sirupsen/logrus"

	"library/kafka"
	"library/util/constant"
)

type KafKaHook struct {
	kafkaProducer *kafka.KafkaProducer
}

func (h *KafKaHook) Levels() []logrus.Level {
	return logrus.AllLevels
}

func (h *KafKaHook) Fire(entry *logrus.Entry) error {
	h.kafkaProducer.SendMsgSync(entry.Message)
	return nil
}

func newKafkaHook() *KafKaHook {
	producer := kafka.NewKafkaProducer(constant.KafkaLogElkTopic, true)
	return &KafKaHook{kafkaProducer: producer}
}
To use it, just call logger.AddHook(newKafkaHook()) when building the logger; a sketch follows.
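For example, a variant of getLogger that also ships to Kafka might look like this. This is only a sketch inside the logger package (newKafkaHook is unexported, so it cannot be called from outside), and getLoggerWithKafka is an invented name:

// getLoggerWithKafka builds a logger whose entries go both to file and to Kafka.
func getLoggerWithKafka(module string) *logrus.Logger {
	l := getLogger(module)    // file hook already attached
	l.AddHook(newKafkaHook()) // every entry is now also produced to the topic
	return l
}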
The Kafka module
Producer
// kafkaProducer.go
package kafka

import (
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func GetKafkaAddress() []string {
	return []string{"127.0.0.1:9092"}
}

// SyncProducer sends a single message synchronously.
func SyncProducer(topic, message string) error {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(GetKafkaAddress(), config)
	if err != nil {
		return errors.New(fmt.Sprintf("sarama.NewSyncProducer err, message=%s \n", err))
	}
	defer p.Close()
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(message),
	}
	part, offset, err := p.SendMessage(msg)
	if err != nil {
		return errors.New(fmt.Sprintf("send message err=%s \n", err))
	} else {
		fmt.Printf("send succeeded, partition=%d, offset=%d \n", part, offset)
		return nil
	}
}

// KafkaProducer wraps either a sync or an async producer.
type KafkaProducer struct {
	topic         string
	asyncProducer *sarama.AsyncProducer
	syncProducer  *sarama.SyncProducer
	sync          bool
}

func NewKafkaProducer(topic string, sync bool) *KafkaProducer {
	k := &KafkaProducer{
		topic: topic,
		sync:  sync,
	}
	if sync {
		k.initSync()
	} else {
		k.initAsync()
	}
	return k
}

func (k *KafkaProducer) initAsync() bool {
	if k.sync {
		fmt.Printf("sync producer cant call async func !\n")
		return false
	}
	config := sarama.NewConfig()
	// wait until all in-sync replicas have acknowledged
	config.Producer.RequiredAcks = sarama.WaitForAll
	// send messages to a random partition
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// whether to report success/failure responses; only meaningful when RequiredAcks is not NoResponse
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	// the Kafka version in use; below V0_10_0_0 the message timestamp has no effect,
	// and consumer and producer must be configured consistently. With a wrong version
	// Kafka returns odd errors and messages cannot be sent.
	config.Version = sarama.V0_10_0_1
	producer, e := sarama.NewAsyncProducer(GetKafkaAddress(), config)
	if e != nil {
		fmt.Println(e)
		return false
	}
	k.asyncProducer = &producer
	// note: do NOT `defer producer.AsyncClose()` here, as the original code did;
	// that closes the producer as soon as initAsync returns
	pd := *k.asyncProducer
	go func() {
		for {
			select {
			case <-pd.Successes():
				//fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
			case fail := <-pd.Errors():
				fmt.Printf("err: %s \n", fail.Err.Error())
			}
		}
	}()
	return true
}

func (k *KafkaProducer) initSync() bool {
	if !k.sync {
		fmt.Println("async producer cant call sync func !")
		return false
	}
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(GetKafkaAddress(), config)
	if err != nil {
		log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
		return false
	}
	k.syncProducer = &p
	return true
}

func (k *KafkaProducer) SendMsgAsync(sendStr string) {
	msg := &sarama.ProducerMessage{
		Topic: k.topic,
	}
	// convert the string to a byte slice
	msg.Value = sarama.ByteEncoder(sendStr)
	// send through the input channel
	pd := *k.asyncProducer
	pd.Input() <- msg
}

func (k *KafkaProducer) SendMsgSync(sendStr string) bool {
	msg := &sarama.ProducerMessage{
		Topic: k.topic,
		Value: sarama.ByteEncoder(sendStr),
	}
	pd := *k.syncProducer
	part, offset, err := pd.SendMessage(msg)
	if err != nil {
		fmt.Printf("send failed: message(%s) err=%s \n", sendStr, err)
		return false
	} else {
		fmt.Printf("send succeeded: partition=%d, offset=%d \n", part, offset)
		return true
	}
}
Call SendMsgSync or SendMsgAsync to produce messages, and make sure the sync flag passed at construction matches the method you call (see the sketch below).
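A usage sketch (the "app-log" topic name is assumed): a producer built with sync=true must use SendMsgSync, while one built with sync=false must use SendMsgAsync.

syncPd := kafka.NewKafkaProducer("app-log", true) // sync producer
syncPd.SendMsgSync("hello kafka")                 // blocks until acked

asyncPd := kafka.NewKafkaProducer("app-log", false) // async producer
asyncPd.SendMsgAsync("hello kafka")                 // fire and forget; results arrive on Successes()/Errors()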
Consumer group
// kafkaConsumerGroup.go
package kafka

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/Shopify/sarama"
)

func NewKafkaConsumerGroup(topics []string, group string, businessCall func(message *sarama.ConsumerMessage) bool) *KafkaConsumerGroup {
	// note: the caller starts consumption via Init(); calling it here as well,
	// as the original code did, would start two consumers
	return &KafkaConsumerGroup{
		brokers:           GetKafkaAddress(),
		topics:            topics,
		group:             group,
		channelBufferSize: 2,
		ready:             make(chan bool),
		version:           "1.1.1",
		handler:           businessCall,
	}
}

// consumer group: consumers with the same group.id form one consumer group;
// each consumer sets a group id, and each message is consumed by exactly one
// consumer within a group, though it may be consumed by multiple groups
type KafkaConsumerGroup struct {
	// broker: a single Kafka server is one broker
	brokers []string
	// topic: a logical grouping of messages; messages with the same topic go into one queue
	topics            []string
	version           string
	ready             chan bool
	group             string
	channelBufferSize int
	// business callback
	handler func(message *sarama.ConsumerMessage) bool
}

// Init starts consuming and returns a shutdown function.
func (k *KafkaConsumerGroup) Init() func() {
	version, err := sarama.ParseKafkaVersion(k.version)
	if err != nil {
		fmt.Printf("Error parsing Kafka version: %v", err)
	}
	cfg := sarama.NewConfig()
	cfg.Version = version
	// partition assignment strategy
	cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	// where to start when no committed offset is found (-2 == OffsetOldest)
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	// channel length
	cfg.ChannelBufferSize = k.channelBufferSize
	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(k.brokers, k.group, cfg)
	if err != nil {
		fmt.Printf("Error creating consumer group client: %v", err)
	}
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer func() {
			wg.Done()
			//util.HandlePanic("client.Consume panic", log.StandardLogger())
		}()
		for {
			if err := client.Consume(ctx, k.topics, k); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
			k.ready = make(chan bool)
		}
	}()
	<-k.ready
	fmt.Printf("Sarama consumer up and running!... \n")
	// make sure buffered messages are consumed before the process exits
	return func() {
		cancel()
		wg.Wait()
		if err = client.Close(); err != nil {
			fmt.Printf("Error closing client: %v \n", err)
		}
	}
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (k *KafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(k.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (k *KafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (k *KafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		//msg := string(message.Value)
		//k.logger.Infof("kafka: %s", msg)
		if ok := k.handler(message); ok {
			// mark the message as consumed (commit the offset)
			session.MarkMessage(message, "")
		}
		//run.Run(msg)
	}
	return nil
}
Test code
func TestKafkaConsumerGroup_Init(t *testing.T) {
	//pd := NewKafkaProducer("test-fail", true)
	k := NewKafkaConsumerGroup([]string{constant.KafkaALiSdkTopic}, "group-2", func(message *sarama.ConsumerMessage) bool {
		fmt.Println(string(message.Value))
		// failure-handling logic would go here, e.g. re-produce to a retry topic:
		//if ok := pd.SendMsgSync("666666"); ok {
		//	return true
		//}
		return false
	})
	consumerDone := k.Init()
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	<-sigterm
	fmt.Println("terminating: via signal")
	consumerDone()
}
There is some compensation logic sketched in the commented-out lines: when the handler fails, the message could be re-produced to a retry topic.
That covers the logrus hooks.
This walkthrough of writing logs to local disk with a logrus hook is everything I have to share; I hope it serves as a useful reference.
Original article: https://noshoes.blog.csdn.net/article/details/82909121