From f22292dbe83c6b9274046adabd987f056c3374a6 Mon Sep 17 00:00:00 2001 From: Volodymyr Brazhnyk Date: Fri, 17 May 2024 13:09:10 +0200 Subject: [PATCH 1/5] Refactor Job init logic --- .../common/src/scorekeeper/RegisterHandler.ts | 60 +--- packages/common/src/scorekeeper/jobs/Job.ts | 75 ++++ .../common/src/scorekeeper/jobs/JobConfigs.ts | 327 ++++++------------ .../common/src/scorekeeper/jobs/JobFactory.ts | 100 ------ .../common/src/scorekeeper/jobs/JobRunner.ts | 37 -- .../common/src/scorekeeper/jobs/JobsClass.ts | 87 +---- .../src/scorekeeper/jobs/JobsRunnerFactory.ts | 14 - .../scorekeeper/jobs/MicroserviceJobRunner.ts | 83 ----- .../src/scorekeeper/jobs/MonolithJobRunner.ts | 8 - .../jobs/specificJobs/ClearOfflineJob.ts | 2 +- .../jobs/specificJobs/ReleaseMonitorJob.ts | 2 +- packages/common/src/scorekeeper/jobs/types.ts | 52 +++ .../common/src/scorekeeper/scorekeeper.ts | 44 +-- .../test/scorekeeper/scorekeeper.int.test.ts | 44 ++- 14 files changed, 297 insertions(+), 638 deletions(-) create mode 100644 packages/common/src/scorekeeper/jobs/Job.ts delete mode 100644 packages/common/src/scorekeeper/jobs/JobFactory.ts delete mode 100644 packages/common/src/scorekeeper/jobs/JobRunner.ts delete mode 100644 packages/common/src/scorekeeper/jobs/JobsRunnerFactory.ts delete mode 100644 packages/common/src/scorekeeper/jobs/MicroserviceJobRunner.ts delete mode 100644 packages/common/src/scorekeeper/jobs/MonolithJobRunner.ts create mode 100644 packages/common/src/scorekeeper/jobs/types.ts diff --git a/packages/common/src/scorekeeper/RegisterHandler.ts b/packages/common/src/scorekeeper/RegisterHandler.ts index 4616bff49..bb4ed9ca5 100644 --- a/packages/common/src/scorekeeper/RegisterHandler.ts +++ b/packages/common/src/scorekeeper/RegisterHandler.ts @@ -3,17 +3,8 @@ * * @function RegisterHandler */ -import { - ApiHandler, - ChainData, - Config, - logger, - queries, - ScoreKeeper, -} from "../index"; +import { ApiHandler, ChainData, Config, logger, queries } from "../index"; import { scorekeeperLabel } from "./scorekeeper"; -import { jobStatusEmitter } from "../Events"; -import { Job, JobStatus } from "./jobs/JobsClass"; export const registerAPIHandler = ( handler: ApiHandler, @@ -52,52 +43,3 @@ export const registerAPIHandler = ( logger.info(`New Session Event: ${sessionIndex}`, scorekeeperLabel); }); }; - -export const registerEventEmitterHandler = (scoreKeeper: ScoreKeeper) => { - logger.info(`Registering event emitter handler`, scorekeeperLabel); - jobStatusEmitter.on("jobStarted", (data) => { - // scoreKeeper.updateJobStarted(data); - }); - - jobStatusEmitter.on("jobRunning", (data) => { - // scoreKeeper.updateJobRunning(data); - }); - - jobStatusEmitter.on("jobFinished", (data) => { - // scoreKeeper.updateJobFinished(data); - }); - - jobStatusEmitter.on("jobErrored", (data) => { - // scoreKeeper.updateJobErrored(data); - }); - - jobStatusEmitter.on("jobProgress", (data) => { - // scoreKeeper.updateJobProgress(data); - }); -}; - -export const registerJobStatusEventEmitterHandler = (job: Job) => { - logger.info( - `Registering event emitter handler for job: ${job.getName()}`, - scorekeeperLabel, - ); - jobStatusEmitter.on("jobStarted", (data: JobStatus) => { - job.updateJobStatus(data); - }); - - jobStatusEmitter.on("jobRunning", (data: JobStatus) => { - job.updateJobStatus(data); - }); - - jobStatusEmitter.on("jobFinished", (data: JobStatus) => { - job.updateJobStatus(data); - }); - - jobStatusEmitter.on("jobErrored", (data: JobStatus) => { - job.updateJobStatus(data); - }); - - jobStatusEmitter.on("jobProgress", (data: JobStatus) => { - job.updateJobStatus(data); - }); -}; diff --git a/packages/common/src/scorekeeper/jobs/Job.ts b/packages/common/src/scorekeeper/jobs/Job.ts new file mode 100644 index 000000000..4d4fe3501 --- /dev/null +++ b/packages/common/src/scorekeeper/jobs/Job.ts @@ -0,0 +1,75 @@ +import { startJob } from "./cron/StartCronJobs"; +import logger from "../../logger"; +import { jobStatusEmitter } from "../../Events"; +import { JobStatus, JobConfig, JobRunnerMetadata } from "./types"; + +export class Job { + protected status: JobStatus; + protected jobConfig: JobConfig; + protected jobRunnerMetadata: JobRunnerMetadata; + + // TODO: remove this dependency injection during the next refactoring phases + private startJobFunction: ( + metadata: JobRunnerMetadata, + jobConfig: JobConfig, + ) => Promise; + + static events: string[] = [ + "jobStarted", + "jobRunning", + "jobFinished", + "jobErrored", + "jobProgress", + ]; + + // TODO: startJob as parameter + constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { + this.status = { + name: jobConfig.name, + updated: Date.now(), + status: "Not Running", + }; + this.jobConfig = jobConfig; + this.jobRunnerMetadata = jobRunnerMetadata; + this.startJobFunction = startJob; + } + + private log(message: string) { + logger.info(message, { label: "Job" }); + } + + // TODO: remove events and use db to handle the state + // then we can decouple scorekeeper and gateway + private registerEventHandlers() { + this.log(`Registering event handlers for ${this.jobConfig.name}`); + Job.events.forEach((event) => { + jobStatusEmitter.on(event, (data: JobStatus) => { + this.updateJobStatus(data); + }); + }); + } + + public async run(): Promise { + this.registerEventHandlers(); + this.log(`Starting ${this.getName()}`); + await this.startJobFunction(this.jobRunnerMetadata, this.jobConfig); + } + + public getName(): string { + return this.jobConfig.name; + } + + public updateJobStatus(status: JobStatus) { + if (status.name == this.getName()) { + this.status = { ...this.status, ...status }; + } + } + + public getStatusAsJson(): string { + return JSON.stringify(this.status); + } + + public getStatus(): JobStatus { + return this.status; + } +} diff --git a/packages/common/src/scorekeeper/jobs/JobConfigs.ts b/packages/common/src/scorekeeper/jobs/JobConfigs.ts index 994fb0eab..70f11dfc6 100644 --- a/packages/common/src/scorekeeper/jobs/JobConfigs.ts +++ b/packages/common/src/scorekeeper/jobs/JobConfigs.ts @@ -1,5 +1,4 @@ -import { JobConfig, JobRunnerMetadata, jobsLabel } from "./JobsClass"; -import { Constants } from "../../index"; +import * as Constants from "../../constants"; import { activeValidatorJobWithTiming, blockJobWithTiming, @@ -16,11 +15,11 @@ import { validityJobWithTiming, } from "./specificJobs"; import { mainScorekeeperJob } from "./specificJobs/MainScorekeeperJob"; -import logger from "../../logger"; import { executionJob } from "./specificJobs/ExecutionJob"; import { cancelJob } from "./specificJobs/CancelJob"; import { staleNominationJob } from "./specificJobs/StaleNomination"; import { clearOfflineJob } from "./specificJobs/ClearOfflineJob"; +import { JobConfig } from "./types"; export enum JobNames { ActiveValidator = "ActiveValidatorJob", @@ -43,215 +42,113 @@ export enum JobNames { StaleNomination = "StaleNominationJob", } -export const getJobConfigs = ( - jobRunnerMetadata: JobRunnerMetadata, -): JobConfig[] => { - try { - logger.info(`getting job configs for each job`, jobsLabel); - - const activeValdiatorJobConfig: JobConfig = { - jobKey: "activeValidator", - defaultFrequency: Constants.ACTIVE_VALIDATOR_CRON, - jobFunction: async () => { - await activeValidatorJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.ActiveValidator, - preventOverlap: true, - }; - - const monitorJobConfig: JobConfig = { - jobKey: "monitor", - defaultFrequency: Constants.MONITOR_CRON, - jobFunction: async () => { - await getLatestTaggedRelease(); - }, - name: JobNames.Monitor, - preventOverlap: true, - }; - - const clearOfflineJobConfig: JobConfig = { - jobKey: "clearOffline", - defaultFrequency: Constants.CLEAR_OFFLINE_CRON, - jobFunction: async () => { - await clearOfflineJob(); - }, - name: JobNames.ClearOffline, - preventOverlap: true, - }; - - const validityJobConfig: JobConfig = { - jobKey: "validity", - defaultFrequency: Constants.VALIDITY_CRON, - jobFunction: async () => { - await validityJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.Validity, - preventOverlap: true, - }; - - const scoreJobConfig: JobConfig = { - jobKey: "score", - defaultFrequency: Constants.SCORE_CRON, - jobFunction: async () => { - await scoreJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.Score, - preventOverlap: true, - }; - - const eraStatsJobConfig: JobConfig = { - jobKey: "eraStats", - defaultFrequency: Constants.ERA_STATS_CRON, - jobFunction: async () => { - await eraStatsJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.EraStats, - preventOverlap: true, - }; - - const eraPointsJobConfig: JobConfig = { - jobKey: "eraPoints", - defaultFrequency: Constants.ERA_POINTS_CRON, - jobFunction: async () => { - await eraPointsJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.EraPoints, - preventOverlap: true, - }; - - const inclusionJobConfig: JobConfig = { - jobKey: "inclusion", - defaultFrequency: Constants.INCLUSION_CRON, - jobFunction: async () => { - await inclusionJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.Inclusion, - preventOverlap: true, - }; - - const sessionKeyJobConfig: JobConfig = { - jobKey: "sessionKey", - defaultFrequency: Constants.SESSION_KEY_CRON, - jobFunction: async () => { - await sessionKeyJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.SessionKey, - preventOverlap: true, - }; - - const unclaimedEraJobConfig: JobConfig = { - jobKey: "unclaimedEras", - defaultFrequency: Constants.UNCLAIMED_ERAS_CRON, - jobFunction: async () => { - await unclaimedEraJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.UnclaimedEras, - preventOverlap: true, - }; - - const validatorPrefJobConfig: JobConfig = { - jobKey: "validatorPref", - defaultFrequency: Constants.VALIDATOR_PREF_CRON, - jobFunction: async () => { - await validatorPrefJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.ValidatorPref, - preventOverlap: true, - }; - - const locationStatsJobConfig: JobConfig = { - jobKey: "locationStats", - defaultFrequency: Constants.LOCATION_STATS_CRON, - jobFunction: async () => { - await locationStatsJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.LocationStats, - preventOverlap: true, - }; - - const nominatorJobConfig: JobConfig = { - jobKey: "nominator", - defaultFrequency: Constants.NOMINATOR_CRON, - jobFunction: async () => { - await nominatorJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.Nominator, - preventOverlap: true, - }; - - const blockDataJobConfig: JobConfig = { - jobKey: "block", - defaultFrequency: Constants.BLOCK_CRON, - jobFunction: async () => { - await blockJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.BlockData, - preventOverlap: true, - }; - - const mainScorekeeperJobConfig: JobConfig = { - jobKey: "scorekeeper", - defaultFrequency: Constants.SCOREKEEPER_CRON, - jobFunction: async () => { - await mainScorekeeperJob(jobRunnerMetadata); - }, - name: JobNames.MainScorekeeper, - preventOverlap: true, - }; - - const executionJobConfig: JobConfig = { - jobKey: "execution", - defaultFrequency: Constants.EXECUTION_CRON, - jobFunction: async () => { - await executionJob(jobRunnerMetadata); - }, - name: JobNames.Execution, - preventOverlap: true, - }; - - const cancelJobConfig: JobConfig = { - jobKey: "cancel", - defaultFrequency: Constants.CANCEL_CRON, - jobFunction: async () => { - await cancelJob(jobRunnerMetadata); - }, - name: JobNames.Cancel, - preventOverlap: true, - }; - - const staleNominationJobConfig: JobConfig = { - jobKey: "stale", - defaultFrequency: Constants.STALE_CRON, - jobFunction: async () => { - await staleNominationJob(jobRunnerMetadata); - }, - name: JobNames.StaleNomination, - preventOverlap: true, - }; - - return [ - activeValdiatorJobConfig, - monitorJobConfig, - clearOfflineJobConfig, - validityJobConfig, - scoreJobConfig, - eraStatsJobConfig, - eraPointsJobConfig, - inclusionJobConfig, - sessionKeyJobConfig, - unclaimedEraJobConfig, - validatorPrefJobConfig, - locationStatsJobConfig, - nominatorJobConfig, - blockDataJobConfig, - mainScorekeeperJobConfig, - executionJobConfig, - cancelJobConfig, - staleNominationJobConfig, - ]; - } catch (e) { - logger.error(`Error getting job configs:`, jobsLabel); - logger.error(JSON.stringify(e)); - return []; - } -}; +export const jobConfigs: JobConfig[] = [ + { + jobKey: "activeValidator", + defaultFrequency: Constants.ACTIVE_VALIDATOR_CRON, + jobFunction: activeValidatorJobWithTiming, + name: JobNames.ActiveValidator, + }, + { + jobKey: "monitor", + defaultFrequency: Constants.MONITOR_CRON, + jobFunction: getLatestTaggedRelease, + name: JobNames.Monitor, + }, + { + jobKey: "clearOffline", + defaultFrequency: Constants.CLEAR_OFFLINE_CRON, + jobFunction: clearOfflineJob, + name: JobNames.ClearOffline, + }, + { + jobKey: "validity", + defaultFrequency: Constants.VALIDITY_CRON, + jobFunction: validityJobWithTiming, + name: JobNames.Validity, + }, + { + jobKey: "score", + defaultFrequency: Constants.SCORE_CRON, + jobFunction: scoreJobWithTiming, + name: JobNames.Score, + }, + { + jobKey: "eraStats", + defaultFrequency: Constants.ERA_STATS_CRON, + jobFunction: eraStatsJobWithTiming, + name: JobNames.EraStats, + }, + { + jobKey: "eraPoints", + defaultFrequency: Constants.ERA_POINTS_CRON, + jobFunction: eraPointsJobWithTiming, + name: JobNames.EraPoints, + }, + { + jobKey: "inclusion", + defaultFrequency: Constants.INCLUSION_CRON, + jobFunction: inclusionJobWithTiming, + name: JobNames.Inclusion, + }, + { + jobKey: "sessionKey", + defaultFrequency: Constants.SESSION_KEY_CRON, + jobFunction: sessionKeyJobWithTiming, + name: JobNames.SessionKey, + }, + { + jobKey: "unclaimedEras", + defaultFrequency: Constants.UNCLAIMED_ERAS_CRON, + jobFunction: unclaimedEraJobWithTiming, + name: JobNames.UnclaimedEras, + }, + { + jobKey: "validatorPref", + defaultFrequency: Constants.VALIDATOR_PREF_CRON, + jobFunction: validatorPrefJobWithTiming, + name: JobNames.ValidatorPref, + }, + { + jobKey: "locationStats", + defaultFrequency: Constants.LOCATION_STATS_CRON, + jobFunction: locationStatsJobWithTiming, + name: JobNames.LocationStats, + }, + { + jobKey: "nominator", + defaultFrequency: Constants.NOMINATOR_CRON, + jobFunction: nominatorJobWithTiming, + name: JobNames.Nominator, + }, + { + jobKey: "block", + defaultFrequency: Constants.BLOCK_CRON, + jobFunction: blockJobWithTiming, + name: JobNames.BlockData, + }, + { + jobKey: "scorekeeper", + defaultFrequency: Constants.SCOREKEEPER_CRON, + jobFunction: mainScorekeeperJob, + name: JobNames.MainScorekeeper, + }, + { + jobKey: "execution", + defaultFrequency: Constants.EXECUTION_CRON, + jobFunction: executionJob, + name: JobNames.Execution, + }, + { + jobKey: "cancel", + defaultFrequency: Constants.CANCEL_CRON, + jobFunction: cancelJob, + name: JobNames.Cancel, + }, + { + jobKey: "stale", + defaultFrequency: Constants.STALE_CRON, + jobFunction: staleNominationJob, + name: JobNames.StaleNomination, + }, +]; diff --git a/packages/common/src/scorekeeper/jobs/JobFactory.ts b/packages/common/src/scorekeeper/jobs/JobFactory.ts deleted file mode 100644 index 9b4c69e55..000000000 --- a/packages/common/src/scorekeeper/jobs/JobFactory.ts +++ /dev/null @@ -1,100 +0,0 @@ -import { Job, JobConfig, JobRunnerMetadata, jobsLabel } from "./JobsClass"; -import { - ActiveValidatorJob, - BlockDataJob, - EraPointsJob, - EraStatsJob, - InclusionJob, - LocationStatsJob, - MonitorJob, - NominatorJob, - ScoreJob, - SessionKeyJob, - UnclaimedErasJob, - ValidatorPrefJob, - ValidityJob, -} from "./specificJobs"; -import { ClearOfflineJob } from "./specificJobs/ClearOfflineJob"; -import logger from "../../logger"; -import { MainScorekeeperJob } from "./specificJobs/MainScorekeeperJob"; -import { JobNames } from "./JobConfigs"; -import { ExecutionJob } from "./specificJobs/ExecutionJob"; -import { CancelJob } from "./specificJobs/CancelJob"; -import { StaleNominationJob } from "./specificJobs/StaleNomination"; - -export class JobFactory { - static makeJobs = async ( - jobConfigs: JobConfig[], - jobRunnerMetadata: JobRunnerMetadata, - ): Promise => { - try { - const jobs: Job[] = []; - for (const jobConfig of jobConfigs) { - switch (jobConfig.name) { - case JobNames.ActiveValidator: - jobs.push(new ActiveValidatorJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Monitor: - jobs.push(new MonitorJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.ClearOffline: - jobs.push(new ClearOfflineJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Score: - jobs.push(new ScoreJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Validity: - jobs.push(new ValidityJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.EraStats: - jobs.push(new EraStatsJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.EraPoints: - jobs.push(new EraPointsJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.LocationStats: - jobs.push(new LocationStatsJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.UnclaimedEras: - jobs.push(new UnclaimedErasJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Inclusion: - jobs.push(new InclusionJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.SessionKey: - jobs.push(new SessionKeyJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.ValidatorPref: - jobs.push(new ValidatorPrefJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Nominator: - jobs.push(new NominatorJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.BlockData: - jobs.push(new BlockDataJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.MainScorekeeper: - jobs.push(new MainScorekeeperJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Execution: - jobs.push(new ExecutionJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Cancel: - jobs.push(new CancelJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.StaleNomination: - jobs.push(new StaleNominationJob(jobConfig, jobRunnerMetadata)); - break; - default: - logger.error(`Job not found: ${jobConfig.name}`, jobsLabel); - break; - } - } - return jobs; - } catch (e) { - logger.error(`Error making jobs: ${e}`, jobsLabel); - logger.error(JSON.stringify(e), jobsLabel); - return []; - } - }; -} diff --git a/packages/common/src/scorekeeper/jobs/JobRunner.ts b/packages/common/src/scorekeeper/jobs/JobRunner.ts deleted file mode 100644 index 4701cdf22..000000000 --- a/packages/common/src/scorekeeper/jobs/JobRunner.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { logger } from "../../index"; -import { scorekeeperLabel } from "../scorekeeper"; -import { Job, JobRunnerMetadata, jobsLabel } from "./JobsClass"; -import { JobFactory } from "./JobFactory"; -import { getJobConfigs } from "./JobConfigs"; - -export abstract class JobsRunner { - constructor(protected readonly metadata: JobRunnerMetadata) {} - - abstract _startSpecificJobs(): Promise; - - public startJobs = async (): Promise => { - try { - return await this._startSpecificJobs(); - } catch (e) { - logger.warn(`There was an error running some cron jobs...`, jobsLabel); - logger.error(e); - return []; - } - }; -} - -export const startMonolithJobs = async ( - metadata: JobRunnerMetadata, -): Promise => { - try { - const jobs = await JobFactory.makeJobs(getJobConfigs(metadata), metadata); - for (const job of jobs) { - await job.setupAndStartJob(); - } - return jobs; - } catch (e) { - logger.error(JSON.stringify(e), scorekeeperLabel); - logger.error("Error starting monolith jobs", scorekeeperLabel); - return []; - } -}; diff --git a/packages/common/src/scorekeeper/jobs/JobsClass.ts b/packages/common/src/scorekeeper/jobs/JobsClass.ts index 0ad2a90d9..f30695edb 100644 --- a/packages/common/src/scorekeeper/jobs/JobsClass.ts +++ b/packages/common/src/scorekeeper/jobs/JobsClass.ts @@ -1,84 +1,3 @@ -import { ApiHandler, ChainData, Config, Constraints } from "../../index"; -import MatrixBot from "../../matrix"; -import Nominator from "../../nominator/nominator"; -import { ConfigSchema } from "../../config"; -import { startJob } from "./cron/StartCronJobs"; -import logger from "../../logger"; -import { registerJobStatusEventEmitterHandler } from "../RegisterHandler"; - -export const jobsLabel = { label: "Jobs" }; - -export type JobRunnerMetadata = { - config: Config.ConfigSchema; - chaindata: ChainData; - nominatorGroups: Nominator[]; - nominating: boolean; - // currentEra: number; - bot: MatrixBot; - constraints: Constraints.OTV; - handler: ApiHandler; - currentTargets: { stash?: string; identity?: any }[]; -}; - -export type JobConfig = { - jobKey: keyof ConfigSchema["cron"] | ""; - defaultFrequency: string; - jobFunction: (metadata: JobRunnerMetadata) => Promise; - name: string; - preventOverlap?: boolean; -}; - -export interface JobStatus { - name: string; - updated: number; - enabled?: boolean; - runCount?: number; - status: string; - frequency?: string; - error?: string; - progress?: number; // Progress from 0 to 100 - iteration?: string; // Name of the current iteration -} - -export abstract class Job { - protected _status: JobStatus; - protected _jobConfig: JobConfig; - protected _jobRunnerMetadata: JobRunnerMetadata; - - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - this._status = { - name: jobConfig.name, - updated: Date.now(), - status: "Not Running", - }; - this._jobConfig = jobConfig; - this._jobRunnerMetadata = jobRunnerMetadata; - } - - public setupAndStartJob = async (): Promise => { - logger.info( - `Registering Event Emitter for ${this._jobConfig.name}`, - jobsLabel, - ); - registerJobStatusEventEmitterHandler(this); - logger.info(`Starting ${this._jobConfig.name}`, jobsLabel); - await startJob(this._jobRunnerMetadata, this._jobConfig); - }; - - public getName = (): string => { - return this._jobConfig.name; - }; - - public updateJobStatus(status: JobStatus) { - if (status.name == this._jobConfig.name) { - this._status = { ...this._status, ...status }; - } - } - - public getStatusAsJson(): string { - return JSON.stringify(this._status); - } - public getStatus = (): JobStatus => { - return this._status; - }; -} +export { JobConfig, JobRunnerMetadata, JobStatus } from "./types"; +export { Job } from "./Job"; +// TODO: remove this file during the next refactoring step diff --git a/packages/common/src/scorekeeper/jobs/JobsRunnerFactory.ts b/packages/common/src/scorekeeper/jobs/JobsRunnerFactory.ts deleted file mode 100644 index 52912d8af..000000000 --- a/packages/common/src/scorekeeper/jobs/JobsRunnerFactory.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { MicroserviceJobRunner } from "./MicroserviceJobRunner"; -import { MonolithJobRunner } from "./MonolithJobRunner"; -import { JobsRunner } from "./JobRunner"; -import { JobRunnerMetadata } from "./JobsClass"; - -export class JobsRunnerFactory { - static makeJobs = async ( - metadata: JobRunnerMetadata, - ): Promise => { - if (!metadata.config?.redis?.host && metadata.config?.redis?.port) - return new MicroserviceJobRunner(metadata); - else return new MonolithJobRunner(metadata); - }; -} diff --git a/packages/common/src/scorekeeper/jobs/MicroserviceJobRunner.ts b/packages/common/src/scorekeeper/jobs/MicroserviceJobRunner.ts deleted file mode 100644 index 72502b537..000000000 --- a/packages/common/src/scorekeeper/jobs/MicroserviceJobRunner.ts +++ /dev/null @@ -1,83 +0,0 @@ -import { logger } from "../..//index"; - -// import { otvWorker } from "@1kv/worker"; -import { scorekeeperLabel } from "../scorekeeper"; -import { JobsRunner } from "./JobRunner"; -import { Job } from "./JobsClass"; - -export class MicroserviceJobRunner extends JobsRunner { - _startSpecificJobs = async (): Promise => { - const { config, chaindata } = this.metadata; - if (!config?.redis?.host || !config?.redis?.port) { - logger.error( - `No redis config found. Microservice Jobs will not be started.`, - scorekeeperLabel, - ); - return []; - } - try { - // Jobs get run in separate worker - logger.info(`Starting bullmq Queues and Workers....`, scorekeeperLabel); - // const releaseMonitorQueue = - // await otvWorker.queues.createReleaseMonitorQueue( - // config.redis.host, - // config.redis.port, - // ); - // const constraintsQueue = await otvWorker.queues.createConstraintsQueue( - // config.redis.host, - // config.redis.port, - // ); - // const chaindataQueue = await otvWorker.queues.createChainDataQueue( - // config.redis.host, - // config.redis.port, - // ); - // const blockQueue = await otvWorker.queues.createBlockQueue( - // config.redis.host, - // config.redis.port, - // ); - // - // const removeRepeatableJobs = true; - // if (removeRepeatableJobs) { - // logger.info(`remove jobs: ${removeRepeatableJobs}`, scorekeeperLabel); - // // Remove any previous repeatable jobs - // await otvWorker.queues.removeRepeatableJobsFromQueues([ - // releaseMonitorQueue, - // constraintsQueue, - // chaindataQueue, - // blockQueue, - // ]); - // } - // - // const obliterateQueues = false; - // if (obliterateQueues) { - // await otvWorker.queues.obliterateQueues([ - // releaseMonitorQueue, - // constraintsQueue, - // chaindataQueue, - // blockQueue, - // ]); - // } - // - // // Add repeatable jobs to the queues - // // Queues need to have different repeat time intervals - // await otvWorker.queues.addReleaseMonitorJob(releaseMonitorQueue, 60000); - // await otvWorker.queues.addValidityJob(constraintsQueue, 1000001); - // await otvWorker.queues.addScoreJob(constraintsQueue, 100002); - // await otvWorker.queues.addActiveValidatorJob(chaindataQueue, 100003); - // await otvWorker.queues.addEraPointsJob(chaindataQueue, 100006); - // await otvWorker.queues.addEraStatsJob(chaindataQueue, 110008); - // await otvWorker.queues.addInclusionJob(chaindataQueue, 100008); - // await otvWorker.queues.addNominatorJob(chaindataQueue, 100009); - // await otvWorker.queues.addSessionKeyJob(chaindataQueue, 100010); - // await otvWorker.queues.addValidatorPrefJob(chaindataQueue, 100101); - // await otvWorker.queues.addAllBlocks(blockQueue, chaindata); - // TODO update this as queue job - // await startLocationStatsJob(this.config, this.chaindata); - return []; - } catch (e) { - logger.error(JSON.stringify(e), scorekeeperLabel); - logger.error("Error starting microservice jobs", scorekeeperLabel); - return []; - } - }; -} diff --git a/packages/common/src/scorekeeper/jobs/MonolithJobRunner.ts b/packages/common/src/scorekeeper/jobs/MonolithJobRunner.ts deleted file mode 100644 index 61239a9b0..000000000 --- a/packages/common/src/scorekeeper/jobs/MonolithJobRunner.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { JobsRunner, startMonolithJobs } from "./JobRunner"; -import { Job } from "./JobsClass"; - -export class MonolithJobRunner extends JobsRunner { - _startSpecificJobs = async (): Promise => { - return await startMonolithJobs(this.metadata); - }; -} diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts index 92bf00c61..c0b252ea0 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts @@ -9,7 +9,7 @@ export class ClearOfflineJob extends Job { } } -export const clearOfflineJob = async () => { +export const clearOfflineJob = async (jobRunnerMetadata?: JobRunnerMetadata) => { jobStatusEmitter.emit("jobProgress", { name: JobNames.ClearOffline, progress: 0, diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts index 9167c7f42..0276d3a32 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts @@ -12,7 +12,7 @@ export class MonitorJob extends Job { } } -export const getLatestTaggedRelease = async () => { +export const getLatestTaggedRelease = async (jobRunnerMetadata?: JobRunnerMetadata) => { try { const start = Date.now(); diff --git a/packages/common/src/scorekeeper/jobs/types.ts b/packages/common/src/scorekeeper/jobs/types.ts new file mode 100644 index 000000000..ed97896fb --- /dev/null +++ b/packages/common/src/scorekeeper/jobs/types.ts @@ -0,0 +1,52 @@ +import { ConfigSchema } from "../../config"; +import { ApiHandler, ChainData, Config, Constraints } from "../../index"; +import MatrixBot from "../../matrix"; +import Nominator from "../../nominator/nominator"; + +export type JobRunnerMetadata = { + config: Config.ConfigSchema; + chaindata: ChainData; + nominatorGroups: Nominator[]; + nominating: boolean; + bot: MatrixBot; + constraints: Constraints.OTV; + handler: ApiHandler; + currentTargets: { stash?: string; identity?: any }[]; +}; + +export type JobConfig = { + jobKey: keyof Config.ConfigSchema["cron"] | ""; + defaultFrequency: string; + jobFunction: (metadata: JobRunnerMetadata) => Promise; + name: string; + preventOverlap?: boolean; +}; + +// JobDefinition abstraction is an intermediate stage of refactoring +// TODO: JobDefinition => JobConfig +export type JobDefinition = { + jobKey: keyof ConfigSchema["cron"] | ""; + defaultFrequency: string; + jobFunction: (metadata: JobRunnerMetadata) => Promise; +}; + +type StatusName = + | "running" + | "finished" + | "errored" + | "started" + | "Not Running"; + +export type JobStatus = { + name: string; + updated: number; + enabled?: boolean; + runCount?: number; + status: StatusName; + frequency?: string; + error?: string; + // Progress from 0 to 100 + progress?: number; + // Name of the current iteration + iteration?: string; +}; diff --git a/packages/common/src/scorekeeper/scorekeeper.ts b/packages/common/src/scorekeeper/scorekeeper.ts index e0f7eeb6f..67aa83a59 100644 --- a/packages/common/src/scorekeeper/scorekeeper.ts +++ b/packages/common/src/scorekeeper/scorekeeper.ts @@ -10,16 +10,13 @@ import { } from "../index"; import Nominator from "../nominator/nominator"; -import { - registerAPIHandler, - registerEventEmitterHandler, -} from "./RegisterHandler"; -import { Job, JobRunnerMetadata, JobStatus } from "./jobs/JobsClass"; -import { JobsRunnerFactory } from "./jobs/JobsRunnerFactory"; +import { registerAPIHandler } from "./RegisterHandler"; +import { Job } from "./jobs/Job"; +import { JobRunnerMetadata, JobStatus } from "./jobs/types"; +import { jobConfigs } from "./jobs/JobConfigs"; import { startRound } from "./Round"; import { NominatorStatus } from "../types"; import { setAllIdentities } from "../utils"; -// import { monitorJob } from "./jobs"; export type NominatorGroup = Config.NominatorConfig[]; @@ -63,7 +60,6 @@ export default class ScoreKeeper { this.upSince = Date.now(); registerAPIHandler(this.handler, this.config, this.chaindata, this.bot); - registerEventEmitterHandler(this); } public getJobsStatusAsJson() { const statuses: Record = {}; @@ -74,25 +70,13 @@ export default class ScoreKeeper { } getAllNominatorBondedAddresses(): string[] { - const bondedAddresses = []; - const nomGroup = this.nominatorGroups; - if (nomGroup) { - for (const nom of nomGroup) { - bondedAddresses.push(nom?.bondedAddress); - } - - return bondedAddresses; - } else { - return []; - } + return this.nominatorGroups + ? this.nominatorGroups.map((nom) => nom?.bondedAddress) + : []; } getAllNominatorStatus(): NominatorStatus[] { - const statuses = []; - for (const nom of this.nominatorGroups) { - statuses.push(nom.getStatus()); - } - return statuses; + return this.nominatorGroups.map((nom) => nom.getStatus()); } getAllNominatorStatusJson() { @@ -235,6 +219,14 @@ export default class ScoreKeeper { return true; } + startJobs(metadata: JobRunnerMetadata) { + this._jobs = jobConfigs.map((config) => { + const job = new Job(config, metadata); + job.run(); + return job; + }); + } + // Begin the main workflow of the scorekeeper async begin(): Promise { try { @@ -294,9 +286,7 @@ export default class ScoreKeeper { currentTargets: this.currentTargets, }; - const jobRunner = await JobsRunnerFactory.makeJobs(metadata); - - this._jobs = await jobRunner.startJobs(); + this.startJobs(metadata); this.isStarted = true; return true; } catch (e) { diff --git a/packages/common/test/scorekeeper/scorekeeper.int.test.ts b/packages/common/test/scorekeeper/scorekeeper.int.test.ts index 787758e6e..f3858d0d9 100644 --- a/packages/common/test/scorekeeper/scorekeeper.int.test.ts +++ b/packages/common/test/scorekeeper/scorekeeper.int.test.ts @@ -1,6 +1,7 @@ import { beforeAll, describe, expect, it } from "vitest"; import { ScoreKeeper } from "../../src"; import { getAndStartScorekeeper } from "../testUtils/scorekeeper"; +import { jobStatusEmitter } from "../../src/Events"; const TIMEOUT_DURATION = 5200000; // 120 seconds describe("Scorekeeper Integration Tests", () => { @@ -24,16 +25,41 @@ describe("Scorekeeper Integration Tests", () => { it( "should start jobs and have their status as started", async () => { - const status = scorekeeper.getJobsStatusAsJson(); - for (const key in status) { - if (status.hasOwnProperty(key)) { - const job = status[key]; - if (job.status !== "started") { - console.error(`Job ${job.name} is not started.`); - } - expect(job.status).toBeDefined(); + const statusJson = scorekeeper.getJobsStatusAsJson(); + const statuses = Object.values(statusJson); + statuses.forEach((job) => { + if (job.status !== "started") { + console.error(`Job ${job.name} is not started.`); } - } + expect(job.status).toBeDefined(); + }); + }, + TIMEOUT_DURATION, + ); + + it( + "should update status of the job on event", + async () => { + const statusesBefore = scorekeeper.getJobsStatusAsJson(); + const firstJob = Object.values(statusesBefore)[0]; + const secondJob = Object.values(statusesBefore)[1]; + + jobStatusEmitter.emit("jobRunning", { + name: firstJob.name, + status: "running", + updated: Date.now(), + }); + + await new Promise((resolve) => setTimeout(resolve, 500)); + + const statusesAfter = scorekeeper.getJobsStatusAsJson(); + const updatedFirstJob = Object.values(statusesAfter)[0]; + const unchangedSecondJob = Object.values(statusesAfter)[1]; + + // Check that the first job's status has been updated + expect(updatedFirstJob.status).toBe("running"); + // Check that the second job's status remains unchanged + expect(unchangedSecondJob.status).toBe(secondJob.status); }, TIMEOUT_DURATION, ); From 465fe86abf0ecc740e5f7a48d937f89d56582089 Mon Sep 17 00:00:00 2001 From: Volodymyr Brazhnyk Date: Fri, 17 May 2024 14:21:34 +0200 Subject: [PATCH 2/5] Add some TODOs --- packages/common/src/scorekeeper/jobs/Job.ts | 6 ++++-- packages/common/src/scorekeeper/jobs/JobConfigs.ts | 2 ++ packages/common/src/scorekeeper/jobs/JobsClass.ts | 2 +- packages/common/src/scorekeeper/jobs/types.ts | 10 +--------- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/packages/common/src/scorekeeper/jobs/Job.ts b/packages/common/src/scorekeeper/jobs/Job.ts index 4d4fe3501..85481029c 100644 --- a/packages/common/src/scorekeeper/jobs/Job.ts +++ b/packages/common/src/scorekeeper/jobs/Job.ts @@ -8,12 +8,14 @@ export class Job { protected jobConfig: JobConfig; protected jobRunnerMetadata: JobRunnerMetadata; - // TODO: remove this dependency injection during the next refactoring phases + // TODO: remove this dependency injection private startJobFunction: ( metadata: JobRunnerMetadata, jobConfig: JobConfig, ) => Promise; + // TODO: remove events and use db to handle the state + // then we can decouple scorekeeper and gateway static events: string[] = [ "jobStarted", "jobRunning", @@ -22,7 +24,6 @@ export class Job { "jobProgress", ]; - // TODO: startJob as parameter constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { this.status = { name: jobConfig.name, @@ -55,6 +56,7 @@ export class Job { await this.startJobFunction(this.jobRunnerMetadata, this.jobConfig); } + // TODO: remove this public interface after decoupling with the Gateway public getName(): string { return this.jobConfig.name; } diff --git a/packages/common/src/scorekeeper/jobs/JobConfigs.ts b/packages/common/src/scorekeeper/jobs/JobConfigs.ts index 70f11dfc6..c9791687e 100644 --- a/packages/common/src/scorekeeper/jobs/JobConfigs.ts +++ b/packages/common/src/scorekeeper/jobs/JobConfigs.ts @@ -21,6 +21,8 @@ import { staleNominationJob } from "./specificJobs/StaleNomination"; import { clearOfflineJob } from "./specificJobs/ClearOfflineJob"; import { JobConfig } from "./types"; +// TODO: remove this enum, generate name based on the jobKey instead, +// store job name as a Job Class attribute export enum JobNames { ActiveValidator = "ActiveValidatorJob", Monitor = "MonitorJob", diff --git a/packages/common/src/scorekeeper/jobs/JobsClass.ts b/packages/common/src/scorekeeper/jobs/JobsClass.ts index f30695edb..957e7e8a8 100644 --- a/packages/common/src/scorekeeper/jobs/JobsClass.ts +++ b/packages/common/src/scorekeeper/jobs/JobsClass.ts @@ -1,3 +1,3 @@ -export { JobConfig, JobRunnerMetadata, JobStatus } from "./types"; +export type { JobConfig, JobRunnerMetadata, JobStatus } from "./types"; export { Job } from "./Job"; // TODO: remove this file during the next refactoring step diff --git a/packages/common/src/scorekeeper/jobs/types.ts b/packages/common/src/scorekeeper/jobs/types.ts index ed97896fb..626a383cc 100644 --- a/packages/common/src/scorekeeper/jobs/types.ts +++ b/packages/common/src/scorekeeper/jobs/types.ts @@ -1,4 +1,3 @@ -import { ConfigSchema } from "../../config"; import { ApiHandler, ChainData, Config, Constraints } from "../../index"; import MatrixBot from "../../matrix"; import Nominator from "../../nominator/nominator"; @@ -22,14 +21,7 @@ export type JobConfig = { preventOverlap?: boolean; }; -// JobDefinition abstraction is an intermediate stage of refactoring -// TODO: JobDefinition => JobConfig -export type JobDefinition = { - jobKey: keyof ConfigSchema["cron"] | ""; - defaultFrequency: string; - jobFunction: (metadata: JobRunnerMetadata) => Promise; -}; - +// There is a dependency on status names in scorekeeper-status-ui type StatusName = | "running" | "finished" From 1f90f6790e7b5aa012fdbc3aeebc7ec0e947f2f1 Mon Sep 17 00:00:00 2001 From: Volodymyr Brazhnyk Date: Fri, 17 May 2024 15:35:32 +0200 Subject: [PATCH 3/5] Address lint issues --- .../src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts | 4 +++- .../src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts index c0b252ea0..baad30eb2 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts @@ -9,7 +9,9 @@ export class ClearOfflineJob extends Job { } } -export const clearOfflineJob = async (jobRunnerMetadata?: JobRunnerMetadata) => { +export const clearOfflineJob = async ( + jobRunnerMetadata?: JobRunnerMetadata, +) => { jobStatusEmitter.emit("jobProgress", { name: JobNames.ClearOffline, progress: 0, diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts index 0276d3a32..7eb0db0c2 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts @@ -12,7 +12,9 @@ export class MonitorJob extends Job { } } -export const getLatestTaggedRelease = async (jobRunnerMetadata?: JobRunnerMetadata) => { +export const getLatestTaggedRelease = async ( + jobRunnerMetadata?: JobRunnerMetadata, +) => { try { const start = Date.now(); From c6052e297d662d2bd59a0328c92808b2b30b2620 Mon Sep 17 00:00:00 2001 From: Volodymyr Brazhnyk Date: Sun, 19 May 2024 23:20:45 +0200 Subject: [PATCH 4/5] Refactoring stage two --- packages/common/src/config.ts | 1 + .../common/src/scorekeeper/RegisterHandler.ts | 3 +- packages/common/src/scorekeeper/Round.ts | 7 +- packages/common/src/scorekeeper/jobs/Job.ts | 189 +++++++++++++----- .../common/src/scorekeeper/jobs/JobConfigs.ts | 79 ++------ .../common/src/scorekeeper/jobs/JobsClass.ts | 3 - .../src/scorekeeper/jobs/cron/SetupCronJob.ts | 78 -------- .../scorekeeper/jobs/cron/StartCronJobs.ts | 51 ----- .../jobs/specificJobs/ActiveValidatorJob.ts | 15 +- .../jobs/specificJobs/BlockDataJob.ts | 20 +- .../jobs/specificJobs/CancelJob.ts | 9 +- .../jobs/specificJobs/ChainDataJob.ts | 2 +- .../jobs/specificJobs/ClearOfflineJob.ts | 20 +- .../jobs/specificJobs/ConstraintsJob.ts | 46 ++--- .../jobs/specificJobs/EraPointsJob.ts | 24 +-- .../jobs/specificJobs/EraStatsJob.ts | 55 ++--- .../jobs/specificJobs/ExecutionJob.ts | 27 +-- .../jobs/specificJobs/InclusionJob.ts | 36 ++-- .../jobs/specificJobs/LocationStatsJob.ts | 73 +++---- .../jobs/specificJobs/MainScorekeeperJob.ts | 37 ++-- .../jobs/specificJobs/NominatorJob.ts | 26 +-- .../jobs/specificJobs/ReleaseMonitorJob.ts | 20 +- .../jobs/specificJobs/SessionKeyJob.ts | 25 +-- .../jobs/specificJobs/StaleNomination.ts | 9 +- .../jobs/specificJobs/UnclaimedErasJob.ts | 8 +- .../jobs/specificJobs/ValidatorPrefJob.ts | 25 +-- packages/common/src/scorekeeper/jobs/types.ts | 54 +++-- .../common/src/scorekeeper/scorekeeper.ts | 18 +- .../test/scorekeeper/scorekeeper.int.test.ts | 35 +++- packages/common/test/testUtils/scorekeeper.ts | 5 +- packages/scorekeeper-status-ui/src/App.tsx | 26 ++- 31 files changed, 412 insertions(+), 614 deletions(-) delete mode 100644 packages/common/src/scorekeeper/jobs/JobsClass.ts delete mode 100644 packages/common/src/scorekeeper/jobs/cron/SetupCronJob.ts delete mode 100644 packages/common/src/scorekeeper/jobs/cron/StartCronJobs.ts diff --git a/packages/common/src/config.ts b/packages/common/src/config.ts index 015a700e4..b451d6c00 100644 --- a/packages/common/src/config.ts +++ b/packages/common/src/config.ts @@ -45,6 +45,7 @@ export type ConfigSchema = { validity: string; validityEnabled: boolean; execution: string; + // TODO: add executionEnabled? scorekeeper: string; scorekeeperEnabled: boolean; cancel: string; diff --git a/packages/common/src/scorekeeper/RegisterHandler.ts b/packages/common/src/scorekeeper/RegisterHandler.ts index bb4ed9ca5..a87a1a3a5 100644 --- a/packages/common/src/scorekeeper/RegisterHandler.ts +++ b/packages/common/src/scorekeeper/RegisterHandler.ts @@ -4,7 +4,8 @@ * @function RegisterHandler */ import { ApiHandler, ChainData, Config, logger, queries } from "../index"; -import { scorekeeperLabel } from "./scorekeeper"; +// eslint-disable-next-line @typescript-eslint/no-unused-vars +import ScoreKeeper, { scorekeeperLabel } from "./scorekeeper"; export const registerAPIHandler = ( handler: ApiHandler, diff --git a/packages/common/src/scorekeeper/Round.ts b/packages/common/src/scorekeeper/Round.ts index 6238d7342..f44702208 100644 --- a/packages/common/src/scorekeeper/Round.ts +++ b/packages/common/src/scorekeeper/Round.ts @@ -13,7 +13,7 @@ import MatrixBot from "../matrix"; import ApiHandler from "../ApiHandler/ApiHandler"; import Nominator from "../nominator/nominator"; import { jobStatusEmitter } from "../Events"; -import { JobNames } from "./jobs/JobConfigs"; +import { JobEvent, JobKey } from "./jobs/types"; import { NominatorState, NominatorStatus } from "../types"; /// Handles the beginning of a new round. @@ -83,10 +83,9 @@ export const startRound = async ( const isValid = await constraints.checkCandidate(candidate, validators); const progress = Math.floor((index / allCandidates.length) * 100); - jobStatusEmitter.emit("jobProgress", { - name: JobNames.MainScorekeeper, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.MainScorekeeper, progress, - updated: Date.now(), iteration: `${isValid ? "✅ " : "❌ "} ${candidate.name}`, }); diff --git a/packages/common/src/scorekeeper/jobs/Job.ts b/packages/common/src/scorekeeper/jobs/Job.ts index 85481029c..bad8eae64 100644 --- a/packages/common/src/scorekeeper/jobs/Job.ts +++ b/packages/common/src/scorekeeper/jobs/Job.ts @@ -1,77 +1,166 @@ -import { startJob } from "./cron/StartCronJobs"; import logger from "../../logger"; import { jobStatusEmitter } from "../../Events"; -import { JobStatus, JobConfig, JobRunnerMetadata } from "./types"; +import { JobStatus, JobEvent, JobInfo, JobRunnerMetadata } from "./types"; +import { CronJob } from "cron"; +import { ConfigSchema } from "../../config"; export class Job { - protected status: JobStatus; - protected jobConfig: JobConfig; - protected jobRunnerMetadata: JobRunnerMetadata; + private cronJob: CronJob | null = null; + private status: JobStatus = JobStatus.Initialized; + private name: string; + private jobKey: string; + jobFunction: () => Promise; + private frequency: string; + private preventOverlap: boolean; + private jobRunCount = 0; + private isEnabled: boolean; + private error: string; + private progress: number; + private iteration: string; + private updated: number = Date.now(); - // TODO: remove this dependency injection - private startJobFunction: ( + constructor( + jobKey: string, + jobFunction: (metadata: JobRunnerMetadata) => Promise, + defaultFrequency: string, metadata: JobRunnerMetadata, - jobConfig: JobConfig, - ) => Promise; + preventOverlap = true, + ) { + const config = metadata.config; + this.jobKey = jobKey; + this.name = this.formatJobName(jobKey); + this.jobFunction = async () => await jobFunction(metadata); + this.frequency = this.getFrequency(config, defaultFrequency, jobKey); + this.isEnabled = this.checkJobEnabled(metadata.config, jobKey); + this.preventOverlap = preventOverlap; + this.registerEventHandlers(); + } + + public start(): void { + if (!this.isEnabled) { + this.log(`${this.name} is disabled`); + return; + } + if (this.cronJob) { + this.log(`${this.name} is already initialized`); + return; + } + + this.cronJob = new CronJob(this.frequency, this.executeJob.bind(this)); + this.cronJob.start(); + this.log(`${this.name} started with frequency: ${this.frequency}`); + } - // TODO: remove events and use db to handle the state - // then we can decouple scorekeeper and gateway - static events: string[] = [ - "jobStarted", - "jobRunning", - "jobFinished", - "jobErrored", - "jobProgress", - ]; + public stop(): void { + if (!this.cronJob) { + this.log(`Can't stop ${this.name} that hasn't been started`); + return; + } + this.cronJob.stop(); + this.log(`Job ${this.name} has been stopped`); + this.setStatus(JobStatus.Finished); + } - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - this.status = { - name: jobConfig.name, - updated: Date.now(), - status: "Not Running", + public getStatusAsJson(): string { + return JSON.stringify(this.getStatus()); + } + + public getStatus(): JobInfo { + return { + name: this.jobKey, + updated: this.updated, + enabled: this.isEnabled, + runCount: this.jobRunCount, + status: this.status, + frequency: this.frequency, + error: this.error, + progress: this.progress, + iteration: this.iteration, }; - this.jobConfig = jobConfig; - this.jobRunnerMetadata = jobRunnerMetadata; - this.startJobFunction = startJob; + } + + private executeJob(): void { + if (this.preventOverlap && this.status === JobStatus.Started) { + this.log(`${this.name} skipped execution due to overlap`); + return; + } + this.setStatus(JobStatus.Started); + this.jobFunction() + .then(() => { + this.setStatus(JobStatus.Finished); + }) + .catch((error) => { + this.error = error; + this.setStatus(JobStatus.Failed, error); + }) + .finally(() => { + this.jobRunCount++; + // TODO: this behavior is questionable. It overwrites Failed status. + this.setStatus(JobStatus.Finished); + }); } private log(message: string) { logger.info(message, { label: "Job" }); } - // TODO: remove events and use db to handle the state - // then we can decouple scorekeeper and gateway - private registerEventHandlers() { - this.log(`Registering event handlers for ${this.jobConfig.name}`); - Job.events.forEach((event) => { - jobStatusEmitter.on(event, (data: JobStatus) => { - this.updateJobStatus(data); - }); - }); + private formatJobName(jobKey: string): string { + // Ex. activeValidator => ActiveValidatorJob + return jobKey.charAt(0).toUpperCase() + jobKey.slice(1) + "Job"; } - public async run(): Promise { - this.registerEventHandlers(); - this.log(`Starting ${this.getName()}`); - await this.startJobFunction(this.jobRunnerMetadata, this.jobConfig); + private checkJobEnabled(config: ConfigSchema, jobKey: string): boolean { + // Enabled if the job has a defined property, ex. "activeValidatorEnabled" + return ( + config.cron && + (config.cron[ + `${jobKey}Enabled` as keyof typeof config.cron + ] as boolean) !== undefined + ); } - // TODO: remove this public interface after decoupling with the Gateway - public getName(): string { - return this.jobConfig.name; + private getFrequency( + config: ConfigSchema, + defaultFrequency: string, + jobKey: string, + ): string { + // If there is no key in the config, use the default frequency + return config.cron && + (config.cron[jobKey as keyof typeof config.cron] as string) !== undefined + ? config.cron[jobKey as keyof typeof config.cron].toString() + : defaultFrequency; } - public updateJobStatus(status: JobStatus) { - if (status.name == this.getName()) { - this.status = { ...this.status, ...status }; - } + private registerEventHandlers() { + jobStatusEmitter.on(JobEvent.Progress, this.updateJobHandler.bind(this)); + // TODO: check if we need these handlers at all. Maybe only for the chained jobs. + jobStatusEmitter.on(JobEvent.Failed, this.updateJobHandler.bind(this)); + jobStatusEmitter.on(JobEvent.Finished, this.updateJobHandler.bind(this)); } - public getStatusAsJson(): string { - return JSON.stringify(this.status); + private updateJobHandler(info: JobInfo): void { + if (info.name !== this.jobKey) { + return; + } + if (info.error) { + this.error = info.error; + } + if (info.progress !== undefined) { + this.progress = info.progress; + } + if (info.iteration !== undefined) { + this.iteration = info.iteration; + } + if (info.status !== undefined) { + this.setStatus(info.status, info.error); + } } - public getStatus(): JobStatus { - return this.status; + private setStatus(status: JobStatus, error?: string): void { + this.updated = Date.now(); + this.status = status; + this.log( + `${this.name} changed status to: ${this.status}${error ? " with error: " + error : ""}`, + ); } } diff --git a/packages/common/src/scorekeeper/jobs/JobConfigs.ts b/packages/common/src/scorekeeper/jobs/JobConfigs.ts index c9791687e..9c2adf1fb 100644 --- a/packages/common/src/scorekeeper/jobs/JobConfigs.ts +++ b/packages/common/src/scorekeeper/jobs/JobConfigs.ts @@ -19,138 +19,97 @@ import { executionJob } from "./specificJobs/ExecutionJob"; import { cancelJob } from "./specificJobs/CancelJob"; import { staleNominationJob } from "./specificJobs/StaleNomination"; import { clearOfflineJob } from "./specificJobs/ClearOfflineJob"; -import { JobConfig } from "./types"; - -// TODO: remove this enum, generate name based on the jobKey instead, -// store job name as a Job Class attribute -export enum JobNames { - ActiveValidator = "ActiveValidatorJob", - Monitor = "MonitorJob", - ClearOffline = "ClearOfflineJob", - Validity = "ValidityJob", - Score = "ScoreJob", - EraStats = "EraStatsJob", - EraPoints = "EraPointsJob", - Inclusion = "InclusionJob", - SessionKey = "SessionKeyJob", - UnclaimedEras = "UnclaimedErasJob", - ValidatorPref = "ValidatorPrefJob", - LocationStats = "LocationStatsJob", - Nominator = "NominatorJob", - BlockData = "BlockDataJob", - MainScorekeeper = "MainScorekeeperJob", - Execution = "ExecutionJob", - Cancel = "CancelJob", - StaleNomination = "StaleNominationJob", -} +import { JobConfig, JobKey } from "./types"; export const jobConfigs: JobConfig[] = [ { - jobKey: "activeValidator", + jobKey: JobKey.ActiveValidator, defaultFrequency: Constants.ACTIVE_VALIDATOR_CRON, jobFunction: activeValidatorJobWithTiming, - name: JobNames.ActiveValidator, }, { - jobKey: "monitor", + jobKey: JobKey.Monitor, defaultFrequency: Constants.MONITOR_CRON, jobFunction: getLatestTaggedRelease, - name: JobNames.Monitor, }, { - jobKey: "clearOffline", + jobKey: JobKey.ClearOffline, defaultFrequency: Constants.CLEAR_OFFLINE_CRON, jobFunction: clearOfflineJob, - name: JobNames.ClearOffline, }, { - jobKey: "validity", + jobKey: JobKey.Validity, defaultFrequency: Constants.VALIDITY_CRON, jobFunction: validityJobWithTiming, - name: JobNames.Validity, }, { - jobKey: "score", + jobKey: JobKey.Score, defaultFrequency: Constants.SCORE_CRON, jobFunction: scoreJobWithTiming, - name: JobNames.Score, }, { - jobKey: "eraStats", + jobKey: JobKey.EraStats, defaultFrequency: Constants.ERA_STATS_CRON, jobFunction: eraStatsJobWithTiming, - name: JobNames.EraStats, }, { - jobKey: "eraPoints", + jobKey: JobKey.EraPoints, defaultFrequency: Constants.ERA_POINTS_CRON, jobFunction: eraPointsJobWithTiming, - name: JobNames.EraPoints, }, { - jobKey: "inclusion", + jobKey: JobKey.EraPoints, defaultFrequency: Constants.INCLUSION_CRON, jobFunction: inclusionJobWithTiming, - name: JobNames.Inclusion, }, { - jobKey: "sessionKey", + jobKey: JobKey.SessionKey, defaultFrequency: Constants.SESSION_KEY_CRON, jobFunction: sessionKeyJobWithTiming, - name: JobNames.SessionKey, }, { - jobKey: "unclaimedEras", + jobKey: JobKey.UnclaimedEras, defaultFrequency: Constants.UNCLAIMED_ERAS_CRON, jobFunction: unclaimedEraJobWithTiming, - name: JobNames.UnclaimedEras, }, { - jobKey: "validatorPref", + jobKey: JobKey.ValidatorPref, defaultFrequency: Constants.VALIDATOR_PREF_CRON, jobFunction: validatorPrefJobWithTiming, - name: JobNames.ValidatorPref, }, { - jobKey: "locationStats", + jobKey: JobKey.LocationStats, defaultFrequency: Constants.LOCATION_STATS_CRON, jobFunction: locationStatsJobWithTiming, - name: JobNames.LocationStats, }, { - jobKey: "nominator", + jobKey: JobKey.Nominator, defaultFrequency: Constants.NOMINATOR_CRON, jobFunction: nominatorJobWithTiming, - name: JobNames.Nominator, }, { - jobKey: "block", + jobKey: JobKey.BlockData, defaultFrequency: Constants.BLOCK_CRON, jobFunction: blockJobWithTiming, - name: JobNames.BlockData, }, { - jobKey: "scorekeeper", + jobKey: JobKey.MainScorekeeper, defaultFrequency: Constants.SCOREKEEPER_CRON, jobFunction: mainScorekeeperJob, - name: JobNames.MainScorekeeper, }, { - jobKey: "execution", + jobKey: JobKey.Execution, defaultFrequency: Constants.EXECUTION_CRON, jobFunction: executionJob, - name: JobNames.Execution, }, { - jobKey: "cancel", + jobKey: JobKey.Cancel, defaultFrequency: Constants.CANCEL_CRON, jobFunction: cancelJob, - name: JobNames.Cancel, }, { - jobKey: "stale", + jobKey: JobKey.StaleNomination, defaultFrequency: Constants.STALE_CRON, jobFunction: staleNominationJob, - name: JobNames.StaleNomination, }, ]; diff --git a/packages/common/src/scorekeeper/jobs/JobsClass.ts b/packages/common/src/scorekeeper/jobs/JobsClass.ts deleted file mode 100644 index 957e7e8a8..000000000 --- a/packages/common/src/scorekeeper/jobs/JobsClass.ts +++ /dev/null @@ -1,3 +0,0 @@ -export type { JobConfig, JobRunnerMetadata, JobStatus } from "./types"; -export { Job } from "./Job"; -// TODO: remove this file during the next refactoring step diff --git a/packages/common/src/scorekeeper/jobs/cron/SetupCronJob.ts b/packages/common/src/scorekeeper/jobs/cron/SetupCronJob.ts deleted file mode 100644 index 3ca4d564d..000000000 --- a/packages/common/src/scorekeeper/jobs/cron/SetupCronJob.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { CronJob } from "cron"; -import { logger } from "../../../index"; -import { jobStatusEmitter } from "../../../Events"; -import { JobStatus } from "../JobsClass"; - -type JobFunction = () => Promise | void; -export const setupCronJob = async ( - enabled: boolean, // Whether the cron job is enabled - configFrequency: string | undefined, // Frequency from config - defaultFrequency: string, // Default frequency - jobFunction: JobFunction, // Job function to execute - name: string, // Description for logging - loggerLabel: { label: string }, // Optional logging label - preventOverlap = false, // Optional flag to prevent overlapping executions -): Promise => { - if (!enabled) { - logger.warn(`${name} is disabled.`, loggerLabel); - return; - } - - let jobRunCount = 0; - const frequency = configFrequency || defaultFrequency; - logger.info(`Starting ${name} with frequency ${frequency}`, loggerLabel); - const startedStatus: JobStatus = { - status: "started", - frequency: frequency, - name: name, - runCount: jobRunCount, - updated: Date.now(), - }; - jobStatusEmitter.emit("jobStarted", startedStatus); - - let isRunning = false; - - const cron = new CronJob(frequency, async () => { - if (preventOverlap && isRunning) { - logger.info(`Skipped ${name} execution due to overlap.`, loggerLabel); - return; - } - - isRunning = true; - logger.info(`Executing ${name}.`, loggerLabel); - const runningStatus: JobStatus = { - status: "running", - name: name, - runCount: jobRunCount, - updated: Date.now(), - }; - jobStatusEmitter.emit("jobRunning", runningStatus); - - try { - await jobFunction(); - } catch (e) { - logger.error(`Error executing ${name}: ${e}`, loggerLabel); - const errorStatus: JobStatus = { - status: "errored", - name: name, - runCount: jobRunCount, - updated: Date.now(), - error: JSON.stringify(e), - }; - - jobStatusEmitter.emit("jobErrored", errorStatus); - } finally { - isRunning = false; - jobRunCount++; - const finishedStatus: JobStatus = { - status: "finished", - name: name, - runCount: jobRunCount, - updated: Date.now(), - }; - jobStatusEmitter.emit("jobFinished", finishedStatus); - } - }); - - cron.start(); -}; diff --git a/packages/common/src/scorekeeper/jobs/cron/StartCronJobs.ts b/packages/common/src/scorekeeper/jobs/cron/StartCronJobs.ts deleted file mode 100644 index c76e0d7e4..000000000 --- a/packages/common/src/scorekeeper/jobs/cron/StartCronJobs.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { logger } from "../../..//index"; -import { setupCronJob } from "./SetupCronJob"; -import { JobConfig, JobRunnerMetadata } from "../JobsClass"; - -// Functions for starting the cron jobs - -export const cronLabel = { label: "Cron" }; - -export const startJob = async ( - metadata: JobRunnerMetadata, - jobConfig: JobConfig, -) => { - const { config } = metadata; - const { - jobKey, // Use jobKey instead of separate scheduleKey and enabledKey - jobFunction, - name, - preventOverlap = true, - defaultFrequency, - } = jobConfig; - - // Construct enabledKey by appending "Enabled" to jobKey - const enabledKey = `${jobKey}Enabled`; - - // Ensure the keys exist in config.cron before accessing them - const isEnabled = - config.cron && - (config.cron[enabledKey as keyof typeof config.cron] as boolean) !== - undefined; - - const frequency = - config.cron && - (config.cron[jobKey as keyof typeof config.cron] as string) !== undefined - ? config.cron[jobKey as keyof typeof config.cron].toString() - : defaultFrequency; - - if (!isEnabled) { - logger.warn(`${name} is disabled`, cronLabel); - return; - } - - await setupCronJob( - true, - frequency, - defaultFrequency, - () => jobFunction(metadata), - name, - cronLabel, - preventOverlap, - ); -}; diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ActiveValidatorJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ActiveValidatorJob.ts index 1b045b696..74106c40e 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ActiveValidatorJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ActiveValidatorJob.ts @@ -1,17 +1,11 @@ import { ChainData, logger, Models, queries } from "../../../index"; -import { Job, JobConfig, JobRunnerMetadata } from "../JobsClass"; +import { JobEvent, JobRunnerMetadata } from "../types"; import { jobStatusEmitter } from "../../../Events"; import { withExecutionTimeLogging } from "../../../utils"; -import { JobNames } from "../JobConfigs"; +import { JobKey } from "../types"; export const activeLabel = { label: "ActiveValidatorJob" }; -export class ActiveValidatorJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const individualActiveValidatorJob = async ( chaindata: ChainData, candidate: Models.Candidate, @@ -55,10 +49,9 @@ export const activeValidatorJob = async ( const progress = (processedCandidates / totalCandidates) * 100; // Emit progress update with candidate name as the iteration - jobStatusEmitter.emit("jobProgress", { - name: JobNames.ActiveValidator, + jobStatusEmitter.emit(JobEvent.Progress, { + key: JobKey.ActiveValidator, progress, - updated: Date.now(), iteration: `${isActive ? "✅ " : "❌ "} ${candidate.name}`, }); } diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/BlockDataJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/BlockDataJob.ts index 3d8536eab..b626119c5 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/BlockDataJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/BlockDataJob.ts @@ -1,22 +1,16 @@ import { ChainData, logger, queries } from "../../../index"; import { CoinGeckoClient } from "coingecko-api-v3"; -import { Job, JobConfig, JobRunnerMetadata } from "../JobsClass"; +import { JobEvent, JobRunnerMetadata } from "../types"; import { jobStatusEmitter } from "../../../Events"; import { formatDateFromUnix, withExecutionTimeLogging } from "../../../utils"; import { ApiDecoration } from "@polkadot/api/types"; import { Block, EventRecord, Phase } from "@polkadot/types/interfaces"; import type { FrameSystemEventRecord } from "@polkadot/types/lookup"; import { Exposure } from "../../../chaindata/queries/ValidatorPref"; -import { JobNames } from "../JobConfigs"; +import { JobKey } from "../types"; export const blockdataLabel = { label: "Block" }; -export class BlockDataJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const blockJob = async ( metadata: JobRunnerMetadata, ): Promise => { @@ -65,10 +59,9 @@ export const blockJob = async ( blockdataLabel, ); } - jobStatusEmitter.emit("jobProgress", { - name: JobNames.BlockData, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.BlockData, progress: (latestCount / latestTotal) * 100, - updated: Date.now(), iteration: `Block processed: #${i}`, }); } @@ -104,10 +97,9 @@ export const blockJob = async ( blockdataLabel, ); } - jobStatusEmitter.emit("jobProgress", { - name: JobNames.BlockData, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.BlockData, progress: (earliestCount / earliestTotal) * 100, - updated: Date.now(), iteration: `Block processed: #${i}`, }); } diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/CancelJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/CancelJob.ts index 69b145873..1992a53c1 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/CancelJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/CancelJob.ts @@ -1,13 +1,8 @@ -import { Job, JobConfig, JobRunnerMetadata } from "../JobsClass"; +import { JobRunnerMetadata } from "../types"; import logger from "../../../logger"; import { Util } from "../../../index"; -import { cronLabel } from "../cron/StartCronJobs"; -export class CancelJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} +export const cronLabel = { label: "Cron" }; export const cancelJob = async ( metadata: JobRunnerMetadata, diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ChainDataJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ChainDataJob.ts index 02bf10b06..223b51458 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ChainDataJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ChainDataJob.ts @@ -14,7 +14,7 @@ import { SESSION_KEY_JOB, VALIDATOR_PREF_JOB, } from "./index"; -import { JobRunnerMetadata } from "../JobsClass"; +import { JobRunnerMetadata } from "../types"; export const chaindataLabel = { label: "Chaindata" }; diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts index baad30eb2..75d8ddf0e 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts @@ -1,26 +1,18 @@ -import { Job, JobConfig, JobRunnerMetadata } from "../JobsClass"; +import { JobEvent, JobRunnerMetadata } from "../types"; import { jobStatusEmitter } from "../../../Events"; -import { JobNames } from "../JobConfigs"; +import { JobKey } from "../types"; import { queries } from "../../../index"; -export class ClearOfflineJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const clearOfflineJob = async ( jobRunnerMetadata?: JobRunnerMetadata, ) => { - jobStatusEmitter.emit("jobProgress", { - name: JobNames.ClearOffline, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.ClearOffline, progress: 0, - updated: Date.now(), }); await queries.clearAccumulated(); - jobStatusEmitter.emit("jobProgress", { - name: JobNames.ClearOffline, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.ClearOffline, progress: 100, - updated: Date.now(), }); }; diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ConstraintsJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ConstraintsJob.ts index 4677eed01..f40f1a836 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ConstraintsJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ConstraintsJob.ts @@ -1,5 +1,5 @@ import { SCORE_JOB, VALIDITY_JOB } from "./index"; -import { Job, JobConfig, JobRunnerMetadata, JobStatus } from "../JobsClass"; +import { JobEvent, JobInfo, JobRunnerMetadata, JobStatus } from "../types"; import { allCandidates, getLatestValidatorScoreMetadata, @@ -12,23 +12,10 @@ import { } from "../../../utils"; import { jobStatusEmitter } from "../../../Events"; import logger from "../../../logger"; -import { Constraints, queries } from "../../../index"; -import { JobNames } from "../JobConfigs"; +import { JobKey } from "../types"; export const constraintsLabel = { label: "ConstraintsJob" }; -export class ValidityJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - -export class ScoreJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const validityJob = async ( metadata: JobRunnerMetadata, ): Promise => { @@ -51,10 +38,9 @@ export const validityJob = async ( const progress = ((index + 1) / totalCandidates) * 100; // Emit progress update with candidate name in the iteration - jobStatusEmitter.emit("jobProgress", { - name: JobNames.Validity, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.Validity, progress, - updated: Date.now(), iteration: `${isValid ? "✅ " : "❌ "} ${candidate.name}`, }); @@ -66,13 +52,11 @@ export const validityJob = async ( return true; } catch (e) { logger.error(`Error running validity job: ${e}`, constraintsLabel); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.Validity, - updated: Date.now(), + jobStatusEmitter.emit(JobEvent.Failed, { + status: JobStatus.Failed, + name: JobKey.Validity, error: JSON.stringify(e), - }; - jobStatusEmitter.emit("jobErrored", errorStatus); + }); return false; } }; @@ -108,10 +92,9 @@ export const scoreJob = async ( const progress = ((index + 1) / totalCandidates) * 100; // Emit progress update including the candidate name - jobStatusEmitter.emit("jobProgress", { - name: JobNames.Score, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.Score, progress, - updated: Date.now(), iteration: `[${score?.toFixed(1)}] ${candidate.name}`, }); @@ -125,13 +108,12 @@ export const scoreJob = async ( return true; } catch (e) { logger.error(`Error running score job: ${e}`, constraintsLabel); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.Score, - updated: Date.now(), + const errorInfo: JobInfo = { + status: JobStatus.Failed, + name: JobKey.Score, error: JSON.stringify(e), }; - jobStatusEmitter.emit("jobErrored", errorStatus); + jobStatusEmitter.emit(JobEvent.Failed, errorInfo); return false; } }; diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/EraPointsJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/EraPointsJob.ts index 283fafb21..6bb2a56f2 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/EraPointsJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/EraPointsJob.ts @@ -1,17 +1,10 @@ import { ChainData, Constants, logger, queries } from "../../../index"; -import { Job, JobConfig, JobRunnerMetadata, JobStatus } from "../JobsClass"; +import { JobEvent, JobKey, JobRunnerMetadata, JobStatus } from "../types"; import { jobStatusEmitter } from "../../../Events"; import { withExecutionTimeLogging } from "../../../utils"; -import { JobNames } from "../JobConfigs"; export const erapointsLabel = { label: "EraPointsJob" }; -export class EraPointsJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - // Gets and sets the total era points for a given era export const individualEraPointsJob = async ( chaindata: ChainData, @@ -74,10 +67,9 @@ export const eraPointsJob = async ( const progress = (processedEras / Constants.ERAPOINTS_JOB_MAX_ERAS) * 100; // Emit progress update with active era as iteration - jobStatusEmitter.emit("jobProgress", { - name: JobNames.EraPoints, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.EraPoints, progress, - updated: Date.now(), iteration: `Active era: ${i}`, }); @@ -89,13 +81,11 @@ export const eraPointsJob = async ( return true; } catch (e) { logger.error(`Error running era points job: ${e}`, erapointsLabel); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.EraPoints, - updated: Date.now(), + jobStatusEmitter.emit(JobEvent.Failed, { + status: JobStatus.Failed, + name: JobKey.EraPoints, error: JSON.stringify(e), - }; - jobStatusEmitter.emit("jobErrored", errorStatus); + }); return false; } }; diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/EraStatsJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/EraStatsJob.ts index 9a8cd5780..a2f4f9f5a 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/EraStatsJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/EraStatsJob.ts @@ -1,21 +1,14 @@ import { logger, queries } from "../../../index"; -import { Job, JobConfig, JobRunnerMetadata, JobStatus } from "../JobsClass"; +import { JobEvent, JobKey, JobRunnerMetadata, JobStatus } from "../types"; import { jobStatusEmitter } from "../../../Events"; import { setAllIdentities, setValidatorRanks, withExecutionTimeLogging, } from "../../../utils"; -import { JobNames } from "../JobConfigs"; export const erastatsLabel = { label: "EraStatsJob" }; -export class EraStatsJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const eraStatsJob = async ( metadata: JobRunnerMetadata, ): Promise => { @@ -37,10 +30,9 @@ export const eraStatsJob = async ( return false; } // Emit progress update indicating the start of the job - jobStatusEmitter.emit("jobProgress", { - name: JobNames.EraStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.EraStats, progress: 0, - updated: Date.now(), }); const allCandidates = await queries.allCandidates(); const valid = allCandidates.filter((candidate) => candidate.valid); @@ -59,10 +51,9 @@ export const eraStatsJob = async ( for (const [index, validator] of validators.entries()) { // Emit progress update for each validator processed const progressPercentage = ((index + 1) / validators.length) * 100; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.EraStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.EraStats, progress: progressPercentage, - updated: Date.now(), iteration: `Processed validator: ${validator}`, }); } @@ -70,10 +61,9 @@ export const eraStatsJob = async ( await queries.setValidatorSet(currentSession, currentEra, validators); // Emit progress update after storing identities - jobStatusEmitter.emit("jobProgress", { - name: JobNames.EraStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.EraStats, progress: 25, - updated: Date.now(), }); for (let i = currentEra; i > currentEra - 20; i--) { @@ -97,39 +87,32 @@ export const eraStatsJob = async ( // Emit progress update for each era processed const progressPercentage = ((currentEra - i) / (currentEra - 20)) * 100; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.EraStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.EraStats, progress: progressPercentage, - updated: Date.now(), iteration: `Processed era: ${i}`, }); } // Emit progress update after processing eras - jobStatusEmitter.emit("jobProgress", { - name: JobNames.EraStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.EraStats, progress: 100, - updated: Date.now(), }); await setValidatorRanks(); - - const finishedStatus: JobStatus = { - status: "finished", - name: JobNames.EraStats, - updated: Date.now(), - }; - jobStatusEmitter.emit("jobFinished", finishedStatus); + jobStatusEmitter.emit(JobEvent.Finished, { + status: JobStatus.Finished, + name: JobKey.EraStats, + }); return true; } catch (e) { logger.error(`Error running era stats job: ${e}`, erastatsLabel); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.EraStats, - updated: Date.now(), + jobStatusEmitter.emit(JobEvent.Failed, { + status: JobStatus.Failed, + name: JobKey.EraStats, error: JSON.stringify(e), - }; - jobStatusEmitter.emit("jobErrored", errorStatus); + }); return false; } }; diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ExecutionJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ExecutionJob.ts index 6769e928d..b36b3eae6 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ExecutionJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ExecutionJob.ts @@ -1,16 +1,10 @@ -import { Job, JobConfig, JobRunnerMetadata } from "../JobsClass"; +import { JobEvent, JobKey, JobRunnerMetadata } from "../types"; import logger from "../../../logger"; import { Constants, queries, Util } from "../../../index"; -import { cronLabel } from "../cron/StartCronJobs"; import { jobStatusEmitter } from "../../../Events"; -import { JobNames } from "../JobConfigs"; import { NominatorState, NominatorStatus } from "../../../types"; -export class ExecutionJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} +export const cronLabel = { label: "Cron" }; export const executionJob = async ( metadata: JobRunnerMetadata, @@ -47,10 +41,9 @@ export const executionJob = async ( for (const [index, data] of allDelayed.entries()) { const progressPercentage = ((index + 1) / allDelayed.length) * 100; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.Execution, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.Execution, progress: progressPercentage, - updated: Date.now(), iteration: `Processed transaction: ${data.callHash}`, }); @@ -179,10 +172,9 @@ export const executionJob = async ( nominator.lastEraNomination = era; // Create a Nomination Object - jobStatusEmitter.emit("jobProgress", { - name: JobNames.Execution, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.Execution, progress: progressPercentage, // You can adjust this if needed - updated: Date.now(), iteration: `Executed transaction: ${data.callHash}`, }); await queries.setNomination( @@ -248,11 +240,10 @@ export const executionJob = async ( await Util.sleep(7000); } } - jobStatusEmitter.emit("jobProgress", { - name: JobNames.Execution, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.Execution, progress: 100, - updated: Date.now(), - message: "All transactions processed", + iteration: "All transactions processed", }); return true; } catch (e) { diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/InclusionJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/InclusionJob.ts index 273d817b1..b2ff6e3a4 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/InclusionJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/InclusionJob.ts @@ -1,17 +1,11 @@ import { logger, queries } from "../../../index"; -import { Job, JobConfig, JobRunnerMetadata, JobStatus } from "../JobsClass"; +import { JobEvent, JobRunnerMetadata, JobStatus } from "../types"; import { jobStatusEmitter } from "../../../Events"; import { withExecutionTimeLogging } from "../../../utils"; -import { JobNames } from "../JobConfigs"; +import { JobKey } from "../types"; export const inclusionLabel = { label: "InclusionJob" }; -export class InclusionJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const inclusionJob = async ( metadata: JobRunnerMetadata, ): Promise => { @@ -22,10 +16,9 @@ export const inclusionJob = async ( const candidates = await queries.allCandidates(); // Emit progress update indicating the start of the job - jobStatusEmitter.emit("jobProgress", { - name: JobNames.Inclusion, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.Inclusion, progress: 0, - updated: Date.now(), }); for (const candidate of candidates) { @@ -58,32 +51,27 @@ export const inclusionJob = async ( // Emit progress update for each candidate processed, including the candidate's name const progressPercentage = ((candidates.indexOf(candidate) + 1) / candidates.length) * 100; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.Inclusion, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.Inclusion, progress: progressPercentage, - updated: Date.now(), iteration: `Processed candidate ${candidate.name}`, }); } // Emit progress update indicating the completion of the job - jobStatusEmitter.emit("jobProgress", { - name: "Inclusion Job", + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.Inclusion, progress: 100, - updated: Date.now(), }); return true; } catch (e) { logger.error(`Error running inclusion job: ${e}`, inclusionLabel); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.Inclusion, - updated: Date.now(), + jobStatusEmitter.emit(JobEvent.Failed, { + status: JobStatus.Failed, + name: JobKey.Inclusion, error: JSON.stringify(e), - }; - - jobStatusEmitter.emit("jobErrored", errorStatus); + }); return false; } }; diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/LocationStatsJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/LocationStatsJob.ts index 09f9a70ed..ec4167020 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/LocationStatsJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/LocationStatsJob.ts @@ -1,26 +1,19 @@ import { logger, queries, Score } from "../../../index"; -import { Job, JobConfig, JobRunnerMetadata, JobStatus } from "../JobsClass"; +import { JobEvent, JobRunnerMetadata, JobStatus } from "../types"; import { jobStatusEmitter } from "../../../Events"; import { withExecutionTimeLogging } from "../../../utils"; -import { JobNames } from "../JobConfigs"; +import { JobKey } from "../types"; export const locationstatsLabel = { label: "LocationStatsJob" }; -export class LocationStatsJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const locationStatsJob = async (metadata: JobRunnerMetadata) => { try { const { chaindata } = metadata; await queries.cleanBlankLocations(); - jobStatusEmitter.emit("jobProgress", { - name: JobNames.LocationStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.LocationStats, progress: 25, - updated: Date.now(), }); let totalNodes = []; @@ -36,10 +29,9 @@ export const locationStatsJob = async (metadata: JobRunnerMetadata) => { const locationMap = new Map(); const locationArr = []; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.LocationStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.LocationStats, progress: 50, - updated: Date.now(), }); // Add all candidate entries to the list of nodes @@ -62,10 +54,9 @@ export const locationStatsJob = async (metadata: JobRunnerMetadata) => { // Emit progress update for each candidate processed const progressPercentage = ((index + 1) / candidates.length) * 100; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.LocationStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.LocationStats, progress: progressPercentage, - updated: Date.now(), iteration: `Processed candidate ${candidate.name}`, }); } @@ -107,10 +98,9 @@ export const locationStatsJob = async (metadata: JobRunnerMetadata) => { // Emit progress update for each validator processed const progressPercentage = ((index + 1) / validatorset.validators.length) * 100; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.LocationStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.LocationStats, progress: progressPercentage, - updated: Date.now(), iteration: `Processed validator ${validatorAddress}`, }); } @@ -131,10 +121,9 @@ export const locationStatsJob = async (metadata: JobRunnerMetadata) => { } }); - jobStatusEmitter.emit("jobProgress", { - name: JobNames.LocationStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.LocationStats, progress: 75, - updated: Date.now(), }); // Iterate through all candidates and the active validator set @@ -153,10 +142,9 @@ export const locationStatsJob = async (metadata: JobRunnerMetadata) => { // Emit progress update for each node processed const progressPercentage = ((index + 1) / totalNodes.length) * 100; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.LocationStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.LocationStats, progress: progressPercentage, - updated: Date.now(), iteration: `Processed node ${node.address}`, }); } @@ -166,10 +154,9 @@ export const locationStatsJob = async (metadata: JobRunnerMetadata) => { // Emit progress update for each location processed const progressPercentage = (locationArr.length / locationMap.size) * 100; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.LocationStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.LocationStats, progress: progressPercentage, - updated: Date.now(), iteration: `Processed location ${name}`, }); } @@ -202,10 +189,9 @@ export const locationStatsJob = async (metadata: JobRunnerMetadata) => { // Emit progress update for each region processed const progressPercentage = (regionArr.length / regionMap.size) * 100; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.LocationStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.LocationStats, progress: progressPercentage, - updated: Date.now(), iteration: `Processed region ${name}`, }); } @@ -238,10 +224,9 @@ export const locationStatsJob = async (metadata: JobRunnerMetadata) => { // Emit progress update for each country processed const progressPercentage = (countryArr.length / countryMap.size) * 100; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.LocationStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.LocationStats, progress: progressPercentage, - updated: Date.now(), iteration: `Processed country ${name}`, }); } @@ -274,10 +259,9 @@ export const locationStatsJob = async (metadata: JobRunnerMetadata) => { // Emit progress update for each provider processed const progressPercentage = (providerArr.length / providerMap.size) * 100; - jobStatusEmitter.emit("jobProgress", { - name: JobNames.LocationStats, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.LocationStats, progress: progressPercentage, - updated: Date.now(), iteration: `Processed provider ${name}`, }); } @@ -307,7 +291,7 @@ export const locationStatsJob = async (metadata: JobRunnerMetadata) => { decentralization, ); - jobStatusEmitter.emit("jobProgress", { + jobStatusEmitter.emit(JobEvent.Progress, { name: "Location Stats Job", progress: 100, updated: Date.now(), @@ -316,14 +300,11 @@ export const locationStatsJob = async (metadata: JobRunnerMetadata) => { return true; } catch (e) { logger.error(`Error running location stats job: ${e}`, locationstatsLabel); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.LocationStats, - updated: Date.now(), + jobStatusEmitter.emit(JobEvent.Progress, { + status: JobStatus.Failed, + name: JobKey.LocationStats, error: JSON.stringify(e), - }; - - jobStatusEmitter.emit("jobErrored", errorStatus); + }); return false; } }; diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/MainScorekeeperJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/MainScorekeeperJob.ts index 67be1a52d..e51b3f7b3 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/MainScorekeeperJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/MainScorekeeperJob.ts @@ -1,17 +1,11 @@ -import { Job, JobConfig, JobRunnerMetadata, JobStatus } from "../JobsClass"; +import { JobEvent, JobRunnerMetadata, JobStatus } from "../types"; import logger from "../../../logger"; import { queries } from "../../../index"; import { startRound } from "../../Round"; import { jobStatusEmitter } from "../../../Events"; -import { JobNames } from "../JobConfigs"; +import { JobKey } from "../types"; import { NOMINATOR_SHOULD_NOMINATE_ERAS_THRESHOLD } from "../../../constants"; -export class MainScorekeeperJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - const mainScoreKeeperLabel = { label: "MainScorekeeperJob" }; export const mainScorekeeperJob = async ( @@ -31,14 +25,11 @@ export const mainScorekeeperJob = async ( const [activeEra, err] = await chaindata.getActiveEraIndex(); if (err) { logger.warn(`CRITICAL: ${err}`, mainScoreKeeperLabel); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.MainScorekeeper, - updated: Date.now(), + jobStatusEmitter.emit(JobEvent.Failed, { + status: JobStatus.Failed, + name: JobKey.MainScorekeeper, error: JSON.stringify(err), - }; - - jobStatusEmitter.emit("jobErrored", errorStatus); + }); return; } @@ -79,14 +70,11 @@ export const mainScorekeeperJob = async ( "Nominating is disabled in the settings and Dry Run is false. Skipping round.", mainScoreKeeperLabel, ); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.MainScorekeeper, - updated: Date.now(), + jobStatusEmitter.emit(JobEvent.Failed, { + status: JobStatus.Failed, + name: JobKey.MainScorekeeper, error: "Nominating Disabled", - }; - - jobStatusEmitter.emit("jobErrored", errorStatus); + }); return; } else { logger.info( @@ -131,10 +119,9 @@ export const mainScorekeeperJob = async ( ); // Emit progress update event with custom iteration name - jobStatusEmitter.emit("jobProgress", { - name: JobNames.MainScorekeeper, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.MainScorekeeper, progress, - updated: Date.now(), iteration: `Processed nominator group ${processedNominatorGroups}`, }); } diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/NominatorJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/NominatorJob.ts index 1df157ba5..a61478735 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/NominatorJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/NominatorJob.ts @@ -1,17 +1,11 @@ import { jobStatusEmitter } from "../../../Events"; import { logger, queries } from "../../../index"; -import { Job, JobConfig, JobRunnerMetadata, JobStatus } from "../JobsClass"; +import { JobEvent, JobRunnerMetadata, JobStatus } from "../types"; import { withExecutionTimeLogging } from "../../../utils"; -import { JobNames } from "../JobConfigs"; +import { JobKey } from "../types"; export const nominatorLabel = { label: "NominatorJob" }; -export class NominatorJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const nominatorJob = async ( metadata: JobRunnerMetadata, ): Promise => { @@ -63,10 +57,9 @@ export const nominatorJob = async ( ); // Emit progress update event with candidate's name - jobStatusEmitter.emit("jobProgress", { - name: JobNames.Nominator, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.Nominator, progress, - updated: Date.now(), iteration: `Processed candidate ${candidate.name}`, }); } @@ -74,14 +67,11 @@ export const nominatorJob = async ( return true; } catch (e) { logger.error(`Error running nominator job: ${e}`, nominatorLabel); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.Nominator, - updated: Date.now(), + jobStatusEmitter.emit(JobEvent.Failed, { + status: JobStatus.Failed, + name: JobKey.Nominator, error: JSON.stringify(e), - }; - - jobStatusEmitter.emit("jobErrored", errorStatus); + }); return false; } }; diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts index 7eb0db0c2..81efa97b2 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts @@ -1,17 +1,10 @@ import { logger, queries } from "../../../index"; import { Octokit } from "@octokit/rest"; -import { Job, JobConfig, JobRunnerMetadata, JobStatus } from "../JobsClass"; -import { JobNames } from "../JobConfigs"; +import { JobEvent, JobKey, JobRunnerMetadata, JobStatus } from "../types"; import { jobStatusEmitter } from "../../../Events"; export const monitorLabel = { label: "Monitor" }; -export class MonitorJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const getLatestTaggedRelease = async ( jobRunnerMetadata?: JobRunnerMetadata, ) => { @@ -68,14 +61,11 @@ export const getLatestTaggedRelease = async ( logger.info(`Done. Took ${(end - start) / 1000} seconds`, monitorLabel); } catch (e) { logger.error(`Error running monitor job: ${e}`, monitorLabel); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.Monitor, - updated: Date.now(), + jobStatusEmitter.emit(JobEvent.Failed, { + status: JobStatus.Failed, + name: JobKey.Monitor, error: JSON.stringify(e), - }; - - jobStatusEmitter.emit("jobErrored", errorStatus); + }); } }; diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/SessionKeyJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/SessionKeyJob.ts index 35a284464..20e161c82 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/SessionKeyJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/SessionKeyJob.ts @@ -1,17 +1,10 @@ import { logger, queries } from "../../../index"; -import { Job, JobConfig, JobRunnerMetadata, JobStatus } from "../JobsClass"; +import { JobEvent, JobKey, JobRunnerMetadata, JobStatus } from "../types"; import { jobStatusEmitter } from "../../../Events"; import { withExecutionTimeLogging } from "../../../utils"; -import { JobNames } from "../JobConfigs"; export const sessionkeyLabel = { label: "SessionKeyJob" }; -export class SessionKeyJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const sessionKeyJob = async (metadata: JobRunnerMetadata) => { try { const { chaindata } = metadata; @@ -44,10 +37,9 @@ export const sessionKeyJob = async (metadata: JobRunnerMetadata) => { ); // Emit progress update event with validator's name - jobStatusEmitter.emit("jobProgress", { - name: JobNames.SessionKey, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.SessionKey, progress, - updated: Date.now(), iteration: `Processed validator ${validator}`, }); } @@ -55,14 +47,11 @@ export const sessionKeyJob = async (metadata: JobRunnerMetadata) => { return true; } catch (e) { logger.error(`Error running session key job: ${e}`, sessionkeyLabel); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.SessionKey, - updated: Date.now(), + jobStatusEmitter.emit(JobEvent.Failed, { + status: JobStatus.Failed, + name: JobKey.SessionKey, error: JSON.stringify(e), - }; - - jobStatusEmitter.emit("jobErrored", errorStatus); + }); return false; } }; diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/StaleNomination.ts b/packages/common/src/scorekeeper/jobs/specificJobs/StaleNomination.ts index 85f2651a5..7d7a551af 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/StaleNomination.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/StaleNomination.ts @@ -1,12 +1,7 @@ -import { Job, JobConfig, JobRunnerMetadata } from "../JobsClass"; +import { JobRunnerMetadata } from "../types"; import logger from "../../../logger"; -import { cronLabel } from "../cron/StartCronJobs"; -export class StaleNominationJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} +export const cronLabel = { label: "Cron" }; export const staleNominationJob = async ( metadata: JobRunnerMetadata, diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/UnclaimedErasJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/UnclaimedErasJob.ts index d6c4cd667..e07201ccf 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/UnclaimedErasJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/UnclaimedErasJob.ts @@ -1,15 +1,9 @@ import { logger } from "../../../index"; -import { Job, JobConfig, JobRunnerMetadata } from "../JobsClass"; +import { JobRunnerMetadata } from "../types"; import { withExecutionTimeLogging } from "../../../utils"; export const unclaimedErasLabel = { label: "UnclaimedErasJob" }; -export class UnclaimedErasJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const unclaimedErasJob = async (metadata: JobRunnerMetadata) => { const { chaindata } = metadata; logger.info(`Unclaimed Eras done`, unclaimedErasLabel); diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ValidatorPrefJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ValidatorPrefJob.ts index 7a2d79906..5e3e60950 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ValidatorPrefJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ValidatorPrefJob.ts @@ -1,17 +1,10 @@ import { logger, Models, queries } from "../../../index"; -import { Job, JobConfig, JobRunnerMetadata, JobStatus } from "../JobsClass"; +import { JobEvent, JobKey, JobRunnerMetadata, JobStatus } from "../types"; import { jobStatusEmitter } from "../../../Events"; import { withExecutionTimeLogging } from "../../../utils"; -import { JobNames } from "../JobConfigs"; export const validatorPrefLabel = { label: "ValidatorPrefJob" }; -export class ValidatorPrefJob extends Job { - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - super(jobConfig, jobRunnerMetadata); - } -} - export const individualValidatorPrefJob = async ( metadata: JobRunnerMetadata, candidate: Models.Candidate, @@ -90,10 +83,9 @@ export const validatorPrefJob = async ( ); // Emit progress update event with candidate's name - jobStatusEmitter.emit("jobProgress", { - name: JobNames.ValidatorPref, + jobStatusEmitter.emit(JobEvent.Progress, { + name: JobKey.ValidatorPref, progress, - updated: Date.now(), iteration: `Processed candidate ${candidate.name}`, }); } @@ -104,14 +96,11 @@ export const validatorPrefJob = async ( validatorPrefLabel, ); - const errorStatus: JobStatus = { - status: "errored", - name: JobNames.ValidatorPref, - updated: Date.now(), + jobStatusEmitter.emit(JobEvent.Failed, { + status: JobStatus.Failed, + name: JobKey.ValidatorPref, error: JSON.stringify(e), - }; - - jobStatusEmitter.emit("jobErrored", errorStatus); + }); return false; } }; diff --git a/packages/common/src/scorekeeper/jobs/types.ts b/packages/common/src/scorekeeper/jobs/types.ts index 626a383cc..c5cffd77d 100644 --- a/packages/common/src/scorekeeper/jobs/types.ts +++ b/packages/common/src/scorekeeper/jobs/types.ts @@ -2,6 +2,28 @@ import { ApiHandler, ChainData, Config, Constraints } from "../../index"; import MatrixBot from "../../matrix"; import Nominator from "../../nominator/nominator"; +// Job keys used in the config +export enum JobKey { + ActiveValidator = "activeValidator", + Monitor = "monitor", + ClearOffline = "clearOffline", + Validity = "validity", + Score = "score", + EraStats = "eraStats", + EraPoints = "eraPoints", + Inclusion = "inclusion", + SessionKey = "sessionKey", + UnclaimedEras = "unclaimedEras", + ValidatorPref = "validatorPref", + LocationStats = "locationStats", + Nominator = "nominator", + BlockData = "block", + MainScorekeeper = "scorekeeper", + Execution = "execution", + Cancel = "cancel", + StaleNomination = "stale", +} + export type JobRunnerMetadata = { config: Config.ConfigSchema; chaindata: ChainData; @@ -14,27 +36,35 @@ export type JobRunnerMetadata = { }; export type JobConfig = { - jobKey: keyof Config.ConfigSchema["cron"] | ""; + jobKey: JobKey; defaultFrequency: string; jobFunction: (metadata: JobRunnerMetadata) => Promise; - name: string; preventOverlap?: boolean; }; -// There is a dependency on status names in scorekeeper-status-ui -type StatusName = - | "running" - | "finished" - | "errored" - | "started" - | "Not Running"; +export enum JobStatus { + Initialized = "Initialized", + Started = "Started", + Running = "Running", // This status is not used and seems redundant + Finished = "Finished", + Failed = "Failed", +} + +export enum JobEvent { + Started = "Started", + Running = "Running", + Finished = "Finished", + Failed = "Failed", + Progress = "Progress", +} -export type JobStatus = { +// Used to expose Job info to the Gateway +export type JobInfo = { name: string; - updated: number; + updated?: number; enabled?: boolean; runCount?: number; - status: StatusName; + status: JobStatus; frequency?: string; error?: string; // Progress from 0 to 100 diff --git a/packages/common/src/scorekeeper/scorekeeper.ts b/packages/common/src/scorekeeper/scorekeeper.ts index 67aa83a59..8c9122dc5 100644 --- a/packages/common/src/scorekeeper/scorekeeper.ts +++ b/packages/common/src/scorekeeper/scorekeeper.ts @@ -12,7 +12,7 @@ import { import Nominator from "../nominator/nominator"; import { registerAPIHandler } from "./RegisterHandler"; import { Job } from "./jobs/Job"; -import { JobRunnerMetadata, JobStatus } from "./jobs/types"; +import { JobInfo, JobRunnerMetadata } from "./jobs/types"; import { jobConfigs } from "./jobs/JobConfigs"; import { startRound } from "./Round"; import { NominatorStatus } from "../types"; @@ -49,7 +49,7 @@ export default class ScoreKeeper { public isStarted = false; - constructor(handler: ApiHandler, config: Config.ConfigSchema, bot: any) { + constructor(handler: ApiHandler, config: Config.ConfigSchema, bot?: any) { this.handler = handler; this.chaindata = new ChainData(this.handler); this.config = config; @@ -62,9 +62,10 @@ export default class ScoreKeeper { registerAPIHandler(this.handler, this.config, this.chaindata, this.bot); } public getJobsStatusAsJson() { - const statuses: Record = {}; + const statuses: Record = {}; for (const job of this._jobs) { - statuses[job.getName()] = job.getStatus(); + const status = job.getStatus(); + statuses[status.name] = status; } return statuses; } @@ -221,8 +222,13 @@ export default class ScoreKeeper { startJobs(metadata: JobRunnerMetadata) { this._jobs = jobConfigs.map((config) => { - const job = new Job(config, metadata); - job.run(); + const job = new Job( + config.jobKey, + config.jobFunction, + config.defaultFrequency, + metadata, + ); + job.start(); return job; }); } diff --git a/packages/common/test/scorekeeper/scorekeeper.int.test.ts b/packages/common/test/scorekeeper/scorekeeper.int.test.ts index f3858d0d9..689b7a16b 100644 --- a/packages/common/test/scorekeeper/scorekeeper.int.test.ts +++ b/packages/common/test/scorekeeper/scorekeeper.int.test.ts @@ -2,13 +2,20 @@ import { beforeAll, describe, expect, it } from "vitest"; import { ScoreKeeper } from "../../src"; import { getAndStartScorekeeper } from "../testUtils/scorekeeper"; import { jobStatusEmitter } from "../../src/Events"; +import { JobEvent, JobStatus } from "../../src/scorekeeper/jobs/types"; +import { getKusamaProdConfig } from "../testUtils/config"; const TIMEOUT_DURATION = 5200000; // 120 seconds describe("Scorekeeper Integration Tests", () => { let scorekeeper: ScoreKeeper; beforeAll(async () => { - scorekeeper = await getAndStartScorekeeper(); + const config = getKusamaProdConfig(); + + // The first job "activeValidator" running every 5 seconds + config.cron.activeValidator = "*/5 * * * * *"; + + scorekeeper = await getAndStartScorekeeper(config); }, TIMEOUT_DURATION); it( @@ -23,13 +30,13 @@ describe("Scorekeeper Integration Tests", () => { ); it( - "should start jobs and have their status as started", + "should start jobs and have their status as initialized", async () => { const statusJson = scorekeeper.getJobsStatusAsJson(); const statuses = Object.values(statusJson); statuses.forEach((job) => { - if (job.status !== "started") { - console.error(`Job ${job.name} is not started.`); + if (job.status !== JobStatus.Initialized) { + console.error(`Job ${job.name} is not initialized.`); } expect(job.status).toBeDefined(); }); @@ -37,6 +44,17 @@ describe("Scorekeeper Integration Tests", () => { TIMEOUT_DURATION, ); + it( + "should wait till the activeValidator job will be finished", + async () => { + await new Promise((resolve) => setTimeout(resolve, 5000)); + const statusJson = scorekeeper.getJobsStatusAsJson(); + const statuses = Object.values(statusJson); + expect(statuses[0].status).toBe(JobStatus.Finished); + }, + TIMEOUT_DURATION, + ); + it( "should update status of the job on event", async () => { @@ -44,20 +62,17 @@ describe("Scorekeeper Integration Tests", () => { const firstJob = Object.values(statusesBefore)[0]; const secondJob = Object.values(statusesBefore)[1]; - jobStatusEmitter.emit("jobRunning", { + jobStatusEmitter.emit(JobEvent.Finished, { name: firstJob.name, - status: "running", - updated: Date.now(), + status: JobStatus.Finished, }); - await new Promise((resolve) => setTimeout(resolve, 500)); - const statusesAfter = scorekeeper.getJobsStatusAsJson(); const updatedFirstJob = Object.values(statusesAfter)[0]; const unchangedSecondJob = Object.values(statusesAfter)[1]; // Check that the first job's status has been updated - expect(updatedFirstJob.status).toBe("running"); + expect(updatedFirstJob.status).toBe(JobStatus.Finished); // Check that the second job's status remains unchanged expect(unchangedSecondJob.status).toBe(secondJob.status); }, diff --git a/packages/common/test/testUtils/scorekeeper.ts b/packages/common/test/testUtils/scorekeeper.ts index 094d99966..e1f3843e9 100644 --- a/packages/common/test/testUtils/scorekeeper.ts +++ b/packages/common/test/testUtils/scorekeeper.ts @@ -3,8 +3,9 @@ import { getKusamaProdConfig } from "./config"; import { ScoreKeeper } from "../../src"; import ApiHandler from "../../src/ApiHandler/ApiHandler"; import { KusamaEndpoints } from "../../src/constants"; +import { ConfigSchema } from "../../src/config"; -export const getAndStartScorekeeper = async () => { +export const getAndStartScorekeeper = async (defaultConfig?: ConfigSchema) => { const apiHandler = new ApiHandler(KusamaEndpoints); await apiHandler.setAPI(); await apiHandler.getApi()?.isReady; @@ -16,7 +17,7 @@ export const getAndStartScorekeeper = async () => { await addProdKusamaCandidates(); - const config = getKusamaProdConfig(); + const config = defaultConfig ? defaultConfig : getKusamaProdConfig(); const scorekeeper = new ScoreKeeper(apiHandler, config); for (const nominatorGroup of config.scorekeeper.nominators) { diff --git a/packages/scorekeeper-status-ui/src/App.tsx b/packages/scorekeeper-status-ui/src/App.tsx index 51366e574..40f90db79 100644 --- a/packages/scorekeeper-status-ui/src/App.tsx +++ b/packages/scorekeeper-status-ui/src/App.tsx @@ -24,11 +24,19 @@ import { Identicon } from "@polkadot/react-identicon"; import EraStatsBar from "./EraStatsBar"; import { debounce } from "lodash"; +export enum JobStatus { + Initialized = "Initialized", + Started = "Started", + Running = "Running", + Finished = "Finished", + Failed = "Failed", +} + interface Job { name: string; runCount: number; updated: number; - status: "running" | "finished" | "errored" | "started" | "Not Running"; + status: JobStatus; progress?: number; error?: string; iteration?: string; @@ -162,19 +170,19 @@ const App = () => { let iconComponent; // Initialize icon component switch (status) { - case "running": + case JobStatus.Running: iconComponent = ; statusText = "Running"; break; - case "started": + case JobStatus.Started: iconComponent = ; statusText = "Started"; break; - case "finished": + case JobStatus.Finished: iconComponent = ; statusText = "Finished"; break; - case "errored": + case JobStatus.Failed: iconComponent = ; statusText = "Errored"; break; @@ -344,7 +352,7 @@ const App = () => { {jobs.map((job: Job) => { const jobAgeInSeconds = (Date.now() - job.updated) / 1000; // Convert milliseconds to seconds // const isOld = jobAgeInSeconds > OLD_JOB_THRESHOLD_SECONDS; - const isError = job.status === "errored"; + const isError = job.status === JobStatus.Failed; return ( { initial={{ opacity: 0, x: -100 }} animate={{ opacity: 1, x: 0 }} transition={{ duration: 0.5 }} - className={`jobItem ${job.status === "errored" ? "jobItemError" : job.status === "running" && job.isOld ? "jobItemOld" : ""}`} + className={`jobItem ${job.status === JobStatus.Failed ? "jobItemError" : job.status === JobStatus.Running && job.isOld ? "jobItemOld" : ""}`} >
- {job.status === "errored" && ( + {job.status === JobStatus.Failed && ( )} - {job.status === "running" && job.isOld && ( + {job.status === JobStatus.Running && job.isOld && ( )}
From 20c2c73d4fe99203ce1289e27dfcaae2dd2ca788 Mon Sep 17 00:00:00 2001 From: Volodymyr Brazhnyk Date: Mon, 20 May 2024 12:18:31 +0200 Subject: [PATCH 5/5] Add private modifier --- packages/common/src/scorekeeper/jobs/Job.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/common/src/scorekeeper/jobs/Job.ts b/packages/common/src/scorekeeper/jobs/Job.ts index bad8eae64..40525f972 100644 --- a/packages/common/src/scorekeeper/jobs/Job.ts +++ b/packages/common/src/scorekeeper/jobs/Job.ts @@ -9,7 +9,7 @@ export class Job { private status: JobStatus = JobStatus.Initialized; private name: string; private jobKey: string; - jobFunction: () => Promise; + private jobFunction: () => Promise; private frequency: string; private preventOverlap: boolean; private jobRunCount = 0;