Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Add SuperExec servicer #3606

Merged
merged 12 commits into from
Jun 14, 2024
51 changes: 51 additions & 0 deletions src/py/flwr/superexec/exec_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""SuperExec gRPC API."""

from logging import INFO
from typing import Optional, Tuple

import grpc

from flwr.common import GRPC_MAX_MESSAGE_LENGTH
from flwr.common.logger import log
from flwr.proto.exec_pb2_grpc import add_ExecServicer_to_server
from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server

from .exec_servicer import ExecServicer
from .executor import Executor


def run_superexec_api_grpc(
address: str,
plugin: Executor,
certificates: Optional[Tuple[bytes, bytes, bytes]],
) -> grpc.Server:
"""Run SuperExec API (gRPC, request-response)."""
exec_servicer: grpc.Server = ExecServicer(
plugin=plugin,
)
superexec_add_servicer_to_server_fn = add_ExecServicer_to_server
superexec_grpc_server = generic_create_grpc_server(
servicer_and_add_fn=(exec_servicer, superexec_add_servicer_to_server_fn),
server_address=address,
max_message_length=GRPC_MAX_MESSAGE_LENGTH,
certificates=certificates,
)

log(INFO, "Flower ECE: Starting SuperExec API (gRPC-rere) on %s", address)
superexec_grpc_server.start()

return superexec_grpc_server
48 changes: 48 additions & 0 deletions src/py/flwr/superexec/exec_servicer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""SuperExec API servicer."""


from logging import INFO
from subprocess import Popen
from typing import Dict

import grpc

from flwr.common.logger import log
from flwr.proto import exec_pb2_grpc # pylint: disable=E0611
from flwr.proto.exec_pb2 import ( # pylint: disable=E0611
StartRunRequest,
StartRunResponse,
)

from .executor import Executor


class ExecServicer(exec_pb2_grpc.ExecServicer):
"""SuperExec API servicer."""

def __init__(self, plugin: Executor) -> None:
charlesbvll marked this conversation as resolved.
Show resolved Hide resolved
self.plugin = plugin
self.runs: Dict[int, Popen] = {} # type: ignore
charlesbvll marked this conversation as resolved.
Show resolved Hide resolved

def StartRun(
self, request: StartRunRequest, context: grpc.ServicerContext
) -> StartRunResponse:
"""Create run ID."""
log(INFO, "ExecServicer.StartRun")
run = self.plugin.start_run(request.fab_file)
self.runs[run.run_id] = run.proc
return StartRunResponse(run_id=run.run_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starting the run can fail. We should immediately start with error handling here.

Copy link
Member Author

@charlesbvll charlesbvll Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danieljanes Should we make the run_id and proc of RunTracker optionals? Or should we make the return type of start_run optional? I think the second option is the most straightforward

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danieljanes I updated it but do you think we should add an error field to the proto definition of the response?

60 changes: 60 additions & 0 deletions src/py/flwr/superexec/exec_servicer_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Test the SuperExec API servicer."""


import subprocess
import unittest
from unittest.mock import MagicMock

from flwr.proto.exec_pb2 import StartRunRequest # pylint: disable=E0611

from .exec_servicer import ExecServicer


class ExecServicerTestCase(unittest.TestCase):
"""Test suite for class ExecServicer and helper functions."""

def test_start_run(self) -> None:
"""Test StartRun method of ExecServicer."""
run_res = MagicMock()
run_res.run_id = 10
with subprocess.Popen(
["echo", "success"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
) as proc:
run_res.proc = proc

exec_plugin = MagicMock()
exec_plugin.start_run = lambda _: run_res

context_mock = MagicMock()

request = StartRunRequest()
request.fab_file = b"test"

# Create a instance of FlowerServiceServicer
servicer = ExecServicer(plugin=exec_plugin)

# Execute
response = servicer.StartRun(request, context_mock)

assert response.run_id == 10


if __name__ == "__main__":
unittest.main(verbosity=2)
59 changes: 59 additions & 0 deletions src/py/flwr/superexec/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Execute and monitor a Flower run."""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from subprocess import Popen
from typing import Optional


@dataclass
class Run:
charlesbvll marked this conversation as resolved.
Show resolved Hide resolved
"""Represents a Flower run (composed of a run_id and the associated process)."""
charlesbvll marked this conversation as resolved.
Show resolved Hide resolved

run_id: int
proc: Popen # type: ignore


class Executor(ABC):
"""Execute and monitor a Flower run."""

@abstractmethod
def start_run(
self,
fab_file: bytes,
ttl: Optional[float] = None,
charlesbvll marked this conversation as resolved.
Show resolved Hide resolved
) -> Run:
"""Start a run using the given Flower App ID and version.
charlesbvll marked this conversation as resolved.
Show resolved Hide resolved

This method creates a new run on the SuperLink, returns its run_id
and also starts the run execution.

Parameters
----------
fab_file : bytes
The Flower App Bundle file bytes.
ttl : Optional[float] (default: None)
Time-to-live for the round trip of this message, i.e., the time from sending
this message to receiving a reply. It specifies in seconds the duration for
which the message and its potential reply are considered valid. If unset,
the default TTL (i.e., `common.DEFAULT_TTL`) will be used.
charlesbvll marked this conversation as resolved.
Show resolved Hide resolved

Returns
-------
run_id : int
The run_id of the run created by the SuperLink.
charlesbvll marked this conversation as resolved.
Show resolved Hide resolved
"""