PHP异步任务之swoole

时间:2022-09-24 17:21:37

一、安装swoole

  1. 下载地址:https://github.com/swoole/swoole-src/releases/tag/1.8.12-stable

  2. 下载压缩包,上传到服务器(测试centos),解压缩

  3. PHP异步任务之swoole

  4. cd swoole-src-1.8.7-stable

  5. phpize(phpize是用来扩展php扩展模块的,通过phpize可以建立php的外挂模块)

  6. ./configure

  7. make && make install

  8. 添加php.ini扩展extension=swoole.so

  9. 通过php -m查看扩展信息如果出现swoole则为成功

  10. PHP异步任务之swoole

  11. 测试make test出现上图信息,表示安装成功

二、问题解决

  1. 出现问题prce_open,为扩展问题,修改php.ini文件,去除即可

三、测试方案

  1. swoole下examples  service.php start 启动swoole服务

  2. 推荐测试工具PHP异步任务之swoole

四、服务端Demo(CI框架,以发送邮件为例)

<?php
class Swoole extends CI_Controller
{
    static $index = 0;
    static $serv;

    private static $buffers = array();

    /**
     * @param $fd
     * @return swoole_buffer
     */
    static function getBuffer($fd, $create = true)
    {
        if (!isset(self::$buffers[$fd]))
        {
            if (!$create)
            {
                return false;
            }
            self::$buffers[$fd] = new swoole_buffer(1024 * 128);
        }
        return self::$buffers[$fd];
    }
    
    function my_log($msg)
    {
        global $serv;
        if (empty($serv->worker_pid))
        {
            $serv->worker_pid = posix_getpid();
        }
        //echo "#".$serv->worker_pid."\t[".date('H:i:s')."]\t".$msg.PHP_EOL;
    }
    
    function my_onStart(swoole_server $serv)
    {
        global $argv;
        swoole_set_process_name("php {$argv[0]}: master");
        $this->my_log("Server: start.Swoole version is [".SWOOLE_VERSION."]");
        $this->my_log("MasterPid={$serv->master_pid}|Manager_pid={$serv->manager_pid}");
    }
    
    function forkChildInWorker() {
        global $serv;
        $this->my_log("on worker start\n");
        $process = new swoole_process( function (swoole_process $worker) use ($serv) {
    
        });
    
            $pid = $process->start();
            $this->my_log("Fork child process success. pid={$pid}\n");
            //保存子进程对象,这里如果不保存,那对象会被销毁,管道也会被关闭
            $serv->childprocess = $process;
    }
    
    function processRename(swoole_server $serv, $worker_id) {
    
        global $argv;
        if ( $serv->taskworker)
        {
            swoole_set_process_name("php {$argv[0]}: task");
        }
        else
        {
            swoole_set_process_name("php {$argv[0]}: worker");
        }
    
        $this->my_log("WorkerStart: MasterPid={$serv->master_pid}|Manager_pid={$serv->manager_pid}|WorkerId={$serv->worker_id}|WorkerPid={$serv->worker_pid}");
    }
    
    function setTimerInWorker(swoole_server $serv, $worker_id) {
    
        if ($worker_id == 0) {
            $this->my_log("Start: ".microtime(true)."\n");
        }
    }
    
    function my_onShutdown($serv)
    {
        $this->my_log("Server: onShutdown\n");
    }
    
    function my_onClose(swoole_server $serv, $fd, $from_id)
    {
        $this->my_log("Client[$fd@$from_id]: fd=$fd is closed");
        $buffer = Swoole::getBuffer($fd);
        if ($buffer)
        {
            $buffer->clear();
        }
    }
    
    function my_onConnect(swoole_server $serv, $fd, $from_id)
    {
        $this->my_log("Client: Connect --- {$fd}");
    }
    
    function timer_show($id)
    {
        $this->my_log("Timer#$id");
    }
    function my_onWorkerStart(swoole_server $serv, $worker_id)
    {
        $this->processRename($serv, $worker_id);
    
        if (!$serv->taskworker)
        {
            swoole_process::signal(SIGUSR2, function($signo){
                $this->my_log("SIGNAL: $signo\n");
            });
            $serv->defer(function(){
                $this->my_log("defer call\n");
            });
        }
        else
        {
    
        }
    
    }
    
    function my_onWorkerStop($serv, $worker_id)
    {
        $this->my_log("WorkerStop[$worker_id]|pid=".$serv->worker_pid.".\n");
    }
    
