-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathParalleProcessor.php
More file actions
114 lines (98 loc) · 2.96 KB
/
ParalleProcessor.php
File metadata and controls
114 lines (98 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
<?php
namespace App\Services;
use Symfony\Component\Process\Process;
/**
 * Runs a list of shell commands with a bounded number of them executing at the same time.
 *
 * Each task is an array carrying at least a 'command' entry (the shell command line).
 * The processor adds a 'state' entry to every task and moves it through
 * WAITING -> PROCESSING -> COMPLETED as the matching process is started and
 * later observed to have stopped.
 *
 * Note: the class name keeps its original spelling so existing callers keep working.
 */
class ParalleProcessor
{
    const WAITING = 1;
    const PROCESSING = 2;
    const COMPLETED = 3;

    /** @var array[] Tasks in submission order, each with 'command' and 'state' entries. */
    private $tasks = [];

    /** @var Process[] Currently tracked processes, keyed by the index of their task in $tasks. */
    private $processing = [];

    /** @var int Maximum number of processes kept running at once (always >= 1). */
    private $parallelCount = 3;

    /** @var int Seconds to wait between two polls of the running processes. */
    private $estimateProcessTime = 1;

    /**
     * @param iterable $tasks   Tasks to run; every task must expose a 'command' entry.
     * @param array    $options Optional overrides: 'parallelCount' and 'estimateProcessTime'.
     *
     * @throws \InvalidArgumentException When a task has no 'command' entry.
     */
    public function __construct($tasks, $options = [])
    {
        $this->setup($tasks, $options);
    }

    /**
     * Registers the tasks as WAITING and applies the supported options.
     *
     * @param iterable $tasks
     * @param array    $options
     *
     * @throws \InvalidArgumentException When a task has no 'command' entry.
     */
    private function setup($tasks, $options): void
    {
        foreach ($tasks as $task) {
            // A task without a command could never be started, which would leave
            // the polling loop waiting for it forever; reject it up front.
            if (!isset($task['command'])) {
                throw new \InvalidArgumentException('Each task must define a "command" entry.');
            }

            $task['state'] = self::WAITING;
            $this->tasks[] = $task;
        }

        foreach ($options as $key => $value) {
            // Only the two tunables may be overridden. Assigning arbitrary keys would
            // let a caller clobber internal state such as the task list.
            // Strict comparison: a loose one would match integer keys on PHP 7.
            if ($key === 'parallelCount') {
                // At least one slot is required, otherwise nothing would ever start.
                $this->parallelCount = max(1, (int) $value);
            } elseif ($key === 'estimateProcessTime') {
                // sleep() does not accept negative durations.
                $this->estimateProcessTime = max(0, (int) $value);
            }
        }
    }

    /**
     * Starts the processing and blocks until no tracked process is running anymore.
     */
    public function start()
    {
        $this->process();
        $this->checkAvailability();
    }

    /**
     * Starts waiting tasks until either every slot is used or no waiting task is left.
     */
    private function process(): void
    {
        while (count($this->processing) < $this->parallelCount) {
            $index = $this->getNextTask();
            if ($index === null) {
                return;
            }

            $process = Process::fromShellCommandline($this->tasks[$index]['command']);
            $process->setTimeout(0);
            $process->disableOutput();
            $process->start();

            // Key by task index so the finished process can be matched to its task.
            $this->processing[$index] = $process;
        }
    }

    /**
     * Polls the tracked processes; every process that stopped frees its slot,
     * marks its task as completed and lets the next waiting task start.
     */
    private function checkAvailability(): void
    {
        while (count($this->processing) > 0) {
            // foreach iterates over a snapshot, so processes started inside the
            // loop body are first inspected on the next pass.
            foreach ($this->processing as $index => $runningProcess) {
                if ($runningProcess->isRunning()) {
                    continue;
                }

                unset($this->processing[$index]);
                $this->taskFinish($index);
                $this->process();
            }

            // Only wait when something is still running; avoids a useless final pause.
            if (count($this->processing) > 0) {
                sleep($this->estimateProcessTime);
            }
        }
    }

    /**
     * Picks the first task still WAITING, marks it as PROCESSING and returns its index.
     *
     * The index (rather than the command) is returned so that a command such as
     * '0' or '' is not mistaken for "no task left".
     *
     * @return int|null Index of the task in $tasks, or null when no task is waiting.
     */
    private function getNextTask(): ?int
    {
        foreach ($this->tasks as $i => $task) {
            if ($task['state'] === self::WAITING) {
                $this->tasks[$i]['state'] = self::PROCESSING;

                return $i;
            }
        }

        return null;
    }

    /**
     * Marks the task as completed.
     *
     * @param int $key Index of the task in $tasks.
     */
    private function taskFinish($key): void
    {
        $this->tasks[$key]['state'] = self::COMPLETED;
    }

    /**
     * Tells whether every task has been executed.
     *
     * @return bool True when all tasks are COMPLETED (also when there are no tasks),
     *              false when at least one task is still waiting or processing.
     */
    public function isFinished()
    {
        foreach ($this->tasks as $task) {
            if ($task['state'] !== self::COMPLETED) {
                return false;
            }
        }

        return true;
    }
}