Swoole整合ThinkPHP3.2系列教程二

时间:2022-01-06 06:47:45

swoole和ThinkPHP的整合

先上一份在我们系统内部的swoole整合的架构预览

├─Application                           应用目录
│ │
│ ├─Cli Cli模块
│ │ ├─Controller 制器类
│ │ │ ├─StartController TP框架加载时默认加载的控制器
│ │ │ ├─SwooleController 我们的业务逻辑写在这里面
│ │ └─ ...
├─Swoole
│ │
│ ├─log swoole运行日志
│ ├─Server.php swoole的服务代码
│ ├─swoole.php 用于cli模式下启动和软重启swoole服务

最早测试的时候不是这样搭建的,而是把Server.php里的关于服务的东西放在了TP控制器里,在cli模式下调用

php index.php(入口文件) Swoole/start (控制器/方法)

这种模式是把swoole套在了TP里运行,也是可行的,但是总觉得启动个swoole服务为毛还要告诉TP一声?

我们想要的一种模式是业务服务器独立运行,swoole服务作为守护进程常驻内存,当浏览器需要运行比较耗时的操作时,需要跟swoole服务进程建立长连接,当耗时的任务执行完毕时会通知浏览器已经完毕。

整个过程下来,swoole服务和业务服务不应该耦合在一起的。最完美的状态是swoole独立运行(使用swoole框架重新写耗时操作的业务逻辑代码),独立连接数据库。当浏览器执行这些任务时,就不再找业务服务器了,而是直接跟swoole服务打交道。

可是我们没有那么多的时间和精力,我们只能在swoole服务的进程里调用TP框架的东西,来执行我们在TP里写的代码。最后参考了网上的一些方案,选择了这样与TP结合。


代码逻辑

/Swoole/Server.php

  • 关于swoole服务的一些配置,比如监听地址,端口号(默认为9501),和一些基础配置

  • 一些回调函数,里面的代码注释很完整,可以直接看代码

