diff --git a/pkg/session/server.go b/pkg/session/server.go index 18e258e..452c4e8 100644 --- a/pkg/session/server.go +++ b/pkg/session/server.go @@ -3,15 +3,11 @@ package session import ( "errors" "fmt" - "os" - "path/filepath" - "strings" "time" "github.com/gin-gonic/gin" "lion/pkg/common" - "lion/pkg/config" "lion/pkg/guacd" "lion/pkg/jms-sdk-go/model" "lion/pkg/jms-sdk-go/service" @@ -286,7 +282,6 @@ func (s *Server) Create(ctx *gin.Context, opts ...TunnelOption) (sess TunnelSess sess.ConnectedSuccessCallback = s.RegisterConnectedSuccessCallback(jmsSession) sess.ConnectedFailedCallback = s.RegisterConnectedFailedCallback(jmsSession) sess.DisConnectedCallback = s.RegisterDisConnectedCallback(jmsSession) - sess.FinishReplayCallback = s.RegisterFinishReplayCallback(sess) sess.ReleaseAppletAccount = func() error { if opt.appletOpt != nil { return s.JmsService.ReleaseAppletAccount(opt.appletOpt.ID) @@ -368,87 +363,6 @@ func (s *Server) UploadReplayToVideoWorker(tunnel TunnelSession, info guacd.Clie return true } -func (s *Server) RegisterFinishReplayCallback(tunnel TunnelSession) func(guacd.ClientInformation) error { - return func(info guacd.ClientInformation) error { - replayConfig := tunnel.TerminalConfig.ReplayStorage - storageType := replayConfig.TypeName - if storageType == "null" { - logger.Error("录像存储设置为 null,无存储") - reason := model.SessionLifecycleLog{Reason: string(model.ReasonErrNullStorage)} - s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadFailure, reason) - return nil - } - var replayErrReason model.ReplayError - - defer func() { - if replayErrReason != "" { - if err1 := s.JmsService.SessionReplayFailed(tunnel.ID, replayErrReason); err1 != nil { - logger.Errorf("Update %s replay status %s failed err: %s", tunnel.ID, replayErrReason, err1) - } - } - }() - - recordDirPath := filepath.Join(config.GlobalConfig.RecordPath, - tunnel.Created.Format(recordDirTimeFormat)) - originReplayFilePath := filepath.Join(recordDirPath, tunnel.ID) - dstReplayFilePath := originReplayFilePath + ReplayFileNameSuffix - fi, err := os.Stat(originReplayFilePath) - if err != nil { - replayErrReason = model.SessionReplayErrConnectFailed - return err - } - if fi.Size() < 1024 { - logger.Error("录像文件小于1024字节,可判断连接失败,未能产生有效的录像文件") - _ = os.Remove(originReplayFilePath) - replayErrReason = model.SessionReplayErrConnectFailed - return s.JmsService.SessionFailed(tunnel.ID, replayErrReason) - } - // 压缩文件 - err = common.CompressToGzipFile(originReplayFilePath, dstReplayFilePath) - if err != nil { - logger.Error("压缩文件失败: ", err) - replayErrReason = model.SessionReplayErrCreatedFailed - return err - } - // 压缩完成则删除源文件 - defer os.Remove(originReplayFilePath) - - if s.VideoWorkerClient != nil && s.UploadReplayToVideoWorker(tunnel, info, dstReplayFilePath) { - logger.Infof("Upload replay file to video worker: %s", dstReplayFilePath) - _ = os.Remove(dstReplayFilePath) - return nil - } - s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadStart, model.EmptyLifecycleLog) - defaultStorage := storage.ServerStorage{StorageType: "server", JmsService: s.JmsService} - logger.Infof("Upload record file: %s, type: %s", dstReplayFilePath, storageType) - targetName := strings.Join([]string{tunnel.Created.Format(recordDirTimeFormat), - tunnel.ID + ReplayFileNameSuffix}, "/") - if replayStorage := storage.NewReplayStorage(s.JmsService, replayConfig); replayStorage != nil { - if err = replayStorage.Upload(dstReplayFilePath, targetName); err != nil { - logger.Errorf("Upload replay failed: %s", err) - logger.Errorf("Upload replay by type %s failed, try use default", storageType) - err = defaultStorage.Upload(dstReplayFilePath, targetName) - } - } else { - err = defaultStorage.Upload(dstReplayFilePath, targetName) - } - // 上传文件 - if err != nil { - logger.Errorf("Upload replay failed: %s", err.Error()) - replayErrReason = model.SessionReplayErrUploadFailed - reason := model.SessionLifecycleLog{Reason: err.Error()} - s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadFailure, reason) - return err - } - // 上传成功,删除压缩文件 - defer os.Remove(dstReplayFilePath) - // 通知core上传完成 - err = s.JmsService.FinishReply(tunnel.ID) - s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadSuccess, model.EmptyLifecycleLog) - return err - } -} - func (s *Server) RecordLifecycleLog(sid string, event model.LifecycleEvent, logObj model.SessionLifecycleLog) { if err := s.JmsService.RecordSessionLifecycleLog(sid, event, logObj); err != nil { logger.Errorf("Record session %s lifecycle %s log err: %s", sid, event, err) diff --git a/pkg/session/session.go b/pkg/session/session.go index 321f165..2b69212 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -32,8 +32,6 @@ type TunnelSession struct { ConnectedFailedCallback func(err error) error `json:"-"` DisConnectedCallback func() error `json:"-"` - FinishReplayCallback func(guacd.ClientInformation) error `json:"-"` - ReleaseAppletAccount func() error `json:"-"` ModelSession *model.Session `json:"-"` diff --git a/pkg/tunnel/replay_part_upload.go b/pkg/tunnel/replay_part_upload.go index 26f5f37..7e9900c 100644 --- a/pkg/tunnel/replay_part_upload.go +++ b/pkg/tunnel/replay_part_upload.go @@ -199,10 +199,12 @@ func (p *PartUploader) uploadToStorage(uploadPath string) { return } //defaultStorage := storage.ServerStorage{StorageType: "server", JmsService: p.apiClient} + p.RecordLifecycleLog(model.ReplayUploadStart, model.EmptyLifecycleLog) replayStorage := p.GetStorage() + storageType := replayStorage.TypeName() dateRoot := p.replayMeta.DateStart.Format(recordDirTimeFormat) targetRoot := strings.Join([]string{dateRoot, p.SessionId}, "/") - logger.Debugf("PartUploader %s upload replay files: %v", p.SessionId, uploadFiles) + logger.Infof("PartUploader %s upload replay files: %v, type: %s", p.SessionId, uploadFiles, storageType) for _, uploadFile := range uploadFiles { if uploadFile.IsDir() { continue @@ -211,6 +213,8 @@ func (p *PartUploader) uploadToStorage(uploadPath string) { targetFile := strings.Join([]string{targetRoot, uploadFile.Name()}, "/") if err1 := replayStorage.Upload(uploadFilePath, targetFile); err1 != nil { logger.Errorf("PartUploader %s upload file %s error: %v", p.SessionId, uploadFilePath, err1) + reason := model.SessionLifecycleLog{Reason: err1.Error()} + p.RecordLifecycleLog(model.ReplayUploadFailure, reason) return } logger.Debugf("PartUploader %s upload file %s success", p.SessionId, uploadFilePath) @@ -219,8 +223,10 @@ func (p *PartUploader) uploadToStorage(uploadPath string) { logger.Errorf("PartUploader %s finish replay error: %v", p.SessionId, err) return } + + p.RecordLifecycleLog(model.ReplayUploadSuccess, model.EmptyLifecycleLog) logger.Infof("PartUploader %s upload replay success", p.SessionId) - if err := os.RemoveAll(p.RootPath); err != nil { + if err = os.RemoveAll(p.RootPath); err != nil { logger.Errorf("PartUploader %s remove root path %s error: %v", p.SessionId, p.RootPath, err) return } @@ -228,6 +234,12 @@ func (p *PartUploader) uploadToStorage(uploadPath string) { } +func (p *PartUploader) RecordLifecycleLog(event model.LifecycleEvent, logObj model.SessionLifecycleLog) { + if err := p.ApiClient.RecordSessionLifecycleLog(p.SessionId, event, logObj); err != nil { + logger.Errorf("Record session %s lifecycle %s log err: %s", p.SessionId, event, err) + } +} + func ReadInstruction(r *bufio.Reader) (guacd.Instruction, error) { var ret strings.Builder for { diff --git a/pkg/tunnel/server.go b/pkg/tunnel/server.go index 1bfd6bf..781940b 100644 --- a/pkg/tunnel/server.go +++ b/pkg/tunnel/server.go @@ -274,9 +274,6 @@ func (g *GuacamoleTunnelServer) Connect(ctx *gin.Context) { if err = tunnelSession.DisConnectedCallback(); err != nil { logger.Errorf("Session DisConnectedCallback err: %+v", err) } - if err = tunnelSession.FinishReplayCallback(info); err != nil { - logger.Errorf("Session Replay upload err: %+v", err) - } logger.Infof("Session[%s] disconnect", sessionId) }