引入背景:假如我们每天有10000个订单生成,需要同步到仓储系统中去,以前做法是开启一个crontab去跑这些任务,但是发现总有感觉同步效率低,间隔时间都是分钟级别的。
解决方案测试:我们将同步订单的任务表添加一个hash作为key,作为分发条件,因为mysql中select如果做mod函数是用不到索引的,所以我们自己做随机hash,但是务必不需要范围太大,以免服务器资源不够,方法是根据hashkey投放到不同的进程中进行同步,测试代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
<?php
/**
* Created by PhpStorm.
* User: xujun
* Date: 2017/8/26
* Time: 9:37
*/
//假定需要处理的数据如下
class Process{
public $mpid =0;
public $max_precess =5;
//代替从数据库中读取的内容
public $task = [
[ 'uid' =>1, 'uname' => 'bot' , 'hash' =>1, 'handle' => 'test' ],
[ 'uid' =>2, 'uname' => 'bot1' , 'hash' =>2, 'handle' => 'test' ],
[ 'uid' =>3, 'uname' => 'bot2' , 'hash' =>3, 'handle' => 'test' ],
[ 'uid' =>4, 'uname' => 'bot3' , 'hash' =>4, 'handle' => 'test' ],
[ 'uid' =>2, 'uname' => 'bot4' , 'hash' =>2, 'handle' => 'test' ],
[ 'uid' =>3, 'uname' => 'bot5' , 'hash' =>3, 'handle' => 'test' ],
[ 'uid' =>4, 'uname' => 'bot6' , 'hash' =>1, 'handle' => 'test' ],
];
public $works = [];
//public $new_index=0;
function test( $index , $task ){
}
public function __construct(){
try {
$this ->swoole_table = new swoole_table(1024);
$this ->swoole_table->column( 'index' , swoole_table::TYPE_INT); //用于父子进程间数据交换
$this ->swoole_table->create();
$this ->mpid = posix_getpid();
$this ->run();
$this ->processWait();
} catch (\Exception $e ){
die ( 'ALL ERROR: ' . $e ->getMessage());
}
}
public function run(){
for ( $i =0; $i < $this ->max_precess; $i ++) {
$this ->CreateProcess();
}
}
private function getTask( $index ){
$_return = [];
foreach ( $this ->task as $v ){
if ( $v [ 'hash' ]== $index ){
$_return [] = $v ;
}
}
return $_return ;
}
public function CreateProcess( $index =null){
if ( is_null ( $index )){ //如果没有指定了索引,新建的子进程,开启计数
$index = $this ->swoole_table->get( 'index' );
if ( $index === false){
$index = 0;
} else {
$index = $index [ 'index' ]+1;
}
print_r( $index );
}
$this ->swoole_table->set( 'index' , array ( 'index' => $index ));
$process = new swoole_process( function (swoole_process $worker ) use ( $index ){
swoole_set_process_name(sprintf( 'php-ps:%s' , $index ));
$task = $this ->getTask( $index );
foreach ( $task as $v ){
call_user_func_array( array ( $this , $v [ 'handle' ]), array ( $index , $v ));
}
sleep(20);
}, false, false);
$pid = $process ->start();
$this ->works[ $index ]= $pid ;
return $pid ;
}
public function rebootProcess( $ret ){
$pid = $ret [ 'pid' ];
$index = array_search ( $pid , $this ->works);
if ( $index !==false){
$index = intval ( $index );
$new_pid = $this ->CreateProcess( $index );
echo "rebootProcess: {$index}={$new_pid} Done\n" ;
return ;
}
throw new \Exception( 'rebootProcess Error: no pid' );
}
public function processWait(){
while (1) {
if ( count ( $this ->works)){
$ret = swoole_process::wait();
if ( $ret ) {
$this ->rebootProcess( $ret );
}
} else {
break ;
}
}
}
}
$process = new Process();
|
这里代码中,使用了swoole_table作为进程间共享的内存,为了分配index。以及当进程退出后,父进程通过wait重新拉起该进程任务。
测试截图
进程ps
结果 休眠20s后退出后会被自动拉起
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.cnblogs.com/gavinjunftd/p/7434846.html