page contents

swoole 第7次课 荷塘墨画:io复用与信号模型1

io复用,信号模型

1.server与client通信及stream_select识别 socket

1)server与client通信

attachments-2020-12-Uc3rsRr55fe537378abf6.jpg

  代码演示上述通信

  服务端:iostarjhy\test\server.php

  <?php

require __DIR__."/../vendor/autoload.php";

$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);

dd($socket, "stream_socket_server:return");

//1
/*$conn = stream_socket_accept($socket);
dd($conn,"stream_socket_accept:return");*/

//2
while(true){
$conn = stream_socket_accept($socket);
dd($conn,"stream_socket_accept:return");
//3 读写 从新建的client里写,是$conn,stream_socket_accept监控$socket的结果产生的新的client socket,而非专属的$socket
var_dump(fread($conn,65535));
fwrite($conn,"hello,client,I am server!");
}

客户端:iostarjhy\test\client.php

<?php
require __DIR__."/../vendor/autoload.php";
//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:8000");
//1
/*dd($fp,"stream_socket_client:return");*/
//2
/*dd($fp,"stream_socket_client:return");
//特别注意:写入,这里写入到的是服务端新建client的socket
fwrite($fp,'hi,server,I am client!');
//读取同上
var_dump(fread($fp,65535));*/

//3
dd($fp,"stream_socket_client:return");
sleep(10);
//特别注意:写入,这里写入到的是服务端新建client的 socket
fwrite($fp,'hi,server,I am client!');
//读取同上
var_dump(fread($fp,65535));

客户端与服务端,注释1对应部分运行

attachments-2020-12-E0pDwC4K5fe5d2cb91d40.png结果说明:stream_socket_server产生一个专属的socket,而stream_socket_accept监控专属的socket,有连接时产生一个新的client的 socket

客户端注释1,与服务端注释2对应部分运行

attachments-2020-12-EXYK3tgi5fe5d3fd4bbfb.jpg结果说明:stream_socket_server产生一个专属的socket,而stream_socket_accept监控专属的socket,有多个连接时,分别对应的产生一个新的client的socket,形成多个新的client的socket

客户端与服务端,注释2对应部分运行

attachments-2020-12-xvcVQ4ym5fe5d53907c9f.png

结果说明:服务端读写,从新建的client里写,是$conn,stream_socket_accept监控$socket的结果产生的新的client的 socket,而非专属的$socket。而特别注意,客户端的写入,写入到的是服务端新建的client的socket里,读取同理,尽管标识为$fp。


客户端注释3,对应服务器端2 部分运行

attachments-2020-12-NYgj5pSl5fe5d54564e53.png

结果说明:多个客户端,写入的是对应的 新建的clinet 代表的socket,而非专属的socket;


2)server与socket 及 socket的状态

    attachments-2020-12-GzSO9t1X5fe538e2271e4.png

attachments-2020-12-atOAwMQy5fe538f701f9d.png

专属 socket有信息,表明有客户连接了, client  socket有信息,表示 socket状态为可读,其它不做深入研究

3)stream_select与socket

attachments-2020-12-9NhOceUP5fe5391f355bd.png

stream_select作用,就是定时轮询socket 集合,并根据socket读取状态,分类到读写容器中。

2.实现io复用模型

21)准备知识--两个函数

stream_set_blocking设置stream为非阻塞影响函数fget,fread     https://php.golaravel.com/function.stream-set-blocking.html

stream_select                选择流,用于检测socket状况是否存在信息    https://php.golaravel.com/function.stream-select.html

22)io复用模型  

attachments-2020-12-mP2WMZ7w5fe69b82c8d49.pngselect 可能就是stream_select吧?

23)实现io复用-结构

attachments-2020-12-BL4TAlmC5fe69c188b66e.png


3实现io多路模型

31)基本实现

 基类:ostarjhy\src\WorkerBase.php 同上节,无变化,具体如下

<?php
namespace IoStarJhy;

