Skip to content

Commit

Permalink
refactor WaitedObject 💰(#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
aniketmaurya authored Dec 20, 2023
1 parent 42d602e commit 2ee1deb
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 13 deletions.
18 changes: 6 additions & 12 deletions src/fastserve/batching.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,6 @@ def set_result(self, result) -> None:
def completed(self) -> bool:
return self._event.is_set()

@property
def result(self):
if isinstance(self._result, Exception):
raise self._result
return self._result

@property
def completion_time(self) -> str:
if self.completed_at:
Expand All @@ -84,9 +78,10 @@ def completion_time(self) -> str:

def get(self, timeout: float = None) -> Any:
if self.completed:
return self.result
if isinstance(self._result, Exception):
raise self._result
self._event.wait(timeout)
return self.result
return self._result

def __repr__(self) -> str:
d = dict(
Expand All @@ -107,8 +102,7 @@ def __init__(
):
self._batched_queue = BatchedQueue(timeout=timeout, bs=bs)
self.func = func
self._event = Event()
self._cancel_signal = Event()
self._cancel_processing = Event()
signal.signal(signal.SIGINT, self.signal_handler)

self._thread = Thread(target=self._process_queue, daemon=True)
Expand All @@ -117,7 +111,7 @@ def __init__(
def _process_queue(self):
logger.info("Started processing")
while True:
if self._cancel_signal.is_set():
if self._cancel_processing.is_set():
return
t0 = time.time()
batch: List[WaitedObject] = self._batched_queue.get()
Expand Down Expand Up @@ -148,7 +142,7 @@ def process(self, item: Any):

def cancel(self):
logger.info("Terminating Batch Processor...")
self._cancel_signal.set()
self._cancel_processing.set()
self._thread.join()
logger.info("Batch Processor terminated!")

Expand Down
2 changes: 1 addition & 1 deletion tests/test_batching.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ def test_batch_processor():
assert wait.completed
assert isinstance(result, (int, float))
p.cancel()
assert p._cancel_signal.is_set()
assert p._cancel_processing.is_set()

0 comments on commit 2ee1deb

Please sign in to comment.