Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sonar-project.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ sonar.javascript.lcov.reportPaths=coverage/lcov.info

# Path is relative to the sonar-project.properties file. Replace "\" by "/" on Windows.
sonar.sources=src
sonar.exclusions=src/api-docs/**, src/app-server.js, src/helpers/timed-match/match-proc.js, src/helpers/cache/worker.js
sonar.exclusions=src/api-docs/**, src/app-server.js, src/helpers/timed-match/match-proc.js
sonar.tests=tests
sonar.language=js

Expand Down
11 changes: 9 additions & 2 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,20 @@ import './db/mongoose.js';
import mongoose from 'mongoose';
import swaggerDocument from './api-docs/swagger-document.js';
import clientApiRouter from './routers/client-api.js';
import TimedMatch from './helpers/timed-match/index.js';
import schema from './aggregator/schema.js';
import { appAuth, resourcesAuth } from './middleware/auth.js';
import { clientLimiter, defaultLimiter } from './middleware/limiter.js';
import { createServer } from './app-server.js';

/**
* Initialize TimedMatch Worker
*/
TimedMatch.initializeWorker();

/**
* Express app instance
*/
const app = express();
app.use(express.json());

Expand All @@ -32,7 +41,6 @@ app.use(clientApiRouter);
/**
* GraphQL Routes
*/

const handler = (req, res, next) =>
createHandler({ schema, context: req })(req, res, next);

Expand All @@ -42,7 +50,6 @@ app.use('/graphql', appAuth, clientLimiter, handler);
/**
* API Docs and Health Check
*/

