- 打开redis终端,lpush task_list 1 ,2,3,4,5…新建消息队列
- 在App/Lib目录下新建process文件夹,新建,代码如下:
<?php
namespace App\Lib\Process;
use EasySwoole\Component\Process\AbstractProcess;
use Swoole\Process;
use App\Lib\Redis\Redis;
use EasySwoole\EasySwoole\Logger;
class Consumer extends AbstractProcess
{
private $isRun = false;
public function run($arg)
{
$this->addTick(500,function (){
if(!$this->isRun){
$this->isRun = true;
$redis = new \redis();
while (true){
try{
$task = Redis::getInstance()->lPop('task_list');
if($task){
var_dump($task);
Logger::getInstance()->log($this->getProcessName().'---'.$task,Logger::LOG_LEVEL_INFO,'DEBUG');
}else{
break;
}
}catch (\Throwable $throwable){
break;
}
}
$this->isRun = false;
}
});
}
public function onShutDown()
{
}
public function onReceive(string $str, ...$args)
{
}
}
- 在App/Lib目录下新建Redis文件夹,新建,代码如下:
<?php
namespace App\Lib\Redis;
use EasySwoole\Component\Singleton;
use EasySwoole\EasySwoole\Config;
class Redis{
use Singleton;
public $redis='';
private function __construct(){
if(!extension_loaded('redis')){
throw new \Exception('不存在');
}
try{
$this->redis=new \Redis();
$result=$this->redis->connect('127.0.0.1',6379,5);
}catch(\Exception $e){
throw new \Exception($e->getMessage());
}
if($result===false){
throw new \Exception('redis连接失败');
}
}
public function get($key){
if(empty($key)){
return '';
}
return $this->redis->get($key);
}
public function lPop($key){
if(empty($key)){
return '';
}
return $this->redis->lPop($key);
}
public function lPush($key,$val){
if(empty($key)){
return '';
}
return $this->redis->lPush($key,$val);
}
}
- 在根目录下的下找到mainServerCreate方法,添加如下代码:
public static function mainServerCreate(EventRegister $register)
{
$allNum = 3;
for ($i = 0 ;$i < $allNum;$i++){
ServerManager::getInstance()->getSwooleServer()->addProcess((new Consumer("consumer_{$i}"))->getProcess());
}
}
- 添加生产者,在App/HttpController/Api/添加如下方法:
<?php
namespace App\HttpController\Api;
use App\HttpController\Api\Base;
use App\Lib\Redis\Redis;
class Index extends Base
{
function params(){
$data=[
'code'=>0,
'params'=>$this->request()->getRequestParam()
];
return $this->writeJson(200,$data,'获取成功');
}
function getRedis(){
$redis = \EasySwoole\RedisPool\Redis::getInstance()->pool('redis')::defer();
$redis->set('name','牛越洋');
$data = $redis->get('name');
return $this->writeJson(200, $data,'ok');
}
public function pub(){
$result=Redis::getInstance()->lPush('task_list',$this->request()->getRequestParam()['f']);
return $this->writeJson(200, $result,'ok');
}
}