abstract class WorkerBase
{
protected $server;
protected $events=[];
protected $config;
protected $typc="tcp";

public function __construct($host,$port){
$this->server = stream_socket_server($this->typc."://".$host.":".$port);
dd("tcp://".swoole_get_local_ip()['ens33'].":".$port,"创建手写的worker服务");
}
/**
* 不同的模型实现不一样
* @var [type]
*/
protected abstract function accept();
/**
* 添加事件
* @method on
* @param [type] $event [description]
* @param [type] $call [description]
* @return [type] [description]
*/
public function on($event,$call){
$this->events[strtolower($event)] = $call;
}

public function set(){}

/**
* 发送信息
* @method send
* @param [type] $client [description]
* @param [type] $data [description]
* @return [type] [description]
*/
public function send($client,$data)
{
fwrite($client,$data);
}

public function start()
{
//var_dump($this->events);
dd($this->events,"启动后,输出服务的注册事件");
// 用于校验服务启动信息
$this->check();
//建立并挂起连接,可接收多次请求信息
$this->accept();
}

public function close($client)
{
\fclose($client);
dd($client);
}
/**
* 用于校验服务启动信息
* @method check
* 六星教育 @shineyork老师
* @return [type] [description]
*/
public function check()
{
// 校验是否注册事件,以及注册的事件类型

if ($this->type == 'tcp') {
if (empty($this->event['connect']) || ! $this->event['connect'] instanceof Closure) {
dd("tcp服务必须要有回调事件: connect");
exit;
}

if (empty($this->event['receive']) || ! $this->event['receive'] instanceof Closure ) {
dd("tcp服务必须要有回调事件: receive");
exit;
}

if (empty($this->event['close']) || ! $this->event['close'] instanceof Closure ) {
dd("tcp服务必须要有回调事件: close");
exit;
}
} else if ($this->type == 'http') {
if (empty($this->event['request']) || ! $this->event['request'] instanceof Closure ) {
dd("http服务必须要有回调事件: request");
exit;
}
} else if ($this->type == 'http'){

}
}


}

实现类:iostarjhy\src\Muliplexing\Worker.php(重点)

<?php
namespace IoStarJhy\Muliplexing;

use IoStarJhy\WorkerBase;

class Worker extends WorkerBase {

//stream_select监控的动态socket集合,包括专属socket及因不同客户端由stream_select创建的client socket
protected $sockets = [];

//修改基类的构造方法,设置非阻塞并记录$sockets
public function __construct($host,$port){
parent::__construct($host,$port);
//设置非阻塞
stream_set_blocking($this->server,0);
//记录专属服务的socket
$this->sockets[(int) $this->server] = $this->server;
//说明:(int) $this->server强转化,如$this->server为:resource(13) of type (stream),int强转化为: 13。主要作用是以此为key,区分记录socket
}

public function accept()
{
//循环执行 stream_select不会主动(自动)的循环或定时轮询,所以这里循环执行
while (true){
sleep(1);
$reads = $this->sockets;
//检测sockets集合中的socket资源的状态 (读写状态,专属socket为可读的)
stream_select($reads,$w,$e,60);
//注意 stream_select前三个参数,为指针类型的参数,可往其内写入数据
foreach($reads as $key=>$socket){
//方便观看,对比
dd($reads, '$reads');//注意:array (13 => NULL,) 值为空,只是采用的dd打印方法,无法打印这个集合项的格式
dd($socket, '$socket');
dd($key, '$key');
dd($this->server, '$this->server');

//是专属socket,还是客户端的socket ,专属的创建client socket,客户端的通信
if($socket == $this->server){
//有新的连接
$conn = $this->createConn();
if($conn){
$this->sockets[(int) $conn] = $conn;
}else{
dd("建立连接不成功!");
}
} else {
//消息通信
dd("进行消息通信");
$this->sendMessage($socket);
}
}
}

}

public function createConn()
{
// 监听是否存在连接
$conn = stream_socket_accept($this->server);
if (!empty($conn)) {
//触发建立连接事件
$this->events['connect']($this, $conn);
return $conn;
}
return null;
}

public function sendMessage($conn)
{
//接收服务的信息
$data = fread($conn, 65535);
$this->events['receive']($this, $conn, $data);
}
}

测试服务端:iostarjhy\test\mul\server.php

<?php
require_once __DIR__."/../../vendor/autoload.php";

use IoStarJhy\Muliplexing\Worker;

$server = new Worker('0.0.0.0',9500);

