-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathplugin.php
164 lines (134 loc) · 4.78 KB
/
plugin.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
<?php
require_once 'php-resque/lib/Resque.php';
require_once 'php-resque/lib/Resque/Worker.php';
// Register the scaler's static hooks with the Resque event dispatcher:
// 'afterEnqueue' events are routed to Resque_Scaler::afterEnqueue() and
// 'beforeFork' events are routed to Resque_Scaler::beforeFork().
Resque_Event::listen('afterEnqueue', array('Resque_Scaler', 'afterEnqueue'));
Resque_Event::listen('beforeFork', array('Resque_Scaler', 'beforeFork'));
/**
 * Simple auto-scaler for Resque workers.
 *
 * The class is wired to two Resque events: after a job is enqueued it may
 * fork an additional worker on this host, and before a worker forks to run a
 * job it may ask a worker on this host to quit. How many workers a queue
 * should have is derived from the number of pending jobs via $SCALE_SETTING.
 */
class Resque_Scaler
{
    /**
     * Scaling table: "more than <key> pending jobs" => "<value> workers wanted".
     * Below the smallest threshold a single worker is wanted.
     *
     * @var array
     */
    public static $SCALE_SETTING = array(
        15 => 2,
        25 => 3,
        40 => 4,
        60 => 5
    );

    /**
     * Listener for the 'afterEnqueue' event: starts one more worker when the
     * queue of the enqueued job class has fewer workers than it needs.
     *
     * The queue name is read from the job class's "queue" property. When the
     * class cannot be inspected or has no such property the queue is NULL,
     * which makes the worker lookup consider every registered worker.
     *
     * @param string $class     Job class name that was enqueued.
     * @param mixed  $arguments Job arguments (unused here).
     * @return void
     */
    public static function afterEnqueue($class, $arguments)
    {
        fwrite(STDOUT, "Job was queued for " . $class . ".\n");

        // Guard the lookup: an unknown class or a class without a "queue"
        // property must not raise an undefined-index error inside the hook.
        $queue = NULL;
        if (is_string($class) && class_exists($class)) {
            $class_vars = get_class_vars($class);
            if (is_array($class_vars) && isset($class_vars["queue"])) {
                $queue = $class_vars["queue"];
            }
        }

        if (self::check_need_worker($queue)) {
            fwrite(STDOUT, "we need more workers\n");
            self::add_worker();
        } else {
            fwrite(STDOUT, "workers is enough.\n");
        }
    }

    /**
     * Listener for the 'beforeFork' event: when the job's queue has more
     * workers than it needs, sends SIGQUIT to one worker on this host that
     * serves that queue.
     *
     * @param object $job Job about to be performed; its "queue" property is read
     *                    and it is converted to a string for logging.
     * @return void
     */
    public static function beforeFork($job)
    {
        fwrite(STDOUT, "Just about to performe " . $job . "\n");

        if (self::check_kill_worker($job->queue)) {
            fwrite(STDOUT, "too many workers...kill this one.\n");

            // NOTE: calling shutdown() on the job's worker was tried and did not
            // work, so SIGQUIT is sent through the shell "kill" command instead.
            // Only workers serving this job's queue are candidates: the surplus
            // was computed for that queue, so stopping an unrelated worker would
            // not reduce it.
            $server_workers = self::server_workers(self::get_all_workers($job->queue));
            $hostname = self::get_hostname();
            $current_workers = isset($server_workers[$hostname]) ? $server_workers[$hostname] : array();

            if (sizeof($current_workers) > 0) {
                // The pid comes from a Redis key; force it to a positive integer
                // so nothing else can reach the shell and "kill -3 0" (whole
                // process group) can never be issued.
                $pid = (int) $current_workers[0]["pid"];
                if ($pid > 0) {
                    `kill -3 {$pid}`;
                }
            }
        } else {
            fwrite(STDOUT, "we still need this worker.\n");
        }
    }

    // -----------------

    /**
     * Calculates how many workers a queue should have right now.
     *
     * The highest threshold in $SCALE_SETTING that the pending job count
     * exceeds decides the result, regardless of the order in which the
     * thresholds are declared.
     *
     * @param string|null $queue Queue name passed to Resque::size().
     * @return int Wanted worker count; 1 when no threshold is exceeded.
     */
    public static function cal_need_worker($queue)
    {
        $need_worker = 1;
        $matched_threshold = NULL;
        $pending_job_count = Resque::size($queue);

        foreach (self::$SCALE_SETTING as $job_count => $worker_count) {
            $exceeded = $pending_job_count > $job_count;
            $is_higher = ($matched_threshold === NULL) || ($job_count > $matched_threshold);
            if ($exceeded && $is_higher) {
                $matched_threshold = $job_count;
                $need_worker = $worker_count;
            }
        }

        return $need_worker;
    }

