Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"aws-sdk": "^2.1004.0",
"axios": "^0.30.0",
"bcrypt": "^5.1.0",
"bull": "^3.16.0",
"bullmq": "5.56.8",
"commander": "^7.1.0",
"connection-string": "^3.1.0",
"dagre": "^0.8.5",
Expand Down Expand Up @@ -86,7 +86,6 @@
"@swc/core": "^1.3.62",
"@swc/jest": "^0.2.26",
"@types/bcrypt": "^3.0.0",
"@types/bull": "^3.14.0",
"@types/dotenv": "^6.1.1",
"@types/jest": "^27.0.3",
"@types/js-cookie": "^2.2.4",
Expand Down
5,978 changes: 2,317 additions & 3,661 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/pages/api/v1/builds/[uuid]/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export default async (req: NextApiRequest, res: NextApiResponse) => {

const buildId = build.id;
const runUUID = nanoid();
await buildService.resolveAndDeployBuildQueue.add({
await buildService.resolveAndDeployBuildQueue.add('resolve-deploy', {
buildId,
runUUID,
});
Expand Down
2 changes: 1 addition & 1 deletion src/pages/api/v1/builds/[uuid]/services/[name]/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export default async (req: NextApiRequest, res: NextApiResponse) => {

const runUUID = nanoid();
const buildService = new BuildService();
await buildService.resolveAndDeployBuildQueue.add({
await buildService.resolveAndDeployBuildQueue.add('resolve-deploy', {
buildId,
githubRepositoryId,
runUUID,
Expand Down
2 changes: 1 addition & 1 deletion src/pages/api/v1/builds/[uuid]/webhooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ async function invokeWebhooks(req: NextApiRequest, res: NextApiResponse) {
}

const webhookService = new WebhookService();
await webhookService.webhookQueue.add({
await webhookService.webhookQueue.add('webhook', {
buildId,
});
return res.status(200).json({
Expand Down
2 changes: 1 addition & 1 deletion src/pages/api/webhooks/github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export default async (req: NextApiRequest, res: NextApiResponse) => {
try {
if (LIFECYCLE_MODE === 'all') BootstrapJobs(services);
const message = stringify({ ...req, ...{ headers: req.headers } });
await services.GithubService.webhookQueue.add({ message });
await services.GithubService.webhookQueue.add('webhook', { message });
res.status(200).end();
} catch (error) {
logger.child({ error }).error(`Github Webhook failure: Error: ${error}`);
Expand Down
77 changes: 62 additions & 15 deletions src/server/jobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

import { IServices } from 'server/services/types';
import rootLogger from '../lib/logger';
import { defaultDb } from 'server/lib/dependencies';
import { defaultDb, redisClient } from 'server/lib/dependencies';
import RedisClient from 'server/lib/redisClient';
import QueueManager from 'server/lib/queueManager';
import { MAX_GITHUB_API_REQUEST, GITHUB_API_REQUEST_INTERVAL, QUEUE_NAMES } from 'shared/config';

let isBootstrapped = false;

Expand All @@ -32,38 +33,84 @@ export default function bootstrapJobs(services: IServices) {
}

logger.info(`Bootstrapping jobs...... Yes`);
services.GithubService.webhookQueue.process(125, services.GithubService.processWebhooks);
services.ActivityStream.commentQueue.process(2, services.ActivityStream.processComments);
const queueManager = QueueManager.getInstance();

queueManager.registerWorker(QUEUE_NAMES.WEBHOOK_PROCESSING, services.GithubService.processWebhooks, {
connection: redisClient.getConnection(),
concurrency: 125,
});

queueManager.registerWorker(QUEUE_NAMES.COMMENT_QUEUE, services.ActivityStream.processComments, {
connection: redisClient.getConnection(),
concurrency: 2,
limiter: {
max: MAX_GITHUB_API_REQUEST,
duration: GITHUB_API_REQUEST_INTERVAL,
},
});

/* Run once per hour */
services.PullRequest.cleanupClosedPRQueue.add(
'cleanup',
{},
{
repeat: {
every: 60000 * 60, // Once an hour
},
}
);
services.PullRequest.cleanupClosedPRQueue.process(services.PullRequest.processCleanupClosedPRs);

queueManager.registerWorker(QUEUE_NAMES.CLEANUP, services.PullRequest.processCleanupClosedPRs, {
connection: redisClient.getConnection(),
concurrency: 1,
});

services.GlobalConfig.setupCacheRefreshJob();
services.PullRequest.cleanupClosedPRQueue.add({}, {});

services.Ingress.ingressManifestQueue.process(1, services.Ingress.createOrUpdateIngressForBuild);
queueManager.registerWorker(QUEUE_NAMES.GLOBAL_CONFIG_CACHE_REFRESH, services.GlobalConfig.processCacheRefresh, {
connection: redisClient.getConnection(),
concurrency: 1,
});

services.Ingress.ingressCleanupQueue.process(1, services.Ingress.ingressCleanupForBuild);
services.PullRequest.cleanupClosedPRQueue.add('cleanup', {}, {});

services.BuildService.deleteQueue.process(20, services.BuildService.processDeleteQueue);
queueManager.registerWorker(QUEUE_NAMES.INGRESS_MANIFEST, services.Ingress.createOrUpdateIngressForBuild, {
connection: redisClient.getConnection(),
concurrency: 1,
});

services.Webhook.webhookQueue.process(10, services.Webhook.processWebhookQueue);
queueManager.registerWorker(QUEUE_NAMES.INGRESS_CLEANUP, services.Ingress.ingressCleanupForBuild, {
connection: redisClient.getConnection(),
concurrency: 1,
});

queueManager.registerWorker(QUEUE_NAMES.DELETE_QUEUE, services.BuildService.processDeleteQueue, {
connection: redisClient.getConnection(),
concurrency: 20,
});

queueManager.registerWorker(QUEUE_NAMES.WEBHOOK_QUEUE, services.Webhook.processWebhookQueue, {
connection: redisClient.getConnection(),
concurrency: 10,
});

queueManager.registerWorker(QUEUE_NAMES.RESOLVE_AND_DEPLOY, services.BuildService.processResolveAndDeployBuildQueue, {
connection: redisClient.getConnection(),
concurrency: 125,
});

services.BuildService.resolveAndDeployBuildQueue.process(
125,
services.BuildService.processResolveAndDeployBuildQueue
);
/**
* The actual build queue
*/
services.BuildService.buildQueue.process(125, services.BuildService.processBuildQueue);
services.GithubService.githubDeploymentQueue.process(125, services.GithubService.processGithubDeployment);
queueManager.registerWorker(QUEUE_NAMES.BUILD_QUEUE, services.BuildService.processBuildQueue, {
connection: redisClient.getConnection(),
concurrency: 125,
});

queueManager.registerWorker(QUEUE_NAMES.GITHUB_DEPLOYMENT, services.GithubService.processGithubDeployment, {
connection: redisClient.getConnection(),
concurrency: 125,
});

defaultDb.services = services;

Expand Down
8 changes: 7 additions & 1 deletion src/server/lib/__mocks__/redisClientMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

jest.mock('ioredis', () => {
class RedisMock {
duplicate = jest.fn(() => new RedisMock());
options = {};
duplicate = jest.fn(() => {
const instance = new RedisMock();
instance.options = {};
return instance;
});
setMaxListeners = jest.fn();
quit = jest.fn().mockResolvedValue(undefined);
connect = jest.fn();
Expand All @@ -26,6 +31,7 @@ jest.mock('ioredis', () => {
hset = jest.fn().mockResolvedValue(1);
expire = jest.fn().mockResolvedValue(1);
hmset = jest.fn().mockResolvedValue('OK');
disconnect = jest.fn();
}
return RedisMock;
});
Expand Down
96 changes: 81 additions & 15 deletions src/server/lib/queueManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@
* limitations under the License.
*/

import Queue from 'bull';
import type { Queue as BullQueue } from 'bull';
import { Queue, Worker, QueueOptions, WorkerOptions, Processor } from 'bullmq';
import { Redis } from 'ioredis';
import rootLogger from './logger';

const logger = rootLogger.child({
filename: 'lib/queueManager.ts',
});

interface RegisteredQueue {
queue: Queue;
worker?: Worker;
}

export default class QueueManager {
private static instance: QueueManager;
private queues: BullQueue[] = [];
private registeredQueues: RegisteredQueue[] = [];

private constructor() {}

Expand All @@ -35,26 +40,87 @@ export default class QueueManager {
return this.instance;
}

public registerQueue(queueName: string, options: Queue.QueueOptions): BullQueue<any> {
public registerQueue(
queueName: string,
options: {
connection: Redis;
defaultJobOptions?: QueueOptions['defaultJobOptions'];
}
): Queue {
logger.debug(`Registering queue ${queueName}`);
const queue = new Queue(queueName, options);
this.queues.push(queue);

const queue = new Queue(queueName, {
connection: options.connection.duplicate ? options.connection.duplicate() : options.connection,
defaultJobOptions: options.defaultJobOptions,
});

this.registeredQueues.push({ queue });
return queue;
}

public getQueues(): BullQueue[] {
return this.queues;
public registerWorker(
queueName: string,
processor: Processor,
options: {
connection: Redis;
concurrency?: number;
settings?: WorkerOptions['settings'];
limiter?: {
max: number;
duration: number;
};
}
): Worker {
logger.debug(`Registering worker for queue ${queueName}`);

const workerConnection = options.connection.duplicate ? options.connection.duplicate() : options.connection;
// ensure maxRetriesPerRequest is null for workers
if (workerConnection.options) {
workerConnection.options.maxRetriesPerRequest = null;
}

const worker = new Worker(queueName, processor, {
connection: workerConnection,
concurrency: options.concurrency,
settings: options.settings,
limiter: options.limiter,
});

// find queue to associate with worker
const registered = this.registeredQueues.find((r) => r.queue?.name === queueName);
if (registered) {
registered.worker = worker;
} else {
this.registeredQueues.push({ queue: null, worker });
}

return worker;
}

public getQueues(): Queue[] {
return this.registeredQueues.map((r) => r.queue).filter(Boolean);
}

public async emptyAndCloseAllQueues(): Promise<void> {
for (const queue of this.queues) {
logger.debug(`Closing queue: ${queue.name}`);
try {
await queue.close();
} catch (error) {
logger.warn(`⚠️ Error closing queue ${queue.name}:`, error.message);
for (const { queue, worker } of this.registeredQueues) {
if (worker) {
logger.debug(`Closing worker for queue: ${worker.name}`);
try {
await worker.close();
} catch (error) {
logger.warn(`⚠️ Error closing worker for queue ${worker.name}:`, error.message);
}
}

if (queue) {
logger.debug(`Closing queue: ${queue.name}`);
try {
await queue.close();
} catch (error) {
logger.warn(`⚠️ Error closing queue ${queue.name}:`, error.message);
}
}
}
logger.info('✅ All bull queues have been closed successfully.');
logger.info('✅ All queues have been closed successfully.');
}
}
24 changes: 5 additions & 19 deletions src/server/lib/redisClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,25 +83,11 @@ export class RedisClient {
return this.redlock;
}

public getBullCreateClient() {
return (type: string): Redis => {
switch (type) {
case 'client':
return this.redis;
case 'subscriber':
return this.subscriber;
case 'bclient': {
const bclient = this.redis.duplicate();
this.bclients.push(bclient);
return bclient;
}
default: {
const client = this.redis.duplicate();
this.bclients.push(client);
return client;
}
}
};
public getConnection(): Redis {
const connection = this.redis.duplicate();
// BullMQ requires maxRetriesPerRequest to be null for blocking operations
connection.options.maxRetriesPerRequest = null;
return connection;
}

public async close(): Promise<void> {
Expand Down
28 changes: 25 additions & 3 deletions src/server/lib/tests/envVariables.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

import Database from 'server/database';
import Redis from 'ioredis';
import { EnvironmentVariables } from '../envVariables';
import GlobalConfigService from 'server/services/globalConfig';
import { IServices } from 'server/services/types';
Expand All @@ -26,9 +25,30 @@ jest.mock('server/database');
jest.mock('redlock', () => {
return jest.fn().mockImplementation(() => ({}));
});
jest.mock('bullmq', () => ({
Queue: jest.fn().mockImplementation(() => ({
add: jest.fn(),
close: jest.fn(),
})),
Worker: jest.fn().mockImplementation(() => ({
close: jest.fn(),
})),
}));
jest.mock('ioredis', () => {
return jest.fn().mockImplementation(() => ({
duplicate: jest.fn(() => new Redis()),
const mockRedis = jest.fn().mockImplementation(() => ({
duplicate: jest.fn(() => ({
options: {},
setMaxListeners: jest.fn(),
hgetall: jest.fn().mockResolvedValue({
lifecycleDefaults: JSON.stringify({
defaultUUID: 'dev-0',
defaultPublicUrl: 'dev-0.lifecycle.dev.example.com',
}),
}),
hmset: jest.fn(),
on: jest.fn(),
info: jest.fn().mockResolvedValue('redis_version:6.0.5'),
})),
setMaxListeners: jest.fn(),
hgetall: jest.fn().mockResolvedValue({
lifecycleDefaults: JSON.stringify({
Expand All @@ -39,7 +59,9 @@ jest.mock('ioredis', () => {
hmset: jest.fn(),
on: jest.fn(),
info: jest.fn().mockResolvedValue('redis_version:6.0.5'),
options: {},
}));
return mockRedis;
});

class TestEnvironmentVariables extends EnvironmentVariables {
Expand Down
Loading