Skip to content

Commit

Permalink
feat(queue): add queue version support (#2822)
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Oct 13, 2024
1 parent c8abdc7 commit 3a4781b
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 51 deletions.
14 changes: 14 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { QueueGetters } from './queue-getters';
import { Repeat } from './repeat';
import { RedisConnection } from './redis-connection';
import { JobScheduler } from './job-scheduler';
import { readPackageJson } from '../utils';

export interface ObliterateOpts {
/**
Expand Down Expand Up @@ -168,11 +169,24 @@ export class Queue<
}

get metaValues(): Record<string, string | number> {
const { name, version } = readPackageJson();

return {
'opts.maxLenEvents': this.opts?.streams?.events?.maxLen ?? 10000,
version: `${name}:${version}`,
};
}

/**
* Get library version.
*
* @returns the content of the meta.library field.
*/
async getVersion(): Promise<string> {
const client = await this.client;
return await client.hget(this.keys.meta, 'version');
}

get repeat(): Promise<Repeat> {
return new Promise<Repeat>(async resolve => {
if (!this._repeat) {
Expand Down
101 changes: 50 additions & 51 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,14 @@ export class Scripts {
return (<any>client).removeJobScheduler(keys.concat(args));
}

protected removeArgs(jobId: string, removeChildren: boolean): (string | number)[] {
protected removeArgs(
jobId: string,
removeChildren: boolean,
): (string | number)[] {
const keys: (string | number)[] = ['', 'meta'].map(name =>
this.queue.toKey(name),
);

const args = [jobId, removeChildren ? 1 : 0];

return keys.concat(args);
Expand All @@ -411,13 +414,9 @@ export class Scripts {
async remove(jobId: string, removeChildren: boolean): Promise<number> {
const client = await this.queue.client;

const args = this.removeArgs(
jobId, removeChildren
);
const args = this.removeArgs(jobId, removeChildren);

const result = await (<any>client).removeJob(
args,
);
const result = await (<any>client).removeJob(args);

if (result < 0) {
throw this.finishedErrors({
Expand Down Expand Up @@ -607,49 +606,6 @@ export class Scripts {
}
}

finishedErrors({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
case ErrorCode.JobLockNotExist:
return new Error(`Missing lock for job ${jobId}. ${command}`);
case ErrorCode.JobNotInState:
return new Error(
`Job ${jobId} is not in the ${state} state. ${command}`,
);
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
case ErrorCode.JobBelongsToJobScheduler:
return new Error(
`Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
}
}

private drainArgs(delayed: boolean): (string | number)[] {
const queueKeys = this.queue.keys;

Expand Down Expand Up @@ -1463,6 +1419,49 @@ export class Scripts {
};
}
}

finishedErrors({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
case ErrorCode.JobLockNotExist:
return new Error(`Missing lock for job ${jobId}. ${command}`);
case ErrorCode.JobNotInState:
return new Error(
`Job ${jobId} is not in the ${state} state. ${command}`,
);
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
case ErrorCode.JobBelongsToJobScheduler:
return new Error(
`Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
}
}
}

export function raw2NextJobData(raw: any[]) {
Expand Down
24 changes: 24 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import { ChildMessage, RedisClient } from './interfaces';
import { EventEmitter } from 'events';
import * as semver from 'semver';

import { join } from 'path';
import { readFileSync } from 'fs';

export const errorObject: { [index: string]: any } = { value: null };

export function tryCatch(
Expand Down Expand Up @@ -261,3 +264,24 @@ export const toString = (value: any): string => {
};

export const QUEUE_EVENT_SUFFIX = ':qe';

export const readPackageJson: () => { name: string; version: string } = () => {
const packageJsonPossiblePaths = [
join(__dirname, '../package.json'),
join(__dirname, '../../package.json'),
join(__dirname, '../../../package.json'),
];

for (const path of packageJsonPossiblePaths) {
try {
return JSON.parse(readFileSync(path, 'utf-8'));
} catch (err) {
if ((<any>err).code === 'ENOENT') {
continue;
}
console.log(err);
}
}

return { name: 'bullmq', version: '0.0.0' };
};
8 changes: 8 additions & 0 deletions tests/test_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ describe('queues', function () {
await connection.quit();
});

it('should return the queue version', async () => {
const queue = new Queue(queueName, { connection });
const version = await queue.getVersion();
const { version: pkgJsonVersion, name } = require('../package.json');
expect(version).to.be.equal(`${name}:${pkgJsonVersion}`);
return queue.close();
});

describe('.add', () => {
describe('when jobId is provided as integer', () => {
it('throws error', async function () {
Expand Down

0 comments on commit 3a4781b

Please sign in to comment.