page contents

swoole 第9次课 渐入佳境:实现多进程Reactor 2

多进程 Reactor

接上

运行kill命令时,可以看到:

pcntl_wait($status)回收的结果是kill的子进程号,表示回收成功,即pcntl_wait()返回退出的子进程进程号,发生错误时返回-1。

而pcntl_wait的参数$status  是状态信息参数--- pcntl_wait()将会存储状态信息到status 参数上,这个通过status参数返回的状态信息可以用以下函数 pcntl_wifexited()pcntl_wifstopped()pcntl_wifsignaled()pcntl_wexitstatus()pcntl_wtermsig()以及 pcntl_wstopsig()获取其具体的值。

具体运行如下:

attachments-2021-01-M9VQduFh6014a30fa460a.png


33) work结构

attachments-2021-01-dCjVNpdT6015da9198861.jpg

34)通信的情况

attachments-2021-01-8pAEalkl6015da9bbea16.png修改测试服务类:iostarjhy\test\multpBlocking\server.php 在onreceive方法中,获取当前子进程ip号,具体如下:

$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
dd(posix_getpid(),"获取当前进程的pid");
$server->send($client,'hello client');
});

运行发现, 获取当前的进程pid会变化,说明 子进程在“抢”

4.多进程Reactor模型

41) work进程结构及设置

attachments-2021-01-KxfIlHDi6015f65bb4c00.png

attachments-2021-01-d5fePCOV6015f66689fbb.png

42)多进程Reactor模型的实现

中间实现类: iostarjhy\src\MultpReactor\WorkerSwoole.php   同iostarjhy\src\Async\WorkerSwoole.php,这里复制,然后实现类于继承这个类

<?php
namespace IoStarJhy\MultpReactor;

use IoStarJhy\WorkerBase;

use Swoole\Event; //找不到这个类

class WorkerSwoole extends WorkerBase {


public function accept()
{
Event::add($this->server,$this->createConn());

}

public function createConn()
{
return function($socket){
//@抑制错误
$conn = @stream_socket_accept($this->server);
if (!empty($conn) && get_resource_type($conn) == "stream") {
// 触发建立连接事件
$this->events['connect']($this, $conn);
// 处理通信
Event::add($conn,$this->sendMessage());
}
};
}

public function sendMessage()
{
return function($conn){

$buffer = fread($conn, 65535);
if ('' === $buffer || false === $buffer) {
// 校验是否断开连接
$this->checkConn($buffer, $conn);
} else {
$this->events['receive']($this, $conn, $buffer);
}
};
}

public function checkConn($buffer, $conn)
{
//Event::del($conn); //1.可以直接在这里清除
if (strlen($buffer) === 0) {
if ((!get_resource_type($conn) == "Unknown")||(get_resource_type($conn) == "stream")) {
// 关闭连接
$this->close($conn);
}
$this->events['close']($this, $conn);
}
}
//2 也可以在workerbase添加方法delEvent空方法,并在close方法中$this->delEvent($client);同时本类中定义delEvent方法实现清除
public function delEvent($conn){
Event::del($conn);
}
}

实现类:iostarjhy\src\MultpReactor\Worker.php

<?php
namespace IoStarJhy\MultpReactor;

class Worker extends WorkerSwoole{
protected $config = [
'worker_num' => 4,
'context'=>[
'socket'=>[
//设置等待资源的个数
'backlog'=>'102400',
],
],
];
public function __construct($host,$port)
{
//这里是参考与workerman中的写法 stream_context_create创建上下文
$context = stream_context_create($this->config["context"]);
//设置端口号可以重复监听
\stream_context_set_option($context,'socket','so_reuseport',1);
dd("传递一个资源的文本 context");
//传递一个资源的文本 context
$this->server = stream_socket_server($this->flag.$host.":".$port,$errno,$errstr,STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,$context);

}
public function fork()
{
$son = -2 ;
for($i=0;$i<$this->config['worker_num'];$i++){
if($son == -2 || $son > 0){
$son = pcntl_fork();
}
// 限定进程空间执行相关业务
if($son == 0){
//子进程空间
$this -> accept();
//子进程结束下面的循环
exit;
}
}
//父进程空间中 回收进程
for($i=0;$i<$this->config['worker_num'];$i++){
$sop = pcntl_wait($status);
}
}

public function start()
{
$this->fork();
}
}

测试服务端: iostarjhy\test\multpReactor\server.php

<?php
require_once __DIR__."/../../vendor/autoload.php";

use IoStarJhy\MultpReactor\Worker;

$server = new Worker('0.0.0.0',9500);

$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
dd(posix_getpid(),"获取当前进程的pid");
$server->send($client,'hello client');
});
$server->on('close',function($server,$client){
dd($client,'断开连接');
});

$server->start();

测试客户端:iostarjhy\test\multpReactor\client.php

<?php
require_once __DIR__ . "/../../vendor/autoload.php";

//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
fwrite($fp, 'hello server');
dd(fread($fp, 65535));
fclose($fp);

运行

attachments-2021-01-u0QSTXa46015f8571632a.jpg


补充:

stream_context_create  创建资源流上下文

stream_context_set_option — 对资源流、数据包或者上下文设置参数

