From eda0b519d49c24cbb148eebbf34494dba1adaaa4 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 19 Mar 2024 14:25:14 -0400 Subject: [PATCH 1/6] Minimize scope of expensive lock --- .../apache_beam/runners/worker/sdk_worker.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 04b58a6f1987..8bb367d89e4d 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -607,12 +607,16 @@ def _schedule_periodic_shutdown(self): # type: () -> None def shutdown_inactive_bundle_processors(): # type: () -> None + access_times = [] with self._lock: - for descriptor_id, last_access_time in self.last_access_times.items(): - if (time.time() - last_access_time > - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S): - BundleProcessorCache._shutdown_cached_bundle_processors( - self.cached_bundle_processors[descriptor_id]) + # Shutdown can be expensive, keep out of lock + access_times = list(self.last_access_times.items()) + + for descriptor_id, last_access_time in access_times: + if (time.time() - last_access_time > + DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S): + BundleProcessorCache._shutdown_cached_bundle_processors( + self.cached_bundle_processors[descriptor_id]) self.periodic_shutdown = PeriodicThread( DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S, From 434a3334c3be2f0a04341f273d0ab918de395fd1 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 20 Mar 2024 09:52:00 -0400 Subject: [PATCH 2/6] Build list in lock --- .../apache_beam/runners/worker/sdk_worker.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 8bb367d89e4d..15b71b9e8d50 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -607,16 +607,18 @@ def _schedule_periodic_shutdown(self): # type: () -> None def shutdown_inactive_bundle_processors(): # type: () -> None - access_times = [] + inactive_descriptor_ids = [] + cur_time = time.time() with self._lock: # Shutdown can be expensive, keep out of lock - access_times = list(self.last_access_times.items()) - - for descriptor_id, last_access_time in access_times: - if (time.time() - last_access_time > - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S): - BundleProcessorCache._shutdown_cached_bundle_processors( - self.cached_bundle_processors[descriptor_id]) + for descriptor_id, last_access_time in access_times: + if (cur_time - last_access_time > + DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S): + inactive_descriptor_ids.append(descriptor_id) + + for descriptor_id in inactive_descriptor_ids: + BundleProcessorCache._shutdown_cached_bundle_processors( + self.cached_bundle_processors[descriptor_id]) self.periodic_shutdown = PeriodicThread( DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S, From cccaeee89195508d1d93adac02cd895141bdef9a Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 20 Mar 2024 11:54:10 -0400 Subject: [PATCH 3/6] Update sdks/python/apache_beam/runners/worker/sdk_worker.py Co-authored-by: tvalentyn --- sdks/python/apache_beam/runners/worker/sdk_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 15b71b9e8d50..ac46ec89d8c4 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -611,7 +611,7 @@ def shutdown_inactive_bundle_processors(): cur_time = time.time() with self._lock: # Shutdown can be expensive, keep out of lock - for descriptor_id, last_access_time in access_times: + for descriptor_id, last_access_time in self.last_access_times.items(): if (cur_time - last_access_time > DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S): inactive_descriptor_ids.append(descriptor_id) From b5a45c1c632f7d2f3d225927ec9cd07fc5efae8d Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 20 Mar 2024 11:54:33 -0400 Subject: [PATCH 4/6] Move comment --- sdks/python/apache_beam/runners/worker/sdk_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index ac46ec89d8c4..b3dc4bf4b773 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -610,12 +610,12 @@ def shutdown_inactive_bundle_processors(): inactive_descriptor_ids = [] cur_time = time.time() with self._lock: - # Shutdown can be expensive, keep out of lock for descriptor_id, last_access_time in self.last_access_times.items(): if (cur_time - last_access_time > DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S): inactive_descriptor_ids.append(descriptor_id) + # Shutdown can be expensive, keep out of lock for descriptor_id in inactive_descriptor_ids: BundleProcessorCache._shutdown_cached_bundle_processors( self.cached_bundle_processors[descriptor_id]) From d927d984843ca5474c68ecaad743c3f16ee066ae Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 20 Mar 2024 11:57:21 -0400 Subject: [PATCH 5/6] Simplify --- sdks/python/apache_beam/runners/worker/sdk_worker.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index b3dc4bf4b773..0236619b908a 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -608,11 +608,10 @@ def _schedule_periodic_shutdown(self): def shutdown_inactive_bundle_processors(): # type: () -> None inactive_descriptor_ids = [] - cur_time = time.time() + inactive_time = time.time() - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S with self._lock: for descriptor_id, last_access_time in self.last_access_times.items(): - if (cur_time - last_access_time > - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S): + if (inactive_time > last_access_time): inactive_descriptor_ids.append(descriptor_id) # Shutdown can be expensive, keep out of lock From afe5bc1ebd7611ff75f599672e054e1a1c689b79 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 20 Mar 2024 13:30:20 -0400 Subject: [PATCH 6/6] fmt --- sdks/python/apache_beam/runners/worker/sdk_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 0236619b908a..65059ab054f9 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -608,10 +608,11 @@ def _schedule_periodic_shutdown(self): def shutdown_inactive_bundle_processors(): # type: () -> None inactive_descriptor_ids = [] - inactive_time = time.time() - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S + inactive_time = time.time( + ) - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S with self._lock: for descriptor_id, last_access_time in self.last_access_times.items(): - if (inactive_time > last_access_time): + if (inactive_time > last_access_time): inactive_descriptor_ids.append(descriptor_id) # Shutdown can be expensive, keep out of lock