Introduce new VCE with ActorPool #1969

Merged 58 commits into main on Aug 23, 2023

Conversation

@jafermarq (Contributor) commented on Jun 25, 2023:

This is a draft implementation of the VCE (Virtual Client Engine) that uses an ActorPool as a replacement for Ray Tasks.

Context

The reason we want to move away from Ray Tasks is that something changed after Ray 1.11.1 that prevents the context (I'm using this term very loosely) from being shared between tasks. This clearly hurts workloads that make use of GPU resources: as several users have observed (#1942, #1376, #1383), it leads to GPU OOM.

A working solution to the above is to set max_calls=1 in the @ray.remote decorator throughout the RayClientProxy. However, this comes with a big overhead when the Ray Tasks (in our case, executing a method of a Flower client) complete in a few seconds. This is currently the most typical situation in FL: clients do not hold large volumes of data that require several minutes of training. In this scenario, using max_calls=1 adds several seconds of task initialisation that represent a significant portion of the total FL simulation time. This gets worse the more Tasks are spawned, i.e. the more clients are being simulated.
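
For reference, the workaround looks roughly like this (a minimal sketch, not the actual RayClientProxy code; the function name, resource values, and arguments are illustrative):

    import ray

    # Illustrative sketch only: with max_calls=1 Ray tears down the worker
    # process after every task, which frees GPU memory but pays the process
    # start-up cost on each call.
    @ray.remote(num_gpus=0.2, max_calls=1)
    def run_client_task(client_fn, cid, fit_ins):
        client = client_fn(cid)      # build the (hypothetical) Flower client
        return client.fit(fit_ins)   # run one round of local training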

To mitigate this, the advice has been to downgrade to ray 1.11.1. However, that version of Ray is incompatible with the latest version of Flower because each requires a conflicting grpcio version. This can be fixed by building the wheels for Ray 1.11.1 after bumping the grpcio version. You can find this in my Ray fork (just execute the docker command indicated in python/README-building-wheels.md and you'll get the wheels). This makes the older version of Ray compatible with the current Flower version and there is no OOM issue (and no need to set max_calls).

Using ActorPool

The approach implemented in this PR to solve the above issue instantiates an ActorPool when start_simulation() is executed. The pool contains as many actors (see Actor) as can fit in the system given the client_resources set by the user (preserving the behaviour of the previous Task-based implementation). Each virtual ClientProxy (for which I have drafted a new class I call RayClientProxyForActorPool), instead of launching a Task, submits a job to the actor pool; each job is executed by an idle actor.
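
At a high level, the ActorPool-based flow looks roughly like this (a minimal sketch using Ray's public API; the actor class, method names, and resource values are illustrative, not the exact code in this PR):

    import ray
    from ray.util import ActorPool

    @ray.remote(num_gpus=0.2)
    class VirtualClientActor:
        """Long-lived worker that runs client jobs without releasing its GPU slice."""

        def run_client(self, job):
            client_fn, cid, fit_ins = job
            client = client_fn(cid)
            return cid, client.fit(fit_ins)

    def build_pool(num_actors: int) -> ActorPool:
        # One actor per "slot" that fits in the system given client_resources
        return ActorPool([VirtualClientActor.remote() for _ in range(num_actors)])

    # A ClientProxy-like caller submits a job and later fetches a result:
    # pool.submit(lambda actor, job: actor.run_client.remote(job), job)
    # cid, fit_res = pool.get_next()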

How can I test this?

git checkout VCE_with_actorpool
git pull
pip install -e .
# set `num_gpus` in the `client_resources` dictionary
# ensure you pass `is_simulation=True` to the ServerConfig when calling `start_simulation`
python examples/simulation_pytorch/main.py

How does this perform compared to the other two alternatives when we set num_gpus=0.2 (i.e. 5 clients can run concurrently on the GPU)?

  • Using Tasks w/ ray==1.11.1 --> ~22s
  • Using ActorPool w/ ray==2.5.1 (i.e. the code in this PR) --> ~22s
  • Using Tasks and max_calls=1 w/ ray==2.5.1 --> ~110s

The above was tested on a machine with an AMD 5900X CPU and an NVIDIA 3090Ti GPU.

The difference was even more dramatic when setting num_gpus=0.5: 33s for ActorPool vs 197s for Tasks with max_calls=1.

As expected, using max_calls for workloads that take just a couple of seconds is harmful in terms of wall-clock time. Given the change in the (low-level?) handling of Ray Tasks post 1.11.1, I think the best alternative is to use entities that persist throughout the FL experiment (and therefore never need to release their GPU/CPU resources): Ray Actors.

Limitations

  • It is fine to submit one job from each ClientProxy, but if we also want each proxy to get its result back upon method completion (i.e. when calling fit()), things get a bit tricky if we want these to be in order. The current implementation retrieves a result from the pool which might not be the one corresponding to that client. This can be mitigated easily by using cid as the identifier for job submission/retrieval. This is the reason why I'm passing the is_simulation flag around.

  • I'm inclined to say that although the current implementation works, it would be better to submit all jobs in one go, using map, and then have a single blocking statement until all jobs are completed; something like this in the fit_clients() and evaluate_clients() functions in src/py/flwr/server/server.py:

        round_tasks = [client_fn for _ in range(N)]  # add N tasks (N = number of clients)
        # submit all tasks to the pool at once
        gen = pool.map(lambda actor, task: actor.run_client.remote(task), round_tasks)
        round_results = list(gen)  # blocks until all jobs are completed

@pedropgusmao (Contributor) commented:

Nice.
Do we know how Ray performs resource allocation for Actors? Is it done once per actor using CUDA_VISIBLE_DEVICES or does it change every time an actor is called?

I ask because I wonder whether using actors will eventually allow specific GPUs to be used by specific actors, preventing multiple actors from keeping memory on GPUs they are not using. This would probably remove the need for max_calls=1, right?

@jafermarq (Contributor, Author) commented:

I'm unsure whether max_calls can be used with actors... but actors can be removed from the pool if idle (ray.util.ActorPool.pop_idle). But again, we don't want anything resembling max_calls=1 if using it adds big penalties to running the workloads in the clients (which more often than not are lightweight enough that the cost of re-initialising the process hurts overall wall-clock time). If the GPU resources allocated for each actor correctly budget for the maximum VRAM a client would use, an actor would become idle only when no more work is left to be done in the round.
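
For completeness, shrinking a pool by removing idle actors could look roughly like this (a sketch based on the public ActorPool API; whether this is actually useful here is an open question):

    import ray
    from ray.util import ActorPool

    def shrink_pool(pool: ActorPool) -> None:
        """Remove idle actors from the pool and release their resources."""
        while True:
            idle_actor = pool.pop_idle()  # returns None when no idle actor is left
            if idle_actor is None:
                break
            ray.kill(idle_actor)  # frees the actor's CPU/GPU reservation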

If specific actors could be mapped to specific GPUs (I guess here you are thinking of the setting where not all GPUs are the same, especially in terms of VRAM), that would be nice. Maybe something Placement Groups can achieve?
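
For instance, pinning an actor to a reserved GPU bundle via a placement group could look roughly like this (a hedged sketch of Ray's placement-group API; nothing like this is implemented in this PR):

    import ray
    from ray.util.placement_group import placement_group
    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

    # Reserve one full GPU (e.g. the large-VRAM card) as a dedicated bundle.
    pg = placement_group([{"GPU": 1, "CPU": 2}], strategy="STRICT_PACK")
    ray.get(pg.ready())

    @ray.remote(num_gpus=1)
    class PinnedActor:
        def gpu_ids(self):
            return ray.get_gpu_ids()  # the GPU index Ray assigned to this actor

    actor = PinnedActor.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
    ).remote()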

@noah822 commented on Jun 26, 2023:

I personally think this implementation makes a lot of sense. Some comments on the limitations: the second note there might indeed be the better practice, as you've said. The .map() API of the actor pool will (probably) return an ordered collection of results, so there would be no need to track cid to associate each result with the corresponding client.
I am not quite familiar with the mechanism of Ray actors and how they schedule their resource usage. Can they eliminate the OOM problem, i.e. release memory after a client finishes its local training? The performance in terms of wall-clock time compared with Ray Tasks is pretty impressive, though.

@pedropgusmao (Contributor) commented:

Regarding the limitations:

  • Could fit() be changed to return either a single result (for backwards compatibility) or multiple results, i.e. Union[Tuple[Parameters, int, Metrics], List[Tuple[Parameters, int, Metrics]]]? This would allow Actors to receive a list of cids and process them in sequence. I think this would also require changing flwr.client.Client.fit() to return Union[FitRes, ListFitRes] and the creation of a gRPC message:
    message ListFitRes {
        repeated FitRes fit_res = 1;
    }

I used List in the example above to aggregate results, but maybe a Dict would be even better, with the gRPC message being:

    message ListFitRes {
        map<string, FitRes> fit_res = 1;
    }

This would also remove the requirement that cid be an integer.

  • Regarding the mapping: maybe the choice of a specific cid -> Actor mapping could be exposed to the user. This would allow experimenting with novel virtual-client allocation strategies.

@jafermarq (Contributor, Author) commented on Jun 26, 2023:

@pedropgusmao, yeah, ideally we keep the ClientProxy fully backwards compatible and under the same API as the standard client proxy dealing with gRPC-connected devices. What I have done is use a "buffer" where a result is cached if it doesn't belong to the ClientProxy that fetched it from the pool. When this happens, the client proxy waits until its result is added to the cache. All clients will retrieve one result from the actor pool, so eventually all ClientProxies will be paired with their respective FitRes (or another type, depending on the task).
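
Conceptually, the cid-keyed cache could be sketched like this (an illustrative toy version, not the code in this PR; all names are made up):

    import threading
    from typing import Any, Dict

    class ResultBuffer:
        """Caches out-of-order results so each proxy can wait for its own cid."""

        def __init__(self) -> None:
            self._results: Dict[str, Any] = {}
            self._cond = threading.Condition()

        def put(self, cid: str, result: Any) -> None:
            with self._cond:
                self._results[cid] = result
                self._cond.notify_all()

        def get(self, cid: str) -> Any:
            with self._cond:
                self._cond.wait_for(lambda: cid in self._results)
                return self._results.pop(cid)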

On Aug 22, 2023, @danieljanes changed the title from "VCE with ActorPool" to "Introduce new VCE with ActorPool".
Review comments (now outdated/resolved) were left on the following files:

  • src/py/flwr/simulation/app.py
  • src/py/flwr/simulation/ray_transport/ray_actor.py
  • src/py/flwr/simulation/ray_transport/ray_client_proxy.py

@danieljanes (Member) left a comment:

Lgtm, merging it now 🚀

@danieljanes merged commit ed7313c into main on Aug 23, 2023
27 checks passed
@danieljanes deleted the VCE_with_actorpool branch on August 23, 2023 at 08:34
alessiomora pushed a commit to alessiomora/flower that referenced this pull request Aug 30, 2023
Labels: enhancement (New feature or request)