Skip to content

Commit

Permalink
Minimize scope of expensive lock (#30679)
Browse files Browse the repository at this point in the history
* Minimize scope of expensive lock

* Build list in lock

* Update sdks/python/apache_beam/runners/worker/sdk_worker.py

Co-authored-by: tvalentyn <tvalentyn@users.noreply.github.com>

* Move comment

* Simplify

* fmt

---------

Co-authored-by: tvalentyn <tvalentyn@users.noreply.github.com>
  • Loading branch information
damccorm and tvalentyn authored Mar 20, 2024
1 parent a9ce770 commit f4d03d4
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions sdks/python/apache_beam/runners/worker/sdk_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,12 +607,18 @@ def _schedule_periodic_shutdown(self):
# type: () -> None
def shutdown_inactive_bundle_processors():
# type: () -> None
inactive_descriptor_ids = []
inactive_time = time.time(
) - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S
with self._lock:
for descriptor_id, last_access_time in self.last_access_times.items():
if (time.time() - last_access_time >
DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S):
BundleProcessorCache._shutdown_cached_bundle_processors(
self.cached_bundle_processors[descriptor_id])
if (inactive_time > last_access_time):
inactive_descriptor_ids.append(descriptor_id)

# Shutdown can be expensive, keep out of lock
for descriptor_id in inactive_descriptor_ids:
BundleProcessorCache._shutdown_cached_bundle_processors(
self.cached_bundle_processors[descriptor_id])

self.periodic_shutdown = PeriodicThread(
DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S,
Expand Down

0 comments on commit f4d03d4

Please sign in to comment.