This repository was archived by the owner on Jan 21, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathmultiProcessManager.php
More file actions
166 lines (154 loc) · 6.56 KB
/
multiProcessManager.php
File metadata and controls
166 lines (154 loc) · 6.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
<?php
/**
 * Class multiProcessManager
 *
 * Forks one child process per worker callable (pcntl_fork) and lets the process
 * that forked them wait for, monitor and terminate those children.
 *
 * While the parent is waiting it also watches the modification time of the script
 * that was started from the command line. When that script changes and still
 * passes `php -l`, the children are killed and the script is run again.
 *
 * Uses functions from the pcntl and posix extensions.
 */
class multiProcessManager {
    /** @var callable[] Worker callables, re-indexed from 0; one child is forked per entry */
    private $workers = [];
    /** @var string|null Prefix used in the process title of every child */
    private $thread_name = null;
    /** @var int[] PIDs of children forked by this process that have not been reaped yet */
    private $child_pids = [];
    /** @var bool True only in a process that successfully forked at least one child */
    private $parent_thread = false;
    /** @var int|null Last modification time seen for the monitored script */
    private $script_last_modified_time = null;

    /**
     * Stores the configuration and immediately forks the workers.
     *
     * @param string     $thread_name Name used as prefix of the child process titles
     * @param callable[] $workers     One callable per child process to spawn
     * @param bool       $parent_thread_can_continue_execution - Should we allow execution of this script to continue?
     *   true  = This script will continue running whilst the workers are still being processed
     *   false = This script will halt execution at this point until the workers have completed and then
     *           execution will continue as normal
     *
     * Regardless of the value specified here, the destructor waits for the child processes to complete.
     */
    public function __construct(string $thread_name, array $workers, bool $parent_thread_can_continue_execution = true) {
        $this->setThreadName($thread_name);
        // init() addresses the workers by position (0..n-1), so drop any string or sparse keys.
        $this->workers = array_values($workers);
        // Start our workers running
        $this->init();
        if (!$parent_thread_can_continue_execution) {
            $this->doWaitForWorkersToComplete();
        }
    }

    /**
     * Sets the name used in child process titles; the first character is upper-cased.
     *
     * @param string $thread_name
     */
    public function setThreadName(string $thread_name) {
        $this->thread_name = ucfirst($thread_name);
    }

    /**
     * Forks one child per worker and runs the matching callable inside that child.
     *
     * Only PIDs of successfully forked children are recorded. The fork results -1 (failure)
     * and 0 (we are the child) must never reach pcntl_waitpid()/posix_kill(), where they
     * address "any child" / "every process" / "the whole process group" instead of one process.
     */
    private function init() {
        $workers_required = count($this->workers);
        for ($i = 0; $i < $workers_required; $i++) {
            $pid = pcntl_fork();
            if ($pid === -1) {
                // Failed to fork; carry on and try the remaining workers
                trigger_error('Failed to spawn worker process');
                continue;
            }
            if ($pid > 0) {
                // We are the parent
                $this->parent_thread = true;
                $this->child_pids[] = $pid;
                continue;
            }
            // We are the child. Forget the state inherited from the parent so this process
            // never waits on, or kills, its sibling workers.
            $this->parent_thread = false;
            $this->child_pids = [];
            $worker_number = ($i + 1);
            cli_set_process_title($this->thread_name . ': Child Process #' . $worker_number);
            call_user_func($this->workers[$i]);
            // NOTE(review): once the worker returns, the child leaves the loop and carries on
            // executing the calling script after the constructor. Confirm whether the child
            // should exit here instead.
            break;
        }
    }

    /**
     * Blocks the parent until every recorded child has exited, polling once per second.
     * Does nothing in a process that did not fork any children.
     */
    private function doWaitForWorkersToComplete() {
        if (!$this->parent_thread) {
            return;
        }
        $status = 0;
        while (count($this->child_pids) > 0) {
            foreach ($this->child_pids as $key => $pid) {
                $res = pcntl_waitpid($pid, $status, WNOHANG);
                // > 0: the child has exited and was reaped; -1: it is not (or no longer) our child
                if ($res === -1 || $res > 0) {
                    unset($this->child_pids[$key]);
                }
            }
            // Check for changes in our PHP scripts, if any are found, then we'll restart the scripts processing.
            // This helps to prevent leaving workers running for days with out of date code in place.
            $this->checkForScriptChange();
            if (count($this->child_pids) > 0) {
                sleep(1);
            }
        }
    }

