From d629c3379579016986e4e7587505da89e691cf75 Mon Sep 17 00:00:00 2001
From: Forrest Marshall
Date: Mon, 26 Aug 2024 14:28:32 -0700
Subject: [PATCH] fix duplicate session recording creation

---
 lib/events/filesessions/fileasync.go          |  7 +++-
 .../s3sessions/s3handler_thirdparty_test.go   |  3 ++
 lib/events/stream.go                          | 14 ++++++--
 lib/events/test/streamsuite.go                | 36 +++++++++++++++++++
 4 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/lib/events/filesessions/fileasync.go b/lib/events/filesessions/fileasync.go
index 3359bcee968b..17c71a1c745c 100644
--- a/lib/events/filesessions/fileasync.go
+++ b/lib/events/filesessions/fileasync.go
@@ -538,7 +538,12 @@ func (u *Uploader) upload(ctx context.Context, up *upload) error {
 		}
 		return trace.ConnectionProblem(nil, "upload stream terminated unexpectedly")
-	case <-stream.Status():
+	case status := <-stream.Status():
+		if err := up.writeStatus(status); err != nil {
+			// all other stream status writes are optimistic, but we want to make sure the initial
+			// status is written to disk so that we don't create orphaned multipart uploads.
+			return trace.Errorf("failed to write initial stream status: %v", err)
+		}
 	case <-time.After(events.NetworkRetryDuration):
 		return trace.ConnectionProblem(nil, "timeout waiting for stream status update")
 	case <-ctx.Done():
diff --git a/lib/events/s3sessions/s3handler_thirdparty_test.go b/lib/events/s3sessions/s3handler_thirdparty_test.go
index 1eede3cb4910..9849dbe3fa6f 100644
--- a/lib/events/s3sessions/s3handler_thirdparty_test.go
+++ b/lib/events/s3sessions/s3handler_thirdparty_test.go
@@ -67,6 +67,9 @@ func TestThirdpartyStreams(t *testing.T) {
 	t.Run("UploadDownload", func(t *testing.T) {
 		test.UploadDownload(t, handler)
 	})
+	t.Run("StreamEmpty", func(t *testing.T) {
+		test.StreamEmpty(t, handler)
+	})
 	t.Run("DownloadNotFound", func(t *testing.T) {
 		test.DownloadNotFound(t, handler)
 	})
diff --git a/lib/events/stream.go b/lib/events/stream.go
index f92bb01996db..aa78605a72d7 100644
--- a/lib/events/stream.go
+++ b/lib/events/stream.go
@@ -533,7 +533,7 @@ func (w *sliceWriter) receiveAndUpload() error {
 			return nil
 		case <-w.proto.completeCtx.Done():
 			// if present, send remaining data for upload
-			if w.current != nil {
+			if w.current != nil && !w.current.isEmpty() {
 				// mark that the current part is last (last parts are allowed to be
 				// smaller than the certain size, otherwise the padding
 				// have to be added (this is due to S3 API limits)
@@ -572,7 +572,7 @@
 			// there is no need to schedule a timer until the next
 			// event occurs, set the timer channel to nil
 			flushCh = nil
-			if w.current != nil {
+			if w.current != nil && !w.current.isEmpty() {
 				log.Debugf("Inactivity timer ticked at %v, inactivity period: %v exceeded threshold and have data. Flushing.", now, inactivityPeriod)
 				if err := w.startUploadCurrentSlice(); err != nil {
 					return trace.Wrap(err)
@@ -853,6 +853,7 @@ type slice struct {
 	buffer *bytes.Buffer
 	isLast bool
 	lastEventIndex int64
+	eventCount uint64
 }
 
 // reader returns a reader for the bytes written, no writes should be done after this
@@ -895,11 +896,19 @@ func (s *slice) shouldUpload() bool {
 	return int64(s.buffer.Len()) >= s.proto.cfg.MinUploadBytes
 }
 
+// isEmpty returns true if the slice hasn't had any events written to
+// it yet.
+func (s *slice) isEmpty() bool {
+	return s.eventCount == 0
+}
+
 // recordEvent emits a single session event to the stream
 func (s *slice) recordEvent(event protoEvent) error {
 	bytes := s.proto.cfg.SlicePool.Get()
 	defer s.proto.cfg.SlicePool.Put(bytes)
 
+	s.eventCount++
+
 	messageSize := event.oneof.Size()
 	recordSize := ProtoStreamV1RecordHeaderSize + messageSize
 
@@ -907,6 +916,7 @@ func (s *slice) recordEvent(event protoEvent) error {
 		return trace.BadParameter(
 			"error in buffer allocation, expected size to be >= %v, got %v", recordSize, len(bytes))
 	}
+
 	binary.BigEndian.PutUint32(bytes, uint32(messageSize))
 	_, err := event.oneof.MarshalTo(bytes[Int32Size:])
 	if err != nil {
diff --git a/lib/events/test/streamsuite.go b/lib/events/test/streamsuite.go
index fca49071373d..ea8d0f169b43 100644
--- a/lib/events/test/streamsuite.go
+++ b/lib/events/test/streamsuite.go
@@ -28,6 +28,7 @@ import (
 	"github.com/gravitational/teleport/lib/events"
 	"github.com/gravitational/teleport/lib/events/eventstest"
+	"github.com/gravitational/teleport/lib/fixtures"
 	"github.com/gravitational/teleport/lib/session"
 )
 
@@ -149,6 +150,41 @@ func StreamWithPermutedParameters(t *testing.T, handler events.MultipartHandler,
 	}
 }
 
+// StreamEmpty verifies that a stream upload with zero events is correctly discarded. This behavior is
+// necessary in order to prevent a bug where agents might think they have failed to create a multipart
+// upload and create a new one, resulting in duplicate recordings overwriting each other.
+func StreamEmpty(t *testing.T, handler events.MultipartHandler) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	sid := session.NewID()
+
+	streamer, err := events.NewProtoStreamer(events.ProtoStreamerConfig{
+		Uploader:          handler,
+		MinUploadBytes:    1024,
+		ConcurrentUploads: 2,
+	})
+	require.NoError(t, err)
+
+	stream, err := streamer.CreateAuditStream(ctx, sid)
+	require.NoError(t, err)
+
+	select {
+	case status := <-stream.Status():
+		require.Equal(t, status.LastEventIndex, int64(-1))
+	case <-time.After(time.Second):
+		t.Fatalf("Timed out waiting for status update.")
+	}
+
+	require.NoError(t, stream.Complete(ctx))
+
+	f, err := os.CreateTemp("", string(sid))
+	require.NoError(t, err)
+	defer os.Remove(f.Name())
+
+	fixtures.AssertNotFound(t, handler.Download(ctx, sid, f))
+}
+
 // StreamWithParameters tests stream upload and subsequent download and reads the results
 func StreamWithParameters(t *testing.T, handler events.MultipartHandler, params StreamParams) {
 	ctx := context.TODO()
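
Note: the heart of the fix is the new eventCount field and isEmpty guard in lib/events/stream.go, which keep a slice that never recorded an event from being handed to the uploader, so completing an empty stream no longer produces an empty recording that a retrying agent could later collide with. The small, self-contained Go sketch below illustrates that guard with simplified stand-in types; it is not code from this patch, and maybeFlush as well as the byte-slice recordEvent signature are illustrative assumptions only.

package main

import (
	"bytes"
	"fmt"
)

// slice is a simplified stand-in for the slice type in lib/events/stream.go,
// kept only to illustrate the eventCount/isEmpty guard introduced by the patch.
type slice struct {
	buffer     bytes.Buffer
	eventCount uint64
}

// recordEvent appends event bytes and counts the event, mirroring how the real
// recordEvent now increments eventCount when an event is written.
func (s *slice) recordEvent(data []byte) {
	s.eventCount++
	s.buffer.Write(data)
}

// isEmpty reports whether no events were ever recorded into the slice.
func (s *slice) isEmpty() bool {
	return s.eventCount == 0
}

// maybeFlush is a hypothetical flush site showing the guard used at both flush
// points in receiveAndUpload: nil or empty slices are skipped, not uploaded.
func maybeFlush(s *slice) {
	if s != nil && !s.isEmpty() {
		fmt.Printf("uploading %d bytes\n", s.buffer.Len())
		return
	}
	fmt.Println("skipping empty slice, no upload is created")
}

func main() {
	empty := &slice{}
	maybeFlush(empty) // skipped: completing an empty stream creates no upload

	withEvent := &slice{}
	withEvent.recordEvent([]byte("audit event"))
	maybeFlush(withEvent) // uploaded: the slice recorded at least one event
}

(In the real code the slice buffer is not necessarily empty before the first event is recorded, which is presumably why the patch counts events rather than checking buffer length.)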