page contents

Golang框架--定时任务goCron

goCron是一个Golang作业调度工具,可以使用简单的语法定期执行go函数。 使用实例 package main import ("fmt""github.com/jasonlvhit/gocron") func task() {fmt.Println("I am runnning ta...

attachments-2021-05-Schh1bU460b44758ec7fd.png

goCron是一个Golang作业调度工具,可以使用简单的语法定期执行go函数。

使用实例

package main
 
import (
"fmt"
"github.com/jasonlvhit/gocron"
)
 
func task() {
fmt.Println("I am runnning task.")
}
 
func taskWithParams(a int, b string) {
fmt.Println(a, b)
}
 
func main() {
//可并发运行多个任务
//注意 interval>1时调用sAPi
gocron.Every(2).Seconds().Do(task)
gocron.Every(1).Second().Do(taskWithParams, 1, "hi")
//在cron所有操作最后调用 start函数,否则start之后调用的操作无效不执行
//<-gocron.Start()
 
//在task执行过程中 禁止异常退出
gocron.Every(1).Minute().DoSafely(taskWithParams, 1, "hello")
 
// 支持在具体某一天、某天的某一时刻、每y-M-d h-m-s 执行任务
gocron.Every(1).Monday().Do(task)
gocron.Every(1).Thursday().Do(task)
// function At() take a string like 'hour:min'
gocron.Every(1).Day().At("10:30").Do(task)
gocron.Every(1).Monday().At("18:30").Do(task)
 
// 删除某一任务
gocron.Remove(task)
 
//删除所有任务
gocron.Clear()
 
//可同时创建一个新的任务调度 2个schedulers 同时执行
s := gocron.NewScheduler()
s.Every(3).Seconds().Do(task)
<-s.Start()
 
//防止多个集群中任务同时执行 task 实现lock接口
//两行代码,对cron 设置lock实现,执行task时调用Lock方法再Do task
gocron.SetLocker(lockerImplementation)
gocron.Every(1).Hour().Lock().Do(task)
 
<-gocron.Start()
}

源码浅析

 轻量 简洁的链式调用,看工程源码,简洁的……只有一个类。gocron当前支持最多1w个任务数,核心对象就是维护了job对象,所有执行的任务都放在jobs数组中,start方法底层调用go time包下的NewTicker,新启一个线程执行task方法。

外部调用的gocron.start func调用链

// Start all the pending jobs
// Add seconds ticker
func (s *Scheduler) Start() chan bool {
stopped := make(chan bool, 1)
ticker := time.NewTicker(1 * time.Second)
 
go func() {
for {
select {
case <-ticker.C:
s.RunPending()    //调用RunPending 执行数组有序的任务队列
case <-stopped:
ticker.Stop()
return
}
}
}()
 
return stopped
}
 
// RunPending runs all the jobs that are scheduled to run.
func (s *Scheduler) RunPending() {
runnableJobs, n := s.getRunnableJobs()
 
if n != 0 {
for i := 0; i < n; i++ {
runnableJobs[i].run()
}
}
}
 
//run方法,反射获取job的各属性,最终调用function.call方法执行任务函数
//Run the job and immediately reschedule it
func (j *Job) run() (result []reflect.Value, err error) {
if j.lock {
if locker == nil {
err = fmt.Errorf("trying to lock %s with nil locker", j.jobFunc)
return
}
key := getFunctionKey(j.jobFunc)
 
if ok, err := locker.Lock(key); err != nil || !ok {
return nil, err
}
 
defer func() {
if e := locker.Unlock(key); e != nil {
err = e
}
}()
}
 
f := reflect.ValueOf(j.funcs[j.jobFunc])
params := j.fparams[j.jobFunc]
if len(params) != f.Type().NumIn() {
err = errors.New("the number of param is not adapted")
return
}
in := make([]reflect.Value, len(params))
for k, param := range params {
in[k] = reflect.ValueOf(param)
}
result = f.Call(in)
j.lastRun = time.Now()
j.scheduleNextRun()
return
}

这里需要关注一下,gocron对lock的实现,从代码上看Job结构体的lock属性,用于控制多实例job并发执行。但项目woner提到的 multiple instances 指的并不是跨服务器的多实例,而是在同一应用服务 里的多任务实例(也就是1个app服务中多个任务,粒度是只在统一应用内)。如果跨server则lock需要自行依赖redis或其他分布式锁来管理。通过读源码的run方法,j.lock来控制job并发,但一旦跨server job.lock属性是没法共享的。这里doc上给的解释有点歧义,需要注意。

Job结构体

// Job struct keeping information about job
type Job struct {
interval uint64                   // pause interval * unit bettween runs
jobFunc  string                   // the job jobFunc to run, func[jobFunc]
unit     string                   // time units, ,e.g. 'minutes', 'hours'...
atTime   time.Duration            // optional time at which this job runs
lastRun  time.Time                // datetime of last run
nextRun  time.Time                // datetime of next run
startDay time.Weekday             // Specific day of the week to start on
funcs    map[string]interface{}   // Map for the function task store
fparams  map[string][]interface{} // Map for function and  params of function
lock     bool                     // lock the job from running at same time form multiple instances
}

scheduler内维护JOBs array数组和大小,gocron scheduler最大可执行1w个job(大小可重写)。

// Scheduler struct, the only data member is the list of jobs.
// - implements the sort.Interface{} for sorting jobs, by the time nextRun
type Scheduler struct {
jobs [MAXJOBNUM]*Job // Array store jobs ,const MAXJOBNUM = 10000
size int             // Size of jobs which jobs holding.
}

对于执行时间的控制,均通过对job的unit属性进行设置,代码如下

// Seconds set the unit with seconds
func (j *Job) Seconds() *Job {
return j.setUnit("seconds")
}
 
// Minutes set the unit with minute
func (j *Job) Minutes() *Job {
return j.setUnit("minutes")
}
 
// Second set the unit with second
func (j *Job) Second() *Job {
j.mustInterval(1)
return j.Seconds()
}
 
// Minute set the unit  with minute, which interval is 1
func (j *Job) Minute() *Job {
j.mustInterval(1)
return j.Minutes()
}
 
// Sunday sets the job start day Sunday
func (j *Job) Sunday() *Job {
return j.Weekday(time.Sunday)
}
 
// Every schedule a new periodic job with interval
func (s *Scheduler) Every(interval uint64) *Job {
job := NewJob(interval)
s.jobs[s.size] = job
s.size++
return job
}

简单总结

gocron代码总共570行,在java中但凡涉及到一点“通用工具”或“架构”的实现,除了多依赖之外,撸代码是少不了的。但在go中要实现满足基本功能的cron任务调度,没有多余依赖,纯基于gosdk本身,575行代码打完收工。这是在不接触go语言之前设想不到的事情。轻量好用,为中间件而生。

编程语言跟人类沟通语言一样,属性都是工具,透过这个工具无论作为人或是工程师,给我们打开的是另一个世界和景象。在对比中扬长避短,可对比的资源越多,越是能找到最优方案。不怕不知道,就怕不知道。

更多相关技术内容咨询欢迎前往并持续关注六星社区了解详情。

程序员编程交流QQ群:805358732

如果你想用Python开辟副业赚钱,但不熟悉爬虫与反爬虫技术,没有接单途径,也缺乏兼职经验
关注下方微信公众号:Python编程学习圈,获取价值999元全套Python入门到进阶的学习资料以及教程,还有Python技术交流群一起交流学习哦。

attachments-2022-06-WwnoEUe262ac1e7367cc2.jpeg

  • 发表于 2021-05-31 10:18
  • 阅读 ( 1267 )
  • 分类:Golang

0 条评论

请先 登录 后评论
轩辕小不懂
轩辕小不懂

2403 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1658 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章