1.server与client通信及stream_select识别 socket
1)server与client通信
代码演示上述通信
服务端:iostarjhy\test\server.php
<?php
require __DIR__."/../vendor/autoload.php";
$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);
dd($socket, "stream_socket_server:return");
//1
/*$conn = stream_socket_accept($socket);
dd($conn,"stream_socket_accept:return");*/
//2
while(true){
$conn = stream_socket_accept($socket);
dd($conn,"stream_socket_accept:return");
//3 读写 从新建的client里写,是$conn,stream_socket_accept监控$socket的结果产生的新的client socket,而非专属的$socket
var_dump(fread($conn,65535));
fwrite($conn,"hello,client,I am server!");
}
客户端:iostarjhy\test\client.php
<?php
require __DIR__."/../vendor/autoload.php";
//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:8000");
//1
/*dd($fp,"stream_socket_client:return");*/
//2
/*dd($fp,"stream_socket_client:return");
//特别注意:写入,这里写入到的是服务端新建client的socket
fwrite($fp,'hi,server,I am client!');
//读取同上
var_dump(fread($fp,65535));*/
//3
dd($fp,"stream_socket_client:return");
sleep(10);
//特别注意:写入,这里写入到的是服务端新建client的 socket
fwrite($fp,'hi,server,I am client!');
//读取同上
var_dump(fread($fp,65535));
客户端与服务端,注释1对应部分运行
结果说明:stream_socket_server产生一个专属的socket,而stream_socket_accept监控专属的socket,有连接时产生一个新的client的 socket
客户端注释1,与服务端注释2对应部分运行
结果说明:stream_socket_server产生一个专属的socket,而stream_socket_accept监控专属的socket,有多个连接时,分别对应的产生一个新的client的socket,形成多个新的client的socket
客户端与服务端,注释2对应部分运行
结果说明:服务端读写,从新建的client里写,是$conn,stream_socket_accept监控$socket的结果产生的新的client的 socket,而非专属的$socket。而特别注意,客户端的写入,写入到的是服务端新建的client的socket里,读取同理,尽管标识为$fp。
客户端注释3,对应服务器端2 部分运行
结果说明:多个客户端,写入的是对应的 新建的clinet 代表的socket,而非专属的socket;
2)server与socket 及 socket的状态
专属 socket有信息,表明有客户连接了, client socket有信息,表示 socket状态为可读,其它不做深入研究
3)stream_select与socket
stream_select作用,就是定时轮询socket 集合,并根据socket读取状态,分类到读写容器中。
2.实现io复用模型
21)准备知识--两个函数
stream_set_blocking设置stream为非阻塞影响函数fget,fread https://php.golaravel.com/function.stream-set-blocking.html
stream_select 选择流,用于检测socket状况是否存在信息 https://php.golaravel.com/function.stream-select.html
22)io复用模型
23)实现io复用-结构
3实现io多路模型
31)基本实现
基类:ostarjhy\src\WorkerBase.php 同上节,无变化,具体如下
<?php
namespace IoStarJhy;
abstract class WorkerBase
{
protected $server;
protected $events=[];
protected $config;
protected $typc="tcp";
public function __construct($host,$port){
$this->server = stream_socket_server($this->typc."://".$host.":".$port);
dd("tcp://".swoole_get_local_ip()['ens33'].":".$port,"创建手写的worker服务");
}
/**
* 不同的模型实现不一样
* @var [type]
*/
protected abstract function accept();
/**
* 添加事件
* @method on
* @param [type] $event [description]
* @param [type] $call [description]
* @return [type] [description]
*/
public function on($event,$call){
$this->events[strtolower($event)] = $call;
}
public function set(){}
/**
* 发送信息
* @method send
* @param [type] $client [description]
* @param [type] $data [description]
* @return [type] [description]
*/
public function send($client,$data)
{
fwrite($client,$data);
}
public function start()
{
//var_dump($this->events);
dd($this->events,"启动后,输出服务的注册事件");
// 用于校验服务启动信息
$this->check();
//建立并挂起连接,可接收多次请求信息
$this->accept();
}
public function close($client)
{
\fclose($client);
dd($client);
}
/**
* 用于校验服务启动信息
* @method check
* 六星教育 @shineyork老师
* @return [type] [description]
*/
public function check()
{
// 校验是否注册事件,以及注册的事件类型
if ($this->type == 'tcp') {
if (empty($this->event['connect']) || ! $this->event['connect'] instanceof Closure) {
dd("tcp服务必须要有回调事件: connect");
exit;
}
if (empty($this->event['receive']) || ! $this->event['receive'] instanceof Closure ) {
dd("tcp服务必须要有回调事件: receive");
exit;
}
if (empty($this->event['close']) || ! $this->event['close'] instanceof Closure ) {
dd("tcp服务必须要有回调事件: close");
exit;
}
} else if ($this->type == 'http') {
if (empty($this->event['request']) || ! $this->event['request'] instanceof Closure ) {
dd("http服务必须要有回调事件: request");
exit;
}
} else if ($this->type == 'http'){
}
}
}
实现类:iostarjhy\src\Muliplexing\Worker.php(重点)
<?php
namespace IoStarJhy\Muliplexing;
use IoStarJhy\WorkerBase;
class Worker extends WorkerBase {
//stream_select监控的动态socket集合,包括专属socket及因不同客户端由stream_select创建的client socket
protected $sockets = [];
//修改基类的构造方法,设置非阻塞并记录$sockets
public function __construct($host,$port){
parent::__construct($host,$port);
//设置非阻塞
stream_set_blocking($this->server,0);
//记录专属服务的socket
$this->sockets[(int) $this->server] = $this->server;
//说明:(int) $this->server强转化,如$this->server为:resource(13) of type (stream),int强转化为: 13。主要作用是以此为key,区分记录socket
}
public function accept()
{
//循环执行 stream_select不会主动(自动)的循环或定时轮询,所以这里循环执行
while (true){
sleep(1);
$reads = $this->sockets;
//检测sockets集合中的socket资源的状态 (读写状态,专属socket为可读的)
stream_select($reads,$w,$e,60);
//注意 stream_select前三个参数,为指针类型的参数,可往其内写入数据
foreach($reads as $key=>$socket){
//方便观看,对比
dd($reads, '$reads');//注意:array (13 => NULL,) 值为空,只是采用的dd打印方法,无法打印这个集合项的格式
dd($socket, '$socket');
dd($key, '$key');
dd($this->server, '$this->server');
//是专属socket,还是客户端的socket ,专属的创建client socket,客户端的通信
if($socket == $this->server){
//有新的连接
$conn = $this->createConn();
if($conn){
$this->sockets[(int) $conn] = $conn;
}else{
dd("建立连接不成功!");
}
} else {
//消息通信
dd("进行消息通信");
$this->sendMessage($socket);
}
}
}
}
public function createConn()
{
// 监听是否存在连接
$conn = stream_socket_accept($this->server);
if (!empty($conn)) {
//触发建立连接事件
$this->events['connect']($this, $conn);
return $conn;
}
return null;
}
public function sendMessage($conn)
{
//接收服务的信息
$data = fread($conn, 65535);
$this->events['receive']($this, $conn, $data);
}
}
测试服务端:iostarjhy\test\mul\server.php
<?php
require_once __DIR__."/../../vendor/autoload.php";
use IoStarJhy\Muliplexing\Worker;
$server = new Worker('0.0.0.0',9500);
$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
$server->send($client,'hello client');
//$server->close($client);
});
$server->on('close',function(Worker $server,$client){
dd($client,'断开连接');
//$server->close($client);
});
$server->start();
测试客户端:iostarjhy\test\mul\client.php
<?php
require_once __DIR__ . "/../../vendor/autoload.php";
//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
//1部分
/*fwrite($fp, "hello server");
dd(fread($fp, 65535));*/
//2部分
fwrite($fp, "hello server1");
dd(fread($fp, 65535));
fwrite($fp, "hello server2");
dd(fread($fp, 65535));
服务端服务端,测试客户端第1部分 ,同步运行,结果如下
从上看出:通信正常.
说明,1)stream_socket_server创建服务产生的专属socket,在客户端建立连接时,被 stream_select监控,为有效连接则创建新的client socket
2)服务端及客户端,读取及写入的是对应的 client socket,而非 专属socket.从上述截图标出部分对比,可以看出
32)多信息通信
服务端服务端,测试客户端第2部分 ,同步运行
从上看出:多信通信正常
33)完善功能:连接的检验与关闭
其它都不变,修改实现类,只是sendMessage略有改动,加了连接的检验及连接关闭
iostarjhy\src\Muliplexing\Worker.php
<?php
namespace IoStarJhy\Muliplexing;
use IoStarJhy\WorkerBase;
class Worker extends WorkerBase {
//stream_select监控的动态socket集合,包括专属socket及因不同客户端由stream_select创建的client socket
protected $sockets = [];
//修改基类的构造方法,设置非阻塞并记录$sockets
public function __construct($host,$port){
parent::__construct($host,$port);
//设置非阻塞
stream_set_blocking($this->server,0);
//记录专属服务的socket
$this->sockets[(int) $this->server] = $this->server;
//说明:(int) $this->server强转化,如$this->server为:resource(13) of type (stream),int强转化为: 13。主要作用是以此为key,区分记录socket
}
public function accept()
{
//循环执行 stream_select不会主动(自动)的循环或定时轮询,所以这里循环执行
while (true){
sleep(1);
$reads = $this->sockets;
//检测sockets集合中的socket资源的状态 (读写状态,专属socket为可读的)
stream_select($reads,$w,$e,60);
//注意 stream_select前三个参数,为指针类型的参数,可往其内写入数据
foreach($reads as $key=>$socket){
//方便观看,对比
/*dd($reads, '$reads');//注意:array (13 => NULL,) 值为空,只是采用的dd打印方法,无法打印这个集合项的格式
dd($socket, '$socket');
dd($key, '$key');
dd($this->server, '$this->server');*/
//是专属socket,还是客户端的socket ,专属的创建client socket,客户端的通信
if($socket == $this->server){
//有新的连接
$conn = $this->createConn();
if($conn){
$this->sockets[(int) $conn] = $conn;
}else{
dd("建立连接不成功!");
}
} else {
//消息通信
dd("进行消息通信");
$this->sendMessage($socket);
}
}
}
}
public function createConn()
{
// 监听是否存在连接
$conn = stream_socket_accept($this->server);
if (!empty($conn)) {
//触发建立连接事件
$this->events['connect']($this, $conn);
return $conn;
}
return null;
}
public function sendMessage($conn)
{
//接收服务的信息
$data = fread($conn, 65535);
if($data === '' || false === $data){
$this->checkConn($data, $conn);
}else{
$this->events['receive']($this, $conn, $data);
}
}
//检验连接
protected function checkConn($buffer,$conn){
if(strlen($buffer) === 0){
if(! \get_resource_type($conn) == "Unknown"){
//断开连接
$this->close($conn);
}
call_user_func($this->events['close'],$this,$conn);
unset($this->sockets[(int) $conn]);
}
}
}
由上可见,连接已断开
未完,见swoole 第7次课 荷塘墨画:io复用与信号模型2
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!