3.实现异步IO模型
31)实现异步IO模型
类似与信号,不同的是,这里把事件设置到linux 内核中。
32)实现io复用-结构
34)php-event 实现 异步IO模型
基类:同上iostarjhy\src\WorkerBase.php,这里再重复一次
<?php
namespace IoStarJhy;
abstract class WorkerBase
{
protected $server;
protected $events=[];
protected $config;
protected $typc="tcp";
public function __construct($host,$port){
$this->server = stream_socket_server("tcp://".$host.":".$port);
//echo "tcp://".$host.":".$port."\n";
dd("tcp://".swoole_get_local_ip()['ens33'].":".$port,"创建手写的worker服务");
}
/**
* 不同的模型实现不一样
* @var [type]
*/
protected abstract function accept();
/**
* 添加事件
* @method on
* @param [type] $event [description]
* @param [type] $call [description]
* @return [type] [description]
*/
public function on($event,$call){
$this->events[strtolower($event)] = $call;
}
public function set(array $config = []){}
/**
* 发送信息
* @method send
* @param [type] $client [description]
* @param [type] $data [description]
* @return [type] [description]
*/
public function send($client,$data)
{
fwrite($client,$data);
}
public function start()
{
//var_dump($this->events);
dd($this->events,"启动后,输出服务的注册事件");
// 用于校验服务启动信息
$this->check();
//建立并挂起连接,可接收多次请求信息
$this->accept();
}
public function close($client)
{
\fclose($client);
dd($client);
}
/**
* 用于校验服务启动信息
* @method check
* 六星教育 @shineyork老师
* @return [type] [description]
*/
public function check()
{
// 校验是否注册事件,以及注册的事件类型
if ($this->type == 'tcp') {
if (empty($this->event['connect']) || ! $this->event['connect'] instanceof Closure) {
dd("tcp服务必须要有回调事件: connect");
exit;
}
if (empty($this->event['receive']) || ! $this->event['receive'] instanceof Closure ) {
dd("tcp服务必须要有回调事件: receive");
exit;
}
if (empty($this->event['close']) || ! $this->event['close'] instanceof Closure ) {
dd("tcp服务必须要有回调事件: close");
exit;
}
} else if ($this->type == 'http') {
if (empty($this->event['request']) || ! $this->event['request'] instanceof Closure ) {
dd("http服务必须要有回调事件: request");
exit;
}
} else if ($this->type == 'http'){
}
}
}
php-event 事件类 iostarjhy\src\Async\Events.php
<?php
namespace iostarjhy\Async;
use \Event;
class Events
{
protected $client;
protected $eventBase;
public function __construct($eventBase, $client)
{
$this->eventBase = $eventBase;
$this->client = $client;
}
public function hander(Worker $server,&$count)
{
$event=new Event($this->eventBase,$this->client,Event::PERSIST |Event::READ | Event::WRITE ,function($socket) use ($server,&$count){
$server->sendMessage($socket);
// 清空
($count[(int) $socket][Event::PERSIST | Event::READ | Event::WRITE])->free();
});
$event->add();
$count[(int) $this->client][Event::PERSIST | Event::READ | Event::WRITE] = $event;
}
}
实现类 :iostarjhy\src\Async\Worker.php
<?php
namespace IoStarJhy\Async;
use IoStarJhy\WorkerBase;
//use iostarjhy\Async\Events;
use \EventBase;
use \Event;
class Worker extends WorkerBase
{
public function __construct($host,$port){
parent::__construct($host,$port);
//设置非阻塞
stream_set_blocking($this->server,0);
}
public function accept()
{
$count = []; //参数,记录,并清除该标识事件监听,节省内存。这个参数必不可少,且引用传参
$eventBase = new EventBase();
$event = new Event($eventBase, $this->server, Event::WRITE | Event::READ | Event::PERSIST, function($socket) use (&$eventBase,&$count){
$conn = stream_socket_accept($socket);
if (!empty($conn)) {
//触发建立连接事件
$this->events['connect']($this, $conn);
}
//处理通信
(new Events($eventBase, $conn))->hander($this,$count);
});
$count[(int) $this->server][Event::WRITE | Event::READ | Event::PERSIST] = $this->server;
$event->add();
$eventBase->loop();
}
public function sendMessage($conn)
{
//接收服务的信息
$data = fread($conn, 65535);
if('' === $data || false === $data){
//$this->checkConn($data, $conn); //先注释掉
}else{
$this->events['receive']($this, $conn, $data);
}
}
//检验连接
protected function checkConn($buffer,$conn){
if(strlen($buffer) === 0){
if(! \get_resource_type($conn) == "Unknown"){
//断开连接
$this->close($conn);
}
call_user_func($this->events['close'],$this,$conn);
unset($this->sockets[(int) $conn]);
}
}
}
测试服务器端iostarjhy\test\async\server.php
<?php
require_once __DIR__."/../../vendor/autoload.php";
use IoStarJhy\Async\Worker;
$server = new Worker('0.0.0.0',9500);
$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
//sleep(5);
$server->send($client,'hello client');
//$server->close($client);
});
$server->on('close',function($server,$client){
dd($client,'断开连接');
//$server->close($client);
});
$server->start();
测试客户端 iostarjhy\test\async\client.php
<?php
require_once __DIR__ . "/../../vendor/autoload.php";
//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
fwrite($fp, 'hello server');
dd(fread($fp, 65535));
fclose($fp);
运行
4 Reactor模型
1. Reactor对象通过select监控客户端请求事件,收到事件后通过dispatch进行分发
2. 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理
3. 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应
4. Handler会完成read->业务处理->send的完整业务流程
Reactor模型与异步io:值得注意到是,异步io就是reactor的一种模型
42)构建单进程Reactor模型
43)原生实现单进程Reactor模型
基类:同上iostarjhy\src\WorkerBase.php 略
Reactor类:类似上面的php-event事件监听类 iostarjhy\src\Reactor\Reactor.php
<?php
namespace IoStarJhy\Reactor;
use \EventBase as EventBase;
use \Event as Event;
class Reactor
{
protected $reactor;
protected $events;
protected static $instance = null;
const READ = EVENT::READ | EVENT::PERSIST;
const WRITE = EVENT::WRITE | EVENT::PERSIST;
const EVENT = EVENT::READ | EVENT::WRITE | EVENT::PERSIST;
public static function getInstance()
{
if(is_null(self::$instance)){
self::$instance = new self();
self::$instance->reactor = new EventBase;
}
return self::$instance;
}
public function add($fd,$what,$call)
{
$event = new Event($this->reactor,$fd,$what,$call);
$event->add();
$this->events[(int) $fd][$what] = $event;
}
public function del($fd,$what)
{
($this->events[(int) $fd][$what])->free();
//($this->events[(int) $fd])[$what]->free();
}
public function run()
{
$this->reactor->loop();
}
}
Connect 连接类 : iostarjhy\src\Reactor\Connection.php
<?php
namespace IoStarJhy\Reactor;
class Connection
{
protected $conn;
protected $worker;
public function __construct($conn, $worker)
{
$this->conn = $conn;
$this->worker = $worker;
}
public function handler()
{
Reactor::getInstance()->add($this->conn, Reactor::EVENT, $this->sendMessage());
}
public function sendMessage()
{
return function($conn){
$buffer = fread($conn, 65535);
dd($buffer,"接收服务的信息"); //调试信息
sleep(5); //无意义,调试用的
if ('' === $buffer || false === $buffer) {
// 校验是否断开连接
$this->checkConn($buffer, $conn);
} else {
$this->worker->events['receive']($this->worker, $conn, $buffer);
}
};
}
/**
* 校验连接状态
* @method closeConn
* 六星教育 @shineyork老师
* @param socket $conn 连接信息
*/
public function checkConn($buffer, $conn)
{
if (strlen($buffer) === 0) {
if (!get_resource_type($conn) == "Unknown") {
// 关闭连接
$this->worker->close($conn);
}
$this->worker->events['close']($this->worker, $conn);
}
//dd("校验连接状态");//调试信息
//sleep(4); //调试用的
Reactor::getInstance()->del($conn, Reactor::EVENT);
}
}
Worker实现类 :iostarjhy\src\Reactor\Worker.php
<?php
namespace IoStarJhy\Reactor;
use IoStarJhy\WorkerBase;
use \EventBase;
use \Event;
class Worker extends WorkerBase
{
public $events = [];
public function accept()
{
Reactor::getInstance()->add($this->server,Reactor::EVENT,$this->createConn());
Reactor::getInstance()->run();
}
public function createConn()
{
return function($socket){
$conn = stream_socket_accept($socket);
if (!empty($conn)) {
// 触发建立连接事件
$this->events['connect']($this, $conn);
// 处理通信
(new Connection($conn, $this))->handler();
}
};
}
}
测试服务端,类上 ,路径iostarjhy\test\reactor\server.php
<?php
require_once __DIR__."/../../vendor/autoload.php";
use IoStarJhy\Reactor\Worker;
$server = new Worker('0.0.0.0',9500);
$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
$server->send($client,'hello client');
//$server->close($client);
});
$server->on('close',function($server,$client){
dd($client,'断开连接');
//$server->close($client);
});
$server->start();
测试客户端,类上 ,路径iostarjhy\test\reactor\client.php
<?php
require_once __DIR__ . "/../../vendor/autoload.php";
//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
fwrite($fp, 'hello server3');
dd(fread($fp, 65535));
fclose($fp);
运行
说明 : bug问题的查找, bug的出现不可避免,解决方法,按程序流程,在可能出现问题的地方,打印(dd)相关变量及信息。有时为避免速度过快输出信息过多,并可 配合sleep(10)睡眠时间,查看出现的 bug的地方
完
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!