
Merge pull request #1214 from carver/service-is-operational
Add BaseService.is_operational
carver authored Aug 23, 2018
2 parents 14aec04 + 75068c0 commit fcf73d0
Showing 14 changed files with 36 additions and 25 deletions.
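Most of this diff is mechanical: `BaseService` gains two new properties, `is_cancelled` and `is_operational`, and call sites across `p2p` and `trinity` switch their loop and guard conditions from `not self.cancel_token.triggered` or `self.is_running` to `self.is_operational`, which additionally requires that the service's `started` event has been set. In the hunks below, removed lines are marked with `-` and added lines with `+`; unmarked lines are context.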
2 changes: 1 addition & 1 deletion p2p/DEVELOPMENT.md
@@ -18,7 +18,7 @@ library.
## BaseService

- If your service needs to run coroutines in the background, you should use the `BaseService.run_task()` method and
- ensure they exit when `is_running` is False or when the cancel token is triggered.
+ ensure they exit when `is_operational` is False or when the cancel token is triggered.
- If your service runs other services in the background, you should pass your CancelToken down to
those services and run those using `BaseService.run_child_service()`, or
`BaseService.run_daemon()` if you want the parent to be terminated when the child dies
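A minimal sketch of the pattern the first bullet above describes, written against the `BaseService` API visible in this diff (`run_task()`, `sleep()`, `is_operational`); the `HeartbeatService` class and its `_beat` coroutine are hypothetical:

```python
from p2p.service import BaseService


class HeartbeatService(BaseService):
    """Hypothetical service running a coroutine in the background."""

    async def _run(self) -> None:
        # Schedule the background coroutine; BaseService tracks the task
        # and logs a warning if it finishes unexpectedly.
        self.run_task(self._beat())
        # Keep the service alive until the cancel token fires.
        await self.cancel_token.wait()

    async def _cleanup(self) -> None:
        pass

    async def _beat(self) -> None:
        # Exit promptly once the service is no longer operational.
        while self.is_operational:
            self.logger.debug("heartbeat")
            # sleep() wakes early if the cancel token is triggered.
            await self.sleep(10)
```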
2 changes: 1 addition & 1 deletion p2p/discovery.py
@@ -337,7 +337,7 @@ async def _run(self) -> None:
await self._start_udp_listener()
connect_loop_sleep = 2
self.run_task(self.proto.bootstrap())
- while not self.cancel_token.triggered:
+ while self.is_operational:
await self.maybe_connect_to_more_peers()
await self.sleep(connect_loop_sleep)

2 changes: 1 addition & 1 deletion p2p/nat.py
@@ -72,7 +72,7 @@ async def _run(self) -> None:
On every iteration we configure the port mapping with a lifetime of 30 minutes and then
sleep for that long as well.
"""
- while not self.cancel_token.triggered:
+ while self.is_operational:
try:
# Wait for the port mapping lifetime, and then try registering it again
await self.wait(asyncio.sleep(self._nat_portmap_lifetime))
6 changes: 3 additions & 3 deletions p2p/peer.py
@@ -342,7 +342,7 @@ async def _cleanup(self) -> None:
self.close()

async def _run(self) -> None:
- while not self.cancel_token.triggered:
+ while self.is_operational:
try:
cmd, msg = await self.read_msg()
except (PeerConnectionLost, TimeoutError) as err:
@@ -530,7 +530,7 @@ async def disconnect(self, reason: DisconnectReason) -> None:
self.logger.debug("Disconnecting from remote peer; reason: %s", reason.name)
self.base_protocol.send_disconnect(reason.value)
self.close()
- if self.is_running:
+ if self.is_operational:
await self.cancel()

def select_sub_protocol(self, remote_capabilities: List[Tuple[bytes, int]]
@@ -912,7 +912,7 @@ def get_peers(self, min_td: int) -> List[BasePeer]:
return [peer for peer in peers if peer.head_td >= min_td]

async def _periodically_report_stats(self) -> None:
- while self.is_running:
+ while self.is_operational:
inbound_peers = len(
[peer for peer in self.connected_nodes.values() if peer.inbound])
self.logger.info("Connected peers: %d inbound, %d outbound",
15 changes: 12 additions & 3 deletions p2p/service.py
@@ -89,7 +89,7 @@ async def run(
"""
if self.is_running:
raise RuntimeError("Cannot start the service while it's already running")
- elif self.cancel_token.triggered:
+ elif self.is_cancelled:
raise RuntimeError("Cannot restart a service that has already been cancelled")

if finished_callback:
@@ -135,6 +135,7 @@ async def _run_task_wrapper() -> None:
pass
except Exception as e:
self.logger.warning("Task %s finished unexpectedly: %s", awaitable, e)
self.logger.warning("Task failure traceback", exc_info=True)
else:
self.logger.debug("Task %s finished with no errors", awaitable)
self._tasks.add(asyncio.ensure_future(_run_task_wrapper()))
@@ -158,7 +159,7 @@ async def _run_daemon_wrapper() -> None:
try:
await service.run()
finally:
- if not self.cancel_token.triggered:
+ if not self.is_cancelled:
self.logger.debug(
"%s finished while we're still running, terminating as well", service)
self.cancel_token.trigger()
@@ -188,7 +189,7 @@ async def cleanup(self) -> None:

async def cancel(self) -> None:
"""Trigger the CancelToken and wait for the cleaned_up event to be set."""
- if self.cancel_token.triggered:
+ if self.is_cancelled:
self.logger.warning("Tried to cancel %s, but it was already cancelled", self)
return
elif not self.is_running:
@@ -215,6 +216,14 @@ def _forcibly_cancel_all_tasks(self) -> None:
for task in self._tasks:
task.cancel()

+ @property
+ def is_cancelled(self) -> bool:
+     return self.cancel_token.triggered
+
+ @property
+ def is_operational(self) -> bool:
+     return self.events.started.is_set() and not self.cancel_token.triggered

@property
def is_running(self) -> bool:
return self._run_lock.locked()
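Together with the pre-existing `is_running`, the service now exposes three distinct lifecycle checks. A rough sketch of how they relate, assuming the definitions above (the `NoopService` class is hypothetical, and constructing it without an explicit `CancelToken` assumes `BaseService` supplies a default):

```python
import asyncio

from p2p.service import BaseService


class NoopService(BaseService):
    """Hypothetical service that idles until cancelled."""

    async def _run(self) -> None:
        await self.cancel_token.wait()

    async def _cleanup(self) -> None:
        pass


async def demo() -> None:
    service = NoopService()
    # Not yet started: neither running nor operational.
    assert not service.is_running and not service.is_operational

    asyncio.ensure_future(service.run())
    await asyncio.sleep(0.01)
    # Started and not cancelled: both running and operational.
    assert service.is_running and service.is_operational

    await service.cancel()
    # Cancelled: no longer operational, even though cleanup may
    # still be finishing.
    assert service.is_cancelled and not service.is_operational


asyncio.get_event_loop().run_until_complete(demo())
```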
2 changes: 2 additions & 0 deletions tests/p2p/test_service.py
@@ -27,8 +27,10 @@ async def test_daemon_exit_causes_parent_cancellation():
service = ParentService()
asyncio.ensure_future(service.run())
await asyncio.sleep(0.01)
+ assert service.daemon.is_operational
assert service.daemon.is_running
await service.daemon.cancel()
await asyncio.sleep(0.01)
+ assert not service.is_operational
assert not service.is_running
await service.events.cleaned_up.wait()
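The added assertions capture the daemon semantics from `_run_daemon_wrapper` above: the child daemon reports `is_operational` once started, and cancelling it trips the parent's cancel token, so the parent stops being operational (and is eventually cleaned up) without an explicit `cancel()` call on the parent.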
2 changes: 1 addition & 1 deletion tests/trinity/core/integration_test_helpers.py
@@ -16,7 +16,7 @@

async def connect_to_peers_loop(peer_pool, nodes):
"""Loop forever trying to connect to one of the given nodes if the pool is not yet full."""
- while not peer_pool.cancel_token.triggered:
+ while peer_pool.is_operational:
try:
if not peer_pool.is_full:
await peer_pool.connect_to_nodes(nodes)
2 changes: 1 addition & 1 deletion trinity/plugins/builtin/tx_pool/pool.py
@@ -73,7 +73,7 @@ async def _run(self) -> None:
self.logger.info("Running Tx Pool")

with self.subscribe(self._peer_pool):
- while not self.cancel_token.triggered:
+ while self.is_operational:
peer, cmd, msg = await self.wait(
self.msg_queue.get(), token=self.cancel_token)
peer = cast(ETHPeer, peer)
2 changes: 1 addition & 1 deletion trinity/protocol/common/exchanges.py
@@ -60,7 +60,7 @@ async def get_result(
- the manager service is running
- the payload validator is primed with the request payload
"""
- if not self._manager.is_running:
+ if not self._manager.is_operational:
await self._manager.launch_service()

# bind the outbound request payload to the payload validator
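Note that `is_operational` here delegates to the manager property renamed in `trinity/protocol/common/managers.py` below, so a response-stream service that has since been cancelled no longer counts as launched; retrying `launch_service()` on it would presumably surface the `RuntimeError` that `BaseService.run()` raises for restarted services (see the `p2p/service.py` hunk above).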
8 changes: 4 additions & 4 deletions trinity/protocol/common/managers.py
@@ -118,7 +118,7 @@ async def _run(self) -> None:
self.logger.debug("Launching %s for peer %s", self.__class__.__name__, self._peer)

with self.subscribe_peer(self._peer):
- while not self.cancel_token.triggered:
+ while self.is_operational:
peer, cmd, msg = await self.wait(self.msg_queue.get())
if peer != self._peer:
self.logger.error("Unexpected peer: %s expected: %s", peer, self._peer)
@@ -202,8 +202,8 @@ async def launch_service(self) -> None:
await self._response_stream.events.started.wait()

@property
- def is_running(self) -> bool:
-     return self.service is not None and self.service.is_running
+ def is_operational(self) -> bool:
+     return self.service is not None and self.service.is_operational

async def get_result(
self,
@@ -213,7 +213,7 @@ async def get_result(
payload_validator: Callable[[TResponsePayload], None],
timeout: int = None) -> TResult:

- if not self.is_running:
+ if not self.is_operational:
raise ValidationError("You must call `launch_service` before initiating a peer request")

stream = self._response_stream
8 changes: 4 additions & 4 deletions trinity/sync/common/chain.py
@@ -77,7 +77,7 @@ def register_peer(self, peer: BasePeer) -> None:
self._sync_requests.put_nowait(cast(HeaderRequestingPeer, self.peer_pool.highest_td_peer))

async def _handle_msg_loop(self) -> None:
- while self.is_running:
+ while self.is_operational:
peer, cmd, msg = await self.wait(self.msg_queue.get())
# Our handle_msg() method runs cpu-intensive tasks in sub-processes so that the main
# loop can keep processing msgs, and that's why we use self.run_task() instead of
@@ -98,7 +98,7 @@ async def handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
async def _run(self) -> None:
self.run_task(self._handle_msg_loop())
with self.subscribe(self.peer_pool):
- while not self.cancel_token.triggered:
+ while self.is_operational:
peer_or_finished: Any = await self.wait_first(
self._sync_requests.get(),
self._sync_complete.wait()
@@ -156,8 +156,8 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
# will be discarded by _fetch_missing_headers() so we don't unnecessarily process them
# again.
start_at = max(GENESIS_BLOCK_NUMBER + 1, head.block_number - MAX_REORG_DEPTH)
- while self.is_running:
-     if not peer.is_running:
+ while self.is_operational:
+     if not peer.is_operational:
self.logger.info("%s disconnected, aborting sync", peer)
break

6 changes: 3 additions & 3 deletions trinity/sync/full/state.py
@@ -147,7 +147,7 @@ async def get_peer_for_request(self, node_keys: Set[Hash32]) -> ETHPeer:
raise NoIdlePeers()

async def _handle_msg_loop(self) -> None:
- while self.is_running:
+ while self.is_operational:
peer, cmd, msg = await self.wait(self.msg_queue.get())
# Run self._handle_msg() with self.run_task() instead of awaiting for it so that we
# can keep consuming msgs while _handle_msg() performs cpu-intensive tasks in separate
@@ -281,7 +281,7 @@ async def _request_and_process_nodes(self, peer: ETHPeer, batch: Tuple[Hash32, ...
await self._process_nodes(node_data)

async def _periodically_retry_timedout_and_missing(self) -> None:
- while self.is_running:
+ while self.is_operational:
timed_out = self.request_tracker.get_timed_out()
if timed_out:
self.logger.debug("Re-requesting %d timed out trie nodes", len(timed_out))
@@ -330,7 +330,7 @@ async def _run(self) -> None:
self.logger.info("Finished state sync with root hash %s", encode_hex(self.root_hash))

async def _periodically_report_progress(self) -> None:
- while self.is_running:
+ while self.is_operational:
requested_nodes = sum(
len(node_keys) for _, node_keys in self.request_tracker.active_requests.values())
msg = "processed=%d " % self._total_processed_nodes
2 changes: 1 addition & 1 deletion trinity/sync/light/service.py
@@ -95,7 +95,7 @@ def __init__(

async def _run(self) -> None:
with self.subscribe(self.peer_pool):
- while not self.cancel_token.triggered:
+ while self.is_operational:
peer, cmd, msg = await self.wait(self.msg_queue.get())
if isinstance(msg, dict):
request_id = msg.get('request_id')
2 changes: 1 addition & 1 deletion trinity/sync/sharding/service.py
@@ -115,7 +115,7 @@ async def propose(self) -> Collation:
#
async def _run(self) -> None:
with self.subscribe(self.peer_pool):
- while not self.cancel_token.triggered:
+ while self.is_operational:
peer, cmd, msg = await self.cancel_token.cancellable_wait(
self.msg_queue.get())

