接上
运行kill命令时,可以看到:
pcntl_wait($status)回收的结果是kill的子进程号,表示回收成功,即pcntl_wait()返回退出的子进程进程号,发生错误时返回-1。
而pcntl_wait的参数$status 是状态信息参数--- pcntl_wait()将会存储状态信息到status 参数上,这个通过status参数返回的状态信息可以用以下函数 pcntl_wifexited(), pcntl_wifstopped(), pcntl_wifsignaled(), pcntl_wexitstatus(), pcntl_wtermsig()以及 pcntl_wstopsig()获取其具体的值。
具体运行如下:
33) work结构
34)通信的情况
修改测试服务类:iostarjhy\test\multpBlocking\server.php 在onreceive方法中,获取当前子进程ip号,具体如下:
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
dd(posix_getpid(),"获取当前进程的pid");
$server->send($client,'hello client');
});
运行发现, 获取当前的进程pid会变化,说明 子进程在“抢”
4.多进程Reactor模型
41) work进程结构及设置
42)多进程Reactor模型的实现
中间实现类: iostarjhy\src\MultpReactor\WorkerSwoole.php 同iostarjhy\src\Async\WorkerSwoole.php,这里复制,然后实现类于继承这个类
<?php
namespace IoStarJhy\MultpReactor;
use IoStarJhy\WorkerBase;
use Swoole\Event; //找不到这个类
class WorkerSwoole extends WorkerBase {
public function accept()
{
Event::add($this->server,$this->createConn());
}
public function createConn()
{
return function($socket){
//@抑制错误
$conn = @stream_socket_accept($this->server);
if (!empty($conn) && get_resource_type($conn) == "stream") {
// 触发建立连接事件
$this->events['connect']($this, $conn);
// 处理通信
Event::add($conn,$this->sendMessage());
}
};
}
public function sendMessage()
{
return function($conn){
$buffer = fread($conn, 65535);
if ('' === $buffer || false === $buffer) {
// 校验是否断开连接
$this->checkConn($buffer, $conn);
} else {
$this->events['receive']($this, $conn, $buffer);
}
};
}
public function checkConn($buffer, $conn)
{
//Event::del($conn); //1.可以直接在这里清除
if (strlen($buffer) === 0) {
if ((!get_resource_type($conn) == "Unknown")||(get_resource_type($conn) == "stream")) {
// 关闭连接
$this->close($conn);
}
$this->events['close']($this, $conn);
}
}
//2 也可以在workerbase添加方法delEvent空方法,并在close方法中$this->delEvent($client);同时本类中定义delEvent方法实现清除
public function delEvent($conn){
Event::del($conn);
}
}
实现类:iostarjhy\src\MultpReactor\Worker.php
<?php
namespace IoStarJhy\MultpReactor;
class Worker extends WorkerSwoole{
protected $config = [
'worker_num' => 4,
'context'=>[
'socket'=>[
//设置等待资源的个数
'backlog'=>'102400',
],
],
];
public function __construct($host,$port)
{
//这里是参考与workerman中的写法 stream_context_create创建上下文
$context = stream_context_create($this->config["context"]);
//设置端口号可以重复监听
\stream_context_set_option($context,'socket','so_reuseport',1);
dd("传递一个资源的文本 context");
//传递一个资源的文本 context
$this->server = stream_socket_server($this->flag.$host.":".$port,$errno,$errstr,STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,$context);
}
public function fork()
{
$son = -2 ;
for($i=0;$i<$this->config['worker_num'];$i++){
if($son == -2 || $son > 0){
$son = pcntl_fork();
}
// 限定进程空间执行相关业务
if($son == 0){
//子进程空间
$this -> accept();
//子进程结束下面的循环
exit;
}
}
//父进程空间中 回收进程
for($i=0;$i<$this->config['worker_num'];$i++){
$sop = pcntl_wait($status);
}
}
public function start()
{
$this->fork();
}
}
测试服务端: iostarjhy\test\multpReactor\server.php
<?php
require_once __DIR__."/../../vendor/autoload.php";
use IoStarJhy\MultpReactor\Worker;
$server = new Worker('0.0.0.0',9500);
$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
dd(posix_getpid(),"获取当前进程的pid");
$server->send($client,'hello client');
});
$server->on('close',function($server,$client){
dd($client,'断开连接');
});
$server->start();
测试客户端:iostarjhy\test\multpReactor\client.php
<?php
require_once __DIR__ . "/../../vendor/autoload.php";
//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
fwrite($fp, 'hello server');
dd(fread($fp, 65535));
fclose($fp);
运行
补充:
stream_context_create 创建资源流上下文
stream_context_set_option — 对资源流、数据包或者上下文设置参数
注意,两种用法
SWOOLE_BASE 这种模式就是传统的异步非阻塞 Server。与 Nginx 和 Node.js 等程序是完全一致的。
worker_num 参数对于 BASE 模式仍然有效,会启动多个 Worker 进程。
当有 TCP 连接请求进来的时候,所有的 Worker 进程去争抢这一个连接,并最终会有一个 worker 进程成功直接和客户端建立 TCP 连接,之后这个连接的所有数据收发直接和这个 worker 通讯,不经过主进程的 Reactor 线程转发。
如果客户端连接之间不需要交互,可以使用 BASE 模式。如 Memcache、HTTP 服务器等。
对应的结构已经实现了,见上42多进程Reactor模型的实现
SWOOLE_PROCESS 模式的 Server 所有客户端的 TCP 连接都是和主进程建立的,内部实现比较复杂,用了大量的进程间通信、进程管理机制。适合业务逻辑非常复杂的场景。Swoole提供了完善的进程管理、内存保护机制。 在业务逻辑非常复杂的情况下,也可以长期稳定运行。
Swoole 在 Reactor 线程中提供了 Buffer 的功能,可以应对大量慢速连接和逐字节的恶意客户端。
对应的模型结构
具体的业务实现结构,代码实现比较复杂,这里暂且略过
完
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!