文章目录
- 用goroutine来替代mq做异步的应用
- 心跳
- contenx的超时设置
- 定时器
- break label
- 核心代码
用goroutine来替代mq做异步的应用
方法在创建ai任务接口中用协程的方式异步调用go s.handleResultPolling(ctx, algorithm, taskId, iAiHandle)
,来更新ai任务的状态
心跳
心跳机制是一种用于检测连接或任务是否仍处于活动状态的方法。在分布式系统或网络通信中,心跳机制可以用来监视节点和服务的可用性、性能和故障。心跳通常是通过定期发送小的数据包或信号来实现的,然后接收方会对这些信号进行响应,以表示它们仍然在线并正常运行。
在这段代码中,心跳机制用于监控AI任务的进行情况。每隔2秒,会将当前时间戳写入Redis缓存,作为心跳信号。这样,外部监控系统可以检查Redis缓存中的心跳信号来判断任务是否仍在进行。如果在一定时间内没有收到新的心跳信号,那么可以认为任务已经中止或出现故障。
contenx的超时设置
context没有默认的超时时间,如果要设置超时时间的话,记得调用
ctxTimeout, timeoutCancel := context.WithTimeout(ctx, time.Hour)
defer timeoutCancel() // 操作完成时立即释放资源
定时器
在这段代码中,time.NewTicker(time.Millisecond)
创建的定时器最初每隔1毫秒触发一次,ticker.Reset(time.Second * 2)
的作用是重置定时器(ticker)的时间间隔为2s,这样能每2s从通道中取到信息case <-ticker.C:
。
break label
breakFor是一个标签,它用于在嵌套循环或选择语句中提供更精确的控制。在这个例子中,
breakFor标签用于跳出外层的
for循环。 在Go语言中,
break语句默认只跳出当前层次的循环或选择语句。在这个例子中,当
ctxTimeout.Done()通道接收到一个值或者满足其他条件时,我们希望跳出整个
for循环,而不仅仅是
select语句。通过在
for循环前添加
breakFor标签,并在
break`语句中指定该标签,我们可以实现这个功能。
核心代码
// handleCreateAsync 启动协程处理结果
func (s *aiHandle) handleCreateAsync(ctx context.Context, algorithm string, taskId string, data interface{}, iAiHandle IAiHandle) {
// 上报神策
properties := map[string]interface{}{
"uid": util.GetCtx(ctx).User.WsId,
"alg": algorithm,
}
sensorsdata.Track(gtrace.GetTraceID(ctx), "SeAlgCreate", properties, util.GetCtx(ctx).User.WsId != "", "")
// 创建任务
s.createDbSave(ctx, taskId, algorithm, data)
// 任务数量+1
s.handleCreateActive(ctx, algorithm, true)
// 异步处理结果
go s.handleResultPolling(ctx, algorithm, taskId, iAiHandle)
}
// handleCreateAsync 启动协程处理结果
func (s *aiHandle) handleCreateAsync(ctx context.Context, algorithm string, taskId string, data interface{}, iAiHandle IAiHandle) {
// 上报神策
properties := map[string]interface{}{
"uid": util.GetCtx(ctx).User.WsId,
"alg": algorithm,
}
sensorsdata.Track(gtrace.GetTraceID(ctx), "SeAlgCreate", properties, util.GetCtx(ctx).User.WsId != "", "")
// 创建任务
s.createDbSave(ctx, taskId, algorithm, data)
// 任务数量+1
s.handleCreateActive(ctx, algorithm, true)
// 异步处理结果
go s.handleResultPolling(ctx, algorithm, taskId, iAiHandle)
}
// handleResultPolling 轮询结果
func (s *aiHandle) handleResultPolling(ctx context.Context, algorithm, taskId string, iAiHandle IAiHandle) {
// 60分钟超时
// 1. 首先,设置一个60分钟的超时上下文,确保处理不会无限期地进行下去。
ctxTimeout, timeoutCancel := context.WithTimeout(ctx, time.Hour)
defer timeoutCancel() // 超时前调用这个,用来释放资源;不调用这个方法的话,会在超时时间释放资源
// 2. 创建一个心跳goroutine,每隔2秒更新一次心跳缓存。这样可以在外部监控任务的进行情况。
// 启动心跳,2秒一次
go func(ctx context.Context, taskId string) {
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
heartbeatKey := aiCache.GetAiTaskHeartBeat(taskId)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
ticker.Reset(time.Second * 2)
if cache.RedisExists(ctx, heartbeatKey) == false {
_ = cache.RedisSet(ctx, heartbeatKey, time.Now().Unix(), time.Hour*24)
}
_ = cache.RedisIncBy(ctx, heartbeatKey, 2)
}
}
}(ctxTimeout, taskId)
// 3. 初始化任务结果的状态和数据变量。
// 任务结果
taskEnd := false
taskStatus := consts.TaskStatusProcessing
taskMsg := ""
var taskData any
// 4. 设置一个定时器,每隔3秒执行一次轮询查询结果。
// 每3秒重试一次
step := time.NewTimer(time.Millisecond)
defer step.Stop()
// 查询失败时,重试5次
retry := 1
//5. 使用`breakFor`标签和`select`语句进行轮询操作,当超时或查询到结果时跳出循环。
//- 调用`iAiHandle.ResultHandle`方法查询任务结果。
//- 如果查询失败,重试5次。
//- 如果任务状态不是处理中或等待中,则更新任务状态和数据,设置`taskEnd`为`true`,跳出循环。
// 轮询结果
breakFor:
for {
select {
case <-ctxTimeout.Done():
break breakFor
case <-step.C:
step.Reset(time.Second * 3)
// 查结果
taskResult, err := iAiHandle.ResultHandle(ctx, &v1.ResultReq{
Algorithm: algorithm,
TaskId: taskId,
})
taskData = taskResult
if err != nil {
if retry > 5 {
taskStatus = consts.TaskStatusServerFail
taskMsg = "handle result err retry than 5 times"
break breakFor
}
retry++
} else {
// 状态不一致则返回
if taskResult != nil && taskResult.Status != consts.TaskStatusProcessing && taskResult.Status != consts.TaskStatusWaiting {
taskStatus = taskResult.Status
taskMsg = taskResult.Reason
taskEnd = true
break breakFor
}
}
}
}
// 程序处理超时
if taskEnd == false {
taskStatus = consts.TaskStatusServerOutTime
taskMsg = "handle result than 60 minutes"
}
// 7. 更新任务计数器。
// 任务数量-1
s.handleCreateActive(ctx, algorithm, false)
// 8. 保存任务结果到数据库。
// 更数任务
s.resultDbSave(ctx, taskId, taskStatus, taskMsg, taskData)
}