Cache应用/任务Mutex,用于高并发任务处理经过多个项目使用

时间:2021-10-19 08:01:19
<?php
/**
* Class Cache redis 用于报表的缓存基本存储和读写 2.0
* <pre>
* Cache::read("diamond.account",$nick);
* Cache::readSync("diamond.account",$nick);
* $finder = Cache::createFinder("diamond.account",$nick);
* $finder->read();
* $finder->readSync();
*
* Cache::save("diamond.account",$nick,$data);
* $storage = Cache::createStorage("diamond.account",$nick);
* $storage->save($data);
* $storage->save($data,7200);
* </pre>
* @category cache
* @package cache
* @author oShine <oyjqdlp@126.com>
* @version 2.0.0.0
* @copyright oShine 2015/08/07
*/
class Cache { /**
* 非安全读取的数据
* @param $key
* @param string $nick
* @return array|null
*/
public static function read($key,$nick = "sys"){
$finder = self::createFinder($key,$nick);
return $finder->read();
} /**
* 同步读取数据
* @param $key
* @param string $nick
* @return mixed
*/
public static function readSync($key,$nick = "sys"){
$finder = self::createFinder($key,$nick);
return $finder->readSync();
} /**
* 创建Finder
* @param $key
* @param string $nick
* @return Finder
*/
public static function createFinder($key,$nick = "sys"){
$key = Generate::key($key,$nick);
return new Finder($key);
} /**
* 创建Storage
* @param $key
* @param string $nick
* @return Storage
*/
public static function createStorage($key,$nick = "sys"){
$key = Generate::key($key,$nick);
return new Storage($key);
} /**
* 保存数据
* @param $key
* @param string $nick
* @param array $data
* @param int $expired
* @return bool
*/
public static function save($key,$nick = "sys",$data = array(),$expired=7200){
$storage = self::createStorage($key,$nick);
return $storage->save($data,$expired);
} /**
* @param string $nick
*/
public static function clear($nick = "sys"){
$redis = CacheFactory::create();
$redis->del($redis->keys(md5($nick).".data.*"));
} } /**
* Class Finder 数据读取
* @category cache
* @package cache
* @author oShine <oyjqdlp@126.com>
* @version 2.0.0.1
* @copyright oShine 2015/08/07
*/
class Finder { /**
* @var string $key
*/
public $key; /**
* @param string $key
*/
public function __construct($key){
$this->key = $key;
} /**
* 非安全读取数据
* @return mixed
*/
public function read(){
$data = $this->readData();
if($data->isRead === true && !$data->isExpired()) {
return json_decode(json_encode($data->data), true);
}
return null;
} /**
* @return Data
*/
protected function readData(){
$redis = CacheFactory::create();
$rptData = new Data();
$data = json_decode($redis->get($this->key));
if(false == $data){
$rptData->isRead = false;
$rptData->expiredTime = time();
$rptData->expired = 24*3600;
}else{
$rptData->expired = $data->expired;
$rptData->isRead = isset($data->isRead) && $data->isRead === true?true:false;
$rptData->expiredTime = $data->expiredTime;
$rptData->data = $data->data;
}
return $rptData;
} /**
* 同步读取数据
* @return mixed
*/
public function readSync(){
while(true){
$rptData = $this->readData();
if($rptData->isRead && !$rptData->isExpired())
return $this->read();
sleep(1);
}
}
} /**
* Class Storage 数据存储
* @category cache
* @package cache
* @author oShine <oyjqdlp@126.com>
* @version 2.0.0.0
* @copyright oShine 2015/08/07
*/
class Storage { /**
* @var string key
*/
public $key; /**
* @param string $key
*/
public function __construct($key){
$this->key = $key;
} /**
* @return bool
*/
public function flush(){
$rptData = new Data();
$rptData->data = null;
$rptData->expiredTime = time();
$rptData->isRead = false;
$rptData->expired = 1;
$redis = CacheFactory::create(); return $redis->setex($this->key, $rptData->expired,json_encode($rptData));
} /**
* 写入数据
* @param $data
* @param int $expired
* @return bool
*/
public function save($data,$expired=7200){ $rptData = new Data();
$rptData->data = $data;
$rptData->expiredTime = time();
$rptData->isRead = true;
$rptData->expired = $expired;
$redis = CacheFactory::create(); return $redis->setex($this->key, $rptData->expired,json_encode($rptData));
}
} /**
* Class Data redis存储数据实体
* @category cache
* @package cache
* @author oShine <oyjqdlp@126.com>
* @version 2.0.0.0
* @copyright oShine 2015/08/07
*/
class Data {
/**
* @var int $expired 失效间隔时长
*/
public $expired;
/**
* @var int
*/
public $expiredTime;
/**
* @var mixed 存储的具体数据
*/
public $data;
/**
* @var bool 是否可以读取
*/
public $isRead; /**
* 是否失效
* @return bool
*/
public function isExpired(){
if(time()-$this->expiredTime > $this->expired)
return true;
return false;
}
} /**
* Class Generate key生成
* @category cache
* @package cache
* @author oShine <oyjqdlp@126.com>
* @version 2.0.0.0
* @copyright oShine 2015/08/07
*/
class Generate {
/**
* @static
* @param $key
* @param $nick
* @return string
*/
public static function key($key,$nick){
return md5($nick).".data.".$key;
}
}

