page contents

swoole 第8次课 更上层楼:异步io模型与单进程Reactor 2

异步io模型, 单进程Reactor

3.实现异步IO模型

   31)实现异步IO模型

   attachments-2021-01-nWvIPZzB5ff2f497b5cd3.jpg

类似与信号,不同的是,这里把事件设置到linux 内核中。

32)实现io复用-结构

attachments-2021-01-wl3llFsm5ff2f5027f7c0.png33)实现Io模型思路

attachments-2021-01-qBwHg5od5ff2f52344b9b.png

34)php-event 实现 异步IO模型

基类:同上iostarjhy\src\WorkerBase.php,这里再重复一次

<?php
namespace IoStarJhy;

abstract class WorkerBase
{
protected $server;
protected $events=[];
protected $config;
protected $typc="tcp";

public function __construct($host,$port){

$this->server = stream_socket_server("tcp://".$host.":".$port);
//echo "tcp://".$host.":".$port."\n";
dd("tcp://".swoole_get_local_ip()['ens33'].":".$port,"创建手写的worker服务");
}
/**
* 不同的模型实现不一样
* @var [type]
*/
protected abstract function accept();
/**
* 添加事件
* @method on
* @param [type] $event [description]
* @param [type] $call [description]
* @return [type] [description]
*/
public function on($event,$call){

$this->events[strtolower($event)] = $call;
}

public function set(array $config = []){}

/**
* 发送信息
* @method send
* @param [type] $client [description]
* @param [type] $data [description]
* @return [type] [description]
*/
public function send($client,$data)
{
fwrite($client,$data);
}

public function start()
{
//var_dump($this->events);
dd($this->events,"启动后,输出服务的注册事件");
// 用于校验服务启动信息
$this->check();
//建立并挂起连接,可接收多次请求信息
$this->accept();
}

public function close($client)
{
\fclose($client);
dd($client);
}
/**
* 用于校验服务启动信息
* @method check
* 六星教育 @shineyork老师
* @return [type] [description]
*/
public function check()
{
// 校验是否注册事件,以及注册的事件类型

if ($this->type == 'tcp') {
if (empty($this->event['connect']) || ! $this->event['connect'] instanceof Closure) {
dd("tcp服务必须要有回调事件: connect");
exit;
}

if (empty($this->event['receive']) || ! $this->event['receive'] instanceof Closure ) {
dd("tcp服务必须要有回调事件: receive");
exit;
}

if (empty($this->event['close']) || ! $this->event['close'] instanceof Closure ) {
dd("tcp服务必须要有回调事件: close");
exit;
}
} else if ($this->type == 'http') {
if (empty($this->event['request']) || ! $this->event['request'] instanceof Closure ) {
dd("http服务必须要有回调事件: request");
exit;
}
} else if ($this->type == 'http'){

}
}

}

php-event 事件类   iostarjhy\src\Async\Events.php

<?php
namespace iostarjhy\Async;

use \Event;

class Events
{
protected $client;
protected $eventBase;

public function __construct($eventBase, $client)
{
$this->eventBase = $eventBase;
$this->client = $client;
}

public function hander(Worker $server,&$count)
{
$event=new Event($this->eventBase,$this->client,Event::PERSIST |Event::READ | Event::WRITE ,function($socket) use ($server,&$count){
$server->sendMessage($socket);
// 清空
($count[(int) $socket][Event::PERSIST | Event::READ | Event::WRITE])->free();
});
$event->add();
$count[(int) $this->client][Event::PERSIST | Event::READ | Event::WRITE] = $event;
}
}

实现类 :iostarjhy\src\Async\Worker.php

<?php
namespace IoStarJhy\Async;

use IoStarJhy\WorkerBase;
//use iostarjhy\Async\Events;
use \EventBase;
use \Event;

class Worker extends WorkerBase
{

public function __construct($host,$port){
parent::__construct($host,$port);
//设置非阻塞
stream_set_blocking($this->server,0);
}

public function accept()
{
$count = []; //参数,记录,并清除该标识事件监听,节省内存。这个参数必不可少,且引用传参
$eventBase = new EventBase();
$event = new Event($eventBase, $this->server, Event::WRITE | Event::READ | Event::PERSIST, function($socket) use (&$eventBase,&$count){
$conn = stream_socket_accept($socket);
if (!empty($conn)) {
//触发建立连接事件
$this->events['connect']($this, $conn);
}
//处理通信
(new Events($eventBase, $conn))->hander($this,$count);
});
$count[(int) $this->server][Event::WRITE | Event::READ | Event::PERSIST] = $this->server;
$event->add();
$eventBase->loop();
}

public function sendMessage($conn)
{
//接收服务的信息
$data = fread($conn, 65535);
if('' === $data || false === $data){
//$this->checkConn($data, $conn); //先注释掉
}else{
$this->events['receive']($this, $conn, $data);
}
}
//检验连接
protected function checkConn($buffer,$conn){
if(strlen($buffer) === 0){
if(! \get_resource_type($conn) == "Unknown"){
//断开连接
$this->close($conn);
}
call_user_func($this->events['close'],$this,$conn);
unset($this->sockets[(int) $conn]);
}
}
}

测试服务器端iostarjhy\test\async\server.php

<?php
require_once __DIR__."/../../vendor/autoload.php";

use IoStarJhy\Async\Worker;

