From 3a4781bf7cadf04f6a324871654eed8f01cdadae Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sun, 13 Oct 2024 11:54:01 +0200 Subject: [PATCH] feat(queue): add queue version support (#2822) --- src/classes/queue.ts | 14 ++++++ src/classes/scripts.ts | 101 ++++++++++++++++++++--------------------- src/utils.ts | 24 ++++++++++ tests/test_queue.ts | 8 ++++ 4 files changed, 96 insertions(+), 51 deletions(-) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index e1dde0044f..76b68a1303 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -13,6 +13,7 @@ import { QueueGetters } from './queue-getters'; import { Repeat } from './repeat'; import { RedisConnection } from './redis-connection'; import { JobScheduler } from './job-scheduler'; +import { readPackageJson } from '../utils'; export interface ObliterateOpts { /** @@ -168,11 +169,24 @@ export class Queue< } get metaValues(): Record { + const { name, version } = readPackageJson(); + return { 'opts.maxLenEvents': this.opts?.streams?.events?.maxLen ?? 10000, + version: `${name}:${version}`, }; } + /** + * Get library version. + * + * @returns the content of the meta.library field. + */ + async getVersion(): Promise { + const client = await this.client; + return await client.hget(this.keys.meta, 'version'); + } + get repeat(): Promise { return new Promise(async resolve => { if (!this._repeat) { diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 285ec36efa..cd0dcdfdaa 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -398,11 +398,14 @@ export class Scripts { return (client).removeJobScheduler(keys.concat(args)); } - protected removeArgs(jobId: string, removeChildren: boolean): (string | number)[] { + protected removeArgs( + jobId: string, + removeChildren: boolean, + ): (string | number)[] { const keys: (string | number)[] = ['', 'meta'].map(name => this.queue.toKey(name), ); - + const args = [jobId, removeChildren ? 1 : 0]; return keys.concat(args); @@ -411,13 +414,9 @@ export class Scripts { async remove(jobId: string, removeChildren: boolean): Promise { const client = await this.queue.client; - const args = this.removeArgs( - jobId, removeChildren - ); + const args = this.removeArgs(jobId, removeChildren); - const result = await (client).removeJob( - args, - ); + const result = await (client).removeJob(args); if (result < 0) { throw this.finishedErrors({ @@ -607,49 +606,6 @@ export class Scripts { } } - finishedErrors({ - code, - jobId, - parentKey, - command, - state, - }: { - code: number; - jobId?: string; - parentKey?: string; - command: string; - state?: string; - }): Error { - switch (code) { - case ErrorCode.JobNotExist: - return new Error(`Missing key for job ${jobId}. ${command}`); - case ErrorCode.JobLockNotExist: - return new Error(`Missing lock for job ${jobId}. ${command}`); - case ErrorCode.JobNotInState: - return new Error( - `Job ${jobId} is not in the ${state} state. ${command}`, - ); - case ErrorCode.JobPendingDependencies: - return new Error(`Job ${jobId} has pending dependencies. ${command}`); - case ErrorCode.ParentJobNotExist: - return new Error(`Missing key for parent job ${parentKey}. ${command}`); - case ErrorCode.JobLockMismatch: - return new Error( - `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, - ); - case ErrorCode.ParentJobCannotBeReplaced: - return new Error( - `The parent job ${parentKey} cannot be replaced. ${command}`, - ); - case ErrorCode.JobBelongsToJobScheduler: - return new Error( - `Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`, - ); - default: - return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); - } - } - private drainArgs(delayed: boolean): (string | number)[] { const queueKeys = this.queue.keys; @@ -1463,6 +1419,49 @@ export class Scripts { }; } } + + finishedErrors({ + code, + jobId, + parentKey, + command, + state, + }: { + code: number; + jobId?: string; + parentKey?: string; + command: string; + state?: string; + }): Error { + switch (code) { + case ErrorCode.JobNotExist: + return new Error(`Missing key for job ${jobId}. ${command}`); + case ErrorCode.JobLockNotExist: + return new Error(`Missing lock for job ${jobId}. ${command}`); + case ErrorCode.JobNotInState: + return new Error( + `Job ${jobId} is not in the ${state} state. ${command}`, + ); + case ErrorCode.JobPendingDependencies: + return new Error(`Job ${jobId} has pending dependencies. ${command}`); + case ErrorCode.ParentJobNotExist: + return new Error(`Missing key for parent job ${parentKey}. ${command}`); + case ErrorCode.JobLockMismatch: + return new Error( + `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, + ); + case ErrorCode.ParentJobCannotBeReplaced: + return new Error( + `The parent job ${parentKey} cannot be replaced. ${command}`, + ); + case ErrorCode.JobBelongsToJobScheduler: + return new Error( + `Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`, + ); + default: + return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); + } + } } export function raw2NextJobData(raw: any[]) { diff --git a/src/utils.ts b/src/utils.ts index d6d5bb37d2..7870be2b77 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -10,6 +10,9 @@ import { ChildMessage, RedisClient } from './interfaces'; import { EventEmitter } from 'events'; import * as semver from 'semver'; +import { join } from 'path'; +import { readFileSync } from 'fs'; + export const errorObject: { [index: string]: any } = { value: null }; export function tryCatch( @@ -261,3 +264,24 @@ export const toString = (value: any): string => { }; export const QUEUE_EVENT_SUFFIX = ':qe'; + +export const readPackageJson: () => { name: string; version: string } = () => { + const packageJsonPossiblePaths = [ + join(__dirname, '../package.json'), + join(__dirname, '../../package.json'), + join(__dirname, '../../../package.json'), + ]; + + for (const path of packageJsonPossiblePaths) { + try { + return JSON.parse(readFileSync(path, 'utf-8')); + } catch (err) { + if ((err).code === 'ENOENT') { + continue; + } + console.log(err); + } + } + + return { name: 'bullmq', version: '0.0.0' }; +}; diff --git a/tests/test_queue.ts b/tests/test_queue.ts index 75cff55d5d..435ca89ced 100644 --- a/tests/test_queue.ts +++ b/tests/test_queue.ts @@ -36,6 +36,14 @@ describe('queues', function () { await connection.quit(); }); + it('should return the queue version', async () => { + const queue = new Queue(queueName, { connection }); + const version = await queue.getVersion(); + const { version: pkgJsonVersion, name } = require('../package.json'); + expect(version).to.be.equal(`${name}:${pkgJsonVersion}`); + return queue.close(); + }); + describe('.add', () => { describe('when jobId is provided as integer', () => { it('throws error', async function () {