page contents

swoole 第5次课-依葫芦画瓢:加速tp6,实现swoole聊天室、队列消息和服务通信2

加速tp swoole聊天室 swoole队列 swoole服务通信

接上

3.用swoole实现队列消息通知

  31)根据上面的扩展----实现人与人的聊天沟通,队列通讯实现思路如下:

 attachments-2020-12-MibRfROL5fcb5e3809f1a.jpg


32)实现下单功能(即记录浏览器客户端信息功能)

新建文件夹,作为实现队列的工作目录,class\05\queuejhy

并创建web服务器端及客户端,实现下单功能

web服务器端:class\05\queuejhy\order.php

<?php
header('Access-Control-Allow-Origin:*'); // *代表允许任何网址请求
header('Access-Control-Allow-Methods:POST,GET,OPTIONS,DELETE'); // 允许请求的类型
header('Access-Control-Allow-Headers: Content-Type,Content-Length,Accept-Encoding,X-Requested-with, Origin');

$data = [
'http_id' => $_POST['http_id']
];

$data = json_encode($data);

$new = time();
// 队列 -》 任务投递
// 处理任务 //File_APPEND追加,LOCK_EX标记可以防止多人同时写入file_put_contents($file, $site, FILE_APPEND | LOCK_EX);
file_put_contents('task.txt', $data."\n");

$ret = [
"处理: ".$data.'任务',
"处理任务所需要的时间" . (time() - $new)
];
echo json_encode($ret);

下单的客户端:class\05\queuejhy\order.html

<!doctype html>
<html lang="en">

<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width,initial-scale=1, maximum-scale=1, user-scalable=no">
<title>websocketjhy</title>
<script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
</head>

<body>
<input type="submit" value="buy" onclick="buy()">
<div id="message"></div>
<script>
// jquery的弹窗输入
//let id = prompt('输入id用于测试');
//prompt()方法用于显示可提示用户进行输入的对话框。这个方法返回用户输入的字符串。
//id = id + '_' + getFormatDate();
// let命令,用来声明局部变量。它的用法类似于var,但是所声明的变量,只在let命令所在的代码块内有效,而且有暂时性死区的约束 。let让js真正拥有了块级作用域
let http_id = 'http_order_' + getFormatDate();
console.log(http_id);
     //*********webSocket处先注释掉后待放开  start************
/*//创建一个webSocket 实例
var webSocket = new WebSocket('ws://192.168.204.168:9500?http_id='+http_id);
webSocket.onerror = function(event) {
document.getElementById("message").innerHTML = "<p>close</p>";
console.log("error" + event.data);
};
// 打开websocket
webSocket.onopen = function(event) {
console.log("open:" + sockState());

webSocket.send(`{"method":"serverBroadcast","msg":"大家好我是${id}"}`);

document.getElementById("message").innerHTML = "<p>Connect to Service</p>";
};

//监听消息
webSocket.onmessage = function(event) {
console.log("onMessage");

var data = eval('(' + event.data + ')');

console.log(data);
document.getElementById("message").innerHTML += "<p>response:" + data.msg + "</p>"
};

webSocket.onclose = function(event) {
document.getElementById("message").innerHTML = "<p>close</p>";
console.log("close:" + sockState());
webSocket.close();
}
/!**
*0:未连接
*1:连接成功,可通讯
*2:正在关闭
*3:连接已关闭或无法打开
*!/
function sockState() {
var status = ['未连接', '连接成功,可通讯', '正在关闭', '连接已关闭或无法打开'];
return status[webSocket.readyState];
}

function start(event) {
console.log(webSocket);

var msg = document.getElementById('msg').value;
var clientId = document.getElementById('clientId').value;

document.getElementById('msg').value = '';
document.getElementById('clientId').value = '';

console.log("send:" + sockState());
console.log("msg=" + msg);

let data ;
if (clientId == "") {
// 为广播
// webSocket.send();
data = `{"method":"serverBroadcast","msg":"${msg}"}`;
} else {
// 为私了
data = `{"method":"privateChat","msg":"${msg}","clientId":"${clientId}"}`
}
console.log(data);
webSocket.send(data);
document.getElementById("message").innerHTML += "<p>request" + msg + "</p>"
};

function close(event) {
webSocket.close();
}//*********webSocket处先注释掉后待放开 end *************/

