page contents

swoole 第10次课 峰回路转:详解task进程 1

task进程

1.为什么需要task进程

11)补充:task()

强调:使用 Task 功能,必须先设置 task_worker_num,并且必须设置 Server 的 onTask 和 onFinish 事件回调函数。

        Swoole\Server->task(mixed$data,int$dstWorkerId=-1,callable$finishCallback):int

参数:$dstWorkerId 可以指定要给投递给哪个 Task 进程,传入 ID 即可 。可选。未指定目标 Task 进程,调用 task 方法会判断 Task 进程的忙闲状态,底层只会向处于空闲状态的 Task 进程投递任务。如果所有 Task 进程均处于忙的状态,底层会轮询投递任务到各个进程。可以使用 server->stats 方法获取当前正在排队的任务数量。

       参数:$finishCallback 可选,finish 回调函数,如果任务设置了回调函数,Task 返回结果时会直接执行指定的回调函数,不再执行 Server 的 onFinish 回调,只有在 Worker 进程中投递任务才可触发

        返回值

  •      调用成功,返回值为整数 $task_id,表示此任务的 ID。如果有 finish 回应,onFinish 回调中会携带 $task_id 参数
  •      调用失败,返回值为 false,$task_id 可能为 0,因此必须使用 === 判断是否失败

       

      注意:Server->task/taskwait/finish 3 个方法当传入的 $data 数据超过 8K 时会启用临时文件来保存。当临时文件内容超过 server->package_max_length 时底层会抛出一个警告。此警告不影响数据的投递,过大的 Task 可能会存在性能问题。WARN: task package is too big.

  •       补充:task_worker_num  配置此参数后将会启用 task 功能。
  •      

12)小案例演示 --Task运用

测试服务端:tcpServer.php (class\10\tcpServer.php)

<?php

$server = new Swoole\Server('127.0.0.1', 9502);

$server->set(
[
'worker_num' => 4,
'task_worker_num' => 4,
]
);

$server->on('receive', function($server, $fd, $reactor_id, $data) {
$task_id = $server->task('Async');
echo "Dispatch AsyncTask: [id={$task_id}]\n";
});

$server->on('task', function ($server, $task_id, $reactor_id, $data) {
echo "New AsyncTask[id={$task_id}]\n";
$server->finish("{$data} -> OK");
});

$server->on('finish', function ($server, $task_id, $data) {
echo "AsyncTask[{$task_id}] finished: {$data}\n";
});

$server->start();

测试服务端:tcpClent.php (class\10\tcpClent.php)

<?php
$client = new Swoole\Client(SWOOLE_SOCK_TCP );

if (!$client->connect('127.0.0.1', 9502, -1)) {
exit("connect failed. Error: {$client->errCode}\n");
}

$client->send("hello world\n");

echo $client->recv();

$client->close();


运行

attachments-2021-02-TtSJt3zn6019eca0a4ce6.jpg

运行进程说明:一个主进程master(13046),一个管理进程manager(13047),4个worker进程,4个task进程,worker进程与task进程同一级,都是由管理进程manager创建的(manager的作用就是创建、回收管理进程的)

进程区分:哪个是worker哪个是task进程

通过workerstart事件(无论是worker还是task,启动时都会执行这个事件)结合 各自进程编号范围来大体判断

  • Worker 进程编号范围是 [0, $serv->setting['worker_num']-1]
  • Task 进程编号范围是 [$serv->setting['worker_num'], $serv->setting['worker_num'] + $serv->setting['task_worker_num']]
  • 这个编号通过workerstart的参数workerId,或者$server->worker_id属性获取,两者是一样的。

还可通过Swoole\Server->taskworker:bool 当前进程是否是 Task 进程。true 表示当前的进程是 Task 工作进程.false 表示当前的进程是 Worker 进程。

实现可在tcpServer.php文件中,添加onworkerstart的触发事件及回调方法实现,具体如下:

