Skip to content

Commit

Permalink
fix(scs-job): revert to lodash.Pick for _filterQueuedJob
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerald Baulig committed Oct 11, 2024
1 parent ac592d6 commit bde6d4a
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions packages/scs-jobs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const _filterJobData = (data: Data, encode: boolean, logger: Logger): Pic

if (encode) {
if (picked?.payload?.value && typeof picked.payload.value === 'string') {
(picked as any).payload = marshallProtobufAny(unmarshallProtobufAny(picked.payload, logger));
picked.payload = marshallProtobufAny(unmarshallProtobufAny(picked.payload, logger));
}
}

Expand All @@ -44,15 +44,17 @@ export const _filterJobData = (data: Data, encode: boolean, logger: Logger): Pic
picked.meta.modified = new Date(picked.meta.modified);
}

return picked as any;
return picked;
};


export const _filterQueuedJob = <T extends FilterOpts>(job: T, logger: Logger): T => {
const picked: T = {
...job,
type: job.name,
};
export const _filterQueuedJob = <T extends FilterOpts>(job: T, logger: Logger): Pick<T, 'id' | 'type' | 'data' | 'opts' | 'name'> => {
if (job && !job.type) {
(job as any).type = (job as any).name;
}
const picked: any = _.pick(job, [
'id', 'type', 'data', 'opts', 'name'
]);

if (picked?.data) {
picked.data = _filterJobData(picked.data, false, logger);
Expand All @@ -64,7 +66,6 @@ export const _filterQueuedJob = <T extends FilterOpts>(job: T, logger: Logger):
return picked;
};


export const runWorker = async (
queue: string,
concurrency: number,
Expand Down Expand Up @@ -93,13 +94,13 @@ export const runWorker = async (

logger.info(`Registering worker for queue ${queue}`);
const worker = new Worker(queue, async job => {
const filteredJob = _filterQueuedJob<JobType>(job as any, logger);
const filteredJob = _filterQueuedJob(job, logger);
// For recurring job add time so if service goes down we can fire jobs
// for the missed schedules comparing the last run time
let lastRunTime;
if (filteredJob?.opts?.repeat &&
((filteredJob.opts.repeat as any)?.every ||
(filteredJob.opts.repeat as any)?.pattern)) {
(filteredJob.opts.repeat?.every ||
filteredJob.opts.repeat?.pattern)) {
if (filteredJob?.data) {
// adding time to payload data for recurring jobs
const dateTime = new Date();
Expand Down

0 comments on commit bde6d4a

Please sign in to comment.