page contents

golang实现定时任务调度框架

在定时任务中,有这样一个场景,定时给某些用户发送消息,或者定时给某些数据进行对账,而这些场景有一个要求是,下次处理是上次处理的定时间隔(比如下次的1分钟)等,这样就会出现,每秒处理的数据和用户是不一样的。比如13:13:13 对数据A处理失败了(处理失败常见,非常规意义的失败),那下一分钟接着处理A(13:14:13),13:13:14 对数据B处理失败了那下一分钟接着处理B(13:14:14)。这样就会发现,每分钟的60秒都在处理一批批数据。类似于时间轮一样的轮循。
attachments-2021-09-s5AO1Q1361395fdf65c10.jpg
golang实现定时任务调度框架:
在定时任务中,有这样一个场景,定时给某些用户发送消息,或者定时给某些数据进行对账,而这些场景有一个要求是,下次处理是上次处理的定时间隔(比如下次的1分钟)等,这样就会出现,每秒处理的数据和用户是不一样的。比如13:13:13 对数据A处理失败了(处理失败常见,非常规意义的失败),那下一分钟接着处理A(13:14:13),13:13:14 对数据B处理失败了那下一分钟接着处理B(13:14:14)。这样就会发现,每分钟的60秒都在处理一批批数据。类似于时间轮一样的轮循。
当然,也可以使用消息队列的延时队列,但这种情况会造成,一个消息的堆积,而且无法处理这种清空,比如A,B的任务相差一分钟,那在A的2分钟后是发送A的第2次,而B是第1次,这两次是同时发送的,消息队列比较难处理这种情况,同时对于消息的标记也比较麻烦。

方案:可以使用go里的定时器,每秒触发一次任务,对任务进行轮循处理,简单的demo如下:(这里忽略了任务的进入,具体可以使用kafka结合到达的时间点来标记队列的轮片)
package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

type Task struct {
    msg string   //任务的内容, 具体可以是一个复杂的结构体对象
    pri int   // 任务的优先级,在对同一个bucket的数据,可以按照优先级来处理
    idx int   //  bucket 的标识
    status bool // 任务标识,标识任务是否执行成功,是否需要删除
}

func (t *Task) runTask() {  //简单的执行任务
    fmt.Println("run message", t.msg)
    t.status = true
}

var taskList = map[int][]Task{}

func sendTask(idx int) {
    msg := fmt.Sprintf("task message %d", idx)
    pri := idx / 60
    idx = idx % 60

    task := Task{
        msg,
        pri,
        idx,
            false,
    }
    taskList[idx] = append(taskList[idx], task)
}

/**
 * 假设 i是任务的id号,表示有一个150个任务要进如队列审核
 */
func initTask() {
    for i := 0; i < 150; i++ {
        sendTask(i)
    }
}

var ticker = time.NewTicker(1 * time.Second)
var cc = 0 //轮片指针

func main() {
    c := make(chan os.Signal)
    status:=true
    signal.Notify(c,
        syscall.SIGKILL,
        syscall.SIGHUP,
        syscall.SIGINT,
        syscall.SIGQUIT,
        os.Interrupt,
        os.Kill,
    )
    initTask()
    go func() {
        for {
            select {
            case <-ticker.C:
                for _, t := range taskList[cc] {
                    if t.status == false {
                        t.runTask()
                    }
                }
                cc += 1
                cc = cc%60  //循环轮询
            case <-c: //监听 信号
                ticker.Stop()
                fmt.Println("kill task")
                status = false
                break

            }
        }
    }()
    for {// 常驻
        time.Sleep(1*time.Second)
        if status == false {
            break
        }
    }
}

更多相关技术内容咨询欢迎前往并持续关注六星社区了解详情。

程序员编程交流QQ群:805358732

如果你想用Python开辟副业赚钱,但不熟悉爬虫与反爬虫技术,没有接单途径,也缺乏兼职经验
关注下方微信公众号:Python编程学习圈,获取价值999元全套Python入门到进阶的学习资料以及教程,还有Python技术交流群一起交流学习哦。

attachments-2022-06-9Kd2MAog62afde6821d05.jpeg

  • 发表于 2021-09-09 09:14
  • 阅读 ( 1420 )
  • 分类:Golang

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
轩辕小不懂
轩辕小不懂

2403 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1658 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章