Skip to content

Commit

Permalink
Make the uploader responsible for creating the directories it needs
Browse files Browse the repository at this point in the history
This removes a bunch of custom code where we manually create uploader
directories in tests because the full uploader service is not running.
  • Loading branch information
zmb3 committed Dec 4, 2022
1 parent 2314ebf commit c9e64e1
Show file tree
Hide file tree
Showing 14 changed files with 17 additions and 125 deletions.
1 change: 0 additions & 1 deletion integration/utmp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ func newSrvCtx(ctx context.Context, t *testing.T) *SrvCtx {
)
require.NoError(t, err)
s.srv = srv
require.NoError(t, auth.CreateUploaderDirs(nodeDir))
require.NoError(t, s.srv.Start())
return s
}
Expand Down
15 changes: 0 additions & 15 deletions lib/auth/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"crypto/tls"
"crypto/x509"
"net"
"os"
"path/filepath"
"time"

"github.com/gravitational/trace"
Expand All @@ -35,7 +33,6 @@ import (
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/auth/keystore"
Expand Down Expand Up @@ -102,18 +99,6 @@ func (cfg *TestAuthServerConfig) CheckAndSetDefaults() error {
return nil
}

// CreateUploaderDirs creates the directory structure for the file uploader service.
func CreateUploaderDirs(dir string) error {
var errs []error

for _, dirname := range []string{events.StreamingSessionsDir, events.CorruptedSessionsDir} {
path := filepath.Join(dir, teleport.LogsDir, teleport.ComponentUpload, dirname, apidefaults.Namespace)
errs = append(errs, trace.ConvertSystemError(os.MkdirAll(path, teleport.SharedDirMode)))
}

return trace.NewAggregate(errs...)
}

// TestServer defines the set of server components for a test
type TestServer struct {
TLS *TestTLSServer
Expand Down
7 changes: 7 additions & 0 deletions lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ func NewUploader(cfg UploaderConfig) (*Uploader, error) {
return nil, trace.Wrap(err)
}

if err := os.MkdirAll(cfg.ScanDir, teleport.SharedDirMode); err != nil {
return nil, trace.ConvertSystemError(err)
}
if err := os.MkdirAll(cfg.CorruptedDir, teleport.SharedDirMode); err != nil {
return nil, trace.ConvertSystemError(err)
}

uploader := &Uploader{
cfg: cfg,
log: log.WithFields(log.Fields{
Expand Down
4 changes: 1 addition & 3 deletions lib/events/filesessions/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ const minUploadBytes = events.MaxProtoMessageSizeBytes * 2

// NewStreamer creates a streamer sending uploads to disk
func NewStreamer(dir string) (*events.ProtoStreamer, error) {
handler, err := NewHandler(Config{
Directory: dir,
})
handler, err := NewHandler(Config{Directory: dir})
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
4 changes: 4 additions & 0 deletions lib/events/filesessions/fileuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (s *Config) CheckAndSetDefaults() error {

// NewHandler returns new file sessions handler
func NewHandler(cfg Config) (*Handler, error) {
if err := os.MkdirAll(cfg.Directory, teleport.SharedDirMode); err != nil {
return nil, trace.ConvertSystemError(err)
}

if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
Expand Down
4 changes: 2 additions & 2 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,10 +802,10 @@ func (f *Forwarder) authorize(ctx context.Context, actx *authContext) error {
// async streamer buffers the events to disk and uploads the events later
func (f *Forwarder) newStreamer(ctx *authContext) (events.Streamer, error) {
if services.IsRecordSync(ctx.recordingConfig.GetMode()) {
f.log.Debugf("Using sync streamer for session.")
f.log.Debug("Using sync streamer for session.")
return f.cfg.AuthClient, nil
}
f.log.Debugf("Using async streamer for session.")
f.log.Debug("Using async streamer for session.")
dir := filepath.Join(
f.cfg.DataDir, teleport.LogsDir, teleport.ComponentUpload,
events.StreamingSessionsDir, apidefaults.Namespace,
Expand Down
22 changes: 0 additions & 22 deletions lib/kube/proxy/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"testing"
"time"
Expand All @@ -45,7 +44,6 @@ import (
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/auth/native"
"github.com/gravitational/teleport/lib/auth/testauthority"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/eventstest"
"github.com/gravitational/teleport/lib/kube/proxy/streamproto"
"github.com/gravitational/teleport/lib/limiter"
Expand Down Expand Up @@ -205,26 +203,6 @@ func setupTestContext(ctx context.Context, t *testing.T, cfg testConfig) *testCo
OnReconcile: cfg.onReconcile,
})
require.NoError(t, err)
// create session recording path
// testCtx.kubeServer.DataDir/log/upload/streaming/default
err = os.MkdirAll(
filepath.Join(
testCtx.kubeServer.DataDir,
teleport.LogsDir,
teleport.ComponentUpload,
events.StreamingSessionsDir,
apidefaults.Namespace,
), os.ModePerm)
require.NoError(t, err)
err = os.MkdirAll(
filepath.Join(
testCtx.kubeServer.DataDir,
teleport.LogsDir,
teleport.ComponentUpload,
events.CorruptedSessionsDir,
apidefaults.Namespace,
), os.ModePerm)
require.NoError(t, err)

// Waits for len(clusters) heartbeats to start
waitForHeartbeats := len(cfg.clusters)
Expand Down
13 changes: 0 additions & 13 deletions lib/srv/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"net/http/httptest"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -276,18 +275,6 @@ func SetUpSuiteWithConfig(t *testing.T, config suiteConfig) *Suite {
// Generate certificate for AWS console application.
s.awsConsoleCertificate = s.generateCertificate(t, s.user, "aws.example.com", "readonly")

// Make sure the upload directory is created.
err = os.MkdirAll(filepath.Join(
s.dataDir, teleport.LogsDir, teleport.ComponentUpload,
events.StreamingSessionsDir, defaults.Namespace,
), 0o755)
require.NoError(t, err)
err = os.MkdirAll(filepath.Join(
s.dataDir, teleport.LogsDir, teleport.ComponentUpload,
events.CorruptedSessionsDir, defaults.Namespace,
), 0o755)
require.NoError(t, err)

lockWatcher, err := services.NewLockWatcher(s.closeContext, services.LockWatcherConfig{
ResourceWatcherConfig: services.ResourceWatcherConfig{
Component: teleport.ComponentApp,
Expand Down
20 changes: 0 additions & 20 deletions lib/srv/db/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package db

import (
"context"
"os"
"path/filepath"

"github.com/gravitational/trace"
Expand All @@ -31,7 +30,6 @@ import (
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/srv/db/common"
"github.com/gravitational/teleport/lib/utils"
)

// newStreamWriter creates a streamer that will be used to stream the
Expand Down Expand Up @@ -79,24 +77,6 @@ func (s *Server) newStreamer(ctx context.Context, sessionID string, recConfig ty
uploadDir := filepath.Join(
s.cfg.DataDir, teleport.LogsDir, teleport.ComponentUpload,
libevents.StreamingSessionsDir, apidefaults.Namespace)
corruptedDir := filepath.Join(
s.cfg.DataDir, teleport.LogsDir, teleport.ComponentUpload,
libevents.CorruptedSessionsDir, apidefaults.Namespace)

for _, dir := range []string{uploadDir, corruptedDir} {
// Make sure the upload dir exists, otherwise file streamer will fail.
_, err := utils.StatDir(dir)
if err != nil && !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
}
if trace.IsNotFound(err) {
s.log.Debugf("Creating upload dir %v.", uploadDir)
if err := os.MkdirAll(uploadDir, 0755); err != nil {
return nil, trace.Wrap(err)
}
}
}

fileStreamer, err := filesessions.NewStreamer(uploadDir)
if err != nil {
return nil, trace.Wrap(err)
Expand Down
17 changes: 0 additions & 17 deletions lib/srv/desktop/windows_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"crypto/x509"
"fmt"
"net"
"os"
"path/filepath"
"strings"
"sync"
Expand Down Expand Up @@ -407,22 +406,6 @@ func (s *WindowsService) newStreamer(ctx context.Context, recConfig types.Sessio
s.cfg.Log.Debugf("using async streamer (for mode %v)", recConfig.GetMode())
uploadDir := filepath.Join(s.cfg.DataDir, teleport.LogsDir, teleport.ComponentUpload,
libevents.StreamingSessionsDir, apidefaults.Namespace)
corruptedDir := filepath.Join(s.cfg.DataDir, teleport.LogsDir, teleport.ComponentUpload,
libevents.CorruptedSessionsDir, apidefaults.Namespace)

// ensure uploader directories exist
for _, dir := range []string{uploadDir, corruptedDir} {
_, err := utils.StatDir(dir)
if trace.IsNotFound(err) {
s.cfg.Log.Debugf("Creating upload dir %v.", dir)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, trace.Wrap(err)
}
} else if err != nil {
return nil, trace.Wrap(err)
}
}

fileStreamer, err := filesessions.NewStreamer(uploadDir)
if err != nil {
return nil, trace.Wrap(err)
Expand Down
4 changes: 3 additions & 1 deletion lib/srv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,15 @@ func newMockServer(t *testing.T) *mockServer {

return &mockServer{
auth: authServer,
datadir: t.TempDir(),
MockEmitter: &eventstest.MockEmitter{},
clock: clock,
}
}

type mockServer struct {
*eventstest.MockEmitter
datadir string
auth *auth.Server
component string
clock clockwork.FakeClock
Expand Down Expand Up @@ -187,7 +189,7 @@ func (m *mockServer) GetAccessPoint() AccessPoint {

// GetDataDir returns data directory of the server
func (m *mockServer) GetDataDir() string {
return "testDataDir"
return m.datadir
}

// GetPAM returns PAM configuration for this server.
Expand Down
2 changes: 0 additions & 2 deletions lib/srv/regular/sshserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ func newCustomFixture(t *testing.T, mutateCfg func(*auth.TestServerConfig), sshO
nodeClient,
serverOptions...)
require.NoError(t, err)
require.NoError(t, auth.CreateUploaderDirs(nodeDir))
require.NoError(t, sshSrv.Start())
t.Cleanup(func() {
require.NoError(t, sshSrv.Close())
Expand Down Expand Up @@ -1781,7 +1780,6 @@ func TestLimiter(t *testing.T) {
require.NoError(t, err)
require.NoError(t, srv.Start())

require.NoError(t, auth.CreateUploaderDirs(nodeStateDir))
defer srv.Close()

config := &ssh.ClientConfig{
Expand Down
27 changes: 0 additions & 27 deletions lib/srv/sess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,6 @@ func TestSession_newRecorder(t *testing.T) {
})
require.NoError(t, err)

nodeRecording, err := types.NewSessionRecordingConfigFromConfigFile(types.SessionRecordingConfigSpecV2{
Mode: types.RecordAtNode,
})
require.NoError(t, err)

nodeRecordingSync, err := types.NewSessionRecordingConfigFromConfigFile(types.SessionRecordingConfigSpecV2{
Mode: types.RecordAtNodeSync,
})
Expand Down Expand Up @@ -168,28 +163,6 @@ func TestSession_newRecorder(t *testing.T) {
require.True(t, ok)
},
},
{
desc: "err-new-streamer-fails",
sess: &session{
id: "test",
log: logger,
registry: &SessionRegistry{
SessionRegistryConfig: SessionRegistryConfig{
Srv: &mockServer{
component: teleport.ComponentNode,
},
},
},
},
sctx: &ServerContext{
SessionRecordingConfig: nodeRecording,
srv: &mockServer{
component: teleport.ComponentNode,
},
},
errAssertion: require.Error,
recAssertion: require.Nil,
},
{
desc: "strict-err-new-audit-writer-fails",
sess: &session{
Expand Down
2 changes: 0 additions & 2 deletions lib/web/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ func newWebSuiteWithConfig(t *testing.T, cfg webSuiteConfig) *WebSuite {
s.node = node
s.srvID = node.ID()
require.NoError(t, s.node.Start())
require.NoError(t, auth.CreateUploaderDirs(nodeDataDir))

// create reverse tunnel service:
proxyID := "proxy"
Expand Down Expand Up @@ -6052,7 +6051,6 @@ func newWebPack(t *testing.T, numProxies int) *webPack {

require.NoError(t, node.Start())
t.Cleanup(func() { require.NoError(t, node.Close()) })
require.NoError(t, auth.CreateUploaderDirs(nodeDataDir))

var proxies []*testProxy
for p := 0; p < numProxies; p++ {
Expand Down

0 comments on commit c9e64e1

Please sign in to comment.