Skip to content

Commit

Permalink
fix: worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Jamesb committed Feb 28, 2024
1 parent d603df4 commit 39be70c
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 34 deletions.
67 changes: 36 additions & 31 deletions packages/client/src/components/AdminPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,37 +42,42 @@ export const AdminPage = () => {
return <div>Loading...</div>;
}
return (
<TableContainer component={Paper}>
<Table aria-label="pipelines table">
<TableHead>
<TableRow>
<TableCell>
<span className="font-semibold">Username</span>
</TableCell>
<TableCell>
<span className="font-semibold">Created At</span>
</TableCell>
<TableCell>
<span className="font-semibold">Tasks</span>
</TableCell>
<TableCell>
<span className="font-semibold">Actions</span>
</TableCell>
</TableRow>
</TableHead>
<TableBody>
{sortBy(pipelines, (x) => dayjs(x.createdAt).valueOf()).map(
(pipeline) => (
<PipelineRow
key={pipeline.id}
pipeline={pipeline}
setForce={setForce}
/>
)
)}
</TableBody>
</Table>
</TableContainer>
<>
<Button onClick={() => trpcAdmin.unlockWorkers.mutate()}>
Force unlock workers
</Button>
<TableContainer component={Paper}>
<Table aria-label="pipelines table">
<TableHead>
<TableRow>
<TableCell>
<span className="font-semibold">Username</span>
</TableCell>
<TableCell>
<span className="font-semibold">Created At</span>
</TableCell>
<TableCell>
<span className="font-semibold">Tasks</span>
</TableCell>
<TableCell>
<span className="font-semibold">Actions</span>
</TableCell>
</TableRow>
</TableHead>
<TableBody>
{sortBy(pipelines, (x) => dayjs(x.createdAt).valueOf()).map(
(pipeline) => (
<PipelineRow
key={pipeline.id}
pipeline={pipeline}
setForce={setForce}
/>
)
)}
</TableBody>
</Table>
</TableContainer>
</>
);
};

Expand Down
7 changes: 6 additions & 1 deletion packages/server/src/lib/getNumRunningPipelines.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { UserModel } from "shared/src/schemas";
import { z } from "zod";
import { prisma } from "../db";
import { sortBy } from "remeda";

export async function getNumRunningPipelines(
authenticatedUser: z.infer<typeof UserModel>
Expand All @@ -20,7 +21,11 @@ export async function getNumRunningPipelines(
)
// TODO: check
// slice(1) because the first task is the pipeline itself, doesn't get updated properly
.filter((p) => p.tasks.slice(1).some((t) => t.status === "running"));
.filter((p) =>
sortBy(p.tasks, (x) => x.createdAt.valueOf())
.slice(1)
.some((t) => t.status === "running")
);

return pipelines.length;
}
8 changes: 8 additions & 0 deletions packages/server/src/lib/jobsPrisma.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@ interface Job {
created_at: Date;
updated_at: Date;
key: string | null;
locked_by: string | null;
}

export async function getGraphileJobs(): Promise<any[]> {
return await prisma.$queryRaw<any[]>(
Prisma.sql`SELECT * FROM graphile_worker.jobs`
);
}

export async function getRunningWorkerIds(): Promise<string[]> {
// unique list of locked_by ids
return await prisma.$queryRaw<string[]>(
Prisma.sql`SELECT DISTINCT locked_by FROM graphile_worker.jobs`
);
}
8 changes: 8 additions & 0 deletions packages/server/src/routers/adminRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
getGraphileJobs,
getPipelineJobByKey,
getPipelineTaskJobById,
getRunningWorkerIds,
} from "../lib/jobsPrisma";
import { indexBy, omit } from "remeda";

Expand Down Expand Up @@ -206,4 +207,11 @@ export const adminRouter = router({
},
});
}),

unlockWorkers: publicProcedure.mutation(async () => {
const utils = await workerUtils();
const workerIds = await getRunningWorkerIds();
await utils.forceUnlockWorkers(workerIds);
console.log("attempted to unlock workers");
}),
});
19 changes: 17 additions & 2 deletions packages/server/src/tasks/worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { Logger, TaskList, parseCronItem, run } from "graphile-worker";
import {
JobHelpers,
Logger,
TaskList,
parseCronItem,
run,
} from "graphile-worker";
import dotenv from "dotenv";
import { workerUtils } from "./workerUtils";
import { prisma } from "../db";
Expand All @@ -18,7 +24,7 @@ dotenv.config();
// if they have < max pipelines, add a pipeline to get recommendations
export const addPipelineCronJob = {
id: "recurring-add-pipeline-cron",
handler: async () => {
handler: async (_: any, helpers: JobHelpers) => {
const users = shuffle(await prisma.user.findMany());
for (const user of users) {
const numActivePipelines = await getNumRunningPipelines(user);
Expand All @@ -31,6 +37,14 @@ export const addPipelineCronJob = {
runId: uuidv4(),
emailResults: true,
});
} else {
helpers.logger.info(
"Skipping user " +
user.username +
" as they have " +
numActivePipelines +
" active pipelines"
);
}
}
},
Expand Down Expand Up @@ -100,6 +114,7 @@ export async function startWorker() {
taskList: taskList as TaskList,
connectionString: process.env.DATABASE_URL,
});

const getTaskOrPipeline = async (jobOrKeyId: string) => {
const pipeline = await prisma.pipelineRun.findFirst({
where: { jobKeyId: jobOrKeyId },
Expand Down

0 comments on commit 39be70c

Please sign in to comment.