Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix merge conflicts between development and main branch #353

Merged
merged 19 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 34 additions & 21 deletions cmd/deployer/app/resource_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/cisco-open/flame/cmd/deployer/app/deployer"
"github.com/cisco-open/flame/cmd/deployer/config"
"github.com/cisco-open/flame/pkg/openapi"
"github.com/cisco-open/flame/pkg/openapi/constants"
pbNotify "github.com/cisco-open/flame/pkg/proto/notification"
Expand All @@ -42,13 +43,9 @@ import (
)

const (
deploymentDirPath = "/" + util.ProjectName + "/deployment"
deploymentTemplateDir = "templates"

jobTemplateDirPath = "/" + util.ProjectName + "/template"
jobDeploymentFilePrefix = "job-agent"
jobTemplatePath = jobTemplateDirPath + "/" + jobDeploymentFilePrefix + ".yaml.mustache"
k8sShortLabelLength = 12
k8sShortLabelLength = 12
)

var (
Expand All @@ -64,13 +61,17 @@ type resourceHandler struct {
namespace string
dplyr deployer.Deployer

// variables for job templates
jobTemplateDirPath string
jobTemplatePath string
deploymentDirPath string

stream pbNotify.DeployEventRoute_GetDeployEventClient

grpcDialOpt grpc.DialOption
}

func NewResourceHandler(apiserverEp string, notifierEp string, computeSpec openapi.ComputeSpec,
platform string, namespace string, bInsecure bool, bPlain bool) *resourceHandler {
func NewResourceHandler(cfg *config.Config, computeSpec openapi.ComputeSpec, bInsecure bool, bPlain bool) *resourceHandler {
var grpcDialOpt grpc.DialOption

if bPlain {
Expand All @@ -86,21 +87,28 @@ func NewResourceHandler(apiserverEp string, notifierEp string, computeSpec opena
grpcDialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))
}

dplyr, err := deployer.NewDeployer(platform)
dplyr, err := deployer.NewDeployer(cfg.Platform)
if err != nil {
zap.S().Errorf("failed to obtain a job deployer: %v", err)
return nil
}

parentDir := filepath.Dir(cfg.JobTemplate.Folder)
deploymentDirPath := filepath.Join(parentDir, "deployment")

rHandler := &resourceHandler{
apiserverEp: apiserverEp,
notifierEp: notifierEp,
apiserverEp: cfg.Apiserver,
notifierEp: cfg.Notifier,
spec: computeSpec,

platform: platform,
namespace: namespace,
platform: cfg.Platform,
namespace: cfg.Namespace,
dplyr: dplyr,

jobTemplateDirPath: cfg.JobTemplate.Folder,
jobTemplatePath: filepath.Join(cfg.JobTemplate.Folder, cfg.JobTemplate.File),
deploymentDirPath: deploymentDirPath,

grpcDialOpt: grpcDialOpt,
}

Expand Down Expand Up @@ -250,7 +258,7 @@ func (r *resourceHandler) revokeResource(jobId string) (err error) {
}
taskStatuses[taskId] = openapi.AGENT_REVOKE_SUCCESS
// 2.delete all the task resource specification files
deploymentChartPath := filepath.Join(deploymentDirPath, jobId, taskId)
deploymentChartPath := filepath.Join(r.deploymentDirPath, jobId, taskId)
removeErr := os.RemoveAll(deploymentChartPath)
if removeErr != nil {
zap.S().Errorf("Errors occurred deleting specification files: %v", removeErr)
Expand Down Expand Up @@ -324,11 +332,14 @@ func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentCon
errMsg := fmt.Sprintf("failed to initialize a job deployer: %v", err)
return fmt.Errorf(errMsg)
}

agentStatuses := map[string]openapi.AgentState{}
defer r.postDeploymentStatus(deploymentConfig.JobId, agentStatuses)

for taskId := range deploymentConfig.AgentKVs {
deploymentChartPath := filepath.Join(deploymentDirPath, deploymentConfig.JobId, taskId)
deploymentChartPath := filepath.Join(r.deploymentDirPath, deploymentConfig.JobId, taskId)
targetTemplateDirPath := filepath.Join(deploymentChartPath, deploymentTemplateDir)

if makeErr := os.MkdirAll(targetTemplateDirPath, util.FilePerm0644); makeErr != nil {
errMsg := fmt.Sprintf("failed to create a deployment template folder: %v", makeErr)
err = fmt.Errorf("%v; %v", err, errMsg)
Expand All @@ -337,37 +348,38 @@ func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentCon
}

// Copy helm chart files to destination folder
copyErr := copyHelmCharts(helmChartFiles, jobTemplateDirPath, deploymentChartPath)
copyErr := copyHelmCharts(helmChartFiles, r.jobTemplateDirPath, deploymentChartPath)
if copyErr != nil {
err = fmt.Errorf("%v; %v", err, copyErr)
agentStatuses[taskId] = openapi.AGENT_DEPLOY_FAILED
continue
}

taskKey := deploymentConfig.AgentKVs[taskId]

ctx := map[string]string{
"imageLoc": deploymentConfig.ImageLoc,
"imageLoc": deploymentConfig.ImageLoc,
constants.ParamTaskID: taskId,
"taskKey": taskKey,
"taskKey": deploymentConfig.AgentKVs[taskId],
}
rendered, renderErr := mustache.RenderFile(jobTemplatePath, &ctx)

rendered, renderErr := mustache.RenderFile(r.jobTemplatePath, &ctx)
if renderErr != nil {
errMsg := fmt.Sprintf("failed to render a template for task %s: %v", taskId, renderErr)
err = fmt.Errorf("%v; %v", err, errMsg)
agentStatuses[taskId] = openapi.AGENT_DEPLOY_FAILED
continue
}

deploymentFileName := fmt.Sprintf("%s-%s.yaml", jobDeploymentFilePrefix, taskId)
deploymentFileName := fmt.Sprintf("task-%s.yaml", taskId)
deploymentFilePath := filepath.Join(targetTemplateDirPath, deploymentFileName)

writeErr := os.WriteFile(deploymentFilePath, []byte(rendered), util.FilePerm0644)
if writeErr != nil {
errMsg := fmt.Sprintf("failed to write a job rosource spec %s: %v", taskId, writeErr)
err = fmt.Errorf("%v; %v", err, errMsg)
agentStatuses[taskId] = openapi.AGENT_DEPLOY_FAILED
continue
}

//using short id of task as label name does not support more than 35 characters
installErr := r.dplyr.Install("job-"+deploymentConfig.JobId+"-"+taskId[:k8sShortLabelLength], deploymentChartPath)
if installErr != nil {
Expand All @@ -376,6 +388,7 @@ func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentCon
agentStatuses[taskId] = openapi.AGENT_DEPLOY_FAILED
continue
}

agentStatuses[taskId] = openapi.AGENT_DEPLOY_SUCCESS
}

Expand Down
186 changes: 63 additions & 123 deletions cmd/deployer/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,149 +18,89 @@ package cmd

import (
"fmt"
"strings"
"path/filepath"

"github.com/spf13/cobra"
"go.uber.org/zap"

"github.com/cisco-open/flame/cmd/deployer/app"
"github.com/cisco-open/flame/cmd/deployer/config"
"github.com/cisco-open/flame/pkg/openapi"
"github.com/cisco-open/flame/pkg/util"
)

const (
argApiserver = "apiserver"
argNotifier = "notifier"
argAdminId = "adminid"
argRegion = "region"
argComputeId = "computeid"
argApiKey = "apikey"
argPlatform = "platform"
argNamespace = "namespace"

optionInsecure = "insecure"
optionPlain = "plain"
)

var rootCmd = &cobra.Command{
Use: util.Deployer,
Short: util.ProjectName + " Deployer",
RunE: func(cmd *cobra.Command, args []string) error {
flags := cmd.Flags()

apiserver, err := flags.GetString(argApiserver)
if err != nil {
return err
}
if len(strings.Split(apiserver, ":")) != util.NumTokensInRestEndpoint {
return fmt.Errorf("incorrect format for apiserver endpoint: %s", apiserver)
}

notifier, err := flags.GetString(argNotifier)
if err != nil {
return err
}
if len(strings.Split(notifier, ":")) != util.NumTokensInEndpoint {
return fmt.Errorf("incorrect format for notifier endpoint: %s", notifier)
}

adminId, err := flags.GetString(argAdminId)
if err != nil {
return err
}

region, err := flags.GetString(argRegion)
if err != nil {
return err
}

computeId, err := flags.GetString(argComputeId)
if err != nil {
return err
}

apikey, err := flags.GetString(argApiKey)
if err != nil {
return err
}

platform, err := flags.GetString(argPlatform)
if err != nil {
return err
}

namespace, err := flags.GetString(argNamespace)
if err != nil {
return err
}

bInsecure, _ := flags.GetBool(optionInsecure)
bPlain, _ := flags.GetBool(optionPlain)

if bInsecure && bPlain {
err = fmt.Errorf("options --%s and --%s are incompatible; enable one of them", optionInsecure, optionPlain)
return err
}

computeSpec := openapi.ComputeSpec{
AdminId: adminId,
Region: region,
ComputeId: computeId,
ApiKey: apikey,
}

compute, err := app.NewCompute(apiserver, computeSpec, bInsecure, bPlain)
if err != nil {
return err
}

err = compute.RegisterNewCompute()
if err != nil {
err = fmt.Errorf("unable to register new compute with controller: %s", err)
return err
}

resoureHandler := app.NewResourceHandler(apiserver, notifier, computeSpec, platform, namespace, bInsecure, bPlain)
resoureHandler.Start()

select {}
},
}
var (
cfgFile string
cfg *config.Config

rootCmd = &cobra.Command{
Use: util.Deployer,
Short: util.ProjectName + " Deployer",
RunE: func(cmd *cobra.Command, args []string) error {
flags := cmd.Flags()

bInsecure, _ := flags.GetBool(optionInsecure)
bPlain, _ := flags.GetBool(optionPlain)

if bInsecure && bPlain {
err := fmt.Errorf("options --%s and --%s are incompatible; enable one of them",
optionInsecure, optionPlain)
return err
}

computeSpec := openapi.ComputeSpec{
AdminId: cfg.AdminId,
Region: cfg.Region,
ComputeId: cfg.ComputeId,
ApiKey: cfg.Apikey,
}

compute, err := app.NewCompute(cfg.Apiserver, computeSpec, bInsecure, bPlain)
if err != nil {
return err
}

err = compute.RegisterNewCompute()
if err != nil {
err = fmt.Errorf("unable to register new compute with controller: %s", err)
return err
}

resoureHandler := app.NewResourceHandler(cfg, computeSpec, bInsecure, bPlain)
resoureHandler.Start()

select {}
},
}
)

func init() {
defaultApiServerEp := fmt.Sprintf("http://0.0.0.0:%d", util.ApiServerRestApiPort)
rootCmd.Flags().StringP(argApiserver, "a", defaultApiServerEp, "API server endpoint")
rootCmd.MarkFlagRequired(argApiserver)

defaultNotifierEp := fmt.Sprintf("0.0.0.0:%d", util.NotifierGrpcPort)
rootCmd.Flags().StringP(argNotifier, "n", defaultNotifierEp, "Notifier endpoint")
rootCmd.MarkFlagRequired(argNotifier)

defaultAdminId := "admin"
rootCmd.Flags().StringP(argAdminId, "d", defaultAdminId, "unique admin id")
rootCmd.MarkFlagRequired(argAdminId)

defaultRegion := "region"
rootCmd.Flags().StringP(argRegion, "r", defaultRegion, "region name")
rootCmd.MarkFlagRequired(argRegion)
cobra.OnInitialize(initConfig)

defaultComputeId := "compute"
rootCmd.Flags().StringP(argComputeId, "c", defaultComputeId, "unique compute id")
rootCmd.MarkFlagRequired(argComputeId)
usage := "config file (default: /etc/flame/deployer.yaml)"
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", usage)
rootCmd.CompletionOptions.DisableDefaultCmd = true

defaultApiKey := "apiKey"
rootCmd.Flags().StringP(argApiKey, "k", defaultApiKey, "unique apikey")
rootCmd.MarkFlagRequired(argApiKey)
rootCmd.PersistentFlags().Bool(optionInsecure, false, "Allow insecure connection")
rootCmd.PersistentFlags().Bool(optionPlain, false, "Allow unencrypted connection")
}

defaultPlatform := "k8s"
rootCmd.Flags().StringP(argPlatform, "p", defaultPlatform, "compute platform")
rootCmd.MarkFlagRequired(argPlatform)
func initConfig() {
if cfgFile == "" {
cfgFile = filepath.Join("/etc/flame/deployer.yaml")
}

defaultNamespace := "flame"
rootCmd.Flags().StringP(argNamespace, "s", defaultNamespace, "compute namespace")
rootCmd.MarkFlagRequired(argNamespace)
var err error

rootCmd.PersistentFlags().Bool(optionInsecure, false, "Allow insecure connection")
rootCmd.PersistentFlags().Bool(optionPlain, false, "Allow unencrypted connection")
cfg, err = config.LoadConfig(cfgFile)
if err != nil {
zap.S().Fatalf("Failed to load config %s: %v", cfgFile, err)
}
}

func Execute() error {
Expand Down
Loading