分布式任务 + 消息队列框架 go-queue

时间:2024-10-22 08:19:40
  1. 为什么写这个库
  2. 应用场景有哪些
  3. 如何使用
  4. 总结

为什么要写这个库?

在开始自研 go-queue 之前,针对以下我们调研目前的开源队列方案:

beanstalkd

beanstalkd 有一些特殊好用功能:支持任务priority、延时(delay)、超时重发(time-to-run)和预留(buried),能够很好的支持分布式的后台任务和定时任务处理。如下是 beanstalkd 基本部分:

  • job:任务单元;
  • tube:任务队列,存储统一类型 job。producer 和 consumer 操作对象;
  • producerjob 生产者,通过 put 将 job 加入一个 tube;
  • consumerjob 消费者,通过 reserve/release/bury/delete 来获取job或改变job的状态;

很幸运的是官方提供了 go client:/beanstalkd/go-beanstalk。

但是这对不熟悉 beanstalkd 操作的 go 开发者而言,需要学习成本。

kafka

类似基于 kafka 消息队列作为存储的方案,存储单元是消息,如果要实现延时执行,可以想到的方案是以延时执行的时间作为 topic,这样在大型的消息系统中,充斥大量一次性的 topicdq_1616324404788, dq_1616324417622),当时间分散,会容易造成磁盘随机写的情况。

而且在 go 生态中,

同时考虑以下因素:

  • 支持延时任务
  • 高可用,保证数据不丢失
  • 可扩展资源和性能

所以我们自己基于以上两个基础组件开发了 go-queue

  1. 基于 beanstalkd 开发了 dq,支持定时和延时操作。同时加入 redis 保证消费唯一性。
  2. 基于 kafka 开发了 kq,简化生产者和消费者的开发API,同时在写入kafka使用批量写,节省IO。

整体设计如下:

应用场景

首先在消费场景来说,一个是针对任务队列,一个是消息队列。而两者最大的区别:

  • 任务是没有顺序约束;消息需要;
  • 任务在加入中,或者是等待中,可能存在状态更新(或是取消);消息则是单一的存储即可;

所以在背后的基础设施选型上,也是基于这种消费场景。

  • dq:依赖于beanstalkd ,适合延时、定时任务执行;
  • kq:依赖于 kafka ,适用于异步、批量任务执行;

而从其中 dq 的 API 中也可以看出:

// 延迟任务执行
- (msg, delayTime);

// 定时任务执行
- (msg, atTime);
  • 1
  • 2
  • 3
  • 4

而在我们内部:

  • 如果是 异步消息消费/推送 ,则会选择使用 kq(msg)
  • 如果是 15分钟提醒/ 明天中午发送短信 等,则使用 dq

如何使用

分别介绍 dqkq 的使用方式:

dq

// [Producer]
producer := ([]{
    {
        Endpoint: "localhost:11300",
        Tube:     "tube",
    },
    {
        Endpoint: "localhost:11301",
        Tube:     "tube",
    },
})    

for i := 1000; i < 1005; i++ {
    _, err := ([]byte((i)), *5)
    if err != nil {
        (err)
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
// [Consumer]
consumer := ({
  Beanstalks: []{
    {
      Endpoint: "localhost:11300",
      Tube:     "tube",
    },
    {
      Endpoint: "localhost:11301",
      Tube:     "tube",
    },
  },
  Redis: {
    Host: "localhost:6379",
    Type: ,
  },
})
(func(body []byte) {
  // your consume logic
  (string(body))
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

和普通的 生产者-消费者 模型类似,开发者也只需要关注以下:

  1. 开发者只需要关注自己的任务类型「延时/定时」
  2. 消费端的消费逻辑

kq

// message structure
type message struct {
    Key     string `json:"key"`
    Value   string `json:"value"`
    Payload string `json:"message"`
}

pusher := ([]string{
    "127.0.0.1:19092",
    "127.0.0.1:19093",
    "127.0.0.1:19094",
}, "kq")

ticker := ()
for round := 0; round < 3; round++ {
    select {
    case <-:
        count := (100)
    // 准备消息
        m := message{
            Key:     (().UnixNano(), 10),
            Value:   ("%d,%d", round, count),
            Payload: ("%d,%d", round, count),
        }
        body, err := (m)
        if err != nil {
            (err)
        }

        (string(body))
    // push to kafka broker
        if err := (string(body)); err != nil {
            (err)
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

Name: kq
Brokers:
  - 127.0.0.1:19092
  - 127.0.0.1:19092
  - 127.0.0.1:19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

var c 
("", &c)

// WithHandle: 具体的处理msg的logic
// 这也是开发者需要根据自己的业务定制化
q := (c, (func(k, v string) error {
  ("=> %s\n", v)
  return nil
}))
defer ()
()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

dq 不同的是:开发者不需要关注任务类型(在这里也没有任务的概念,传递的都是 message data)。

其他操作和 dq 类似,只是将 业务处理函数 当成配置直接传入消费者中。

总结

在我们目前的场景中,kq 大量使用在我们的异步消息服务;而延时任务,我们除了 dq,还可以使用内存版的 TimingWheelgo-zero 生态组件」。

关于 go-queue 更多的设计和实现文章,可以持续关注我们。欢迎大家去关注和使用。

/tal-tech/go-queue

/tal-tech/go-zero

欢迎使用 go-zerostar 支持我们!

go-zero 系列文章见『微服务实践』公众号

相关文章