page contents

Laravel 基于redis队列的解析

注意, 上述使用 Lua 脚本的目的在于操作的原子性, Redis 是单进程单线程模式, 以Lua脚本形式执行命令时可以确保执行脚本的原子性, 而不会有并发问题。

attachments-2020-09-evCJKJMA5f4f0f4ca399a.png

为什么使用队列

使用队列的目的一般是:

  1. 异步执行
  2. 出错重试

解释一下:

异步执行: 部分代码执行很耗时, 为了提高响应速度及避免占用过多连接资源, 可以将这部分代码放到队列中异步执行.

Eg. 网站新用户注册后, 需要发送欢迎的邮件, 涉及到网络IO无法控制耗时的这一类就很适合放到队列中来执行.

出错重试: 为了保证一些任务的正常执行, 可以将任务放到队列中执行, 若执行出错则可以延迟一段时间后重试, 直到任务处理成功或出错超过N次后取消执行.

Eg. 用户需要绑定手机号, 此时发送短信的接口是依赖第三方, 一个是不确定耗时, 一个是不确定调用的成功, 为了保证调用成功, 必然需要在出错后重试


Laravel 中的队列

以下分析默认使用的队列及其配置如下

  • 默认队列引擎: redis
通过在 redis-cli 中使用 monitor 命令查看具体执行的命令语句
  • 默认队列名: default


分发任务

此处以分发 异步通知(class XxxNotification implement ShouldQueue)为例.

在Laravel中发起异步通知时, Laravel 会往redis中的任务队列添加一条新任务

redis 执行语句

redis> RPUSH queues:default

{
    "displayName": "App\\Listeners\\RebateEventListener",
    "job": "Illuminate\\Queue\\CallQueuedHandler@call",
    "maxTries": null,
    "timeout": null,
    "timeoutAt": null,
    "data": {
        "commandName": "Illuminate\\Events\\CallQueuedListener",
        "command": "O:36:\"Illuminate\\Events\\CallQueuedListener\":7:{s:5:\"class\";s:33:\"App\\Listeners\\RebateEventListener\";s:6:\"method\";s:15:\"onRebateCreated\";s:4:\"data\";a:1:{i:0;O:29:\"App\\Events\\RebateCreatedEvent\":4:{s:11:\"\u0000*\u0000tbkOrder\";O:45:\"Illuminate\\Contracts\\Database\\ModelIdentifier\":3:{s:5:\"class\";s:19:\"App\\Models\\TbkOrder\";s:2:\"id\";i:416;s:10:\"connection\";s:5:\"mysql\";}s:15:\"\u0000*\u0000notifyAdmins\";b:1;s:13:\"\u0000*\u0000manualBind\";b:0;s:6:\"socket\";N;}}s:5:\"tries\";N;s:9:\"timeoutAt\";N;s:7:\"timeout\";N;s:6:\"\u0000*\u0000job\";N;}"
    },
    "id": "iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB",
    "attempts": 0
}

上面的redis语句是将任务信息(json格式) rpush 到 redis 队列 queues:default 的尾部.


任务队列 Worker

Laravel 处理任务队列的进程开启方式: php artisan queue:work, 为了更好的观察, 这里使用 --once 选项来指定队列中的单一任务进行处理, 具体的更多参数请自行参考文档

php artisan queue:work --once --delay=1 --tries=3

上述执行语句参数含义:

  1. --once 仅执行一次任务, 默认是常驻进程一直执行
  2. --tries=3 任务出错最多重试3次, 默认是无限制重试
  3. --delay=1 任务出错后, 每次延迟1秒后再次执行, 默认是延迟0秒

当 Worker 启动时, 它依次执行如下步骤:

此处仍以默认队列 default 为例讲解, 且只讲解redis的相关操作
  1. 从 queues:default:delayed 有序集合中获取可以处理的 "延迟任务", 并 rpush 到 queue:default队列的尾部
    具体的执行语句:
redis> eval "Lua脚本" 2 queues:default:delayed queues:default 当前时间戳

Lua 脚本内容如下:

-- Get all of the jobs with an expired \"score\"...localval = redis.call('zrangebyscore', KEYS[1],'-inf', ARGV[1])-- If we have values in the array, we will remove them from the first queue-- and add them onto thedestination queue in chunks of 100, which moves-- all of the appropriate jobs onto the destination queue very safely.if(next(val) ~=nil)thenredis.call('zremrangebyrank', KEYS[1],0, #val -1)fori =1, #val,100doredis.call('rpush', KEYS[2],unpack(val, i,math.min(i+99, #val)))endendreturnval

从 queue:default:reserved有序集合中获取已过期的 "reserved 任务", 并 rpush 到 queue:default队列的尾部

具体的执行语句:

redis> eval "Lua脚本" 2 queues:default:reserved queues:default 当前时间戳

使用的Lua脚本同步骤 1

 queue:default 队列中获取(lpop)一个任务, 增加其 attempts 次数, 并将该任务保存到 queu:default:reserved 有序集合中, 该任务的 score 值为 当前时间 + 90(任务执行超时时间)

具体的执行语句:

redis> eval “Lua脚本” 2 queues:default queues:default:reserved 任务超时时间戳

Lua脚本

- Pop the first job off of the queue... local job = redis.call('lpop', KEYS[1]) local reserved = false if(job ~= false) then -- Increment the attempt count and place job on the reserved queue... reserved = cjson.decode(job) reserved['attempts'] = reserved['attempts'] + 1 reserved = cjson.encode(reserved) redis.call('zadd', KEYS[2], ARGV[1], reserved) end return {job, reserved}
  1. 这里的 90 是根据配置而定: config('queue.connections.redis.retry_after')
    若预计任务耗时过久, 则应增加该数值, 防止任务还在执行时就被重置
  2. 在成功执行上面获取的任务后, 就将该任务从 queues:default:reserved 队列中移除掉
    具体执行语句: ZREM queues:default:reserved "具体任务"
  3. 如果执行任务失败, 此时分为2种情况:

任务失败次数未达到指定的重试次数阀值
将该任务从 queues:default:reserved 中移除, 并将该任务添加到 queue:default:delayed 有序集合中, score 为该任务下一次执行的时间戳
执行语句:

redis> EVAL "Lua脚本" 2 queues:default:delayed queues:default:reserved "失败的任务" 任务延迟执行的时间戳

Lua脚本

-- Remove the job from the current queue... redis.call('zrem', KEYS[2], ARGV[1]) -- Add the job onto the \"delayed\" queue... redis.call('zadd', KEYS[1], ARGV[2], ARGV[1]) return true

如果任务失败次数超过指定的重试阀值
将该任务从 queue:default:reserved 中移除
执行语句:

redis> ZREM queue:default:reserved

注意, 上述使用 Lua 脚本的目的在于操作的原子性, Redis 是单进程单线程模式, 以Lua脚本形式执行命令时可以确保执行脚本的原子性, 而不会有并发问题.


attachments-2020-09-uytu9zQ65f4f0ec0d6f83.jpg

  • 发表于 2020-09-02 11:19
  • 阅读 ( 827 )
  • 分类:框架系列

0 条评论

请先 登录 后评论
Pack
Pack

1135 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1658 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章