From f4d03d49713cf89260c141ee35b4dadb31ad4193 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 20 Mar 2024 16:42:38 -0400 Subject: [PATCH] Minimize scope of expensive lock (#30679) * Minimize scope of expensive lock * Build list in lock * Update sdks/python/apache_beam/runners/worker/sdk_worker.py Co-authored-by: tvalentyn * Move comment * Simplify * fmt --------- Co-authored-by: tvalentyn --- .../apache_beam/runners/worker/sdk_worker.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 04b58a6f1987..65059ab054f9 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -607,12 +607,18 @@ def _schedule_periodic_shutdown(self): # type: () -> None def shutdown_inactive_bundle_processors(): # type: () -> None + inactive_descriptor_ids = [] + inactive_time = time.time( + ) - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S with self._lock: for descriptor_id, last_access_time in self.last_access_times.items(): - if (time.time() - last_access_time > - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S): - BundleProcessorCache._shutdown_cached_bundle_processors( - self.cached_bundle_processors[descriptor_id]) + if (inactive_time > last_access_time): + inactive_descriptor_ids.append(descriptor_id) + + # Shutdown can be expensive, keep out of lock + for descriptor_id in inactive_descriptor_ids: + BundleProcessorCache._shutdown_cached_bundle_processors( + self.cached_bundle_processors[descriptor_id]) self.periodic_shutdown = PeriodicThread( DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S,