Skip to content

Commit

Permalink
Fixed: Queued jobs are not considered in deferring logic (#7907)
Browse files Browse the repository at this point in the history
  • Loading branch information
bsekachev authored Jun 5, 2024
1 parent 0800862 commit 41476eb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 19 deletions.
4 changes: 4 additions & 0 deletions changelog.d/20240605_093600_boris_kill_worker.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Fixed

- Queued jobs are not considered in deferring logic
(<https://github.com/cvat-ai/cvat/pull/7907>)
31 changes: 12 additions & 19 deletions cvat/apps/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,25 +165,18 @@ def define_dependent_job(
if not should_be_dependent:
return None

started_user_jobs = [
job
for job in queue.job_class.fetch_many(
queue.started_job_registry.get_job_ids(), queue.connection
)
if job and job.meta.get("user", {}).get("id") == user_id
]
deferred_user_jobs = [
job
for job in queue.job_class.fetch_many(
queue.deferred_job_registry.get_job_ids(), queue.connection
)
# Since there is no cleanup implementation in DeferredJobRegistry,
# this registry can contain "outdated" jobs that weren't deleted from it
# but were added to another registry. Probably such situations can occur
# if there are active or deferred jobs when restarting the worker container.
if job and job.meta.get("user", {}).get("id") == user_id and job.is_deferred
]
all_user_jobs = started_user_jobs + deferred_user_jobs
queues = [queue.deferred_job_registry, queue, queue.started_job_registry]
# Since there is no cleanup implementation in DeferredJobRegistry,
# this registry can contain "outdated" jobs that weren't deleted from it
# but were added to another registry. Probably such situations can occur
# if there are active or deferred jobs when restarting the worker container.
filters = [lambda job: job.is_deferred, lambda _: True, lambda _: True]
all_user_jobs = []
for q, f in zip(queues, filters):
job_ids = q.get_job_ids()
jobs = q.job_class.fetch_many(job_ids, q.connection)
jobs = filter(lambda job: job and job.meta.get("user", {}).get("id") == user_id and f(job), jobs)
all_user_jobs.extend(jobs)

# prevent possible cyclic dependencies
if rq_id:
Expand Down

0 comments on commit 41476eb

Please sign in to comment.