Skip to content

Commit

Permalink
Initial end-to-end pipeline for deployer
Browse files Browse the repository at this point in the history
This PR implements the eventroute and triggerroute code for notifier to receive and send deployment events. The controller triggers an add resource event while allocating computes and this event is received at the deployer.
  • Loading branch information
dhruvsgarg committed Jul 20, 2022
1 parent 0188fb9 commit 4f298de
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 24 deletions.
32 changes: 28 additions & 4 deletions cmd/controller/app/job/default_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ func (h *DefaultHandler) ChangeState(state JobHandlerState) {
}

func (h *DefaultHandler) allocateComputes() error {
// Placeholder invocation for new deployer
h.notifyDeploy(pbNotify.DeployEventType_ADD_RESOURCE)
zap.S().Infof("Invoking notifyDeploy - add resource - from allocateComputes()")

deploymentChartPath := filepath.Join(deploymentDirPath, h.jobId)
targetTemplateDirPath := filepath.Join(deploymentChartPath, deploymentTemplateDir)
if err := os.MkdirAll(targetTemplateDirPath, util.FilePerm0644); err != nil {
Expand Down Expand Up @@ -297,7 +301,7 @@ func (h *DefaultHandler) allocateComputes() error {
return nil
}

func (h *DefaultHandler) notify(evtType pbNotify.JobEventType) error {
func (h *DefaultHandler) notifyJob(evtType pbNotify.JobEventType) error {
req := &pbNotify.JobEventRequest{
Type: evtType,
TaskIds: make([]string, 0),
Expand All @@ -308,12 +312,32 @@ func (h *DefaultHandler) notify(evtType pbNotify.JobEventType) error {
req.TaskIds = append(req.TaskIds, taskInfo.TaskId)
}

resp, err := newNotifyClient(h.notifier, h.bInsecure, h.bPlain).sendNotification(req)
resp, err := newNotifyClient(h.notifier, h.bInsecure, h.bPlain).sendJobNotification(req)
if err != nil {
return fmt.Errorf("failed to notify for job: %v", err)
}

zap.S().Infof("response status from notifyJob = %s", resp.Status.String())

return nil
}

func (h *DefaultHandler) notifyDeploy(evtType pbNotify.DeployEventType) error {
// TODO: add computeId field to tasks and remove hard-coding
computeIds := []string{"compute-1"}

req := &pbNotify.DeployEventRequest{
Type: evtType,
ComputeIds: computeIds,
JobId: h.jobId,
}

resp, err := newNotifyClient(h.notifier, h.bInsecure, h.bPlain).sendDeployNotification(req)
if err != nil {
return fmt.Errorf("failed to notify: %v", err)
return fmt.Errorf("failed to notify for deployment: %v", err)
}

zap.S().Infof("response status = %s", resp.Status.String())
zap.S().Infof("response status from notifyDeploy = %s", resp.Status.String())

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/controller/app/job/job_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (s *StateStarting) Deploy(event *JobEvent) {
}

// send a job start message to notifier
err = s.hdlr.notify(pbNotify.JobEventType_START_JOB)
err = s.hdlr.notifyJob(pbNotify.JobEventType_START_JOB)
if err != nil {
zap.S().Debugf("%v", err)

Expand Down
33 changes: 28 additions & 5 deletions cmd/controller/app/job/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,45 @@ func newNotifyClient(endpoint string, bInsecure bool, bPlain bool) *notifyClient
return &notifyClient{endpoint: endpoint, grpcDialOpt: grpcDialOpt}
}

// sendNotification sends a notification request to the notifier
func (nc *notifyClient) sendNotification(req *pbNotify.JobEventRequest) (*pbNotify.JobResponse, error) {
// sendJobNotification sends a job notification request to the notifier
func (nc *notifyClient) sendJobNotification(req *pbNotify.JobEventRequest) (*pbNotify.JobResponse, error) {
conn, err := grpc.Dial(nc.endpoint, nc.grpcDialOpt)
if err != nil {
return nil, fmt.Errorf("failed to connect to notifier: %v", err)
return nil, fmt.Errorf("sendJobNotification failed to connect to notifier: %v", err)
}
defer conn.Close()

trClient := pbNotify.NewJobTriggerRouteClient(conn)

zap.S().Infof("Successfully connected to notifier: %s", nc.endpoint)
zap.S().Infof("sendJobNotification successfully connected to notifier: %s", nc.endpoint)

response, err := trClient.NotifyJob(context.Background(), req)

if err != nil {
errMsg := fmt.Sprintf("notification failed: %v", err)
errMsg := fmt.Sprintf("sendJobNotification failed: %v", err)
zap.S().Warn(errMsg)
return nil, fmt.Errorf(errMsg)
}

return response, nil
}

// sendDeployNotification sends a deployment notification request to the notifier
func (nc *notifyClient) sendDeployNotification(req *pbNotify.DeployEventRequest) (*pbNotify.DeployResponse, error) {
conn, err := grpc.Dial(nc.endpoint, nc.grpcDialOpt)
if err != nil {
return nil, fmt.Errorf("sendDeployNotification failed to connect to notifier: %v", err)
}
defer conn.Close()

trClient := pbNotify.NewDeployTriggerRouteClient(conn)

zap.S().Infof("sendDeployNotification successfully connected to notifier: %s", nc.endpoint)

response, err := trClient.NotifyDeploy(context.Background(), req)

if err != nil {
errMsg := fmt.Sprintf("sendDeployNotification failed: %v", err)
zap.S().Warn(errMsg)
return nil, fmt.Errorf(errMsg)
}
Expand Down
63 changes: 62 additions & 1 deletion cmd/deployer/app/resource_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package app

import (
"context"
"crypto/tls"
"net/http"
"time"
Expand All @@ -28,13 +29,16 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/cisco-open/flame/pkg/openapi"
pbNotify "github.com/cisco-open/flame/pkg/proto/notification"
)

type resourceHandler struct {
apiserverEp string
notifierEp string
spec openapi.ComputeSpec

stream pbNotify.DeployEventRoute_GetDeployEventClient

grpcDialOpt grpc.DialOption
}

Expand Down Expand Up @@ -81,6 +85,8 @@ func (r *resourceHandler) doStart() {
zap.S().Fatalf("Cannot connect with notifier: %v", err)
}

r.do()

// if connection is broken right after connection is made, this can cause
// too many connection/disconnection events. To migitage that, add some static
// pause time.
Expand All @@ -95,7 +101,62 @@ func (r *resourceHandler) connect() error {
zap.S().Debugf("Cannot connect with notifier: %v, conn: %v", err, conn)
return err
}
zap.S().Infof("Connected with notifier at: %s", r.notifierEp)
zap.S().Infof("Connected with notifier at: %s, will open stream", r.notifierEp)

client := pbNotify.NewDeployEventRouteClient(conn)
in := &pbNotify.DeployInfo{
ComputeId: r.spec.ComputeId,
ApiKey: r.spec.ApiKey,
}

// setup notification stream
stream, err := client.GetDeployEvent(context.Background(), in)
if err != nil {
zap.S().Debugf("Open stream error: %v", err)
return err
}

r.stream = stream
zap.S().Infof("Opened stream with notifier at %s", r.notifierEp)

return nil
}

func (r *resourceHandler) do() {
for {
resp, err := r.stream.Recv()
if err != nil {
zap.S().Errorf("Failed to receive notification: %v", err)
break
}

r.dealWith(resp)
}

zap.S().Info("Disconnected from notifier")
}

func (r *resourceHandler) dealWith(in *pbNotify.DeployEvent) {
switch in.GetType() {
case pbNotify.DeployEventType_ADD_RESOURCE:
r.addResource(in.JobId)

case pbNotify.DeployEventType_REVOKE_RESOURCE:
r.revokeResource(in.JobId)

case pbNotify.DeployEventType_UNKNOWN_DEPLOYMENT_TYPE:
fallthrough
default:
zap.S().Errorf("Invalid message type: %s", in.GetType())
}
}

func (r *resourceHandler) addResource(jobId string) {
zap.S().Infof("Received add resource request for job %s", jobId)
// TODO: implement addResource method
}

func (r *resourceHandler) revokeResource(jobId string) {
zap.S().Infof("Received revoke resource request for job %s", jobId)
// TODO: implement revokeResource method
}
58 changes: 50 additions & 8 deletions cmd/notifier/app/eventroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func (s *notificationServer) GetJobEvent(in *pbNotify.JobTaskInfo, stream pbNoti

case <-stream.Context().Done():
zap.S().Infof("Stream context is done for task %s", taskId)
s.mutex.Lock()
delete(s.eventQueues, taskId)
s.mutex.Unlock()
s.mutex_job.Lock()
delete(s.jobEventQueues, taskId)
s.mutex_job.Unlock()
return nil
}
}
Expand All @@ -56,14 +56,56 @@ func (s *notificationServer) GetJobEvent(in *pbNotify.JobTaskInfo, stream pbNoti
func (s *notificationServer) getJobEventChannel(taskId string) chan *pbNotify.JobEvent {
var eventCh chan *pbNotify.JobEvent

s.mutex.Lock()
if _, ok := s.eventQueues[taskId]; !ok {
s.mutex_job.Lock()
if _, ok := s.jobEventQueues[taskId]; !ok {
eventCh = make(chan *pbNotify.JobEvent, eventChannelLen)
s.eventQueues[taskId] = eventCh
s.jobEventQueues[taskId] = eventCh
} else {
eventCh = s.eventQueues[taskId]
eventCh = s.jobEventQueues[taskId]
}
s.mutex.Unlock()
s.mutex_job.Unlock()

return eventCh
}

// GetDeployEvent is called by the client to subscribe to the notification service.
// Adds the client to the server client map and stores the client stream.
func (s *notificationServer) GetDeployEvent(in *pbNotify.DeployInfo, stream pbNotify.DeployEventRoute_GetDeployEventServer) error {
zap.S().Debugf("Serving event for deployer %v", in)

computeId := in.GetComputeId()

eventCh := s.getDeployEventChannel(computeId)
for {
select {
case event := <-eventCh:
zap.S().Infof("Pushing event %v to deployer %s", event, computeId)
err := stream.Send(event)
if err != nil {
zap.S().Warnf("Failed to push notification to deployer %s: %v", computeId, err)
}

case <-stream.Context().Done():
zap.S().Infof("Stream context is done for deployer %s", computeId)
s.mutex_deploy.Lock()
delete(s.deployEventQueues, computeId)
s.mutex_deploy.Unlock()
return nil
}
}
}

func (s *notificationServer) getDeployEventChannel(computeId string) chan *pbNotify.DeployEvent {
var eventCh chan *pbNotify.DeployEvent

s.mutex_deploy.Lock()
if _, ok := s.deployEventQueues[computeId]; !ok {
eventCh = make(chan *pbNotify.DeployEvent, eventChannelLen)
s.deployEventQueues[computeId] = eventCh
} else {
eventCh = s.deployEventQueues[computeId]
}
s.mutex_deploy.Unlock()

return eventCh
}
18 changes: 14 additions & 4 deletions cmd/notifier/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ import (
// notificationServer implement the notification service and include - proto unimplemented method and
// maintains list of connected clients & their streams.
type notificationServer struct {
eventQueues map[string]chan *pbNotify.JobEvent
mutex sync.Mutex
jobEventQueues map[string]chan *pbNotify.JobEvent
deployEventQueues map[string]chan *pbNotify.DeployEvent
mutex_job sync.Mutex
mutex_deploy sync.Mutex

pbNotify.UnimplementedJobEventRouteServer
pbNotify.UnimplementedJobTriggerRouteServer

pbNotify.UnimplementedDeployEventRouteServer
pbNotify.UnimplementedDeployTriggerRouteServer
}

// StartGRPCService starts the notification grpc server
Expand All @@ -48,13 +53,18 @@ func StartGRPCService(portNo uint16) {
// create grpc server
s := grpc.NewServer()
server := &notificationServer{
eventQueues: make(map[string]chan *pbNotify.JobEvent),
jobEventQueues: make(map[string]chan *pbNotify.JobEvent),
deployEventQueues: make(map[string]chan *pbNotify.DeployEvent),
}

// register grpc services
// register job grpc services
pbNotify.RegisterJobEventRouteServer(s, server)
pbNotify.RegisterJobTriggerRouteServer(s, server)

// register deploy grpc services
pbNotify.RegisterDeployEventRouteServer(s, server)
pbNotify.RegisterDeployTriggerRouteServer(s, server)

reflection.Register(s)

zap.S().Infof("Notification GRPC server listening at %v", lis.Addr())
Expand Down
50 changes: 49 additions & 1 deletion cmd/notifier/app/triggerroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *notificationServer) NotifyJob(ctx context.Context, in *pbNotify.JobEven
case pbNotify.JobEventType_UNKNOWN_EVENT_TYPE:
fallthrough
default:
return nil, fmt.Errorf("unknown event type: %s", in.GetType())
return nil, fmt.Errorf("unknown job event type: %s", in.GetType())
}

failedTasks := make([]string, 0)
Expand Down Expand Up @@ -72,3 +72,51 @@ func (s *notificationServer) NotifyJob(ctx context.Context, in *pbNotify.JobEven

return resp, nil
}

func (s *notificationServer) NotifyDeploy(ctx context.Context, in *pbNotify.DeployEventRequest) (*pbNotify.DeployResponse, error) {
zap.S().Info("TriggerRoute - received message from controller to %v for compute %v", in.GetType(), in.ComputeIds)
switch in.Type {
case pbNotify.DeployEventType_ADD_RESOURCE:
case pbNotify.DeployEventType_REVOKE_RESOURCE:

case pbNotify.DeployEventType_UNKNOWN_DEPLOYMENT_TYPE:
fallthrough
default:
return nil, fmt.Errorf("unknown deploy event type: %s", in.GetType())
}

failedDeployers := make([]string, 0)
for _, computeId := range in.ComputeIds {
event := pbNotify.DeployEvent{
Type: in.Type,
JobId: in.JobId,
}

eventCh := s.getDeployEventChannel(computeId)

select {
case eventCh <- &event:
// Do nothing
default:
failedDeployers = append(failedDeployers, computeId)
}
}

resp := &pbNotify.DeployResponse{
Status: pbNotify.DeployResponse_SUCCESS,
Message: "Successfully issued deployment instructions to deployers",
FailedDeployers: failedDeployers,
}

if len(in.ComputeIds) > 0 && len(failedDeployers) == len(in.ComputeIds) {
resp.Message = "Failed to issue deployment instructions for all deployers"
resp.Status = pbNotify.DeployResponse_ERROR
} else if len(failedDeployers) > 0 && len(failedDeployers) < len(in.ComputeIds) {
resp.Message = "Issued deployment instructions for some deployers successfully"
resp.Status = pbNotify.DeployResponse_PARTIAL_SUCCESS
}

zap.S().Info(resp.Message)

return resp, nil
}

0 comments on commit 4f298de

Please sign in to comment.