$server = new Worker('0.0.0.0',9500);

$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
//sleep(5);
$server->send($client,'hello client');
//$server->close($client);
});
$server->on('close',function($server,$client){
dd($client,'断开连接');
//$server->close($client);
});

$server->start();

测试客户端  iostarjhy\test\async\client.php

<?php
require_once __DIR__ . "/../../vendor/autoload.php";

//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
fwrite($fp, 'hello server');
dd(fread($fp, 65535));
fclose($fp);

运行

attachments-2021-01-jV1OIFvh5ff3c48275dfb.png


4 Reactor模型

41)Reactor模型--单线程或单进程
attachments-2021-01-NjiPvVY95ff79691e19b6.png

流程说明:

1. Reactor对象通过select监控客户端请求事件,收到事件后通过dispatch进行分发

2. 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理

3. 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应

4. Handler会完成read->业务处理->send的完整业务流程

Reactor模型与异步io:值得注意到是,异步io就是reactor的一种模型

42)构建单进程Reactor模型

attachments-2021-01-fBfdbYGH5ff798d0693fb.png

43)原生实现单进程Reactor模型

基类:同上iostarjhy\src\WorkerBase.php 略

Reactor类:类似上面的php-event事件监听类  iostarjhy\src\Reactor\Reactor.php

<?php
namespace IoStarJhy\Reactor;

use \EventBase as EventBase;
use \Event as Event;

class Reactor
{
protected $reactor;
protected $events;
protected static $instance = null;
const READ = EVENT::READ | EVENT::PERSIST;
const WRITE = EVENT::WRITE | EVENT::PERSIST;
const EVENT = EVENT::READ | EVENT::WRITE | EVENT::PERSIST;

public static function getInstance()
{
if(is_null(self::$instance)){
self::$instance = new self();
self::$instance->reactor = new EventBase;
}
return self::$instance;
}

public function add($fd,$what,$call)
{
$event = new Event($this->reactor,$fd,$what,$call);
$event->add();
$this->events[(int) $fd][$what] = $event;
}
public function del($fd,$what)
{
($this->events[(int) $fd][$what])->free();
//($this->events[(int) $fd])[$what]->free();
}
public function run()
{
$this->reactor->loop();
}

}

Connect  连接类 : iostarjhy\src\Reactor\Connection.php

<?php
namespace IoStarJhy\Reactor;

class Connection
{
protected $conn;
protected $worker;

public function __construct($conn, $worker)
{
$this->conn = $conn;
$this->worker = $worker;
}

public function handler()
{
Reactor::getInstance()->add($this->conn, Reactor::EVENT, $this->sendMessage());
}

public function sendMessage()
{
return function($conn){

$buffer = fread($conn, 65535);
dd($buffer,"接收服务的信息"); //调试信息
sleep(5); //无意义,调试用的
if ('' === $buffer || false === $buffer) {
// 校验是否断开连接
$this->checkConn($buffer, $conn);
} else {
$this->worker->events['receive']($this->worker, $conn, $buffer);
}
};
}

/**
* 校验连接状态
* @method closeConn
* 六星教育 @shineyork老师
* @param socket $conn 连接信息
*/
public function checkConn($buffer, $conn)
{
if (strlen($buffer) === 0) {
if (!get_resource_type($conn) == "Unknown") {
// 关闭连接
$this->worker->close($conn);
}
$this->worker->events['close']($this->worker, $conn);
}
//dd("校验连接状态");//调试信息
//sleep(4); //调试用的
Reactor::getInstance()->del($conn, Reactor::EVENT);
}


}

Worker实现类 :iostarjhy\src\Reactor\Worker.php

<?php
namespace IoStarJhy\Reactor;

use IoStarJhy\WorkerBase;
use \EventBase;
use \Event;

class Worker extends WorkerBase
{

public $events = [];

public function accept()
{
Reactor::getInstance()->add($this->server,Reactor::EVENT,$this->createConn());
Reactor::getInstance()->run();
}

public function createConn()
{
return function($socket){
$conn = stream_socket_accept($socket);
if (!empty($conn)) {
// 触发建立连接事件
$this->events['connect']($this, $conn);
// 处理通信
(new Connection($conn, $this))->handler();
}

};
}

}

测试服务端,类上 ,路径iostarjhy\test\reactor\server.php

<?php
require_once __DIR__."/../../vendor/autoload.php";

use IoStarJhy\Reactor\Worker;

$server = new Worker('0.0.0.0',9500);

$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
$server->send($client,'hello client');
//$server->close($client);
});
$server->on('close',function($server,$client){
dd($client,'断开连接');
//$server->close($client);
});

$server->start();

测试客户端,类上 ,路径iostarjhy\test\reactor\client.php

<?php
require_once __DIR__ . "/../../vendor/autoload.php";

//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
fwrite($fp, 'hello server3');
dd(fread($fp, 65535));
fclose($fp);

运行

attachments-2021-01-slbAPGrl6004d1bbaea65.jpg


说明 : bug问题的查找, bug的出现不可避免,解决方法,按程序流程,在可能出现问题的地方,打印(dd)相关变量及信息。有时为避免速度过快输出信息过多,并可 配合sleep(10)睡眠时间,查看出现的 bug的地方

  • 发表于 2021-01-04 08:07
  • 阅读 ( 672 )

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
吉洪叶
吉洪叶

21 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1478 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章