function getFormatDate() {
var nowDate = new Date();
var year = nowDate.getFullYear();
var month = nowDate.getMonth() + 1 < 10 ? "0" + (nowDate.getMonth() + 1) : nowDate.getMonth() + 1;
var date = nowDate.getDate() < 10 ? "0" + nowDate.getDate() : nowDate.getDate();
var hour = nowDate.getHours() < 10 ? "0" + nowDate.getHours() : nowDate.getHours();
var minute = nowDate.getMinutes() < 10 ? "0" + nowDate.getMinutes() : nowDate.getMinutes();
var second = nowDate.getSeconds() < 10 ? "0" + nowDate.getSeconds() : nowDate.getSeconds();
return year + "" + month + "" + date + "" + hour + "" + minute + "" + second;
}

function buy() {
$.ajax({
url: "http://192.168.204.168/class/05/queuejhy/order.php", // 请求实际web项目
dataType:'json',
type:'post',
data: {
http_id: http_id
},
success:function(ret) {
console.log(ret);
}

注意1:webSocket处先注释掉后待放开

注意2:在phpstorm里新建的文件夹queuejhy,在phpstorm里不能上传 (目前是ftp软件里上传的,解决方法待更新)

注意3:在phpstorm里新建文件夹里的queuejhy下,新建立文件 order.html、order.php不能上传,及web服务端程order.php序中,队列处理写入文件时(file_put_contents('task.txt', $data."\n");)需要给足权限,所以先要在xshell或宝塔里给足权限。如下

chmod 777 -R queuejhy/  

运行方法:客户端浏览器中打开file:///D:/phpstudy_pro/WWW/swoole_2006/class/05/queuejhy/order.html文件,并打开控制台,点击buy,观看控制台信息,如下:

attachments-2020-12-GxcBOPHA5fcc40433b428.pngXshell中,查看是否写入task.txt文件,及写入的内容,如下

attachments-2020-12-8Uqe4ciS5fcc408dc3c9a.jpg

33).全局变量方式记录请求队列客户端 queue无效--原因进程隔离

     放开class\05\queuejhy\order.html 中的 websocket部分注释

     创建websocket文件:\class\05\queuejhy\websocket.php

     <?php


class WebsocketServer{
//全局变量方式 ,由于进程隔离,全局变量在不同的进程,内存空间是隔离的,所以无效,
//除非设置进程数为1,但作为服务器,供多个用户访问,是不现实的
private $conns=[]; //全局变量记录对应请求客户端
private $server;
private $host='0.0.0.0';
private $port='9500';
public function __construct()
{
$this->server = new Swoole\WebSocket\Server($this->host,$this->port);
echo swoole_get_local_ip()['ens33'].':'.$this->port."\n";
$this->onInit();
}
public function open($server, $req){
echo "connection open: {$req->fd}\n";
echo "http_id:".$req->get['http_id']."\n";
//主要是把这个http_id进行保存,通过全局变量方式
$this->conns[$req->get['http_id']]=$req->fd;
//进程隔离,不能读取
var_dump($this->conns);
echo "连接建立结束\n";
}
public function message($server, $frame){
echo "received message: {$frame->data}\n";
//进程隔离,不能读取
var_dump($this->conns);
$server->push($frame->fd, json_encode(["msg" => "hello"]));
}
public function close($server, $fd){
echo "connection close: {$fd}\n";
}
//注册事件函数
public function onInit(){
// [$this, 'open'] 把对象方法转为闭包参数传递
$this->server->on("open",[$this,"open"]);
$this->server->on("message",[$this,"message"]);
$this->server->on("close",[$this,"close"]);
}
public function start(){
$this->server->start();
}
}
//启动服务
(new WebsocketServer())->start();

     创建 queue.php文件:class\05\queuejhy\queue.php

<?php
use Swoole\Coroutine\Http\Client;
// 处理业务逻辑
// 推送信息到websocket -》推送到前台通知
Co\run(function () { // 开启协程环境
$cli = new \Swoole\Coroutine\Http\Client("192.168.204.168", 9500);
$cli->upgrade('/'); // 升级成websocket服务
//$data = file_get_contents('task.txt');
$cli->push("p"); // 推送信息
//$cli->push($data); // 推送信息
$ret = $cli->recv(); // 接收结果
var_dump($ret);
$cli->close();
});

运行方法

先开一个xshell并运行 php websocket.php,

再浏览器中file:///D:/phpstudy_pro/WWW/swoole_2006/class/05/queuejhy/order.html,进行下单,即点击buy,浏览器运行结果如下 :

   attachments-2020-12-BYhhOIX75fcc68f909d46.png

再打一个xshell,进入对应目录下,运行 php queue.php,结果如下

attachments-2020-12-ds6p4TFG5fcc690ecd61f.png

由上结果可以看到:明明是设置的全局变量,但是却读取不到,为什么呢?原因是进程隔离。

补充1:局部变量

在事件回调函数返回后,所有局部对象和变量会全部回收,不需要 unset。如果变量是一个资源类型,那么对应的资源也会被 PHP 底层释放

补充2:全局变量

  • 使用 global 关键词声明的变量,使用 static 关键词声明的类静态变量、函数静态变量,PHP 的超全局变量,包括 $_GET$_POST$GLOBALS 全局变量,及对象,类静态变量,保存在 Server 对象上的变量不会被释放。需要程序员自行处理这些变量和对象的销毁工作。

补充3:进程隔离

使用 Swoole 开发 Server 程序,需要了解进程隔离问题,Swoole\Server 程序的不同 Worker 进程之间是隔离的,在编程时操作全局变量、定时器、事件监听,仅在当前进程内有效。

  • 不同的进程中 PHP 变量不是共享,即使是全局变量,在 A 进程内修改了它的值,在 B 进程内是无效的
  • 如果需要在不同的 Worker 进程内共享数据,可以用 Redis、MySQL、文件、Swoole\Table、APCu、shmget 等工具实现
  • 不同进程的文件句柄是隔离的,所以在 A 进程创建的 Socket 连接或打开的文件,在 B 进程内是无效,即使是将它的 fd 发送到 B 进程也是不可用的

另,websocket.php中的$conns改成静态的,private static $conns=[];赋值及调用的地方对应改为self::$conns,也是读取不出来的,这里不再测试了。

34)table方式

补充:根据33)中的补充3,推荐使用共享内存来保存数据,进一步了解swoole/table