    /**
     * Tells whether the queue currently has more workers than it needs.
     *
     * @param string|null $queue Queue name.
     * @return bool TRUE when at least one worker is surplus.
     */
    public static function check_kill_worker($queue)
    {
        $need_worker = self::cal_need_worker($queue);
        $current_worker = sizeof(self::get_all_workers($queue));

        return $current_worker > $need_worker;
    }

    /**
     * Tells whether the queue currently has fewer workers than it needs.
     *
     * @param string|null $queue Queue name.
     * @return bool TRUE when at least one more worker is wanted.
     */
    public static function check_need_worker($queue)
    {
        $need_worker = self::cal_need_worker($queue);
        $current_worker = sizeof(self::get_all_workers($queue));

        return $need_worker > $current_worker;
    }

    /**
     * Lists registered workers, optionally only those serving one queue.
     *
     * Worker info is read directly from the Redis "workers" set, which is bad
     * practice.
     * TODO: refactor with a Resque_Scaler_Worker extends Resque_Worker
     *
     * Each member is parsed as "hostname:pid:queue1,queue2,...". Members that
     * do not have all three parts are skipped.
     *
     * @param string|null $queue Queue name to filter by; a falsy value returns
     *                           every worker. A worker listening on "*" matches
     *                           any queue.
     * @return array List of arrays with keys 'hostname', 'queues' (array),
     *               'pid' (string as stored) and 'workerId' (raw member).
     */
    public static function get_all_workers($queue=NULL)
    {
        $ret = array();

        $workers = Resque::redis()->smembers('workers');
        if (!is_array($workers)) {
            $workers = array();
        }

        foreach ($workers as $workerId) {
            $worker_data = explode(':', $workerId, 3);
            if (count($worker_data) < 3) {
                // Malformed entry: no pid and/or queue list to work with.
                continue;
            }

            $worker = array();
            $worker['hostname'] = $worker_data[0];
            $worker['queues'] = explode(',', $worker_data[2]);
            $worker['pid'] = $worker_data[1];
            $worker['workerId'] = $workerId;

            // Strict comparison: queue names are strings and must not be matched
            // through numeric-string juggling (e.g. "1e1" vs "10").
            $serves_queue = !$queue
                || in_array((string) $queue, $worker['queues'], TRUE)
                || in_array("*", $worker['queues'], TRUE);

            if ($serves_queue) {
                $ret[] = $worker;
            }
        }

        return $ret;
    }

    /**
     * Points Resque at a Redis backend.
     *
     * @param string $server "host:port" of the Redis server; defaults to the
     *                       previously hard-coded local instance.
     * @return void
     */
    public static function set_backend($server = "localhost:6379")
    {
        Resque::setBackend($server);
    }

    /**
     * Groups worker records by their 'hostname' key.
     *
     * @param array $workers Worker records as returned by get_all_workers().
     * @return array Map of hostname => list of worker records, in input order.
     */
    public static function server_workers($workers=array())
    {
        $ret = array();
        foreach ($workers as $worker) {
            $ret[$worker['hostname']][] = $worker;
        }

        return $ret;
    }

    /**
     * Returns this machine's host name.
     *
     * @return string Result of gethostname() when that function exists,
     *                otherwise php_uname('n').
     */
    public static function get_hostname()
    {
        if (function_exists('gethostname')) {
            return gethostname();
        }

        return php_uname('n');
    }

    /**
     * Forks one additional worker on this host.
     *
     * The new worker listens on the same queues as the first worker already
     * registered for this host; without such a worker there is no template to
     * copy and nothing is started.
     *
     * @return bool TRUE in the parent when a worker was forked, FALSE when no
     *              worker is registered for this host. The forked child never
     *              returns: it exits when its work loop ends.
     */
    public static function add_worker()
    {
        $server_workers = self::server_workers(self::get_all_workers());
        $hostname = self::get_hostname();

        if (!isset($server_workers[$hostname]) || sizeof($server_workers[$hostname]) === 0) {
            return FALSE;
        }
        $current_workers = $server_workers[$hostname];

        $pid = pcntl_fork();
        if ($pid == -1) {
            die("Could not fork worker\n");
        }

        // Child, start the worker
        if (!$pid) {
            // if there are more than 1 types of workers on this machine, we don't
            // know which kind to create. just create the first one.
            $worker = new Resque_Worker($current_workers[0]['queues']);
            // TODO: set logLevel
            fwrite(STDOUT, '*** Starting worker '.$worker."\n");
            // TODO: set interval
            $worker->work();

            // The child is a copy of the enqueuing process; once the work loop
            // ends it must terminate instead of resuming the caller's code.
            exit(0);
        }

        return TRUE;
    }
}