page contents

swoole 第6次课-2 含毫命简:阻塞与非阻塞2

阻塞与非阻塞

接上

245)实现基础版本的worker模型及通信

改造基类iostarjhy\src\WorkerBase.php

<?php
namespace IoStarJhy;

abstract class WorkerBase
{
protected $server;
protected $events=[];
protected $config;
protected $typc="tcp";

public function __construct($host,$port){

$this->server = stream_socket_server("tcp://".$host.":".$port);
//echo "tcp://".$host.":".$port."\n";
dd("tcp://".swoole_get_local_ip()['ens33'].":".$port,"创建手写的worker服务");
}
/**
* 不同的模型实现不一样
* @var [type]
*/
protected abstract function accept();
/**
* 添加事件
* @method on
* @param [type] $event [description]
* @param [type] $call [description]
* @return [type] [description]
*/
public function on($event,$call){

$this->events[strtolower($event)] = $call;
}

public function set(){}

/**
* 发送信息给对应客户端
* @method send
* @param [type] $client [description]
* @param [type] $data [description]
* @return [type] [description]
*/
public function send($client,$data)
{
fwrite($client,$data);
}

public function start()
{
//var_dump($this->events);
dd($this->events,"启动后,输出服务的注册事件");
// 用于校验服务启动信息
// $this->check();
$this->accept();
}

public function close()
{

}
/**
* 用于校验服务启动信息
* @method check
* 六星教育 @shineyork老师
* @return [type] [description]
*/
public function check()
{
// 校验是否注册事件,以及注册的事件类型
}

}

实现类iostarjhy\src\Blocking\Worker.php

<?php
namespace IoStarJhy\Blocking;

use IoStarJhy\WorkerBase;

class Worker extends WorkerBase {

public function accept()
{
while(true){
//监听是否存在连接
$conn = stream_socket_accept($this->server);

if(!empty($conn)){
//触发建立连接事件
$this->events['connect']($this,$conn);

//接收服务的信息
$data = fread($conn,65535);

//处理信息
$this->events['receive']($this,$conn,$data);
}
}
}
}

测试服务端:iostarjhy\test\Blocking\server.php

<?php
require_once __DIR__."/../../vendor/autoload.php";

use IoStarJhy\Blocking\Worker;

$server = new Worker('0.0.0.0',9500);

$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
$server->send($client,'hello client');
});
$server->on('close',function($server,$client){

});

$server->start();

测试客户端:iostarjhy\test\Blocking\client.php

<?php

require_once __DIR__."/../../vendor/autoload.php";

//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
fwrite($fp,'hello server');
var_dump(fread($fp,65535));

运行

attachments-2020-12-y6b1i4er5fda9e86af93d.png


说明 1基类workerBase的构造方法,只是stream_socket_server(创建)了个服务,并保存为workerBase、worker的一个server属性。

        2iostarjhy\test\Blocking\server.php下的on事件(connect,receive,close),是调用了基类workerBase的on方法,进行以事件名称为键,闭包为值的数组定义及保存,最终形成的events属性,结构如下

          $events=[

                'connect'=>function($server,$client){……}, 'receive'=>function($server,$client,$data){……}, 'close'=>function($server,$client){……}

          ]

      3. worker实现类的accept()方法, 首先是while(true)来建立并挂起连接,可接受多次信息(而不是一次就关闭服务),然后监听是否存在连接,如果存在,触发对应事件方法,接收信息,并调用事件处理及发送信息给客户端

public function accept()
{
//建立并挂起连接,可接受多次信息(而不是一次就关闭服务)
while(true){
//监听是否存在连接
$conn = stream_socket_accept($this->server);

if(!empty($conn)){
//触发建立连接事件
$this->events['connect']($this,$conn);

//接收服务的信息
$data = fread($conn,65535);

//处理信息
$this->events['receive']($this,$conn,$data);
}
}
}

      4.存在的问题,客户端一次发送多要数据,服务端只能接收到一条数据

  其它类文件等不变,测试客户端:iostarjhy\test\Blocking\client.php  改造如下:

<?php
require_once __DIR__ . "/../../vendor/autoload.php";

//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
fwrite($fp, 'hello server');
var_dump(fread($fp, 65535));
//服务端一次只能接受到一条数据,
fwrite($fp, 'hello server2');
var_dump(fread($fp, 65535));
fwrite($fp, 'hello server3');
var_dump(fread($fp, 65535));


运行

attachments-2020-12-lwWTjNlb5fdabdbdd4244.jpg

3.完善阻塞模型worker

基类iostarjhy\src\WorkerBase.php

