From 9ef295852bfc63081cf3bd647682aed1f9762f11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Rakic?= Date: Mon, 29 Aug 2022 16:24:17 +0200 Subject: [PATCH 1/2] sort the pending queue according to cost/priority If multiple pieces of work are waiting in the pending queue, we can sort it according to their priorities: higher priorities should be scheduled sooner. They are more often than not wider than pure chains, and this should create more parallelism opportunities earlier in the pipeline: a high priority piece of work represents more future pieces of work down the line. This is a scheduling tradeoff that behaves differently for each project, machine configuration, amount of available parallelism at a given point in time, etc, but seems to help more often than hinders, at low-core counts and with enough units of work to be done, so that there is jobserver token contention where choosing a "better" piece of work to work on next is possible. --- src/cargo/core/compiler/job_queue.rs | 10 +++++++++- src/cargo/util/dependency_queue.rs | 7 +++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 358b6258737..c3d6c13928d 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -583,11 +583,19 @@ impl<'cfg> DrainState<'cfg> { } } + // If multiple pieces of work are waiting in the pending queue, we can + // sort it according to their priorities: higher priorities should be + // scheduled sooner. + self.pending_queue + .sort_by_cached_key(|(unit, _)| self.queue.priority(unit)); + // Now that we've learned of all possible work that we can execute // try to spawn it so long as we've got a jobserver token which says // we're able to perform some parallel work. + // The `pending_queue` is sorted in ascending priority order, and we're + // removing the highest priority items from its end. while self.has_extra_tokens() && !self.pending_queue.is_empty() { - let (unit, job) = self.pending_queue.remove(0); + let (unit, job) = self.pending_queue.pop().unwrap(); *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1; if !cx.bcx.build_config.build_plan { // Print out some nice progress information. diff --git a/src/cargo/util/dependency_queue.rs b/src/cargo/util/dependency_queue.rs index 5a3289eb7ee..fae5c22f3b0 100644 --- a/src/cargo/util/dependency_queue.rs +++ b/src/cargo/util/dependency_queue.rs @@ -170,6 +170,13 @@ impl DependencyQueue { self.dep_map.len() } + /// Returns the relative priority of a node. Higher priorities should be scheduled sooner. + /// Currently computed as the transitive cost of the given node: its own, plus the cost of its + /// reverse dependencies. + pub(crate) fn priority(&self, node: &N) -> usize { + self.priority[node] + } + /// Indicate that something has finished. /// /// Calling this function indicates that the `node` has produced `edge`. All From 4ffe98ae3aa86783b909bc1697f1fbd9050c00ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Rakic?= Date: Fri, 2 Sep 2022 17:55:42 +0200 Subject: [PATCH 2/2] store jobs priorities in the pending queue This cleans up the priority-sorted scheduling by removing the need for a priority accessor that would hash the nodes, and allows inserting in the queue at the correctly sorted position to remove the insert + sort combination. --- src/cargo/core/compiler/job_queue.rs | 27 +++++++++++---------- src/cargo/util/dependency_queue.rs | 35 +++++++++++----------------- 2 files changed, 29 insertions(+), 33 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index c3d6c13928d..b568a5ee2fc 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -162,7 +162,7 @@ struct DrainState<'cfg> { /// The list of jobs that we have not yet started executing, but have /// retrieved from the `queue`. We eagerly pull jobs off the main queue to /// allow us to request jobserver tokens pretty early. - pending_queue: Vec<(Unit, Job)>, + pending_queue: Vec<(Unit, Job, usize)>, print: DiagnosticPrinter<'cfg>, /// How many jobs we've finished @@ -576,26 +576,29 @@ impl<'cfg> DrainState<'cfg> { // possible that can run. Note that this is also the point where we // start requesting job tokens. Each job after the first needs to // request a token. - while let Some((unit, job)) = self.queue.dequeue() { - self.pending_queue.push((unit, job)); + while let Some((unit, job, priority)) = self.queue.dequeue() { + // We want to keep the pieces of work in the `pending_queue` sorted + // by their priorities, and insert the current job at its correctly + // sorted position: following the lower priority jobs, and the ones + // with the same priority (since they were dequeued before the + // current one, we also keep that relation). + let idx = self + .pending_queue + .partition_point(|&(_, _, p)| p <= priority); + self.pending_queue.insert(idx, (unit, job, priority)); if self.active.len() + self.pending_queue.len() > 1 { jobserver_helper.request_token(); } } - // If multiple pieces of work are waiting in the pending queue, we can - // sort it according to their priorities: higher priorities should be - // scheduled sooner. - self.pending_queue - .sort_by_cached_key(|(unit, _)| self.queue.priority(unit)); - // Now that we've learned of all possible work that we can execute // try to spawn it so long as we've got a jobserver token which says // we're able to perform some parallel work. - // The `pending_queue` is sorted in ascending priority order, and we're - // removing the highest priority items from its end. + // The `pending_queue` is sorted in ascending priority order, and we + // remove items from its end to schedule the highest priority items + // sooner. while self.has_extra_tokens() && !self.pending_queue.is_empty() { - let (unit, job) = self.pending_queue.pop().unwrap(); + let (unit, job, _) = self.pending_queue.pop().unwrap(); *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1; if !cx.bcx.build_config.build_plan { // Print out some nice progress information. diff --git a/src/cargo/util/dependency_queue.rs b/src/cargo/util/dependency_queue.rs index fae5c22f3b0..33e8bf28e84 100644 --- a/src/cargo/util/dependency_queue.rs +++ b/src/cargo/util/dependency_queue.rs @@ -149,15 +149,15 @@ impl DependencyQueue { /// /// A package is ready to be built when it has 0 un-built dependencies. If /// `None` is returned then no packages are ready to be built. - pub fn dequeue(&mut self) -> Option<(N, V)> { - let key = self + pub fn dequeue(&mut self) -> Option<(N, V, usize)> { + let (key, priority) = self .dep_map .iter() .filter(|(_, (deps, _))| deps.is_empty()) - .map(|(key, _)| key.clone()) - .max_by_key(|k| self.priority[k])?; + .map(|(key, _)| (key.clone(), self.priority[key])) + .max_by_key(|(_, priority)| *priority)?; let (_, data) = self.dep_map.remove(&key).unwrap(); - Some((key, data)) + Some((key, data, priority)) } /// Returns `true` if there are remaining packages to be built. @@ -170,13 +170,6 @@ impl DependencyQueue { self.dep_map.len() } - /// Returns the relative priority of a node. Higher priorities should be scheduled sooner. - /// Currently computed as the transitive cost of the given node: its own, plus the cost of its - /// reverse dependencies. - pub(crate) fn priority(&self, node: &N) -> usize { - self.priority[node] - } - /// Indicate that something has finished. /// /// Calling this function indicates that the `node` has produced `edge`. All @@ -220,19 +213,19 @@ mod test { q.queue(5, (), vec![(4, ()), (3, ())], 1); q.queue_finished(); - assert_eq!(q.dequeue(), Some((1, ()))); - assert_eq!(q.dequeue(), Some((3, ()))); + assert_eq!(q.dequeue(), Some((1, (), 5))); + assert_eq!(q.dequeue(), Some((3, (), 4))); assert_eq!(q.dequeue(), None); q.finish(&3, &()); assert_eq!(q.dequeue(), None); q.finish(&1, &()); - assert_eq!(q.dequeue(), Some((2, ()))); + assert_eq!(q.dequeue(), Some((2, (), 4))); assert_eq!(q.dequeue(), None); q.finish(&2, &()); - assert_eq!(q.dequeue(), Some((4, ()))); + assert_eq!(q.dequeue(), Some((4, (), 3))); assert_eq!(q.dequeue(), None); q.finish(&4, &()); - assert_eq!(q.dequeue(), Some((5, ()))); + assert_eq!(q.dequeue(), Some((5, (), 2))); } #[test] @@ -245,16 +238,16 @@ mod test { q.queue(4, (), vec![(2, ()), (3, ())], 1); q.queue_finished(); - assert_eq!(q.dequeue(), Some((3, ()))); - assert_eq!(q.dequeue(), Some((1, ()))); + assert_eq!(q.dequeue(), Some((3, (), 9))); + assert_eq!(q.dequeue(), Some((1, (), 4))); assert_eq!(q.dequeue(), None); q.finish(&3, &()); assert_eq!(q.dequeue(), None); q.finish(&1, &()); - assert_eq!(q.dequeue(), Some((2, ()))); + assert_eq!(q.dequeue(), Some((2, (), 3))); assert_eq!(q.dequeue(), None); q.finish(&2, &()); - assert_eq!(q.dequeue(), Some((4, ()))); + assert_eq!(q.dequeue(), Some((4, (), 2))); assert_eq!(q.dequeue(), None); q.finish(&4, &()); assert_eq!(q.dequeue(), None);