<?php
// +----------------------------------------------------------------------
// 2017-8-23 09:03:40
// 此次修改为只作为websocket的服务端
// +----------------------------------------------------------------------
class Server{
protected $swoole;
// 监听所有地址
protected $host = '0.0.0.0';
// 监听 9501 端口
protected $port = 9501;
// 配置项
protected $option = [
//设置启动的worker进程数
'worker_num' => 2,
//task进程的数量
'task_worker_num' => 4,
//指定task任务使用消息队列模式,3表示完全争抢模式,task进程会争抢队列,无法使用定向投递
'task_ipc_mode' => 3,
//task进程的最大任务数
'task_max_request' => 1000,
// 守护进程化
'daemonize' => false,
// 监听队列的长度
'backlog' => 128,
//绑定uid时用
'dispatch_mode' => 5,
//设置日志路径
'log_file' => SWOOLE_LOG_PATH,
];
protected function init(){
//异步非阻塞多进程的websocket
$this->swoole = new swoole_websocket_server($this->host, $this->port);
$eventList = ['Open', 'Message', 'Close', 'HandShake' , 'Task', 'Finish' , 'WorkerStart' , 'Receive'];

// 设置参数
if (!empty($this->option)) {
$this->swoole->set($this->option);
}
// 设置回调
foreach ($eventList as $event) {
if (method_exists($this, 'on' . $event)) {
$this->swoole->on($event, [$this, 'on' . $event]);
}
}
}
public function start(){
$this->init();
$this->swoole->start();
}
public function getHost(){
return $this->host;
}
public function getPort(){
return $this->port;
}
/**
* [onOpen 建立连接时的回调函数]
* @method onOpen
* @param swoole_server $serv [description]
* @param swoole_http_request $req [description]
* @return [type] [description]
*/

public function onOpen(swoole_server $serv, swoole_http_request $req){
//将连接绑定到uid上面
if(!empty($req->get)&&$req->get['uid']){
$serv->bind($req->fd , $req->get['uid']);
}
}
/**
* [onMessage 接收到socket客户端发送数据的回调函数]
* @method onMessage
* @param swoole_server $serv [description]
* @param swoole_websocket_frame $frame [description]
* @return [type] [description]
*/

public function onMessage(swoole_server $serv, swoole_websocket_frame $frame){
//收到数据时处理数据
//根据收到的cmd名字去调用指定的方法
$receive = json_decode($frame->data,true);
//为了避免数据处理量过大阻塞当前进程,导致服务响应变慢,我们把耗时的操作扔到TaskWorker进程池中执行
//当要执行的方法存在并且已经在swoole_log表里备案过的可以丢到task进程池
$swooleLog = D('SwooleLog');
$swooleController = A('Swoole');
//$receive['args']['id']是业务服务器那边数据库SwooleLog表里的id
if (method_exists($swooleController, $receive['cmd']) && $receive['args']['id']) {
$task_id = $serv->task($receive);
//记录task_id信息
//...其他你想要做的精细化处理
}else{
if($receive['cmd'] === 'reload'){
//利用Swoole提供的柔性终止/重启的机制
$rs = $serv -> reload();
$serv->push($frame->fd , $rs);
}elseif($receive['args']['id']){
$returnData = $this->_returnStr('submit_error');
$serv->push($frame->fd , $returnData);
}elseif(method_exists($this, $receive['cmd'])){
$this->{$receive['cmd']}($serv , $frame);
}
}
}
/**
* [onTask 在task_worker进程池内被调用时的回调函数]
* @method onTask
* @param swoole_server $serv [description]
* @param int $task_id [description]
* @param int $src_worker_id [description]
* @param mixed $data [description]
* @return [type] [description]
*/

public function onTask(swoole_server $serv, $task_id, $src_worker_id, $data){
//记录任务开始执行的时间
//...自己发挥 我们项目的业务逻辑部分已经删除
//
try{
$swooleController = A('Swoole');
$rs = $swooleController->{$data['cmd']}($data['args']);
return json_encode(['id'=>$data['args']['id'] , 'status' => true , 'other' => $rs]);
}catch(\Exception $e){
return json_encode(['id'=>$data['args']['id'] , 'status' => false , 'other' => $e->getMessage()]);
}
}
/**
* [onFinish 任务执行完毕时的回调函数]
* @method onFinish
* @param swoole_server $serv [description]
* @param int $task_id [description]
* @param string $data [description]
* @return [type] [description]
*/

public function onFinish(swoole_server $serv, $task_id, string $data){
$rs = json_decode($data , true);
$swooleLog = D('SwooleLog');
//...
//通知用户该任务已经执行完毕了 可以来查看数据了
//检查客户端链接是否存在 如果存在的话发送消息
$returnData = $this->_returnStr('task_excute_success');
foreach($serv->connections as $fd){
$conn = $serv->connection_info($fd);
//根据uid判断当前连接是活跃的
//这里是业务服务器那边能取到的用户信息,这里已经删除,保护我们的项目
$serv->push($fd , $returnData);
}
}
/**
* [onWorkerStart 为了应用能热加载 把框架的东西放到worker进程启动]
* @method onWorkerStart
* @param swoole_server $server [description]
* @param [type] $worker_id [description]
* @return [type] [description]
*/

public function onWorkerStart(swoole_server $server, $worker_id){
// 定义应用目录
define('APP_PATH', '../Application/');
// 定义应用模式
define('APP_Mode', 'cli');
define('BIND_MODULE','Cli');
// 引入ThinkPHP入口文件
require '../ThinkPHP/ThinkPHP.php';
}
/**
* [_returnStr 返回给客户端的数据]
* @method _returnStr
* @param [type] $type [description]
* @return [type] [description]
*/

private function _returnStr($type){
switch ($type) {
case 'submit_success':
$returnData = [
'code' => 202,
'info' => '任务已经提交,请等待服务器计算数据!'
];
break;
case 'submit_error':
$returnData = [
'code' => 404,
'info' => '任务提交失败,请联系管理员进行处理!'
];
break;
case 'task_excute_success':
$returnData = [
'code' => 200,
'info' => '任务执行完毕!'
];
break;
}
return json_encode($returnData);
}
/**
* [clients 获取所有的客户端连接信息,返回我们的uid]
* @method clients
* @return [type] [description]
*/

public function clients(swoole_server $serv , swoole_websocket_frame $frame){
//...
$serv->push($frame->fd , json_encode($serv->connections));
}
public function __call($method, $args){
call_user_func_array([$this->swoole, $method], $args);
}
}