高性能共享内存 Table 点击了解更多

Swoole\Table 一个基于共享内存和锁实现的超高性能,并发数据结构。用于解决多进程 / 多线程数据共享和同步加锁问题。Table 的内存容量不受 PHP 的 memory_limit 控制。

注意:不要使用数组方式读写 Table,一定要使用文档中提供的 API 来进行操作;
数组方式取出的 Swoole\Table\Row 对象为一次性对象,请勿依赖其进行过多操作。

  • 优势

    • 性能强悍,单线程每秒可读写 200 万次;
    • 应用代码无需加锁,Table 内置行锁自旋锁,所有操作均是多线程 / 多进程安全。用户层完全不需要考虑数据同步问题;
    • 支持多进程,Table 可以用于多进程之间共享数据;
    • 使用行锁,而不是全局锁,仅当 2 个进程在同一 CPU 时间,并发读取同一条数据才会进行发生抢锁。
  • 遍历

请勿在遍历期间进行删除操作(可将所有 key 取出后进行删除)

方法

__construct()

创建内存表。

Swoole\Table::__construct(int $size, float $conflict_proportion = 0.2);Copy to clipboardErrorCopied
  • 参数  int $size 功能:指定表格的最大行数


      由于 Table 底层是建立在共享内存之上,所以无法动态扩容。所以 $size 必须在创建前自己计算设置好,Table 能存储的最大行数与 $size 正相关,但不完全一致,如 $size 为 1024 实际可存储的行数小于1024,如果 $size 过大,机器内存不足 Table 会创建失败。

    • float $conflict_proportion 功能:哈希冲突的最大比例 默认值0.2 (即 20%)  其它值:最小为 0.2,最大为 1

其他四个方法:column定义结构 ,create创建结构,set设置 $key=>$val ,其中$val为对应的结构数据,get读取

column方法:功能:内存表增加一列:用法Swoole\Table->column(string $name, int $type, int $size = 0);

参数:string $name 指定字段的名称

          int $type 指定字段类型  默认值:无 其它值:Table::TYPE_INT, Table::TYPE_FLOAT, Table::TYPE_STRING

          int $size:指定字符串字段的最大长度【字符串类型的字段必须指定 $size】

 create方法:创建内存表。定义好表的结构后,执行 create 向操作系统申请内存,创建表。

 set方法:设置行的数据。Table 使用 key-value 的方式来访问数据。用法:Swoole\Table->set(string $key,array$value): bool

参数   string $key  数据的 key。相同的 $key 对应同一行数据,如果 set 同一个 key,会覆盖上一次的数据,key 最大长度不得超过 63 字节

         array $value 数据的 value。必须是一个数组,必须与字段定义的 $name 完全相同。-Table->set() 可以设置全部字段的值,也可以只修改部分字段;

注意:-Table->set() 未设置前,该行数据的所有字段均为空;

          set/get/del 是自带行锁,所以不需要调用 lock 加锁;

          Key 非二进制安全,必须为字符串类型,不得传入二进制数据

返回值:设置成功返回 true,失败返回 false

get方法: 获取一行数据或一个字段的数据。

用法:Swoole\Table->get(string $key, string $field = null): array|false