app.use('/api-docs', resourcesAuth(),
swaggerUi.serve,
swaggerUi.setup(swaggerDocument)
Expand Down
152 changes: 94 additions & 58 deletions src/helpers/timed-match/index.js
Original file line number Diff line number Diff line change
@@ -1,75 +1,99 @@
import cp from 'node:child_process';
import path from 'node:path';
import { Worker } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

import tryMatch from '../timed-match/match.js';

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

/**
* This class will run a match operation using a child process.
* This class will run a match operation using a Worker Thread.
*
* Workers should be killed given a specified (3000 ms default) time limit.
*
* Blacklist caching is available to prevent sequence of matching failures and resource usage.
*/
export default class TimedMatch {
static _worker = this._createChildProcess();
static _blacklisted = [];
static _maxBlackListed = process.env.REGEX_MAX_BLACKLIST || 50;
static _maxTimeLimit = process.env.REGEX_MAX_TIMEOUT || 3000;
static #worker = undefined;
static #workerActive = false;
static #blacklisted = [];
static #maxBlackListed = process.env.REGEX_MAX_BLACKLIST || 50;
static #maxTimeLimit = process.env.REGEX_MAX_TIMEOUT || 3000;

/**
* Initialize Worker process for working with Regex process operators
*/
static initializeWorker() {
this.#worker = this.#createWorker();
this.#workerActive = true;
}

/**
* Gracefully terminate worker
*/
static terminateWorker() {
this.#worker?.terminate();
this.#workerActive = false;
}

/**
* Run match using child process
* Executes regex matching operation with timeout protection.
*
* If a worker is initialized and active, the operation runs in a separate worker thread
* with timeout protection to prevent runaway regex operations. Uses SharedArrayBuffer
* for synchronous communication between main thread and worker.
*
* If no worker is available, falls back to direct execution on the main thread.
*
* Failed operations (timeouts, errors) are automatically added to a blacklist to
* prevent repeated attempts with the same problematic patterns.
*
* @param {*} values array of regular expression to be evaluated
* @param {*} input to be matched
* @returns match result
*/
static async tryMatch(values, input) {
let result = false;
let timer, resolveListener;

if (this._isBlackListed({ values, input })) {
return false;
static tryMatch(values, input) {
if (this.#worker && this.#workerActive) {
return this.#safeMatch(values, input);
}

const matchPromise = new Promise((resolve) => {
resolveListener = resolve;
this._worker.on('message', resolveListener);
this._worker.send({ values, input });
});

const matchTimer = new Promise((resolve) => {
timer = setTimeout(() => {
this._resetWorker({ values, input });
resolve(false);
}, this._maxTimeLimit);
});

await Promise.race([matchPromise, matchTimer]).then((value) => {
this._worker.off('message', resolveListener);
clearTimeout(timer);
result = value;
});

return result;
}

return tryMatch(values, input);
}

/**
* Clear entries from failed matching operations
* Run match using Node.js Worker Threads API.
*
* @param {*} values array of regular expression to be evaluated
* @param {*} input to be matched
* @returns match result
*/
static clearBlackList() {
this._blacklisted = [];
}

static setMaxBlackListed(value) {
this._maxBlackListed = value;
}
static #safeMatch(values, input) {
if (this.#isBlackListed(values, input)) {
return false;
}

static setMaxTimeLimit(value) {
this._maxTimeLimit = value;
// Create a SharedArrayBuffer for communication
const sharedBuffer = new SharedArrayBuffer(4);
const int32Array = new Int32Array(sharedBuffer);

// Send parameters to worker using postMessage (Worker Threads API)
this.#worker.postMessage({ values, input, sharedBuffer });

// Wait for worker to complete or timeout
const result = Atomics.wait(int32Array, 0, 0, this.#maxTimeLimit);

if (result === 'timed-out') {
this.#resetWorker(values, input);
return false;
}

// Get the actual result from the shared buffer
return Atomics.load(int32Array, 0) === 1;
}

static _isBlackListed({ values, input }) {
const bls = this._blacklisted.filter(bl =>
static #isBlackListed(values, input) {
const bls = this.#blacklisted.filter(bl =>
// input can contain same segment that could fail matching operation
(bl.input.includes(input) || input.includes(bl.input)) &&
// regex order should not affect
Expand All @@ -85,27 +109,39 @@ export default class TimedMatch {
*
* @param {*} param0 list of regex and input
*/
static _resetWorker({ values, input }) {
this._worker.kill();
this._worker = this._createChildProcess();
static #resetWorker(values, input) {
this.#worker.terminate();
this.#worker = this.#createWorker();

if (this._blacklisted.length == this._maxBlackListed) {
this._blacklisted.splice(0, 1);
if (this.#blacklisted.length == this.#maxBlackListed) {
this.#blacklisted.splice(0, 1);
}

this._blacklisted.push({
this.#blacklisted.push({
res: values,
input
});
}

static _createChildProcess() {
const match_proc = cp.fork(`${__dirname}/match-proc.js`, {
stdio: 'ignore'
});
static #createWorker() {
const match_proc = new Worker(`${__dirname}/match-proc.js`);

match_proc.unref();
match_proc.channel.unref();
return match_proc;
}

/**
* Clear entries from failed matching operations
*/
static clearBlackList() {
this.#blacklisted = [];
}

static setMaxBlackListed(value) {
this.#maxBlackListed = value;
}

static setMaxTimeLimit(value) {
this.#maxTimeLimit = value;
}
}
23 changes: 11 additions & 12 deletions src/helpers/timed-match/match-proc.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
function tryMatch(values, input) {
let result = false;
for (const value of values) {
if (input.match(value)) {
result = true;
break;
}
}
import { parentPort } from 'node:worker_threads';
import tryMatch from './match.js';

return result;
}
parentPort.on('message', (e) => {
const { values, input, sharedBuffer } = e;
const int32Array = new Int32Array(sharedBuffer);
const result = tryMatch(values, input);

process.on('message', ({ values, input }) => {
process.send(tryMatch(values, input));
// Store result in shared buffer (1 for true, 0 for false)
Atomics.store(int32Array, 0, result ? 1 : 0);

// Notify the main thread that work is complete
Atomics.notify(int32Array, 0);
});
11 changes: 11 additions & 0 deletions src/helpers/timed-match/match.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export default function tryMatch(values, input) {
let result = false;
for (const value of values) {
if (input.match(value)) {
result = true;
break;
}
}

return result;
}
12 changes: 6 additions & 6 deletions src/models/config-strategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export const OperationsType = Object.freeze({
HAS_ALL: 'HAS_ALL'
});

export async function processOperation(strategy, operation, input, values) {
export function processOperation(strategy, operation, input, values) {
switch(strategy) {
case StrategiesType.NETWORK:
return processNETWORK(operation, input, values);
Expand Down Expand Up @@ -141,16 +141,16 @@ function processDATE(operation, input, values) {
}
}

async function processREGEX(operation, input, values) {
function processREGEX(operation, input, values) {
switch(operation) {
case OperationsType.EXIST:
return await TimedMatch.tryMatch(values, input);
return TimedMatch.tryMatch(values, input);
case OperationsType.NOT_EXIST:
return !(await processREGEX(OperationsType.EXIST, input, values));
return !processREGEX(OperationsType.EXIST, input, values);
case OperationsType.EQUAL:
return await TimedMatch.tryMatch([String.raw`\b${values[0]}\b`], input);
return TimedMatch.tryMatch([String.raw`\b${values[0]}\b`], input);
case OperationsType.NOT_EQUAL:
return !(await TimedMatch.tryMatch([String.raw`\b${values[0]}\b`], input));
return !TimedMatch.tryMatch([String.raw`\b${values[0]}\b`], input);
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/services/criteria.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export async function evaluateCriteria(config, context, strategyFilter) {
}

// Check strategy
if (!(await checkStrategy(context.entry, environment, response))) {
if (!checkStrategy(context.entry, environment, response)) {
return addMetricsAndReturn(context, config, environment, response);
}

Expand Down Expand Up @@ -84,7 +84,7 @@ function checkFlags(config, environment, response) {
return true;
}

async function checkStrategy(entry, environment, response) {
function checkStrategy(entry, environment, response) {
const { strategies } = response;

if (strategies) {
Expand All @@ -93,7 +93,7 @@ async function checkStrategy(entry, environment, response) {
continue;
}

if (!(await checkStrategyInput(entry, strategy, response))) {
if (!checkStrategyInput(entry, strategy, response)) {
return false;
}
}
Expand All @@ -102,15 +102,15 @@ async function checkStrategy(entry, environment, response) {
return true;
}

async function checkStrategyInput(entry, { strategy, operation, values }, response) {
function checkStrategyInput(entry, { strategy, operation, values }, response) {
if (!entry?.length) {
response.result = false;
response.reason = `Strategy '${strategy}' did not receive any input`;
return false;
}

const strategyEntry = entry.filter(e => e.strategy === strategy);
if (strategyEntry.length == 0 || !(await processOperation(strategy, operation, strategyEntry[0].input, values))) {
if (strategyEntry.length == 0 || !processOperation(strategy, operation, strategyEntry[0].input, values)) {
response.result = false;
response.reason = `Strategy '${strategy}' does not agree`;
return false;
Expand Down
Loading