Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoandredinis committed Oct 15, 2024
1 parent a550846 commit d452cdd
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 11 deletions.
9 changes: 9 additions & 0 deletions api/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,15 @@ const (
// NotificationUserCreatedWarningSubKind is the subkind for a user-created warning notification.
NotificationUserCreatedWarningSubKind = "user-created-warning"

// NotificationUserTaskIntegrationSubKind is the subkind for a notification that warns the user about pending User Tasks for a given integration.
NotificationUserTaskIntegrationSubKind = "user-task-integration"
// NotificationIntegrationSubKindLabel is the label which contains the subkind of the integration.
// To be used with NotificationUserTaskIntegrationSubKind.
NotificationIntegrationSubKindLabel = TeleportInternalLabelPrefix + "integration-sub-kind"
// NotificationIntegrationLabel is the label which contains the name of the integration.
// To be used with NotificationUserTaskIntegrationSubKind.
NotificationIntegrationLabel = TeleportInternalLabelPrefix + "integration-name"

// NotificationAccessRequestPendingSubKind is the subkind for a notification for an access request pending review.
NotificationAccessRequestPendingSubKind = "access-request-pending"
// NotificationAccessRequestApprovedSubKind is the subkind for a notification for a user's access request being approved.
Expand Down
9 changes: 9 additions & 0 deletions lib/auth/authclient/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
machineidv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1"
notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v2"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
Expand Down Expand Up @@ -809,6 +810,9 @@ type DiscoveryAccessPoint interface {

// UpsertUserTask creates or updates an User Task
UpsertUserTask(ctx context.Context, req *usertasksv1.UserTask) (*usertasksv1.UserTask, error)

// UpsertGlobalNotification upserts a global notification.
UpsertGlobalNotification(ctx context.Context, globalNotification *notificationsv1.GlobalNotification) (*notificationsv1.GlobalNotification, error)
}

// ReadOktaAccessPoint is a read only API interface to be
Expand Down Expand Up @@ -1459,6 +1463,11 @@ func (w *DiscoveryWrapper) UpsertUserTask(ctx context.Context, req *usertasksv1.
return w.NoCache.UpsertUserTask(ctx, req)
}

// UpsertGlobalNotification upserts a global notification.
func (w *DiscoveryWrapper) UpsertGlobalNotification(ctx context.Context, globalNotification *notificationsv1.GlobalNotification) (*notificationsv1.GlobalNotification, error) {
return w.NoCache.UpsertGlobalNotification(ctx, globalNotification)
}

// Close closes all associated resources
func (w *DiscoveryWrapper) Close() error {
err := w.NoCache.Close()
Expand Down
1 change: 1 addition & 0 deletions lib/authz/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,7 @@ func definitionForBuiltinRole(clusterName string, recConfig readonly.SessionReco
types.NewRule(types.KindIntegration, append(services.RO(), types.VerbUse)),
types.NewRule(types.KindSemaphore, services.RW()),
types.NewRule(types.KindUserTask, services.RW()),
types.NewRule(types.KindNotification, services.RW()),
},
// Discovery service should only access kubes/apps/dbs that originated from discovery.
KubernetesLabels: types.Labels{types.OriginLabel: []string{types.OriginCloud}},
Expand Down
59 changes: 59 additions & 0 deletions lib/services/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ package services

import (
"context"
"encoding/binary"
"time"

"github.com/google/uuid"
"github.com/gravitational/trace"
"google.golang.org/protobuf/types/known/timestamppb"

headerv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1"
notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
"github.com/gravitational/teleport/api/types"
)
Expand Down Expand Up @@ -106,6 +111,60 @@ func ValidateGlobalNotification(globalNotification *notificationsv1.GlobalNotifi
return nil
}

// notificationNameForIntegration returns a deterministic name for a Notification of type "user-task-integration" for a given Integration name.
// This method is used to ensure a single Notification is created to report issues related to User Task Integrations for a given integration.
func notificationNameForIntegration(integration string) string {
var bs []byte
bs = append(bs, binary.LittleEndian.AppendUint64(nil, uint64(len(integration)))...)
bs = append(bs, []byte(integration)...)
return uuid.NewSHA1(notificationNameForIntegrationNamespace, bs).String()
}

// notificationNameForIntegrationNamespace is an UUID that represents the name space to be used for generating UUIDs for DiscoverEC2 User Task names.
var notificationNameForIntegrationNamespace = uuid.Must(uuid.Parse("6ba7b815-9dad-11d1-80b4-00c04fd430c9"))

// NotificationForIntegrationPendingTasks builds a GlobalNotification for notifying users about UserTasks for an Integration.
func NotificationForIntegrationPendingTasks(integrationName string, expiration *timestamppb.Timestamp, createdTime time.Time) *notificationsv1.GlobalNotification {
resourceName := notificationNameForIntegration(integrationName)
return &notificationsv1.GlobalNotification{
Kind: types.KindGlobalNotification,
Version: types.V1,
Metadata: &headerv1.Metadata{
Name: resourceName,
Expires: expiration,
},
Spec: &notificationsv1.GlobalNotificationSpec{
Matcher: &notificationsv1.GlobalNotificationSpec_ByPermissions{
ByPermissions: &notificationsv1.ByPermissions{
RoleConditions: []*types.RoleConditions{{
Rules: []types.Rule{{
Resources: []string{types.KindIntegration},
Verbs: []string{types.VerbList, types.VerbRead},
}},
}},
},
},
Notification: &notificationsv1.Notification{
Kind: types.KindGlobalNotification,
Version: types.V1,
Spec: &notificationsv1.NotificationSpec{
Created: timestamppb.New(createdTime),
},
SubKind: types.NotificationUserTaskIntegrationSubKind,
Metadata: &headerv1.Metadata{
Name: resourceName,
Expires: expiration,
Labels: map[string]string{
types.NotificationTitleLabel: "Your integration needs attention.",
types.NotificationIntegrationLabel: integrationName,
types.NotificationScope: "global",
},
},
},
},
}
}

// MarshalGlobalNotification marshals a GlobalNotification resource to JSON.
func MarshalGlobalNotification(globalNotification *notificationsv1.GlobalNotification, opts ...MarshalOption) ([]byte, error) {
if err := ValidateGlobalNotification(globalNotification); err != nil {
Expand Down
40 changes: 29 additions & 11 deletions lib/srv/discovery/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (s *Server) acquireSemaphoreForUserTask(userTaskName string) (releaseFn fun
// merges them against the ones that exist in the cluster.
//
// All of this flow is protected by a lock to ensure there's no race between this and other DiscoveryServices.
func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) error {
func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) (*usertasksv1.UserTask, error) {
userTaskName := usertasks.TaskNameForDiscoverEC2(usertasks.TaskNameForDiscoverEC2Parts{
Integration: taskGroup.integration,
IssueType: taskGroup.issueType,
Expand All @@ -439,7 +439,7 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta

releaseFn, ctxWithLease, err := s.acquireSemaphoreForUserTask(userTaskName)
if err != nil {
return trace.Wrap(err)
return nil, trace.Wrap(err)
}
defer releaseFn()

Expand All @@ -448,7 +448,7 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta
switch {
case trace.IsNotFound(err):
case err != nil:
return trace.Wrap(err)
return nil, trace.Wrap(err)
default:
failedInstances = s.discoverEC2UserTaskAddExistingInstances(currentUserTask, failedInstances)
}
Expand All @@ -472,14 +472,11 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta
usertasks.WithExpiration(taskExpiration),
)
if err != nil {
return trace.Wrap(err)
}

if _, err := s.AccessPoint.UpsertUserTask(ctxWithLease, task); err != nil {
return trace.Wrap(err)
return nil, trace.Wrap(err)
}

return nil
taskUpserted, err := s.AccessPoint.UpsertUserTask(ctxWithLease, task)
return taskUpserted, trace.Wrap(err)
}

// discoverEC2UserTaskAddExistingInstances takes the UserTask stored in the cluster and merges it into the existing map of failed instances.
Expand All @@ -506,6 +503,16 @@ func (s *Server) discoverEC2UserTaskAddExistingInstances(currentUserTask *userta
return failedInstances
}

func (s *Server) upsertUserNotificationForIntegration(integrationName string, expiration *timestamppb.Timestamp) error {
notificationForIntegrationPendingTasks := services.NotificationForIntegrationPendingTasks(integrationName, expiration, s.clock.Now())

s.Log.WithField("Notification", notificationForIntegrationPendingTasks).Warn("Notification upsert will fail")
if _, err := s.AccessPoint.UpsertGlobalNotification(s.ctx, notificationForIntegrationPendingTasks); err != nil {
return trace.Wrap(err)
}
return nil
}

func (s *Server) upsertTasksForAWSEC2FailedEnrollments() {
s.awsEC2Tasks.mu.Lock()
defer s.awsEC2Tasks.mu.Unlock()
Expand All @@ -515,17 +522,28 @@ func (s *Server) upsertTasksForAWSEC2FailedEnrollments() {
continue
}

if err := s.mergeUpsertDiscoverEC2Task(g, instancesIssueByID); err != nil {
upsertedTask, err := s.mergeUpsertDiscoverEC2Task(g, instancesIssueByID)
if err != nil {
s.Log.WithError(err).WithFields(logrus.Fields{
"integration": g.integration,
"issue_type": g.issueType,
"aws_account_id": g.accountID,
"aws_region": g.region,
},
).Warning("Failed to create discover ec2 user task.", g.integration, g.issueType, g.accountID, g.region)
).Warning("Failed to create discover ec2 user task.")
continue
}

delete(s.awsEC2Tasks.issuesSyncQueue, g)

if err := s.upsertUserNotificationForIntegration(g.integration, upsertedTask.GetMetadata().GetExpires()); err != nil {
s.Log.WithError(err).WithFields(logrus.Fields{
"integration": g.integration,
"issue_type": g.issueType,
"aws_account_id": g.accountID,
"aws_region": g.region,
},
).Warning("Failed to notify user about new task for integration.")
}
}
}

0 comments on commit d452cdd

Please sign in to comment.