/Swoole/swoole.php

是一个面向cli编程的代码,主要提供了start和reload两个命令

#!/bin/env php
<?php
/**
* 定义项目根目录&swoole-task pid
*/

define('SWOOLE_PATH', __DIR__);
define('SWOOLE_LOG_PATH', SWOOLE_PATH . DIRECTORY_SEPARATOR . 'log' . DIRECTORY_SEPARATOR . 'Swoole' . '.log');

/**
* 加载 swoole server
*/

include SWOOLE_PATH . DIRECTORY_SEPARATOR . 'Server.php';

//提示帮助信息
if ($argc != 2 || in_array($argv[1], array('--help', '-help', '-h', '-?'))) {
echo <<<HELP
用法:php swoole.php 选项 ... 可选的命令[start|reload|list]

--help|-help|-h|-? 显示本帮助说明

选项说明
start, 启动swoole服务[默认监测9501端口]
reload, 柔性重启所有workder进程
list, 查看当前所有连接的客户端数

HELP;

exit;
}

//执行命令行选项
switch($argv[1]){
case 'start':
start();
break;
case 'reload':
reload();
break;
case 'list':
clients();
break;
default:
exit("输入命令有误 : {$argv[1]}, 请查看帮助文档\n");
break;
}

//启动swoole服务
function start(){
$server = new Server();
$server->start();
}

//柔性重启swoole服务
function reload(){
echo "正在柔性重启swoole的worker进程..." . PHP_EOL;
try {
$server = new Server();
$port = $server->getPort();
$host = '127.0.0.1';
$cli = new swoole_http_client($host, $port);
$cli->set(['websocket_mask' => true]);
$cli->on('message', function ($_cli, $frame) {
if($frame->data){
exit('swoole重启成功!'.PHP_EOL);
}
});
$cli->upgrade('/', function ($cli) {
$cli->push(json_encode(['cmd' => 'reload']));
});
} catch (Exception $e) {
exit($e->getMessage() . PHP_EOL . $e->getTraceAsString());
}
}

function clients(){
try {
$server = new Server();
$port = $server->getPort();
$host = '127.0.0.1';
$cli = new swoole_http_client($host, $port);
$cli->set(['websocket_mask' => true]);
$cli->on('message', function ($_cli, $frame) {
$users = json_decode($frame->data , true);
if(empty($users)){
echo '当前没有客户端连接'.PHP_EOL;
}else{
foreach($users as $user){
echo '---'.$user.PHP_EOL;
}
}
exit();
});
$cli->upgrade('/', function ($cli) {
$cli->push(json_encode(['cmd' => 'clients']));

});
} catch (Exception $e) {
exit($e->getMessage() . PHP_EOL . $e->getTraceAsString());
}
}

存在的问题

0.以上的代码是从项目里摘出来的,大体思路还在,业务部分代码删除了。不保证正常运行,但是问题不大,如果报错了聪明的你一定一看就知道是什么错误。

1.由于我们将TP框架的东西放在workstart回调函数里启动,这里只要启动worker进程和task进程,都会加载一次TP框架的东西到内存里,对内存的占用问题还需要仔细研究。

2.目前还不支持那些不支持HTML5 WebSocket的浏览器 我会尽快想办法解决。用flash模拟socket请求的方法没有试成功,近期重新再搞一波。