Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Add deployment engine executor #3629

Merged
merged 28 commits into from
Jun 29, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b908908
feat(framework) Add deployment engine executor
charlesbvll Jun 17, 2024
93f4fc5
Merge branch 'main' into add-deployment-engine-executor
charlesbvll Jun 17, 2024
fd70046
Merge branch 'main' into add-deployment-engine-executor
danieljanes Jun 19, 2024
1216a70
Merge branch 'main' into add-deployment-engine-executor
charlesbvll Jun 21, 2024
3eed1b3
Add copyright notice
charlesbvll Jun 21, 2024
988d63f
Apply suggestions
charlesbvll Jun 21, 2024
7eb92d0
Add simple error handling
charlesbvll Jun 21, 2024
d33ced6
Disable pylint error
charlesbvll Jun 21, 2024
986cbd6
fix imports
charlesbvll Jun 21, 2024
25434ef
Merge branch 'main' into add-deployment-engine-executor
charlesbvll Jun 21, 2024
21831e3
Merge branch 'main' into add-deployment-engine-executor
charlesbvll Jun 21, 2024
d69b3e7
Remove debug line
charlesbvll Jun 21, 2024
035abb0
Merge branch 'main' into add-deployment-engine-executor
charlesbvll Jun 21, 2024
45e1af1
Add app to flwr run and fix order
charlesbvll Jun 21, 2024
e8e4015
Merge branch 'main' into add-deployment-engine-executor
charlesbvll Jun 21, 2024
2968e61
Merge branch 'main' into add-deployment-engine-executor
charlesbvll Jun 23, 2024
8f11125
Merge branch 'main' into add-deployment-engine-executor
charlesbvll Jun 23, 2024
d0a746c
Merge branch 'main' into add-deployment-engine-executor
charlesbvll Jun 24, 2024
dbcdac2
Merge branch 'main' into add-deployment-engine-executor
danieljanes Jun 28, 2024
05e1afc
Log run start
danieljanes Jun 29, 2024
e24790d
Create run after FAB installation
danieljanes Jun 29, 2024
62708ef
Rename deployment_plugin to deployment
danieljanes Jun 29, 2024
4f736d4
Format deployment engine exec
danieljanes Jun 29, 2024
2672b95
Log run_id
danieljanes Jun 29, 2024
60fa3c7
Remove mentions of 'plugin'
danieljanes Jun 29, 2024
c4a1039
Align argument naming
danieljanes Jun 29, 2024
da3182f
Fix lint issues
danieljanes Jun 29, 2024
70e6777
Merge branch 'main' into add-deployment-engine-executor
danieljanes Jun 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/py/flwr/cli/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
def build(
directory: Annotated[
Optional[Path],
typer.Option(help="The Flower project directory to bundle into a FAB"),
typer.Option(help="Path of the Flower project to bundle into a FAB"),
] = None,
) -> str:
"""Build a Flower project into a Flower App Bundle (FAB).
Expand Down Expand Up @@ -118,7 +118,7 @@ def build(
fab_file.writestr(".info/CONTENT", list_file_content)

typer.secho(
f"🎊 Successfully built {fab_filename}.", fg=typer.colors.GREEN, bold=True
f"🎊 Successfully built {fab_filename}", fg=typer.colors.GREEN, bold=True
)

return fab_filename
Expand Down
17 changes: 13 additions & 4 deletions src/py/flwr/cli/run/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import sys
from enum import Enum
from logging import DEBUG
from pathlib import Path
from typing import Optional

import typer
from typing_extensions import Annotated

from flwr.cli import config_utils
from flwr.cli.build import build
from flwr.common.constant import SUPEREXEC_DEFAULT_ADDRESS
from flwr.common.grpc import GRPC_MAX_MESSAGE_LENGTH, create_channel
from flwr.common.logger import log
Expand Down Expand Up @@ -52,10 +54,14 @@ def run(
case_sensitive=False, help="Use this flag to use the new SuperExec API"
),
] = False,
directory: Annotated[
Optional[Path],
typer.Option(help="Path of the Flower project to run"),
] = None,
) -> None:
"""Run Flower project."""
if use_superexec:
_start_superexec_run()
_start_superexec_run(directory)
return

typer.secho("Loading project configuration... ", fg=typer.colors.BLUE)
Expand Down Expand Up @@ -109,7 +115,7 @@ def run(
)


def _start_superexec_run() -> None:
def _start_superexec_run(directory: Optional[Path]) -> None:
def on_channel_state_change(channel_connectivity: str) -> None:
"""Log channel connectivity."""
log(DEBUG, channel_connectivity)
Expand All @@ -124,5 +130,8 @@ def on_channel_state_change(channel_connectivity: str) -> None:
channel.subscribe(on_channel_state_change)
stub = ExecStub(channel)

req = StartRunRequest()
stub.StartRun(req)
fab_path = build(directory)

req = StartRunRequest(fab_file=Path(fab_path).read_bytes())
res = stub.StartRun(req)
typer.secho(f"🎊 Successfully started run {res.run_id}", fg=typer.colors.GREEN)
109 changes: 109 additions & 0 deletions src/py/flwr/superexec/deployment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Deployment engine executor."""

import subprocess
import sys
from logging import ERROR, INFO
from typing import Optional

from typing_extensions import override

from flwr.cli.config_utils import get_fab_metadata
from flwr.cli.install import install_from_fab
from flwr.common.grpc import create_channel
from flwr.common.logger import log
from flwr.proto.driver_pb2 import CreateRunRequest # pylint: disable=E0611
from flwr.proto.driver_pb2_grpc import DriverStub
from flwr.server.driver.grpc_driver import DEFAULT_SERVER_ADDRESS_DRIVER

from .executor import Executor, RunTracker


class DeploymentEngine(Executor):
"""Deployment engine executor."""

def __init__(
self,
address: str = DEFAULT_SERVER_ADDRESS_DRIVER,
root_certificates: Optional[bytes] = None,
) -> None:
self.address = address
self.root_certificates = root_certificates
self.stub: Optional[DriverStub] = None

def _connect(self) -> None:
if self.stub is None:
channel = create_channel(
server_address=self.address,
insecure=(self.root_certificates is None),
root_certificates=self.root_certificates,
)
self.stub = DriverStub(channel)

def _create_run(self, fab_id: str, fab_version: str) -> int:
if self.stub is None:
self._connect()

assert self.stub is not None

req = CreateRunRequest(fab_id=fab_id, fab_version=fab_version)
res = self.stub.CreateRun(request=req)
return int(res.run_id)

@override
def start_run(self, fab_file: bytes) -> Optional[RunTracker]:
"""Start run using the Flower Deployment Engine."""
try:
# Install FAB to flwr dir
fab_version, fab_id = get_fab_metadata(fab_file)
fab_path = install_from_fab(fab_file, None, True)

# Install FAB Python package
subprocess.check_call(
[sys.executable, "-m", "pip", "install", str(fab_path)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)

# Call SuperLink to create run
run_id: int = self._create_run(fab_id, fab_version)
log(INFO, "Created run %s", str(run_id))

# Start ServerApp
proc = subprocess.Popen(
[
"flower-server-app",
"--run-id",
str(run_id),
"--insecure",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
log(INFO, "Started run %s", str(run_id))

return RunTracker(
run_id=run_id,
proc=proc,
)
# pylint: disable-next=broad-except
except Exception as e:
log(ERROR, "Could not start run: %s", str(e))
return None


exec = DeploymentEngine()
Loading