Skip to content

Commit

Permalink
feat: coordination revision (#394)
Browse files Browse the repository at this point in the history
The coordinated FL is revised. The key changes are:

1) The coordinator has full responsibility for informing the workers
of all roles (top agg, mid agg and trainer) of the end of the
training.

2) The top aggregator now receives from the coordinator a list of
middle aggregators that it can interact with. This can prevent the top
aggregator from sending a global model to the middle aggregators which
don't have any trainers. This improves efficiency by avoiding
unnecessary data communication. It may also help to support the
serverless FL case.

3) For (1), the coordinator expects the rounds information in its
config file.

4) The coordinator and middle aggregator can exchange meta information.
The middle aggregator can share extra information such as its load or
resource utilization. The coordinator can use this kind of information
to make a smart decision on mapping middle aggregators to trainers.
  • Loading branch information
myungjin committed Apr 7, 2023
1 parent 2622d11 commit 35a46de
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 191 deletions.
11 changes: 7 additions & 4 deletions lib/python/flame/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

import asyncio
import logging
from typing import Any, Tuple, Union
from datetime import datetime
from typing import Any, Union

import cloudpickle
from aiostream import stream
Expand Down Expand Up @@ -199,7 +200,7 @@ async def _put():

return status

def recv(self, end_id) -> Any:
def recv(self, end_id) -> tuple[Any, datetime]:
"""Receive a message from an end in a blocking call fashion."""
logger.debug(f"will receive data from {end_id}")

Expand Down Expand Up @@ -231,7 +232,9 @@ async def _get():

return msg, timestamp

def recv_fifo(self, end_ids: list[str], first_k: int = 0) -> Tuple[str, Any]:
def recv_fifo(
self, end_ids: list[str], first_k: int = 0
) -> tuple[Any, tuple[str, datetime]]:
"""Receive a message per end from a list of ends.
The message arrival order among ends is not fixed.
Expand Down Expand Up @@ -305,7 +308,7 @@ async def _streamer_for_recv_fifo(self, end_ids: list[str]):
of their corresponding end so that they can be read later.
"""

async def _get_inner(end_id) -> Tuple[str, Any]:
async def _get_inner(end_id) -> tuple[str, Any]:
if not self.has(end_id):
# can't receive message from end_id
yield end_id, None
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"taskid": "09d06b7526964db86cf37c70e8e0cdb6bd7aa743",
"backend": "mqtt",
"backend": "p2p",
"brokers": [
{
"host": "localhost",
Expand Down Expand Up @@ -32,10 +32,10 @@
},
"funcTags": {
"top-aggregator": [
"notifyCoordinator"
"coordinate"
],
"coordinator": [
"checkEOT"
"coordinateWithTopAgg"
]
}
},
Expand All @@ -54,10 +54,10 @@
},
"funcTags": {
"middle-aggregator": [
"getTrainers"
"coordinate"
],
"coordinator": [
"selectTrainers"
"coordinateWithMidAgg"
]
}
},
Expand All @@ -76,14 +76,17 @@
},
"funcTags": {
"trainer": [
"getAggregator"
"coordinate"
],
"coordinator": [
"selectAggregator"
"coordinateWithTrainer"
]
}
}
],
"hyperparameters": {
"rounds": 10
},
"job": {
"id": "622a358619ab59012eabeefb",
"name": "mnist"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"taskid": "49d06b7526964db86cf37c70e8e0cdb6bd7aa743",
"backend": "mqtt",
"backend": "p2p",
"brokers": [
{
"host": "localhost",
Expand Down Expand Up @@ -32,10 +32,10 @@
},
"funcTags": {
"middle-aggregator": [
"getTrainers"
"coordinate"
],
"coordinator": [
"selectTrainers"
"coordinateWithMidAgg"
]
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"taskid": "49d06b7526964db86cf37c70e8e0cdb6bd7aa744",
"backend": "mqtt",
"backend": "p2p",
"brokers": [
{
"host": "localhost",
Expand Down Expand Up @@ -32,10 +32,10 @@
},
"funcTags": {
"middle-aggregator": [
"getTrainers"
"coordinate"
],
"coordinator": [
"selectTrainers"
"coordinateWithMidAgg"
]
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"taskid": "49d06b7526964db86cf37c70e8e0cdb6bd7aa742",
"backend": "mqtt",
"backend": "p2p",
"brokers": [
{
"host": "localhost",
Expand Down Expand Up @@ -31,10 +31,10 @@
},
"funcTags": {
"top-aggregator": [
"notifyCoordinator"
"coordinate"
],
"coordinator": [
"checkEOT"
"coordinateWithTopAgg"
]
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"taskid": "49d06b7526964db86cf37c70e8e0cdb6bd7aa745",
"backend": "mqtt",
"backend": "p2p",
"brokers": [
{
"host": "localhost",
Expand Down Expand Up @@ -31,10 +31,10 @@
},
"funcTags": {
"trainer": [
"getAggregator"
"coordinate"
],
"coordinator": [
"selectAggregator"
"coordinateWithTrainer"
]
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"taskid": "49d06b7526964db86cf37c70e8e0cdb6bd7aa746",
"backend": "mqtt",
"backend": "p2p",
"brokers": [
{
"host": "localhost",
Expand Down Expand Up @@ -31,10 +31,10 @@
},
"funcTags": {
"trainer": [
"getAggregator"
"coordinate"
],
"coordinator": [
"selectAggregator"
"coordinateWithTrainer"
]
}
},
Expand Down
Loading

0 comments on commit 35a46de

Please sign in to comment.