1.为什么需要task进程
11)补充:task()
强调:使用 Task 功能,必须先设置 task_worker_num,并且必须设置 Server 的 onTask 和 onFinish 事件回调函数。
Swoole\Server->task(mixed$data,int$dstWorkerId=-1,callable$finishCallback):int
参数:$dstWorkerId 可以指定要给投递给哪个 Task 进程,传入 ID 即可 。可选。未指定目标 Task 进程,调用 task 方法会判断 Task 进程的忙闲状态,底层只会向处于空闲状态的 Task 进程投递任务。如果所有 Task 进程均处于忙的状态,底层会轮询投递任务到各个进程。可以使用 server->stats 方法获取当前正在排队的任务数量。
参数:$finishCallback 可选,finish 回调函数,如果任务设置了回调函数,Task 返回结果时会直接执行指定的回调函数,不再执行 Server 的 onFinish 回调,只有在 Worker 进程中投递任务才可触发
返回值
注意:Server->task/taskwait/finish 3 个方法当传入的 $data 数据超过 8K 时会启用临时文件来保存。当临时文件内容超过 server->package_max_length 时底层会抛出一个警告。此警告不影响数据的投递,过大的 Task 可能会存在性能问题。WARN: task package is too big.
12)小案例演示 --Task运用
测试服务端:tcpServer.php (class\10\tcpServer.php)
<?php
$server = new Swoole\Server('127.0.0.1', 9502);
$server->set(
[
'worker_num' => 4,
'task_worker_num' => 4,
]
);
$server->on('receive', function($server, $fd, $reactor_id, $data) {
$task_id = $server->task('Async');
echo "Dispatch AsyncTask: [id={$task_id}]\n";
});
$server->on('task', function ($server, $task_id, $reactor_id, $data) {
echo "New AsyncTask[id={$task_id}]\n";
$server->finish("{$data} -> OK");
});
$server->on('finish', function ($server, $task_id, $data) {
echo "AsyncTask[{$task_id}] finished: {$data}\n";
});
$server->start();
测试服务端:tcpClent.php (class\10\tcpClent.php)
<?php
$client = new Swoole\Client(SWOOLE_SOCK_TCP );
if (!$client->connect('127.0.0.1', 9502, -1)) {
exit("connect failed. Error: {$client->errCode}\n");
}
$client->send("hello world\n");
echo $client->recv();
$client->close();
运行
运行进程说明:一个主进程master(13046),一个管理进程manager(13047),4个worker进程,4个task进程,worker进程与task进程同一级,都是由管理进程manager创建的(manager的作用就是创建、回收管理进程的)
进程区分:哪个是worker哪个是task进程
通过workerstart事件(无论是worker还是task,启动时都会执行这个事件)结合 各自进程编号范围来大体判断
还可通过Swoole\Server->taskworker:bool 当前进程是否是 Task 进程。true 表示当前的进程是 Task 工作进程.false 表示当前的进程是 Worker 进程。
实现可在tcpServer.php文件中,添加onworkerstart的触发事件及回调方法实现,具体如下:
$server->on('workerstart', function ($server, int $workerId){
if($server->taskworker){
var_dump("task 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("task 进程pid:".posix_getpid());
}else{
var_dump("worker 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("worker 进程pid:".posix_getpid());
}
});
运行如下:
由运行可见,worker进程的编号范围0-3 task进程的编号范围4-7 ;$server->taskworker区分是进程类别task 或 worker .进程号的编号即进程的编号。具体可参见$worker_id;
13)阻塞方式--Task停用
在测试服务端:tcpServer.php (class\10\tcpServer.php)的基础上停用task进程,改为 :tcpServer2.php (class\10\tcpServer2.php),如下
<?php
$server = new Swoole\Server('0.0.0.0', 9502);
$server->set(
[
/* 'worker_num' => 4,
'task_worker_num' => 4,*/
]
);
$server->on('receive', function($server, $fd, $reactor_id, $data) {
//$task_id = $server->task('Async');
sleep(3);
echo "测试阻塞\n";
$server->send($fd,'ok');
});
/*$server->on('task', function ($server, $task_id, $reactor_id, $data) {
echo "New AsyncTask[id={$task_id}]\n";
$server->finish("{$data} -> OK");
});
$server->on('finish', function ($server, $task_id, $data) {
echo "AsyncTask[{$task_id}] finished: {$data}\n";
});*/
$server->on('workerstart', function ($server, int $workerId){
if($server->taskworker){
var_dump("task 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("task 进程pid:".posix_getpid());
}else{
var_dump("worker 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("worker 进程pid:".posix_getpid());
}
});
$server->start();
测试客户端不变,运行如下:
客户端等待3秒之后才返回,说明 阻塞了
14)启用task,将sleep(这里是模拟业务完成所需时间)放到ontask中,改动tcpServer2.php (class\10\tcpServer2.php)为tcpServer3.php (class\10\tcpServer3.php),具体如下:
<?php
$server = new Swoole\Server('0.0.0.0', 9502);
$server->set(
[
'worker_num' => 4,
'task_worker_num' => 4,
]
);
$server->on('receive', function($server, $fd, $reactor_id, $data) {
$task_id = $server->task('Async');
//sleep(3);
echo "使用task进程方式\n";
$server->send($fd,'ok');
});
$server->on('task', function ($server, $task_id, $reactor_id, $data) {
sleep(3);
echo "AsyncTask进程[id={$task_id}]:处理任务中\n";
$server->finish("{$data} -> OK");
});
$server->on('finish', function ($server, $task_id, $data) {
echo "AsyncTask[{$task_id}] 处理任务完成: {$data}\n";
});
$server->on('workerstart', function ($server, int $workerId){
if($server->taskworker){
var_dump("task 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("task 进程pid:".posix_getpid());
}else{
var_dump("worker 进程编号(参数workerId形式):".$workerId.'==(server->worker_id属性形式)'.$server->worker_id);
var_dump("worker 进程pid:".posix_getpid());
}
});
$server->start();
测试客户端不变,运行如下:
客户端立刻返回,task进程仍在处理任务中(时间大约3秒,task进程是同步阻塞的,没有判断进程空间,这里都输出了),处理完成后返回通知给worker进程,这些说明task解决的阻塞。
上述执行参考运行图及 流程图见下
综上:task处理worker投递的耗时任务,让worker进程继续处理其它任务而不必等待结果,从而解决阻塞,这就是为什么需要task进程。
2.Task运用
结合上面的案例,可见Worker与task关系,如下
两个进程之间的关系及通讯方式
3.理解任务投递
更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中
更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中
更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中更新中
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!