    function my_onPacket($serv, $data, $clientInfo)
    {
        $serv->sendto($clientInfo['address'], $clientInfo['port'], "Server " . $data);
        var_dump($clientInfo);
    }
    
    function my_onFinish(swoole_server $serv, $task_id, $data)
    {
        list($str, $fd) = explode('-', $data);
        $serv->send($fd, 'taskok');
        var_dump($str, $fd);
        $this->my_log("AsyncTask Finish: result={$data}. PID=".$serv->worker_pid.PHP_EOL);
    }
    
    function my_onWorkerError(swoole_server $serv, $worker_id, $worker_pid, $exit_code, $signo)
    {
        $this->my_log("worker abnormal exit. WorkerId=$worker_id|Pid=$worker_pid|ExitCode=$exit_code|Signal=$signo\n");
    }
    
    function broadcast(swoole_server $serv, $fd = 0, $data = "hello")
    {
        $start_fd = 0;
        $this->my_log("broadcast\n");
        while(true)
        {
            $conn_list = $serv->connection_list($start_fd, 10);
            if($conn_list === false)
            {
                break;
            }
            $start_fd = end($conn_list);
            foreach($conn_list as $conn)
            {
                if($conn === $fd) continue;
                $ret1 = $serv->send($conn, $data);
            }
        }
    }
    
    /**
     * 开启swoole服务,处理相关异步任务
     */
    public function service_start(){
        $serv = new swoole_server("127.0.0.1", 9501, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
        $serv->set(config_item('swoole'));
        Swoole::$serv = $serv;
        $serv->on('PipeMessage', function($serv, $src_worker_id, $msg) {
            $this->my_log("PipeMessage: Src={$src_worker_id},Msg=".trim($msg));
            if ($serv->taskworker)
            {
                $serv->sendMessage("hello user process",
                    $src_worker_id);
            }
        });
        
            $serv->on('Start', array($this, 'my_onStart'));
            $serv->on('Connect', array($this, 'my_onConnect'));
            $serv->on('Receive', array($this, 'my_onReceive'));
            $serv->on('Packet', array($this, 'my_onPacket'));
            $serv->on('Close', array($this, 'my_onClose'));
            $serv->on('Shutdown', array($this, 'my_onShutdown'));
            $serv->on('WorkerStart', array($this, 'my_onWorkerStart'));
            $serv->on('WorkerStop', array($this, 'my_onWorkerStop'));
            $serv->on('Task', array($this, 'my_onTask'));
            $serv->on('Finish', array($this, 'my_onFinish'));
            $serv->on('WorkerError', array($this, 'my_onWorkerError'));
            $serv->on('ManagerStart', function($serv) {
                global $argv;
                swoole_set_process_name("php {$argv[0]}: manager");
            });
            $serv->start();
    }

    /**
     * 接收客户端信息
     * @param swoole_server $serv 服务端
     * @param int $fd 主进程ID
     * @param string $from_id 客户端ID
     * @param string $data 数据
     */
    function my_onReceive(swoole_server $serv, $fd, $from_id, $data)
    {
        print_r($data);
        $this->my_log("Worker#{$serv->worker_pid} Client[$fd@$from_id]: received: $data");
        $serv->task($data, -1, function (swoole_server $serv, $task_id, $data)
        {
            $this->my_log("Task Callback: ");
            var_dump($task_id, $data);
        });
    }
    
    function my_onTask(swoole_server $serv, $task_id, $from_id, $data)
    {
        $data=json_decode($data,true);
        switch ($data['cmd']){
            case 'sendemail':
                $this->sendEmail($data);
                break;
            default:
                break;
        }
    }
    
    /**
     * 发送邮件
     * @param  array $data 邮件内容
     */
    private function sendEmail($data){
        //TODO 发送邮件
    }
}
五、客户端Demo

$client = new swoole_client(SWOOLE_SOCK_TCP);
$client->connect('127.0.0.1', 9501);
$data=array(
    'cmd'=>'sendemail',
    'to'=>$to,
    'subject'=>$subject,
    'message'=>$message,
    'type'=>$type
);
$client->send(json_encode($data));
$client->close();

六、swoole服务查看

PHP异步任务之swoole

七、swoole服务关闭

  1. kill 29193

八、swoole后台PHP进程查看

  1. ps aux|grep php

  2. PHP异步任务之swoole

本文出自 “kamil” 博客,转载请与作者联系!