CacheFactory:

<?php
/**
* @category cache
* @package cache
* @author oShine <oyjqdlp@126.com>
* @version 2.0.0.0
* @copyright oShine 2015/08/07
*/
class CacheFactory { /**
* @var Redis $instance
*/
private static $instance = null; /**
* @return Redis
*/
public static function create(){ if(self::$instance == null){
self::$instance = new Redis();
self::$instance->connect(Yii::app()->params["RedisServerIP"]);
}else{
try{
if(preg_match("/PONG/",self::$instance->ping())){
return self::$instance;
}
}catch (Exception $e){
self::$instance = new Redis();
self::$instance->connect(Yii::app()->params["RedisServerIP"]);
}
}
return self::$instance; } }

Mutex:用于任务锁,辅助任务处理,使用Cache记录任务状态,告别表锁任务标记

<?php

/**
* Class Mutex 用于任务锁
* @category mutex
* @package cache
* @author oShine <oyjqdlp@126.com>
* @version 2.0.0.0
* @copyright oShine 2015/08/07
* @example
* Mutex::create($nick)->init("download")->wait()
* Mutex::create($nick)->init("download")->doing()
* Mutex::create($nick)->init("download")->done()
* Mutex::create($nick)->init("download")->error()
*/
class Mutex { /**
* @var string
*/
private $nick; /**
* @param string $nick
*/
public function __construct($nick){
$this->nick = $nick;
} /**
* @param $nick
* @return Mutex
*/
public static function create($nick){
return new self($nick);
} /**
* @param $nick
*/
public static function clear($nick){
$redis = CacheFactory::create();
$redis->del($redis->keys(md5($nick).".mutex.*"));
} /**
* @param $key
* @return MutexStorage
*/
public function init($key){
$key = md5($this->nick).".mutex.".$key;
return new MutexStorage($key, CacheFactory::create());
} } /**
* Class MutexStorage
* @category mutex
* @package cache
* @author oShine <oyjqdlp@126.com>
* @version 2.0.0.0
* @copyright oShine 2015/08/07
*/
class MutexStorage { private $key; /**
* @var Redis $redis
*/
private $redis; public function __construct($key,$redis){
$this->key = $key;
$this->redis = $redis;
} /**
* @return $this
* @throws Exception
*/
public function wait(){
$this->save("WAIT");
return $this;
} /**
* @return $this
* @throws Exception
*/
public function doing(){
$this->save("DOING");
return $this;
} /**
* @return $this
* @throws Exception
*/
public function done(){
$this->save("DONE");
return $this;
} /**
* @return $this
* @throws Exception
*/
public function error(){
$this->save("ERROR");
return $this;
} /**
* @param $data
* @return $this
* @throws Exception
*/
protected function save($data){ $data = json_encode(array("status"=>$data,"date"=>date("Y-m-d"),"timestamp"=>time())); $flag = $this->redis->setex($this->key,3*24*2400,$data);
if(!$flag)
throw new Exception("SAVE Error!");
return $this;
} /**
* @return string|null
*/
protected function get(){ $data = json_decode($this->redis->get($this->key),true);
if(strtotime(date("Y-m-d")) == strtotime($data["date"])){
return $data["status"];
}
return null;
} /**
* @return bool
*/
public function isDoing(){
$data = json_decode($this->redis->get($this->key),true);
if(isset($data) && isset($data["status"]) && $data["status"] == "DOING" && isset($data["timestamp"]) && (time()-$data["timestamp"])<60)
return true;
return false;
} /**
* @return bool
*/
public function isDone(){
$status = $this->get();
if(isset($status) && $status == "DONE")
return true;
return false;
} /**
* @return bool
*/
public function isError(){
$status = $this->get();
if(isset($status) && $status == "ERROR")
return true;
return false;
}
}