From 814fe1677cf5cba92c4d9dc94667b25033df08ae Mon Sep 17 00:00:00 2001
From: dhruvsgarg
Date: Tue, 19 Jul 2022 21:09:20 -0400
Subject: [PATCH] Initial end-to-end pipeline for deployer

This PR implements the eventroute and triggerroute code for notifier to
receive and send deployment events. The controller triggers an add
resource event while allocating computes and this event is received at
the deployer.
---
 cmd/controller/app/job/default_handler.go | 37 ++++++++++++--
 cmd/controller/app/job/job_state.go       |  2 +-
 cmd/controller/app/job/notify.go          | 33 ++++++++++--
 cmd/deployer/app/resource_handler.go      | 69 ++++++++++++++++++++++++-
 cmd/notifier/app/eventroute.go            | 61 ++++++++++++++++++----
 cmd/notifier/app/server.go                | 18 +++++--
 cmd/notifier/app/triggerroute.go          | 52 ++++++++++++++++++-
 7 files changed, 248 insertions(+), 24 deletions(-)

diff --git a/cmd/controller/app/job/default_handler.go b/cmd/controller/app/job/default_handler.go
index ae9671291..fab93b0cb 100644
--- a/cmd/controller/app/job/default_handler.go
+++ b/cmd/controller/app/job/default_handler.go
@@ -214,6 +214,13 @@ func (h *DefaultHandler) ChangeState(state JobHandlerState) {
 }
 
 func (h *DefaultHandler) allocateComputes() error {
+	// Placeholder invocation for new deployer. A failed notification is only
+	// logged so that compute allocation still proceeds.
+	if err := h.notifyDeploy(pbNotify.DeployEventType_ADD_RESOURCE); err != nil {
+		zap.S().Warnf("notifyDeploy - add resource - failed: %v", err)
+	}
+	zap.S().Infof("Invoking notifyDeploy - add resource - from allocateComputes()")
+
 	deploymentChartPath := filepath.Join(deploymentDirPath, h.jobId)
 	targetTemplateDirPath := filepath.Join(deploymentChartPath, deploymentTemplateDir)
 	if err := os.MkdirAll(targetTemplateDirPath, util.FilePerm0644); err != nil {
@@ -297,7 +304,7 @@ func (h *DefaultHandler) allocateComputes() error {
 	return nil
 }
 
-func (h *DefaultHandler) notify(evtType pbNotify.JobEventType) error {
+func (h *DefaultHandler) notifyJob(evtType pbNotify.JobEventType) error {
 	req := &pbNotify.JobEventRequest{
 		Type:    evtType,
 		TaskIds: make([]string, 0),
@@ -308,12 +315,34 @@ func (h *DefaultHandler) notify(evtType pbNotify.JobEventType) error {
 		req.TaskIds = append(req.TaskIds, taskInfo.TaskId)
 	}
 
-	resp, err := newNotifyClient(h.notifier, h.bInsecure, h.bPlain).sendNotification(req)
+	resp, err := newNotifyClient(h.notifier, h.bInsecure, h.bPlain).sendJobNotification(req)
+	if err != nil {
+		return fmt.Errorf("failed to notify for job: %v", err)
+	}
+
+	zap.S().Infof("response status from notifyJob = %s", resp.Status.String())
+
+	return nil
+}
+
+// notifyDeploy sends a deploy event of the given type for this job to the
+// notifier; the target compute IDs are currently hard-coded.
+func (h *DefaultHandler) notifyDeploy(evtType pbNotify.DeployEventType) error {
+	// TODO: add computeId field to tasks and remove hard-coding
+	computeIds := []string{"compute-1"}
+
+	req := &pbNotify.DeployEventRequest{
+		Type:       evtType,
+		ComputeIds: computeIds,
+		JobId:      h.jobId,
+	}
+
+	resp, err := newNotifyClient(h.notifier, h.bInsecure, h.bPlain).sendDeployNotification(req)
 	if err != nil {
-		return fmt.Errorf("failed to notify: %v", err)
+		return fmt.Errorf("failed to notify for deployment: %v", err)
 	}
 
-	zap.S().Infof("response status = %s", resp.Status.String())
+	zap.S().Infof("response status from notifyDeploy = %s", resp.Status.String())
 
 	return nil
 }
diff --git a/cmd/controller/app/job/job_state.go b/cmd/controller/app/job/job_state.go
index 65cee1710..9b96717ed 100644
--- a/cmd/controller/app/job/job_state.go
+++ b/cmd/controller/app/job/job_state.go
@@ -232,7 +232,7 @@ func (s *StateStarting) Deploy(event *JobEvent) {
 	}
 
 	// send a job start message to notifier
-	err = s.hdlr.notify(pbNotify.JobEventType_START_JOB)
+	err = s.hdlr.notifyJob(pbNotify.JobEventType_START_JOB)
 	if err != nil {
 		zap.S().Debugf("%v", err)
 
diff --git a/cmd/controller/app/job/notify.go b/cmd/controller/app/job/notify.go
index 2027c9b63..50d497b62 100644
--- a/cmd/controller/app/job/notify.go
+++ b/cmd/controller/app/job/notify.go
@@ -55,22 +55,45 @@ func newNotifyClient(endpoint string, bInsecure bool, bPlain bool) *notifyClient
 	return &notifyClient{endpoint: endpoint, grpcDialOpt: grpcDialOpt}
 }
 
-// sendNotification sends a notification request to the notifier
-func (nc *notifyClient) sendNotification(req *pbNotify.JobEventRequest) (*pbNotify.JobResponse, error) {
+// sendJobNotification sends a job notification request to the notifier
+func (nc *notifyClient) sendJobNotification(req *pbNotify.JobEventRequest) (*pbNotify.JobResponse, error) {
 	conn, err := grpc.Dial(nc.endpoint, nc.grpcDialOpt)
 	if err != nil {
-		return nil, fmt.Errorf("failed to connect to notifier: %v", err)
+		return nil, fmt.Errorf("sendJobNotification failed to connect to notifier: %v", err)
 	}
 	defer conn.Close()
 
 	trClient := pbNotify.NewJobTriggerRouteClient(conn)
 
-	zap.S().Infof("Successfully connected to notifier: %s", nc.endpoint)
+	zap.S().Infof("sendJobNotification successfully connected to notifier: %s", nc.endpoint)
 
 	response, err := trClient.NotifyJob(context.Background(), req)
 
 	if err != nil {
-		errMsg := fmt.Sprintf("notification failed: %v", err)
+		errMsg := fmt.Sprintf("sendJobNotification failed: %v", err)
+		zap.S().Warn(errMsg)
+		return nil, fmt.Errorf(errMsg)
+	}
+
+	return response, nil
+}
+
+// sendDeployNotification sends a deployment notification request to the notifier
+func (nc *notifyClient) sendDeployNotification(req *pbNotify.DeployEventRequest) (*pbNotify.DeployResponse, error) {
+	conn, err := grpc.Dial(nc.endpoint, nc.grpcDialOpt)
+	if err != nil {
+		return nil, fmt.Errorf("sendDeployNotification failed to connect to notifier: %v", err)
+	}
+	defer conn.Close()
+
+	trClient := pbNotify.NewDeployTriggerRouteClient(conn)
+
+	zap.S().Infof("sendDeployNotification successfully connected to notifier: %s", nc.endpoint)
+
+	response, err := trClient.NotifyDeploy(context.Background(), req)
+
+	if err != nil {
+		errMsg := fmt.Sprintf("sendDeployNotification failed: %v", err)
 		zap.S().Warn(errMsg)
 		return nil, fmt.Errorf(errMsg)
 	}
diff --git a/cmd/deployer/app/resource_handler.go b/cmd/deployer/app/resource_handler.go
index 227839206..97dd7af0f 100644
--- a/cmd/deployer/app/resource_handler.go
+++ b/cmd/deployer/app/resource_handler.go
@@ -17,6 +17,7 @@
 package app
 
 import (
+	"context"
 	"crypto/tls"
 	"net/http"
 	"time"
@@ -28,6 +29,7 @@ import (
 	"google.golang.org/grpc/credentials/insecure"
 
 	"github.com/cisco-open/flame/pkg/openapi"
+	pbNotify "github.com/cisco-open/flame/pkg/proto/notification"
 )
 
 type resourceHandler struct {
@@ -35,6 +37,8 @@ type resourceHandler struct {
 	notifierEp  string
 	spec        openapi.ComputeSpec
 
+	stream pbNotify.DeployEventRoute_GetDeployEventClient
+
 	grpcDialOpt grpc.DialOption
 }
 
@@ -81,6 +85,8 @@ func (r *resourceHandler) doStart() {
 			zap.S().Fatalf("Cannot connect with notifier: %v", err)
 		}
 
+		r.do()
+
 		// if connection is broken right after connection is made, this can cause
 		// too many connection/disconnection events. To migitage that, add some static
 		// pause time.
@@ -95,7 +101,68 @@ func (r *resourceHandler) connect() error {
 		zap.S().Debugf("Cannot connect with notifier: %v, conn: %v", err, conn)
 		return err
 	}
-	zap.S().Infof("Connected with notifier at: %s", r.notifierEp)
+	zap.S().Infof("Connected with notifier at: %s, will open stream", r.notifierEp)
+
+	client := pbNotify.NewDeployEventRouteClient(conn)
+	in := &pbNotify.DeployInfo{
+		ComputeId: r.spec.ComputeId,
+		ApiKey:    r.spec.ApiKey,
+	}
+
+	// setup notification stream
+	stream, err := client.GetDeployEvent(context.Background(), in)
+	if err != nil {
+		zap.S().Debugf("Open stream error: %v", err)
+		return err
+	}
+
+	r.stream = stream
+	zap.S().Infof("Opened stream with notifier at %s", r.notifierEp)
 
 	return nil
 }
+
+// do receives deploy events from the notifier stream and passes each one to
+// dealWith; it returns once Recv reports an error.
+func (r *resourceHandler) do() {
+	for {
+		resp, err := r.stream.Recv()
+		if err != nil {
+			zap.S().Errorf("Failed to receive notification: %v", err)
+			break
+		}
+
+		r.dealWith(resp)
+	}
+
+	zap.S().Info("Disconnected from notifier")
+}
+
+// dealWith routes a deploy event to the handler matching its type and logs
+// an error for unknown types.
+func (r *resourceHandler) dealWith(in *pbNotify.DeployEvent) {
+	switch in.GetType() {
+	case pbNotify.DeployEventType_ADD_RESOURCE:
+		r.addResource(in.JobId)
+
+	case pbNotify.DeployEventType_REVOKE_RESOURCE:
+		r.revokeResource(in.JobId)
+
+	case pbNotify.DeployEventType_UNKNOWN_DEPLOYMENT_TYPE:
+		fallthrough
+	default:
+		zap.S().Errorf("Invalid message type: %s", in.GetType())
+	}
+}
+
+// addResource handles an add-resource event for jobId; it only logs the request for now.
+func (r *resourceHandler) addResource(jobId string) {
+	zap.S().Infof("Received add resource request for job %s", jobId)
+	// TODO: implement addResource method
+}
+
+// revokeResource handles a revoke-resource event for jobId; it only logs the request for now.
+func (r *resourceHandler) revokeResource(jobId string) {
+	zap.S().Infof("Received revoke resource request for job %s", jobId)
+	// TODO: implement revokeResource method
+}
diff --git a/cmd/notifier/app/eventroute.go b/cmd/notifier/app/eventroute.go
index 82fba08fe..3a654d248 100644
--- a/cmd/notifier/app/eventroute.go
+++ b/cmd/notifier/app/eventroute.go
@@ -45,9 +45,9 @@ func (s *notificationServer) GetJobEvent(in *pbNotify.JobTaskInfo, stream pbNoti
 
 		case <-stream.Context().Done():
 			zap.S().Infof("Stream context is done for task %s", taskId)
-			s.mutex.Lock()
-			delete(s.eventQueues, taskId)
-			s.mutex.Unlock()
+			s.mutexJob.Lock()
+			delete(s.jobEventQueues, taskId)
+			s.mutexJob.Unlock()
 			return nil
 		}
 	}
@@ -56,14 +56,59 @@ func (s *notificationServer) GetJobEvent(in *pbNotify.JobTaskInfo, stream pbNoti
 func (s *notificationServer) getJobEventChannel(taskId string) chan *pbNotify.JobEvent {
 	var eventCh chan *pbNotify.JobEvent
 
-	s.mutex.Lock()
-	if _, ok := s.eventQueues[taskId]; !ok {
+	s.mutexJob.Lock()
+	if _, ok := s.jobEventQueues[taskId]; !ok {
 		eventCh = make(chan *pbNotify.JobEvent, eventChannelLen)
-		s.eventQueues[taskId] = eventCh
+		s.jobEventQueues[taskId] = eventCh
 	} else {
-		eventCh = s.eventQueues[taskId]
+		eventCh = s.jobEventQueues[taskId]
 	}
-	s.mutex.Unlock()
+	s.mutexJob.Unlock()
+
+	return eventCh
+}
+
+// GetDeployEvent is called by a deployer to subscribe to deploy events.
+// It forwards events queued for the deployer's compute ID to the stream until
+// the stream context is done, at which point the compute's queue is removed.
+func (s *notificationServer) GetDeployEvent(in *pbNotify.DeployInfo, stream pbNotify.DeployEventRoute_GetDeployEventServer) error {
+	zap.S().Debugf("Serving event for deployer %v", in)
+
+	computeId := in.GetComputeId()
+
+	eventCh := s.getDeployEventChannel(computeId)
+	for {
+		select {
+		case event := <-eventCh:
+			zap.S().Infof("Pushing event %v to deployer %s", event, computeId)
+			err := stream.Send(event)
+			if err != nil {
+				zap.S().Warnf("Failed to push notification to deployer %s: %v", computeId, err)
+			}
+
+		case <-stream.Context().Done():
+			zap.S().Infof("Stream context is done for deployer %s", computeId)
+			s.mutexDeploy.Lock()
+			delete(s.deployEventQueues, computeId)
+			s.mutexDeploy.Unlock()
+			return nil
+		}
+	}
+}
+
+// getDeployEventChannel returns the event channel registered for computeId,
+// creating and registering a buffered one under mutexDeploy if none exists.
+func (s *notificationServer) getDeployEventChannel(computeId string) chan *pbNotify.DeployEvent {
+	var eventCh chan *pbNotify.DeployEvent
+
+	s.mutexDeploy.Lock()
+	if _, ok := s.deployEventQueues[computeId]; !ok {
+		eventCh = make(chan *pbNotify.DeployEvent, eventChannelLen)
+		s.deployEventQueues[computeId] = eventCh
+	} else {
+		eventCh = s.deployEventQueues[computeId]
+	}
+	s.mutexDeploy.Unlock()
 
 	return eventCh
 }
diff --git a/cmd/notifier/app/server.go b/cmd/notifier/app/server.go
index d6ebc8f47..d45f32860 100644
--- a/cmd/notifier/app/server.go
+++ b/cmd/notifier/app/server.go
@@ -31,11 +31,16 @@ import (
 // notificationServer implement the notification service and include - proto unimplemented method and
 // maintains list of connected clients & their streams.
 type notificationServer struct {
-	eventQueues map[string]chan *pbNotify.JobEvent
-	mutex       sync.Mutex
+	jobEventQueues    map[string]chan *pbNotify.JobEvent
+	deployEventQueues map[string]chan *pbNotify.DeployEvent
+	mutexJob          sync.Mutex
+	mutexDeploy       sync.Mutex
 
 	pbNotify.UnimplementedJobEventRouteServer
 	pbNotify.UnimplementedJobTriggerRouteServer
+
+	pbNotify.UnimplementedDeployEventRouteServer
+	pbNotify.UnimplementedDeployTriggerRouteServer
 }
 
 // StartGRPCService starts the notification grpc server
@@ -48,13 +53,18 @@ func StartGRPCService(portNo uint16) {
 	// create grpc server
 	s := grpc.NewServer()
 	server := &notificationServer{
-		eventQueues: make(map[string]chan *pbNotify.JobEvent),
+		jobEventQueues:    make(map[string]chan *pbNotify.JobEvent),
+		deployEventQueues: make(map[string]chan *pbNotify.DeployEvent),
 	}
 
-	// register grpc services
+	// register job grpc services
 	pbNotify.RegisterJobEventRouteServer(s, server)
 	pbNotify.RegisterJobTriggerRouteServer(s, server)
 
+	// register deploy grpc services
+	pbNotify.RegisterDeployEventRouteServer(s, server)
+	pbNotify.RegisterDeployTriggerRouteServer(s, server)
+
 	reflection.Register(s)
 
 	zap.S().Infof("Notification GRPC server listening at %v", lis.Addr())
diff --git a/cmd/notifier/app/triggerroute.go b/cmd/notifier/app/triggerroute.go
index 792e4e891..4ebfeabd4 100644
--- a/cmd/notifier/app/triggerroute.go
+++ b/cmd/notifier/app/triggerroute.go
@@ -34,7 +34,7 @@ func (s *notificationServer) NotifyJob(ctx context.Context, in *pbNotify.JobEven
 	case pbNotify.JobEventType_UNKNOWN_EVENT_TYPE:
 		fallthrough
 	default:
-		return nil, fmt.Errorf("unknown event type: %s", in.GetType())
+		return nil, fmt.Errorf("unknown job event type: %s", in.GetType())
 	}
 
 	failedTasks := make([]string, 0)
@@ -72,3 +72,53 @@ func (s *notificationServer) NotifyJob(ctx context.Context, in *pbNotify.JobEven
 
 	return resp, nil
 }
+
+// NotifyDeploy queues a deploy event for every compute in the request without
+// blocking; computes whose queue cannot accept it are listed in FailedDeployers.
+func (s *notificationServer) NotifyDeploy(ctx context.Context, in *pbNotify.DeployEventRequest) (*pbNotify.DeployResponse, error) {
+	zap.S().Infof("TriggerRoute - received message from controller to %v for compute %v", in.GetType(), in.ComputeIds)
+	switch in.Type {
+	case pbNotify.DeployEventType_ADD_RESOURCE:
+	case pbNotify.DeployEventType_REVOKE_RESOURCE:
+
+	case pbNotify.DeployEventType_UNKNOWN_DEPLOYMENT_TYPE:
+		fallthrough
+	default:
+		return nil, fmt.Errorf("unknown deploy event type: %s", in.GetType())
+	}
+
+	failedDeployers := make([]string, 0)
+	for _, computeId := range in.ComputeIds {
+		event := pbNotify.DeployEvent{
+			Type:  in.Type,
+			JobId: in.JobId,
+		}
+
+		eventCh := s.getDeployEventChannel(computeId)
+
+		select {
+		case eventCh <- &event:
+			// Do nothing
+		default:
+			failedDeployers = append(failedDeployers, computeId)
+		}
+	}
+
+	resp := &pbNotify.DeployResponse{
+		Status:          pbNotify.DeployResponse_SUCCESS,
+		Message:         "Successfully issued deployment instructions to deployers",
+		FailedDeployers: failedDeployers,
+	}
+
+	if len(in.ComputeIds) > 0 && len(failedDeployers) == len(in.ComputeIds) {
+		resp.Message = "Failed to issue deployment instructions for all deployers"
+		resp.Status = pbNotify.DeployResponse_ERROR
+	} else if len(failedDeployers) > 0 && len(failedDeployers) < len(in.ComputeIds) {
+		resp.Message = "Issued deployment instructions for some deployers successfully"
+		resp.Status = pbNotify.DeployResponse_PARTIAL_SUCCESS
+	}
+
+	zap.S().Info(resp.Message)
+
+	return resp, nil
+}