Skip to content

Commit

Permalink
Fix merge conflicts between development and main branch (#353)
Browse files Browse the repository at this point in the history
* optimizer compatibility with tensorflow and example for medmnist keras/pytorch (#320)

Tensorflow compatibility for new optimizers was added, which included fedavg, fedadam, fedadagrad, and fedyogi.

A shell script for tesing all 8 possible combinations of optimizers and frameworks is included.
This allows the medmnist example to be run with keras (the folder structure was refactored to include a trainer and aggregator for keras).

The typo in fedavg.py has now been fixed.

* feat+fix: grpc support for hierarchical fl (#321)

Hierarchical fl didn't work with grpc as backend. This is because
groupby field was not considered in metaserver service and p2p
backend.

In addition, a middle aggregator hangs even after a job is
completed. This deadlock occurs because p2p backend cleanup code is
called as a part of a channel cleanup. However, in a middle
aggregator, p2p backend is responsible for tasks across all
channnels. The p2p cleanup code couldn't finish cleanup because
a broadcast task for in the other channel can't finish. This bug is
fixed here by getting the p2p backend cleanup code out side of channel
cleanup code.

* documenation for metaserver/mqtt local (#322)

Documentation for using metaserver will allow users to run examples with a local broker.
It also allows for mqtt local brokers.
This decreases the chances of any job ID collisions.

Modifications to the config.json for the mnist example were made in order to make it easier to switch to a local broker.
The readme does indicate how to do this for other examples now.

Co-authored-by: vboxuser <vboxuser@Ubuntu.myguest.virtualbox.org>

* feat: asynchronous fl (#323)

Asynchronous FL is implemented for two-tier topology and three-tier
hierarchical topology.

The main algorithm is based on the following two papers:
- https://arxiv.org/pdf/2111.04877.pdf
- https://arxiv.org/pdf/2106.06639.pdf

Two examples for asynchronous fl are also added. One is for a two-tier
topology and the other for a three-tier hierarchical topology.

This implementation includes the core algorithm but  doesn't include
SecAgg algorithm (presented in the papers), which is not the scope of
this change.

* fix+refactor: asyncfl loss divergence (#330)

For asyncfl, a client (trainer) should send delta by subtracting local
weights from original global weights after training. In the current
implementation, the whole local weights were sent to a
server (aggregator). This causes loss divergence.

Supporting delta update requires refactoring of aggregators of
synchronous fl (horizontal/{top_aggregator.py, middle_aggregator.py})
as well as optimizers' do() function.

The changes here support delta update universally across all types of
modes (horizontal synchronous, asynchronous, and hybrid).

* fix: conflict bewtween integer tensor and float tensor (#335)

Model architectures can have integer tensors. Applying aggregation on
those tensors results in type mistmatch and throws a runtime error:
"RuntimeError: result type Float can't be cast to the desired output
type Long"

Integer tensors don't matter in back propagation. So, as a workaround
to the issue, we typecast to the original dtype when the original type
is different from the dtype of weighted tensors for aggregation. In
this way, we can keep the model architecture as is.

* refactor: config for hybrid example in library (#334)

To enable library-only execution for hybrid example, its configuration
files are updated accordingly. The revised configuration has local
mqtt and p2p broker config and p2p broker is selected.

* misc: asynchronous hierarchical fl example (#340)

Since the Flame SDK supports asynchronous FL, we add an example of an
asynchronous hierarchical FL for control plane.

* chore: clean up examples folder (#336)

The examples folder at the top level directory has some outdated and
irrelevant files. Those are now removed from the folder.

* fix: workaround for hybrid mode with two p2p backends (#345)

Due to grpc/grpc#25364, when two p2p
backends (which rely on grpc and asyncio) are defined, the hybrid mode
example throws an execption: 'BlockingIOError: [Errno 35] Resource
temporarily unavailable'. The issue still appears unresolved. As a
temporary workaround, we use two different types of backends: mqtt for
one and p2p for the other. This means that when this example is
executed, both metaserver and a mqtt broker (e.g., mosquitto) must be
running in the local machine.

* fix: distributed mode (#344)

Distributed mode has a bug: before 'weights' is not defined as member
variable, deepcopy(self.weights) in _update_weights() is called.
To address this issue, self.weights is initialized in __init__().

Also, to run a distributed example locally, configuration files are
revised.

* example/implementation for fedprox (#339)

This example is similar to the ones seen in the fedprox paper, although it currently does not simmulate stragglers and uses another dataset/architecture.

A few things were changed in order for there to be a simple process for modifying trainers.
This includes a function in util.py and another class variable in the trainer containing information on the client side regularizer.

Additionally, tests are automated (mu=1,0.1,0.01,0.001,0) so running the example generates or modifies existing files in order to provide the propper configuration for an experiment.

* Create diagnose script (#348)

* Create diagnose script

* Make the script executable

---------

Co-authored-by: Alex Ungurean <aungurea@cisco.com>

* refactor+fix: configurable deployer / lib regularizer fix (#351)

deployer's job template file is hard-coded, which makes it hard to use
different template file at deployment time. Using different different
template file is useful when underlying infrastructure is
different (e.g., k8s vs knative). To support that, template folder and
file is fed as config variables.

Also, deployer's config info is fed as command argument, which is
cumbersome. So, the config parsing part is refactored such that the
info is fed as a configuration file.

During the testing of deployer change, a bug in the library
is identified. The fix for it is added here too.

Finally, the local dns configuration in flame.sh is updated so that it
can be done correctly across different linux distributions (e.g.,
archlinux and ubuntu). The tests for flame.sh are under archlinux and
ubuntu.

* Add missing merge fix

* Make sdk config backwards compatible. (#355)

---------

Co-authored-by: GustavBaumgart <98069699+GustavBaumgart@users.noreply.github.com>
Co-authored-by: Myungjin Lee <myungjin@users.noreply.github.com>
Co-authored-by: vboxuser <vboxuser@Ubuntu.myguest.virtualbox.org>
Co-authored-by: alexandruuBytex <56033021+alexandruuBytex@users.noreply.github.com>
Co-authored-by: Alex Ungurean <aungurea@cisco.com>
Co-authored-by: elqurio <119978637+elqurio@users.noreply.github.com>
  • Loading branch information
7 people committed Mar 3, 2023
1 parent 0d35888 commit f981cb4
Show file tree
Hide file tree
Showing 144 changed files with 6,065 additions and 549 deletions.
55 changes: 34 additions & 21 deletions cmd/deployer/app/resource_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/cisco-open/flame/cmd/deployer/app/deployer"
"github.com/cisco-open/flame/cmd/deployer/config"
"github.com/cisco-open/flame/pkg/openapi"
"github.com/cisco-open/flame/pkg/openapi/constants"
pbNotify "github.com/cisco-open/flame/pkg/proto/notification"
Expand All @@ -42,13 +43,9 @@ import (
)

const (
deploymentDirPath = "/" + util.ProjectName + "/deployment"
deploymentTemplateDir = "templates"

jobTemplateDirPath = "/" + util.ProjectName + "/template"
jobDeploymentFilePrefix = "job-agent"
jobTemplatePath = jobTemplateDirPath + "/" + jobDeploymentFilePrefix + ".yaml.mustache"
k8sShortLabelLength = 12
k8sShortLabelLength = 12
)

var (
Expand All @@ -64,13 +61,17 @@ type resourceHandler struct {
namespace string
dplyr deployer.Deployer

// variables for job templates
jobTemplateDirPath string
jobTemplatePath string
deploymentDirPath string

stream pbNotify.DeployEventRoute_GetDeployEventClient

grpcDialOpt grpc.DialOption
}

func NewResourceHandler(apiserverEp string, notifierEp string, computeSpec openapi.ComputeSpec,
platform string, namespace string, bInsecure bool, bPlain bool) *resourceHandler {
func NewResourceHandler(cfg *config.Config, computeSpec openapi.ComputeSpec, bInsecure bool, bPlain bool) *resourceHandler {
var grpcDialOpt grpc.DialOption

if bPlain {
Expand All @@ -86,21 +87,28 @@ func NewResourceHandler(apiserverEp string, notifierEp string, computeSpec opena
grpcDialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))
}

dplyr, err := deployer.NewDeployer(platform)
dplyr, err := deployer.NewDeployer(cfg.Platform)
if err != nil {
zap.S().Errorf("failed to obtain a job deployer: %v", err)
return nil
}

parentDir := filepath.Dir(cfg.JobTemplate.Folder)
deploymentDirPath := filepath.Join(parentDir, "deployment")

rHandler := &resourceHandler{
apiserverEp: apiserverEp,
notifierEp: notifierEp,
apiserverEp: cfg.Apiserver,
notifierEp: cfg.Notifier,
spec: computeSpec,

platform: platform,
namespace: namespace,
platform: cfg.Platform,
namespace: cfg.Namespace,
dplyr: dplyr,

jobTemplateDirPath: cfg.JobTemplate.Folder,
jobTemplatePath: filepath.Join(cfg.JobTemplate.Folder, cfg.JobTemplate.File),
deploymentDirPath: deploymentDirPath,

grpcDialOpt: grpcDialOpt,
}

Expand Down Expand Up @@ -250,7 +258,7 @@ func (r *resourceHandler) revokeResource(jobId string) (err error) {
}
taskStatuses[taskId] = openapi.AGENT_REVOKE_SUCCESS
// 2.delete all the task resource specification files
deploymentChartPath := filepath.Join(deploymentDirPath, jobId, taskId)
deploymentChartPath := filepath.Join(r.deploymentDirPath, jobId, taskId)
removeErr := os.RemoveAll(deploymentChartPath)
if removeErr != nil {
zap.S().Errorf("Errors occurred deleting specification files: %v", removeErr)
Expand Down Expand Up @@ -324,11 +332,14 @@ func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentCon
errMsg := fmt.Sprintf("failed to initialize a job deployer: %v", err)
return fmt.Errorf(errMsg)
}

agentStatuses := map[string]openapi.AgentState{}
defer r.postDeploymentStatus(deploymentConfig.JobId, agentStatuses)

for taskId := range deploymentConfig.AgentKVs {
deploymentChartPath := filepath.Join(deploymentDirPath, deploymentConfig.JobId, taskId)
deploymentChartPath := filepath.Join(r.deploymentDirPath, deploymentConfig.JobId, taskId)
targetTemplateDirPath := filepath.Join(deploymentChartPath, deploymentTemplateDir)

if makeErr := os.MkdirAll(targetTemplateDirPath, util.FilePerm0644); makeErr != nil {
errMsg := fmt.Sprintf("failed to create a deployment template folder: %v", makeErr)
err = fmt.Errorf("%v; %v", err, errMsg)
Expand All @@ -337,37 +348,38 @@ func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentCon
}

// Copy helm chart files to destination folder
copyErr := copyHelmCharts(helmChartFiles, jobTemplateDirPath, deploymentChartPath)
copyErr := copyHelmCharts(helmChartFiles, r.jobTemplateDirPath, deploymentChartPath)
if copyErr != nil {
err = fmt.Errorf("%v; %v", err, copyErr)
agentStatuses[taskId] = openapi.AGENT_DEPLOY_FAILED
continue
}

taskKey := deploymentConfig.AgentKVs[taskId]

ctx := map[string]string{
"imageLoc": deploymentConfig.ImageLoc,
"imageLoc": deploymentConfig.ImageLoc,
constants.ParamTaskID: taskId,
"taskKey": taskKey,
"taskKey": deploymentConfig.AgentKVs[taskId],
}
rendered, renderErr := mustache.RenderFile(jobTemplatePath, &ctx)

rendered, renderErr := mustache.RenderFile(r.jobTemplatePath, &ctx)
if renderErr != nil {
errMsg := fmt.Sprintf("failed to render a template for task %s: %v", taskId, renderErr)
err = fmt.Errorf("%v; %v", err, errMsg)
agentStatuses[taskId] = openapi.AGENT_DEPLOY_FAILED
continue
}

deploymentFileName := fmt.Sprintf("%s-%s.yaml", jobDeploymentFilePrefix, taskId)
deploymentFileName := fmt.Sprintf("task-%s.yaml", taskId)
deploymentFilePath := filepath.Join(targetTemplateDirPath, deploymentFileName)

writeErr := os.WriteFile(deploymentFilePath, []byte(rendered), util.FilePerm0644)
if writeErr != nil {
errMsg := fmt.Sprintf("failed to write a job rosource spec %s: %v", taskId, writeErr)
err = fmt.Errorf("%v; %v", err, errMsg)
agentStatuses[taskId] = openapi.AGENT_DEPLOY_FAILED
continue
}

//using short id of task as label name does not support more than 35 characters
installErr := r.dplyr.Install("job-"+deploymentConfig.JobId+"-"+taskId[:k8sShortLabelLength], deploymentChartPath)
if installErr != nil {
Expand All @@ -376,6 +388,7 @@ func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentCon
agentStatuses[taskId] = openapi.AGENT_DEPLOY_FAILED
continue
}

agentStatuses[taskId] = openapi.AGENT_DEPLOY_SUCCESS
}

Expand Down
186 changes: 63 additions & 123 deletions cmd/deployer/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,149 +18,89 @@ package cmd

import (
"fmt"
"strings"
"path/filepath"

"github.com/spf13/cobra"
"go.uber.org/zap"

"github.com/cisco-open/flame/cmd/deployer/app"
"github.com/cisco-open/flame/cmd/deployer/config"
"github.com/cisco-open/flame/pkg/openapi"
"github.com/cisco-open/flame/pkg/util"
)

const (
argApiserver = "apiserver"
argNotifier = "notifier"
argAdminId = "adminid"
argRegion = "region"
argComputeId = "computeid"
argApiKey = "apikey"
argPlatform = "platform"
argNamespace = "namespace"

optionInsecure = "insecure"
optionPlain = "plain"
)

var rootCmd = &cobra.Command{
Use: util.Deployer,
Short: util.ProjectName + " Deployer",
RunE: func(cmd *cobra.Command, args []string) error {
flags := cmd.Flags()

apiserver, err := flags.GetString(argApiserver)
if err != nil {
return err
}
if len(strings.Split(apiserver, ":")) != util.NumTokensInRestEndpoint {
return fmt.Errorf("incorrect format for apiserver endpoint: %s", apiserver)
}

notifier, err := flags.GetString(argNotifier)
if err != nil {
return err
}
if len(strings.Split(notifier, ":")) != util.NumTokensInEndpoint {
return fmt.Errorf("incorrect format for notifier endpoint: %s", notifier)
}

adminId, err := flags.GetString(argAdminId)
if err != nil {
return err
}

region, err := flags.GetString(argRegion)
if err != nil {
return err
}

computeId, err := flags.GetString(argComputeId)
if err != nil {
return err
}

apikey, err := flags.GetString(argApiKey)
if err != nil {
return err
}

platform, err := flags.GetString(argPlatform)
if err != nil {
return err
}

namespace, err := flags.GetString(argNamespace)
if err != nil {
return err
}

bInsecure, _ := flags.GetBool(optionInsecure)
bPlain, _ := flags.GetBool(optionPlain)

if bInsecure && bPlain {
err = fmt.Errorf("options --%s and --%s are incompatible; enable one of them", optionInsecure, optionPlain)
return err
}

computeSpec := openapi.ComputeSpec{
AdminId: adminId,
Region: region,
ComputeId: computeId,
ApiKey: apikey,
}

compute, err := app.NewCompute(apiserver, computeSpec, bInsecure, bPlain)
if err != nil {
return err
}

err = compute.RegisterNewCompute()
if err != nil {
err = fmt.Errorf("unable to register new compute with controller: %s", err)
return err
}

resoureHandler := app.NewResourceHandler(apiserver, notifier, computeSpec, platform, namespace, bInsecure, bPlain)
resoureHandler.Start()

select {}
},
}
var (
cfgFile string
cfg *config.Config

rootCmd = &cobra.Command{
Use: util.Deployer,
Short: util.ProjectName + " Deployer",
RunE: func(cmd *cobra.Command, args []string) error {
flags := cmd.Flags()

bInsecure, _ := flags.GetBool(optionInsecure)
bPlain, _ := flags.GetBool(optionPlain)

if bInsecure && bPlain {
err := fmt.Errorf("options --%s and --%s are incompatible; enable one of them",
optionInsecure, optionPlain)
return err
}

computeSpec := openapi.ComputeSpec{
AdminId: cfg.AdminId,
Region: cfg.Region,
ComputeId: cfg.ComputeId,
ApiKey: cfg.Apikey,
}

compute, err := app.NewCompute(cfg.Apiserver, computeSpec, bInsecure, bPlain)
if err != nil {
return err
}

err = compute.RegisterNewCompute()
if err != nil {
err = fmt.Errorf("unable to register new compute with controller: %s", err)
return err
}

resoureHandler := app.NewResourceHandler(cfg, computeSpec, bInsecure, bPlain)
resoureHandler.Start()

select {}
},
}
)

func init() {
defaultApiServerEp := fmt.Sprintf("http://0.0.0.0:%d", util.ApiServerRestApiPort)
rootCmd.Flags().StringP(argApiserver, "a", defaultApiServerEp, "API server endpoint")
rootCmd.MarkFlagRequired(argApiserver)

defaultNotifierEp := fmt.Sprintf("0.0.0.0:%d", util.NotifierGrpcPort)
rootCmd.Flags().StringP(argNotifier, "n", defaultNotifierEp, "Notifier endpoint")
rootCmd.MarkFlagRequired(argNotifier)

defaultAdminId := "admin"
rootCmd.Flags().StringP(argAdminId, "d", defaultAdminId, "unique admin id")
rootCmd.MarkFlagRequired(argAdminId)

defaultRegion := "region"
rootCmd.Flags().StringP(argRegion, "r", defaultRegion, "region name")
rootCmd.MarkFlagRequired(argRegion)
cobra.OnInitialize(initConfig)

defaultComputeId := "compute"
rootCmd.Flags().StringP(argComputeId, "c", defaultComputeId, "unique compute id")
rootCmd.MarkFlagRequired(argComputeId)
usage := "config file (default: /etc/flame/deployer.yaml)"
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", usage)
rootCmd.CompletionOptions.DisableDefaultCmd = true

defaultApiKey := "apiKey"
rootCmd.Flags().StringP(argApiKey, "k", defaultApiKey, "unique apikey")
rootCmd.MarkFlagRequired(argApiKey)
rootCmd.PersistentFlags().Bool(optionInsecure, false, "Allow insecure connection")
rootCmd.PersistentFlags().Bool(optionPlain, false, "Allow unencrypted connection")
}

defaultPlatform := "k8s"
rootCmd.Flags().StringP(argPlatform, "p", defaultPlatform, "compute platform")
rootCmd.MarkFlagRequired(argPlatform)
func initConfig() {
if cfgFile == "" {
cfgFile = filepath.Join("/etc/flame/deployer.yaml")
}

defaultNamespace := "flame"
rootCmd.Flags().StringP(argNamespace, "s", defaultNamespace, "compute namespace")
rootCmd.MarkFlagRequired(argNamespace)
var err error

rootCmd.PersistentFlags().Bool(optionInsecure, false, "Allow insecure connection")
rootCmd.PersistentFlags().Bool(optionPlain, false, "Allow unencrypted connection")
cfg, err = config.LoadConfig(cfgFile)
if err != nil {
zap.S().Fatalf("Failed to load config %s: %v", cfgFile, err)
}
}

func Execute() error {
Expand Down
Loading

0 comments on commit f981cb4

Please sign in to comment.