Skip to content

Commit

Permalink
Implement basic pipeline for deployer (#197)
Browse files Browse the repository at this point in the history
This commit establishes the deployer apiserver and controller pipeline to get deployment configuration and allocate resources.
  • Loading branch information
dhruvsgarg committed Jul 24, 2022
1 parent cb15fb9 commit 9ddddae
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 18 deletions.
51 changes: 50 additions & 1 deletion cmd/deployer/app/resource_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package app
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"

Expand All @@ -30,6 +32,7 @@ import (

"github.com/cisco-open/flame/pkg/openapi"
pbNotify "github.com/cisco-open/flame/pkg/proto/notification"
"github.com/cisco-open/flame/pkg/restapi"
)

type resourceHandler struct {
Expand Down Expand Up @@ -153,10 +156,56 @@ func (r *resourceHandler) dealWith(in *pbNotify.DeployEvent) {

func (r *resourceHandler) addResource(jobId string) {
zap.S().Infof("Received add resource request for job %s", jobId)
// TODO: implement addResource method

// Sending request to apiserver to get deployment config for specific jobId and computeId
deploymentConfig, err := r.getDeploymentConfig(jobId)
if err != nil {
fmt.Printf("Failed to get deploymentConfig for job %s: %v\n", jobId, err)
}
zap.S().Infof("Got deployment config from apiserver")

// Deploy resources (agents) for the job based on the configuration
err = r.deployResources(deploymentConfig)
if err != nil {
fmt.Printf("Failed to deploy resources for job %s: %v\n", jobId, err)
}
zap.S().Infof("Successfully added resources for compute %s and jobId %s", r.spec.ComputeId, jobId)
}

func (r *resourceHandler) revokeResource(jobId string) {
zap.S().Infof("Received revoke resource request for job %s", jobId)
// TODO: implement revokeResource method
}

func (r *resourceHandler) getDeploymentConfig(jobId string) (openapi.DeploymentConfig, error) {
zap.S().Infof("Sending request to apiserver / controller to get deployment config")
// construct url
uriMap := map[string]string{
"computeId": r.spec.ComputeId,
"jobId": jobId,
}
url := restapi.CreateURL(r.apiserverEp, restapi.GetDeploymentConfigEndpoint, uriMap)
code, respBody, err := restapi.HTTPGet(url)
if err != nil || restapi.CheckStatusCode(code) != nil {
fmt.Printf("Deployer failed to get deployment config for job %s - code: %d, error: %v\n", jobId, code, err)
return openapi.DeploymentConfig{}, nil
}

deploymentConfig := openapi.DeploymentConfig{}
err = json.Unmarshal(respBody, &deploymentConfig)
if err != nil {
fmt.Printf("WARNING: Failed to parse resp message: %v", err)
return openapi.DeploymentConfig{}, nil
}
return deploymentConfig, nil
}

func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentConfig) error {
zap.S().Infof("Beginning deployment of agents")

// iterate over agent kvs and invoke install for each
// Create more functions and move code from /cmd/controller/deployer/k8s.go

zap.S().Infof("Completed deployment of agents")
return nil
}
38 changes: 30 additions & 8 deletions pkg/openapi/apiserver/api_computes_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package apiserver

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -121,17 +122,38 @@ func (s *ComputesApiService) GetComputeStatus(ctx context.Context, computeId str

// GetDeploymentConfig - Get the deployment config for a job for a compute cluster
func (s *ComputesApiService) GetDeploymentConfig(ctx context.Context, computeId string, jobId string) (openapi.ImplResponse, error) {
// TODO - update GetDeploymentConfig with the required logic for this service method.
// Add api_computes_service.go to the .openapi-generator-ignore to avoid overwriting
// this service implementation when updating open api generation.
// TODO - add logic later to validate the request coming from the deployer with ApiKey
// Report error if apikey for the deployer does not match with the apiserver cache or from db

//TODO: Uncomment the next line to return response Response(200, DeploymentConfig{}) or use other options such as http.Ok ...
//return Response(200, DeploymentConfig{}), nil
// create controller request
uriMap := map[string]string{
"computeId": computeId,
"jobId": jobId,
}
url := restapi.CreateURL(HostEndpoint, restapi.GetDeploymentConfigEndpoint, uriMap)

//TODO: Uncomment the next line to return response Response(0, Error{}) or use other options such as http.Ok ...
//return Response(0, Error{}), nil
// send post request
code, resp, err := restapi.HTTPGet(url)
if err != nil {
errMsg := fmt.Sprintf("failed to send get request to controller: %v", err)
zap.S().Errorf(errMsg)
return openapi.Response(http.StatusInternalServerError, nil), err
}

if err = restapi.CheckStatusCode(code); err != nil {
return openapi.Response(code, nil), fmt.Errorf("%s", string(resp))
}
deploymentConfig := openapi.DeploymentConfig{}
err = json.Unmarshal(resp, &deploymentConfig)
if err != nil {
errMsg := fmt.Sprintf("failed to send get deployment config from controller: %v", err)
zap.S().Errorf(errMsg)
return openapi.Response(http.StatusInternalServerError, nil), err
}

zap.S().Infof("Successfully got deployment config from controller for jobId %s and computeId %s", jobId, computeId)

return openapi.Response(http.StatusNotImplemented, nil), errors.New("GetDeploymentConfig method not implemented")
return openapi.Response(http.StatusOK, deploymentConfig), nil
}

// GetDeploymentStatus - Get the deployment status for a job on a compute cluster
Expand Down
14 changes: 5 additions & 9 deletions pkg/openapi/controller/api_computes_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/cisco-open/flame/cmd/controller/app/database"
"github.com/cisco-open/flame/pkg/openapi"
"go.uber.org/zap"
)

// ComputesApiService is a service that implements the logic for the ComputesApiServicer
Expand Down Expand Up @@ -121,17 +122,12 @@ func (s *ComputesApiService) GetComputeStatus(ctx context.Context, computeId str

// GetDeploymentConfig - Get the deployment config for a job for a compute cluster
func (s *ComputesApiService) GetDeploymentConfig(ctx context.Context, computeId string, jobId string) (openapi.ImplResponse, error) {
// TODO - update GetDeploymentConfig with the required logic for this service method.
// Add api_computes_service.go to the .openapi-generator-ignore to avoid overwriting
// this service implementation when updating open api generation.

//TODO: Uncomment the next line to return response Response(200, DeploymentConfig{}) or use other options such as http.Ok ...
//return Response(200, DeploymentConfig{}), nil
// TODO - update logic to populate deployment config

//TODO: Uncomment the next line to return response Response(0, Error{}) or use other options such as http.Ok ...
//return Response(0, Error{}), nil
deploymentConfig := openapi.DeploymentConfig{}
zap.S().Infof("Populated deployment config for jobId %s and computeId %s", jobId, computeId)

return openapi.Response(http.StatusNotImplemented, nil), errors.New("GetDeploymentConfig method not implemented")
return openapi.Response(http.StatusOK, deploymentConfig), nil
}

// GetDeploymentStatus - Get the deployment status for a job on a compute cluster
Expand Down

0 comments on commit 9ddddae

Please sign in to comment.