goframe开发一个企业网站 redis队例13

时间:2024-11-11 11:23:23

如何在 GoFrame 项目中实现 Redis 队列功能:
配置代码:

# 消息队列配置
mq:
  # 消息队列类型: rocketmq 或 rabbitmq
  type: "rabbitmq"
  # 是否启用消息队列
  enabled: true


rocketmq:
  nameServer: "127.0.0.1:9876"
  producerGroup: "myProducerGroup"
  consumerGroup: "myConsumerGroup"
  brokerAddress: "127.0.0.1:10911"  # 添加 broker 地址

rabbitmq:
  url: "amqp://wanghaibin:wanghaibin@127.0.0.1:5672/"
  exchange: "gf_exchange"
  dlx_exchange: "gf_dlx_exchange"    # 新增:死信交换机
  queue: "gf_queue"
  delay_queue: "gf_delay_queue"      # 新增:延迟队列
  routingKey: "gf_key"
  vhost: "/"

# GoFrame 项目实现 Redis 队列功能

本文将介绍如何在 GoFrame 框架中实现 Redis 队列功能,采用三层架构:Controller、Service 和 Logic。

## 1. 定义接口层 (Service)

首先在 `internal/service` 目录下创建接口定义:

```go:internal/service/redis_queue.go
package service

import (
    "context"
)

type IRedisQueue interface {
    // 生产消息
    ProduceMessage(ctx context.Context, queue string, message string) error
    // 消费消息
    ConsumeMessage(ctx context.Context, queue string) (string, error)
    // 获取队列长度
    QueueLength(ctx context.Context, queue string) (int64, error)
}

var localRedisQueue IRedisQueue

func RedisQueue() IRedisQueue {
    if localRedisQueue == nil {
        panic("implement not found for interface IRedisQueue")
    }
    return localRedisQueue
}

func RegisterRedisQueue(i IRedisQueue) {
    localRedisQueue = i
}

2. 实现业务逻辑层 (Logic)

internal/logic 目录下实现具体业务逻辑:

package redis_queue

import (
    "context"
    "gf_new_web/internal/service"
    "github.com/gogf/gf/v2/frame/g"
)

type sRedisQueue struct{}

func init() {
    service.RegisterRedisQueue(New())
}

func New() *sRedisQueue {
    return &sRedisQueue{}
}

// ProduceMessage 生产消息
func (s *sRedisQueue) ProduceMessage(ctx context.Context, queue string, message string) error {
    _, err := g.Redis().RPush(ctx, queue, message)
    if err != nil {
        return err
    }
    return nil
}

// ConsumeMessage 消费消息
func (s *sRedisQueue) ConsumeMessage(ctx context.Context, queue string) (string, error) {
    message, err := g.Redis().LPop(ctx, queue)
    if err != nil {
        return "", err
    }
    return message.String(), nil
}

// QueueLength 获取队列长度
func (s *sRedisQueue) QueueLength(ctx context.Context, queue string) (int64, error) {
    length, err := g.Redis().LLen(ctx, queue)
    if err != nil {
        return 0, err
    }
    return length, nil
}

3. 实现控制器层 (Controller)

internal/controller 目录下实现 HTTP 接口:

package controller

import (
    "gf_new_web/internal/service"
    "github.com/gogf/gf/v2/frame/g"
    "github.com/gogf/gf/v2/net/ghttp"
)

type cRedisQueue struct{}

var RedisQueue = cRedisQueue{}

// ProduceMessage 生产消息接口
func (c *cRedisQueue) ProduceMessage(r *ghttp.Request) {
    var (
        queue   = r.Get("queue").String()
        message = r.Get("message").String()
    )

    err := service.RedisQueue().ProduceMessage(r.GetCtx(), queue, message)
    if err != nil {
        r.Response.WriteJson(g.Map{
            "code": -1,
            "msg":  err.Error(),
        })
        return
    }

    r.Response.WriteJson(g.Map{
        "code": 0,
        "msg":  "消息发送成功",
    })
}

// ConsumeMessage 消费消息接口
func (c *cRedisQueue) ConsumeMessage(r *ghttp.Request) {
    var (
        queue = r.Get("queue").String()
    )

    message, err := service.RedisQueue().ConsumeMessage(r.GetCtx(), queue)
    if err != nil {
        r.Response.WriteJson(g.Map{
            "code": -1,
            "msg":  err.Error(),
        })
        return
    }

    r.Response.WriteJson(g.Map{
        "code": 0,
        "msg":  "消息消费成功",
        "data": message,
    })
}

// QueueLength 获取队列长度接口
func (c *cRedisQueue) QueueLength(r *ghttp.Request) {
    var (
        queue = r.Get("queue").String()
    )

    length, err := service.RedisQueue().QueueLength(r.GetCtx(), queue)
    if err != nil {
        r.Response.WriteJson(g.Map{
            "code": -1,
            "msg":  err.Error(),
        })
        return
    }

    r.Response.WriteJson(g.Map{
        "code": 0,
        "data": length,
    })
}

4. 路由注册

在路由配置文件中添加相关路由:

func registerRouter(group *ghttp.RouterGroup) {
    group.Group("/queue", func(group *ghttp.RouterGroup) {
        group.POST("/produce", controller.RedisQueue.ProduceMessage)
        group.GET("/consume", controller.RedisQueue.ConsumeMessage)
        group.GET("/length", controller.RedisQueue.QueueLength)
    })
}

5. 使用说明

生产消息

curl -X POST "http://localhost:8000/queue/produce" \
     -d "queue=test_queue" \
     -d "message=hello world"

消费消息

curl "http://localhost:8000/queue/consume?queue=test_queue"

获取队列长度

curl "http://localhost:8000/queue/length?queue=test_queue"

6. 特点和优势

  1. 层次分明:严格遵循三层架构设计
  2. 接口解耦:通过接口定义实现业务解耦
  3. 易于扩展:可以方便地添加新的队列操作
  4. 统一错误处理:采用统一的错误返回格式
  5. 使用 GoFrame 的 Redis 客户端,保证稳定性

7. 注意事项

  1. 需要在配置文件中正确配置 Redis 连接信息
  2. 建议添加队列名称的验证
  3. 可以考虑添加消息的序列化和反序列化
  4. 在生产环境中应该添加适当的日志记录
  5. 可以考虑添加消息的过期策略