    /**
     * Restarts the script when the file started from the command line has been modified.
     *
     * The first call only records the current modification time. On a later change the script
     * is linted with `php -l`; when the lint succeeds the children are killed, the script is
     * executed again and this process ends with the last output line of that run. When the
     * lint fails an error is triggered and the workers keep running.
     *
     * TODO:
     * As part of future improvement, this should monitor ALL PHP scripts within the root directory and recurse down
     * At the time of writing `inotify` doesn't compile successfully for PHP7 so this needs implementing later.
     *
     * Glob() could be an option if you need to implement support sooner.
     */
    protected function checkForScriptChange() {
        if (!$this->setPaths()) {
            // Script location unknown (setPaths() already reported it); nothing to monitor
            return;
        }
        // At the moment this just covers the PHP file called from command line, in the future, this should cover all
        // files within the __CWD__ directory recursively
        $script_to_monitor = __CWD__ . DIRECTORY_SEPARATOR . __SCRIPT__;
        clearstatcache(); // PHP caches the result of filemtime(), this flushes its cache
        $last_modified = filemtime($script_to_monitor);
        if ($last_modified === false) {
            // The file could not be stat'ed; filemtime() has already raised a warning
            return;
        }
        if ($this->script_last_modified_time === null) {
            // First look at the file: just remember the baseline
            $this->script_last_modified_time = $last_modified;
            return;
        }
        if ($last_modified <= $this->script_last_modified_time) {
            return;
        }
        $this->script_last_modified_time = $last_modified;
        // Quote the path so spaces or shell metacharacters in it cannot break the command
        $quoted_script = escapeshellarg($script_to_monitor);
        $lint_output = [];
        $lint_exit_code = 1;
        exec('php -l ' . $quoted_script, $lint_output, $lint_exit_code);
        // `php -l` exits with 0 only when no syntax errors were found
        if ($lint_exit_code === 0) {
            $this->terminateChildThreads();
            die(exec('php ' . $quoted_script));
        }
        // Syntax check failed, don't restart the thread
        trigger_error('Syntax check failed on: ' . $script_to_monitor);
    }

    /**
     * Defines __SCRIPT__ (file name) and __CWD__ (directory) from the script path found in $_SERVER.
     *
     * @return bool false when no script path could be determined, true otherwise
     */
    private function setPaths(): bool {
        if (defined('__SCRIPT__') && defined('__CWD__')) {
            return true;
        }
        $script_path = $_SERVER['PATH_TRANSLATED'] ?? $_SERVER['PHP_SELF'] ?? $_SERVER['SCRIPT_NAME'] ?? null;
        if ($script_path === null) {
            trigger_error('Failed to find the current script path. Please ensure you\'re running PHP from the command line.');
            return false;
        }
        if (strpos($script_path, DIRECTORY_SEPARATOR) !== false) {
            // Split "dir/sub/script.php" into the directory part and the file name
            $script_path_parts = explode(DIRECTORY_SEPARATOR, $script_path);
            $script = array_pop($script_path_parts);
            $cwd = rtrim(implode(DIRECTORY_SEPARATOR, $script_path_parts), DIRECTORY_SEPARATOR);
        } else {
            // Bare file name: it was started from the current working directory
            $script = $script_path;
            $cwd = getcwd();
        }
        // Define each constant only when missing so an existing one does not raise a warning
        if (!defined('__SCRIPT__')) {
            define('__SCRIPT__', $script);
        }
        if (!defined('__CWD__')) {
            define('__CWD__', $cwd);
        }
        return true;
    }

    /**
     * Sends SIGKILL to every recorded child and reaps the ones that were signalled.
     * Does nothing in a process that did not fork any children.
     */
    public function terminateChildThreads() {
        if (!$this->parent_thread || empty($this->child_pids)) {
            return;
        }
        $status = 0;
        foreach ($this->child_pids as $key => $pid) {
            if (posix_kill($pid, SIGKILL)) {
                // Collect the exit status so the killed child does not linger as a zombie
                pcntl_waitpid($pid, $status);
                unset($this->child_pids[$key]);
            }
        }
    }

    /**
     * Waits for the remaining children to finish, then kills any that are still recorded.
     */
    public function __destruct() {
        $this->doWaitForWorkersToComplete();
        $this->terminateChildThreads();
    }
}