<?php
namespace IoStarJhy;

abstract class WorkerBase
{
protected $server;
protected $events=[];
protected $config;
protected $typc="tcp";

public function __construct($host,$port){

$this->server = stream_socket_server($this->typc."://".$host.":".$port);
//echo "tcp://".$host.":".$port."\n";
dd("tcp://".swoole_get_local_ip()['ens33'].":".$port,"创建手写的worker服务");
}
/**
* 不同的模型实现不一样
* @var [type]
*/
protected abstract function accept();
/**
* 添加事件
* @method on
* @param [type] $event [description]
* @param [type] $call [description]
* @return [type] [description]
*/
public function on($event,$call){

$this->events[strtolower($event)] = $call;
}

public function set(){}

/**
* 发送信息
* @method send
* @param [type] $client [description]
* @param [type] $data [description]
* @return [type] [description]
*/
public function send($client,$data)
{
fwrite($client,$data);
}

public function start()
{
//var_dump($this->events);
dd($this->events,"启动后,输出服务的注册事件");
// 用于校验服务启动信息
$this->check();
//建立并挂起连接,可接收多次请求信息
$this->accept();
}

public function close($client)
{
//\fclose($client);
dd($client);
}
/**
* 用于校验服务启动信息
* @method check
* 六星教育 @shineyork老师
* @return [type] [description]
*/
public function check()
{
// 校验是否注册事件,以及注册的事件类型

if ($this->type == 'tcp') {
if (empty($this->event['connect']) || ! $this->event['connect'] instanceof Closure) {
dd("tcp服务必须要有回调事件: connect");
exit;
}

if (empty($this->event['receive']) || ! $this->event['receive'] instanceof Closure ) {
dd("tcp服务必须要有回调事件: receive");
exit;
}

if (empty($this->event['close']) || ! $this->event['close'] instanceof Closure ) {
dd("tcp服务必须要有回调事件: close");
exit;
}
} else if ($this->type == 'http') {
if (empty($this->event['request']) || ! $this->event['request'] instanceof Closure ) {
dd("http服务必须要有回调事件: request");
exit;
}
} else if ($this->type == 'http'){

}
}


}

实现类iostarjhy\src\Blocking\Worker.php

<?php
namespace IoStarJhy\Blocking;

use IoStarJhy\WorkerBase;

class Worker extends WorkerBase {

public function accept()
{
//建立并挂起连接,可接受多次信息(而不是一次就关闭服务)
while(true){
//监听是否存在连接
$conn = stream_socket_accept($this->server);

if(!empty($conn)){
//触发建立连接事件
$this->events['connect']($this,$conn);

//接收服务的信息
$data = fread($conn,65535);

//处理信息
$this->events['receive']($this,$conn,$data);

//如下这样处理,有些时候,不能马上断开
//$this->events['close']($this,$conn);


//缺少心跳检测机制

dd($conn);

if(get_resource_type($conn) != "Unknown"){
$this->events['close']($this,$conn);
}
}
}
}
}

测试服务端:iostarjhy\test\Blocking\server.php

<?php
require_once __DIR__."/../../vendor/autoload.php";

use IoStarJhy\Blocking\Worker;

$server = new Worker('0.0.0.0',9500);

$server->on('connect',function($server,$client){
dd($client,"客户端建立连接成功");
});
$server->on('receive',function(Worker $server,$client,$data){
dd($data,'处理客户端的数据');
// sleep(5);
$server->send($client,'hello client');
//$server->close($client);
});
$server->on('close',function(Worker $server,$client){
dd($client,'断开连接');
//$server->close($client);
});

$server->start();

测试客户端:iostarjhy\test\Blocking\client.php

<?php
require_once __DIR__ . "/../../vendor/autoload.php";

//连接服务端
$fp = stream_socket_client("tcp://192.168.204.168:9500");
fwrite($fp, 'hello server');
var_dump(fread($fp, 65535));
//服务端一次只能接受到一条数据,
/*fwrite($fp, 'hello server2');
var_dump(fread($fp, 65535));
fwrite($fp, 'hello server3');
var_dump(fread($fp, 65535));*/
//fclose($fp);

运行

attachments-2020-12-ANH2qxMz5fdad9a821cc3.jpg

  未完,见swoole 第6次课-2 含毫命简:阻塞与非阻塞3

六星教育社区-程序员编程技术分享交流学习高端论坛

  • 发表于 2020-12-17 07:56
  • 阅读 ( 678 )
  • 分类:swoole

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
吉洪叶
吉洪叶

21 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1470 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章