From 2ba2c43c0835a45fe9f0f299b12630c5dbe9ba6a Mon Sep 17 00:00:00 2001 From: Andrei Gavrilescu Date: Mon, 18 May 2020 16:12:27 +0300 Subject: [PATCH 1/8] Added Amplitude/WorkerPool/Logging --- .prettierrc | 5 + WorkerPool.js | 131 +++++++++++++++++ app.js | 253 +++++++++++++++++++++----------- cert.pem | 20 +++ config/default.yaml | 2 +- config/production.yaml | 2 +- extract.js | 322 ++++++++++++++++++++++++----------------- key.pem | 27 ++++ logging.js | 109 ++++++++++++-- package-lock.json | 120 +++++++++++++++ package.json | 5 +- store/s3.js | 4 +- utils.js | 25 +++- 13 files changed, 796 insertions(+), 229 deletions(-) create mode 100644 .prettierrc create mode 100644 WorkerPool.js create mode 100644 cert.pem create mode 100644 key.pem diff --git a/.prettierrc b/.prettierrc new file mode 100644 index 00000000..a85dc7d2 --- /dev/null +++ b/.prettierrc @@ -0,0 +1,5 @@ +{ + "singleQuote": true, + "printWidth": 140, + "tabWidth": 4 + } \ No newline at end of file diff --git a/WorkerPool.js b/WorkerPool.js new file mode 100644 index 00000000..07fd0087 --- /dev/null +++ b/WorkerPool.js @@ -0,0 +1,131 @@ +const EventEmitter = require('events'); +const { Worker } = require('worker_threads'); +const uuid = require('uuid'); + +const logger = require('./logging'); + +const WorkerStatus = Object.freeze({ + IDLE: 'IDLE', + STOPPED: 'STOPPED', + RUNNING: 'RUNNING', +}); + +class WorkerPool extends EventEmitter { + constructor(workerScriptPath, poolSize) { + super(); + + this.taskQueue = []; + this.workerPool = []; + this.workerScriptPath = workerScriptPath; + this.poolSize = poolSize; + + for (let i = 0; i < poolSize; ++i) { + this._addWorkerToPool(); + } + } + + _addWorkerToPool() { + const workerMeta = this._createWorker(uuid.v4()); + this.workerPool.push(workerMeta); + this._workerPoolIntrospect(); + + return workerMeta; + } + + _createWorker(workerID) { + const workerInstance = new Worker(this.workerScriptPath, { workerData: { workerID } }); + const workerMeta = { 
workerID, worker: workerInstance, status: WorkerStatus.IDLE }; + + logger.info('Created worker %s', workerMeta); + + workerInstance.on('message', (message) => { + // logger.info(`Worker message: ${JSON.stringify(message)}`); + this.emit( message.type, message.body); + this._processNextTask(workerMeta); + }); + + workerInstance.on('error', (error) => { + logger.error('Worker <%o> with error %o: ', workerMeta, error); + workerMeta.status = WorkerStatus.STOPPED; + + // this.emit('error', error); + // Remove current worker from pool as it's no longer usable. + this._removeWorkerFromPool(workerMeta); + + // Bring the worker pool back to maximum capacity + // TODO Regenerate should not add more workers if we are shutting down. + this._regenerateWorkerToPool(); + }); + + workerInstance.on('exit', (exitCode) => { + logger.info('Worker %s exited with code %d.', workerMeta, exitCode); + workerMeta.status = WorkerStatus.STOPPED; + + // Remove current worker from pool as it's no longer usable. + this._removeWorkerFromPool(workerMeta); + + // Bring the worker pool back to maximum capacity + this._regenerateWorkerToPool(); + }); + + return workerMeta; + } + + _workerPoolIntrospect() { + const workerPoolInfo = this.workerPool.map((workerMeta) => { + return { uuid: workerMeta.workerID, status: workerMeta.status }; + }); + + logger.info('Worker pool introspect: ', JSON.stringify(workerPoolInfo)); + } + + _removeWorkerFromPool(worker) { + logger.info('Removing worker from pool: ', JSON.stringify(worker)); + const workerIndex = this.workerPool.indexOf(worker); + if (workerIndex > -1) { + this.workerPool.splice(workerIndex, 1); + } + this._workerPoolIntrospect(); + } + + _processTask(workerMeta, task) { + workerMeta.worker.postMessage(task); + workerMeta.status = WorkerStatus.RUNNING; + } + + _processNextTask(workerMeta) { + if (this.taskQueue.length === 0) { + workerMeta.status = WorkerStatus.IDLE; + } else { + this._processTask(workerMeta, this.taskQueue.shift()); + } + } + + 
_regenerateWorkerToPool() { + if (this.workerPool.length < this.poolSize) { + const workerMeta = this._addWorkerToPool(); + this._processNextTask(workerMeta); + } else { + logger.warn('Can not add additional worker, pool is already at max capacity!'); + } + } + + _getIdleWorkers() { + return this.workerPool.filter((worker) => { + return worker.status === WorkerStatus.IDLE; + }); + } + + addTask(task) { + const idleWorkers = this._getIdleWorkers(); + + if (idleWorkers.length > 0) { + this._processTask(idleWorkers[0], task); + } else { + this.taskQueue.push(task); + logger.info(`There are no IDLE workers queueing, current queue size <${this.taskQueue.length}>`); + } + } +} + +module.exports = WorkerPool; diff --git a/app.js b/app.js index 160f527c..bea4b4de 100644 --- a/app.js +++ b/app.js @@ -1,19 +1,25 @@ 'use strict'; +//const child_process = require('child_process'); const fs = require('fs'); -const config = require('config'); -const uuid = require('uuid'); const os = require('os'); -const child_process = require('child_process'); const http = require('http'); - +const https = require('https'); +const path = require('path'); const WebSocketServer = require('ws').Server; +const config = require('config'); +const uuid = require('uuid'); + const logger = require('./logging'); const obfuscate = require('./obfuscator'); +const { name: appName, version: appVersion } = require('./package'); +const { getEnvName, RequestType, ResponseType } = require('./utils'); +const WorkerPool = require('./WorkerPool'); +// const Amplitude = require('amplitude'); // Configure database, fall back to redshift-firehose. 
let database; -if (config.gcp && (config.gcp.dataset && config.gcp.table)) { +if (config.gcp && config.gcp.dataset && config.gcp.table) { database = require('./database/bigquery.js')(config.gcp); } if (!database) { @@ -49,93 +55,160 @@ const errored = new prom.Counter({ help: 'number of files with errors during processing', }); -class ProcessQueue { - constructor() { - this.maxProc = os.cpus().length; - this.q = []; - this.numProc = 0; - } - enqueue(clientid) { - this.q.push(clientid); - if (this.numProc < this.maxProc) { - process.nextTick(this.process.bind(this)); - } else { - logger.info('process Q too long: %s', this.numProc); - } - } - process() { - const clientid = this.q.shift(); - if (!clientid) return; - const p = child_process.fork('extract.js', [clientid]); - p.on('exit', (code) => { - this.numProc--; - logger.info(`Done clientid: <${clientid}> proc: <${this.numProc}> code: <${code}>`); - if (code === 0) { - processed.inc(); - } else { - errored.inc(); - } - if (this.numProc < 0) { - this.numProc = 0; - } - if (this.numProc < this.maxProc) { - process.nextTick(this.process.bind(this)); - } - const path = tempPath + '/' + clientid; - store.put(clientid, path) - .then(() => { - fs.unlink(path, () => { }); - }) - .catch((err) => { - logger.error('Error storing: %s - %s', path, err); - fs.unlink(path, () => { }); - }) - }); - p.on('message', (msg) => { - logger.debug('Received message from child process'); - const { url, clientid, connid, clientFeatures, connectionFeatures, streamFeatures } = msg; - database.put(url, clientid, connid, clientFeatures, connectionFeatures, streamFeatures); - }); - p.on('error', () => { - this.numProc--; - logger.warn(`Failed to spawn, rescheduling clientid: <${clientid}> proc: <${this.numProc}>`); - this.q.push(clientid); // do not immediately retry +// class ProcessQueue { +// constructor() { +// this.maxProc = os.cpus().length; +// this.q = []; +// this.numProc = 0; +// } +// enqueue(clientid) { +// this.q.push(clientid); +// 
if (this.numProc < this.maxProc) { +// process.nextTick(this.process.bind(this)); +// } else { +// logger.info('process Q too long: %s', this.numProc); +// } +// } +// process() { +// const clientid = this.q.shift(); +// if (!clientid) return; +// // const p = child_process.fork('extract.js', [clientid], { +// // execArgv: process.execArgv.concat([ '--inspect-port=5800' ]), +// // }); +// const p = child_process.fork("extract.js", [clientid]); +// p.on('exit', (code) => { +// this.numProc--; +// logger.info(`Done clientid: <${clientid}> proc: <${this.numProc}> code: <${code}>`); +// if (code === 0) { +// processed.inc(); +// } else { +// errored.inc(); +// } +// if (this.numProc < 0) { +// this.numProc = 0; +// } +// if (this.numProc < this.maxProc) { +// process.nextTick(this.process.bind(this)); +// } +// const path = tempPath + '/' + clientid; +// store +// .put(clientid, path) +// .then(() => { +// fs.unlink(path, () => {}); +// }) +// .catch((err) => { +// logger.error('Error storing: %s - %s', path, err); +// fs.unlink(path, () => {}); +// }); +// }); +// p.on('message', (msg) => { +// logger.info('Received message from child process: ', msg); +// const { url, clientid, connid, clientFeatures, connectionFeatures, streamFeatures } = msg; + +// if (database) { +// database.put(url, clientid, connid, clientFeatures, connectionFeatures, streamFeatures); +// } else { +// logger.warn('No database configured!'); +// } +// }); +// p.on('error', () => { +// this.numProc--; +// logger.warn(`Failed to spawn, rescheduling clientid: <${clientid}> proc: <${this.numProc}>`); +// this.q.push(clientid); // do not immediately retry +// }); +// this.numProc++; +// if (this.numProc > 10) { +// logger.info('Process Q: %n', this.numProc); +// } +// } +// } + +function storeDump(clientId){ + const path = tempPath + '/' + clientId; + store + .put(clientId, path) + .then(() => { + fs.unlink(path, () => {}); + }) + .catch((err) => { + logger.error('Error storing: %s - %s', path, err); 
+ fs.unlink(path, () => {}); }); - this.numProc++; - if (this.numProc > 10) { - logger.info('Process Q: %n', this.numProc); - } - } } -const q = new ProcessQueue(); + +function getIdealWorkerCount() { + return os.cpus().length; +} + +const workerScriptPath = path.join(__dirname, './extract.js'); +const workerPool = new WorkerPool(workerScriptPath, getIdealWorkerCount()); + +workerPool.on(ResponseType.PROCESSING, (body) => { + logger.info('Handling PROCESSING event with body %o', body); + const { url, clientId, connid, clientFeatures, connectionFeatures, streamFeatures } = body; + + if (database) { + database.put(url, clientId, connid, clientFeatures, connectionFeatures, streamFeatures); + } else { + logger.warn('No database configured!'); + } +}); +workerPool.on(ResponseType.DONE, (body) => { + logger.info('Handling DONE event with body %o', body); + storeDump(body.clientId); +}); +workerPool.on(ResponseType.ERROR, (body) => { + // TODO handle requeue of the request, this also requires logic in extract.js + // i.e. we need to catch all potential errors and send back a request with + // the client id. + logger.error('Handling ERROR event with body %o', body); + + // If feature extraction failed at least attempt to store the dump in s3. + if (body.clientId) { + storeDump(body.clientId); + } else { + logger.error('Handling ERROR without a clientId field!'); + } + + // TODO At this point adding a retry mechanism can become detrimental, e.g. + // If there is a error with the dump file structure the error would just requeue ad infinitum, + // a smarter mechanism is required here, with some sort of maximum retry per request and so on. 
+ // if (body.clientId) { + // logger.info('Requeued clientId %s', body.clientId); + // workerPool.addTask({ type: RequestType.PROCESS, body: { clientId: body.clientId } }); + // } +}); function setupWorkDirectory() { try { if (fs.existsSync(tempPath)) { - fs.readdirSync(tempPath).forEach(fname => { + fs.readdirSync(tempPath).forEach((fname) => { try { - logger.debug(`Removing file ${tempPath + '/' + fname}`) + logger.debug(`Removing file ${tempPath + '/' + fname}`); fs.unlinkSync(tempPath + '/' + fname); } catch (e) { logger.error(`Error while unlinking file ${fname} - ${e.message}`); } }); } else { - logger.debug(`Creating working dir ${tempPath}`) + logger.debug(`Creating working dir ${tempPath}`); fs.mkdirSync(tempPath); } } catch (e) { logger.error(`Error while accessing working dir ${tempPath} - ${e.message}`); + // The app is probably in an inconsistent state at this point, throw and stop process. + throw e; } } function setupHttpServer(port, keys) { - const options = keys ? { - key: keys.serviceKey, - cert: keys.certificate, - } : {} + const options = { + key: fs.readFileSync('key.pem'), + cert: fs.readFileSync('cert.pem'), + }; - const server = http.Server(options, () => { }) + const server = https + .Server(options, () => {}) .on('request', (request, response) => { switch (request.url) { case '/healthcheck': @@ -152,7 +225,8 @@ function setupHttpServer(port, keys) { } function setupMetricsServer(port) { - const metricsServer = http.Server() + const metricsServer = http + .Server() .on('request', (request, response) => { switch (request.url) { case '/metrics': @@ -178,13 +252,14 @@ function setupWebSocketsServer(server) { // TODO: check against known/valid urls const ua = upgradeReq.headers['user-agent']; - const clientid = uuid.v4(); - let tempStream = fs.createWriteStream(tempPath + '/' + clientid); + const clientId = uuid.v4(); + let tempStream = fs.createWriteStream(tempPath + '/' + clientId); tempStream.on('finish', () => { if (numberOfEvents > 0) { - 
q.enqueue(clientid); + //q.enqueue(clientid); + workerPool.addTask({ type: RequestType.PROCESS, body: { clientId } }); } else { - fs.unlink(tempPath + '/' + clientid, () => { + fs.unlink(tempPath + '/' + clientId, () => { // we're good... }); } @@ -206,7 +281,7 @@ function setupWebSocketsServer(server) { if (config.server.skipLoadBalancerIp) { forwardedIPs.pop(); } - const obfuscatedIPs = forwardedIPs.map(ip => { + const obfuscatedIPs = forwardedIPs.map((ip) => { const publicIP = ['publicIP', null, ip.trim()]; obfuscate(publicIP); return publicIP[2]; @@ -221,9 +296,9 @@ function setupWebSocketsServer(server) { tempStream.write(JSON.stringify(['publicIP', null, [publicIP[2]], Date.now()]) + '\n'); } - logger.info('New app connected: ua: <%s>, referer: <%s>, clientid: <%s>', ua, referer, clientid); + logger.info('New app connected: ua: <%s>, referer: <%s>, clientid: <%s>', ua, referer, clientId); - client.on('message', msg => { + client.on('message', (msg) => { try { const data = JSON.parse(msg); @@ -243,18 +318,20 @@ function setupWebSocketsServer(server) { tempStream.write(JSON.stringify(data) + '\n'); break; case 'constraints': - if (data[2].constraintsOptional) { // workaround for RtcStats.java bug. + if (data[2].constraintsOptional) { + // workaround for RtcStats.java bug. data[2].optional = []; - Object.keys(data[2].constraintsOptional).forEach(key => { + Object.keys(data[2].constraintsOptional).forEach((key) => { const pair = {}; - pair[key] = data[2].constraintsOptional[key] + pair[key] = data[2].constraintsOptional[key]; }); delete data[2].constraintsOptional; } tempStream.write(JSON.stringify(data) + '\n'); break; default: - if (data[0] === 'getstats' && data[2].values) { // workaround for RtcStats.java bug. + if (data[0] === 'getstats' && data[2].values) { + // workaround for RtcStats.java bug. 
const { timestamp, values } = data[2]; data[2] = values; data[2].timestamp = timestamp; @@ -268,7 +345,7 @@ function setupWebSocketsServer(server) { } }); - client.on('error', e => { + client.on('error', (e) => { logger.error('Websocket error: %s', e); }); @@ -282,6 +359,12 @@ function setupWebSocketsServer(server) { } function run(keys) { + logger.info('Initializing <%s>, version <%s>, env <%s> ...', appName, appVersion, getEnvName()); + + // const amplitude = new Amplitude('43df878c9fd741a83e0c80bec3a5ddf4') + // data.event_properties = trackObject; + + // amplitude.track(data); setupWorkDirectory(); server = setupHttpServer(config.get('server').port, keys); @@ -291,6 +374,8 @@ function run(keys) { } setupWebSocketsServer(server); + + logger.info('Initialization complete.'); } function stop() { @@ -302,5 +387,5 @@ function stop() { run(); module.exports = { - stop: stop + stop: stop, }; diff --git a/cert.pem b/cert.pem new file mode 100644 index 00000000..ad001456 --- /dev/null +++ b/cert.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDUzCCAjsCFG9JBDNzPV9m0qSVVC9RSV4hyqIkMA0GCSqGSIb3DQEBCwUAMGYx +CzAJBgNVBAYTAlJPMQswCQYDVQQIDAJSTzELMAkGA1UEBwwCUk8xDTALBgNVBAoM +BFRFU1QxCzAJBgNVBAsMAlJPMQswCQYDVQQDDAJSTzEUMBIGCSqGSIb3DQEJARYF +ZHFxd2QwHhcNMjAwNDI2MjAwODI4WhcNNDcwOTExMjAwODI4WjBmMQswCQYDVQQG +EwJSTzELMAkGA1UECAwCUk8xCzAJBgNVBAcMAlJPMQ0wCwYDVQQKDARURVNUMQsw +CQYDVQQLDAJSTzELMAkGA1UEAwwCUk8xFDASBgkqhkiG9w0BCQEWBWRxcXdkMIIB +IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAplmd7wR974Xw8Ajd3ES7Pg2d +kKALrSU3BLTclpr+q68V9dFCQ4b0Nh2yzzHvj1oSJggD26jNSTB+PDu6zJIN8nM2 +DncFMGCqxvtjIsBWbyASb2Du0RI8GbDfXNXY1R3Az2PlokH//fmq9TbluFXzJ6nW +pzI9riHDbxQV9OhIYAI7o9hXrFaqKqvmSSmBNcce+epD3Wau3nTYATkUn28x5Uml +S01ca0TiEobSgmwwuB5p/6SwLOlDUypBeYwQIM+A8PKtBur7Remzq+iPCJq6fwkc +2jhBsj5yo7mPeSfpcW46JJftVDSrKISKGVxeuBjZBRZpJXWJCLYM0R0xvxkxeQID +AQABMA0GCSqGSIb3DQEBCwUAA4IBAQCV8z9+KpNUSjT9lehDG8dNcGTZyvIG7aBV +YEKiaiHBU2SFBrJfLGAfS2E+l6RwgG3033LXwNwl/0ifnIwEPZ7noKm0F3lPj+fX 
+V/XDWG1ITqn6IowiLU1PLibs1iwuLjSb09y047WGaZAix+BMxxRBit8/OQ87KC0p +4nL8bwE3Aw7XLYVOXsXqODXdxnFZ0yMV/aPdCf175jjZmLoFkfRNMvqzNBcYGp4W +ddq+C+lLk6oBI9fQsn/Tu33qQlz9bVwZujjF1P9A3vvn1mVBctgK4VBJ4N6E177G +z7P0ZVgj8n8/jGF/oNBh/Wfzs6AUTreTJNvoqfrerq7O21Nh50X5 +-----END CERTIFICATE----- diff --git a/config/default.yaml b/config/default.yaml index 16cccc19..d1096491 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -4,7 +4,7 @@ server: # Set to true if you've a LB in front of RTCStats and you are obtaining # its IP address as part of the X-Forwarded-For header skipLoadBalancerIp: false - + logLevel: info s3: accessKeyId: secretAccessKey: diff --git a/config/production.yaml b/config/production.yaml index 16cccc19..d1096491 100644 --- a/config/production.yaml +++ b/config/production.yaml @@ -4,7 +4,7 @@ server: # Set to true if you've a LB in front of RTCStats and you are obtaining # its IP address as part of the X-Forwarded-For header skipLoadBalancerIp: false - + logLevel: info s3: accessKeyId: secretAccessKey: diff --git a/extract.js b/extract.js index 7c46fb1b..0903e536 100644 --- a/extract.js +++ b/extract.js @@ -1,7 +1,15 @@ const fs = require('fs'); +const { parentPort, workerData, isMainThread } = require('worker_threads'); -const canUseProcessSend = !!process.send; -const isProduction = process.env.NODE_ENV && process.env.NODE_ENV === 'production'; +const logger = require('./logging'); +const connectionfeatures = require('./features-connection'); +const clientfeatures = require('./features-client'); +const streamfeatures = require('./features-stream'); +const statsDecompressor = require('./getstats-deltacompression').decompress; +const statsMangler = require('./getstats-mangle'); +const { extractTracks, extractStreams, isProduction, ResponseType, RequestType } = require('./utils'); + +// const canUseProcessSend = !!process.send; function capitalize(str) { return str[0].toUpperCase() + str.substr(1); @@ -18,199 +26,255 @@ function safeFeature(feature) { 
return feature; } -const connectionfeatures = require('./features-connection'); -const clientfeatures = require('./features-client'); -const streamfeatures = require('./features-stream'); -const statsDecompressor = require('./getstats-deltacompression').decompress; -const statsMangler = require('./getstats-mangle'); -const {extractTracks, extractStreams} = require('./utils'); +// check that the sorter was called as a worker thread +if (!isMainThread) { + logger.info('Running feature extract worker thread: %o', workerData); + + // Handle parent requests + parentPort.on('message', (request) => { + switch (request.type) { + case RequestType.PROCESS: { + logger.info('Processing request: %o', request); + try { + processDump(request.body.clientId); + } catch (error) { + parentPort.postMessage({ + type: ResponseType.ERROR, + body: { clientId: request.body.clientId, error }, + }); + } + break; + } + default: { + logger.warn('Unsupported request: %o', request); + } + } + }); +} else { + logger.error('Attempting to run worker thread in main process context!'); +} // dumps all peerconnections. function dump(url, client) { // ignore connections that never send getUserMedia or peerconnection events. if (client.getUserMedia.length === 0 && Object.keys(client.peerConnections).length === 0) return; - if (!isProduction) { + if (!isProduction()) { let total = 0; - Object.keys(client.peerConnections).forEach(id => { + Object.keys(client.peerConnections).forEach((id) => { total += client.peerConnections[id].length; }); - console.log('DUMP', client.getUserMedia.length, Object.keys(client.peerConnections).length, total); + logger.info('DUMP', client.getUserMedia.length, Object.keys(client.peerConnections).length, total); return; } } // Feature generation -function generateFeatures(url, client, clientid) { +function generateFeatures(url, client, clientId) { // ignore connections that never send getUserMedia or peerconnection events. 
if (client.getUserMedia.length === 0 && Object.keys(client.peerConnections).length === 0) return; + // logger.info(JSON.stringify(client.identity)); // clientFeatures are the same for all peerconnections but are saved together // with each peerconnection anyway to make correlation easier. const clientFeatures = {}; - Object.keys(clientfeatures).forEach(fname => { + Object.keys(clientfeatures).forEach((fname) => { let feature = clientfeatures[fname].apply(null, [client]); if (feature !== undefined) { if (typeof feature === 'object') { - Object.keys(feature).forEach(subname => { + Object.keys(feature).forEach((subname) => { feature[subname] = safeFeature(feature[subname]); - if (!isProduction) { - console.log('PAGE', 'FEATURE', fname + capitalize(subname), '=>', safeFeature(feature[subname])); - } + logger.debug('PAGE', 'FEATURE', fname + capitalize(subname), '=>', safeFeature(feature[subname])); + clientFeatures[fname + capitalize(subname)] = feature[subname]; }); - } else { + } else { feature = safeFeature(feature); - if (!isProduction) { - console.log('PAGE', 'FEATURE', fname, '=>', feature); - } + logger.debug('PAGE', 'FEATURE', fname, '=>', feature); + clientFeatures[fname] = feature; } } }); - if (Object.keys(client.peerConnections).length === 0) { - // we only have GUM and potentially GUM errors. - if (canUseProcessSend && isProduction) { - process.send({url, clientid, connid: '', clientFeatures}); - } - } + // if (Object.keys(client.peerConnections).length === 0) { + // // we only have GUM and potentially GUM errors. 
+ // parentPort.postMessage({ + // type: ResponseType.PROCESSING, + // body: { url, clientId, connid: '', clientFeatures }, + // }); + // } - Object.keys(client.peerConnections).forEach(connid => { + //logger.debug('Client features: ', clientFeatures); + const streamList = []; + const connectionFeatList = []; + + Object.keys(client.peerConnections).forEach((connid) => { if (connid === 'null' || connid === '') return; // ignore the null connid and empty strings const conn = client.peerConnections[connid]; const connectionFeatures = {}; - Object.keys(connectionfeatures).forEach(fname => { + Object.keys(connectionfeatures).forEach((fname) => { let feature = connectionfeatures[fname].apply(null, [client, conn]); if (feature !== undefined) { if (typeof feature === 'object') { - Object.keys(feature).forEach(subname => { + Object.keys(feature).forEach((subname) => { feature[subname] = safeFeature(feature[subname]); - if (!isProduction) { - console.log(connid, 'FEATURE', fname + capitalize(subname), '=>', safeFeature(feature[subname])); - } + logger.debug(connid, 'FEATURE', fname + capitalize(subname), '=>', safeFeature(feature[subname])); + connectionFeatures[fname + capitalize(subname)] = feature[subname]; }); - } else { + } else { feature = safeFeature(feature); - if (!isProduction) { - console.log(connid, 'FEATURE', fname, '=>', safeFeature(feature)); - } + logger.debug(connid, 'FEATURE', fname, '=>', safeFeature(feature)); + connectionFeatures[fname] = feature; } } }); + connectionFeatList.push(connectionFeatures); + const tracks = extractTracks(conn); const streams = extractStreams(tracks); + for (const [streamId, tracks] of streams.entries()) { - const streamFeatures = {streamId}; - for (const {trackId, kind, direction, stats} of tracks) { - Object.keys(streamfeatures).forEach(fname => { - let feature = streamfeatures[fname].apply(null, [{kind, direction, trackId, stats, peerConnectionLog: conn}]); + const streamFeatures = { streamId }; + for (const { trackId, kind, 
direction, stats } of tracks) { + Object.keys(streamfeatures).forEach((fname) => { + let feature = streamfeatures[fname].apply(null, [{ kind, direction, trackId, stats, peerConnectionLog: conn }]); if (feature !== undefined) { feature = safeFeature(feature); if (typeof feature === 'object') { - Object.keys(feature).forEach(subname => { + Object.keys(feature).forEach((subname) => { feature[subname] = safeFeature(feature[subname]); - if (!isProduction) { - console.log(connid, 'STREAM', streamId, 'TRACK', trackId, 'FEATURE', fname + capitalize(subname), '=>', safeFeature(feature[subname])); - } streamFeatures[fname + capitalize(subname)] = feature[subname]; + logger.debug( + connid, + 'STREAM', + streamId, + 'TRACK', + trackId, + 'FEATURE', + fname + capitalize(subname), + '=>', + safeFeature(feature[subname]) + ); }); - } else { + } else { feature = safeFeature(feature); - if (!isProduction) { - console.log(connid, 'STREAM', streamId, 'TRACK', trackId, 'FEATURE', fname, '=>', safeFeature(feature)); - } streamFeatures[fname] = feature; + logger.debug(connid, 'STREAM', streamId, 'TRACK', trackId, 'FEATURE', fname, '=>', safeFeature(feature)); } } }); } - if (canUseProcessSend && isProduction) { - process.send({url, clientid, connid, clientFeatures, connectionFeatures, streamFeatures}); - } + + streamList.push(streamFeatures); } + + connectionFeatures.streams = streamList; + delete client.peerConnections[connid]; // save memory }); + + const trackObject = { url, clientId, clientFeatures, connectionFeatList }; + + parentPort.postMessage({ type: ResponseType.DONE, body: trackObject }); } -const clientid = process.argv[2]; -const path = 'temp/' + clientid; -fs.readFile(path, {encoding: 'utf-8'}, (err, data) => { - if (err) { - console.error(err, path); - return; - } - const baseStats = {}; - const lines = data.split('\n'); - const client = JSON.parse(lines.shift()); - client.peerConnections = {}; - client.getUserMedia = []; - lines.forEach(line => { - if (line.length) { - 
const data = JSON.parse(line); - const time = new Date(data.time || data[3]); - delete data.time; - switch(data[0]) { - case 'publicIP': - client.publicIP = data[2]; - break; - case 'userfeedback': // TODO: might be renamed - client.feedback = data[2]; - break; - case 'tags': // experiment variation tags - client.tags = data[2]; - break; - case 'wsconnect': - client.websocketConnectionTime = data[2] >>> 0; - break; - case 'wsconnecterror': - client.websocketError = data[2]; - break; - case 'identity': // identity meta-information when its not possible to feed into RTCPeerConnection. - client.identity = data[2]; - break; - case 'getUserMedia': - case 'getUserMediaOnSuccess': - case 'getUserMediaOnFailure': - case 'navigator.mediaDevices.getUserMedia': - case 'navigator.mediaDevices.getUserMediaOnSuccess': - case 'navigator.mediaDevices.getUserMediaOnFailure': - case 'navigator.getDisplayMedia': - case 'navigator.getDisplayMediaOnSucces': - case 'navigator.mediaDevices.getDisplayMedia': - case 'navigator.mediaDevices.getDisplayMediaOnSuccess': - client.getUserMedia.push({ - time: time, - timestamp: time.getTime(), - type: data[0], - value: data[2] - }); - break; - default: - if (!client.peerConnections[data[1]]) { - client.peerConnections[data[1]] = []; - baseStats[data[1]] = {}; - } - if (data[0] === 'getstats') { // delta-compressed - data[2] = statsDecompressor(baseStats[data[1]], data[2]); - baseStats[data[1]] = JSON.parse(JSON.stringify(data[2])); - } - if (data[0] === 'getStats' || data[0] === 'getstats') { - data[2] = statsMangler(data[2]); - data[0] = 'getStats'; - } - client.peerConnections[data[1]].push({ - time: time, - timestamp: time.getTime(), - type: data[0], - value: data[2], - }); - break; +/** + * Extract a subset of features considered to be more relevant. 
+ * + * @param {Object} features + */ +function extractRelevantStats(features) { + + +} + +function processDump(clientId) { + const path = 'temp/' + clientId; + fs.readFile(path, { encoding: 'utf-8' }, (err, data) => { + try { + if (err) { + throw err; } + const baseStats = {}; + const lines = data.split('\n'); + const client = JSON.parse(lines.shift()); + client.peerConnections = {}; + client.getUserMedia = []; + lines.forEach((line) => { + if (line.length) { + const data = JSON.parse(line); + const time = new Date(data.time || data[3]); + delete data.time; + switch (data[0]) { + case 'publicIP': + client.publicIP = data[2]; + break; + case 'userfeedback': // TODO: might be renamed + client.feedback = data[2]; + break; + case 'tags': // experiment variation tags + client.tags = data[2]; + break; + case 'wsconnect': + client.websocketConnectionTime = data[2] >>> 0; + break; + case 'wsconnecterror': + client.websocketError = data[2]; + break; + case 'identity': // identity meta-information when its not possible to feed into RTCPeerConnection. 
+ client.identity = data[2]; + break; + case 'getUserMedia': + case 'getUserMediaOnSuccess': + case 'getUserMediaOnFailure': + case 'navigator.mediaDevices.getUserMedia': + case 'navigator.mediaDevices.getUserMediaOnSuccess': + case 'navigator.mediaDevices.getUserMediaOnFailure': + case 'navigator.getDisplayMedia': + case 'navigator.getDisplayMediaOnSucces': + case 'navigator.mediaDevices.getDisplayMedia': + case 'navigator.mediaDevices.getDisplayMediaOnSuccess': + client.getUserMedia.push({ + time: time, + timestamp: time.getTime(), + type: data[0], + value: data[2], + }); + break; + default: + if (!client.peerConnections[data[1]]) { + client.peerConnections[data[1]] = []; + baseStats[data[1]] = {}; + } + if (data[0] === 'getstats') { + // delta-compressed + data[2] = statsDecompressor(baseStats[data[1]], data[2]); + baseStats[data[1]] = JSON.parse(JSON.stringify(data[2])); + } + if (data[0] === 'getStats' || data[0] === 'getstats') { + data[2] = statsMangler(data[2]); + data[0] = 'getStats'; + } + client.peerConnections[data[1]].push({ + time: time, + timestamp: time.getTime(), + type: data[0], + value: data[2], + }); + break; + } + } + }); + + dump(client.url, client); + generateFeatures(client.url, client, clientId); + } catch (error) { + parentPort.postMessage({ + type: ResponseType.ERROR, + body: { clientId, error }, + }); } }); - - dump(client.url, client); - generateFeatures(client.url, client, clientid); -}); +} diff --git a/key.pem b/key.pem new file mode 100644 index 00000000..1d7217b3 --- /dev/null +++ b/key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAplmd7wR974Xw8Ajd3ES7Pg2dkKALrSU3BLTclpr+q68V9dFC +Q4b0Nh2yzzHvj1oSJggD26jNSTB+PDu6zJIN8nM2DncFMGCqxvtjIsBWbyASb2Du +0RI8GbDfXNXY1R3Az2PlokH//fmq9TbluFXzJ6nWpzI9riHDbxQV9OhIYAI7o9hX +rFaqKqvmSSmBNcce+epD3Wau3nTYATkUn28x5UmlS01ca0TiEobSgmwwuB5p/6Sw +LOlDUypBeYwQIM+A8PKtBur7Remzq+iPCJq6fwkc2jhBsj5yo7mPeSfpcW46JJft +VDSrKISKGVxeuBjZBRZpJXWJCLYM0R0xvxkxeQIDAQABAoIBADtwQA9cgocoS8vo 
+zyVaZbEpekhn92QZrQwAd+VUYnUD7YvVBqFMQkxn1jFUfW2yWFPAf2hoa1mgeyqY +iQl5koQ0CHeorXD4yWyp/GU5Zmj0g8HKV+raYiEn4tewDXcw12kDH9UXUhn0sNJH +mFOCWoyskedR+1oR9FvnSGUm7l1nO1hUoLY8/+yxLKLoKzxJR214sLkeAWawhru+ +C3uviwHaxpqa4bUd14vZwM+MoeilfY/9L7q34OHw7UKjuU8ZYIoeqKBncmR1BQ54 +15Y2mQwzMZJFcWBj0h+9LlCgI8qSzDI1b3GIptFhGZqkL9G2Zf5RmSa6HjfzCMKQ +2EoKWUUCgYEA0tsHejuOjBnhM5FNn1lqWNh+qfxg58oZhH3/YhIgNXQAirCpWbrM +AQp30Y6V/ME2IfD09DaaC2jNNZ9wueAsjahcduUbZS2tMwIiMjOoTSSBsW7CW857 +8QvlJ9KiiX5oOs9OAuGbaxVq/emlwymudiVxMbsa04p9vcPy0Pc0GksCgYEAyfc+ +z1myLLfYma64wWXlKBjTiBr0liZzS8UHZVRlI14q3w1TS2C0YwesNn7Hz6iCCn4k +repPZc+ae7RQcj2tZQGO0vRHAFL+GIdAgf1nt1hcnItLh05Nmi86ECap4kaKChRY +KhAIWcakA/Da88fOsVDBMqU7fLMKrkwg6Y4iCMsCgYBFynGDJ8ta3AYKR4HlyHbG +yYHDSeHZVq9zhzDMiStYBflX7nlfVdDIV2qpVgSXEGyWd2bcnmYGeL3Tjd6F54lX +qe5Q/CxBJQk65O3kp+yA/CBhVkPGl2W2tzU2JSXfVJOzQ4KSuZHzs7ciK//NxTIV +sPbyeve6JRDRitYIDIqWWQKBgQCrxKYcv5JzyeBjxF/JzBl7YrH1XceLNCR22pmR +qpdh3yLjFXgz8Yk5eDsVFfpmOFBxEBut9kuUsV4Xu6F3p9EiyJJqA+um8O6+ebl5 +VMWy/2m0khuodgY2Ddh6CAgQNCIOtILPM1eG0xSHbX8qOlMmJyJJKpJPWg7JcmHD +gWicxQKBgGWxVCWgk0D9C31+Z5b1SwItmc2Iur4HJJF+WR1oSBqRPa33BCymULMt +kInermtwrUlyfN60HCnGlh3LKI97PTlEp1Wqdgas2ZW8e7eYRl6tzZ8B1yD4tqb7 +tM97QJsr+sOXtHTWFmhFFj4fuJbNi0W56OIfu6d7VN2QofXbKIql +-----END RSA PRIVATE KEY----- diff --git a/logging.js b/logging.js index 2f508a71..c277a3f4 100644 --- a/logging.js +++ b/logging.js @@ -1,19 +1,106 @@ -const winston = require('winston'); +const util = require('util'); +const os = require('os'); +const config = require('config'); +const { threadId } = require('worker_threads'); -const { DEBUG } = process.env; +const { createLogger, format, transports } = require('winston'); +require('winston-daily-rotate-file'); -const { combine, splat, timestamp, json } = winston.format; +const { isProduction } = require('./utils'); -const addSeverity = winston.format(logEntry => { - return { severity: logEntry.level.toUpperCase(), ...logEntry }; -}); +if (!config.get('server').logLevel) { + throw new 
Error('Please set the logLevel config!'); +} + +const { json, colorize } = format; +const LEVEL = Symbol.for('level'); + +function splatTransform(info) { + const args = info[Symbol.for('splat')]; + + if (args) { + info.message = util.format(info.message, ...args); + } + return info; +} + +function metaTransform(logEntry) { + const customMeta = { + timestamp: logEntry.timestamp, + level: logEntry[LEVEL], + PID: process.pid, + TID: threadId, + host: os.hostname(), + }; -const loggerOptions = { - level: DEBUG ? 'debug' : 'info', - format: combine(timestamp(), splat(), addSeverity(), json()), - transports: [new winston.transports.Console()], + logEntry = { ...customMeta, ...logEntry }; + return logEntry; +} + +const fileLogger = format.combine( + format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss.SSS' }), + format(splatTransform)(), + format(metaTransform)(), + json() +); + +const logFileCommonCfg = { + format: fileLogger, + auditFile: 'logs/app-log-audit.json', + datePattern: 'YYYY-MM-DD', + zippedArchive: true, + maxSize: '100m', + maxFiles: '90d', }; -const logger = winston.createLogger(loggerOptions); +const appLogTransport = new transports.DailyRotateFile({ + ...logFileCommonCfg, + level: config.get('server').logLevel, + filename: 'logs/app-%DATE%.log', +}); + +const appErrorLogTransport = new transports.DailyRotateFile({ + ...logFileCommonCfg, + level: 'error', + filename: 'logs/app-error-%DATE%.log', +}); + +const appExceptionLogTransportCfg = { ...logFileCommonCfg }; +delete appExceptionLogTransportCfg.format; + +const appExceptionLogTransport = new transports.DailyRotateFile({ + ...appExceptionLogTransportCfg, + filename: 'logs/app-error-%DATE%.log', +}); + +const logger = createLogger({ + transports: [appLogTransport, appErrorLogTransport], + exceptionHandlers: [appExceptionLogTransport], +}); + +if (!isProduction()) { + + const consoleLogger = format.combine( + format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss.SSS' }), + colorize(), + format(splatTransform)(), + 
format(metaTransform)(), + format.printf( + ({ level, message, timestamp, PID, TID, host }) => + `${timestamp} ${PID} ${TID} ${host} ${level}: ${message}` + ) + ); + // If we're not in production then also log to the `console` + logger.add( + new transports.Console({ + format: consoleLogger, + level: config.get('server').logLevel, + prettyPrint: true, + handleExceptions: true, + }) + ); +} + +logger.info('Logger successfully initialized.'); module.exports = logger; diff --git a/package-lock.json b/package-lock.json index dfb415c1..76c5c9fd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -219,6 +219,14 @@ "repeat-string": "^1.5.2" } }, + "amplitude": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/amplitude/-/amplitude-4.0.1.tgz", + "integrity": "sha512-a35pAUIHiLLyVqBna95PugL9yjJjGve0YuEYiMfnO/hUs633epSK9A3NfbWu5nc98XDi2ZfiSe2J4/XnA+89cA==", + "requires": { + "superagent": "^5.2.1" + } + }, "ansi-align": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/ansi-align/-/ansi-align-2.0.0.tgz", @@ -291,6 +299,11 @@ "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz", "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==" }, + "asynckit": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", + "integrity": "sha1-x57Zf380y48robyXkLzDZkdLS3k=" + }, "aws-sdk": { "version": "2.605.0", "resolved": "https://registry.npmjs.org/aws-sdk/-/aws-sdk-2.605.0.tgz", @@ -567,6 +580,19 @@ "text-hex": "1.0.x" } }, + "combined-stream": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", + "integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==", + "requires": { + "delayed-stream": "~1.0.0" + } + }, + "component-emitter": { + "version": "1.3.0", + "resolved": 
"https://registry.npmjs.org/component-emitter/-/component-emitter-1.3.0.tgz", + "integrity": "sha512-Rd3se6QB+sO1TwqZjscQrurpEPIfO0/yYnSin6Q/rD3mOutHvUrCAhJub3r90uNb+SESBuE0QYoB90YdfatsRg==" + }, "compressible": { "version": "2.0.18", "resolved": "https://registry.npmjs.org/compressible/-/compressible-2.0.18.tgz", @@ -625,6 +651,11 @@ "xdg-basedir": "^4.0.0" } }, + "cookiejar": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/cookiejar/-/cookiejar-2.1.2.tgz", + "integrity": "sha512-Mw+adcfzPxcPeI+0WlvRrr/3lGVO0bD75SxX6811cxSh1Wbxx7xZBGK1eVtDf6si8rg2lhnUjsVLMFMfbRIuwA==" + }, "core-util-is": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz", @@ -701,6 +732,11 @@ "integrity": "sha1-s2nW+128E+7PUk+RsHD+7cNXzzQ=", "dev": true }, + "delayed-stream": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", + "integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=" + }, "diagnostics": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/diagnostics/-/diagnostics-1.1.1.tgz", @@ -1058,6 +1094,14 @@ "flat-cache": "^2.0.1" } }, + "file-stream-rotator": { + "version": "0.5.7", + "resolved": "https://registry.npmjs.org/file-stream-rotator/-/file-stream-rotator-0.5.7.tgz", + "integrity": "sha512-VYb3HZ/GiAGUCrfeakO8Mp54YGswNUHvL7P09WQcXAJNSj3iQ5QraYSp3cIn1MUyw6uzfgN/EFOarCNa4JvUHQ==", + "requires": { + "moment": "^2.11.2" + } + }, "fill-range": { "version": "7.0.1", "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", @@ -1084,6 +1128,21 @@ "integrity": "sha512-a1hQMktqW9Nmqr5aktAux3JMNqaucxGcjtjWnZLHX7yyPCmlSV3M54nGYbqT8K+0GhF3NBgmJCc3ma+WOgX8Jg==", "dev": true }, + "form-data": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-3.0.0.tgz", + "integrity": "sha512-CKMFDglpbMi6PyN+brwB9Q/GOw0eAnsrEZDgcsH5Krhz5Od/haKHAX0NmQfha2zPPz0JpWzA7GJHGSnvCRLWsg==", + "requires": { + "asynckit": "^0.4.0", + 
"combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + } + }, + "formidable": { + "version": "1.2.2", + "resolved": "https://registry.npmjs.org/formidable/-/formidable-1.2.2.tgz", + "integrity": "sha512-V8gLm+41I/8kguQ4/o1D3RIHRmhYFG4pnNyonvua+40rqcEmT4+V71yaZ3B457xbbgCsCfjSPi65u/W6vK1U5Q==" + }, "fs.realpath": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", @@ -1688,6 +1747,11 @@ "is-buffer": "~1.1.1" } }, + "methods": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/methods/-/methods-1.1.2.tgz", + "integrity": "sha1-VSmk1nZUE07cxSZmVoNbD4Ua/O4=" + }, "mime": { "version": "2.4.4", "resolved": "https://registry.npmjs.org/mime/-/mime-2.4.4.tgz", @@ -1742,6 +1806,11 @@ } } }, + "moment": { + "version": "2.25.3", + "resolved": "https://registry.npmjs.org/moment/-/moment-2.25.3.tgz", + "integrity": "sha512-PuYv0PHxZvzc15Sp8ybUCoQ+xpyPWvjOuK72a5ovzp2LI32rJXOiIfyoFoYvG3s6EwwrdkMyWuRiEHSZRLJNdg==" + }, "ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", @@ -1834,6 +1903,11 @@ "path-key": "^2.0.0" } }, + "object-hash": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/object-hash/-/object-hash-2.0.3.tgz", + "integrity": "sha512-JPKn0GMu+Fa3zt3Bmr66JhokJU5BaNBIh4ZeTlaCBzrBsOeXzwcKKAK1tbLiPKgvwmPXsDvvLHoWh5Bm7ofIYg==" + }, "once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -2034,6 +2108,11 @@ "resolved": "https://registry.npmjs.org/punycode/-/punycode-1.3.2.tgz", "integrity": "sha1-llOgNvt8HuQjQvIyXM7v6jkmxI0=" }, + "qs": { + "version": "6.9.3", + "resolved": "https://registry.npmjs.org/qs/-/qs-6.9.3.tgz", + "integrity": "sha512-EbZYNarm6138UKKq46tdx08Yo/q9ZhFoAXAI1meAFd2GtbRDhbZY2WQSICskT0c5q99aFzLG1D4nvTk9tqfXIw==" + }, "querystring": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/querystring/-/querystring-0.2.0.tgz", @@ -2348,6 +2427,36 @@ "resolved": 
"https://registry.npmjs.org/stubs/-/stubs-3.0.0.tgz", "integrity": "sha1-6NK6H6nJBXAwPAMLaQD31fiavls=" }, + "superagent": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/superagent/-/superagent-5.2.2.tgz", + "integrity": "sha512-pMWBUnIllK4ZTw7p/UaobiQPwAO5w/1NRRTDpV0FTVNmECztsxKspj3ZWEordVEaqpZtmOQJJna4yTLyC/q7PQ==", + "requires": { + "component-emitter": "^1.3.0", + "cookiejar": "^2.1.2", + "debug": "^4.1.1", + "fast-safe-stringify": "^2.0.7", + "form-data": "^3.0.0", + "formidable": "^1.2.1", + "methods": "^1.1.2", + "mime": "^2.4.4", + "qs": "^6.9.1", + "readable-stream": "^3.4.0", + "semver": "^6.3.0" + }, + "dependencies": { + "readable-stream": { + "version": "3.6.0", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.0.tgz", + "integrity": "sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA==", + "requires": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + } + } + } + }, "supports-color": { "version": "5.5.0", "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", @@ -2773,6 +2882,17 @@ } } }, + "winston-daily-rotate-file": { + "version": "4.4.2", + "resolved": "https://registry.npmjs.org/winston-daily-rotate-file/-/winston-daily-rotate-file-4.4.2.tgz", + "integrity": "sha512-pVOUJKxN+Kn6LnOJZ4tTwdV5+N+fCkiRAb3bVnzcPtOj1ScxGNC3DyUhHuAHssBtMl5s45/aUcSUtApH+69V5A==", + "requires": { + "file-stream-rotator": "^0.5.7", + "object-hash": "^2.0.1", + "triple-beam": "^1.3.0", + "winston-transport": "^4.2.0" + } + }, "winston-transport": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/winston-transport/-/winston-transport-4.3.0.tgz", diff --git a/package.json b/package.json index 39533d6a..55157272 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,8 @@ "scripts": { "test": "eslint *.js database/*.js store/*.js && node test/client.js", "start": "NODE_ENV=production node app.js", - "watch:dev": 
"nodemon --exec 'NODE_ENV=production node app.js'" + "watch:dev": "nodemon --exec 'NODE_ENV=debug LOG=debug node app.js'", + "debug": "NODE_ENV=debug LOG=debug node app.js" }, "repository": { "type": "git", @@ -21,6 +22,7 @@ "dependencies": { "@google-cloud/bigquery": "^4.5.0", "@google-cloud/storage": "^4.1.3", + "amplitude": "^4.0.1", "aws-sdk": "^2.441.0", "bluebird": "^3.3.1", "config": "^1.17.1", @@ -31,6 +33,7 @@ "sdp": "^2.9.0", "uuid": "^2.0.1", "winston": "^3.2.1", + "winston-daily-rotate-file": "^4.4.2", "ws": "^5.1.1" }, "devDependencies": { diff --git a/store/s3.js b/store/s3.js index dda438f3..55fcf327 100644 --- a/store/s3.js +++ b/store/s3.js @@ -3,6 +3,8 @@ const fs = require('fs'); const AWS = require('aws-sdk'); +const logger = require('../logging'); + module.exports = function (config) { AWS.config = config; @@ -17,7 +19,7 @@ module.exports = function (config) { put: function (key, filename) { return new Promise((resolve, reject) => { if (!configured) { - console.log('no bucket configured for storage'); + logger.warn('no bucket configured for storage'); return resolve(); // not an error. 
} fs.readFile(filename, { encoding: 'utf-8' }, (err, data) => { diff --git a/utils.js b/utils.js index d338d391..28b253b3 100644 --- a/utils.js +++ b/utils.js @@ -1,4 +1,5 @@ /* feature extraction utils */ +const logger = require('./logging'); function capitalize(str) { return str[0].toUpperCase() + str.substr(1); @@ -86,7 +87,7 @@ function extractTracks(peerConnectionLog) { tracks.get(key).stats.push(report); } } else if (trackIdentifier !== undefined) { - console.log('NO ONTRACK FOR', trackIdentifier, report.ssrc); + logger.debug('NO ONTRACK FOR', trackIdentifier, report.ssrc); } } }); @@ -127,12 +128,34 @@ function isIceConnected({type, value}) { return type === 'oniceconnectionstatechange' && ['connected', 'completed'].includes(value); } +function getEnvName() { + return process.env.NODE_ENV || 'default'; +} + +function isProduction() { + return getEnvName() === 'production'; +} + +const RequestType = Object.freeze({ + PROCESS: 'PROCESS', +}); + +const ResponseType = Object.freeze({ + PROCESSING: 'PROCESSING', + DONE: 'DONE', + ERROR: 'ERROR', +}); + module.exports = { capitalize, extractTracks, extractStreams, + getEnvName, isIceConnected, + isProduction, mode, standardizedMoment, timeBetween, + RequestType, + ResponseType } From faa6c8be3134da66496ed4df8839d5a8bd1e9a4a Mon Sep 17 00:00:00 2001 From: Andrei Gavrilescu Date: Fri, 22 May 2020 14:18:14 +0300 Subject: [PATCH 2/8] Add amplitude connector. 
--- WorkerPool.js | 48 ++++++++------- app.js | 105 ++++++++------------------------- config/default.yaml | 3 + database/AmplitudeConnector.js | 84 ++++++++++++++++++++++++++ extract.js | 37 +++++------- features-connection.js | 2 + logging.js | 50 +++++++++++++--- 7 files changed, 196 insertions(+), 133 deletions(-) create mode 100644 database/AmplitudeConnector.js diff --git a/WorkerPool.js b/WorkerPool.js index 07fd0087..fa8443c2 100644 --- a/WorkerPool.js +++ b/WorkerPool.js @@ -10,6 +10,11 @@ const WorkerStatus = Object.freeze({ RUNNING: 'RUNNING', }); +/** + * The WorkerPool implementation will attempt to always keep the set number of worker running, that means in case of + * an error or a exit due to something happening inside the worker script, it will spawn a new one. + * However when the processes exits from the main thread or SIGKILL/SIGTERM it will shutdown as expected. + */ class WorkerPool extends EventEmitter { constructor(workerScriptPath, poolSize) { super(); @@ -36,35 +41,29 @@ class WorkerPool extends EventEmitter { const workerInstance = new Worker(this.workerScriptPath, { workerData: { workerID } }); const workerMeta = { workerID, worker: workerInstance, status: WorkerStatus.IDLE }; - logger.info('Created worker %s', workerMeta); + logger.info('Created worker %j', workerMeta); workerInstance.on('message', (message) => { // logger.info(`Worker message: ${JSON.stringify(message)}`); - this.emit( message.type, message.body); + this.emit(message.type, message.body); this._processNextTask(workerMeta); }); + // Uncaught error thrown in the worker script, a exit event will follow so we just log the error. workerInstance.on('error', (error) => { - logger.error('Worker <%o> with error %o: ', workerMeta, error); - workerMeta.status = WorkerStatus.STOPPED; - - // this.emit('error', error); - // Remove current worker from pool as it's no longer usable. 
- this._removeWorkerFromPool(workerMeta); - - // Bring the worker pool back to maximum capacity - // TODO Regenerate should not add more workers if we are shutting down. - this._regenerateWorkerToPool(); + logger.error('Worker %j with error %o: ', workerMeta, error); }); workerInstance.on('exit', (exitCode) => { - logger.info('Worker %s exited with code %d.', workerMeta, exitCode); + logger.info('Worker %j exited with code %d.', workerMeta, exitCode); workerMeta.status = WorkerStatus.STOPPED; // Remove current worker from pool as it's no longer usable. this._removeWorkerFromPool(workerMeta); - // Bring the worker pool back to maximum capacity + // Bring the worker pool back to maximum capacity. When the main thread is trying to exit + // this won't work as the creation is queued via a setTimeout, so an infinite loop shouldn't + // happen. this._regenerateWorkerToPool(); }); @@ -76,11 +75,11 @@ class WorkerPool extends EventEmitter { return { uuid: workerMeta.workerID, status: workerMeta.status }; }); - logger.info('Worker pool introspect: ', JSON.stringify(workerPoolInfo)); + logger.info('Worker pool introspect: %j ', workerPoolInfo); } _removeWorkerFromPool(worker) { - logger.info('Removing worker from pool: ', JSON.stringify(worker)); + logger.info('Removing worker from pool: %j', worker); const workerIndex = this.workerPool.indexOf(worker); if (workerIndex > -1) { this.workerPool.splice(workerIndex, 1); @@ -89,6 +88,7 @@ class WorkerPool extends EventEmitter { } _processTask(workerMeta, task) { + logger.info(`Processing task %j, current queue size %d`, task, this.taskQueue.length); workerMeta.worker.postMessage(task); workerMeta.status = WorkerStatus.RUNNING; } @@ -102,12 +102,16 @@ class WorkerPool extends EventEmitter { } _regenerateWorkerToPool() { - if (this.workerPool.length < this.poolSize) { - const workerMeta = this._addWorkerToPool(); - this._processNextTask(workerMeta); - } else { - logger.warn('Can not add additional worker, pool is already at max 
capacity!'); - } + // timeout is required here so the regeneration process doesn't enter an infinite loop + // when node.js is attempting to shutdown. + setTimeout(() => { + if (this.workerPool.length < this.poolSize) { + const workerMeta = this._addWorkerToPool(); + this._processNextTask(workerMeta); + } else { + logger.warn('Can not add additional worker, pool is already at max capacity!'); + } + }, 2000); } _getIdleWorkers() { diff --git a/app.js b/app.js index bea4b4de..29b1b787 100644 --- a/app.js +++ b/app.js @@ -10,12 +10,12 @@ const WebSocketServer = require('ws').Server; const config = require('config'); const uuid = require('uuid'); +const AmplitudeConnector = require('./database/AmplitudeConnector'); const logger = require('./logging'); const obfuscate = require('./obfuscator'); const { name: appName, version: appVersion } = require('./package'); const { getEnvName, RequestType, ResponseType } = require('./utils'); const WorkerPool = require('./WorkerPool'); -// const Amplitude = require('amplitude'); // Configure database, fall back to redshift-firehose. 
let database; @@ -35,6 +35,11 @@ if (!store) { store = require('./store/s3.js')(config.s3); } +let amplitude; +if (config.amplitude && config.amplitude.key) { + amplitude = new AmplitudeConnector(config.amplitude.key); +} + let server; const tempPath = 'temp'; @@ -55,75 +60,7 @@ const errored = new prom.Counter({ help: 'number of files with errors during processing', }); -// class ProcessQueue { -// constructor() { -// this.maxProc = os.cpus().length; -// this.q = []; -// this.numProc = 0; -// } -// enqueue(clientid) { -// this.q.push(clientid); -// if (this.numProc < this.maxProc) { -// process.nextTick(this.process.bind(this)); -// } else { -// logger.info('process Q too long: %s', this.numProc); -// } -// } -// process() { -// const clientid = this.q.shift(); -// if (!clientid) return; -// // const p = child_process.fork('extract.js', [clientid], { -// // execArgv: process.execArgv.concat([ '--inspect-port=5800' ]), -// // }); -// const p = child_process.fork("extract.js", [clientid]); -// p.on('exit', (code) => { -// this.numProc--; -// logger.info(`Done clientid: <${clientid}> proc: <${this.numProc}> code: <${code}>`); -// if (code === 0) { -// processed.inc(); -// } else { -// errored.inc(); -// } -// if (this.numProc < 0) { -// this.numProc = 0; -// } -// if (this.numProc < this.maxProc) { -// process.nextTick(this.process.bind(this)); -// } -// const path = tempPath + '/' + clientid; -// store -// .put(clientid, path) -// .then(() => { -// fs.unlink(path, () => {}); -// }) -// .catch((err) => { -// logger.error('Error storing: %s - %s', path, err); -// fs.unlink(path, () => {}); -// }); -// }); -// p.on('message', (msg) => { -// logger.info('Received message from child process: ', msg); -// const { url, clientid, connid, clientFeatures, connectionFeatures, streamFeatures } = msg; - -// if (database) { -// database.put(url, clientid, connid, clientFeatures, connectionFeatures, streamFeatures); -// } else { -// logger.warn('No database configured!'); -// } 
-// }); -// p.on('error', () => { -// this.numProc--; -// logger.warn(`Failed to spawn, rescheduling clientid: <${clientid}> proc: <${this.numProc}>`); -// this.q.push(clientid); // do not immediately retry -// }); -// this.numProc++; -// if (this.numProc > 10) { -// logger.info('Process Q: %n', this.numProc); -// } -// } -// } - -function storeDump(clientId){ +function storeDump(clientId) { const path = tempPath + '/' + clientId; store .put(clientId, path) @@ -144,17 +81,19 @@ const workerScriptPath = path.join(__dirname, './extract.js'); const workerPool = new WorkerPool(workerScriptPath, getIdealWorkerCount()); workerPool.on(ResponseType.PROCESSING, (body) => { - logger.info('Handling PROCESSING event with body %o', body); - const { url, clientId, connid, clientFeatures, connectionFeatures, streamFeatures } = body; + logger.debug('Handling PROCESSING event with body %j', body); - if (database) { - database.put(url, clientId, connid, clientFeatures, connectionFeatures, streamFeatures); - } else { - logger.warn('No database configured!'); - } + amplitude && amplitude.track(body); + // const { url, clientId, connid, clientFeatures, connectionFeatures, streamFeatures } = body; + + // if (database) { + // database.put(url, clientId, connid, clientFeatures, connectionFeatures, streamFeatures); + // } else { + // logger.warn('No database configured!'); + // } }); workerPool.on(ResponseType.DONE, (body) => { - logger.info('Handling DONE event with body %o', body); + logger.debug('Handling DONE event with body %j', body); storeDump(body.clientId); }); workerPool.on(ResponseType.ERROR, (body) => { @@ -361,10 +300,6 @@ function setupWebSocketsServer(server) { function run(keys) { logger.info('Initializing <%s>, version <%s>, env <%s> ...', appName, appVersion, getEnvName()); - // const amplitude = new Amplitude('43df878c9fd741a83e0c80bec3a5ddf4') - // data.event_properties = trackObject; - - // amplitude.track(data); setupWorkDirectory(); server = 
setupHttpServer(config.get('server').port, keys); @@ -384,6 +319,12 @@ function stop() { } } +// For now just log unhandled promise rejections, as the initial code did not take them into account and by default +// node just silently eats them. +process.on('unhandledRejection', (reason) => { + logger.error('Unhandled rejection: ', reason); +}); + run(); module.exports = { diff --git a/config/default.yaml b/config/default.yaml index d1096491..ff6a55b0 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -5,6 +5,9 @@ server: # its IP address as part of the X-Forwarded-For header skipLoadBalancerIp: false logLevel: info +amplitude: + key: '' + s3: accessKeyId: secretAccessKey: diff --git a/database/AmplitudeConnector.js b/database/AmplitudeConnector.js new file mode 100644 index 00000000..910c7c1a --- /dev/null +++ b/database/AmplitudeConnector.js @@ -0,0 +1,84 @@ +const Amplitude = require('amplitude'); + +const logger = require('../logging'); + +class AmplitudeConnector { + constructor(key, options) { + if (!key) { + throw new Error('[Amplitude] Please provide an amplitude key!'); + } + + this.amplitude = new Amplitude(key, options); + } + + /** + * Extract a subset of features considered to be more relevant. + * + * @param {Object} connectionFeatures + */ + extractRelevantStats(connectionFeatures) { + const filteredFeature = {}; + + // TODO Use object destructuring for a more clean approach. 
+ filteredFeature.lifeTime = connectionFeatures.lifeTime; + filteredFeature.ICEFailure = connectionFeatures.ICEFailure; + filteredFeature.connectionTime = connectionFeatures.connectionTime; + filteredFeature.numberOfRemoteStreams = connectionFeatures.numberOfRemoteStreams; + filteredFeature.sessionDuration = connectionFeatures.sessionDuration; + filteredFeature.numberOfLocalSimulcastStreams = connectionFeatures.numberOfLocalSimulcastStreams; + filteredFeature.bytesTotalSent = connectionFeatures.bytesTotalSent; + filteredFeature.bytesTotalReceived = connectionFeatures.bytesTotalReceived; + filteredFeature.statsMeanRoundTripTime = connectionFeatures.statsMeanRoundTripTime; + filteredFeature.statsMeanReceivingBitrate = connectionFeatures.statsMeanReceivingBitrate; + filteredFeature.statsMeanSendingBitrate = connectionFeatures.statsMeanSendingBitrate; + filteredFeature.firstCandidatePairType = connectionFeatures.firstCandidatePairType; + filteredFeature.bweGoogActualEncBitrateMean = connectionFeatures.bweGoogActualEncBitrateMean; + filteredFeature.bweGoogRetransmitBitrateMean = connectionFeatures.bweGoogRetransmitBitrateMean; + filteredFeature.bweGoogTargetEncBitrateMean = connectionFeatures.bweGoogTargetEncBitrateMean; + filteredFeature.bweGoogTransmitBitrateMean = connectionFeatures.bweGoogTransmitBitrateMean; + filteredFeature.bweAvailableOutgoingBitrateMean = connectionFeatures.bweAvailableOutgoingBitrateMean; + filteredFeature.bweAvailableIncomingBitrateMean = connectionFeatures.bweAvailableIncomingBitrateMean; + + return filteredFeature; + } + + track(rtcstatsFeatures) { + try { + // TODO Add checks for identity info using object destructuring. 
+ + // if (!rtcstatsFeatures.identity.userId || !rtcstatsFeatures.identity.deviceId || !rtcstatsFeatures.identity.sessionId) { + // throw new Error('[Amplitude] userId, deviceId and sessionId must be present, provided values userId: %s, deviceId: %s, sessionId: %s.'); + // } + + const amplitudeEvent = { + event_type: 'rtcstats-publish', + user_id: rtcstatsFeatures.identity.userId, + device_id: rtcstatsFeatures.identity.deviceId, + session_id: rtcstatsFeatures.identity.sessionId, + event_properties: { + rtcstatsIdentity: rtcstatsFeatures.clientId, + ...rtcstatsFeatures.identity.hosts, + ...rtcstatsFeatures.identity.deployInfo, + ...this.extractRelevantStats(rtcstatsFeatures.connectionFeatures), + }, + }; + + this.amplitude + .track(amplitudeEvent) + .then(() => + logger.info( + '[Amplitude] Sent event: rtcstats clientId: %s, user_id: %s, device_id: %s, session_id: %s', + rtcstatsFeatures.clientId, + amplitudeEvent.user_id, + amplitudeEvent.device_id, + amplitudeEvent.session_id + ) + ) + .catch((error) => logger.error('[Amplitude] track promise failed for event %j error: %s', amplitudeEvent, error.message)); + } catch (error) { + logger.error('[Amplitude] Failed to send rtcstats features %j with error: %s', rtcstatsFeatures, error.message); + } + } +} + +module.exports = AmplitudeConnector; diff --git a/extract.js b/extract.js index 0903e536..c85a4275 100644 --- a/extract.js +++ b/extract.js @@ -28,25 +28,25 @@ function safeFeature(feature) { // check that the sorter was called as a worker thread if (!isMainThread) { - logger.info('Running feature extract worker thread: %o', workerData); - + logger.info('Running feature extract worker thread: %j', workerData); + // throw new Error("Heavy"); // Handle parent requests parentPort.on('message', (request) => { switch (request.type) { case RequestType.PROCESS: { - logger.info('Processing request: %o', request); + logger.info('Worker is processing request: %j', request); try { processDump(request.body.clientId); } catch 
(error) { parentPort.postMessage({ type: ResponseType.ERROR, - body: { clientId: request.body.clientId, error }, + body: { clientId: request.body.clientId, error: { ...error } }, }); } break; } default: { - logger.warn('Unsupported request: %o', request); + logger.warn('Unsupported request: %j', request); } } }); @@ -95,6 +95,7 @@ function generateFeatures(url, client, clientId) { } } }); + // if (Object.keys(client.peerConnections).length === 0) { // // we only have GUM and potentially GUM errors. // parentPort.postMessage({ @@ -103,9 +104,9 @@ function generateFeatures(url, client, clientId) { // }); // } - //logger.debug('Client features: ', clientFeatures); + logger.debug('Client features: ', clientFeatures); + const streamList = []; - const connectionFeatList = []; Object.keys(client.peerConnections).forEach((connid) => { if (connid === 'null' || connid === '') return; // ignore the null connid and empty strings @@ -130,7 +131,6 @@ function generateFeatures(url, client, clientId) { } }); - connectionFeatList.push(connectionFeatures); const tracks = extractTracks(conn); const streams = extractStreams(tracks); @@ -172,22 +172,16 @@ function generateFeatures(url, client, clientId) { connectionFeatures.streams = streamList; + parentPort.postMessage({ + type: ResponseType.PROCESSING, + body: { clientId, connid, identity: client.identity, connectionFeatures }, + }); + delete client.peerConnections[connid]; // save memory }); - const trackObject = { url, clientId, clientFeatures, connectionFeatList }; - - parentPort.postMessage({ type: ResponseType.DONE, body: trackObject }); -} - -/** - * Extract a subset of features considered to be more relevant. 
- * - * @param {Object} features - */ -function extractRelevantStats(features) { - + parentPort.postMessage({ type: ResponseType.DONE, body: {clientId} }); } function processDump(clientId) { @@ -197,6 +191,7 @@ function processDump(clientId) { if (err) { throw err; } + const baseStats = {}; const lines = data.split('\n'); const client = JSON.parse(lines.shift()); @@ -273,7 +268,7 @@ function processDump(clientId) { } catch (error) { parentPort.postMessage({ type: ResponseType.ERROR, - body: { clientId, error }, + body: { clientId, error } }, }); } }); diff --git a/features-connection.js b/features-connection.js index 306112cb..7c681536 100644 --- a/features-connection.js +++ b/features-connection.js @@ -177,6 +177,8 @@ module.exports = { return lifeTime > 0 ? lifeTime : undefined; }, + // Time in which the connection was in a potential sending state. Calculated + // as the difference between the first setLocalDescription call and the last PC log. sendingDuration: function(client, peerConnectionLog) { let sendingDuration = 0; let prevTime = peerConnectionLog[0].timestamp; diff --git a/logging.js b/logging.js index c277a3f4..8652a5bd 100644 --- a/logging.js +++ b/logging.js @@ -6,24 +6,39 @@ const { threadId } = require('worker_threads'); const { createLogger, format, transports } = require('winston'); require('winston-daily-rotate-file'); -const { isProduction } = require('./utils'); - if (!config.get('server').logLevel) { throw new Error('Please set the logLevel config!'); } +function getEnvName() { + return process.env.NODE_ENV || 'default'; +} + +function isProduction() { + return getEnvName() === 'production'; +} + const { json, colorize } = format; const LEVEL = Symbol.for('level'); -function splatTransform(info) { - const args = info[Symbol.for('splat')]; +/** + * We use this formatter to get a console.log like logging system + * + * @param {Object} logEntry - info object passed by winston + */ +function splatTransform(logEntry) { + const args = 
logEntry[Symbol.for('splat')]; if (args) { - info.message = util.format(info.message, ...args); + logEntry.message = util.format(logEntry.message, ...args); } - return info; + return logEntry; } +/** + * Formatter that adds additional metadata to the log line. + * @param {Object} logEntry + */ function metaTransform(logEntry) { const customMeta = { timestamp: logEntry.timestamp, @@ -37,6 +52,9 @@ function metaTransform(logEntry) { return logEntry; } +// Combine the various custom formatters along with the winston's json to obtain a json like log line. +// This formatter will be used only for file logging as it's json thus more parser friendly in +// case we externalize this somewhere. const fileLogger = format.combine( format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss.SSS' }), format(splatTransform)(), @@ -44,6 +62,7 @@ const fileLogger = format.combine( json() ); +// Winston rolling file common configuration used for both error and and normal logs file transports. const logFileCommonCfg = { format: fileLogger, auditFile: 'logs/app-log-audit.json', @@ -53,31 +72,46 @@ const logFileCommonCfg = { maxFiles: '90d', }; +// Error logs along with uncaught exceptions will have their own individual files. +// Normal log rolling file transport configuration based on common cfg. const appLogTransport = new transports.DailyRotateFile({ ...logFileCommonCfg, level: config.get('server').logLevel, filename: 'logs/app-%DATE%.log', }); +// Error log rolling file transport configuration based on common cfg. const appErrorLogTransport = new transports.DailyRotateFile({ ...logFileCommonCfg, level: 'error', filename: 'logs/app-error-%DATE%.log', }); +// Uncaught exception log transport configuration, we remove the custom formatters as it interferes with +// winston's way of logging exceptions. +// Warning! this transports swallows uncaught exceptions, logs and the exits the process with an error, +// uncaught exception handlers might not work. 
const appExceptionLogTransportCfg = { ...logFileCommonCfg }; delete appExceptionLogTransportCfg.format; +// Log uncaught exceptions in both error log and normal log in case we need to track some particular flow. const appExceptionLogTransport = new transports.DailyRotateFile({ ...appExceptionLogTransportCfg, filename: 'logs/app-error-%DATE%.log', }); +const appExceptionCommonLogTransport = new transports.DailyRotateFile({ + ...appExceptionLogTransportCfg, + filename: 'logs/app-%DATE%.log', +}); +// Create actual loggers with specific transports const logger = createLogger({ transports: [appLogTransport, appErrorLogTransport], - exceptionHandlers: [appExceptionLogTransport], + exceptionHandlers: [appExceptionLogTransport, appExceptionCommonLogTransport], }); +// Only add a console log transport if we're not in production (i.e. NODE_ENV != production) in order to avoid +// unnecessary operations. if (!isProduction()) { const consoleLogger = format.combine( @@ -90,7 +124,7 @@ if (!isProduction()) { `${timestamp} ${PID} ${TID} ${host} ${level}: ${message}` ) ); - // If we're not in production then also log to the `console` + logger.add( new transports.Console({ format: consoleLogger, From 446028ee053bc39a5d447cd45666d3b5c916ef2c Mon Sep 17 00:00:00 2001 From: Andrei Gavrilescu Date: Fri, 22 May 2020 14:23:39 +0300 Subject: [PATCH 3/8] Fix lint --- app.js | 18 +++++++++--------- extract.js | 3 ++- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/app.js b/app.js index 29b1b787..9cabd455 100644 --- a/app.js +++ b/app.js @@ -50,15 +50,15 @@ const connected = new prom.Gauge({ help: 'number of open websocket connections', }); -const processed = new prom.Counter({ - name: 'rtcstats_files_processed', - help: 'number of files processed', -}); +// const processed = new prom.Counter({ +// name: 'rtcstats_files_processed', +// help: 'number of files processed', +// }); -const errored = new prom.Counter({ - name: 'rtcstats_files_errored', - help: 'number of files 
with errors during processing', -}); +// const errored = new prom.Counter({ +// name: 'rtcstats_files_errored', +// help: 'number of files with errors during processing', +// }); function storeDump(clientId) { const path = tempPath + '/' + clientId; @@ -140,7 +140,7 @@ function setupWorkDirectory() { } } -function setupHttpServer(port, keys) { +function setupHttpServer(port) { const options = { key: fs.readFileSync('key.pem'), cert: fs.readFileSync('cert.pem'), diff --git a/extract.js b/extract.js index c85a4275..60b0697c 100644 --- a/extract.js +++ b/extract.js @@ -268,8 +268,9 @@ function processDump(clientId) { } catch (error) { parentPort.postMessage({ type: ResponseType.ERROR, - body: { clientId, error } }, + body: { clientId, error }, }); + } }); } From 2a5b0be1c215516c150a7214f17a3423dc08ee39 Mon Sep 17 00:00:00 2001 From: Andrei Gavrilescu Date: Tue, 23 Jun 2020 14:44:09 +0300 Subject: [PATCH 4/8] Enable database push --- Dockerfile | 2 +- app.js | 128 ++++++++++++++++------- certs/cert.pem | 21 ++++ certs/key.pem | 27 +++++ config/custom-environment-variables.yaml | 6 ++ config/debug.yaml | 38 +++++++ config/default.yaml | 13 ++- config/production.yaml | 9 +- database/redshift-firehose.js | 1 + extract.js | 64 ++++++++---- features-connection.js | 6 +- logging.js | 42 ++++---- package.json | 4 +- store/s3.js | 8 +- 14 files changed, 273 insertions(+), 96 deletions(-) create mode 100644 certs/cert.pem create mode 100644 certs/key.pem create mode 100644 config/custom-environment-variables.yaml create mode 100644 config/debug.yaml diff --git a/Dockerfile b/Dockerfile index cc5a983b..1431ee03 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM node:10.15.3-alpine +FROM node:12.16-alpine RUN apk add --no-cache git && \ rm -rf /var/lib/apt/lists/* /var/cache/apk /usr/share/man /tmp/* diff --git a/app.js b/app.js index 9cabd455..a83514d5 100644 --- a/app.js +++ b/app.js @@ -22,27 +22,39 @@ let database; if (config.gcp && config.gcp.dataset && 
config.gcp.table) { database = require('./database/bigquery.js')(config.gcp); } + if (!database) { database = require('./database/redshift-firehose.js')(config.firehose); } +if(!database) { + logger.warn('No database configured!'); +} + // Configure store, fall back to S3 let store; if (config.gcp && config.gcp.bucket) { store = require('./store/gcp.js')(config.gcp); } + if (!store) { store = require('./store/s3.js')(config.s3); } + +// Configure Amplitude backend let amplitude; if (config.amplitude && config.amplitude.key) { amplitude = new AmplitudeConnector(config.amplitude.key); +} else { + logger.warn('Amplitude is not configured!'); } let server; const tempPath = 'temp'; + +// Initialize prometheus metrics. const prom = require('prom-client'); const connected = new prom.Gauge({ @@ -50,15 +62,15 @@ const connected = new prom.Gauge({ help: 'number of open websocket connections', }); -// const processed = new prom.Counter({ -// name: 'rtcstats_files_processed', -// help: 'number of files processed', -// }); +const processed = new prom.Counter({ + name: 'rtcstats_files_processed', + help: 'number of files processed', +}); -// const errored = new prom.Counter({ -// name: 'rtcstats_files_errored', -// help: 'number of files with errors during processing', -// }); +const errored = new prom.Counter({ + name: 'rtcstats_files_errored', + help: 'number of files with errors during processing', +}); function storeDump(clientId) { const path = tempPath + '/' + clientId; @@ -83,18 +95,38 @@ const workerPool = new WorkerPool(workerScriptPath, getIdealWorkerCount()); workerPool.on(ResponseType.PROCESSING, (body) => { logger.debug('Handling PROCESSING event with body %j', body); - amplitude && amplitude.track(body); - // const { url, clientId, connid, clientFeatures, connectionFeatures, streamFeatures } = body; + // Amplitude has constraints and limits of what information one sends, so it has a designated backend which + // only sends specific features. 
+ if (amplitude) { + amplitude.track(body); + } - // if (database) { - // database.put(url, clientId, connid, clientFeatures, connectionFeatures, streamFeatures); - // } else { - // logger.warn('No database configured!'); - // } + // Current supported databases are big data type so we can send bulk data without having to worry about + // volume. + if (database) { + const { url, clientId, connid, clientFeatures, connectionFeatures } = body; + + if(!connectionFeatures) { + database.put(url, clientId, connid, clientFeatures); + } else { + + // When using a database backend the streams features are stored separately, so we don't need them + // in the connectionFeatures object. + const streams = connectionFeatures.streams; + delete connectionFeatures.streams; + + for(const streamFeatures of streams) { + database.put(url, clientId, connid, clientFeatures, connectionFeatures, streamFeatures); + } + } + } }); workerPool.on(ResponseType.DONE, (body) => { logger.debug('Handling DONE event with body %j', body); - storeDump(body.clientId); + + processed.inc(); + + //storeDump(body.clientId); }); workerPool.on(ResponseType.ERROR, (body) => { // TODO handle requeue of the request, this also requires logic in extract.js @@ -102,6 +134,8 @@ workerPool.on(ResponseType.ERROR, (body) => { // the client id. logger.error('Handling ERROR event with body %o', body); + errored.inc(); + // If feature extraction failed at least attempt to store the dump in s3. 
if (body.clientId) { storeDump(body.clientId); @@ -140,35 +174,48 @@ function setupWorkDirectory() { } } -function setupHttpServer(port) { +function serverHandler(request, response) { + switch (request.url) { + case '/healthcheck': + response.writeHead(200); + response.end(); + break; + default: + response.writeHead(404); + response.end(); + } +} + +/** + * In case one wants to run the server locally, https is required, as browsers normally won't allow non + * secure web sockets on a https domain, so something like the bellow config is required along with a https + * server instead of http. + * + * @param {number} port + */ +function setupHttpsServer(port) { const options = { - key: fs.readFileSync('key.pem'), - cert: fs.readFileSync('cert.pem'), + key: fs.readFileSync(config.get('server').keyPath), + cert: fs.readFileSync(config.get('server').certPath), }; - const server = https - .Server(options, () => {}) - .on('request', (request, response) => { - switch (request.url) { - case '/healthcheck': - response.writeHead(200); - response.end(); - break; - default: - response.writeHead(404); - response.end(); - } - }) - .listen(port); + const server = https.createServer(options, serverHandler).listen(port); + + return server; +} + +function setupHttpServer(port) { + const server = http.createServer(serverHandler).listen(port); + return server; } function setupMetricsServer(port) { const metricsServer = http - .Server() - .on('request', (request, response) => { + .createServer((request, response) => { switch (request.url) { case '/metrics': + prom.collectDefaultMetrics(); response.writeHead(200, { 'Content-Type': prom.contentType }); response.end(prom.register.metrics()); break; @@ -297,12 +344,16 @@ function setupWebSocketsServer(server) { }); } -function run(keys) { +function run() { logger.info('Initializing <%s>, version <%s>, env <%s> ...', appName, appVersion, getEnvName()); setupWorkDirectory(); - server = setupHttpServer(config.get('server').port, keys); + if 
(config.get('server').useHTTPS) { + server = setupHttpsServer(config.get('server').port); + } else { + server = setupHttpServer(config.get('server').port); + } if (config.get('server').metrics) { setupMetricsServer(config.get('server').metrics); @@ -313,6 +364,9 @@ function run(keys) { logger.info('Initialization complete.'); } +/** + * Currently used from test script. + */ function stop() { if (server) { server.close(); diff --git a/certs/cert.pem b/certs/cert.pem new file mode 100644 index 00000000..970a589d --- /dev/null +++ b/certs/cert.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDjDCCAnQCCQDPkKLEhyE8aTANBgkqhkiG9w0BAQUFADCBhzELMAkGA1UEBhMC +VVMxEzARBgNVBAgMCkNBTElGT1JOSUExETAPBgNVBAcMCFNhbiBKb3NlMQ0wCwYD +VQQKDAR0ZXN0MQ0wCwYDVQQLDAR0ZXN0MQ0wCwYDVQQDDAR0ZXN0MSMwIQYJKoZI +hvcNAQkBFhRqb2huLmRvZUBkb2VtYWlsLmNvbTAeFw0yMDA2MjIxODU2NDNaFw00 +NzExMDcxODU2NDNaMIGHMQswCQYDVQQGEwJVUzETMBEGA1UECAwKQ0FMSUZPUk5J +QTERMA8GA1UEBwwIU2FuIEpvc2UxDTALBgNVBAoMBHRlc3QxDTALBgNVBAsMBHRl +c3QxDTALBgNVBAMMBHRlc3QxIzAhBgkqhkiG9w0BCQEWFGpvaG4uZG9lQGRvZW1h +aWwuY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2WGnATMYgYCo +tF1djx1qPJ3WECsaWh6uqqirvz+6CuTbV5eJDxbVptfqI/RePby8bEHL0iZzN15t +044gMv9fvWciaa+lF48+H2mQtz4ASW44vpshsd9F1UxPS72Dw87ntIChJbj0e0me +NBT9knA3dL3MRz133wdrGkFGwHeovz/twgMkB+RAdz8wHIFo5znxid2ejQl0bDaI +J455Q8M8evReTSIe8RHxTUwWTUwt+s9VQOvCzb0fQS1MJKsAdmh3LfOqmMQ06vvR +yFSeDDcDDQrlxRb5MeczLfiu/t/UGpTEGCJXfiSeNISEXFtBQnTdQub8BUoHCw6+ +rVeUadFsYwIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQB5Uf2aRs6NeGJhdE50xuVw +93BerkZkXQaq/zhTtDsA6cczhxJjJXOakkA+AY0YxwWHEMw9sVjgPb0pzaChzsFh +aourbxWyRGSQXp8OMifnEMB9TOD6c4vZAZZpZnIlqTWVRDb/nayYEsw0m/cEfIYD +1wJqHbVw4vvsIJn9aCNnx/oyL+rl6d9cmgQ1CIlEMvTvGMCtO7yUDtAjSy9NQbIp +I+TB1fNDeoDJn1ghQPGViBLYUTj6cmI/u/wXSj401oM7rvrm+C7ZKlWohcwKXmm4 +fK0ngbgDOIEZpE7FqT7NV2BPs3XhycSl16mkGQJ75SIcpgm+bb1rKWxOGTiRksa4 +-----END CERTIFICATE----- diff --git a/certs/key.pem b/certs/key.pem new file mode 100644 index 00000000..95f2da58 --- /dev/null +++ 
b/certs/key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEA2WGnATMYgYCotF1djx1qPJ3WECsaWh6uqqirvz+6CuTbV5eJ +DxbVptfqI/RePby8bEHL0iZzN15t044gMv9fvWciaa+lF48+H2mQtz4ASW44vpsh +sd9F1UxPS72Dw87ntIChJbj0e0meNBT9knA3dL3MRz133wdrGkFGwHeovz/twgMk +B+RAdz8wHIFo5znxid2ejQl0bDaIJ455Q8M8evReTSIe8RHxTUwWTUwt+s9VQOvC +zb0fQS1MJKsAdmh3LfOqmMQ06vvRyFSeDDcDDQrlxRb5MeczLfiu/t/UGpTEGCJX +fiSeNISEXFtBQnTdQub8BUoHCw6+rVeUadFsYwIDAQABAoIBAH76hd0zhZsQFnvV +FfOlUQs7f3FOXERMK+dQQ5KhnQEEEgQmZk9EHWUqNoDuG6agesgZ3v9QqnirVif/ +m1tuxPQULIvjp+INMFKVDY2cT/qUwdzFLXeDXn1r593sQ+27DKnpgThRw63IoPr3 +T++cUSiGPa9Xfo/u+2cIvlVrEE017FxMr/b1POMwytrn3N8T+IVnD5Ny8xGnTeYd +F6Wl/FArfgL80/10son6Kw0GPaaQNC+DDesm8ZVE+y5jFvK22yBRgGc404uF5Vbo +8Dym0+zGD2gO8hZIjMMl9N2dgJ/b56qFf6dh23Tq69pKOvVsJpKTiIfQOwBSfmjp +j8CyVZECgYEA/QAOR0dY9NlhWwn1IWexE0JhFPYsKQhVo4Nwbxe+UGpjwpxzlNYM +EdG+JxETIqnutxw8ngvbhgpOLopvTWxwQnazw0Hr5tAQO3dB80l7QiIWfrfyZcgI +Gz9PyD1gWRF/zdIpLPg4TzTSDmWdcV/wej00IgkyuAX79N8sbX6LSy8CgYEA2/V7 +LhIv3mlcU8+CsPndbJLyo/lBsWEpNjBa4bIqz+kVuQDzE6+6iOs2fFBsRvJ34Kf/ +2UVK6s9xe37GD/PSo2GivGOb+zVFia+TJcSgHj9LQUEgfuOTFGBuZsrKbefzKwGA +de+H5q+6llk+W9ZTWy4TSYFPBWc7UkFZb724VQ0CgYAlEFoHJTOqAyKZFLddonQ1 +jxbr3DiR9k+deccB72eJHlzpCMSB/G2eOqzxyjWUcXKwTqmuuav6Ug4sEUnG/Ojh +Q8SICWNG2BpYq9r0ikJNaPMEs2wGbyyI2ViVzDAOPFsNywkPNnoBWIqhY0+SaWyw +a8D0b4aHoRDNSdiXXd+ILwKBgFQFP1pn5BUnVfdFyvxjVauFrl3odqmVHbLvYafY +8PWeaYfTzwZ0F+L5RkTSS6oGMLiGM/sAtw9e0lCEKpApaQqz3v/rZMfen4Nqp+DD +bQ5gyxRQFmOh9qrP8xwc1pqJAaAp4LIsH1OFSNbpnCJkik7IGOH5HQBJYKWZBNrk +M6d5AoGAJ2DWW1OHzZjA82WGXaD4n+4BB1d/2CVwg2KLcK8A8Q0yPOp3U+lkVdwM +bZ/B6b6yGoqgh1iRkK04b8/G6sChuZ5X/miGI3Qa4a0y5eX+EbRHNx3QgipfCMfQ +EPsByAK9PjuptR1NvGxnrvCT5xKAnoUqrmm0A+kf1gKknyeMqds= +-----END RSA PRIVATE KEY----- diff --git a/config/custom-environment-variables.yaml b/config/custom-environment-variables.yaml new file mode 100644 index 00000000..1a34ed16 --- /dev/null +++ b/config/custom-environment-variables.yaml @@ -0,0 +1,6 @@ +amplitude: + key: RTCSTATS_AMPLITUDE_KEY + 
+s3: + region: RTCSTATS_S3_AWS_REGION + bucket: RTCSTATS_S3_BUCKET \ No newline at end of file diff --git a/config/debug.yaml b/config/debug.yaml new file mode 100644 index 00000000..47989d84 --- /dev/null +++ b/config/debug.yaml @@ -0,0 +1,38 @@ +server: + port: 3000 + metrics: 8089 + # Set to true if you've a LB in front of RTCStats and you are obtaining + # its IP address as part of the X-Forwarded-For header + skipLoadBalancerIp: false + logLevel: debug + jsonConsoleLog: false + useHTTPS: false + +amplitude: + key: '' + +s3: + accessKeyId: + secretAccessKey: + region: + bucket: + useIAMAuth: false + +firehose: + accessKeyId: + secretAccessKey: + region: + stream: + +gcp: + bucket: + dataset: + table: + fields: + maxFlushTime: + bufferSize: + +github: + client_id: GITHUB_CLIENT_ID + client_secret: GITHUB_SECRET + callback_url: GITHUB_CALLBACK_URL diff --git a/config/default.yaml b/config/default.yaml index ff6a55b0..051b42d6 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -1,18 +1,25 @@ server: port: 3000 - metrics: - # Set to true if you've a LB in front of RTCStats and you are obtaining + metrics: 8089 + # Set to true if you've a LB in front of RTCStats and you are obtaining # its IP address as part of the X-Forwarded-For header skipLoadBalancerIp: false logLevel: info + jsonConsoleLog: true + # The provided certificates are intended for local testing usign a HTTPS server + useHTTPS: false + keyPath: './certs/key.pem' + certPath: './certs/cert.pem' + amplitude: - key: '' + key: s3: accessKeyId: secretAccessKey: region: bucket: + useIAMAuth: false firehose: accessKeyId: diff --git a/config/production.yaml b/config/production.yaml index d1096491..6b81433a 100644 --- a/config/production.yaml +++ b/config/production.yaml @@ -1,15 +1,20 @@ server: port: 3000 - metrics: - # Set to true if you've a LB in front of RTCStats and you are obtaining + metrics: 8089 + # Set to true if you've a LB in front of RTCStats and you are obtaining # its IP address as 
part of the X-Forwarded-For header skipLoadBalancerIp: false logLevel: info + +amplitude: + key: + s3: accessKeyId: secretAccessKey: region: bucket: + useIAMAuth: true firehose: accessKeyId: diff --git a/database/redshift-firehose.js b/database/redshift-firehose.js index d39645eb..b31c2b7b 100644 --- a/database/redshift-firehose.js +++ b/database/redshift-firehose.js @@ -20,6 +20,7 @@ module.exports = function (config) { firehose = new AWS.Firehose(); } else { logger.warn('No Firehose configuration present. Skipping firehose storage.') + return; } return { diff --git a/extract.js b/extract.js index 60b0697c..0e9567bb 100644 --- a/extract.js +++ b/extract.js @@ -51,7 +51,14 @@ if (!isMainThread) { } }); } else { - logger.error('Attempting to run worker thread in main process context!'); + const clientid = process.argv[2]; + if (!clientid) { + logger.error('Please provide a valid clientId!'); + return -1; + } + + logger.info(`Running feature extraction on ${clientid}...`); + processDump(clientid); } // dumps all peerconnections. @@ -73,6 +80,8 @@ function generateFeatures(url, client, clientId) { // ignore connections that never send getUserMedia or peerconnection events. if (client.getUserMedia.length === 0 && Object.keys(client.peerConnections).length === 0) return; + + const identity = client.identity; // logger.info(JSON.stringify(client.identity)); // clientFeatures are the same for all peerconnections but are saved together // with each peerconnection anyway to make correlation easier. @@ -96,16 +105,16 @@ function generateFeatures(url, client, clientId) { } }); - // if (Object.keys(client.peerConnections).length === 0) { - // // we only have GUM and potentially GUM errors. - // parentPort.postMessage({ - // type: ResponseType.PROCESSING, - // body: { url, clientId, connid: '', clientFeatures }, - // }); - // } + if (Object.keys(client.peerConnections).length === 0) { + // We only have GUM and potentially GUM errors. 
+ parentPort.postMessage({ + type: ResponseType.PROCESSING, + body: { url, clientId, connid: '', identity, clientFeatures }, + }); + } + + logger.debug('Client features: %j', clientFeatures); - logger.debug('Client features: ', clientFeatures); - const streamList = []; Object.keys(client.peerConnections).forEach((connid) => { @@ -131,7 +140,6 @@ function generateFeatures(url, client, clientId) { } }); - const tracks = extractTracks(conn); const streams = extractStreams(tracks); @@ -172,20 +180,27 @@ function generateFeatures(url, client, clientId) { connectionFeatures.streams = streamList; - parentPort.postMessage({ - type: ResponseType.PROCESSING, - body: { clientId, connid, identity: client.identity, connectionFeatures }, - }); + if (!isMainThread) { + parentPort.postMessage({ + type: ResponseType.PROCESSING, + body: { url, clientId, connid, clientFeatures, identity, connectionFeatures }, + }); + } delete client.peerConnections[connid]; // save memory }); - - parentPort.postMessage({ type: ResponseType.DONE, body: {clientId} }); + if (!isMainThread) { + parentPort.postMessage({ type: ResponseType.DONE, body: { clientId } }); + } } function processDump(clientId) { - const path = 'temp/' + clientId; + let path = clientId; + if (!isMainThread) { + path = 'temp/' + clientId; + } + fs.readFile(path, { encoding: 'utf-8' }, (err, data) => { try { if (err) { @@ -266,11 +281,14 @@ function processDump(clientId) { dump(client.url, client); generateFeatures(client.url, client, clientId); } catch (error) { - parentPort.postMessage({ - type: ResponseType.ERROR, - body: { clientId, error }, - }); - + if (isMainThread) { + logger.error(error); + } else { + parentPort.postMessage({ + type: ResponseType.ERROR, + body: { clientId, error }, + }); + } } }); } diff --git a/features-connection.js b/features-connection.js index 7c681536..2d7f57bf 100644 --- a/features-connection.js +++ b/features-connection.js @@ -122,7 +122,7 @@ module.exports = { } }, peerIdentifier: function(client, 
peerConnectionLog) { - let constraints = getPeerConnectionConstraints(peerConnectionLog); + let constraints = getPeerConnectionConstraints(peerConnectionLog); if (!constraints.optional) return; constraints = constraints.optional; for (let i = 0; i < constraints.length; i++) { @@ -132,7 +132,7 @@ module.exports = { } }, conferenceIdentifier: function(client, peerConnectionLog) { - let constraints = getPeerConnectionConstraints(peerConnectionLog); + let constraints = getPeerConnectionConstraints(peerConnectionLog); if (!constraints.optional) return; constraints = constraints.optional; for (let i = 0; i < constraints.length; i++) { @@ -349,7 +349,7 @@ module.exports = { // was the peerconnection created with non-spec SDES? configuredSDES: function(client, peerConnectionLog) { - const constraints = getPeerConnectionConstraints(peerConnectionLog); + const constraints = getPeerConnectionConstraints(peerConnectionLog); return constraints && constraints.mandatory && constraints.mandatory.DtlsSrtpKeyAgreement === false; }, diff --git a/logging.js b/logging.js index 8652a5bd..131886e8 100644 --- a/logging.js +++ b/logging.js @@ -2,7 +2,6 @@ const util = require('util'); const os = require('os'); const config = require('config'); const { threadId } = require('worker_threads'); - const { createLogger, format, transports } = require('winston'); require('winston-daily-rotate-file'); @@ -10,21 +9,13 @@ if (!config.get('server').logLevel) { throw new Error('Please set the logLevel config!'); } -function getEnvName() { - return process.env.NODE_ENV || 'default'; -} - -function isProduction() { - return getEnvName() === 'production'; -} - const { json, colorize } = format; const LEVEL = Symbol.for('level'); /** * We use this formatter to get a console.log like logging system - * - * @param {Object} logEntry - info object passed by winston + * + * @param {Object} logEntry - info object passed by winston */ function splatTransform(logEntry) { const args = 
logEntry[Symbol.for('splat')]; @@ -37,7 +28,7 @@ function splatTransform(logEntry) { /** * Formatter that adds additional metadata to the log line. - * @param {Object} logEntry + * @param {Object} logEntry */ function metaTransform(logEntry) { const customMeta = { @@ -53,7 +44,7 @@ function metaTransform(logEntry) { } // Combine the various custom formatters along with the winston's json to obtain a json like log line. -// This formatter will be used only for file logging as it's json thus more parser friendly in +// This formatter will be used only for file logging as it's json thus more parser friendly in // case we externalize this somewhere. const fileLogger = format.combine( format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss.SSS' }), @@ -69,7 +60,7 @@ const logFileCommonCfg = { datePattern: 'YYYY-MM-DD', zippedArchive: true, maxSize: '100m', - maxFiles: '90d', + maxFiles: '60d', }; // Error logs along with uncaught exceptions will have their own individual files. @@ -110,26 +101,29 @@ const logger = createLogger({ exceptionHandlers: [appExceptionLogTransport, appExceptionCommonLogTransport], }); -// Only add a console log transport if we're not in production (i.e. NODE_ENV != production) in order to avoid -// unnecessary operations. -if (!isProduction()) { - +// The JSON format is more suitable for production deployments that use the console. +// The alternative is a single line log format that is easier to read, useful for local development. 
+if (config.get('server').jsonConsoleLog) { + logger.add( + new transports.Console({ + format: fileLogger, + level: config.get('server').logLevel, + handleExceptions: true, + }) + ); +} else { const consoleLogger = format.combine( format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss.SSS' }), colorize(), format(splatTransform)(), format(metaTransform)(), - format.printf( - ({ level, message, timestamp, PID, TID, host }) => - `${timestamp} ${PID} ${TID} ${host} ${level}: ${message}` - ) + format.printf(({ level, message, timestamp, PID, TID, host }) => `${timestamp} ${PID} ${TID} ${host} ${level}: ${message}`) ); - + logger.add( new transports.Console({ format: consoleLogger, level: config.get('server').logLevel, - prettyPrint: true, handleExceptions: true, }) ); diff --git a/package.json b/package.json index 55157272..7422b52b 100644 --- a/package.json +++ b/package.json @@ -6,8 +6,8 @@ "scripts": { "test": "eslint *.js database/*.js store/*.js && node test/client.js", "start": "NODE_ENV=production node app.js", - "watch:dev": "nodemon --exec 'NODE_ENV=debug LOG=debug node app.js'", - "debug": "NODE_ENV=debug LOG=debug node app.js" + "watch:dev": "nodemon --exec 'NODE_ENV=debug node app.js'", + "debug": "NODE_ENV=debug node app.js" }, "repository": { "type": "git", diff --git a/store/s3.js b/store/s3.js index 55fcf327..25e48057 100644 --- a/store/s3.js +++ b/store/s3.js @@ -6,13 +6,19 @@ const AWS = require('aws-sdk'); const logger = require('../logging'); module.exports = function (config) { - AWS.config = config; + + AWS.config.update({region: config.region}); + + if (!config.useIAMAuth) { + AWS.config = config; + } const s3bucket = new AWS.S3({ params: { Bucket: config.bucket } }); + const configured = !!config.bucket; return { From a98453ce9708d87609f49ceaf2081b5491b67838 Mon Sep 17 00:00:00 2001 From: Andrei Gavrilescu Date: Tue, 23 Jun 2020 14:44:31 +0300 Subject: [PATCH 5/8] delete old certs --- cert.pem | 20 -------------------- key.pem | 27 
--------------------------- 2 files changed, 47 deletions(-) delete mode 100644 cert.pem delete mode 100644 key.pem diff --git a/cert.pem b/cert.pem deleted file mode 100644 index ad001456..00000000 --- a/cert.pem +++ /dev/null @@ -1,20 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDUzCCAjsCFG9JBDNzPV9m0qSVVC9RSV4hyqIkMA0GCSqGSIb3DQEBCwUAMGYx -CzAJBgNVBAYTAlJPMQswCQYDVQQIDAJSTzELMAkGA1UEBwwCUk8xDTALBgNVBAoM -BFRFU1QxCzAJBgNVBAsMAlJPMQswCQYDVQQDDAJSTzEUMBIGCSqGSIb3DQEJARYF -ZHFxd2QwHhcNMjAwNDI2MjAwODI4WhcNNDcwOTExMjAwODI4WjBmMQswCQYDVQQG -EwJSTzELMAkGA1UECAwCUk8xCzAJBgNVBAcMAlJPMQ0wCwYDVQQKDARURVNUMQsw -CQYDVQQLDAJSTzELMAkGA1UEAwwCUk8xFDASBgkqhkiG9w0BCQEWBWRxcXdkMIIB -IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAplmd7wR974Xw8Ajd3ES7Pg2d -kKALrSU3BLTclpr+q68V9dFCQ4b0Nh2yzzHvj1oSJggD26jNSTB+PDu6zJIN8nM2 -DncFMGCqxvtjIsBWbyASb2Du0RI8GbDfXNXY1R3Az2PlokH//fmq9TbluFXzJ6nW -pzI9riHDbxQV9OhIYAI7o9hXrFaqKqvmSSmBNcce+epD3Wau3nTYATkUn28x5Uml -S01ca0TiEobSgmwwuB5p/6SwLOlDUypBeYwQIM+A8PKtBur7Remzq+iPCJq6fwkc -2jhBsj5yo7mPeSfpcW46JJftVDSrKISKGVxeuBjZBRZpJXWJCLYM0R0xvxkxeQID -AQABMA0GCSqGSIb3DQEBCwUAA4IBAQCV8z9+KpNUSjT9lehDG8dNcGTZyvIG7aBV -YEKiaiHBU2SFBrJfLGAfS2E+l6RwgG3033LXwNwl/0ifnIwEPZ7noKm0F3lPj+fX -V/XDWG1ITqn6IowiLU1PLibs1iwuLjSb09y047WGaZAix+BMxxRBit8/OQ87KC0p -4nL8bwE3Aw7XLYVOXsXqODXdxnFZ0yMV/aPdCf175jjZmLoFkfRNMvqzNBcYGp4W -ddq+C+lLk6oBI9fQsn/Tu33qQlz9bVwZujjF1P9A3vvn1mVBctgK4VBJ4N6E177G -z7P0ZVgj8n8/jGF/oNBh/Wfzs6AUTreTJNvoqfrerq7O21Nh50X5 ------END CERTIFICATE----- diff --git a/key.pem b/key.pem deleted file mode 100644 index 1d7217b3..00000000 --- a/key.pem +++ /dev/null @@ -1,27 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEowIBAAKCAQEAplmd7wR974Xw8Ajd3ES7Pg2dkKALrSU3BLTclpr+q68V9dFC -Q4b0Nh2yzzHvj1oSJggD26jNSTB+PDu6zJIN8nM2DncFMGCqxvtjIsBWbyASb2Du -0RI8GbDfXNXY1R3Az2PlokH//fmq9TbluFXzJ6nWpzI9riHDbxQV9OhIYAI7o9hX -rFaqKqvmSSmBNcce+epD3Wau3nTYATkUn28x5UmlS01ca0TiEobSgmwwuB5p/6Sw -LOlDUypBeYwQIM+A8PKtBur7Remzq+iPCJq6fwkc2jhBsj5yo7mPeSfpcW46JJft 
-VDSrKISKGVxeuBjZBRZpJXWJCLYM0R0xvxkxeQIDAQABAoIBADtwQA9cgocoS8vo -zyVaZbEpekhn92QZrQwAd+VUYnUD7YvVBqFMQkxn1jFUfW2yWFPAf2hoa1mgeyqY -iQl5koQ0CHeorXD4yWyp/GU5Zmj0g8HKV+raYiEn4tewDXcw12kDH9UXUhn0sNJH -mFOCWoyskedR+1oR9FvnSGUm7l1nO1hUoLY8/+yxLKLoKzxJR214sLkeAWawhru+ -C3uviwHaxpqa4bUd14vZwM+MoeilfY/9L7q34OHw7UKjuU8ZYIoeqKBncmR1BQ54 -15Y2mQwzMZJFcWBj0h+9LlCgI8qSzDI1b3GIptFhGZqkL9G2Zf5RmSa6HjfzCMKQ -2EoKWUUCgYEA0tsHejuOjBnhM5FNn1lqWNh+qfxg58oZhH3/YhIgNXQAirCpWbrM -AQp30Y6V/ME2IfD09DaaC2jNNZ9wueAsjahcduUbZS2tMwIiMjOoTSSBsW7CW857 -8QvlJ9KiiX5oOs9OAuGbaxVq/emlwymudiVxMbsa04p9vcPy0Pc0GksCgYEAyfc+ -z1myLLfYma64wWXlKBjTiBr0liZzS8UHZVRlI14q3w1TS2C0YwesNn7Hz6iCCn4k -repPZc+ae7RQcj2tZQGO0vRHAFL+GIdAgf1nt1hcnItLh05Nmi86ECap4kaKChRY -KhAIWcakA/Da88fOsVDBMqU7fLMKrkwg6Y4iCMsCgYBFynGDJ8ta3AYKR4HlyHbG -yYHDSeHZVq9zhzDMiStYBflX7nlfVdDIV2qpVgSXEGyWd2bcnmYGeL3Tjd6F54lX -qe5Q/CxBJQk65O3kp+yA/CBhVkPGl2W2tzU2JSXfVJOzQ4KSuZHzs7ciK//NxTIV -sPbyeve6JRDRitYIDIqWWQKBgQCrxKYcv5JzyeBjxF/JzBl7YrH1XceLNCR22pmR -qpdh3yLjFXgz8Yk5eDsVFfpmOFBxEBut9kuUsV4Xu6F3p9EiyJJqA+um8O6+ebl5 -VMWy/2m0khuodgY2Ddh6CAgQNCIOtILPM1eG0xSHbX8qOlMmJyJJKpJPWg7JcmHD -gWicxQKBgGWxVCWgk0D9C31+Z5b1SwItmc2Iur4HJJF+WR1oSBqRPa33BCymULMt -kInermtwrUlyfN60HCnGlh3LKI97PTlEp1Wqdgas2ZW8e7eYRl6tzZ8B1yD4tqb7 -tM97QJsr+sOXtHTWFmhFFj4fuJbNi0W56OIfu6d7VN2QofXbKIql ------END RSA PRIVATE KEY----- From 10a20994060e88613313fcdbd0036e2d68cf21ca Mon Sep 17 00:00:00 2001 From: Andrei Gavrilescu Date: Thu, 2 Jul 2020 15:46:13 +0300 Subject: [PATCH 6/8] Add metrics --- Makefile | 8 ++++--- WorkerPool.js | 4 ++++ app.js | 54 +++++++++++++++++++++++++++++++++++------- config/debug.yaml | 4 ++-- config/default.yaml | 6 ++--- config/production.yaml | 5 +++- extract.js | 18 +++++++++++--- package-lock.json | 2 +- package.json | 2 +- utils.js | 3 ++- 10 files changed, 83 insertions(+), 23 deletions(-) diff --git a/Makefile b/Makefile index da96f559..a22b9492 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -REGISTRY:=juandebravo +REGISTRY:=jitsi-dev 
IMAGE:=rtcstats-server REPOSITORY:=$(REGISTRY)/$(IMAGE) TAG:=latest @@ -9,16 +9,18 @@ build: run: @docker run \ -p 3000:3000 \ + -p 8085:8085 \ $(REPOSITORY):$(TAG) debug: @docker run \ -p 3000:3000 \ + -p 8085:8085 \ -v $(PWD):/rtcstats-server \ - -e DEBUG=true \ --entrypoint npm \ $(REPOSITORY):$(TAG) \ run watch:dev push: build - @docker push $(REGISTRY)/$(IMAGE) + @echo "Push not configured." + #@docker push $(REGISTRY)/$(IMAGE) diff --git a/WorkerPool.js b/WorkerPool.js index fa8443c2..0a126687 100644 --- a/WorkerPool.js +++ b/WorkerPool.js @@ -130,6 +130,10 @@ class WorkerPool extends EventEmitter { logger.info(`There are no IDLE workers queueing, current queue size <${this.taskQueue.length}>`); } } + + getTaskQueueSize() { + return this.taskQueue.length; + } } module.exports = WorkerPool; diff --git a/app.js b/app.js index a83514d5..e833cdf0 100644 --- a/app.js +++ b/app.js @@ -62,6 +62,17 @@ const connected = new prom.Gauge({ help: 'number of open websocket connections', }); +const connection_error = new prom.Counter({ + name: 'rtcstats_websocket_connection_error', + help: 'number of open websocket connections that failed with an error', +}); + + +const queueSize = new prom.Gauge({ + name: 'rtcstats_queue_size', + help: 'Number of dumps currently queued for processing', +}); + const processed = new prom.Counter({ name: 'rtcstats_files_processed', help: 'number of files processed', @@ -72,6 +83,23 @@ const errored = new prom.Counter({ help: 'number of files with errors during processing', }); +const processTime = new prom.Summary({ + name: 'rtcstats_processing_time', + help: 'Processing time for a request', + maxAgeSeconds: 600, + ageBuckets: 5, + percentiles: [0.1, 0.25, 0.5, 0.75, 0.9], + +}); + +const dumpSize = new prom.Summary({ + name: 'rtcstats_dump_size', + help: 'Size of processed rtcstats dumps', + maxAgeSeconds: 600, + ageBuckets: 5, + percentiles: [0.1, 0.25, 0.5, 0.75, 0.9], +}); + function storeDump(clientId) { const path = tempPath + '/' + 
clientId; store @@ -86,7 +114,9 @@ function storeDump(clientId) { } function getIdealWorkerCount() { - return os.cpus().length; + // Using all the CPUs available might slow down the main node.js thread which is responsible for handling + // requests. + return os.cpus().length - 1; } const workerScriptPath = path.join(__dirname, './extract.js'); @@ -126,13 +156,19 @@ workerPool.on(ResponseType.DONE, (body) => { processed.inc(); - //storeDump(body.clientId); + storeDump(body.clientId); +}); + +workerPool.on(ResponseType.METRICS, (body) => { + logger.info('Handling METRICS event with body %j', body); + processTime.observe(body.extractDurationMs); + dumpSize.observe(body.dumpFileSizeMb); }); workerPool.on(ResponseType.ERROR, (body) => { // TODO handle requeue of the request, this also requires logic in extract.js // i.e. we need to catch all potential errors and send back a request with // the client id. - logger.error('Handling ERROR event with body %o', body); + logger.error('Handling ERROR event with body %j', body); errored.inc(); @@ -160,7 +196,7 @@ function setupWorkDirectory() { logger.debug(`Removing file ${tempPath + '/' + fname}`); fs.unlinkSync(tempPath + '/' + fname); } catch (e) { - logger.error(`Error while unlinking file ${fname} - ${e.message}`); + logger.error(`Error while unlinking file ${fname} - ${e}`); } }); } else { @@ -168,7 +204,7 @@ function setupWorkDirectory() { fs.mkdirSync(tempPath); } } catch (e) { - logger.error(`Error while accessing working dir ${tempPath} - ${e.message}`); + logger.error(`Error while accessing working dir ${tempPath} - ${e}`); // The app is probably in an inconsistent state at this point, throw and stop process. 
throw e; } @@ -215,7 +251,8 @@ function setupMetricsServer(port) { .createServer((request, response) => { switch (request.url) { case '/metrics': - prom.collectDefaultMetrics(); + queueSize.set(workerPool.getTaskQueueSize()); + // prom.collectDefaultMetrics(); response.writeHead(200, { 'Content-Type': prom.contentType }); response.end(prom.register.metrics()); break; @@ -327,12 +364,13 @@ function setupWebSocketsServer(server) { break; } } catch (e) { - logger.error('Error while processing: %s - %s', e, msg); + logger.error('Error while processing: %s - %s', e.message, msg); } }); client.on('error', (e) => { logger.error('Websocket error: %s', e); + connection_error.inc(); }); client.on('close', () => { @@ -376,7 +414,7 @@ function stop() { // For now just log unhandled promise rejections, as the initial code did not take them into account and by default // node just silently eats them. process.on('unhandledRejection', (reason) => { - logger.error('Unhandled rejection: ', reason); + logger.error('Unhandled rejection: %s', reason); }); run(); diff --git a/config/debug.yaml b/config/debug.yaml index 47989d84..e3118f78 100644 --- a/config/debug.yaml +++ b/config/debug.yaml @@ -1,12 +1,12 @@ server: port: 3000 - metrics: 8089 + metrics: 8095 # Set to true if you've a LB in front of RTCStats and you are obtaining # its IP address as part of the X-Forwarded-For header skipLoadBalancerIp: false logLevel: debug jsonConsoleLog: false - useHTTPS: false + useHTTPS: true amplitude: key: '' diff --git a/config/default.yaml b/config/default.yaml index 051b42d6..fc065c46 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -1,13 +1,13 @@ server: port: 3000 - metrics: 8089 + metrics: 8095 # Set to true if you've a LB in front of RTCStats and you are obtaining # its IP address as part of the X-Forwarded-For header skipLoadBalancerIp: false logLevel: info - jsonConsoleLog: true + jsonConsoleLog: false # The provided certificates are intended for local testing usign a HTTPS 
server - useHTTPS: false + useHTTPS: true keyPath: './certs/key.pem' certPath: './certs/cert.pem' diff --git a/config/production.yaml b/config/production.yaml index 6b81433a..5efc099f 100644 --- a/config/production.yaml +++ b/config/production.yaml @@ -1,10 +1,13 @@ server: port: 3000 - metrics: 8089 + metrics: 8095 # Set to true if you've a LB in front of RTCStats and you are obtaining # its IP address as part of the X-Forwarded-For header skipLoadBalancerIp: false logLevel: info + jsonConsoleLog: true + # The provided certificates are intended for local testing using a HTTPS server + useHTTPS: false amplitude: key: diff --git a/extract.js b/extract.js index 0e9567bb..8508a4cb 100644 --- a/extract.js +++ b/extract.js @@ -40,7 +40,7 @@ if (!isMainThread) { } catch (error) { parentPort.postMessage({ type: ResponseType.ERROR, - body: { clientId: request.body.clientId, error: { ...error } }, + body: { clientId: request.body.clientId, error: error.stack }, }); } break; @@ -201,6 +201,10 @@ function processDump(clientId) { path = 'temp/' + clientId; } + const extractStartTime = new Date().getTime(); + const dumpFileStats = fs.statSync(path); + const dumpFileSizeMb = dumpFileStats.size / 1000000.0; + fs.readFile(path, { encoding: 'utf-8' }, (err, data) => { try { if (err) { @@ -280,13 +284,21 @@ function processDump(clientId) { dump(client.url, client); generateFeatures(client.url, client, clientId); + const extractDurationMs = new Date().getTime() - extractStartTime; + + if (!isMainThread) { + parentPort.postMessage({ + type: ResponseType.METRICS, + body: { clientId, extractDurationMs, dumpFileSizeMb }, + }); + } } catch (error) { if (isMainThread) { - logger.error(error); + logger.error('%s', error); } else { parentPort.postMessage({ type: ResponseType.ERROR, - body: { clientId, error }, + body: { clientId, error: error.stack }, }); } } diff --git a/package-lock.json b/package-lock.json index 76c5c9fd..a451e830 100644 --- a/package-lock.json +++ b/package-lock.json @@ 
-1,6 +1,6 @@ { "name": "rtcstats-server", - "version": "1.0.0", + "version": "1.0.7", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 7422b52b..5672d924 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "rtcstats-server", - "version": "1.0.0", + "version": "1.0.7", "description": "", "main": "websocket.js", "scripts": { diff --git a/utils.js b/utils.js index 28b253b3..e882cbea 100644 --- a/utils.js +++ b/utils.js @@ -143,7 +143,8 @@ const RequestType = Object.freeze({ const ResponseType = Object.freeze({ PROCESSING: 'PROCESSING', DONE: 'DONE', - ERROR: 'ERROR', + METRICS: 'METRICS', + ERROR: 'ERROR' }); module.exports = { From 0a9ff9791d1b426aadc013e88dff416eb6e1ee09 Mon Sep 17 00:00:00 2001 From: Andrei Gavrilescu Date: Tue, 7 Jul 2020 12:50:49 +0300 Subject: [PATCH 7/8] refactor project structure --- .gitignore | 3 +- Dockerfile | 2 +- package-lock.json | 19 ++++ package.json | 9 +- app.js => src/app.js | 86 ++++++------------- .../database}/AmplitudeConnector.js | 0 {database => src/database}/bigquery.js | 0 .../database}/redshift-firehose.js | 0 extract.js => src/features/extract.js | 8 +- .../features/features-client.js | 2 +- .../features/features-connection.js | 2 +- .../features/features-stream.js | 2 +- logging.js => src/logging.js | 0 src/prom-collector.js | 75 ++++++++++++++++ {queries => src/queries}/README.md | 0 {queries => src/queries}/lib/graph.js | 0 {queries => src/queries}/lib/query.js | 0 {queries => src/queries}/queries.js | 0 {store => src/store}/gcp.js | 0 {store => src/store}/s3.js | 0 {test => src/test}/client.js | 14 ++- {test => src/test}/clienttest.json | 0 WorkerPool.js => src/utils/WorkerPool.js | 4 +- features-v2.sql => src/utils/features-v2.sql | 0 fetchdump.js => src/utils/fetchdump.js | 0 .../utils/getstats-deltacompression.js | 0 .../utils/getstats-mangle.js | 0 obfuscator.js => src/utils/obfuscator.js | 0 utils.js => src/utils/utils.js | 13 ++- 29 files 
changed, 157 insertions(+), 82 deletions(-) rename app.js => src/app.js (88%) rename {database => src/database}/AmplitudeConnector.js (100%) rename {database => src/database}/bigquery.js (100%) rename {database => src/database}/redshift-firehose.js (100%) rename extract.js => src/features/extract.js (98%) rename features-client.js => src/features/features-client.js (99%) rename features-connection.js => src/features/features-connection.js (99%) rename features-stream.js => src/features/features-stream.js (99%) rename logging.js => src/logging.js (100%) create mode 100644 src/prom-collector.js rename {queries => src/queries}/README.md (100%) rename {queries => src/queries}/lib/graph.js (100%) rename {queries => src/queries}/lib/query.js (100%) rename {queries => src/queries}/queries.js (100%) rename {store => src/store}/gcp.js (100%) rename {store => src/store}/s3.js (100%) rename {test => src/test}/client.js (80%) rename {test => src/test}/clienttest.json (100%) rename WorkerPool.js => src/utils/WorkerPool.js (98%) rename features-v2.sql => src/utils/features-v2.sql (100%) rename fetchdump.js => src/utils/fetchdump.js (100%) rename getstats-deltacompression.js => src/utils/getstats-deltacompression.js (100%) rename getstats-mangle.js => src/utils/getstats-mangle.js (100%) rename obfuscator.js => src/utils/obfuscator.js (100%) rename utils.js => src/utils/utils.js (93%) diff --git a/.gitignore b/.gitignore index fdb568f6..54bd7dba 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,6 @@ lib-cov *.pid *.gz *.tgz -*.sh *.tap temp @@ -18,7 +17,7 @@ results npm-debug.log -node_modules +node_modules GeoLite2-City.* diff --git a/Dockerfile b/Dockerfile index 1431ee03..d2e03341 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,7 @@ COPY --chown=$app:$app . 
/$app RUN npm install -HEALTHCHECK --interval=10s --timeout=5s --start-period=10s \ +HEALTHCHECK --interval=10s --timeout=10s --start-period=10s \ CMD curl --silent --fail http://localhost:3000/healthcheck \ || exit 1 diff --git a/package-lock.json b/package-lock.json index a451e830..598ceec3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1162,6 +1162,11 @@ "integrity": "sha1-GwqzvVU7Kg1jmdKcDj6gslIHgyc=", "dev": true }, + "gar": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/gar/-/gar-1.0.4.tgz", + "integrity": "sha512-w4n9cPWyP7aHxKxYHFQMegj7WIAsL/YX/C4Bs5Rr8s1H9M1rNtRWRsw+ovYMkXDQ5S4ZbYHsHAPmevPjPgw44w==" + }, "gaxios": { "version": "2.2.2", "resolved": "https://registry.npmjs.org/gaxios/-/gaxios-2.2.2.tgz", @@ -1196,6 +1201,15 @@ "stream-events": "^1.0.4" } }, + "get-folder-size": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/get-folder-size/-/get-folder-size-2.0.1.tgz", + "integrity": "sha512-+CEb+GDCM7tkOS2wdMKTn9vU7DgnKUTuDlehkNJKNSovdCOVxs14OfKCk4cvSaR3za4gj+OBdl9opPN9xrJ0zA==", + "requires": { + "gar": "^1.0.4", + "tiny-each-async": "2.0.3" + } + }, "get-stream": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/get-stream/-/get-stream-3.0.0.tgz", @@ -2573,6 +2587,11 @@ "integrity": "sha1-8y6srFoXW+ol1/q1Zas+2HQe9W8=", "dev": true }, + "tiny-each-async": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/tiny-each-async/-/tiny-each-async-2.0.3.tgz", + "integrity": "sha1-jru/1tYpXxNwAD+7NxYq/loKUdE=" + }, "tmp": { "version": "0.0.33", "resolved": "https://registry.npmjs.org/tmp/-/tmp-0.0.33.tgz", diff --git a/package.json b/package.json index 5672d924..0e887fe3 100644 --- a/package.json +++ b/package.json @@ -4,10 +4,10 @@ "description": "", "main": "websocket.js", "scripts": { - "test": "eslint *.js database/*.js store/*.js && node test/client.js", - "start": "NODE_ENV=production node app.js", - "watch:dev": "nodemon --exec 'NODE_ENV=debug node app.js'", - "debug": 
"NODE_ENV=debug node app.js" + "test": "eslint ./src/*.js ./src/database/*.js ./src/store/*.js && node ./src/test/client.js", + "start": "NODE_ENV=production node ./src/app.js", + "watch:dev": "nodemon --exec 'NODE_ENV=debug node ./src/app.js'", + "debug": "NODE_ENV=debug node ./src/app.js" }, "repository": { "type": "git", @@ -26,6 +26,7 @@ "aws-sdk": "^2.441.0", "bluebird": "^3.3.1", "config": "^1.17.1", + "get-folder-size": "^2.0.1", "js-yaml": "^3.13.1", "pem": "^1.14.2", "platform": "https://github.com/bestiejs/platform.js.git", diff --git a/app.js b/src/app.js similarity index 88% rename from app.js rename to src/app.js index e833cdf0..567ca2dc 100644 --- a/app.js +++ b/src/app.js @@ -5,17 +5,28 @@ const os = require('os'); const http = require('http'); const https = require('https'); const path = require('path'); -const WebSocketServer = require('ws').Server; +const WebSocketServer = require('ws').Server; const config = require('config'); const uuid = require('uuid'); const AmplitudeConnector = require('./database/AmplitudeConnector'); const logger = require('./logging'); -const obfuscate = require('./obfuscator'); -const { name: appName, version: appVersion } = require('./package'); -const { getEnvName, RequestType, ResponseType } = require('./utils'); -const WorkerPool = require('./WorkerPool'); +const obfuscate = require('./utils/obfuscator'); +const { name: appName, version: appVersion } = require('../package'); +const { + connected, + connection_error, + //diskQueueSize, + dumpSize, + errored, + processed, + processTime, + prom, + queueSize +} = require('./prom-collector'); +const { getEnvName, RequestType, ResponseType } = require('./utils/utils'); +const WorkerPool = require('./utils/WorkerPool'); // Configure database, fall back to redshift-firehose. 
let database; @@ -27,7 +38,7 @@ if (!database) { database = require('./database/redshift-firehose.js')(config.firehose); } -if(!database) { +if (!database) { logger.warn('No database configured!'); } @@ -41,7 +52,6 @@ if (!store) { store = require('./store/s3.js')(config.s3); } - // Configure Amplitude backend let amplitude; if (config.amplitude && config.amplitude.key) { @@ -53,53 +63,6 @@ if (config.amplitude && config.amplitude.key) { let server; const tempPath = 'temp'; - -// Initialize prometheus metrics. -const prom = require('prom-client'); - -const connected = new prom.Gauge({ - name: 'rtcstats_websocket_connections', - help: 'number of open websocket connections', -}); - -const connection_error = new prom.Counter({ - name: 'rtcstats_websocket_connection_error', - help: 'number of open websocket connections that failed with an error', -}); - - -const queueSize = new prom.Gauge({ - name: 'rtcstats_queue_size', - help: 'Number of dumps currently queued for processing', -}); - -const processed = new prom.Counter({ - name: 'rtcstats_files_processed', - help: 'number of files processed', -}); - -const errored = new prom.Counter({ - name: 'rtcstats_files_errored', - help: 'number of files with errors during processing', -}); - -const processTime = new prom.Summary({ - name: 'rtcstats_processing_time', - help: 'Processing time for a request', - maxAgeSeconds: 600, - ageBuckets: 5, - percentiles: [0.1, 0.25, 0.5, 0.75, 0.9], - -}); - -const dumpSize = new prom.Summary({ - name: 'rtcstats_dump_size', - help: 'Size of processed rtcstats dumps', - maxAgeSeconds: 600, - ageBuckets: 5, - percentiles: [0.1, 0.25, 0.5, 0.75, 0.9], -}); - function storeDump(clientId) { const path = tempPath + '/' + clientId; store @@ -119,7 +82,7 @@ function getIdealWorkerCount() { return os.cpus().length - 1; } -const workerScriptPath = path.join(__dirname, './extract.js'); +const workerScriptPath = path.join(__dirname, './features/extract.js'); const workerPool = new 
WorkerPool(workerScriptPath, getIdealWorkerCount()); workerPool.on(ResponseType.PROCESSING, (body) => { @@ -136,18 +99,17 @@ workerPool.on(ResponseType.PROCESSING, (body) => { if (database) { const { url, clientId, connid, clientFeatures, connectionFeatures } = body; - if(!connectionFeatures) { + if (!connectionFeatures) { database.put(url, clientId, connid, clientFeatures); } else { - // When using a database backend the streams features are stored separately, so we don't need them // in the connectionFeatures object. const streams = connectionFeatures.streams; delete connectionFeatures.streams; - for(const streamFeatures of streams) { + for (const streamFeatures of streams) { database.put(url, clientId, connid, clientFeatures, connectionFeatures, streamFeatures); - } + } } } }); @@ -406,9 +368,7 @@ function run() { * Currently used from test script. */ function stop() { - if (server) { - server.close(); - } + process.exit(); } // For now just log unhandled promise rejections, as the initial code did not take them into account and by default @@ -421,4 +381,6 @@ run(); module.exports = { stop: stop, + // We expose the number of processed items for use in the test script + processed: processed }; diff --git a/database/AmplitudeConnector.js b/src/database/AmplitudeConnector.js similarity index 100% rename from database/AmplitudeConnector.js rename to src/database/AmplitudeConnector.js diff --git a/database/bigquery.js b/src/database/bigquery.js similarity index 100% rename from database/bigquery.js rename to src/database/bigquery.js diff --git a/database/redshift-firehose.js b/src/database/redshift-firehose.js similarity index 100% rename from database/redshift-firehose.js rename to src/database/redshift-firehose.js diff --git a/extract.js b/src/features/extract.js similarity index 98% rename from extract.js rename to src/features/extract.js index 8508a4cb..eae5e648 100644 --- a/extract.js +++ b/src/features/extract.js @@ -1,13 +1,13 @@ const fs = require('fs'); 
const { parentPort, workerData, isMainThread } = require('worker_threads'); -const logger = require('./logging'); +const logger = require('../logging'); const connectionfeatures = require('./features-connection'); const clientfeatures = require('./features-client'); const streamfeatures = require('./features-stream'); -const statsDecompressor = require('./getstats-deltacompression').decompress; -const statsMangler = require('./getstats-mangle'); -const { extractTracks, extractStreams, isProduction, ResponseType, RequestType } = require('./utils'); +const statsDecompressor = require('../utils//getstats-deltacompression').decompress; +const statsMangler = require('../utils/getstats-mangle'); +const { extractTracks, extractStreams, isProduction, ResponseType, RequestType } = require('../utils/utils'); // const canUseProcessSend = !!process.send; diff --git a/features-client.js b/src/features/features-client.js similarity index 99% rename from features-client.js rename to src/features/features-client.js index 4f07346c..3662311c 100644 --- a/features-client.js +++ b/src/features/features-client.js @@ -8,7 +8,7 @@ // The first type of feature is contained in this file. const platform = require('platform'); -const {timeBetween} = require('./utils'); +const {timeBetween} = require('../utils/utils'); module.exports = { origin: function(client) { diff --git a/features-connection.js b/src/features/features-connection.js similarity index 99% rename from features-connection.js rename to src/features/features-connection.js index 2d7f57bf..7feda935 100644 --- a/features-connection.js +++ b/src/features/features-connection.js @@ -7,7 +7,7 @@ // 3) features which are specific to a track. // The second type of feature is contained in this file. 
-const {capitalize, standardizedMoment, timeBetween, isIceConnected} = require('./utils'); +const {capitalize, standardizedMoment, timeBetween, isIceConnected} = require('../utils/utils'); const SDPUtils = require('sdp'); function getPeerConnectionConfig(peerConnectionLog) { diff --git a/features-stream.js b/src/features/features-stream.js similarity index 99% rename from features-stream.js rename to src/features/features-stream.js index cfcfec24..2632ab6b 100644 --- a/features-stream.js +++ b/src/features/features-stream.js @@ -7,7 +7,7 @@ // 3) features which are specific to a track. // The third type of feature is contained in this file. -const {mode, standardizedMoment} = require('./utils'); +const {mode, standardizedMoment} = require('../utils/utils'); // each feature expects {kind, direction, trackId, stats} as argument. module.exports = { numberOfStats: ({stats}) => stats.length, diff --git a/logging.js b/src/logging.js similarity index 100% rename from logging.js rename to src/logging.js diff --git a/src/prom-collector.js b/src/prom-collector.js new file mode 100644 index 00000000..654ad281 --- /dev/null +++ b/src/prom-collector.js @@ -0,0 +1,75 @@ +// Initialize prometheus metrics. 
+const getFolderSize = require('get-folder-size'); +const prom = require('prom-client'); + +const logger = require('./logging'); + +const connected = new prom.Gauge({ + name: 'rtcstats_websocket_connections', + help: 'number of open websocket connections', +}); + +const connection_error = new prom.Counter({ + name: 'rtcstats_websocket_connection_error', + help: 'number of open websocket connections that failed with an error', +}); + + +const queueSize = new prom.Gauge({ + name: 'rtcstats_queue_size', + help: 'Number of dumps currently queued for processing', +}); + +const diskQueueSize = new prom.Gauge({ + name: 'rtcstats_disk_queue_size', + help: 'Size occupied on disk by queued dumps', +}); + +const processed = new prom.Counter({ + name: 'rtcstats_files_processed', + help: 'number of files processed', +}); + +const errored = new prom.Counter({ + name: 'rtcstats_files_errored', + help: 'number of files with errors during processing', +}); + +const processTime = new prom.Summary({ + name: 'rtcstats_processing_time', + help: 'Processing time for a request', + maxAgeSeconds: 600, + ageBuckets: 5, + percentiles: [0.1, 0.25, 0.5, 0.75, 0.9], + +}); + +const dumpSize = new prom.Summary({ + name: 'rtcstats_dump_size', + help: 'Size of processed rtcstats dumps', + maxAgeSeconds: 600, + ageBuckets: 5, + percentiles: [0.1, 0.25, 0.5, 0.75, 0.9], +}); + +setInterval(() => { + getFolderSize('temp', (err, size) => { + if (err) { + logger.err('Could not get disk queue dir size %j', err); + return; + } + diskQueueSize.set(size); + }); +},10000); + +module.exports = { + connected, + connection_error, + diskQueueSize, + dumpSize, + errored, + processed, + processTime, + prom, + queueSize +} \ No newline at end of file diff --git a/queries/README.md b/src/queries/README.md similarity index 100% rename from queries/README.md rename to src/queries/README.md diff --git a/queries/lib/graph.js b/src/queries/lib/graph.js similarity index 100% rename from queries/lib/graph.js rename to 
src/queries/lib/graph.js diff --git a/queries/lib/query.js b/src/queries/lib/query.js similarity index 100% rename from queries/lib/query.js rename to src/queries/lib/query.js diff --git a/queries/queries.js b/src/queries/queries.js similarity index 100% rename from queries/queries.js rename to src/queries/queries.js diff --git a/store/gcp.js b/src/store/gcp.js similarity index 100% rename from store/gcp.js rename to src/store/gcp.js diff --git a/store/s3.js b/src/store/s3.js similarity index 100% rename from store/s3.js rename to src/store/s3.js diff --git a/test/client.js b/src/test/client.js similarity index 80% rename from test/client.js rename to src/test/client.js index a8a68080..4ef92813 100644 --- a/test/client.js +++ b/src/test/client.js @@ -1,14 +1,22 @@ var WebSocket = require('ws'); var fs = require('fs'); var config = require('config'); + var server = require('../app'); -var statsCompressor = require('../getstats-deltacompression').compress; +var statsCompressor = require('../utils/getstats-deltacompression').compress; -var data = JSON.parse(fs.readFileSync('test/clienttest.json')); +var data = JSON.parse(fs.readFileSync('src/test/clienttest.json')); var url = data.url; var origin = url.split('/').splice(0, 3).join('/'); var path = url.split('/').splice(3).join('/'); +function checkTestCompletion(server) { + if (server.processed.get().values[0].value === 1) { + server.stop(); + } else { + setTimeout(checkTestCompletion, 1000, server); + } +} // using setTimeout here is bad obviously. 
This should wait for the server to listen setTimeout(function() { process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; // ignore self-signed cert @@ -29,7 +37,7 @@ setTimeout(function() { var evt = events.shift(); if (!evt) { ws.close(); - server.stop(); + checkTestCompletion(server); return; } if (evt.type === 'getStats') { diff --git a/test/clienttest.json b/src/test/clienttest.json similarity index 100% rename from test/clienttest.json rename to src/test/clienttest.json diff --git a/WorkerPool.js b/src/utils/WorkerPool.js similarity index 98% rename from WorkerPool.js rename to src/utils/WorkerPool.js index 0a126687..c2ee0c02 100644 --- a/WorkerPool.js +++ b/src/utils/WorkerPool.js @@ -2,7 +2,7 @@ const EventEmitter = require('events'); const { Worker } = require('worker_threads'); const uuid = require('uuid'); -const logger = require('./logging'); +const logger = require('../logging'); const WorkerStatus = Object.freeze({ IDLE: 'IDLE', @@ -111,7 +111,7 @@ class WorkerPool extends EventEmitter { } else { logger.warn('Can not add additional worker, pool is already at max capacity!'); } - }, 2000); + } , 2000); } _getIdleWorkers() { diff --git a/features-v2.sql b/src/utils/features-v2.sql similarity index 100% rename from features-v2.sql rename to src/utils/features-v2.sql diff --git a/fetchdump.js b/src/utils/fetchdump.js similarity index 100% rename from fetchdump.js rename to src/utils/fetchdump.js diff --git a/getstats-deltacompression.js b/src/utils/getstats-deltacompression.js similarity index 100% rename from getstats-deltacompression.js rename to src/utils/getstats-deltacompression.js diff --git a/getstats-mangle.js b/src/utils/getstats-mangle.js similarity index 100% rename from getstats-mangle.js rename to src/utils/getstats-mangle.js diff --git a/obfuscator.js b/src/utils/obfuscator.js similarity index 100% rename from obfuscator.js rename to src/utils/obfuscator.js diff --git a/utils.js b/src/utils/utils.js similarity index 93% rename from utils.js rename 
to src/utils/utils.js index e882cbea..e36b99c7 100644 --- a/utils.js +++ b/src/utils/utils.js @@ -1,5 +1,16 @@ /* feature extraction utils */ -const logger = require('./logging'); +const logger = require('../logging'); + +const exec = require('child_process').exec; + +function getDirSize(pathToDir, callback) { + const child = exec('du -sh /path/to/dir', function(error, stdout, stderr){ + console.log('stderr: ' + stderr); + if (error !== null){ + console.log('exec error: ' + error); + } + }); +} function capitalize(str) { return str[0].toUpperCase() + str.substr(1); From e21963dd6f4679adab821b8aa34c1a7e753853d6 Mon Sep 17 00:00:00 2001 From: Andrei Gavrilescu Date: Thu, 30 Jul 2020 21:58:05 +0300 Subject: [PATCH 8/8] fix logger error --- Dockerfile | 11 +++++++---- Makefile | 14 ++++++++++++-- config/custom-environment-variables.yaml | 3 +++ config/default.yaml | 2 +- package-lock.json | 2 +- package.json | 2 +- src/app.js | 4 ++-- src/database/AmplitudeConnector.js | 3 ++- src/prom-collector.js | 2 +- 9 files changed, 30 insertions(+), 13 deletions(-) diff --git a/Dockerfile b/Dockerfile index d2e03341..caae29a5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,14 +13,17 @@ RUN chown -R $app:$app /$app USER $app -COPY --chown=$app:$app . /$app +# Use cached node_modules in case package.json doesn't change. +COPY package.json package-lock.json /$app/ RUN npm install +COPY --chown=$app:$app . /$app -HEALTHCHECK --interval=10s --timeout=10s --start-period=10s \ - CMD curl --silent --fail http://localhost:3000/healthcheck \ - || exit 1 +# This will run in k8s context so we use the heartbeat from there. 
+# HEALTHCHECK --interval=10s --timeout=10s --start-period=10s \ +# CMD curl --silent --fail http://localhost:3000/healthcheck \ +# || exit 1 EXPOSE 3000 diff --git a/Makefile b/Makefile index a22b9492..0fcbd84b 100644 --- a/Makefile +++ b/Makefile @@ -9,13 +9,23 @@ build: run: @docker run \ -p 3000:3000 \ - -p 8085:8085 \ + -p 8095:8095 \ $(REPOSITORY):$(TAG) +debug-restricted: + @docker run \ + -p 3000:3000 \ + -p 8095:8095 \ + --env RTCSTATS_LOG_LEVEL=info \ + --entrypoint npm \ + --cpuset-cpus=0 \ + $(REPOSITORY):$(TAG) \ + run debug + debug: @docker run \ -p 3000:3000 \ - -p 8085:8085 \ + -p 8095:8095 \ -v $(PWD):/rtcstats-server \ --entrypoint npm \ $(REPOSITORY):$(TAG) \ diff --git a/config/custom-environment-variables.yaml b/config/custom-environment-variables.yaml index 1a34ed16..c6a09a87 100644 --- a/config/custom-environment-variables.yaml +++ b/config/custom-environment-variables.yaml @@ -1,3 +1,6 @@ +server: + logLevel: RTCSTATS_LOG_LEVEL + amplitude: key: RTCSTATS_AMPLITUDE_KEY diff --git a/config/default.yaml b/config/default.yaml index fc065c46..dcd61b43 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -7,7 +7,7 @@ server: logLevel: info jsonConsoleLog: false # The provided certificates are intended for local testing usign a HTTPS server - useHTTPS: true + useHTTPS: false keyPath: './certs/key.pem' certPath: './certs/cert.pem' diff --git a/package-lock.json b/package-lock.json index 598ceec3..f766d702 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "rtcstats-server", - "version": "1.0.7", + "version": "1.0.9", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 0e887fe3..f445f59a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "rtcstats-server", - "version": "1.0.7", + "version": "1.0.9", "description": "", "main": "websocket.js", "scripts": { diff --git a/src/app.js b/src/app.js index 567ca2dc..781fcdfc 100644 --- a/src/app.js 
+++ b/src/app.js @@ -214,7 +214,7 @@ function setupMetricsServer(port) { switch (request.url) { case '/metrics': queueSize.set(workerPool.getTaskQueueSize()); - // prom.collectDefaultMetrics(); + prom.collectDefaultMetrics(); response.writeHead(200, { 'Content-Type': prom.contentType }); response.end(prom.register.metrics()); break; @@ -361,7 +361,7 @@ function run() { setupWebSocketsServer(server); - logger.info('Initialization complete.'); + logger.info('Initialization complete.') } /** diff --git a/src/database/AmplitudeConnector.js b/src/database/AmplitudeConnector.js index 910c7c1a..6fea4870 100644 --- a/src/database/AmplitudeConnector.js +++ b/src/database/AmplitudeConnector.js @@ -57,8 +57,9 @@ class AmplitudeConnector { session_id: rtcstatsFeatures.identity.sessionId, event_properties: { rtcstatsIdentity: rtcstatsFeatures.clientId, + displayName : rtcstatsFeatures.identity.displayName, ...rtcstatsFeatures.identity.hosts, - ...rtcstatsFeatures.identity.deployInfo, + ...rtcstatsFeatures.identity.deploymentInfo, ...this.extractRelevantStats(rtcstatsFeatures.connectionFeatures), }, }; diff --git a/src/prom-collector.js b/src/prom-collector.js index 654ad281..06dc2a85 100644 --- a/src/prom-collector.js +++ b/src/prom-collector.js @@ -55,7 +55,7 @@ const dumpSize = new prom.Summary({ setInterval(() => { getFolderSize('temp', (err, size) => { if (err) { - logger.err('Could not get disk queue dir size %j', err); + logger.error('Could not get disk queue dir size %j', err); return; } diskQueueSize.set(size);