代码分享: 延迟时间指数递增的重试机制实现

116 天前
 Nazz

在工作中, 我们经常会碰到重试的需求, 很多人会用定时任务来实现. 如果有更高的要求, 可以尝试下"延迟时间指数递增"的策略, 相比定时任务, 更加节省 CPU 和数据库资源.

为简化逻辑, 代码中没有使用 RedisMySQL 维护失败次数.

Quick Start

package main

import (
	"errors"
	"fmt"
	"github.com/lxzan/memorycache"
	"math"
	"sync/atomic"
	"time"
)

const (
	MaxRetryTimes = 17              // 最大重试次数
	FirstDelay    = 5 * time.Second // 初始延迟
)

func NewRetryer() *Retryer {
	return &Retryer{
		mc: memorycache.New[string, string](
			memorycache.WithBucketNum(16),
			memorycache.WithBucketSize(16, math.MaxInt64),
			memorycache.WithInterval(time.Second, 5*time.Second),
		),
	}
}

type Retryer struct {
	mc *memorycache.MemoryCache[string, string]
}

func (c *Retryer) Add(jobId string, jobFunc func() error) {
	// 检查任务是否已存在
	if _, exist := c.mc.GetOrCreate(jobId, jobId, -1); exist {
		return
	}

	var callback = func(element *memorycache.Element[string, string], reason memorycache.Reason) {
		// 只处理过期触发的回调
		if reason != memorycache.ReasonExpired {
			return
		}

		// 回调函数必须是非阻塞的; 酌情使用任务队列, 控制最大并发.
		go func(id string) {
			if err := jobFunc(); err == nil {
				c.delete(id)
			}
		}(element.Value)
	}

	// 注册延迟及回调函数
	var failedTimes = 1
	var delay, delta = FirstDelay, FirstDelay
	for i := 1; i <= MaxRetryTimes; i++ {
		if i >= failedTimes {
			var key = fmt.Sprintf("%s-%d", jobId, i)
			c.mc.SetWithCallback(key, jobId, delay, callback)
		}
		delta *= 2
		delay += delta
	}
}

// 任务执行成功, 删除剩余的重试任务
func (c *Retryer) delete(jobId string) {
	for i := 1; i <= MaxRetryTimes; i++ {
		var key = fmt.Sprintf("%s-%d", jobId, i)
		c.mc.Delete(key)
	}
	c.mc.Delete(jobId)
}

// 模拟一个任务
func newJob() (jobId string, jobFunc func() error) {
	var counter = new(atomic.Int64)
	return "1", func() error {
		defer counter.Add(1)
		serial := counter.Load()
		if serial < 5 {
			fmt.Printf("serial=%d, t=%d, success=false\n", serial, time.Now().Unix())
			return errors.New("test")
		}
		fmt.Printf("serial=%d, t=%d, success=true\n", serial, time.Now().Unix())
		return nil
	}
}

func main() {
	retryer := NewRetryer()
	jobId, jobFunc := newJob()
	if err := jobFunc(); err != nil {
		retryer.Add(jobId, jobFunc)
	}
	select {}
}

Output

serial=0, t=1705196714, success=false
serial=1, t=1705196719, success=false
serial=2, t=1705196729, success=false
serial=3, t=1705196749, success=false
serial=4, t=1705196789, success=false
serial=5, t=1705196869, success=true
1418 次点击
所在节点    Go 编程语言
8 条回复
Guaderxx
116 天前
指数退避么
Nazz
116 天前
@Guaderxx 单调递增
diagnostics
116 天前
这有啥好分享的,又不是大学生
Opportunity
116 天前
现在连重试都需要上 redis 了吗。。
zhangzEric
116 天前
@Opportunity 本地重试不需要,分布式异步重试就得有地方存储了
Nazz
116 天前
@Opportunity 我就用 redis 保存下失败次数,不想用 mysql
shellcodecow
108 天前
设计上
建议独立一个服务做调度功能
比如延迟调度、循环调度、递增调度 通过 GRPC 进行 timeout 或者 callback
Nazz
108 天前
@shellcodecow 分布式是该这样

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1008470

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX