1.基于swoole的异步io模型
实现类:iostarjhy\src\Async\WorkerSwoole.php
<?php
namespace IoStarJhy\Async;
use IoStarJhy\WorkerBase;
use Swoole\Event; //找不到这个类
class WorkerSwoole extends WorkerBase {
public function accept()
{
Event::add($this->server,$this->createConn());
}
public function createConn()
{
return function($socket){
//@抑制错误
$conn = @stream_socket_accept($this->server);
if (!empty($conn) && get_resource_type($conn) == "stream") {
// 触发建立连接事件
$this->events['connect']($this, $conn);
// 处理通信
Event::add($conn,$this->sendMessage());
}
};
}
public function sendMessage()
{
return function($conn){
$buffer = fread($conn, 65535);
if ('' === $buffer || false === $buffer) {
// 校验是否断开连接
$this->checkConn($buffer, $conn);
} else {
$this->events['receive']($this, $conn, $buffer);
}
};
}
public function checkConn($buffer, $conn)
{
Event::del($conn); //1.可以直接在这里清除
if (strlen($buffer) === 0) {
if ((!get_resource_type($conn) == "Unknown")||(get_resource_type($conn) == "stream")) {
// 关闭连接
$this->close($conn);
}
$this->events['close']($this, $conn);
}
}
//2 也可以在workerbase添加方法delEvent空方法,并在close方法中$this->delEvent($client);同时本类中定义delEvent方法实现清除
/*public function delEvent($conn){
Event::del($conn);
}*/
}
说明:$conn = @stream_socket_accept($this->server);是抑制错误显示,只是不显示错误,处理警告类无关紧要的警告信息
测试服务端:iostarjhy\test\async\server.php 同前,只更改导入类名称
<?php
require_once __DIR__."/../../vendor/autoload.php";
//use IoStarJhy\Async\Worker;
use IoStarJhy\Async\WorkerSwoole as Worker;
$server = new Worker('0.0.0.0',9500);
$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
//sleep(5);
$server->send($client,'hello client');
//$server->close($client);
});
$server->on('close',function($server,$client){
dd($client,'断开连接');
//$server->close($client);
});
$server->start();
服务客户端:iostarjhy\test\async\client.php 同前
<?php
require_once __DIR__ . "/../../vendor/autoload.php";
//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
fwrite($fp, 'hello server');
dd(fread($fp, 65535));
fclose($fp);
运行:
另外,也可以将iostarjhy\src\Async\WorkerSwoole.php 中的1可以注释掉,将2放开,这个方法在 iostarjhy\src\WorkerBase.php中定义为空方法,并在close中调用,具体如下:
基类:iostarjhy\src\WorkerBase.php
public function close($client)
{
$this->delEvent($client);
\fclose($client);
dd($client);
}
public function delEvent($client){
}
2.理解进程与pcntl
21)理解进程
进程查看命令:ps -aux 查看所有
ps -aux | grep nginx 查看指定
ps -aux | grep php 查看指定
pstree -nlap | grep nginx 树形查看指定 (父子关系的进程查看)
pstree -nlap | grep php 树形查看指定 (父子关系的进程查看)
一个模型要实现高效的话,当前的一个单进程无法满足业务的要求。以最简单的上面异步io模型为例,更改上面测试服务器端的iostarjhy\test\async\server.php的on方法中,加入sleep(5),如下:
...
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
sleep(5);
$server->send($client,'hello client');
//$server->close($client);
});
...
运行
从上运行时间可看到,单进程服务处理多客户端请求时,虽然为异步,但仍阻塞,仍是以排队的方式处理。这就影响了执行效率及速度。而多进程服务思想,就是利用多个进程去分担多个客户端的请求,提高执行效率。
如下图示比较
22)进程的应用场景
23)进程的关系
24)php进程相关的函数及介绍
强调:pcntl_fork — 在当前进程当前位置产生分支(子进程)。译注:fork是创建了一个子进程,父进程和子进程 都从fork的位置开始向下继续执行,不同的是父进程执行过程中,得到的fork返回值为子进程 号,而子进程得到的是0。
成功时,在父进程执行线程内返回产生的子进程的PID,在子进程执行线程内返回0。失败时,在 父进程上下文返回-1,不会创建子进程,并且会引发一个PHP错误。
实例1:pcntl_fork自创建子进程位置开始,产生了当前进程空间及子进程空间,当前进程空间返回的$ pid为创建的子进程号,而子进程空间,由于没有 pcntl_fork,因而不存在子进程空间,见 下:
iostarjhy\test\pcntl\pcntl1.php
<?php
$pid = pcntl_fork();
echo $pid."\n";
//让程序始终运行而不结束
while(true){
}
运行:
实例2 pcntl_fork 结合 posix_getpid()、posix_getppid(),查看子进程,当前进程及父进程
iostarjhy\test\pcntl\pcntl2.php
<?php
$pid = pcntl_fork();
var_dump("当前进程:".posix_getpid());
echo "子进程:".$pid."\n";
var_dump("当前进程的父进程:".posix_getppid());
//让程序始终运行而不结束
while(true){
}
运行
说明:上半部分为父进程空间,下半部分为子进程空间,执行顺序是先父后子
实例3:pcntl_fork创建子进程,产生当前进程空间及子进程空间,并将当前进程空间pcntl_fork之前的变量,复制到子进程空间里,在没有明确判断当前空间及子空间范围内的代码,则当前空间及产生的子空间,都相应执行。
iostarjhy\test\pcntl\pcntl3.php
<?php
$user='jhy';
$pid = pcntl_fork();
var_dump("当前进程:".posix_getpid());
echo "子进程:".$pid."\n";
var_dump("当前进程的父进程:".posix_getppid());
echo $user."\n";
//让程序始终运行而不结束
while(true){
}
运行
示意图说明执行流程
其实质及流程参考如下
例4 pcntl_fork创建子进程,区分空间分别处理业务 --根据pcntl_fork产生的 $pid号来判断
iostarjhy\test\pcntl\pcntl4.php
<?php
$user='jhy';
//返回创建的子进程id 当前空间内返回的子进程空间$pid>0 子进程空间由于没有执行这个创建,所以$pid==0
$pid = pcntl_fork();
//当前空间及子空间中分别定义了$u=10 这块空间没有区分开
$u=10;
if( $pid == 0 ){
//子进程
var_dump("子进程空间");
//var_dump($u); //10
$u=9; //子空间中重新定义
var_dump($u);
var_dump($user);
}else if($pid > 0){
//父进程
var_dump("父进程空间");
var_dump($u);
var_dump($user);
}else{
//异常
}
while(true){
}
运行
注意:注释中的各项说明
25)注意,创建进程的个数及精确控制
251)创建进程的个数:iostarjhy\test\pcntl\pcntl5.php
//获取当前进程并输出
$current_id = posix_getpid();
var_dump('当前进程的进程'.$current_id);
var_dump('------------');
//不指定进程空间(即不区分进程空间的)的情况下,创建进程,各个存在的空间都创建
//1创建进程的个数 0,1 结果为2的2次方个进程 (包括最初的当前进程)
for( $i=0; $i<2; $i++){
sleep(5);
$p = pcntl_fork();
//到目前为止,有多少个进程,就有多少个进程空间,就有多少输出
var_dump('创建的子进程son1_'.$i.':'.$p);
}
//2 创建进程的个数 结果为2的3次方个进程(包括最初的当前进程)
/*for( $i=0; $i<3; $i++){
sleep(5);
$p = pcntl_fork();
//到目前为止,有多少个进程,就有多少个进程空间,就有多少输出
var_dump('创建的子进程son2_'.$i.':'.$p);
}*/
while(true){}
运行
切换注释2,运行
说明:1)不指定进程空间(即不区分进程空间的)的情况下,创建进程,各个存在的空间都创建。
2)创建进程的个数,在上述或类似的循环中,结果为2的n次方个进程 (注意,包括最初的当前进程,且是小于n的循环条件)
不区分空间而创建进程执行流程及本质,分析如下:
252)精确控制进程的个数:根据上面的分析,想要范围控制创建进程的个数,除了按上述的方法外,但精确控制进程的个数,思路就是明确并指定进程的空间,在这个空间内来创建进程。
方法1:iostarjhy\test\pcntl\pcntl6.php
<?php
//当前进程(父进程)的自定义标识 这时并为id号
$son = -2;
//创建子进程的个数
$work_num = 5;
for( $i=0; $i < $work_num; $i++){
//第一次光当前进程空间即父进程空间,创建子进程。之后,$sond > 0 (当前进程空间中创建的子进程id号),
if( $son == -2 || $son > 0){
//创建的子进程空间,由于创建的子进程空间,并没执行pcntl_fork(),所以$son=0;不会执行这个创建
$son = pcntl_fork();
}
}
while(true){
}
运行
方法2 iostarjhy\test\pcntl\pcntl7.php
<?php
//当前进程创建子进程 $son > 0
$son = pcntl_fork();
//创建子进程的个数 比如,一共要7个。目前已存在一个父空间及一个子空间,所以需要再创建5个
$work_num = 5 ;
for( $i=0; $i < $work_num; $i++){
//限制 $sond > 0 即为当前进程空间,而创建的子进程空间,$son==0 而不满足。
if( $son > 0){
//创建的子进程空间,由于创建的子进程空间,并没执行pcntl_fork(),所以$son=0;不会执行这个创建
$son = pcntl_fork();
}
}
while(true){
}
运行
说明:创建子进程的个数 比如,一共要7个。目前已存在一个父空间及一个子空间,所以需要再创建5个
$work_num = 5 ;
3.实现预派生子进程模型
31)预派生子进程模型结构及流程
32)实现预派生子进程模型
基类同上同前(略)
中间实现类:iostarjhy\src\MultpBlocking\WorkerB.php
<?php
namespace IoStarJhy\MultpBlocking;
use IoStarJhy\WorkerBase;
class WorkerB extends WorkerBase {
public function accept()
{
//建立并挂起连接, 可接受多次信息(而不是一次就关闭服务)
while(true){
//监听是否存在连接
$conn = stream_socket_accept($this->server);
if(!empty($conn)){
//触发建立连接事件
$this->events['connect']($this,$conn);
//接收服务的信息
$data = fread($conn,65535);
//处理信息
$this->events['receive']($this,$conn,$data);
//如下这样处理,有些时候,不能马上断开
//$this->events['close']($this,$conn);
//缺少心跳检测机制
dd($conn);
if(get_resource_type($conn) != "Unknown"){
$this->events['close']($this,$conn);
}
}
}
}
}
实现类:iostarjhy\src\MultpBlocking\Worker.php
<?php
namespace IoStarJhy\MultpBlocking;
use IoStarJhy\MultpBlocking\WorkerB;
class Worker extends WorkerB {
protected $config = [
'worker_num' => 4
];
public function fork()
{
$son = -2 ;
for($i=0;$i<$this->config['worker_num'];$i++){
if($son == -2 || $son > 0){
$son = pcntl_fork();
}
// 限定进程空间执行相关业务
if($son == 0){
//子进程空间
$this -> accept();
}
}
}
public function start()
{
$this->fork();
}
}
测试服务端:iostarjhy\test\multpBlocking\server.php
<?php
require_once __DIR__."/../../vendor/autoload.php";
use IoStarJhy\MultpBlocking\Worker;
//use IoStarJhy\MultpBlocking\WorkerE as Worker;
$server = new Worker('0.0.0.0',9500);
$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
$server->send($client,'hello client');
});
$server->on('close',function($server,$client){
dd($client,'断开连接');
});
$server->start();
测试客户端:iostarjhy\test\multpBlocking\client.php
<?php
require_once __DIR__ . "/../../vendor/autoload.php";
//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
fwrite($fp, 'hello server');
dd(fread($fp, 65535));
fclose($fp);
运行
从上发现,进程数对应了,达到预期,功能基本实现,但server一开始就自动结束了,然后客户端修改请求时,自动建起连接。这种创建的父子进程(父进程一创建就结束了,只有子进程),实际上是布尔进程。一结束父进程就不存在了。这就需要在通过pcntl_wait($status)回收,修改如下:
实现类:iostarjhy\src\MultpBlocking\Worker改为: iostarjhy\src\MultpBlocking\WorkerE.php
<?php
namespace IoStarJhy\MultpBlocking;
use IoStarJhy\MultpBlocking\WorkerB;
class WorkerE extends WorkerB {
protected $config = [
'worker_num' => 4
];
public function fork()
{
$son = -2 ;
for($i=0;$i<$this->config['worker_num'];$i++){
if($son == -2 || $son > 0){
$son = pcntl_fork();
}
// 限定进程空间执行相关业务
if($son == 0){
//子进程空间
$this -> accept();
//子进程结束下面的循环
exit;
}
}
//父进程空间中 回收进程
for($i=0;$i<$this->config['worker_num'];$i++){
$sop = pcntl_wait($status);
dd($sop,'$sop');
dd($status,'$status');
}
}
public function start()
{
$this->fork();
}
}
测试服务类server.php,切换注释导入类
use IoStarJhy\MultpBlocking\WorkerE as Worker;
运行
server.php 的父进程成功挂起。
未完待续:swoole 第9次课 渐入佳境:实现多进程Reactor 2
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!