From b679a1013547237ab20408da2a5a1a312965aff9 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 4 Feb 2021 15:50:08 -0800 Subject: [PATCH 01/17] first-pass Signed-off-by: Richard Liaw --- python/ray/util/client/server/dataservicer.py | 12 +++++++-- python/ray/util/client/server/server.py | 26 +++++++++++-------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/python/ray/util/client/server/dataservicer.py b/python/ray/util/client/server/dataservicer.py index 82ddc85c6f5f8..7725ff0da95ce 100644 --- a/python/ray/util/client/server/dataservicer.py +++ b/python/ray/util/client/server/dataservicer.py @@ -3,7 +3,7 @@ import grpc import sys -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Callable from threading import Lock import ray.core.generated.ray_client_pb2 as ray_client_pb2 @@ -17,10 +17,12 @@ class DataServicer(ray_client_pb2_grpc.RayletDataStreamerServicer): - def __init__(self, basic_service: "RayletServicer"): + def __init__(self, basic_service: "RayletServicer", + ray_connect_handler: Callable): self.basic_service = basic_service self._clients_lock = Lock() self._num_clients = 0 # guarded by self._clients_lock + self.ray_connect_handler = ray_connect_handler def Datapath(self, request_iterator, context): metadata = {k: v for k, v in context.invocation_metadata()} @@ -31,6 +33,8 @@ def Datapath(self, request_iterator, context): logger.info(f"New data connection from client {client_id}") try: with self._clients_lock: + if self._num_clients == 0 and not ray.is_initialized(): + self.ray_connect_handler() self._num_clients += 1 for req in request_iterator: resp = None @@ -63,9 +67,13 @@ def Datapath(self, request_iterator, context): finally: logger.info(f"Lost data connection from client {client_id}") self.basic_service.release_all(client_id) + with self._clients_lock: self._num_clients -= 1 + if self._num_clients == 0: + ray.shutdown() + def _build_connection_response(self): with self._clients_lock: cur_num_clients = self._num_clients diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py index 6a7badaf703a3..24ed341ccffad 100644 --- a/python/ray/util/client/server/server.py +++ b/python/ray/util/client/server/server.py @@ -422,10 +422,11 @@ def __getattr__(self, attr): return getattr(self.grpc_server, attr) -def serve(connection_str): +def serve(connection_str, ray_connect_handler): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) task_servicer = RayletServicer() - data_servicer = DataServicer(task_servicer) + data_servicer = DataServicer( + task_servicer, ray_connect_handler=ray_connect_handler) logs_servicer = LogstreamServicer() ray_client_pb2_grpc.add_RayletDriverServicer_to_server( task_servicer, server) @@ -477,18 +478,21 @@ def main(): help="Password for connecting to Redis") args = parser.parse_args() logging.basicConfig(level="INFO") - if args.redis_address: - if args.redis_password: - ray.init( - address=args.redis_address, - _redis_password=args.redis_password) + + def ray_connect_handler(): + if args.redis_address: + if args.redis_password: + ray.init( + address=args.redis_address, + _redis_password=args.redis_password) + else: + ray.init(address=args.redis_address) else: - ray.init(address=args.redis_address) - else: - ray.init() + ray.init() + hostport = "%s:%d" % (args.host, args.port) logger.info(f"Starting Ray Client server on {hostport}") - server = serve(hostport) + server = serve(hostport, ray_connect_handler) try: while True: time.sleep(1000) From 1373f48f0561d617c5fd99588785795662b15317 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 4 Feb 2021 15:56:23 -0800 Subject: [PATCH 02/17] not-sure-about-this Signed-off-by: Richard Liaw --- python/ray/util/client/worker.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index 3f04c80a48caf..7b66700c92d9d 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -94,16 +94,16 @@ def __init__(self, # RayletDriverStub, allowing for unary requests. self.server = ray_client_pb2_grpc.RayletDriverStub( self.channel) - # Now the HTTP2 channel is ready, or proxied, but the - # servicer may not be ready. Call is_initialized() and if - # it throws, the servicer is not ready. On success, the - # `ray_ready` result is checked. - ray_ready = self.is_initialized() - if ray_ready: - # Ray is ready! Break out of the retry loop - break - # Ray is not ready yet, wait a timeout - time.sleep(timeout) + # # Now the HTTP2 channel is ready, or proxied, but the + # # servicer may not be ready. Call is_initialized() and if + # # it throws, the servicer is not ready. On success, the + # # `ray_ready` result is checked. + # ray_ready = self.is_initialized() + # if ray_ready: + # # Ray is ready! Break out of the retry loop + # break + # # Ray is not ready yet, wait a timeout + # time.sleep(timeout) except grpc.FutureTimeoutError: logger.info( f"Couldn't connect channel in {timeout} seconds, retrying") @@ -120,10 +120,10 @@ def __init__(self, f"retry in {timeout}s...") timeout = backoff(timeout) - # If we made it through the loop without ray_ready it means we've used - # up our retries and should error back to the user. - if not ray_ready: - raise ConnectionError("ray client connection timeout") + # # If we made it through the loop without ray_ready it means we've used + # # up our retries and should error back to the user. + # if not ray_ready: + # raise ConnectionError("ray client connection timeout") # Initialize the streams to finish protocol negotiation. self.data_client = DataClient(self.channel, self._client_id, From 0410c3726e396e12838d57e83bd0bb0738b8ec68 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 4 Feb 2021 16:32:58 -0800 Subject: [PATCH 03/17] ok Signed-off-by: Richard Liaw --- python/ray/util/client/server/server.py | 2 ++ python/ray/util/client/worker.py | 36 ++++++++++++++----------- src/ray/protobuf/ray_client.proto | 1 + 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py index 24ed341ccffad..22f76d19c3eea 100644 --- a/python/ray/util/client/server/server.py +++ b/python/ray/util/client/server/server.py @@ -104,6 +104,8 @@ def _return_debug_cluster_info(self, request, context=None) -> str: data = ray.nodes() elif request.type == ray_client_pb2.ClusterInfoType.IS_INITIALIZED: data = ray.is_initialized() + elif request.type == ray_client_pb2.ClusterInfoType.IS_ALIVE: + data = True else: raise TypeError("Unsupported cluster info type") return json.dumps(data) diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index 7b66700c92d9d..357745f9ce427 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -83,7 +83,7 @@ def __init__(self, # looking like a gRPC connection, though it may be a proxy. conn_attempts = 0 timeout = INITIAL_TIMEOUT_SEC - ray_ready = False + service_ready = False while conn_attempts < max(connection_retries, 1): conn_attempts += 1 try: @@ -94,16 +94,16 @@ def __init__(self, # RayletDriverStub, allowing for unary requests. self.server = ray_client_pb2_grpc.RayletDriverStub( self.channel) - # # Now the HTTP2 channel is ready, or proxied, but the - # # servicer may not be ready. Call is_initialized() and if - # # it throws, the servicer is not ready. On success, the - # # `ray_ready` result is checked. - # ray_ready = self.is_initialized() - # if ray_ready: - # # Ray is ready! Break out of the retry loop - # break - # # Ray is not ready yet, wait a timeout - # time.sleep(timeout) + # Now the HTTP2 channel is ready, or proxied, but the + # servicer may not be ready. Call is_initialized() and if + # it throws, the servicer is not ready. On success, the + # `service_ready` result is checked. + service_ready = self.is_service_alive() + if service_ready: + # Service is ready! Break out of the retry loop + break + # Service is not ready yet, wait a timeout + time.sleep(timeout) except grpc.FutureTimeoutError: logger.info( f"Couldn't connect channel in {timeout} seconds, retrying") @@ -120,10 +120,10 @@ def __init__(self, f"retry in {timeout}s...") timeout = backoff(timeout) - # # If we made it through the loop without ray_ready it means we've used - # # up our retries and should error back to the user. - # if not ray_ready: - # raise ConnectionError("ray client connection timeout") + # If we made it through the loop without service_ready, it means + # we've used up our retries and should error back to the user. + if not service_ready: + raise ConnectionError("ray client connection timeout") # Initialize the streams to finish protocol negotiation. self.data_client = DataClient(self.channel, self._client_id, @@ -377,6 +377,12 @@ def is_initialized(self) -> bool: ray_client_pb2.ClusterInfoType.IS_INITIALIZED) return False + def is_service_alive(self) -> bool: + if self.server is not None: + return self.get_cluster_info( + ray_client_pb2.ClusterInfoType.IS_ALIVE) + return False + def is_connected(self) -> bool: return self._conn_state == grpc.ChannelConnectivity.READY diff --git a/src/ray/protobuf/ray_client.proto b/src/ray/protobuf/ray_client.proto index 6781f1935246c..ac6d46e90fc19 100644 --- a/src/ray/protobuf/ray_client.proto +++ b/src/ray/protobuf/ray_client.proto @@ -139,6 +139,7 @@ message ClusterInfoType { CLUSTER_RESOURCES = 2; AVAILABLE_RESOURCES = 3; RUNTIME_CONTEXT = 4; + IS_ALIVE = 5; } } From f3ad8a82108d099a53d45c97c3d7a8eebaa05277 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 4 Feb 2021 16:34:51 -0800 Subject: [PATCH 04/17] ok Signed-off-by: Richard Liaw --- python/ray/util/client/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index 357745f9ce427..7b19708567a60 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -121,7 +121,7 @@ def __init__(self, timeout = backoff(timeout) # If we made it through the loop without service_ready, it means - # we've used up our retries and should error back to the user. + # we've used up our retries and should error back to the user. if not service_ready: raise ConnectionError("ray client connection timeout") From e517bb40cf6f53814e44d59b160741154794d33c Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 4 Feb 2021 16:56:19 -0800 Subject: [PATCH 05/17] Revert "ok" This reverts commit 0410c3726e396e12838d57e83bd0bb0738b8ec68. --- python/ray/util/client/server/server.py | 2 -- python/ray/util/client/worker.py | 36 +++++++++++-------------- src/ray/protobuf/ray_client.proto | 1 - 3 files changed, 15 insertions(+), 24 deletions(-) diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py index 22f76d19c3eea..24ed341ccffad 100644 --- a/python/ray/util/client/server/server.py +++ b/python/ray/util/client/server/server.py @@ -104,8 +104,6 @@ def _return_debug_cluster_info(self, request, context=None) -> str: data = ray.nodes() elif request.type == ray_client_pb2.ClusterInfoType.IS_INITIALIZED: data = ray.is_initialized() - elif request.type == ray_client_pb2.ClusterInfoType.IS_ALIVE: - data = True else: raise TypeError("Unsupported cluster info type") return json.dumps(data) diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index 7b19708567a60..7b66700c92d9d 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -83,7 +83,7 @@ def __init__(self, # looking like a gRPC connection, though it may be a proxy. conn_attempts = 0 timeout = INITIAL_TIMEOUT_SEC - service_ready = False + ray_ready = False while conn_attempts < max(connection_retries, 1): conn_attempts += 1 try: @@ -94,16 +94,16 @@ def __init__(self, # RayletDriverStub, allowing for unary requests. self.server = ray_client_pb2_grpc.RayletDriverStub( self.channel) - # Now the HTTP2 channel is ready, or proxied, but the - # servicer may not be ready. Call is_initialized() and if - # it throws, the servicer is not ready. On success, the - # `service_ready` result is checked. - service_ready = self.is_service_alive() - if service_ready: - # Service is ready! Break out of the retry loop - break - # Service is not ready yet, wait a timeout - time.sleep(timeout) + # # Now the HTTP2 channel is ready, or proxied, but the + # # servicer may not be ready. Call is_initialized() and if + # # it throws, the servicer is not ready. On success, the + # # `ray_ready` result is checked. + # ray_ready = self.is_initialized() + # if ray_ready: + # # Ray is ready! Break out of the retry loop + # break + # # Ray is not ready yet, wait a timeout + # time.sleep(timeout) except grpc.FutureTimeoutError: logger.info( f"Couldn't connect channel in {timeout} seconds, retrying") @@ -120,10 +120,10 @@ def __init__(self, f"retry in {timeout}s...") timeout = backoff(timeout) - # If we made it through the loop without service_ready, it means - # we've used up our retries and should error back to the user. - if not service_ready: - raise ConnectionError("ray client connection timeout") + # # If we made it through the loop without ray_ready it means we've used + # # up our retries and should error back to the user. + # if not ray_ready: + # raise ConnectionError("ray client connection timeout") # Initialize the streams to finish protocol negotiation. self.data_client = DataClient(self.channel, self._client_id, @@ -377,12 +377,6 @@ def is_initialized(self) -> bool: ray_client_pb2.ClusterInfoType.IS_INITIALIZED) return False - def is_service_alive(self) -> bool: - if self.server is not None: - return self.get_cluster_info( - ray_client_pb2.ClusterInfoType.IS_ALIVE) - return False - def is_connected(self) -> bool: return self._conn_state == grpc.ChannelConnectivity.READY diff --git a/src/ray/protobuf/ray_client.proto b/src/ray/protobuf/ray_client.proto index ac6d46e90fc19..6781f1935246c 100644 --- a/src/ray/protobuf/ray_client.proto +++ b/src/ray/protobuf/ray_client.proto @@ -139,7 +139,6 @@ message ClusterInfoType { CLUSTER_RESOURCES = 2; AVAILABLE_RESOURCES = 3; RUNTIME_CONTEXT = 4; - IS_ALIVE = 5; } } From 682ed3018787127c1212ca0708f649a249993cd3 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 4 Feb 2021 17:17:55 -0800 Subject: [PATCH 06/17] try-this Signed-off-by: Richard Liaw --- python/ray/util/client/worker.py | 29 ++++++++++------------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index 7b66700c92d9d..714d7472ee18b 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -83,7 +83,7 @@ def __init__(self, # looking like a gRPC connection, though it may be a proxy. conn_attempts = 0 timeout = INITIAL_TIMEOUT_SEC - ray_ready = False + service_ready = False while conn_attempts < max(connection_retries, 1): conn_attempts += 1 try: @@ -94,16 +94,11 @@ def __init__(self, # RayletDriverStub, allowing for unary requests. self.server = ray_client_pb2_grpc.RayletDriverStub( self.channel) - # # Now the HTTP2 channel is ready, or proxied, but the - # # servicer may not be ready. Call is_initialized() and if - # # it throws, the servicer is not ready. On success, the - # # `ray_ready` result is checked. - # ray_ready = self.is_initialized() - # if ray_ready: - # # Ray is ready! Break out of the retry loop - # break - # # Ray is not ready yet, wait a timeout - # time.sleep(timeout) + # Initialize the streams to finish protocol negotiation. + self.data_client = DataClient(self.channel, self._client_id, + self.metadata) + service_ready = True + break except grpc.FutureTimeoutError: logger.info( f"Couldn't connect channel in {timeout} seconds, retrying") @@ -120,14 +115,10 @@ def __init__(self, f"retry in {timeout}s...") timeout = backoff(timeout) - # # If we made it through the loop without ray_ready it means we've used - # # up our retries and should error back to the user. - # if not ray_ready: - # raise ConnectionError("ray client connection timeout") - - # Initialize the streams to finish protocol negotiation. - self.data_client = DataClient(self.channel, self._client_id, - self.metadata) + # If we made it through the loop without service_ready it means we've used + # up our retries and should error back to the user. + if not service_ready: + raise ConnectionError("ray client connection timeout") self.reference_count: Dict[bytes, int] = defaultdict(int) self.log_client = LogstreamClient(self.channel, self.metadata) From 3e6ce826c70e54cf96181bce6124e49f0375833f Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 4 Feb 2021 17:28:17 -0800 Subject: [PATCH 07/17] ok Signed-off-by: Richard Liaw --- python/ray/util/client/server/server.py | 25 +++++++++++++++---------- python/ray/util/client/worker.py | 5 +++-- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py index 24ed341ccffad..cbc41731d27e7 100644 --- a/python/ray/util/client/server/server.py +++ b/python/ray/util/client/server/server.py @@ -459,6 +459,19 @@ def shutdown_with_server(server, _exiting_interpreter=False): ray.shutdown(_exiting_interpreter) +def create_ray_handler(redis_address, redis_password): + def ray_connect_handler(): + if redis_address: + if redis_password: + ray.init(address=redis_address, _redis_password=redis_password) + else: + ray.init(address=redis_address) + else: + ray.init() + + return ray_connect_handler + + def main(): import argparse parser = argparse.ArgumentParser() @@ -479,16 +492,8 @@ def main(): args = parser.parse_args() logging.basicConfig(level="INFO") - def ray_connect_handler(): - if args.redis_address: - if args.redis_password: - ray.init( - address=args.redis_address, - _redis_password=args.redis_password) - else: - ray.init(address=args.redis_address) - else: - ray.init() + ray_connect_handler = create_ray_handler(args.redis_address, + args.redis_password) hostport = "%s:%d" % (args.host, args.port) logger.info(f"Starting Ray Client server on {hostport}") diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index 714d7472ee18b..fcd69ebd32083 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -115,8 +115,9 @@ def __init__(self, f"retry in {timeout}s...") timeout = backoff(timeout) - # If we made it through the loop without service_ready it means we've used - # up our retries and should error back to the user. + # If we made it through the loop without service_ready + # it means we've used up our retries and + # should error back to the user. if not service_ready: raise ConnectionError("ray client connection timeout") self.reference_count: Dict[bytes, int] = defaultdict(int) From 321751d655c785a1f44be795ed4ea651b7f4cbfa Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 4 Feb 2021 20:08:56 -0800 Subject: [PATCH 08/17] clientdisable Signed-off-by: Richard Liaw --- python/ray/util/client/server/dataservicer.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/ray/util/client/server/dataservicer.py b/python/ray/util/client/server/dataservicer.py index 7725ff0da95ce..3e48eebbace79 100644 --- a/python/ray/util/client/server/dataservicer.py +++ b/python/ray/util/client/server/dataservicer.py @@ -9,6 +9,7 @@ import ray.core.generated.ray_client_pb2 as ray_client_pb2 import ray.core.generated.ray_client_pb2_grpc as ray_client_pb2_grpc from ray.util.client import CURRENT_PROTOCOL_VERSION +from ray._private.client_mode_hook import disable_client_hook if TYPE_CHECKING: from ray.util.client.server.server import RayletServicer @@ -33,8 +34,9 @@ def Datapath(self, request_iterator, context): logger.info(f"New data connection from client {client_id}") try: with self._clients_lock: - if self._num_clients == 0 and not ray.is_initialized(): - self.ray_connect_handler() + with disable_client_hook(): + if self._num_clients == 0 and not ray.is_initialized(): + self.ray_connect_handler() self._num_clients += 1 for req in request_iterator: resp = None @@ -72,7 +74,8 @@ def Datapath(self, request_iterator, context): self._num_clients -= 1 if self._num_clients == 0: - ray.shutdown() + with disable_client_hook(): + ray.shutdown() def _build_connection_response(self): with self._clients_lock: From dff6906b30c13d6ea5d8d7793197c32af60d81eb Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 4 Feb 2021 21:00:23 -0800 Subject: [PATCH 09/17] update Signed-off-by: Richard Liaw --- python/ray/tests/test_client_init.py | 263 ++++++++++-------- python/ray/util/client/server/dataservicer.py | 4 +- python/ray/util/client/server/server.py | 12 +- 3 files changed, 155 insertions(+), 124 deletions(-) diff --git a/python/ray/tests/test_client_init.py b/python/ray/tests/test_client_init.py index 6b6ce8a425980..561f4bfe16234 100644 --- a/python/ray/tests/test_client_init.py +++ b/python/ray/tests/test_client_init.py @@ -1,3 +1,4 @@ +# flake8: noqa """Client tests that run their own init (as with init_and_serve) live here""" import pytest @@ -38,130 +39,150 @@ def get(self): return self.val -def test_basic_preregister(): +@pytest.fixture +def init_and_serve(): + server_handle, _ = ray_client_server.init_and_serve("localhost:50051") + yield server_handle + ray_client_server.shutdown_with_server(server_handle.grpc_server) + time.sleep(2) + + +@pytest.fixture +def init_and_serve_lazy(): + cluster = ray.cluster_utils.Cluster() + cluster.add_node(num_cpus=1, num_gpus=0) + address = cluster.address + + def connect(): + ray.init(address=address) + + server_handle = ray_client_server.serve("localhost:50051", connect) + yield server_handle + ray_client_server.shutdown_with_server(server_handle.grpc_server) + time.sleep(2) + + +def test_basic_preregister(init_and_serve): from ray.util.client import ray - server, _ = ray_client_server.init_and_serve("localhost:50051") - try: - ray.connect("localhost:50051") - val = ray.get(hello_world.remote()) - print(val) - assert val >= 20 - assert val <= 200 - c = C.remote(3) - x = c.double.remote() - y = c.double.remote() - ray.wait([x, y]) - val = ray.get(c.get.remote()) - assert val == 12 - finally: - ray.disconnect() - ray_client_server.shutdown_with_server(server) - time.sleep(2) - - -def test_num_clients(): + ray.connect("localhost:50051") + val = ray.get(hello_world.remote()) + print(val) + assert val >= 20 + assert val <= 200 + c = C.remote(3) + x = c.double.remote() + y = c.double.remote() + ray.wait([x, y]) + val = ray.get(c.get.remote()) + assert val == 12 + ray.disconnect() + + +def test_num_clients(init_and_serve_lazy): # Tests num clients reporting; useful if you want to build an app that # load balances clients between Ray client servers. - server_handle, _ = ray_client_server.init_and_serve("localhost:50051") + + def get_job_id(api): + return api.get_runtime_context().worker.current_job_id + + server_handle = init_and_serve_lazy server = server_handle.grpc_server - try: - api1 = RayAPIStub() - info1 = api1.connect("localhost:50051") - assert info1["num_clients"] == 1, info1 - api2 = RayAPIStub() - info2 = api2.connect("localhost:50051") - assert info2["num_clients"] == 2, info2 - - # Disconnect the first two clients. - api1.disconnect() - api2.disconnect() - time.sleep(1) - - api3 = RayAPIStub() - info3 = api3.connect("localhost:50051") - assert info3["num_clients"] == 1, info3 - - # Check info contains ray and python version. - assert isinstance(info3["ray_version"], str), info3 - assert isinstance(info3["ray_commit"], str), info3 - assert isinstance(info3["python_version"], str), info3 - assert isinstance(info3["protocol_version"], str), info3 - api3.disconnect() - finally: - ray_client_server.shutdown_with_server(server) - time.sleep(2) - - -def test_python_version(): + api1 = RayAPIStub() + info1 = api1.connect("localhost:50051") + job_id_1 = get_job_id(api1) + assert info1["num_clients"] == 1, info1 + api2 = RayAPIStub() + info2 = api2.connect("localhost:50051") + job_id_2 = get_job_id(api2) + assert info2["num_clients"] == 2, info2 + + assert job_id_1 == job_id_2 + + # Disconnect the first two clients. + api1.disconnect() + api2.disconnect() + time.sleep(1) - server_handle, _ = ray_client_server.init_and_serve("localhost:50051") - try: - ray = RayAPIStub() - info1 = ray.connect("localhost:50051") - assert info1["python_version"] == ".".join( - [str(x) for x in list(sys.version_info)[:3]]) - ray.disconnect() - time.sleep(1) - - def mock_connection_response(): - return ray_client_pb2.ConnectionInfoResponse( - num_clients=1, - python_version="2.7.12", - ray_version="", - ray_commit="", - protocol_version=CURRENT_PROTOCOL_VERSION, - ) - - # inject mock connection function - server_handle.data_servicer._build_connection_response = \ - mock_connection_response - - ray = RayAPIStub() - with pytest.raises(RuntimeError): - _ = ray.connect("localhost:50051") - - ray = RayAPIStub() - info3 = ray.connect("localhost:50051", ignore_version=True) - assert info3["num_clients"] == 1, info3 - ray.disconnect() - finally: - ray_client_server.shutdown_with_server(server_handle.grpc_server) - time.sleep(2) - - -def test_protocol_version(): + api3 = RayAPIStub() + info3 = api3.connect("localhost:50051") + job_id_3 = get_job_id(api3) + assert info3["num_clients"] == 1, info3 + assert job_id_1 != job_id_3 - server_handle, _ = ray_client_server.init_and_serve("localhost:50051") - try: - ray = RayAPIStub() - info1 = ray.connect("localhost:50051") - local_py_version = ".".join( - [str(x) for x in list(sys.version_info)[:3]]) - assert info1["protocol_version"] == CURRENT_PROTOCOL_VERSION, info1 - ray.disconnect() - time.sleep(1) - - def mock_connection_response(): - return ray_client_pb2.ConnectionInfoResponse( - num_clients=1, - python_version=local_py_version, - ray_version="", - ray_commit="", - protocol_version="2050-01-01", # from the future - ) - - # inject mock connection function - server_handle.data_servicer._build_connection_response = \ - mock_connection_response - - ray = RayAPIStub() - with pytest.raises(RuntimeError): - _ = ray.connect("localhost:50051") - - ray = RayAPIStub() - info3 = ray.connect("localhost:50051", ignore_version=True) - assert info3["num_clients"] == 1, info3 - ray.disconnect() - finally: - ray_client_server.shutdown_with_server(server_handle.grpc_server) - time.sleep(2) + # Check info contains ray and python version. + assert isinstance(info3["ray_version"], str), info3 + assert isinstance(info3["ray_commit"], str), info3 + assert isinstance(info3["python_version"], str), info3 + assert isinstance(info3["protocol_version"], str), info3 + api3.disconnect() + + +def test_python_version(init_and_serve): + from ray.util.client import ray + server_handle = init_and_serve + ray = RayAPIStub() + info1 = ray.connect("localhost:50051") + assert info1["python_version"] == ".".join( + [str(x) for x in list(sys.version_info)[:3]]) + ray.disconnect() + time.sleep(1) + + def mock_connection_response(): + return ray_client_pb2.ConnectionInfoResponse( + num_clients=1, + python_version="2.7.12", + ray_version="", + ray_commit="", + protocol_version=CURRENT_PROTOCOL_VERSION, + ) + + # inject mock connection function + server_handle.data_servicer._build_connection_response = \ + mock_connection_response + + ray = RayAPIStub() + with pytest.raises(RuntimeError): + _ = ray.connect("localhost:50051") + + ray = RayAPIStub() + info3 = ray.connect("localhost:50051", ignore_version=True) + assert info3["num_clients"] == 1, info3 + ray.disconnect() + + +def test_protocol_version(init_and_serve): + from ray.util.client import ray + server_handle = init_and_serve + ray = RayAPIStub() + info1 = ray.connect("localhost:50051") + local_py_version = ".".join([str(x) for x in list(sys.version_info)[:3]]) + assert info1["protocol_version"] == CURRENT_PROTOCOL_VERSION, info1 + ray.disconnect() + time.sleep(1) + + def mock_connection_response(): + return ray_client_pb2.ConnectionInfoResponse( + num_clients=1, + python_version=local_py_version, + ray_version="", + ray_commit="", + protocol_version="2050-01-01", # from the future + ) + + # inject mock connection function + server_handle.data_servicer._build_connection_response = \ + mock_connection_response + + ray = RayAPIStub() + with pytest.raises(RuntimeError): + _ = ray.connect("localhost:50051") + + ray = RayAPIStub() + info3 = ray.connect("localhost:50051", ignore_version=True) + assert info3["num_clients"] == 1, info3 + ray.disconnect() + + +if __name__ == "__main__": + import pytest + sys.exit(pytest.main(["-v", __file__] + sys.argv[1:])) diff --git a/python/ray/util/client/server/dataservicer.py b/python/ray/util/client/server/dataservicer.py index 3e48eebbace79..c9e345219a9bd 100644 --- a/python/ray/util/client/server/dataservicer.py +++ b/python/ray/util/client/server/dataservicer.py @@ -73,8 +73,8 @@ def Datapath(self, request_iterator, context): with self._clients_lock: self._num_clients -= 1 - if self._num_clients == 0: - with disable_client_hook(): + with disable_client_hook(): + if self._num_clients == 0: ray.shutdown() def _build_connection_response(self): diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py index cbc41731d27e7..3b72e51ec9be0 100644 --- a/python/ray/util/client/server/server.py +++ b/python/ray/util/client/server/server.py @@ -449,7 +449,17 @@ def init_and_serve(connection_str, *args, **kwargs): with disable_client_hook(): # Disable client mode inside the worker's environment info = ray.init(*args, **kwargs) - server_handle = serve(connection_str) + + def ray_connect_handler(): + # Ray client will disconnect from ray when + # num_clients == 0. + if ray.is_initialized(): + return info + else: + return ray.init(*args, **kwargs) + + server_handle = serve( + connection_str, ray_connect_handler=ray_connect_handler) return (server_handle, info) From 3120440f84e73c42ef355398d1a3506524588cc8 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 5 Feb 2021 01:03:14 -0800 Subject: [PATCH 10/17] ping Signed-off-by: Richard Liaw --- python/ray/util/client/worker.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index fcd69ebd32083..6748d9bb635ce 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -68,6 +68,7 @@ def __init__(self, """ self.metadata = metadata if metadata else [] self.channel = None + self.server = None self._conn_state = grpc.ChannelConnectivity.IDLE self._client_id = make_client_id() self._converted: Dict[str, ClientStub] = {} @@ -94,11 +95,9 @@ def __init__(self, # RayletDriverStub, allowing for unary requests. self.server = ray_client_pb2_grpc.RayletDriverStub( self.channel) - # Initialize the streams to finish protocol negotiation. - self.data_client = DataClient(self.channel, self._client_id, - self.metadata) - service_ready = True - break + service_ready = bool(self.ping_server()) + if service_ready: + break except grpc.FutureTimeoutError: logger.info( f"Couldn't connect channel in {timeout} seconds, retrying") @@ -120,6 +119,10 @@ def __init__(self, # should error back to the user. if not service_ready: raise ConnectionError("ray client connection timeout") + + # Initialize the streams to finish protocol negotiation. + self.data_client = DataClient(self.channel, self._client_id, + self.metadata) self.reference_count: Dict[bytes, int] = defaultdict(int) self.log_client = LogstreamClient(self.channel, self.metadata) @@ -369,6 +372,13 @@ def is_initialized(self) -> bool: ray_client_pb2.ClusterInfoType.IS_INITIALIZED) return False + def ping_server(self) -> bool: + if self.server is not None: + result = self.get_cluster_info( + ray_client_pb2.ClusterInfoType.IS_INITIALIZED) + return result is not None + return False + def is_connected(self) -> bool: return self._conn_state == grpc.ChannelConnectivity.READY From ef24c44a5a6536d26c3655bd835b2b8953b0979b Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 5 Feb 2021 01:05:30 -0800 Subject: [PATCH 11/17] ok Signed-off-by: Richard Liaw --- python/ray/util/client/worker.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index 6748d9bb635ce..200ec7518e6a8 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -373,6 +373,11 @@ def is_initialized(self) -> bool: return False def ping_server(self) -> bool: + """Simple health check. + + Piggybacks the IS_INITIALIZED call to check if the server provides + an actual response. + """ if self.server is not None: result = self.get_cluster_info( ray_client_pb2.ClusterInfoType.IS_INITIALIZED) From 11621602685f66151340a26cf4b6214d78a4a618 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 5 Feb 2021 13:23:05 -0800 Subject: [PATCH 12/17] Update python/ray/util/client/worker.py --- python/ray/util/client/worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index 200ec7518e6a8..db9a1cc630522 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -98,6 +98,8 @@ def __init__(self, service_ready = bool(self.ping_server()) if service_ready: break + # Ray is not ready yet, wait a timeout + time.sleep(timeout) except grpc.FutureTimeoutError: logger.info( f"Couldn't connect channel in {timeout} seconds, retrying") From f4b5e8e8d934128f3f8affb6dfc046cd737d4bff Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 5 Feb 2021 13:25:34 -0800 Subject: [PATCH 13/17] ok Signed-off-by: Richard Liaw --- python/ray/tests/test_client_init.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/python/ray/tests/test_client_init.py b/python/ray/tests/test_client_init.py index 561f4bfe16234..8053ab5774e60 100644 --- a/python/ray/tests/test_client_init.py +++ b/python/ray/tests/test_client_init.py @@ -1,4 +1,3 @@ -# flake8: noqa """Client tests that run their own init (as with init_and_serve) live here""" import pytest @@ -85,8 +84,6 @@ def test_num_clients(init_and_serve_lazy): def get_job_id(api): return api.get_runtime_context().worker.current_job_id - server_handle = init_and_serve_lazy - server = server_handle.grpc_server api1 = RayAPIStub() info1 = api1.connect("localhost:50051") job_id_1 = get_job_id(api1) @@ -118,7 +115,6 @@ def get_job_id(api): def test_python_version(init_and_serve): - from ray.util.client import ray server_handle = init_and_serve ray = RayAPIStub() info1 = ray.connect("localhost:50051") @@ -151,7 +147,6 @@ def mock_connection_response(): def test_protocol_version(init_and_serve): - from ray.util.client import ray server_handle = init_and_serve ray = RayAPIStub() info1 = ray.connect("localhost:50051") From c3028164853a373befc7aeea8a61777ed4ba2a06 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 6 Feb 2021 01:14:51 -0800 Subject: [PATCH 14/17] fix Signed-off-by: Richard Liaw --- python/ray/util/client/server/server.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py index 3b72e51ec9be0..6e65c929b8d82 100644 --- a/python/ray/util/client/server/server.py +++ b/python/ray/util/client/server/server.py @@ -422,7 +422,13 @@ def __getattr__(self, attr): return getattr(self.grpc_server, attr) -def serve(connection_str, ray_connect_handler): +def serve(connection_str, ray_connect_handler=None): + def default_connect_handler(): + with disable_client_hook(): + if not ray.is_initialized(): + return ray.init() + + ray_connect_handler = ray_connect_handler or default_connect_handler server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) task_servicer = RayletServicer() data_servicer = DataServicer( From 1e4794736def88e7909fbf4b692518bd5d4e7343 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 6 Feb 2021 18:46:38 -0800 Subject: [PATCH 15/17] client-server-disable Signed-off-by: Richard Liaw --- python/ray/tests/test_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/test_job.py b/python/ray/tests/test_job.py index cc7909dd8cb9c..15313d7bafbd1 100644 --- a/python/ray/tests/test_job.py +++ b/python/ray/tests/test_job.py @@ -33,7 +33,7 @@ def __init__(self): assert len(actor_table) == 1 job_table = ray.jobs() - assert len(job_table) == 3 # dash, ray client server + assert len(job_table) == 2 # dash # Kill the driver process. p.kill() @@ -79,7 +79,7 @@ def value(self): assert len(actor_table) == 1 job_table = ray.jobs() - assert len(job_table) == 3 # dash, ray client server + assert len(job_table) == 2 # dash # Kill the driver process. p.kill() From a78c3c7cadac1f6129445074d1d59f88ea467c0d Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 6 Feb 2021 19:00:03 -0800 Subject: [PATCH 16/17] client Signed-off-by: Richard Liaw --- python/ray/tests/BUILD | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 4ef81d504f633..2572c50c2dcf4 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -26,6 +26,8 @@ py_test_module_list( "test_basic_3.py", "test_cancel.py", "test_cli.py", + "test_client.py", + "test_client_init.py", "test_component_failures_2.py", "test_component_failures_3.py", "test_error_ray_not_initialized.py", @@ -80,9 +82,7 @@ py_test_module_list( "test_asyncio.py", "test_autoscaler.py", "test_autoscaler_yaml.py", - "test_client_init.py", "test_client_metadata.py", - "test_client.py", "test_client_references.py", "test_client_terminate.py", "test_command_runner.py", From 7061aa340dea2de6900fc381f6bce761caf871d4 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 7 Feb 2021 00:10:17 -0800 Subject: [PATCH 17/17] client Signed-off-by: Richard Liaw --- ci/travis/ci.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/ci/travis/ci.sh b/ci/travis/ci.sh index 6267a232125a6..d8298784c1b5f 100755 --- a/ci/travis/ci.sh +++ b/ci/travis/ci.sh @@ -151,6 +151,7 @@ test_python() { -python/ray/tests:test_basic_3 # timeout -python/ray/tests:test_basic_3_client_mode -python/ray/tests:test_cli + -python/ray/tests:test_client_init # timeout -python/ray/tests:test_failure -python/ray/tests:test_global_gc -python/ray/tests:test_job