Development merge conflicts with main resolved (#356)
* optimizer compatibility with tensorflow and example for medmnist keras/pytorch (#320)

TensorFlow compatibility was added for the new optimizers: fedavg, fedadam, fedadagrad, and fedyogi.

A shell script for testing all 8 possible combinations of optimizers and frameworks is included. The medmnist example can now be run with keras (the folder structure was refactored to include a trainer and an aggregator for keras).

The typo in fedavg.py has now been fixed.
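As a rough illustration, the server-side update these adaptive optimizers perform looks like the FedAdam sketch below (based on Reddi et al., "Adaptive Federated Optimization"); the function name and defaults are illustrative, not Flame's actual API:

```python
import numpy as np

def fedadam_update(w, delta, m, v, eta=0.01, beta1=0.9, beta2=0.99, tau=1e-3):
    """One server-side FedAdam step on a single weight array."""
    m = beta1 * m + (1 - beta1) * delta      # first moment of the aggregated delta
    v = beta2 * v + (1 - beta2) * delta**2   # second moment
    w = w + eta * m / (np.sqrt(v) + tau)     # adaptive server-side step
    return w, m, v
```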

* feat+fix: grpc support for hierarchical fl (#321)

Hierarchical FL didn't work with gRPC as the backend. This is because
the groupby field was not considered in the metaserver service and the
p2p backend.

In addition, a middle aggregator hangs even after a job is completed.
This deadlock occurs because the p2p backend cleanup code is called as
part of a channel cleanup. However, in a middle aggregator, the p2p
backend is responsible for tasks across all channels, so the p2p
cleanup code couldn't finish: a broadcast task in the other channel
can't finish. This bug is fixed here by moving the p2p backend cleanup
code outside of the channel cleanup code.
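A hypothetical sketch of that reordering (not Flame's actual API): the shared backend is cleaned up once, only after every channel has finished its own cleanup, so no pending broadcast in a sibling channel can block it.

```python
async def shutdown(channels, backend):
    for channel in channels:
        await channel.cleanup()  # tears down per-channel state only
    # the p2p backend is shared across channels in a middle aggregator,
    # so its cleanup runs exactly once, after all channels are done
    await backend.cleanup()
```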

* documentation for metaserver/mqtt local (#322)

Documentation for using metaserver allows users to run examples with a local broker; local mqtt brokers are covered as well. Using a local broker decreases the chance of job ID collisions.

The config.json for the mnist example was modified to make it easier to switch to a local broker, and the README now indicates how to do this for the other examples.

Co-authored-by: vboxuser <vboxuser@Ubuntu.myguest.virtualbox.org>

* feat: asynchronous fl (#323)

Asynchronous FL is implemented for two-tier topology and three-tier
hierarchical topology.

The main algorithm is based on the following two papers:
- https://arxiv.org/pdf/2111.04877.pdf
- https://arxiv.org/pdf/2106.06639.pdf

Two examples for asynchronous fl are also added. One is for a two-tier
topology and the other for a three-tier hierarchical topology.

This implementation includes the core algorithm but not the SecAgg
algorithm presented in the papers, which is out of scope for this
change.
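The core rule in those papers is staleness-discounted mixing of an arriving local model into the global model. A minimal sketch using the polynomial discount from the papers; names are illustrative, not Flame's actual API:

```python
def staleness_scale(staleness: int, alpha: float = 0.5) -> float:
    """Polynomial staleness discount: older updates contribute less."""
    return 1.0 / (1.0 + staleness) ** alpha

def apply_async_update(global_weights, local_weights, staleness):
    """Mix one trainer's weights into the global weights as they arrive."""
    scale = staleness_scale(staleness)
    return [g + scale * (l - g) for g, l in zip(global_weights, local_weights)]
```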

* fix+refactor: asyncfl loss divergence (#330)

For asyncfl, a client (trainer) should send a delta, obtained by
subtracting local weights from the original global weights after
training. In the previous implementation, the whole local weights were
sent to a server (aggregator). This causes loss divergence.

Supporting delta update requires refactoring of aggregators of
synchronous fl (horizontal/{top_aggregator.py, middle_aggregator.py})
as well as optimizers' do() function.

The changes here support delta update universally across all types of
modes (horizontal synchronous, asynchronous, and hybrid).
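A minimal sketch of the trainer-side delta computation, assuming dict-style weights keyed by layer name; the helper name is illustrative (the library's delta helpers live in common/util.py):

```python
def compute_delta(saved_global: dict, local: dict) -> dict:
    """delta = original global weights - locally trained weights."""
    return {name: saved_global[name] - local[name] for name in local}
```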

* fix: conflict between integer tensor and float tensor (#335)

Model architectures can have integer tensors. Applying aggregation on
those tensors results in a type mismatch and throws a runtime error:
"RuntimeError: result type Float can't be cast to the desired output
type Long"

Integer tensors don't matter in backpropagation. So, as a workaround
to the issue, we typecast back to the original dtype when the original
type differs from the dtype of the weighted tensors used for
aggregation. In this way, we can keep the model architecture as is.

* refactor: config for hybrid example in library (#334)

To enable library-only execution of the hybrid example, its
configuration files are updated accordingly. The revised configuration
contains both local mqtt and p2p broker settings, with the p2p broker
selected.

* misc: asynchronous hierarchical fl example (#340)

Since the Flame SDK supports asynchronous FL, we add an example of
asynchronous hierarchical FL for the control plane.

* chore: clean up examples folder (#336)

The examples folder in the top-level directory had some outdated and
irrelevant files. Those are now removed from the folder.

* fix: workaround for hybrid mode with two p2p backends (#345)

Due to grpc/grpc#25364, when two p2p
backends (which rely on grpc and asyncio) are defined, the hybrid mode
example throws an exception: 'BlockingIOError: [Errno 35] Resource
temporarily unavailable'. The issue still appears unresolved. As a
temporary workaround, we use two different types of backends: mqtt for
one and p2p for the other. This means that when this example is
executed, both metaserver and an mqtt broker (e.g., mosquitto) must be
running on the local machine.

* fix: distributed mode (#344)

Distributed mode had a bug: deepcopy(self.weights) was called in
_update_weights() before 'weights' was defined as a member variable.
To address this issue, self.weights is initialized in __init__().

Also, to run a distributed example locally, configuration files are
revised.
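An illustrative sketch of the fix; the surrounding class layout is an assumption:

```python
class DistributedTrainer:
    def __init__(self, config):
        self.config = config
        # define the member variable up front so that deepcopy(self.weights)
        # in _update_weights() can never run before 'weights' exists
        self.weights = None
```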

* example/implementation for fedprox (#339)

This example is similar to the ones seen in the fedprox paper, although it currently does not simulate stragglers and uses a different dataset/architecture.

A few things were changed in order to provide a simple process for modifying trainers.
This includes a function in util.py and another class variable in the trainer containing information on the client-side regularizer.

Additionally, tests are automated (mu = 1, 0.1, 0.01, 0.001, 0), so running the example generates or modifies existing files in order to provide the proper configuration for an experiment.
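For reference, a minimal sketch of the FedProx client-side regularizer (Li et al., 2020), assuming PyTorch; function and variable names are illustrative, not the example's actual code:

```python
import torch

def fedprox_loss(task_loss, model, global_model, mu: float):
    """FedProx objective: task loss + (mu / 2) * ||w - w_global||^2."""
    prox = 0.0
    for w, w_g in zip(model.parameters(), global_model.parameters()):
        prox = prox + torch.sum((w - w_g.detach()) ** 2)
    return task_loss + (mu / 2.0) * prox
```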

* Create diagnose script (#348)

* Create diagnose script

* Make the script executable

---------

Co-authored-by: Alex Ungurean <aungurea@cisco.com>

* refactor+fix: configurable deployer / lib regularizer fix (#351)

The deployer's job template file is hard-coded, which makes it hard to
use a different template file at deployment time. Using a different
template file is useful when the underlying infrastructure is
different (e.g., k8s vs. knative). To support that, the template
folder and file are fed in as config variables.

Also, the deployer's config info was fed as command arguments, which
is cumbersome. So, the config parsing part is refactored such that the
info is fed in as a configuration file.

During the testing of the deployer change, a bug in the library was
identified. The fix for it is added here too.

Finally, the local DNS configuration in flame.sh is updated so that it
is done correctly across different Linux distributions (e.g.,
archlinux and ubuntu). flame.sh was tested under archlinux and ubuntu.

* Add missing merge fix

* fix: out of order message delivery (#354)

In order to minimize blocking of asynchronous I/O tasks when a large
number of message chunks (e.g., when the model is large) need to be
assembled, threading is used. This led to a bug wherein messages are
delivered to a receive queue out of order. For example, consider two
back-to-back messages where the first is larger than the second. In
this case, the second can be inserted before the first because it has
a higher chance of finishing assembly first.

The changes here fix the bug by leveraging another queue to
synchronize message delivery order. For that, a chunk manager class is
introduced. One chunk manager is created per backend, and it maintains
a chunk thread per end, each of which is responsible for assembling
messages sequentially and pushing them into the receive queue of its
end.

* gpu and cpu compatibility pytorch (#350)

This modification allows trainers/aggregators to work on different devices across different machines.

Basically, all weights are communicated by placing them on the CPU before serializing them.
Then, they are moved back to the device where they previously resided.

All trainers now require a self.model attribute, and all middle aggregators must use only the CPU (not enforced).
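A usage sketch based on the helpers added in common/util.py below; the call sites and function names here are assumptions:

```python
from flame.common.constants import DeviceType
from flame.common.util import weights_to_device, weights_to_model_device

def prepare_weights_for_send(model):
    """Place weights on the CPU so they serialize portably across machines."""
    return weights_to_device(model.state_dict(), DeviceType.CPU)

def load_received_weights(model, received_weights):
    """Move received weights back onto whatever device the model lives on."""
    model.load_state_dict(weights_to_model_device(received_weights, model))
```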

* Make sdk config backwards compatible. (#355)

---------

Co-authored-by: GustavBaumgart <98069699+GustavBaumgart@users.noreply.github.com>
Co-authored-by: Myungjin Lee <myungjin@users.noreply.github.com>
Co-authored-by: vboxuser <vboxuser@Ubuntu.myguest.virtualbox.org>
Co-authored-by: alexandruuBytex <56033021+alexandruuBytex@users.noreply.github.com>
Co-authored-by: Alex Ungurean <aungurea@cisco.com>
Co-authored-by: elqurio <119978637+elqurio@users.noreply.github.com>
7 people committed Mar 3, 2023
1 parent f981cb4 commit 956d7ff
Showing 11 changed files with 217 additions and 70 deletions.
115 changes: 115 additions & 0 deletions lib/python/flame/backend/chunk_manager.py
@@ -0,0 +1,115 @@
# Copyright 2023 Cisco Systems, Inc. and its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
"""Chunk Manager."""

import logging
from queue import Queue
from threading import Thread

from ..channel import Channel
from ..common.util import run_async
from ..proto import backend_msg_pb2 as msg_pb2
from .chunk_store import ChunkStore

logger = logging.getLogger(__name__)

KEY_CHANNEL = "channel"
KEY_LOOP = "loop"


class ChunkThread(Thread):
"""ChunkThread class."""

def __init__(self,
group=None,
target=None,
name=None,
args=(),
kwargs=None,
*,
daemon=None):
"""Initialize an instance."""
# call parent constructor method
super().__init__(group, target, name, daemon=daemon)

self._loop = kwargs[KEY_LOOP]
self._channel = kwargs[KEY_CHANNEL]

self.queue = Queue()
self.chunk_store = ChunkStore()

def insert(self, msg: msg_pb2.Data) -> None:
"""Put a message into queue in the chunk thread."""
self.queue.put(msg)

def run(self):
"""Override run function of Thread.
The function assembles chunks into a full-size message and passes
the message to its designated receive queue.
"""

async def inner(end_id: str, data: bytes):
logger.debug(f'fully assembled data size = {len(data)}')

rxq = self._channel.get_rxq(end_id)
if rxq is None:
logger.debug(f"rxq not found for {end_id}")
return

await rxq.put(data)

while True:
msg = self.queue.get()

# assemble is done in a chunk thread so that it won't block
# asyncio task
status = self.chunk_store.assemble(msg)
if not status:
# reset chunk_store if message is wrong
self.chunk_store.reset()
else:
if not self.chunk_store.eom:
# not an end of message, hence, can't get a payload
# out of chunk store yet
continue

payload = self.chunk_store.get_data()
# now push payload to a target receive queue.
_, status = run_async(inner(msg.end_id, payload), self._loop)

# message was completely assembled, reset the chunk store
self.chunk_store.reset()


class ChunkManager(object):
"""ChunkStore class."""

def __init__(self, loop):
"""Initialize an instance."""
self._loop = loop
self._chunk_threads: dict[str, ChunkThread] = {}

def handle(self, msg: msg_pb2.Data, channel: Channel) -> None:
"""Process msg."""
if msg.end_id not in self._chunk_threads:
kwargs = {KEY_LOOP: self._loop, KEY_CHANNEL: channel}
chunk_thd = ChunkThread(kwargs=kwargs, daemon=True)
self._chunk_threads[msg.end_id] = chunk_thd
chunk_thd.start()

chunk_thd = self._chunk_threads[msg.end_id]
chunk_thd.insert(msg)
44 changes: 10 additions & 34 deletions lib/python/flame/backend/chunk_store.py
@@ -16,11 +16,9 @@
"""ChunkStore."""

import logging
from threading import Thread
from typing import Tuple, Union

from ..common.constants import EMPTY_PAYLOAD
from ..common.util import run_async
from ..proto import backend_msg_pb2 as msg_pb2

DEFAULT_CHUNK_SIZE = 1048576 # 1MB
@@ -31,11 +29,12 @@
class ChunkStore(object):
"""ChunkStore class."""

def __init__(self, loop=None, channel=None):
def __init__(self):
"""Initialize an instance."""
self._loop = loop
self._channel = channel
self.reset()

def reset(self):
"""Reset the state of chunk store."""
# for fragment
self.pidx = 0
self.cidx = DEFAULT_CHUNK_SIZE
@@ -44,10 +43,14 @@ def __init__(self, loop=None, channel=None):
self.recv_buf = list()

# for both fragment and assemble
self.data = b''
self.data = EMPTY_PAYLOAD
self.seqno = -1
self.eom = False

def get_data(self) -> Union[bytes, None]:
"""Return data."""
return self.data

def set_data(self, data: bytes) -> None:
"""Set data in chunk store."""
logger.debug(f"setting data of size {len(data)}")
@@ -91,11 +94,6 @@ def assemble(self, msg: msg_pb2.Data) -> bool:
This method pushes message payload into a receive buffer.
If eom (end of message) is set, bytes in the array are joined.
Then, the assembled data will be put into a receive queue.
The join operation can be expensive if the data size is large.
We run the join operation in a separate thread in order to unblock
asyncio tasks as quickly as possible.
"""
# out of order delivery
if self.seqno + 1 != msg.seqno:
@@ -109,28 +107,6 @@
self.eom = msg.eom

if self.eom:
# we assemble payloads in the recv buf array
# only if eom is set to True.
# In this way, we only pay byte concatenation cost once
_thread = Thread(target=self._assemble,
args=(msg.end_id, ),
daemon=True)
_thread.start()
self.data = EMPTY_PAYLOAD.join(self.recv_buf)

return True

def _assemble(self, end_id: str) -> None:
# This code must be executed in a separate thread
self.data = EMPTY_PAYLOAD.join(self.recv_buf)

async def inner():
logger.debug(f'fully assembled data size = {len(self.data)}')

rxq = self._channel.get_rxq(end_id)
if rxq is None:
logger.debug(f"rxq not found for {end_id}")
return

await rxq.put(self.data)

_, status = run_async(inner(), self._loop)
17 changes: 6 additions & 11 deletions lib/python/flame/backend/mqtt.py
@@ -31,6 +31,7 @@
from ..common.util import background_thread_loop, run_async
from ..proto import backend_msg_pb2 as msg_pb2
from .abstract import AbstractBackend
from .chunk_manager import ChunkManager
from .chunk_store import ChunkStore

END_STATUS_ON = 'online'
@@ -75,7 +76,6 @@ def __init__(self):
self._broker = None
self._mqtt_client = None
self._last_payload_sig = None
self._msg_chunks = None
self._cleanup_waits = None
if self._initialized:
return
@@ -86,6 +86,9 @@
with background_thread_loop() as loop:
self._loop = loop

# initialize a chunk manager
self.chunk_mgr = ChunkManager(self._loop)

async def _init_loop_stuff():
self._eventq = asyncio.Queue()

@@ -114,8 +117,6 @@ async def _monitor_end_termination(self):

def configure(self, broker: str, job_id: str, task_id: str):
"""Configure the backend."""
self._msg_chunks: dict[str, ChunkStore] = {}

self._broker = broker
self._job_id = job_id
self._id = task_id
@@ -246,14 +247,8 @@ async def _handle_data(self, any_msg: Any) -> None:
expiry = time.time() + MQTT_TIME_WAIT
self._cleanup_waits[msg.end_id] = expiry

if msg.end_id not in self._msg_chunks:
channel = self._channels[msg.channel_name]
self._msg_chunks[msg.end_id] = ChunkStore(self._loop, channel)

chunk_store = self._msg_chunks[msg.end_id]
if not chunk_store.assemble(msg) or chunk_store.eom:
# clean up if message is wrong or completely assembled
del self._msg_chunks[msg.end_id]
channel = self._channels[msg.channel_name]
self.chunk_mgr.handle(msg, channel)

async def _rx_task(self):
self._rx_deque = deque()
17 changes: 6 additions & 11 deletions lib/python/flame/backend/p2p.py
@@ -30,6 +30,7 @@
from ..proto import backend_msg_pb2_grpc as msg_pb2_grpc
from ..proto import meta_pb2, meta_pb2_grpc
from .abstract import AbstractBackend
from .chunk_manager import ChunkManager
from .chunk_store import ChunkStore

logger = logging.getLogger(__name__)
@@ -106,7 +107,6 @@ def __init__(self):
self._id = None
self._channels = None
self._broker = None
self._msg_chunks = None
if self._initialized:
return

@@ -121,6 +121,9 @@
with background_thread_loop() as loop:
self._loop = loop

# initialize a chunk manager
self.chunk_mgr = ChunkManager(self._loop)

async def _init_loop_stuff():
self._eventq = asyncio.Queue()

@@ -153,8 +156,6 @@ async def _setup_server(self):

def configure(self, broker: str, job_id: str, task_id: str):
"""Configure the backend."""
self._msg_chunks: dict[str, ChunkStore] = {}

self._broker = broker
self._job_id = job_id
self._id = task_id
@@ -323,14 +324,8 @@ async def _handle_data(self, msg: msg_pb2.Data) -> None:
logger.debug('message sent to self; do nothing')
return

if msg.end_id not in self._msg_chunks:
channel = self._channels[msg.channel_name]
self._msg_chunks[msg.end_id] = ChunkStore(self._loop, channel)

chunk_store = self._msg_chunks[msg.end_id]
if not chunk_store.assemble(msg) or chunk_store.eom:
# clean up if message is wrong or completely assembled
del self._msg_chunks[msg.end_id]
channel = self._channels[msg.channel_name]
self.chunk_mgr.handle(msg, channel)

async def _tx_task(self, channel, end_id, comm_type: CommType):
"""Conducts data transmission in a loop.
6 changes: 6 additions & 0 deletions lib/python/flame/common/constants.py
@@ -41,3 +41,9 @@ class CommType(Enum):

BROADCAST = 1
UNICAST = 2

class DeviceType(Enum):
"""Enum class for device."""

CPU = 1
GPU = 2
37 changes: 37 additions & 0 deletions lib/python/flame/common/util.py
@@ -27,6 +27,7 @@

from ..config import Config
from .typing import ModelWeights
from .constants import DeviceType

PYTORCH = 'torch'
TENSORFLOW = 'tensorflow'
@@ -137,3 +138,39 @@ def delta_weights_tensorflow(a: ModelWeights,
return None

return [x - y for (x, y) in zip(a, b)]


def get_pytorch_device(dtype: DeviceType):
import torch
if dtype == DeviceType.CPU:
device_name = "cpu"
elif dtype == DeviceType.GPU:
device_name = "cuda"
else:
raise TypeError(f"Device type {dtype} is not supported.")

return torch.device(device_name)

def weights_to_device(weights, dtype: DeviceType):
"""Send model weights to device type dtype."""

framework = get_ml_framework_in_use()
if framework == MLFramework.TENSORFLOW:
return weights
elif framework == MLFramework.PYTORCH:
torch_device = get_pytorch_device(dtype)
return {name: weights[name].to(torch_device) for name in weights}

return None

def weights_to_model_device(weights, model):
"""Send model weights to same device as model"""
framework = get_ml_framework_in_use()
if framework == MLFramework.TENSORFLOW:
return weights
elif framework == MLFramework.PYTORCH:
# make assumption all tensors are on same device
torch_device = next(model.parameters()).device
return {name: weights[name].to(torch_device) for name in weights}

return None