diff --git a/cmd/controller/app/job/default_handler.go b/cmd/controller/app/job/default_handler.go index ae9671291..fab93b0cb 100644 --- a/cmd/controller/app/job/default_handler.go +++ b/cmd/controller/app/job/default_handler.go @@ -214,6 +214,10 @@ func (h *DefaultHandler) ChangeState(state JobHandlerState) { } func (h *DefaultHandler) allocateComputes() error { + // Placeholder invocation for new deployer + h.notifyDeploy(pbNotify.DeployEventType_ADD_RESOURCE) + zap.S().Infof("Invoking notifyDeploy - add resource - from allocateComputes()") + deploymentChartPath := filepath.Join(deploymentDirPath, h.jobId) targetTemplateDirPath := filepath.Join(deploymentChartPath, deploymentTemplateDir) if err := os.MkdirAll(targetTemplateDirPath, util.FilePerm0644); err != nil { @@ -297,7 +301,7 @@ func (h *DefaultHandler) allocateComputes() error { return nil } -func (h *DefaultHandler) notify(evtType pbNotify.JobEventType) error { +func (h *DefaultHandler) notifyJob(evtType pbNotify.JobEventType) error { req := &pbNotify.JobEventRequest{ Type: evtType, TaskIds: make([]string, 0), @@ -308,12 +312,32 @@ func (h *DefaultHandler) notify(evtType pbNotify.JobEventType) error { req.TaskIds = append(req.TaskIds, taskInfo.TaskId) } - resp, err := newNotifyClient(h.notifier, h.bInsecure, h.bPlain).sendNotification(req) + resp, err := newNotifyClient(h.notifier, h.bInsecure, h.bPlain).sendJobNotification(req) + if err != nil { + return fmt.Errorf("failed to notify for job: %v", err) + } + + zap.S().Infof("response status from notifyJob = %s", resp.Status.String()) + + return nil +} + +func (h *DefaultHandler) notifyDeploy(evtType pbNotify.DeployEventType) error { + // TODO: add computeId field to tasks and remove hard-coding + computeIds := []string{"compute-1"} + + req := &pbNotify.DeployEventRequest{ + Type: evtType, + ComputeIds: computeIds, + JobId: h.jobId, + } + + resp, err := newNotifyClient(h.notifier, h.bInsecure, h.bPlain).sendDeployNotification(req) if err != nil { - return fmt.Errorf("failed to notify: %v", err) + return fmt.Errorf("failed to notify for deployment: %v", err) } - zap.S().Infof("response status = %s", resp.Status.String()) + zap.S().Infof("response status from notifyDeploy = %s", resp.Status.String()) return nil } diff --git a/cmd/controller/app/job/job_state.go b/cmd/controller/app/job/job_state.go index 65cee1710..9b96717ed 100644 --- a/cmd/controller/app/job/job_state.go +++ b/cmd/controller/app/job/job_state.go @@ -232,7 +232,7 @@ func (s *StateStarting) Deploy(event *JobEvent) { } // send a job start message to notifier - err = s.hdlr.notify(pbNotify.JobEventType_START_JOB) + err = s.hdlr.notifyJob(pbNotify.JobEventType_START_JOB) if err != nil { zap.S().Debugf("%v", err) diff --git a/cmd/controller/app/job/notify.go b/cmd/controller/app/job/notify.go index 2027c9b63..50d497b62 100644 --- a/cmd/controller/app/job/notify.go +++ b/cmd/controller/app/job/notify.go @@ -55,22 +55,45 @@ func newNotifyClient(endpoint string, bInsecure bool, bPlain bool) *notifyClient return ¬ifyClient{endpoint: endpoint, grpcDialOpt: grpcDialOpt} } -// sendNotification sends a notification request to the notifier -func (nc *notifyClient) sendNotification(req *pbNotify.JobEventRequest) (*pbNotify.JobResponse, error) { +// sendJobNotification sends a job notification request to the notifier +func (nc *notifyClient) sendJobNotification(req *pbNotify.JobEventRequest) (*pbNotify.JobResponse, error) { conn, err := grpc.Dial(nc.endpoint, nc.grpcDialOpt) if err != nil { - return nil, fmt.Errorf("failed to connect to notifier: %v", err) + return nil, fmt.Errorf("sendJobNotification failed to connect to notifier: %v", err) } defer conn.Close() trClient := pbNotify.NewJobTriggerRouteClient(conn) - zap.S().Infof("Successfully connected to notifier: %s", nc.endpoint) + zap.S().Infof("sendJobNotification successfully connected to notifier: %s", nc.endpoint) response, err := trClient.NotifyJob(context.Background(), req) if err != nil { - errMsg := fmt.Sprintf("notification failed: %v", err) + errMsg := fmt.Sprintf("sendJobNotification failed: %v", err) + zap.S().Warn(errMsg) + return nil, fmt.Errorf(errMsg) + } + + return response, nil +} + +// sendDeployNotification sends a deployment notification request to the notifier +func (nc *notifyClient) sendDeployNotification(req *pbNotify.DeployEventRequest) (*pbNotify.DeployResponse, error) { + conn, err := grpc.Dial(nc.endpoint, nc.grpcDialOpt) + if err != nil { + return nil, fmt.Errorf("sendDeployNotification failed to connect to notifier: %v", err) + } + defer conn.Close() + + trClient := pbNotify.NewDeployTriggerRouteClient(conn) + + zap.S().Infof("sendDeployNotification successfully connected to notifier: %s", nc.endpoint) + + response, err := trClient.NotifyDeploy(context.Background(), req) + + if err != nil { + errMsg := fmt.Sprintf("sendDeployNotification failed: %v", err) zap.S().Warn(errMsg) return nil, fmt.Errorf(errMsg) } diff --git a/cmd/deployer/app/resource_handler.go b/cmd/deployer/app/resource_handler.go index 227839206..97dd7af0f 100644 --- a/cmd/deployer/app/resource_handler.go +++ b/cmd/deployer/app/resource_handler.go @@ -17,6 +17,7 @@ package app import ( + "context" "crypto/tls" "net/http" "time" @@ -28,6 +29,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/cisco-open/flame/pkg/openapi" + pbNotify "github.com/cisco-open/flame/pkg/proto/notification" ) type resourceHandler struct { @@ -35,6 +37,8 @@ type resourceHandler struct { notifierEp string spec openapi.ComputeSpec + stream pbNotify.DeployEventRoute_GetDeployEventClient + grpcDialOpt grpc.DialOption } @@ -81,6 +85,8 @@ func (r *resourceHandler) doStart() { zap.S().Fatalf("Cannot connect with notifier: %v", err) } + r.do() + // if connection is broken right after connection is made, this can cause // too many connection/disconnection events. To migitage that, add some static // pause time. @@ -95,7 +101,62 @@ func (r *resourceHandler) connect() error { zap.S().Debugf("Cannot connect with notifier: %v, conn: %v", err, conn) return err } - zap.S().Infof("Connected with notifier at: %s", r.notifierEp) + zap.S().Infof("Connected with notifier at: %s, will open stream", r.notifierEp) + + client := pbNotify.NewDeployEventRouteClient(conn) + in := &pbNotify.DeployInfo{ + ComputeId: r.spec.ComputeId, + ApiKey: r.spec.ApiKey, + } + + // setup notification stream + stream, err := client.GetDeployEvent(context.Background(), in) + if err != nil { + zap.S().Debugf("Open stream error: %v", err) + return err + } + + r.stream = stream + zap.S().Infof("Opened stream with notifier at %s", r.notifierEp) return nil } + +func (r *resourceHandler) do() { + for { + resp, err := r.stream.Recv() + if err != nil { + zap.S().Errorf("Failed to receive notification: %v", err) + break + } + + r.dealWith(resp) + } + + zap.S().Info("Disconnected from notifier") +} + +func (r *resourceHandler) dealWith(in *pbNotify.DeployEvent) { + switch in.GetType() { + case pbNotify.DeployEventType_ADD_RESOURCE: + r.addResource(in.JobId) + + case pbNotify.DeployEventType_REVOKE_RESOURCE: + r.revokeResource(in.JobId) + + case pbNotify.DeployEventType_UNKNOWN_DEPLOYMENT_TYPE: + fallthrough + default: + zap.S().Errorf("Invalid message type: %s", in.GetType()) + } +} + +func (r *resourceHandler) addResource(jobId string) { + zap.S().Infof("Received add resource request for job %s", jobId) + // TODO: implement addResource method +} + +func (r *resourceHandler) revokeResource(jobId string) { + zap.S().Infof("Received revoke resource request for job %s", jobId) + // TODO: implement revokeResource method +} diff --git a/cmd/notifier/app/eventroute.go b/cmd/notifier/app/eventroute.go index 82fba08fe..3a654d248 100644 --- a/cmd/notifier/app/eventroute.go +++ b/cmd/notifier/app/eventroute.go @@ -45,9 +45,9 @@ func (s *notificationServer) GetJobEvent(in *pbNotify.JobTaskInfo, stream pbNoti case <-stream.Context().Done(): zap.S().Infof("Stream context is done for task %s", taskId) - s.mutex.Lock() - delete(s.eventQueues, taskId) - s.mutex.Unlock() + s.mutexJob.Lock() + delete(s.jobEventQueues, taskId) + s.mutexJob.Unlock() return nil } } @@ -56,14 +56,56 @@ func (s *notificationServer) GetJobEvent(in *pbNotify.JobTaskInfo, stream pbNoti func (s *notificationServer) getJobEventChannel(taskId string) chan *pbNotify.JobEvent { var eventCh chan *pbNotify.JobEvent - s.mutex.Lock() - if _, ok := s.eventQueues[taskId]; !ok { + s.mutexJob.Lock() + if _, ok := s.jobEventQueues[taskId]; !ok { eventCh = make(chan *pbNotify.JobEvent, eventChannelLen) - s.eventQueues[taskId] = eventCh + s.jobEventQueues[taskId] = eventCh } else { - eventCh = s.eventQueues[taskId] + eventCh = s.jobEventQueues[taskId] } - s.mutex.Unlock() + s.mutexJob.Unlock() + + return eventCh +} + +// GetDeployEvent is called by the client to subscribe to the notification service. +// Adds the client to the server client map and stores the client stream. +func (s *notificationServer) GetDeployEvent(in *pbNotify.DeployInfo, stream pbNotify.DeployEventRoute_GetDeployEventServer) error { + zap.S().Debugf("Serving event for deployer %v", in) + + computeId := in.GetComputeId() + + eventCh := s.getDeployEventChannel(computeId) + for { + select { + case event := <-eventCh: + zap.S().Infof("Pushing event %v to deployer %s", event, computeId) + err := stream.Send(event) + if err != nil { + zap.S().Warnf("Failed to push notification to deployer %s: %v", computeId, err) + } + + case <-stream.Context().Done(): + zap.S().Infof("Stream context is done for deployer %s", computeId) + s.mutexDeploy.Lock() + delete(s.deployEventQueues, computeId) + s.mutexDeploy.Unlock() + return nil + } + } +} + +func (s *notificationServer) getDeployEventChannel(computeId string) chan *pbNotify.DeployEvent { + var eventCh chan *pbNotify.DeployEvent + + s.mutexDeploy.Lock() + if _, ok := s.deployEventQueues[computeId]; !ok { + eventCh = make(chan *pbNotify.DeployEvent, eventChannelLen) + s.deployEventQueues[computeId] = eventCh + } else { + eventCh = s.deployEventQueues[computeId] + } + s.mutexDeploy.Unlock() return eventCh } diff --git a/cmd/notifier/app/server.go b/cmd/notifier/app/server.go index d6ebc8f47..d45f32860 100644 --- a/cmd/notifier/app/server.go +++ b/cmd/notifier/app/server.go @@ -31,11 +31,16 @@ import ( // notificationServer implement the notification service and include - proto unimplemented method and // maintains list of connected clients & their streams. type notificationServer struct { - eventQueues map[string]chan *pbNotify.JobEvent - mutex sync.Mutex + jobEventQueues map[string]chan *pbNotify.JobEvent + deployEventQueues map[string]chan *pbNotify.DeployEvent + mutexJob sync.Mutex + mutexDeploy sync.Mutex pbNotify.UnimplementedJobEventRouteServer pbNotify.UnimplementedJobTriggerRouteServer + + pbNotify.UnimplementedDeployEventRouteServer + pbNotify.UnimplementedDeployTriggerRouteServer } // StartGRPCService starts the notification grpc server @@ -48,13 +53,18 @@ func StartGRPCService(portNo uint16) { // create grpc server s := grpc.NewServer() server := ¬ificationServer{ - eventQueues: make(map[string]chan *pbNotify.JobEvent), + jobEventQueues: make(map[string]chan *pbNotify.JobEvent), + deployEventQueues: make(map[string]chan *pbNotify.DeployEvent), } - // register grpc services + // register job grpc services pbNotify.RegisterJobEventRouteServer(s, server) pbNotify.RegisterJobTriggerRouteServer(s, server) + // register deploy grpc services + pbNotify.RegisterDeployEventRouteServer(s, server) + pbNotify.RegisterDeployTriggerRouteServer(s, server) + reflection.Register(s) zap.S().Infof("Notification GRPC server listening at %v", lis.Addr()) diff --git a/cmd/notifier/app/triggerroute.go b/cmd/notifier/app/triggerroute.go index 792e4e891..4ebfeabd4 100644 --- a/cmd/notifier/app/triggerroute.go +++ b/cmd/notifier/app/triggerroute.go @@ -34,7 +34,7 @@ func (s *notificationServer) NotifyJob(ctx context.Context, in *pbNotify.JobEven case pbNotify.JobEventType_UNKNOWN_EVENT_TYPE: fallthrough default: - return nil, fmt.Errorf("unknown event type: %s", in.GetType()) + return nil, fmt.Errorf("unknown job event type: %s", in.GetType()) } failedTasks := make([]string, 0) @@ -72,3 +72,51 @@ func (s *notificationServer) NotifyJob(ctx context.Context, in *pbNotify.JobEven return resp, nil } + +func (s *notificationServer) NotifyDeploy(ctx context.Context, in *pbNotify.DeployEventRequest) (*pbNotify.DeployResponse, error) { + zap.S().Info("TriggerRoute - received message from controller to %v for compute %v", in.GetType(), in.ComputeIds) + switch in.Type { + case pbNotify.DeployEventType_ADD_RESOURCE: + case pbNotify.DeployEventType_REVOKE_RESOURCE: + + case pbNotify.DeployEventType_UNKNOWN_DEPLOYMENT_TYPE: + fallthrough + default: + return nil, fmt.Errorf("unknown deploy event type: %s", in.GetType()) + } + + failedDeployers := make([]string, 0) + for _, computeId := range in.ComputeIds { + event := pbNotify.DeployEvent{ + Type: in.Type, + JobId: in.JobId, + } + + eventCh := s.getDeployEventChannel(computeId) + + select { + case eventCh <- &event: + // Do nothing + default: + failedDeployers = append(failedDeployers, computeId) + } + } + + resp := &pbNotify.DeployResponse{ + Status: pbNotify.DeployResponse_SUCCESS, + Message: "Successfully issued deployment instructions to deployers", + FailedDeployers: failedDeployers, + } + + if len(in.ComputeIds) > 0 && len(failedDeployers) == len(in.ComputeIds) { + resp.Message = "Failed to issue deployment instructions for all deployers" + resp.Status = pbNotify.DeployResponse_ERROR + } else if len(failedDeployers) > 0 && len(failedDeployers) < len(in.ComputeIds) { + resp.Message = "Issued deployment instructions for some deployers successfully" + resp.Status = pbNotify.DeployResponse_PARTIAL_SUCCESS + } + + zap.S().Info(resp.Message) + + return resp, nil +}