diff --git a/.gitignore b/.gitignore index fdb568f6..54bd7dba 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,6 @@ lib-cov *.pid *.gz *.tgz -*.sh *.tap temp @@ -18,7 +17,7 @@ results npm-debug.log -node_modules +node_modules GeoLite2-City.* diff --git a/.prettierrc b/.prettierrc new file mode 100644 index 00000000..a85dc7d2 --- /dev/null +++ b/.prettierrc @@ -0,0 +1,5 @@ +{ + "singleQuote": true, + "printWidth": 140, + "tabWidth": 4 + } \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index cc5a983b..caae29a5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM node:10.15.3-alpine +FROM node:12.16-alpine RUN apk add --no-cache git && \ rm -rf /var/lib/apt/lists/* /var/cache/apk /usr/share/man /tmp/* @@ -13,14 +13,17 @@ RUN chown -R $app:$app /$app USER $app -COPY --chown=$app:$app . /$app +# Use cached node_modules in case package.json doesn't change. +COPY package.json package-lock.json /$app/ RUN npm install +COPY --chown=$app:$app . /$app -HEALTHCHECK --interval=10s --timeout=5s --start-period=10s \ - CMD curl --silent --fail http://localhost:3000/healthcheck \ - || exit 1 +# This will run in k8s context so we use the heartbeat from there. 
+# HEALTHCHECK --interval=10s --timeout=10s --start-period=10s \ +# CMD curl --silent --fail http://localhost:3000/healthcheck \ +# || exit 1 EXPOSE 3000 diff --git a/Makefile b/Makefile index da96f559..0fcbd84b 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -REGISTRY:=juandebravo +REGISTRY:=jitsi-dev IMAGE:=rtcstats-server REPOSITORY:=$(REGISTRY)/$(IMAGE) TAG:=latest @@ -9,16 +9,28 @@ build: run: @docker run \ -p 3000:3000 \ + -p 8095:8095 \ $(REPOSITORY):$(TAG) +debug-restricted: + @docker run \ + -p 3000:3000 \ + -p 8095:8095 \ + --env RTCSTATS_LOG_LEVEL=info \ + --entrypoint npm \ + --cpuset-cpus=0 \ + $(REPOSITORY):$(TAG) \ + run debug + debug: @docker run \ -p 3000:3000 \ + -p 8095:8095 \ -v $(PWD):/rtcstats-server \ - -e DEBUG=true \ --entrypoint npm \ $(REPOSITORY):$(TAG) \ run watch:dev push: build - @docker push $(REGISTRY)/$(IMAGE) + @echo "Push not configured." + #@docker push $(REGISTRY)/$(IMAGE) diff --git a/app.js b/app.js deleted file mode 100644 index 160f527c..00000000 --- a/app.js +++ /dev/null @@ -1,306 +0,0 @@ -'use strict'; -const fs = require('fs'); -const config = require('config'); -const uuid = require('uuid'); -const os = require('os'); -const child_process = require('child_process'); -const http = require('http'); - -const WebSocketServer = require('ws').Server; - -const logger = require('./logging'); -const obfuscate = require('./obfuscator'); - -// Configure database, fall back to redshift-firehose. 
-let database; -if (config.gcp && (config.gcp.dataset && config.gcp.table)) { - database = require('./database/bigquery.js')(config.gcp); -} -if (!database) { - database = require('./database/redshift-firehose.js')(config.firehose); -} - -// Configure store, fall back to S3 -let store; -if (config.gcp && config.gcp.bucket) { - store = require('./store/gcp.js')(config.gcp); -} -if (!store) { - store = require('./store/s3.js')(config.s3); -} - -let server; -const tempPath = 'temp'; - -const prom = require('prom-client'); - -const connected = new prom.Gauge({ - name: 'rtcstats_websocket_connections', - help: 'number of open websocket connections', -}); - -const processed = new prom.Counter({ - name: 'rtcstats_files_processed', - help: 'number of files processed', -}); - -const errored = new prom.Counter({ - name: 'rtcstats_files_errored', - help: 'number of files with errors during processing', -}); - -class ProcessQueue { - constructor() { - this.maxProc = os.cpus().length; - this.q = []; - this.numProc = 0; - } - enqueue(clientid) { - this.q.push(clientid); - if (this.numProc < this.maxProc) { - process.nextTick(this.process.bind(this)); - } else { - logger.info('process Q too long: %s', this.numProc); - } - } - process() { - const clientid = this.q.shift(); - if (!clientid) return; - const p = child_process.fork('extract.js', [clientid]); - p.on('exit', (code) => { - this.numProc--; - logger.info(`Done clientid: <${clientid}> proc: <${this.numProc}> code: <${code}>`); - if (code === 0) { - processed.inc(); - } else { - errored.inc(); - } - if (this.numProc < 0) { - this.numProc = 0; - } - if (this.numProc < this.maxProc) { - process.nextTick(this.process.bind(this)); - } - const path = tempPath + '/' + clientid; - store.put(clientid, path) - .then(() => { - fs.unlink(path, () => { }); - }) - .catch((err) => { - logger.error('Error storing: %s - %s', path, err); - fs.unlink(path, () => { }); - }) - }); - p.on('message', (msg) => { - logger.debug('Received message 
from child process'); - const { url, clientid, connid, clientFeatures, connectionFeatures, streamFeatures } = msg; - database.put(url, clientid, connid, clientFeatures, connectionFeatures, streamFeatures); - }); - p.on('error', () => { - this.numProc--; - logger.warn(`Failed to spawn, rescheduling clientid: <${clientid}> proc: <${this.numProc}>`); - this.q.push(clientid); // do not immediately retry - }); - this.numProc++; - if (this.numProc > 10) { - logger.info('Process Q: %n', this.numProc); - } - } -} -const q = new ProcessQueue(); - -function setupWorkDirectory() { - try { - if (fs.existsSync(tempPath)) { - fs.readdirSync(tempPath).forEach(fname => { - try { - logger.debug(`Removing file ${tempPath + '/' + fname}`) - fs.unlinkSync(tempPath + '/' + fname); - } catch (e) { - logger.error(`Error while unlinking file ${fname} - ${e.message}`); - } - }); - } else { - logger.debug(`Creating working dir ${tempPath}`) - fs.mkdirSync(tempPath); - } - } catch (e) { - logger.error(`Error while accessing working dir ${tempPath} - ${e.message}`); - } -} - -function setupHttpServer(port, keys) { - const options = keys ? 
{ - key: keys.serviceKey, - cert: keys.certificate, - } : {} - - const server = http.Server(options, () => { }) - .on('request', (request, response) => { - switch (request.url) { - case '/healthcheck': - response.writeHead(200); - response.end(); - break; - default: - response.writeHead(404); - response.end(); - } - }) - .listen(port); - return server; -} - -function setupMetricsServer(port) { - const metricsServer = http.Server() - .on('request', (request, response) => { - switch (request.url) { - case '/metrics': - response.writeHead(200, { 'Content-Type': prom.contentType }); - response.end(prom.register.metrics()); - break; - default: - response.writeHead(404); - response.end(); - } - }) - .listen(port); - return metricsServer; -} - -function setupWebSocketsServer(server) { - const wss = new WebSocketServer({ server }); - wss.on('connection', (client, upgradeReq) => { - connected.inc(); - let numberOfEvents = 0; - // the url the client is coming from - const referer = upgradeReq.headers['origin'] + upgradeReq.url; - // TODO: check against known/valid urls - - const ua = upgradeReq.headers['user-agent']; - const clientid = uuid.v4(); - let tempStream = fs.createWriteStream(tempPath + '/' + clientid); - tempStream.on('finish', () => { - if (numberOfEvents > 0) { - q.enqueue(clientid); - } else { - fs.unlink(tempPath + '/' + clientid, () => { - // we're good... 
- }); - } - }); - - const meta = { - path: upgradeReq.url, - origin: upgradeReq.headers['origin'], - url: referer, - userAgent: ua, - time: Date.now(), - fileFormat: 2, - }; - tempStream.write(JSON.stringify(meta) + '\n'); - - const forwardedFor = upgradeReq.headers['x-forwarded-for']; - if (forwardedFor) { - const forwardedIPs = forwardedFor.split(','); - if (config.server.skipLoadBalancerIp) { - forwardedIPs.pop(); - } - const obfuscatedIPs = forwardedIPs.map(ip => { - const publicIP = ['publicIP', null, ip.trim()]; - obfuscate(publicIP); - return publicIP[2]; - }); - - const publicIP = ['publicIP', null, obfuscatedIPs, Date.now()]; - tempStream.write(JSON.stringify(publicIP) + '\n'); - } else { - const { remoteAddress } = upgradeReq.connection; - const publicIP = ['publicIP', null, remoteAddress]; - obfuscate(publicIP); - tempStream.write(JSON.stringify(['publicIP', null, [publicIP[2]], Date.now()]) + '\n'); - } - - logger.info('New app connected: ua: <%s>, referer: <%s>, clientid: <%s>', ua, referer, clientid); - - client.on('message', msg => { - try { - const data = JSON.parse(msg); - - numberOfEvents++; - - if (data[0].endsWith('OnError')) { - // monkey-patch java/swift sdk bugs. - data[0] = data[0].replace(/OnError$/, 'OnFailure'); - } - switch (data[0]) { - case 'getUserMedia': - case 'getUserMediaOnSuccess': - case 'getUserMediaOnFailure': - case 'navigator.mediaDevices.getUserMedia': - case 'navigator.mediaDevices.getUserMediaOnSuccess': - case 'navigator.mediaDevices.getUserMediaOnFailure': - tempStream.write(JSON.stringify(data) + '\n'); - break; - case 'constraints': - if (data[2].constraintsOptional) { // workaround for RtcStats.java bug. 
- data[2].optional = []; - Object.keys(data[2].constraintsOptional).forEach(key => { - const pair = {}; - pair[key] = data[2].constraintsOptional[key] - }); - delete data[2].constraintsOptional; - } - tempStream.write(JSON.stringify(data) + '\n'); - break; - default: - if (data[0] === 'getstats' && data[2].values) { // workaround for RtcStats.java bug. - const { timestamp, values } = data[2]; - data[2] = values; - data[2].timestamp = timestamp; - } - obfuscate(data); - tempStream.write(JSON.stringify(data) + '\n'); - break; - } - } catch (e) { - logger.error('Error while processing: %s - %s', e, msg); - } - }); - - client.on('error', e => { - logger.error('Websocket error: %s', e); - }); - - client.on('close', () => { - connected.dec(); - tempStream.write(JSON.stringify(['close', null, null, Date.now()])); - tempStream.end(); - tempStream = null; - }); - }); -} - -function run(keys) { - setupWorkDirectory(); - - server = setupHttpServer(config.get('server').port, keys); - - if (config.get('server').metrics) { - setupMetricsServer(config.get('server').metrics); - } - - setupWebSocketsServer(server); -} - -function stop() { - if (server) { - server.close(); - } -} - -run(); - -module.exports = { - stop: stop -}; diff --git a/certs/cert.pem b/certs/cert.pem new file mode 100644 index 00000000..970a589d --- /dev/null +++ b/certs/cert.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDjDCCAnQCCQDPkKLEhyE8aTANBgkqhkiG9w0BAQUFADCBhzELMAkGA1UEBhMC +VVMxEzARBgNVBAgMCkNBTElGT1JOSUExETAPBgNVBAcMCFNhbiBKb3NlMQ0wCwYD +VQQKDAR0ZXN0MQ0wCwYDVQQLDAR0ZXN0MQ0wCwYDVQQDDAR0ZXN0MSMwIQYJKoZI +hvcNAQkBFhRqb2huLmRvZUBkb2VtYWlsLmNvbTAeFw0yMDA2MjIxODU2NDNaFw00 +NzExMDcxODU2NDNaMIGHMQswCQYDVQQGEwJVUzETMBEGA1UECAwKQ0FMSUZPUk5J +QTERMA8GA1UEBwwIU2FuIEpvc2UxDTALBgNVBAoMBHRlc3QxDTALBgNVBAsMBHRl +c3QxDTALBgNVBAMMBHRlc3QxIzAhBgkqhkiG9w0BCQEWFGpvaG4uZG9lQGRvZW1h +aWwuY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2WGnATMYgYCo +tF1djx1qPJ3WECsaWh6uqqirvz+6CuTbV5eJDxbVptfqI/RePby8bEHL0iZzN15t 
+044gMv9fvWciaa+lF48+H2mQtz4ASW44vpshsd9F1UxPS72Dw87ntIChJbj0e0me +NBT9knA3dL3MRz133wdrGkFGwHeovz/twgMkB+RAdz8wHIFo5znxid2ejQl0bDaI +J455Q8M8evReTSIe8RHxTUwWTUwt+s9VQOvCzb0fQS1MJKsAdmh3LfOqmMQ06vvR +yFSeDDcDDQrlxRb5MeczLfiu/t/UGpTEGCJXfiSeNISEXFtBQnTdQub8BUoHCw6+ +rVeUadFsYwIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQB5Uf2aRs6NeGJhdE50xuVw +93BerkZkXQaq/zhTtDsA6cczhxJjJXOakkA+AY0YxwWHEMw9sVjgPb0pzaChzsFh +aourbxWyRGSQXp8OMifnEMB9TOD6c4vZAZZpZnIlqTWVRDb/nayYEsw0m/cEfIYD +1wJqHbVw4vvsIJn9aCNnx/oyL+rl6d9cmgQ1CIlEMvTvGMCtO7yUDtAjSy9NQbIp +I+TB1fNDeoDJn1ghQPGViBLYUTj6cmI/u/wXSj401oM7rvrm+C7ZKlWohcwKXmm4 +fK0ngbgDOIEZpE7FqT7NV2BPs3XhycSl16mkGQJ75SIcpgm+bb1rKWxOGTiRksa4 +-----END CERTIFICATE----- diff --git a/certs/key.pem b/certs/key.pem new file mode 100644 index 00000000..95f2da58 --- /dev/null +++ b/certs/key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEA2WGnATMYgYCotF1djx1qPJ3WECsaWh6uqqirvz+6CuTbV5eJ +DxbVptfqI/RePby8bEHL0iZzN15t044gMv9fvWciaa+lF48+H2mQtz4ASW44vpsh +sd9F1UxPS72Dw87ntIChJbj0e0meNBT9knA3dL3MRz133wdrGkFGwHeovz/twgMk +B+RAdz8wHIFo5znxid2ejQl0bDaIJ455Q8M8evReTSIe8RHxTUwWTUwt+s9VQOvC +zb0fQS1MJKsAdmh3LfOqmMQ06vvRyFSeDDcDDQrlxRb5MeczLfiu/t/UGpTEGCJX +fiSeNISEXFtBQnTdQub8BUoHCw6+rVeUadFsYwIDAQABAoIBAH76hd0zhZsQFnvV +FfOlUQs7f3FOXERMK+dQQ5KhnQEEEgQmZk9EHWUqNoDuG6agesgZ3v9QqnirVif/ +m1tuxPQULIvjp+INMFKVDY2cT/qUwdzFLXeDXn1r593sQ+27DKnpgThRw63IoPr3 +T++cUSiGPa9Xfo/u+2cIvlVrEE017FxMr/b1POMwytrn3N8T+IVnD5Ny8xGnTeYd +F6Wl/FArfgL80/10son6Kw0GPaaQNC+DDesm8ZVE+y5jFvK22yBRgGc404uF5Vbo +8Dym0+zGD2gO8hZIjMMl9N2dgJ/b56qFf6dh23Tq69pKOvVsJpKTiIfQOwBSfmjp +j8CyVZECgYEA/QAOR0dY9NlhWwn1IWexE0JhFPYsKQhVo4Nwbxe+UGpjwpxzlNYM +EdG+JxETIqnutxw8ngvbhgpOLopvTWxwQnazw0Hr5tAQO3dB80l7QiIWfrfyZcgI +Gz9PyD1gWRF/zdIpLPg4TzTSDmWdcV/wej00IgkyuAX79N8sbX6LSy8CgYEA2/V7 +LhIv3mlcU8+CsPndbJLyo/lBsWEpNjBa4bIqz+kVuQDzE6+6iOs2fFBsRvJ34Kf/ +2UVK6s9xe37GD/PSo2GivGOb+zVFia+TJcSgHj9LQUEgfuOTFGBuZsrKbefzKwGA +de+H5q+6llk+W9ZTWy4TSYFPBWc7UkFZb724VQ0CgYAlEFoHJTOqAyKZFLddonQ1 
+jxbr3DiR9k+deccB72eJHlzpCMSB/G2eOqzxyjWUcXKwTqmuuav6Ug4sEUnG/Ojh +Q8SICWNG2BpYq9r0ikJNaPMEs2wGbyyI2ViVzDAOPFsNywkPNnoBWIqhY0+SaWyw +a8D0b4aHoRDNSdiXXd+ILwKBgFQFP1pn5BUnVfdFyvxjVauFrl3odqmVHbLvYafY +8PWeaYfTzwZ0F+L5RkTSS6oGMLiGM/sAtw9e0lCEKpApaQqz3v/rZMfen4Nqp+DD +bQ5gyxRQFmOh9qrP8xwc1pqJAaAp4LIsH1OFSNbpnCJkik7IGOH5HQBJYKWZBNrk +M6d5AoGAJ2DWW1OHzZjA82WGXaD4n+4BB1d/2CVwg2KLcK8A8Q0yPOp3U+lkVdwM +bZ/B6b6yGoqgh1iRkK04b8/G6sChuZ5X/miGI3Qa4a0y5eX+EbRHNx3QgipfCMfQ +EPsByAK9PjuptR1NvGxnrvCT5xKAnoUqrmm0A+kf1gKknyeMqds= +-----END RSA PRIVATE KEY----- diff --git a/config/custom-environment-variables.yaml b/config/custom-environment-variables.yaml new file mode 100644 index 00000000..c6a09a87 --- /dev/null +++ b/config/custom-environment-variables.yaml @@ -0,0 +1,9 @@ +server: + logLevel: RTCSTATS_LOG_LEVEL + +amplitude: + key: RTCSTATS_AMPLITUDE_KEY + +s3: + region: RTCSTATS_S3_AWS_REGION + bucket: RTCSTATS_S3_BUCKET \ No newline at end of file diff --git a/config/debug.yaml b/config/debug.yaml new file mode 100644 index 00000000..e3118f78 --- /dev/null +++ b/config/debug.yaml @@ -0,0 +1,38 @@ +server: + port: 3000 + metrics: 8095 + # Set to true if you've a LB in front of RTCStats and you are obtaining + # its IP address as part of the X-Forwarded-For header + skipLoadBalancerIp: false + logLevel: debug + jsonConsoleLog: false + useHTTPS: true + +amplitude: + key: '' + +s3: + accessKeyId: + secretAccessKey: + region: + bucket: + useIAMAuth: false + +firehose: + accessKeyId: + secretAccessKey: + region: + stream: + +gcp: + bucket: + dataset: + table: + fields: + maxFlushTime: + bufferSize: + +github: + client_id: GITHUB_CLIENT_ID + client_secret: GITHUB_SECRET + callback_url: GITHUB_CALLBACK_URL diff --git a/config/default.yaml b/config/default.yaml index 16cccc19..dcd61b43 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -1,15 +1,25 @@ server: port: 3000 - metrics: - # Set to true if you've a LB in front of RTCStats and you are obtaining + metrics: 8095 + # Set 
to true if you've a LB in front of RTCStats and you are obtaining # its IP address as part of the X-Forwarded-For header skipLoadBalancerIp: false + logLevel: info + jsonConsoleLog: false + # The provided certificates are intended for local testing using an HTTPS server + useHTTPS: false + keyPath: './certs/key.pem' + certPath: './certs/cert.pem' + +amplitude: + key: s3: accessKeyId: secretAccessKey: region: bucket: + useIAMAuth: false firehose: accessKeyId: diff --git a/config/production.yaml b/config/production.yaml index 16cccc19..5efc099f 100644 --- a/config/production.yaml +++ b/config/production.yaml @@ -1,15 +1,23 @@ server: port: 3000 - metrics: - # Set to true if you've a LB in front of RTCStats and you are obtaining + metrics: 8095 + # Set to true if you've a LB in front of RTCStats and you are obtaining # its IP address as part of the X-Forwarded-For header skipLoadBalancerIp: false + logLevel: info + jsonConsoleLog: true + # The provided certificates are intended for local testing using an HTTPS server + useHTTPS: false + +amplitude: + key: s3: accessKeyId: secretAccessKey: region: bucket: + useIAMAuth: true firehose: accessKeyId: diff --git a/extract.js b/extract.js deleted file mode 100644 index 7c46fb1b..00000000 --- a/extract.js +++ /dev/null @@ -1,216 +0,0 @@ -const fs = require('fs'); - -const canUseProcessSend = !!process.send; -const isProduction = process.env.NODE_ENV && process.env.NODE_ENV === 'production'; - -function capitalize(str) { - return str[0].toUpperCase() + str.substr(1); -} - -function safeFeature(feature) { - if (typeof feature === 'number' && isNaN(feature)) feature = -1; - if (typeof feature === 'number' && !isFinite(feature)) feature = -2; - /* - if (feature === false) feature = 0; - if (feature === true) feature = 1; - */ - - return feature; -} - -const connectionfeatures = require('./features-connection'); -const clientfeatures = require('./features-client'); -const streamfeatures = require('./features-stream'); -const 
statsDecompressor = require('./getstats-deltacompression').decompress; -const statsMangler = require('./getstats-mangle'); -const {extractTracks, extractStreams} = require('./utils'); - -// dumps all peerconnections. -function dump(url, client) { - // ignore connections that never send getUserMedia or peerconnection events. - if (client.getUserMedia.length === 0 && Object.keys(client.peerConnections).length === 0) return; - if (!isProduction) { - let total = 0; - Object.keys(client.peerConnections).forEach(id => { - total += client.peerConnections[id].length; - }); - console.log('DUMP', client.getUserMedia.length, Object.keys(client.peerConnections).length, total); - return; - } -} - -// Feature generation -function generateFeatures(url, client, clientid) { - // ignore connections that never send getUserMedia or peerconnection events. - if (client.getUserMedia.length === 0 && Object.keys(client.peerConnections).length === 0) return; - - // clientFeatures are the same for all peerconnections but are saved together - // with each peerconnection anyway to make correlation easier. - const clientFeatures = {}; - Object.keys(clientfeatures).forEach(fname => { - let feature = clientfeatures[fname].apply(null, [client]); - if (feature !== undefined) { - if (typeof feature === 'object') { - Object.keys(feature).forEach(subname => { - feature[subname] = safeFeature(feature[subname]); - if (!isProduction) { - console.log('PAGE', 'FEATURE', fname + capitalize(subname), '=>', safeFeature(feature[subname])); - } - clientFeatures[fname + capitalize(subname)] = feature[subname]; - }); - } else { - feature = safeFeature(feature); - if (!isProduction) { - console.log('PAGE', 'FEATURE', fname, '=>', feature); - } - clientFeatures[fname] = feature; - } - } - }); - if (Object.keys(client.peerConnections).length === 0) { - // we only have GUM and potentially GUM errors. 
- if (canUseProcessSend && isProduction) { - process.send({url, clientid, connid: '', clientFeatures}); - } - } - - Object.keys(client.peerConnections).forEach(connid => { - if (connid === 'null' || connid === '') return; // ignore the null connid and empty strings - const conn = client.peerConnections[connid]; - const connectionFeatures = {}; - Object.keys(connectionfeatures).forEach(fname => { - let feature = connectionfeatures[fname].apply(null, [client, conn]); - if (feature !== undefined) { - if (typeof feature === 'object') { - Object.keys(feature).forEach(subname => { - feature[subname] = safeFeature(feature[subname]); - if (!isProduction) { - console.log(connid, 'FEATURE', fname + capitalize(subname), '=>', safeFeature(feature[subname])); - } - connectionFeatures[fname + capitalize(subname)] = feature[subname]; - }); - } else { - feature = safeFeature(feature); - if (!isProduction) { - console.log(connid, 'FEATURE', fname, '=>', safeFeature(feature)); - } - connectionFeatures[fname] = feature; - } - } - }); - - const tracks = extractTracks(conn); - const streams = extractStreams(tracks); - for (const [streamId, tracks] of streams.entries()) { - const streamFeatures = {streamId}; - for (const {trackId, kind, direction, stats} of tracks) { - Object.keys(streamfeatures).forEach(fname => { - let feature = streamfeatures[fname].apply(null, [{kind, direction, trackId, stats, peerConnectionLog: conn}]); - if (feature !== undefined) { - feature = safeFeature(feature); - if (typeof feature === 'object') { - Object.keys(feature).forEach(subname => { - feature[subname] = safeFeature(feature[subname]); - if (!isProduction) { - console.log(connid, 'STREAM', streamId, 'TRACK', trackId, 'FEATURE', fname + capitalize(subname), '=>', safeFeature(feature[subname])); - } - streamFeatures[fname + capitalize(subname)] = feature[subname]; - }); - } else { - feature = safeFeature(feature); - if (!isProduction) { - console.log(connid, 'STREAM', streamId, 'TRACK', trackId, 
'FEATURE', fname, '=>', safeFeature(feature)); - } - streamFeatures[fname] = feature; - } - } - }); - } - if (canUseProcessSend && isProduction) { - process.send({url, clientid, connid, clientFeatures, connectionFeatures, streamFeatures}); - } - } - delete client.peerConnections[connid]; // save memory - }); -} - -const clientid = process.argv[2]; -const path = 'temp/' + clientid; -fs.readFile(path, {encoding: 'utf-8'}, (err, data) => { - if (err) { - console.error(err, path); - return; - } - const baseStats = {}; - const lines = data.split('\n'); - const client = JSON.parse(lines.shift()); - client.peerConnections = {}; - client.getUserMedia = []; - lines.forEach(line => { - if (line.length) { - const data = JSON.parse(line); - const time = new Date(data.time || data[3]); - delete data.time; - switch(data[0]) { - case 'publicIP': - client.publicIP = data[2]; - break; - case 'userfeedback': // TODO: might be renamed - client.feedback = data[2]; - break; - case 'tags': // experiment variation tags - client.tags = data[2]; - break; - case 'wsconnect': - client.websocketConnectionTime = data[2] >>> 0; - break; - case 'wsconnecterror': - client.websocketError = data[2]; - break; - case 'identity': // identity meta-information when its not possible to feed into RTCPeerConnection. 
- client.identity = data[2]; - break; - case 'getUserMedia': - case 'getUserMediaOnSuccess': - case 'getUserMediaOnFailure': - case 'navigator.mediaDevices.getUserMedia': - case 'navigator.mediaDevices.getUserMediaOnSuccess': - case 'navigator.mediaDevices.getUserMediaOnFailure': - case 'navigator.getDisplayMedia': - case 'navigator.getDisplayMediaOnSucces': - case 'navigator.mediaDevices.getDisplayMedia': - case 'navigator.mediaDevices.getDisplayMediaOnSuccess': - client.getUserMedia.push({ - time: time, - timestamp: time.getTime(), - type: data[0], - value: data[2] - }); - break; - default: - if (!client.peerConnections[data[1]]) { - client.peerConnections[data[1]] = []; - baseStats[data[1]] = {}; - } - if (data[0] === 'getstats') { // delta-compressed - data[2] = statsDecompressor(baseStats[data[1]], data[2]); - baseStats[data[1]] = JSON.parse(JSON.stringify(data[2])); - } - if (data[0] === 'getStats' || data[0] === 'getstats') { - data[2] = statsMangler(data[2]); - data[0] = 'getStats'; - } - client.peerConnections[data[1]].push({ - time: time, - timestamp: time.getTime(), - type: data[0], - value: data[2], - }); - break; - } - } - }); - - dump(client.url, client); - generateFeatures(client.url, client, clientid); -}); diff --git a/logging.js b/logging.js deleted file mode 100644 index 2f508a71..00000000 --- a/logging.js +++ /dev/null @@ -1,19 +0,0 @@ -const winston = require('winston'); - -const { DEBUG } = process.env; - -const { combine, splat, timestamp, json } = winston.format; - -const addSeverity = winston.format(logEntry => { - return { severity: logEntry.level.toUpperCase(), ...logEntry }; -}); - -const loggerOptions = { - level: DEBUG ? 
'debug' : 'info', - format: combine(timestamp(), splat(), addSeverity(), json()), - transports: [new winston.transports.Console()], -}; - -const logger = winston.createLogger(loggerOptions); - -module.exports = logger; diff --git a/package-lock.json b/package-lock.json index dfb415c1..f766d702 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "rtcstats-server", - "version": "1.0.0", + "version": "1.0.9", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -219,6 +219,14 @@ "repeat-string": "^1.5.2" } }, + "amplitude": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/amplitude/-/amplitude-4.0.1.tgz", + "integrity": "sha512-a35pAUIHiLLyVqBna95PugL9yjJjGve0YuEYiMfnO/hUs633epSK9A3NfbWu5nc98XDi2ZfiSe2J4/XnA+89cA==", + "requires": { + "superagent": "^5.2.1" + } + }, "ansi-align": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/ansi-align/-/ansi-align-2.0.0.tgz", @@ -291,6 +299,11 @@ "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz", "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==" }, + "asynckit": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", + "integrity": "sha1-x57Zf380y48robyXkLzDZkdLS3k=" + }, "aws-sdk": { "version": "2.605.0", "resolved": "https://registry.npmjs.org/aws-sdk/-/aws-sdk-2.605.0.tgz", @@ -567,6 +580,19 @@ "text-hex": "1.0.x" } }, + "combined-stream": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", + "integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==", + "requires": { + "delayed-stream": "~1.0.0" + } + }, + "component-emitter": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.3.0.tgz", + "integrity": 
"sha512-Rd3se6QB+sO1TwqZjscQrurpEPIfO0/yYnSin6Q/rD3mOutHvUrCAhJub3r90uNb+SESBuE0QYoB90YdfatsRg==" + }, "compressible": { "version": "2.0.18", "resolved": "https://registry.npmjs.org/compressible/-/compressible-2.0.18.tgz", @@ -625,6 +651,11 @@ "xdg-basedir": "^4.0.0" } }, + "cookiejar": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/cookiejar/-/cookiejar-2.1.2.tgz", + "integrity": "sha512-Mw+adcfzPxcPeI+0WlvRrr/3lGVO0bD75SxX6811cxSh1Wbxx7xZBGK1eVtDf6si8rg2lhnUjsVLMFMfbRIuwA==" + }, "core-util-is": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz", @@ -701,6 +732,11 @@ "integrity": "sha1-s2nW+128E+7PUk+RsHD+7cNXzzQ=", "dev": true }, + "delayed-stream": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", + "integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=" + }, "diagnostics": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/diagnostics/-/diagnostics-1.1.1.tgz", @@ -1058,6 +1094,14 @@ "flat-cache": "^2.0.1" } }, + "file-stream-rotator": { + "version": "0.5.7", + "resolved": "https://registry.npmjs.org/file-stream-rotator/-/file-stream-rotator-0.5.7.tgz", + "integrity": "sha512-VYb3HZ/GiAGUCrfeakO8Mp54YGswNUHvL7P09WQcXAJNSj3iQ5QraYSp3cIn1MUyw6uzfgN/EFOarCNa4JvUHQ==", + "requires": { + "moment": "^2.11.2" + } + }, "fill-range": { "version": "7.0.1", "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", @@ -1084,6 +1128,21 @@ "integrity": "sha512-a1hQMktqW9Nmqr5aktAux3JMNqaucxGcjtjWnZLHX7yyPCmlSV3M54nGYbqT8K+0GhF3NBgmJCc3ma+WOgX8Jg==", "dev": true }, + "form-data": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-3.0.0.tgz", + "integrity": "sha512-CKMFDglpbMi6PyN+brwB9Q/GOw0eAnsrEZDgcsH5Krhz5Od/haKHAX0NmQfha2zPPz0JpWzA7GJHGSnvCRLWsg==", + "requires": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + } + }, + "formidable": { + "version": 
"1.2.2", + "resolved": "https://registry.npmjs.org/formidable/-/formidable-1.2.2.tgz", + "integrity": "sha512-V8gLm+41I/8kguQ4/o1D3RIHRmhYFG4pnNyonvua+40rqcEmT4+V71yaZ3B457xbbgCsCfjSPi65u/W6vK1U5Q==" + }, "fs.realpath": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", @@ -1103,6 +1162,11 @@ "integrity": "sha1-GwqzvVU7Kg1jmdKcDj6gslIHgyc=", "dev": true }, + "gar": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/gar/-/gar-1.0.4.tgz", + "integrity": "sha512-w4n9cPWyP7aHxKxYHFQMegj7WIAsL/YX/C4Bs5Rr8s1H9M1rNtRWRsw+ovYMkXDQ5S4ZbYHsHAPmevPjPgw44w==" + }, "gaxios": { "version": "2.2.2", "resolved": "https://registry.npmjs.org/gaxios/-/gaxios-2.2.2.tgz", @@ -1137,6 +1201,15 @@ "stream-events": "^1.0.4" } }, + "get-folder-size": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/get-folder-size/-/get-folder-size-2.0.1.tgz", + "integrity": "sha512-+CEb+GDCM7tkOS2wdMKTn9vU7DgnKUTuDlehkNJKNSovdCOVxs14OfKCk4cvSaR3za4gj+OBdl9opPN9xrJ0zA==", + "requires": { + "gar": "^1.0.4", + "tiny-each-async": "2.0.3" + } + }, "get-stream": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/get-stream/-/get-stream-3.0.0.tgz", @@ -1688,6 +1761,11 @@ "is-buffer": "~1.1.1" } }, + "methods": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/methods/-/methods-1.1.2.tgz", + "integrity": "sha1-VSmk1nZUE07cxSZmVoNbD4Ua/O4=" + }, "mime": { "version": "2.4.4", "resolved": "https://registry.npmjs.org/mime/-/mime-2.4.4.tgz", @@ -1742,6 +1820,11 @@ } } }, + "moment": { + "version": "2.25.3", + "resolved": "https://registry.npmjs.org/moment/-/moment-2.25.3.tgz", + "integrity": "sha512-PuYv0PHxZvzc15Sp8ybUCoQ+xpyPWvjOuK72a5ovzp2LI32rJXOiIfyoFoYvG3s6EwwrdkMyWuRiEHSZRLJNdg==" + }, "ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", @@ -1834,6 +1917,11 @@ "path-key": "^2.0.0" } }, + "object-hash": { + "version": "2.0.3", + "resolved": 
"https://registry.npmjs.org/object-hash/-/object-hash-2.0.3.tgz", + "integrity": "sha512-JPKn0GMu+Fa3zt3Bmr66JhokJU5BaNBIh4ZeTlaCBzrBsOeXzwcKKAK1tbLiPKgvwmPXsDvvLHoWh5Bm7ofIYg==" + }, "once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -2034,6 +2122,11 @@ "resolved": "https://registry.npmjs.org/punycode/-/punycode-1.3.2.tgz", "integrity": "sha1-llOgNvt8HuQjQvIyXM7v6jkmxI0=" }, + "qs": { + "version": "6.9.3", + "resolved": "https://registry.npmjs.org/qs/-/qs-6.9.3.tgz", + "integrity": "sha512-EbZYNarm6138UKKq46tdx08Yo/q9ZhFoAXAI1meAFd2GtbRDhbZY2WQSICskT0c5q99aFzLG1D4nvTk9tqfXIw==" + }, "querystring": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/querystring/-/querystring-0.2.0.tgz", @@ -2348,6 +2441,36 @@ "resolved": "https://registry.npmjs.org/stubs/-/stubs-3.0.0.tgz", "integrity": "sha1-6NK6H6nJBXAwPAMLaQD31fiavls=" }, + "superagent": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/superagent/-/superagent-5.2.2.tgz", + "integrity": "sha512-pMWBUnIllK4ZTw7p/UaobiQPwAO5w/1NRRTDpV0FTVNmECztsxKspj3ZWEordVEaqpZtmOQJJna4yTLyC/q7PQ==", + "requires": { + "component-emitter": "^1.3.0", + "cookiejar": "^2.1.2", + "debug": "^4.1.1", + "fast-safe-stringify": "^2.0.7", + "form-data": "^3.0.0", + "formidable": "^1.2.1", + "methods": "^1.1.2", + "mime": "^2.4.4", + "qs": "^6.9.1", + "readable-stream": "^3.4.0", + "semver": "^6.3.0" + }, + "dependencies": { + "readable-stream": { + "version": "3.6.0", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.0.tgz", + "integrity": "sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA==", + "requires": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + } + } + } + }, "supports-color": { "version": "5.5.0", "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", @@ -2464,6 +2587,11 @@ "integrity": 
"sha1-8y6srFoXW+ol1/q1Zas+2HQe9W8=", "dev": true }, + "tiny-each-async": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/tiny-each-async/-/tiny-each-async-2.0.3.tgz", + "integrity": "sha1-jru/1tYpXxNwAD+7NxYq/loKUdE=" + }, "tmp": { "version": "0.0.33", "resolved": "https://registry.npmjs.org/tmp/-/tmp-0.0.33.tgz", @@ -2773,6 +2901,17 @@ } } }, + "winston-daily-rotate-file": { + "version": "4.4.2", + "resolved": "https://registry.npmjs.org/winston-daily-rotate-file/-/winston-daily-rotate-file-4.4.2.tgz", + "integrity": "sha512-pVOUJKxN+Kn6LnOJZ4tTwdV5+N+fCkiRAb3bVnzcPtOj1ScxGNC3DyUhHuAHssBtMl5s45/aUcSUtApH+69V5A==", + "requires": { + "file-stream-rotator": "^0.5.7", + "object-hash": "^2.0.1", + "triple-beam": "^1.3.0", + "winston-transport": "^4.2.0" + } + }, "winston-transport": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/winston-transport/-/winston-transport-4.3.0.tgz", diff --git a/package.json b/package.json index 39533d6a..f445f59a 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,13 @@ { "name": "rtcstats-server", - "version": "1.0.0", + "version": "1.0.9", "description": "", "main": "websocket.js", "scripts": { - "test": "eslint *.js database/*.js store/*.js && node test/client.js", - "start": "NODE_ENV=production node app.js", - "watch:dev": "nodemon --exec 'NODE_ENV=production node app.js'" + "test": "eslint ./src/*.js ./src/database/*.js ./src/store/*.js && node ./src/test/client.js", + "start": "NODE_ENV=production node ./src/app.js", + "watch:dev": "nodemon --exec 'NODE_ENV=debug node ./src/app.js'", + "debug": "NODE_ENV=debug node ./src/app.js" }, "repository": { "type": "git", @@ -21,9 +22,11 @@ "dependencies": { "@google-cloud/bigquery": "^4.5.0", "@google-cloud/storage": "^4.1.3", + "amplitude": "^4.0.1", "aws-sdk": "^2.441.0", "bluebird": "^3.3.1", "config": "^1.17.1", + "get-folder-size": "^2.0.1", "js-yaml": "^3.13.1", "pem": "^1.14.2", "platform": "https://github.com/bestiejs/platform.js.git", @@ -31,6 
+34,7 @@ "sdp": "^2.9.0", "uuid": "^2.0.1", "winston": "^3.2.1", + "winston-daily-rotate-file": "^4.4.2", "ws": "^5.1.1" }, "devDependencies": { diff --git a/src/app.js b/src/app.js new file mode 100644 index 00000000..781fcdfc --- /dev/null +++ b/src/app.js @@ -0,0 +1,386 @@ +'use strict'; +//const child_process = require('child_process'); +const fs = require('fs'); +const os = require('os'); +const http = require('http'); +const https = require('https'); +const path = require('path'); + +const WebSocketServer = require('ws').Server; +const config = require('config'); +const uuid = require('uuid'); + +const AmplitudeConnector = require('./database/AmplitudeConnector'); +const logger = require('./logging'); +const obfuscate = require('./utils/obfuscator'); +const { name: appName, version: appVersion } = require('../package'); +const { + connected, + connection_error, + //diskQueueSize, + dumpSize, + errored, + processed, + processTime, + prom, + queueSize +} = require('./prom-collector'); +const { getEnvName, RequestType, ResponseType } = require('./utils/utils'); +const WorkerPool = require('./utils/WorkerPool'); + +// Configure database, fall back to redshift-firehose. 
/**
 * Upload a finished dump to the configured store and remove the local copy.
 *
 * The local file is deleted whether the upload succeeds or fails; an upload failure is only logged.
 *
 * @param {string} clientId - id of the client, which is also the dump's file name inside the work directory.
 */
function storeDump(clientId) {
    const dumpPath = tempPath + '/' + clientId;

    // Best-effort cleanup: the result of the unlink is deliberately ignored.
    const removeDump = () => fs.unlink(dumpPath, () => {});

    store
        .put(clientId, dumpPath)
        .then(removeDump)
        .catch((err) => {
            logger.error('Error storing: %s - %s', dumpPath, err);
            removeDump();
        });
}

/**
 * Number of feature-extraction workers to start.
 *
 * One CPU is left free because using all of them might slow down the main node.js thread, which is
 * responsible for handling requests. The result is never lower than 1: on a single-core host (or when
 * `os.cpus()` reports no CPUs at all) `cpus - 1` would otherwise yield a pool with no workers.
 *
 * @returns {number} worker count, always >= 1.
 */
function getIdealWorkerCount() {
    return Math.max(1, os.cpus().length - 1);
}
+ if (database) { + const { url, clientId, connid, clientFeatures, connectionFeatures } = body; + + if (!connectionFeatures) { + database.put(url, clientId, connid, clientFeatures); + } else { + // When using a database backend the streams features are stored separately, so we don't need them + // in the connectionFeatures object. + const streams = connectionFeatures.streams; + delete connectionFeatures.streams; + + for (const streamFeatures of streams) { + database.put(url, clientId, connid, clientFeatures, connectionFeatures, streamFeatures); + } + } + } +}); +workerPool.on(ResponseType.DONE, (body) => { + logger.debug('Handling DONE event with body %j', body); + + processed.inc(); + + storeDump(body.clientId); +}); + +workerPool.on(ResponseType.METRICS, (body) => { + logger.info('Handling METRICS event with body %j', body); + processTime.observe(body.extractDurationMs); + dumpSize.observe(body.dumpFileSizeMb); +}); +workerPool.on(ResponseType.ERROR, (body) => { + // TODO handle requeue of the request, this also requires logic in extract.js + // i.e. we need to catch all potential errors and send back a request with + // the client id. + logger.error('Handling ERROR event with body %j', body); + + errored.inc(); + + // If feature extraction failed at least attempt to store the dump in s3. + if (body.clientId) { + storeDump(body.clientId); + } else { + logger.error('Handling ERROR without a clientId field!'); + } + + // TODO At this point adding a retry mechanism can become detrimental, e.g. + // If there is a error with the dump file structure the error would just requeue ad infinitum, + // a smarter mechanism is required here, with some sort of maximum retry per request and so on. 
/**
 * Prepare the directory that holds in-flight dumps.
 *
 * When the directory already exists every entry in it is unlinked; a failure to remove a single entry is
 * logged and does not stop the sweep. When it does not exist it is created.
 *
 * @throws rethrows any error raised while checking, listing or creating the directory, after logging it.
 */
function setupWorkDirectory() {
    try {
        if (!fs.existsSync(tempPath)) {
            logger.debug(`Creating working dir ${tempPath}`);
            fs.mkdirSync(tempPath);

            return;
        }

        for (const fname of fs.readdirSync(tempPath)) {
            const leftover = tempPath + '/' + fname;

            try {
                logger.debug(`Removing file ${leftover}`);
                fs.unlinkSync(leftover);
            } catch (e) {
                logger.error(`Error while unlinking file ${fname} - ${e}`);
            }
        }
    } catch (e) {
        logger.error(`Error while accessing working dir ${tempPath} - ${e}`);

        // The app is probably in an inconsistent state at this point, throw and stop process.
        throw e;
    }
}

/**
 * Request listener of the main HTTP(S) server: answers 200 on `/healthcheck` and 404 on anything else,
 * always with an empty body.
 *
 * @param {http.IncomingMessage} request
 * @param {http.ServerResponse} response
 */
function serverHandler(request, response) {
    const statusCode = request.url === '/healthcheck' ? 200 : 404;

    response.writeHead(statusCode);
    response.end();
}
/**
 * Create the HTTPS variant of the main server.
 *
 * Useful when running the server locally: browsers normally won't allow non-secure web sockets on a https
 * domain, so the websocket endpoint has to sit on a https server. Key and certificate are read from the
 * files named by the `keyPath` and `certPath` entries of the `server` config section.
 *
 * @param {number} port - port to listen on.
 * @returns {https.Server} the listening server.
 */
function setupHttpsServer(port) {
    const serverConfig = config.get('server');
    const options = {
        key: fs.readFileSync(serverConfig.keyPath),
        cert: fs.readFileSync(serverConfig.certPath),
    };

    return https.createServer(options, serverHandler).listen(port);
}

/**
 * Create the plain HTTP variant of the main server.
 *
 * @param {number} port - port to listen on.
 * @returns {http.Server} the listening server.
 */
function setupHttpServer(port) {
    return http.createServer(serverHandler).listen(port);
}

/**
 * Start the HTTP server that exposes the prometheus metrics on `/metrics` (404 for any other url).
 *
 * @param {number} port - port to listen on.
 * @returns {http.Server} the listening metrics server.
 */
function setupMetricsServer(port) {
    return http
        .createServer((request, response) => {
            switch (request.url) {
                case '/metrics':
                    // The queue size is sampled at scrape time.
                    queueSize.set(workerPool.getTaskQueueSize());
                    // NOTE(review): default metrics are (re)registered on every scrape - confirm the installed
                    // prom-client version tolerates repeated collectDefaultMetrics() calls.
                    prom.collectDefaultMetrics();
                    response.writeHead(200, { 'Content-Type': prom.contentType });
                    response.end(prom.register.metrics());
                    break;
                default:
                    response.writeHead(404);
                    response.end();
            }
        })
        .listen(port);
}

/**
 * Workaround for a RtcStats.java bug: the SDK reports optional constraints as a plain
 * `constraintsOptional` map, which is rewritten here into an `optional` list of single-key objects.
 * Does nothing when `constraintsOptional` is absent.
 *
 * @param {Object} constraints - payload of a `constraints` event, modified in place.
 */
function convertOptionalConstraints(constraints) {
    if (!constraints.constraintsOptional) {
        return;
    }

    // Every entry has to be pushed into the list, otherwise the optional constraints are silently dropped.
    constraints.optional = Object.keys(constraints.constraintsOptional).map((key) => ({
        [key]: constraints.constraintsOptional[key],
    }));
    delete constraints.constraintsOptional;
}

/**
 * Attach the websocket endpoint to `server`.
 *
 * Every connection gets a fresh client id and a dump file named after it in the work directory. The first
 * line of the dump is a JSON metadata object, followed by one JSON array per line: a `publicIP` record, the
 * received events and a final `close` record. Once the file is flushed it is queued for feature extraction,
 * or deleted if the client never sent a parsable event.
 *
 * @param {http.Server|https.Server} server - server whose upgrade requests are handled.
 */
function setupWebSocketsServer(server) {
    const wss = new WebSocketServer({ server });

    wss.on('connection', (client, upgradeReq) => {
        connected.inc();
        let numberOfEvents = 0;

        // the url the client is coming from
        const referer = upgradeReq.headers['origin'] + upgradeReq.url;
        // TODO: check against known/valid urls

        const ua = upgradeReq.headers['user-agent'];
        const clientId = uuid.v4();
        const dumpPath = tempPath + '/' + clientId;
        let tempStream = fs.createWriteStream(dumpPath);
        const writeLine = (entry) => tempStream.write(JSON.stringify(entry) + '\n');

        tempStream.on('finish', () => {
            if (numberOfEvents > 0) {
                workerPool.addTask({ type: RequestType.PROCESS, body: { clientId } });
            } else {
                // Nothing worth processing was received, drop the dump.
                fs.unlink(dumpPath, () => {});
            }
        });

        writeLine({
            path: upgradeReq.url,
            origin: upgradeReq.headers['origin'],
            url: referer,
            userAgent: ua,
            time: Date.now(),
            fileFormat: 2,
        });

        // Addresses are passed through obfuscate() before they are written to the dump.
        const obfuscateIp = (ip) => {
            const record = ['publicIP', null, ip];

            obfuscate(record);

            return record[2];
        };

        const forwardedFor = upgradeReq.headers['x-forwarded-for'];
        if (forwardedFor) {
            const forwardedIPs = forwardedFor.split(',');

            if (config.server.skipLoadBalancerIp) {
                forwardedIPs.pop();
            }
            writeLine(['publicIP', null, forwardedIPs.map((ip) => obfuscateIp(ip.trim())), Date.now()]);
        } else {
            const { remoteAddress } = upgradeReq.connection;

            writeLine(['publicIP', null, [obfuscateIp(remoteAddress)], Date.now()]);
        }

        logger.info('New app connected: ua: <%s>, referer: <%s>, clientid: <%s>', ua, referer, clientId);

        client.on('message', (msg) => {
            try {
                const data = JSON.parse(msg);

                numberOfEvents++;

                if (data[0].endsWith('OnError')) {
                    // monkey-patch java/swift sdk bugs.
                    data[0] = data[0].replace(/OnError$/, 'OnFailure');
                }
                switch (data[0]) {
                    case 'getUserMedia':
                    case 'getUserMediaOnSuccess':
                    case 'getUserMediaOnFailure':
                    case 'navigator.mediaDevices.getUserMedia':
                    case 'navigator.mediaDevices.getUserMediaOnSuccess':
                    case 'navigator.mediaDevices.getUserMediaOnFailure':
                        // getUserMedia events are stored without going through obfuscate().
                        writeLine(data);
                        break;
                    case 'constraints':
                        convertOptionalConstraints(data[2]);
                        writeLine(data);
                        break;
                    default:
                        if (data[0] === 'getstats' && data[2].values) {
                            // workaround for RtcStats.java bug.
                            const { timestamp, values } = data[2];

                            data[2] = values;
                            data[2].timestamp = timestamp;
                        }
                        obfuscate(data);
                        writeLine(data);
                        break;
                }
            } catch (e) {
                logger.error('Error while processing: %s - %s', e.message, msg);
            }
        });

        client.on('error', (e) => {
            logger.error('Websocket error: %s', e);
            connection_error.inc();
        });

        client.on('close', () => {
            connected.dec();
            // Last record of the dump, written without a trailing newline.
            tempStream.write(JSON.stringify(['close', null, null, Date.now()]));
            tempStream.end();
            tempStream = null;
        });
    });
}

/**
 * Application entry point: prepares the work directory, starts the main server (HTTPS when
 * `server.useHTTPS` is set, HTTP otherwise), starts the metrics server when `server.metrics` holds a port
 * and finally attaches the websocket endpoint.
 */
function run() {
    logger.info('Initializing <%s>, version <%s>, env <%s> ...', appName, appVersion, getEnvName());

    setupWorkDirectory();

    const serverConfig = config.get('server');

    server = serverConfig.useHTTPS ? setupHttpsServer(serverConfig.port) : setupHttpServer(serverConfig.port);

    if (serverConfig.metrics) {
        setupMetricsServer(serverConfig.metrics);
    }

    setupWebSocketsServer(server);

    logger.info('Initialization complete.');
}

/**
 * Currently used from test script.
 */
function stop() {
    process.exit();
}
// Connection features forwarded to Amplitude, which only receives a subset considered more relevant.
const RELEVANT_CONNECTION_FEATURES = Object.freeze([
    'lifeTime',
    'ICEFailure',
    'connectionTime',
    'numberOfRemoteStreams',
    'sessionDuration',
    'numberOfLocalSimulcastStreams',
    'bytesTotalSent',
    'bytesTotalReceived',
    'statsMeanRoundTripTime',
    'statsMeanReceivingBitrate',
    'statsMeanSendingBitrate',
    'firstCandidatePairType',
    'bweGoogActualEncBitrateMean',
    'bweGoogRetransmitBitrateMean',
    'bweGoogTargetEncBitrateMean',
    'bweGoogTransmitBitrateMean',
    'bweAvailableOutgoingBitrateMean',
    'bweAvailableIncomingBitrateMean',
]);

/**
 * Publishes extracted rtcstats features to Amplitude as `rtcstats-publish` events.
 */
class AmplitudeConnector {
    /**
     * @param {string} key - Amplitude API key.
     * @param {Object} [options] - passed through unchanged to the `amplitude` client constructor.
     * @throws {Error} when no key is provided.
     */
    constructor(key, options) {
        if (!key) {
            throw new Error('[Amplitude] Please provide an amplitude key!');
        }

        this.amplitude = new Amplitude(key, options);
    }

    /**
     * Extract a subset of features considered to be more relevant.
     *
     * @param {Object} [connectionFeatures] - features of a single peer connection.
     * @returns {Object} object holding exactly the relevant keys; a key is `undefined` when the input lacks
     * it (or when no input is given at all).
     */
    extractRelevantStats(connectionFeatures) {
        const source = connectionFeatures || {};
        const filteredFeature = {};

        for (const name of RELEVANT_CONNECTION_FEATURES) {
            filteredFeature[name] = source[name];
        }

        return filteredFeature;
    }

    /**
     * Send one `rtcstats-publish` event built from a feature extraction result.
     *
     * Results without identity information or without connection features (e.g. sessions that only called
     * getUserMedia) are skipped with a warning. Failures are logged and never thrown to the caller.
     *
     * @param {Object} rtcstatsFeatures - object carrying `clientId`, `identity` and `connectionFeatures`.
     */
    track(rtcstatsFeatures) {
        try {
            const { clientId, identity, connectionFeatures } = rtcstatsFeatures;

            // Validate explicitly instead of relying on a TypeError further down.
            if (!identity) {
                logger.warn('[Amplitude] Skipping rtcstats clientId: %s, no identity information present.', clientId);

                return;
            }

            if (!connectionFeatures) {
                logger.warn('[Amplitude] Skipping rtcstats clientId: %s, no connection features present.', clientId);

                return;
            }

            const amplitudeEvent = {
                event_type: 'rtcstats-publish',
                user_id: identity.userId,
                device_id: identity.deviceId,
                session_id: identity.sessionId,
                event_properties: {
                    rtcstatsIdentity: clientId,
                    displayName: identity.displayName,
                    ...identity.hosts,
                    ...identity.deploymentInfo,
                    ...this.extractRelevantStats(connectionFeatures),
                },
            };

            this.amplitude
                .track(amplitudeEvent)
                .then(() =>
                    logger.info(
                        '[Amplitude] Sent event: rtcstats clientId: %s, user_id: %s, device_id: %s, session_id: %s',
                        clientId,
                        amplitudeEvent.user_id,
                        amplitudeEvent.device_id,
                        amplitudeEvent.session_id
                    )
                )
                .catch((error) => logger.error('[Amplitude] track promise failed for event %j error: %s', amplitudeEvent, error.message));
        } catch (error) {
            logger.error('[Amplitude] Failed to send rtcstats features %j with error: %s', rtcstatsFeatures, error.message);
        }
    }
}
/**
 * Debug helper: outside of production it logs how many getUserMedia events, peer connections and peer
 * connection events a client has. It has no other effect.
 *
 * @param {string} url - url the client connected from (currently unused).
 * @param {Object} client - parsed dump, see processDump.
 */
function dump(url, client) {
    // ignore connections that never send getUserMedia or peerconnection events.
    if (client.getUserMedia.length === 0 && Object.keys(client.peerConnections).length === 0) return;
    if (isProduction()) return;

    const connectionIds = Object.keys(client.peerConnections);
    const total = connectionIds.reduce((sum, id) => sum + client.peerConnections[id].length, 0);

    logger.info('DUMP', client.getUserMedia.length, connectionIds.length, total);
}

/**
 * Store the result of one feature function in `target`.
 *
 * `undefined` results are skipped. Object results are flattened into `<fname><Subname>` keys, anything else
 * is stored under `fname`. Every stored value goes through safeFeature(). A `null` result is stored as-is
 * instead of being treated as an object.
 *
 * @param {Object} target - feature map being built.
 * @param {string} fname - name of the feature function.
 * @param {*} feature - value returned by the feature function.
 * @param {Array} logPrefix - leading arguments of the debug log line.
 */
function recordFeature(target, fname, feature, logPrefix) {
    if (feature === undefined) return;

    if (typeof feature === 'object' && feature !== null) {
        Object.keys(feature).forEach((subname) => {
            const key = fname + capitalize(subname);
            const value = safeFeature(feature[subname]);

            logger.debug(...logPrefix, 'FEATURE', key, '=>', value);
            target[key] = value;
        });

        return;
    }

    const value = safeFeature(feature);

    logger.debug(...logPrefix, 'FEATURE', fname, '=>', value);
    target[fname] = value;
}

/**
 * Run the client, connection and stream feature extractors over a parsed dump.
 *
 * When running as a worker thread the results are posted to the parent: one PROCESSING message per peer
 * connection (or a single one with an empty `connid` when the client only has getUserMedia events),
 * followed by a DONE message. When running on the main thread nothing is posted.
 *
 * NOTE(review): a client with neither getUserMedia nor peer connection events returns early without a DONE
 * message - confirm the parent does not depend on DONE to clean up the dump.
 *
 * @param {string} url - url the client connected from.
 * @param {Object} client - parsed dump; handled peer connections are deleted from it to save memory.
 * @param {string} clientId - id of the client the dump belongs to.
 */
function generateFeatures(url, client, clientId) {
    // ignore connections that never send getUserMedia or peerconnection events.
    if (client.getUserMedia.length === 0 && Object.keys(client.peerConnections).length === 0) return;

    const identity = client.identity;

    // clientFeatures are the same for all peerconnections but are saved together
    // with each peerconnection anyway to make correlation easier.
    const clientFeatures = {};
    Object.keys(clientfeatures).forEach((fname) => {
        recordFeature(clientFeatures, fname, clientfeatures[fname].apply(null, [client]), ['PAGE']);
    });

    // parentPort only exists inside a worker thread, so every post is guarded.
    if (Object.keys(client.peerConnections).length === 0 && !isMainThread) {
        // We only have GUM and potentially GUM errors.
        parentPort.postMessage({
            type: ResponseType.PROCESSING,
            body: { url, clientId, connid: '', identity, clientFeatures },
        });
    }

    logger.debug('Client features: %j', clientFeatures);

    Object.keys(client.peerConnections).forEach((connid) => {
        if (connid === 'null' || connid === '') return; // ignore the null connid and empty strings

        const conn = client.peerConnections[connid];
        const connectionFeatures = {};
        Object.keys(connectionfeatures).forEach((fname) => {
            recordFeature(connectionFeatures, fname, connectionfeatures[fname].apply(null, [client, conn]), [connid]);
        });

        // One list per connection, otherwise a connection would also report the streams of all the
        // connections handled before it.
        const streamList = [];
        const streams = extractStreams(extractTracks(conn));

        for (const [streamId, tracks] of streams.entries()) {
            const streamFeatures = { streamId };

            for (const { trackId, kind, direction, stats } of tracks) {
                Object.keys(streamfeatures).forEach((fname) => {
                    const feature = streamfeatures[fname].apply(null, [{ kind, direction, trackId, stats, peerConnectionLog: conn }]);

                    recordFeature(streamFeatures, fname, feature, [connid, 'STREAM', streamId, 'TRACK', trackId]);
                });
            }

            streamList.push(streamFeatures);
        }

        connectionFeatures.streams = streamList;

        if (!isMainThread) {
            parentPort.postMessage({
                type: ResponseType.PROCESSING,
                body: { url, clientId, connid, clientFeatures, identity, connectionFeatures },
            });
        }

        delete client.peerConnections[connid]; // save memory
    });

    if (!isMainThread) {
        parentPort.postMessage({ type: ResponseType.DONE, body: { clientId } });
    }
}

/**
 * Read a dump file, rebuild the client structure from it and run feature extraction.
 *
 * In a worker thread the dump is read from `temp/<clientId>`; on the main thread `clientId` is used as the
 * file path itself. The first line of the file is the client metadata object, every further non-empty line
 * is one JSON event array. In a worker thread a METRICS message is posted after extraction, and errors
 * raised while reading or parsing are posted as an ERROR message; on the main thread they are only logged.
 *
 * @param {string} clientId - client id (worker thread) or path of the dump file (main thread).
 * @throws when the dump file cannot be stat'ed; this happens synchronously, before reading starts.
 */
function processDump(clientId) {
    const path = isMainThread ? clientId : 'temp/' + clientId;

    const extractStartTime = new Date().getTime();
    const dumpFileStats = fs.statSync(path);
    const dumpFileSizeMb = dumpFileStats.size / 1000000.0;

    fs.readFile(path, { encoding: 'utf-8' }, (err, data) => {
        try {
            if (err) {
                throw err;
            }

            // Last decompressed getstats report per peer connection, getstats events are delta-compressed.
            const baseStats = {};
            const lines = data.split('\n');
            const client = JSON.parse(lines.shift());

            client.peerConnections = {};
            client.getUserMedia = [];

            lines.forEach((line) => {
                if (!line.length) return;

                const entry = JSON.parse(line);
                const time = new Date(entry.time || entry[3]);

                delete entry.time;
                switch (entry[0]) {
                    case 'publicIP':
                        client.publicIP = entry[2];
                        break;
                    case 'userfeedback': // TODO: might be renamed
                        client.feedback = entry[2];
                        break;
                    case 'tags': // experiment variation tags
                        client.tags = entry[2];
                        break;
                    case 'wsconnect':
                        client.websocketConnectionTime = entry[2] >>> 0;
                        break;
                    case 'wsconnecterror':
                        client.websocketError = entry[2];
                        break;
                    case 'identity': // identity meta-information when it's not possible to feed into RTCPeerConnection.
                        client.identity = entry[2];
                        break;
                    case 'getUserMedia':
                    case 'getUserMediaOnSuccess':
                    case 'getUserMediaOnFailure':
                    case 'navigator.mediaDevices.getUserMedia':
                    case 'navigator.mediaDevices.getUserMediaOnSuccess':
                    case 'navigator.mediaDevices.getUserMediaOnFailure':
                    case 'navigator.getDisplayMedia':
                    case 'navigator.getDisplayMediaOnSucces': // misspelt label, kept for backward compatibility
                    case 'navigator.getDisplayMediaOnSuccess':
                    case 'navigator.mediaDevices.getDisplayMedia':
                    case 'navigator.mediaDevices.getDisplayMediaOnSuccess':
                        client.getUserMedia.push({
                            time: time,
                            timestamp: time.getTime(),
                            type: entry[0],
                            value: entry[2],
                        });
                        break;
                    default:
                        // Everything else is treated as a peer connection event keyed by entry[1].
                        if (!client.peerConnections[entry[1]]) {
                            client.peerConnections[entry[1]] = [];
                            baseStats[entry[1]] = {};
                        }
                        if (entry[0] === 'getstats') {
                            // delta-compressed
                            entry[2] = statsDecompressor(baseStats[entry[1]], entry[2]);
                            baseStats[entry[1]] = JSON.parse(JSON.stringify(entry[2]));
                        }
                        if (entry[0] === 'getStats' || entry[0] === 'getstats') {
                            entry[2] = statsMangler(entry[2]);
                            entry[0] = 'getStats';
                        }
                        client.peerConnections[entry[1]].push({
                            time: time,
                            timestamp: time.getTime(),
                            type: entry[0],
                            value: entry[2],
                        });
                        break;
                }
            });

            dump(client.url, client);
            generateFeatures(client.url, client, clientId);
            const extractDurationMs = new Date().getTime() - extractStartTime;

            if (!isMainThread) {
                parentPort.postMessage({
                    type: ResponseType.METRICS,
                    body: { clientId, extractDurationMs, dumpFileSizeMb },
                });
            }
        } catch (error) {
            if (isMainThread) {
                logger.error('%s', error);
            } else {
                parentPort.postMessage({
                    type: ResponseType.ERROR,
                    body: { clientId, error: error.stack },
                });
            }
        }
    });
}
-const {capitalize, standardizedMoment, timeBetween, isIceConnected} = require('./utils'); +const {capitalize, standardizedMoment, timeBetween, isIceConnected} = require('../utils/utils'); const SDPUtils = require('sdp'); function getPeerConnectionConfig(peerConnectionLog) { @@ -122,7 +122,7 @@ module.exports = { } }, peerIdentifier: function(client, peerConnectionLog) { - let constraints = getPeerConnectionConstraints(peerConnectionLog); + let constraints = getPeerConnectionConstraints(peerConnectionLog); if (!constraints.optional) return; constraints = constraints.optional; for (let i = 0; i < constraints.length; i++) { @@ -132,7 +132,7 @@ module.exports = { } }, conferenceIdentifier: function(client, peerConnectionLog) { - let constraints = getPeerConnectionConstraints(peerConnectionLog); + let constraints = getPeerConnectionConstraints(peerConnectionLog); if (!constraints.optional) return; constraints = constraints.optional; for (let i = 0; i < constraints.length; i++) { @@ -177,6 +177,8 @@ module.exports = { return lifeTime > 0 ? lifeTime : undefined; }, + // Time in which the connection was in a potential sending state. Calculated + // as the difference between the first setLocalDescription call and the last PC log. sendingDuration: function(client, peerConnectionLog) { let sendingDuration = 0; let prevTime = peerConnectionLog[0].timestamp; @@ -347,7 +349,7 @@ module.exports = { // was the peerconnection created with non-spec SDES? 
// Looked up once when the module loads instead of on every log line.
const HOST_NAME = os.hostname();

/**
 * We use this formatter to get a console.log like logging system: the extra arguments of a log call, which
 * winston exposes under the `splat` symbol, are merged into the message with util.format.
 *
 * @param {Object} logEntry - info object passed by winston; its `message` is updated in place.
 * @returns {Object} the same info object.
 */
function splatTransform(logEntry) {
    const args = logEntry[Symbol.for('splat')];

    if (args) {
        logEntry.message = util.format(logEntry.message, ...args);
    }

    return logEntry;
}

/**
 * Formatter that adds additional metadata to the log line: timestamp, level, process id, thread id and
 * host name.
 *
 * The metadata keys come first in the returned object; a key that already exists on the log entry keeps
 * the entry's own value.
 *
 * @param {Object} logEntry - info object passed by winston; it is not modified.
 * @returns {Object} new info object made of the metadata plus all the properties of `logEntry`.
 */
function metaTransform(logEntry) {
    const customMeta = {
        timestamp: logEntry.timestamp,
        level: logEntry[LEVEL],
        PID: process.pid,
        TID: threadId,
        host: HOST_NAME,
    };

    return { ...customMeta, ...logEntry };
}
+const appExceptionLogTransportCfg = { ...logFileCommonCfg }; +delete appExceptionLogTransportCfg.format; + +// Log uncaught exceptions in both error log and normal log in case we need to track some particular flow. +const appExceptionLogTransport = new transports.DailyRotateFile({ + ...appExceptionLogTransportCfg, + filename: 'logs/app-error-%DATE%.log', +}); +const appExceptionCommonLogTransport = new transports.DailyRotateFile({ + ...appExceptionLogTransportCfg, + filename: 'logs/app-%DATE%.log', +}); + +// Create actual loggers with specific transports +const logger = createLogger({ + transports: [appLogTransport, appErrorLogTransport], + exceptionHandlers: [appExceptionLogTransport, appExceptionCommonLogTransport], +}); + +// The JSON format is more suitable for production deployments that use the console. +// The alternative is a single line log format that is easier to read, useful for local development. +if (config.get('server').jsonConsoleLog) { + logger.add( + new transports.Console({ + format: fileLogger, + level: config.get('server').logLevel, + handleExceptions: true, + }) + ); +} else { + const consoleLogger = format.combine( + format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss.SSS' }), + colorize(), + format(splatTransform)(), + format(metaTransform)(), + format.printf(({ level, message, timestamp, PID, TID, host }) => `${timestamp} ${PID} ${TID} ${host} ${level}: ${message}`) + ); + + logger.add( + new transports.Console({ + format: consoleLogger, + level: config.get('server').logLevel, + handleExceptions: true, + }) + ); +} + +logger.info('Logger successfully initialized.'); + +module.exports = logger; diff --git a/src/prom-collector.js b/src/prom-collector.js new file mode 100644 index 00000000..06dc2a85 --- /dev/null +++ b/src/prom-collector.js @@ -0,0 +1,75 @@ +// Initialize prometheus metrics. 
+const getFolderSize = require('get-folder-size'); +const prom = require('prom-client'); + +const logger = require('./logging'); + +const connected = new prom.Gauge({ + name: 'rtcstats_websocket_connections', + help: 'number of open websocket connections', +}); + +const connection_error = new prom.Counter({ + name: 'rtcstats_websocket_connection_error', + help: 'number of open websocket connections that failed with an error', +}); + + +const queueSize = new prom.Gauge({ + name: 'rtcstats_queue_size', + help: 'Number of dumps currently queued for processing', +}); + +const diskQueueSize = new prom.Gauge({ + name: 'rtcstats_disk_queue_size', + help: 'Size occupied on disk by queued dumps', +}); + +const processed = new prom.Counter({ + name: 'rtcstats_files_processed', + help: 'number of files processed', +}); + +const errored = new prom.Counter({ + name: 'rtcstats_files_errored', + help: 'number of files with errors during processing', +}); + +const processTime = new prom.Summary({ + name: 'rtcstats_processing_time', + help: 'Processing time for a request', + maxAgeSeconds: 600, + ageBuckets: 5, + percentiles: [0.1, 0.25, 0.5, 0.75, 0.9], + +}); + +const dumpSize = new prom.Summary({ + name: 'rtcstats_dump_size', + help: 'Size of processed rtcstats dumps', + maxAgeSeconds: 600, + ageBuckets: 5, + percentiles: [0.1, 0.25, 0.5, 0.75, 0.9], +}); + +setInterval(() => { + getFolderSize('temp', (err, size) => { + if (err) { + logger.error('Could not get disk queue dir size %j', err); + return; + } + diskQueueSize.set(size); + }); +},10000); + +module.exports = { + connected, + connection_error, + diskQueueSize, + dumpSize, + errored, + processed, + processTime, + prom, + queueSize +} \ No newline at end of file diff --git a/queries/README.md b/src/queries/README.md similarity index 100% rename from queries/README.md rename to src/queries/README.md diff --git a/queries/lib/graph.js b/src/queries/lib/graph.js similarity index 100% rename from queries/lib/graph.js rename to 
src/queries/lib/graph.js diff --git a/queries/lib/query.js b/src/queries/lib/query.js similarity index 100% rename from queries/lib/query.js rename to src/queries/lib/query.js diff --git a/queries/queries.js b/src/queries/queries.js similarity index 100% rename from queries/queries.js rename to src/queries/queries.js diff --git a/store/gcp.js b/src/store/gcp.js similarity index 100% rename from store/gcp.js rename to src/store/gcp.js diff --git a/store/s3.js b/src/store/s3.js similarity index 87% rename from store/s3.js rename to src/store/s3.js index dda438f3..25e48057 100644 --- a/store/s3.js +++ b/src/store/s3.js @@ -3,21 +3,29 @@ const fs = require('fs'); const AWS = require('aws-sdk'); +const logger = require('../logging'); + module.exports = function (config) { - AWS.config = config; + + AWS.config.update({region: config.region}); + + if (!config.useIAMAuth) { + AWS.config = config; + } const s3bucket = new AWS.S3({ params: { Bucket: config.bucket } }); + const configured = !!config.bucket; return { put: function (key, filename) { return new Promise((resolve, reject) => { if (!configured) { - console.log('no bucket configured for storage'); + logger.warn('no bucket configured for storage'); return resolve(); // not an error. 
} fs.readFile(filename, { encoding: 'utf-8' }, (err, data) => { diff --git a/test/client.js b/src/test/client.js similarity index 80% rename from test/client.js rename to src/test/client.js index a8a68080..4ef92813 100644 --- a/test/client.js +++ b/src/test/client.js @@ -1,14 +1,22 @@ var WebSocket = require('ws'); var fs = require('fs'); var config = require('config'); + var server = require('../app'); -var statsCompressor = require('../getstats-deltacompression').compress; +var statsCompressor = require('../utils/getstats-deltacompression').compress; -var data = JSON.parse(fs.readFileSync('test/clienttest.json')); +var data = JSON.parse(fs.readFileSync('src/test/clienttest.json')); var url = data.url; var origin = url.split('/').splice(0, 3).join('/'); var path = url.split('/').splice(3).join('/'); +function checkTestCompletion(server) { + if (server.processed.get().values[0].value === 1) { + server.stop(); + } else { + setTimeout(checkTestCompletion, 1000, server); + } +} // using setTimeout here is bad obviously. 
This should wait for the server to listen setTimeout(function() { process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; // ignore self-signed cert @@ -29,7 +37,7 @@ setTimeout(function() { var evt = events.shift(); if (!evt) { ws.close(); - server.stop(); + checkTestCompletion(server); return; } if (evt.type === 'getStats') { diff --git a/test/clienttest.json b/src/test/clienttest.json similarity index 100% rename from test/clienttest.json rename to src/test/clienttest.json diff --git a/src/utils/WorkerPool.js b/src/utils/WorkerPool.js new file mode 100644 index 00000000..c2ee0c02 --- /dev/null +++ b/src/utils/WorkerPool.js @@ -0,0 +1,139 @@ +const EventEmitter = require('events'); +const { Worker } = require('worker_threads'); +const uuid = require('uuid'); + +const logger = require('../logging'); + +const WorkerStatus = Object.freeze({ + IDLE: 'IDLE', + STOPPED: 'STOPPED', + RUNNING: 'RUNNING', +}); + +/** + * The WorkerPool implementation will attempt to always keep the set number of worker running, that means in case of + * an error or a exit due to something happening inside the worker script, it will spawn a new one. + * However when the processes exits from the main thread or SIGKILL/SIGTERM it will shutdown as expected. 
+ */ +class WorkerPool extends EventEmitter { + constructor(workerScriptPath, poolSize) { + super(); + + this.taskQueue = []; + this.workerPool = []; + this.workerScriptPath = workerScriptPath; + this.poolSize = poolSize; + + for (let i = 0; i < poolSize; ++i) { + this._addWorkerToPool(); + } + } + + _addWorkerToPool() { + const workerMeta = this._createWorker(uuid.v4()); + this.workerPool.push(workerMeta); + this._workerPoolIntrospect(); + + return workerMeta; + } + + _createWorker(workerID) { + const workerInstance = new Worker(this.workerScriptPath, { workerData: { workerID } }); + const workerMeta = { workerID, worker: workerInstance, status: WorkerStatus.IDLE }; + + logger.info('Created worker %j', workerMeta); + + workerInstance.on('message', (message) => { + // logger.info(`Worker message: ${JSON.stringify(message)}`); + this.emit(message.type, message.body); + this._processNextTask(workerMeta); + }); + + // Uncaught error thrown in the worker script, a exit event will follow so we just log the error. + workerInstance.on('error', (error) => { + logger.error('Worker %j with error %o: ', workerMeta, error); + }); + + workerInstance.on('exit', (exitCode) => { + logger.info('Worker %j exited with code %d.', workerMeta, exitCode); + workerMeta.status = WorkerStatus.STOPPED; + + // Remove current worker from pool as it's no longer usable. + this._removeWorkerFromPool(workerMeta); + + // Bring the worker pool back to maximum capacity. When the main thread is trying to exit + // this won't work as the creation is queued via a setTimeout, so an infinite loop shouldn't + // happen. 
+ this._regenerateWorkerToPool(); + }); + + return workerMeta; + } + + _workerPoolIntrospect() { + const workerPoolInfo = this.workerPool.map((workerMeta) => { + return { uuid: workerMeta.workerID, status: workerMeta.status }; + }); + + logger.info('Worker pool introspect: %j ', workerPoolInfo); + } + + _removeWorkerFromPool(worker) { + logger.info('Removing worker from pool: %j', worker); + const workerIndex = this.workerPool.indexOf(worker); + if (workerIndex > -1) { + this.workerPool.splice(workerIndex, 1); + } + this._workerPoolIntrospect(); + } + + _processTask(workerMeta, task) { + logger.info(`Processing task %j, current queue size %d`, task, this.taskQueue.length); + workerMeta.worker.postMessage(task); + workerMeta.status = WorkerStatus.RUNNING; + } + + _processNextTask(workerMeta) { + if (this.taskQueue.length === 0) { + workerMeta.status = WorkerStatus.IDLE; + } else { + this._processTask(workerMeta, this.taskQueue.shift()); + } + } + + _regenerateWorkerToPool() { + // timeout is required here so the regeneration process doesn't enter an infinite loop + // when node.js is attempting to shutdown. 
+ setTimeout(() => { + if (this.workerPool.length < this.poolSize) { + const workerMeta = this._addWorkerToPool(); + this._processNextTask(workerMeta); + } else { + logger.warn('Can not add additional worker, pool is already at max capacity!'); + } + } , 2000); + } + + _getIdleWorkers() { + return this.workerPool.filter((worker) => { + return worker.status === WorkerStatus.IDLE; + }); + } + + addTask(task) { + const idleWorkers = this._getIdleWorkers(); + + if (idleWorkers.length > 0) { + this._processTask(idleWorkers[0], task); + } else { + this.taskQueue.push(task); + logger.info(`There are no IDLE workers queueing, current queue size <${this.taskQueue.length}>`); + } + } + + getTaskQueueSize() { + return this.taskQueue.length; + } +} + +module.exports = WorkerPool; diff --git a/features-v2.sql b/src/utils/features-v2.sql similarity index 100% rename from features-v2.sql rename to src/utils/features-v2.sql diff --git a/fetchdump.js b/src/utils/fetchdump.js similarity index 100% rename from fetchdump.js rename to src/utils/fetchdump.js diff --git a/getstats-deltacompression.js b/src/utils/getstats-deltacompression.js similarity index 100% rename from getstats-deltacompression.js rename to src/utils/getstats-deltacompression.js diff --git a/getstats-mangle.js b/src/utils/getstats-mangle.js similarity index 100% rename from getstats-mangle.js rename to src/utils/getstats-mangle.js diff --git a/obfuscator.js b/src/utils/obfuscator.js similarity index 100% rename from obfuscator.js rename to src/utils/obfuscator.js diff --git a/utils.js b/src/utils/utils.js similarity index 84% rename from utils.js rename to src/utils/utils.js index d338d391..e36b99c7 100644 --- a/utils.js +++ b/src/utils/utils.js @@ -1,4 +1,16 @@ /* feature extraction utils */ +const logger = require('../logging'); + +const exec = require('child_process').exec; + +function getDirSize(pathToDir, callback) { + const child = exec('du -sh /path/to/dir', function(error, stdout, stderr){ + 
console.log('stderr: ' + stderr); + if (error !== null){ + console.log('exec error: ' + error); + } + }); +} function capitalize(str) { return str[0].toUpperCase() + str.substr(1); @@ -86,7 +98,7 @@ function extractTracks(peerConnectionLog) { tracks.get(key).stats.push(report); } } else if (trackIdentifier !== undefined) { - console.log('NO ONTRACK FOR', trackIdentifier, report.ssrc); + logger.debug('NO ONTRACK FOR', trackIdentifier, report.ssrc); } } }); @@ -127,12 +139,35 @@ function isIceConnected({type, value}) { return type === 'oniceconnectionstatechange' && ['connected', 'completed'].includes(value); } +function getEnvName() { + return process.env.NODE_ENV || 'default'; +} + +function isProduction() { + return getEnvName() === 'production'; +} + +const RequestType = Object.freeze({ + PROCESS: 'PROCESS', +}); + +const ResponseType = Object.freeze({ + PROCESSING: 'PROCESSING', + DONE: 'DONE', + METRICS: 'METRICS', + ERROR: 'ERROR' +}); + module.exports = { capitalize, extractTracks, extractStreams, + getEnvName, isIceConnected, + isProduction, mode, standardizedMoment, timeBetween, + RequestType, + ResponseType }