$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
$server->send($client,'hello client');
//$server->close($client);
});
$server->on('close',function(Worker $server,$client){
dd($client,'断开连接');
//$server->close($client);
});

$server->start();

测试客户端:iostarjhy\test\mul\client.php

<?php
require_once __DIR__ . "/../../vendor/autoload.php";

//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
//1部分
/*fwrite($fp, "hello server");
dd(fread($fp, 65535));*/

//2部分
fwrite($fp, "hello server1");
dd(fread($fp, 65535));

fwrite($fp, "hello server2");
dd(fread($fp, 65535));

服务端服务端,测试客户端第1部分 ,同步运行,结果如下

attachments-2020-12-TzazUPEw5fe6e1aa0967b.jpg

从上看出:通信正常.

说明,1)stream_socket_server创建服务产生的专属socket,在客户端建立连接时,被 stream_select监控,为有效连接则创建新的client  socket

          2)服务端及客户端,读取及写入的是对应的 client socket,而非 专属socket.从上述截图标出部分对比,可以看出

 32)多信息通信

服务端服务端,测试客户端第2部分 ,同步运行

attachments-2020-12-dKWJPoQB5fe6e50f79b5e.jpg

从上看出:多信通信正常

33)完善功能:连接的检验与关闭

其它都不变,修改实现类,只是sendMessage略有改动,加了连接的检验及连接关闭

iostarjhy\src\Muliplexing\Worker.php

<?php
namespace IoStarJhy\Muliplexing;

use IoStarJhy\WorkerBase;

class Worker extends WorkerBase {

//stream_select监控的动态socket集合,包括专属socket及因不同客户端由stream_select创建的client socket
protected $sockets = [];

//修改基类的构造方法,设置非阻塞并记录$sockets
public function __construct($host,$port){
parent::__construct($host,$port);
//设置非阻塞
stream_set_blocking($this->server,0);
//记录专属服务的socket
$this->sockets[(int) $this->server] = $this->server;
//说明:(int) $this->server强转化,如$this->server为:resource(13) of type (stream),int强转化为: 13。主要作用是以此为key,区分记录socket
}

public function accept()
{
//循环执行 stream_select不会主动(自动)的循环或定时轮询,所以这里循环执行
while (true){
sleep(1);
$reads = $this->sockets;
//检测sockets集合中的socket资源的状态 (读写状态,专属socket为可读的)
stream_select($reads,$w,$e,60);
//注意 stream_select前三个参数,为指针类型的参数,可往其内写入数据
foreach($reads as $key=>$socket){
//方便观看,对比
/*dd($reads, '$reads');//注意:array (13 => NULL,) 值为空,只是采用的dd打印方法,无法打印这个集合项的格式
dd($socket, '$socket');
dd($key, '$key');
dd($this->server, '$this->server');*/

//是专属socket,还是客户端的socket ,专属的创建client socket,客户端的通信
if($socket == $this->server){
//有新的连接
$conn = $this->createConn();
if($conn){
$this->sockets[(int) $conn] = $conn;
}else{
dd("建立连接不成功!");
}
} else {
//消息通信
dd("进行消息通信");
$this->sendMessage($socket);
}
}
}

}

public function createConn()
{
// 监听是否存在连接
$conn = stream_socket_accept($this->server);
if (!empty($conn)) {
//触发建立连接事件
$this->events['connect']($this, $conn);
return $conn;
}
return null;
}

public function sendMessage($conn)
{
//接收服务的信息
$data = fread($conn, 65535);
if($data === '' || false === $data){
$this->checkConn($data, $conn);
}else{
$this->events['receive']($this, $conn, $data);
}
}
//检验连接
protected function checkConn($buffer,$conn){
if(strlen($buffer) === 0){
if(! \get_resource_type($conn) == "Unknown"){
//断开连接
$this->close($conn);
}
call_user_func($this->events['close'],$this,$conn);
unset($this->sockets[(int) $conn]);
}
}
}

attachments-2020-12-LlGklSRK5fe6fd706470c.png

由上可见,连接已断开


未完,见swoole 第7次课 荷塘墨画:io复用与信号模型2


  • 发表于 2020-12-25 07:30
  • 阅读 ( 818 )

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
吉洪叶
吉洪叶

21 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1470 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章