$server->on('workerstart', function ($server, int $workerId){
if($server->taskworker){
var_dump("task 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("task 进程pid:".posix_getpid());
}else{
var_dump("worker 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("worker 进程pid:".posix_getpid());
}
});

运行如下:

attachments-2021-02-2hbSIEqH601b3d0a714bd.png

由运行可见,worker进程的编号范围0-3  task进程的编号范围4-7 ;$server->taskworker区分是进程类别task 或 worker .进程号的编号即进程的编号。具体可参见$worker_id;

13)阻塞方式--Task停用

在测试服务端:tcpServer.php (class\10\tcpServer.php)的基础上停用task进程,改为 :tcpServer2.php (class\10\tcpServer2.php),如下

<?php

$server = new Swoole\Server('0.0.0.0', 9502);

$server->set(
[
/* 'worker_num' => 4,
'task_worker_num' => 4,*/
]
);

$server->on('receive', function($server, $fd, $reactor_id, $data) {
//$task_id = $server->task('Async');
sleep(3);
echo "测试阻塞\n";
$server->send($fd,'ok');
});

/*$server->on('task', function ($server, $task_id, $reactor_id, $data) {
echo "New AsyncTask[id={$task_id}]\n";
$server->finish("{$data} -> OK");
});

$server->on('finish', function ($server, $task_id, $data) {
echo "AsyncTask[{$task_id}] finished: {$data}\n";
});*/

$server->on('workerstart', function ($server, int $workerId){
if($server->taskworker){
var_dump("task 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("task 进程pid:".posix_getpid());
}else{
var_dump("worker 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("worker 进程pid:".posix_getpid());
}
});

$server->start();

测试客户端不变,运行如下:

attachments-2021-02-LXGkO7CY601e4ce6cdd7c.jpg

客户端等待3秒之后才返回,说明 阻塞了

14)启用task,将sleep(这里是模拟业务完成所需时间)放到ontask中,改动tcpServer2.php (class\10\tcpServer2.php)为tcpServer3.php (class\10\tcpServer3.php),具体如下:

<?php

$server = new Swoole\Server('0.0.0.0', 9502);

$server->set(
[
'worker_num' => 4,
'task_worker_num' => 4,
]
);

$server->on('receive', function($server, $fd, $reactor_id, $data) {
$task_id = $server->task('Async');
//sleep(3);
echo "使用task进程方式\n";
$server->send($fd,'ok');
});

$server->on('task', function ($server, $task_id, $reactor_id, $data) {
sleep(3);
echo "AsyncTask进程[id={$task_id}]:处理任务中\n";
$server->finish("{$data} -> OK");
});

$server->on('finish', function ($server, $task_id, $data) {
echo "AsyncTask[{$task_id}] 处理任务完成: {$data}\n";
});

$server->on('workerstart', function ($server, int $workerId){
if($server->taskworker){
var_dump("task 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("task 进程pid:".posix_getpid());
}else{
var_dump("worker 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("worker 进程pid:".posix_getpid());
}
});

$server->start();

测试客户端不变,运行如下:

attachments-2021-02-UncTcBCj601e53764b055.png

客户端立刻返回,task进程仍在处理任务中(时间大约3秒,task进程是同步阻塞的,没有判断进程空间,这里都输出了),处理完成后返回通知给worker进程,这些说明task解决的阻塞。

上述执行参考运行图及 流程图见下

attachments-2021-02-oxhRb9616021e964ad612.jpg

attachments-2021-02-hqMag4MR6021eb2ce757b.png

综上:task处理worker投递的耗时任务,让worker进程继续处理其它任务而不必等待结果,从而解决阻塞,这就是为什么需要task进程。

2.Task运用

结合上面的案例,可见Worker与task关系,如下

attachments-2021-02-x0wtYAvx6032edc06f552.png

两个进程之间的关系及通讯方式


attachments-2021-02-hCPdAIyB6032ed4f8c053.png

3.理解任务投递




更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中

更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中

更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中

  • 发表于 2021-02-03 07:01
  • 阅读 ( 1161 )
  • 分类:swoole

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
吉洪叶
吉洪叶

21 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1470 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章