参数:$key--数据的 key【必须为字符串类型】

          string $field --当指定了 $field 时仅返回该字段的值,而不是整个记录

返回值:$key 不存在,将返回 false,成功返回结果数组,当指定了 $field 时仅返回该字段的值,而不是整个记录

三个方法示意图

attachments-2020-12-6svI5zh15fcc88c92f511.png

补充结束,方式tabel正式开始

websocket服务端:class\05\queuejhy\websocket.php 改造如下

<?php

class WebsocketServer{
//全局变量方式 ,由于进程隔离,全局变量在不同的进程,内存空间是隔离的,所以无效,
//除非设置进程数为1,但作为服务器,供多个用户访问,是不现实的
//private $conns=[]; //全局变量记录对应请求客户端
//private static $conns=[]; //全局变量记录对应请求客户端
private $table;//Swoole\Table对象
private $server;
private $host='0.0.0.0';
private $port='9500';
public function __construct()
{
$this->server = new Swoole\WebSocket\Server($this->host,$this->port);
echo swoole_get_local_ip()['ens33'].':'.$this->port."\n";
$this->onInit();
$this->tableInit();
}
public function tableInit(){
$this->table = new Swoole\Table(1*1024*1024);
$this->table->column('fd',Swoole\Table::TYPE_INT);
$this->table->create();
}
public function open($server, $req){
echo "connection open: {$req->fd}\n";
echo "http_id:".$req->get['http_id']."\n";
//主要是把这个http_id进行保存,通过全局变量方式
//$this->conns[$req->get['http_id']]=$req->fd;
//self::$conns[$req->get['http_id']]=$req->fd;
//进程隔离,不能读取
//var_dump($this->conns);
//var_dump(self::$conns);
//设置set
$this->table->set($req->get['http_id'],['fd'=>$req->fd]);
var_dump($this->table->count());
echo "连接建立结束\n";
}
public function message($server, $frame){
//echo "received message: {$frame->data}\n";
//进程隔离,不能读取
//var_dump($this->conns);
//var_dump(self::$conns);
var_dump($this->table->count());
var_dump($frame->data);
// 1 解析数据
$data=json_decode($frame->data,true);
//2 根据数据内容,结合open连接时$tabel->set设置的数据 获取客户端order.html浏览器的$fd
$fd=($this->table->get($data["http_id"]))['fd'];//$fd=$this->table->get($data["http_id"],'fd'); // get($key,$fieldname) 获取单字段时最好用这个
var_dump($fd);
//推送给所有连接的客户端
$server->push($frame->fd, json_encode(["msg" => "hello"]));
//推送给浏览器客户端
$server->push($fd, json_encode(["msg" => "通知完成"]));
}
public function close($server, $fd){
echo "connection close: {$fd}\n";
}
//注册事件函数
public function onInit(){
// [$this, 'open'] 把对象方法转为闭包参数传递
$this->server->on("open",[$this,"open"]);
$this->server->on("message",[$this,"message"]);
$this->server->on("close",[$this,"close"]);
}
public function start(){
$this->server->start();
}
}
//启动服务
(new WebsocketServer())->start();

队列处理端:class\05\queuejhy\queue.php

<?php
//use Swoole\Coroutine\Http\Client;
// 处理业务逻辑
// 推送信息到websocket -》推送到前台通知
Co\run(function () { // 开启协程环境
//$cli = new Client("192.168.204.168", 9500);//有上面的use时,有这个
$cli = new \Swoole\Coroutine\Http\Client("192.168.204.168", 9500);//没有上面的use时,用这个
$cli->upgrade('/'); // 升级成websocket服务
$data = file_get_contents('task.txt');
//$cli->push("p"); // 推送信息
$cli->push($data); // 推送信息
$ret = $cli->recv(); // 接收结果
var_dump($ret);
$cli->close();
});

运行方法同上,

打开xshell,运行websocket.php,

再浏览器中file:///D:/phpstudy_pro/WWW/swoole_2006/class/05/queuejhy/order.html,进行下单,即点击buy

再打开一个xshell,运行queue.php

结果如下:

attachments-2020-12-CqfDqGBA5fccbaa28c270.png


至此,功能已实现。

梳理实现流程

shineYork--简洁版

attachments-2020-12-b6lm0LWt5fceed242afd3.jpg

jhy-详情参考版

attachments-2020-12-H3Y0euWN5fceed3aab3cb.jpg

(未完,见swoole 第5次课-依葫芦画瓢:加速tp6,实现swoole聊天室、队列消息和服务通信3)


  • 发表于 2020-12-06 10:11
  • 阅读 ( 793 )

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
吉洪叶
吉洪叶

21 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1320 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章