Skip to content

Commit

Permalink
fix duplicate session recording creation
Browse files Browse the repository at this point in the history
  • Loading branch information
fspmarshall committed Sep 3, 2024
1 parent c0241f3 commit 3c489ef
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 3 deletions.
7 changes: 6 additions & 1 deletion lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,12 @@ func (u *Uploader) upload(ctx context.Context, up *upload) error {
}

return trace.ConnectionProblem(nil, "upload stream terminated unexpectedly")
case <-stream.Status():
case status := <-stream.Status():
if err := up.writeStatus(status); err != nil {
// all other stream status writes are optimistic, but we want to make sure the initial
// status is written to disk so that we don't create orphaned multipart uploads.
return trace.Errorf("failed to write initial stream status: %v", err)
}
case <-time.After(events.NetworkRetryDuration):
return trace.ConnectionProblem(nil, "timeout waiting for stream status update")
case <-ctx.Done():
Expand Down
3 changes: 3 additions & 0 deletions lib/events/s3sessions/s3handler_thirdparty_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func TestThirdpartyStreams(t *testing.T) {
t.Run("UploadDownload", func(t *testing.T) {
test.UploadDownload(t, handler)
})
t.Run("StreamEmpty", func(t *testing.T) {
test.StreamEmpty(t, handler)
})
t.Run("DownloadNotFound", func(t *testing.T) {
test.DownloadNotFound(t, handler)
})
Expand Down
14 changes: 12 additions & 2 deletions lib/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func (w *sliceWriter) receiveAndUpload() error {
return nil
case <-w.proto.completeCtx.Done():
// if present, send remaining data for upload
if w.current != nil {
if w.current != nil && !w.current.isEmpty() {
// mark that the current part is last (last parts are allowed to be
// smaller than the certain size, otherwise the padding
// have to be added (this is due to S3 API limits)
Expand Down Expand Up @@ -574,7 +574,7 @@ func (w *sliceWriter) receiveAndUpload() error {
// there is no need to schedule a timer until the next
// event occurs, set the timer channel to nil
flushCh = nil
if w.current != nil {
if w.current != nil && !w.current.isEmpty() {
log.Debugf("Inactivity timer ticked at %v, inactivity period: %v exceeded threshold and have data. Flushing.", now, inactivityPeriod)
if err := w.startUploadCurrentSlice(); err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -855,6 +855,7 @@ type slice struct {
buffer *bytes.Buffer
isLast bool
lastEventIndex int64
eventCount uint64
}

// reader returns a reader for the bytes written, no writes should be done after this
Expand Down Expand Up @@ -897,18 +898,27 @@ func (s *slice) shouldUpload() bool {
return int64(s.buffer.Len()) >= s.proto.cfg.MinUploadBytes
}

// isEmpty returns true if the slice hasn't had any events written to
// it yet.
func (s *slice) isEmpty() bool {
return s.eventCount == 0
}

// recordEvent emits a single session event to the stream
func (s *slice) recordEvent(event protoEvent) error {
bytes := s.proto.cfg.SlicePool.Get()
defer s.proto.cfg.SlicePool.Put(bytes)

s.eventCount++

messageSize := event.oneof.Size()
recordSize := ProtoStreamV1RecordHeaderSize + messageSize

if len(bytes) < recordSize {
return trace.BadParameter(
"error in buffer allocation, expected size to be >= %v, got %v", recordSize, len(bytes))
}

binary.BigEndian.PutUint32(bytes, uint32(messageSize))
_, err := event.oneof.MarshalTo(bytes[Int32Size:])
if err != nil {
Expand Down
36 changes: 36 additions & 0 deletions lib/events/test/streamsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/eventstest"
"github.com/gravitational/teleport/lib/fixtures"
"github.com/gravitational/teleport/lib/session"
)

Expand Down Expand Up @@ -153,6 +154,41 @@ func StreamWithPermutedParameters(t *testing.T, handler events.MultipartHandler,
}
}

// StreamEmpty verifies stream upload with zero events gets correctly discarded. This behavior is
// necessary in order to prevent a bug where agents might think they have failed to create a multipart
// upload and create a new one, resulting in duplicate recordings overwriiting each other.
func StreamEmpty(t *testing.T, handler events.MultipartHandler) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sid := session.NewID()

streamer, err := events.NewProtoStreamer(events.ProtoStreamerConfig{
Uploader: handler,
MinUploadBytes: 1024,
ConcurrentUploads: 2,
})
require.NoError(t, err)

stream, err := streamer.CreateAuditStream(ctx, sid)
require.NoError(t, err)

select {
case status := <-stream.Status():
require.Equal(t, status.LastEventIndex, int64(-1))
case <-time.After(time.Second):
t.Fatalf("Timed out waiting for status update.")
}

require.NoError(t, stream.Complete(ctx))

f, err := os.CreateTemp("", string(sid))
require.NoError(t, err)
defer os.Remove(f.Name())

fixtures.AssertNotFound(t, handler.Download(ctx, sid, f))
}

// StreamWithParameters tests stream upload and subsequent download and reads the results
func StreamWithParameters(t *testing.T, handler events.MultipartHandler, params StreamParams) {
ctx := context.TODO()
Expand Down

0 comments on commit 3c489ef

Please sign in to comment.