注意,两种用法

stream_context_set_option ( resource $stream_or_context , string $wrapper , string $option , mixed $value ) : bool   --意思是给$stream_or_context设置参数$wrapper[$option],值为$value,即 $wrapper[$option]=$value
stream_context_set_option ( resource $stream_or_context , array $options ) : bool  ------options 必须是一个 $arr['wrapper']['option'] = $value 格式二维关联数组 
stream_socket_server(string $local_socket [, int &$errno [, string &$errstr [, int $flags = STREAM_SERVER_BIND | STREAM_SERVER_LISTEN [, resource $context ]]]])注意第三个参数 flag ,STREAM_SERVER_BIND | STREAM_SERVER_LISTEN 设置多端口监听

根据补充理解worker中的__contruct()
//根据配置创建 创建上下文资源  
$context = stream_context_create($this->config['context']);
// 设置端口可以重复监听 即给资源$context,添加参数并设置值,$socket['so_reuseport'']=1;使之端口支持重复监听
\stream_context_set_option($context, 'socket', 'so_reuseport', 1);
 // 传递一个资源的文本 context  设置多端口监听STREAM_SERVER_BIND | STREAM_SERVER_LISTEN  
$this->server = stream_socket_server($host.":".$port , $errno , $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $context);

5.swoole的两种运行模式
    51) 在 Swoole\Server 构造函数的第三个参数,可以填 2 个常量值 -- SWOOLE_BASE 或 SWOOLE_PROCESS,默认为SWOOLE_PROCESS。 这两个是swoole的两种运行模式
   52)两种模式的区别以及优缺点

SWOOLE_BASE 这种模式就是传统的异步非阻塞 Server。与 Nginx 和 Node.js 等程序是完全一致的。

worker_num 参数对于 BASE 模式仍然有效,会启动多个 Worker 进程。

当有 TCP 连接请求进来的时候,所有的 Worker 进程去争抢这一个连接,并最终会有一个 worker 进程成功直接和客户端建立 TCP 连接,之后这个连接的所有数据收发直接和这个 worker 通讯,不经过主进程的 Reactor 线程转发。

  • BASE 模式下没有 Master 进程的角色,只有 Manager 进程的角色。
  • 每个 Worker 进程同时承担了 SWOOLE_PROCESS 模式下 Reactor 线程和 Worker 进程两部分职责。
  • BASE 模式下 Manager 进程是可选的,当设置了 worker_num=1,并且没有使用 Task 和 MaxRequest 特性时,底层将直接创建一个单独的 Worker 进程,不创建 Manager进程

BASE 模式的优点:

  • BASE 模式没有 IPC 开销,性能更好
  • BASE 模式代码更简单,不容易出错

BASE 模式的缺点:

  • TCP 连接是在 Worker 进程中维持的,所以当某个 Worker 进程挂掉时,此 Worker 内的所有连接都将被关闭
  • 少量 TCP 长连接无法利用到所有 Worker 进程
  • TCP 连接与 Worker 是绑定的,长连接应用中某些连接的数据量大,这些连接所在的 Worker 进程负载会非常高。但某些连接数据量小,所以在 Worker 进程的负载会非常低,不同的 Worker 进程无法实现均衡。
  • 如果回调函数中有阻塞操作会导致 Server 退化为同步模式,此时容易导致 TCP 的 backlog 队列塞满问题。

BASE 模式的适用场景:

如果客户端连接之间不需要交互,可以使用 BASE 模式。如 Memcache、HTTP 服务器等。

attachments-2021-01-9ZdbJv1X6016a13fd6b80.jpg对应的结构已经实现了,见上42多进程Reactor模型的实现

attachments-2021-01-HBKlkFUo6016a18e1e7ce.png

SWOOLE_PROCESS

SWOOLE_PROCESS 模式的 Server 所有客户端的 TCP 连接都是和主进程建立的,内部实现比较复杂,用了大量的进程间通信、进程管理机制。适合业务逻辑非常复杂的场景。Swoole提供了完善的进程管理、内存保护机制。 在业务逻辑非常复杂的情况下,也可以长期稳定运行。

Swoole 在 Reactor 线程中提供了 Buffer 的功能,可以应对大量慢速连接和逐字节的恶意客户端。

进程模式的优点:

  • 连接与数据请求发送是分离的,不会因为某些连接数据量大某些连接数据量小导致 Worker 进程不均衡
  • Worker 进程发送致命错误时,连接并不会被切断
  • 可实现单连接并发,仅保持少量 TCP 连接,请求可以并发地在多个 Worker 进程中处理

进程模式的缺点:

  • 存在 2 次 IPC 的开销,master 进程与 worker 进程需要使用 unixSocket 进行通信

对应的模型结构

attachments-2021-01-P8Fgr9336016a2d84b9fc.png

具体的业务实现结构,代码实现比较复杂,这里暂且略过

attachments-2021-01-pChBXpmu6016a26a83b63.png


6.扩展-阅读swoole源码(略)


  • 发表于 2021-01-30 07:50
  • 阅读 ( 922 )
  • 分类:swoole

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
吉洪叶
吉洪叶

21 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1470 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章