-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPartitioner.js
More file actions
114 lines (101 loc) · 4.89 KB
/
Partitioner.js
File metadata and controls
114 lines (101 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"use strict"
const cluster = require('cluster');
const jobService = require("./Services/JobService");
const PartitionService = require("./Services/PartitionService");
const jobs = require("./Application/Jobs");
const Lock = require("./Application/ExecuteLocked");
const lock = new Lock();
const Worker = require("./Application/Worker");
const validator = require("validator");
const utils = require("./Application/Utils");
const variables = require("./Application/CommonVariables");
// Worker wrappers created by the Partitioner constructor, one per forked cluster process.
let _workers = [];
// Counter advanced in enqueueJob whenever a partition has to be assigned to a worker;
// it is taken modulo _numberOfWorkers to choose the index into _workers.
let _workerPartitionIndex = 0;
// Number of worker processes to fork; assigned by the Partitioner constructor from the configuration.
let _numberOfWorkers;
// Logger instance; assigned asynchronously once Logger.new resolves in the Partitioner constructor.
let _logger;
/**
 * Fallback values used by the Partitioner constructor for every option the
 * caller leaves unset (or when no configuration object is passed at all).
 */
const defaultConfiguration = {
    // How many worker processes the constructor forks.
    numberOfWorkers: 1,

    // Handed to PartitionService; presumably the idle time (in minutes) after
    // which a partition is cleaned up - confirm against PartitionService.
    cleanIdlePartitionsAfterMinutes: 15,

    // One of 'debug', 'info', 'warn' or 'error' (the values validate accepts).
    loggerLevel: 'error',

    // Flags and path forwarded to Logger.new and to the workers' environment.
    consoleLogger: true,
    fileLogger: true,
    fileLoggerPath: './logger',
};
/**
 * Validates a caller-supplied partitioner configuration.
 *
 * Every option is optional: an option is only checked when utils.isNull
 * reports it as set (presumably "neither null nor undefined" - confirm in
 * Application/Utils).
 *
 * validator.js operates on strings only and, in current releases, throws a
 * TypeError for any other input. Numeric and boolean options are therefore
 * converted with String() before being handed to validator, so that e.g.
 * `numberOfWorkers: 1` or `consoleLogger: true` are validated instead of
 * crashing with an unrelated TypeError.
 *
 * @param {Object} configuration - options object passed to the Partitioner constructor.
 * @throws {Error} with a descriptive message for the first invalid option found.
 */
const validate = configuration => {
    const isSet = value => !utils.isNull(value);

    // True when the (stringified) value is an integer >= 1.
    const isPositiveInt = value => validator.isInt(String(value), { min: 1 });

    // True when the (stringified) value equals one of the allowed strings.
    const isOneOf = (value, allowed) => {
        const text = String(value);
        return allowed.some(candidate => validator.equals(text, candidate));
    };

    if (isSet(configuration.numberOfWorkers) && !isPositiveInt(configuration.numberOfWorkers)) {
        throw new Error("numberOfWorkers should be an integer >= 1");
    }

    if (isSet(configuration.cleanIdlePartitionsAfterMinutes) && !isPositiveInt(configuration.cleanIdlePartitionsAfterMinutes)) {
        throw new Error("cleanIdlePartitionsAfterMinutes should be an integer >= 1");
    }

    if (isSet(configuration.loggerLevel) && !isOneOf(configuration.loggerLevel, ['debug', 'info', 'warn', 'error'])) {
        throw new Error("loggerLevel should be debug, info, warn or error");
    }

    if (isSet(configuration.consoleLogger) && !isOneOf(configuration.consoleLogger, ['true', 'false'])) {
        throw new Error("consoleLogger should be true or false");
    }

    if (isSet(configuration.fileLogger) && !isOneOf(configuration.fileLogger, ['true', 'false'])) {
        throw new Error("fileLogger should be true or false");
    }

    // The path must have the same type as the default path (a string).
    if (isSet(configuration.fileLoggerPath) && typeof configuration.fileLoggerPath !== typeof defaultConfiguration.fileLoggerPath) {
        throw new Error("fileLoggerPath should be a string");
    }
};
/**
 * Master-side job router: forks worker processes through the cluster module
 * and sends each job to the worker that owns the job's partition.
 *
 * Worker list, worker count, round-robin counter and logger live in
 * module-level variables, so they are shared by every instance created in
 * the same process.
 */
class Partitioner {
    /**
     * Validates the configuration, creates the partition service and starts
     * the asynchronous start-up (logger creation followed by worker forking).
     *
     * `this.ready` is a promise that resolves once the logger exists and all
     * workers have been forked; enqueueJob waits on it.
     *
     * @param {Object} [configuration] - optional overrides for defaultConfiguration.
     * @throws {Error} when called from a cluster worker, or when validate rejects the configuration.
     */
    constructor(configuration) {
        if (cluster.isWorker) {
            throw new Error("a worker is trying to instantiate a partitioner");
        }
        if (configuration) {
            validate(configuration);
        }

        const config = configuration ? configuration : defaultConfiguration;
        _numberOfWorkers = utils.coalesce(config.numberOfWorkers, defaultConfiguration.numberOfWorkers);
        this.partitionService = new PartitionService(
            utils.coalesce(config.cleanIdlePartitionsAfterMinutes, defaultConfiguration.cleanIdlePartitionsAfterMinutes)
        );

        const Logger = require("./Application/Logger");
        const consoleLogger = utils.coalesce(config.consoleLogger, defaultConfiguration.consoleLogger);
        const fileLogger = utils.coalesce(config.fileLogger, defaultConfiguration.fileLogger);
        const fileLoggerPath = utils.coalesce(config.fileLoggerPath, defaultConfiguration.fileLoggerPath);
        const loggerLevel = utils.coalesce(config.loggerLevel, defaultConfiguration.loggerLevel);

        // Workers are forked only after the logger is available. The promise is
        // kept so enqueueJob can wait for start-up instead of dereferencing an
        // empty worker list or an undefined logger.
        this.ready = Logger.new(consoleLogger, loggerLevel, fileLogger, fileLoggerPath).then(log => {
            _logger = log;

            // Logger settings are handed to each forked worker through its environment.
            const processEnv = {};
            processEnv[variables.loggerLevel] = loggerLevel;
            processEnv[variables.consoleLogger] = consoleLogger;
            processEnv[variables.fileLogger] = fileLogger;
            processEnv[variables.fileLoggerPath] = fileLoggerPath;

            for (let i = 0; i < _numberOfWorkers; i++) {
                _workers.push(new Worker(cluster.fork(processEnv)));
            }
        });
    }

    /**
     * Routes a job to the worker that owns its partition, assigning the
     * partition to the next worker in round-robin order when it has no owner yet.
     *
     * @param {Object} job - must provide id, partitionId and type.
     * @param {Function} callback - registered with jobService under job.id.
     * @returns {Promise} settles after the job has been sent to its worker;
     *   rejects if start-up, partition lookup, job registration or sending fails.
     * @throws {Error} synchronously when job is null/undefined or lacks a required field.
     */
    enqueueJob(job, callback) {
        // Guard against null/undefined first: reading job.id on a missing job
        // would otherwise raise a TypeError before the validity check runs.
        if (job === null || job === undefined || !utils.areNotNull(job, job.id, job.partitionId, job.type)) {
            throw new Error("Job null or invalid, should contain id, partitionId, type, data: {}");
        }

        const resolvePartition = () => this.partitionService.get(job.partitionId).then(partition => {
            if (!utils.isNull(partition)) {
                return partition;
            }
            const index = ++_workerPartitionIndex % _numberOfWorkers;
            return this.partitionService.push(job.partitionId, _workers[index].worker);
        });

        return this.ready
            // Lookup and assignment of the partition run under the write lock.
            .then(() => lock.execWrite(resolvePartition))
            .then(partition => jobService.push(job.id, callback).then(() => {
                _logger.debug("jobId: %d, partitionId: %d, type: %s, pushed", job.id, job.partitionId, job.type);
                partition.worker.send(job);
            }));
    }
}
/**
 * Registers a job handler by forwarding both arguments to jobs.registerJob.
 *
 * @param {*} title - passed through as the first argument.
 * @param {*} func - passed through as the second argument.
 */
function registerJob(title, func) {
    jobs.registerJob(title, func);
}

// Public surface of the module: the Partitioner class and the job registration helper.
module.exports = { Partitioner, registerJob };