Skip to content

Commit

Permalink
feat(framework) Add deployment engine executor (#3629)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel J. Beutel <daniel@flower.ai>
  • Loading branch information
charlesbvll and danieljanes authored Jun 29, 2024
1 parent 0f64311 commit bb8e413
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/py/flwr/cli/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
def build(
directory: Annotated[
Optional[Path],
typer.Option(help="The Flower project directory to bundle into a FAB"),
typer.Option(help="Path of the Flower project to bundle into a FAB"),
] = None,
) -> str:
"""Build a Flower project into a Flower App Bundle (FAB).
Expand Down Expand Up @@ -118,7 +118,7 @@ def build(
fab_file.writestr(".info/CONTENT", list_file_content)

typer.secho(
f"🎊 Successfully built {fab_filename}.", fg=typer.colors.GREEN, bold=True
f"🎊 Successfully built {fab_filename}", fg=typer.colors.GREEN, bold=True
)

return fab_filename
Expand Down
17 changes: 13 additions & 4 deletions src/py/flwr/cli/run/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import sys
from enum import Enum
from logging import DEBUG
from pathlib import Path
from typing import Optional

import typer
from typing_extensions import Annotated

from flwr.cli import config_utils
from flwr.cli.build import build
from flwr.common.constant import SUPEREXEC_DEFAULT_ADDRESS
from flwr.common.grpc import GRPC_MAX_MESSAGE_LENGTH, create_channel
from flwr.common.logger import log
Expand Down Expand Up @@ -52,10 +54,14 @@ def run(
case_sensitive=False, help="Use this flag to use the new SuperExec API"
),
] = False,
directory: Annotated[
Optional[Path],
typer.Option(help="Path of the Flower project to run"),
] = None,
) -> None:
"""Run Flower project."""
if use_superexec:
_start_superexec_run()
_start_superexec_run(directory)
return

typer.secho("Loading project configuration... ", fg=typer.colors.BLUE)
Expand Down Expand Up @@ -109,7 +115,7 @@ def run(
)


def _start_superexec_run() -> None:
def _start_superexec_run(directory: Optional[Path]) -> None:
def on_channel_state_change(channel_connectivity: str) -> None:
"""Log channel connectivity."""
log(DEBUG, channel_connectivity)
Expand All @@ -124,5 +130,8 @@ def on_channel_state_change(channel_connectivity: str) -> None:
channel.subscribe(on_channel_state_change)
stub = ExecStub(channel)

req = StartRunRequest()
stub.StartRun(req)
fab_path = build(directory)

req = StartRunRequest(fab_file=Path(fab_path).read_bytes())
res = stub.StartRun(req)
typer.secho(f"🎊 Successfully started run {res.run_id}", fg=typer.colors.GREEN)
109 changes: 109 additions & 0 deletions src/py/flwr/superexec/deployment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Deployment engine executor."""

import subprocess
import sys
from logging import ERROR, INFO
from typing import Optional

from typing_extensions import override

from flwr.cli.config_utils import get_fab_metadata
from flwr.cli.install import install_from_fab
from flwr.common.grpc import create_channel
from flwr.common.logger import log
from flwr.proto.driver_pb2 import CreateRunRequest # pylint: disable=E0611
from flwr.proto.driver_pb2_grpc import DriverStub
from flwr.server.driver.grpc_driver import DEFAULT_SERVER_ADDRESS_DRIVER

from .executor import Executor, RunTracker


class DeploymentEngine(Executor):
"""Deployment engine executor."""

def __init__(
self,
address: str = DEFAULT_SERVER_ADDRESS_DRIVER,
root_certificates: Optional[bytes] = None,
) -> None:
self.address = address
self.root_certificates = root_certificates
self.stub: Optional[DriverStub] = None

def _connect(self) -> None:
if self.stub is None:
channel = create_channel(
server_address=self.address,
insecure=(self.root_certificates is None),
root_certificates=self.root_certificates,
)
self.stub = DriverStub(channel)

def _create_run(self, fab_id: str, fab_version: str) -> int:
if self.stub is None:
self._connect()

assert self.stub is not None

req = CreateRunRequest(fab_id=fab_id, fab_version=fab_version)
res = self.stub.CreateRun(request=req)
return int(res.run_id)

@override
def start_run(self, fab_file: bytes) -> Optional[RunTracker]:
"""Start run using the Flower Deployment Engine."""
try:
# Install FAB to flwr dir
fab_version, fab_id = get_fab_metadata(fab_file)
fab_path = install_from_fab(fab_file, None, True)

# Install FAB Python package
subprocess.check_call(
[sys.executable, "-m", "pip", "install", str(fab_path)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)

# Call SuperLink to create run
run_id: int = self._create_run(fab_id, fab_version)
log(INFO, "Created run %s", str(run_id))

# Start ServerApp
proc = subprocess.Popen( # pylint: disable=consider-using-with
[
"flower-server-app",
"--run-id",
str(run_id),
"--insecure",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
log(INFO, "Started run %s", str(run_id))

return RunTracker(
run_id=run_id,
proc=proc,
)
# pylint: disable-next=broad-except
except Exception as e:
log(ERROR, "Could not start run: %s", str(e))
return None


executor = DeploymentEngine()

0 comments on commit bb8e413

Please sign in to comment.