Skip to content

Commit

Permalink
perf: update session lifecycle log
Browse files Browse the repository at this point in the history
  • Loading branch information
LeeEirc committed Sep 10, 2024
1 parent 00a2a38 commit 01f731a
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 93 deletions.
86 changes: 0 additions & 86 deletions pkg/session/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ package session
import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/gin-gonic/gin"

"lion/pkg/common"
"lion/pkg/config"
"lion/pkg/guacd"
"lion/pkg/jms-sdk-go/model"
"lion/pkg/jms-sdk-go/service"
Expand Down Expand Up @@ -286,7 +282,6 @@ func (s *Server) Create(ctx *gin.Context, opts ...TunnelOption) (sess TunnelSess
sess.ConnectedSuccessCallback = s.RegisterConnectedSuccessCallback(jmsSession)
sess.ConnectedFailedCallback = s.RegisterConnectedFailedCallback(jmsSession)
sess.DisConnectedCallback = s.RegisterDisConnectedCallback(jmsSession)
sess.FinishReplayCallback = s.RegisterFinishReplayCallback(sess)
sess.ReleaseAppletAccount = func() error {
if opt.appletOpt != nil {
return s.JmsService.ReleaseAppletAccount(opt.appletOpt.ID)
Expand Down Expand Up @@ -368,87 +363,6 @@ func (s *Server) UploadReplayToVideoWorker(tunnel TunnelSession, info guacd.Clie
return true
}

func (s *Server) RegisterFinishReplayCallback(tunnel TunnelSession) func(guacd.ClientInformation) error {
return func(info guacd.ClientInformation) error {
replayConfig := tunnel.TerminalConfig.ReplayStorage
storageType := replayConfig.TypeName
if storageType == "null" {
logger.Error("录像存储设置为 null,无存储")
reason := model.SessionLifecycleLog{Reason: string(model.ReasonErrNullStorage)}
s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadFailure, reason)
return nil
}
var replayErrReason model.ReplayError

defer func() {
if replayErrReason != "" {
if err1 := s.JmsService.SessionReplayFailed(tunnel.ID, replayErrReason); err1 != nil {
logger.Errorf("Update %s replay status %s failed err: %s", tunnel.ID, replayErrReason, err1)
}
}
}()

recordDirPath := filepath.Join(config.GlobalConfig.RecordPath,
tunnel.Created.Format(recordDirTimeFormat))
originReplayFilePath := filepath.Join(recordDirPath, tunnel.ID)
dstReplayFilePath := originReplayFilePath + ReplayFileNameSuffix
fi, err := os.Stat(originReplayFilePath)
if err != nil {
replayErrReason = model.SessionReplayErrConnectFailed
return err
}
if fi.Size() < 1024 {
logger.Error("录像文件小于1024字节,可判断连接失败,未能产生有效的录像文件")
_ = os.Remove(originReplayFilePath)
replayErrReason = model.SessionReplayErrConnectFailed
return s.JmsService.SessionFailed(tunnel.ID, replayErrReason)
}
// 压缩文件
err = common.CompressToGzipFile(originReplayFilePath, dstReplayFilePath)
if err != nil {
logger.Error("压缩文件失败: ", err)
replayErrReason = model.SessionReplayErrCreatedFailed
return err
}
// 压缩完成则删除源文件
defer os.Remove(originReplayFilePath)

if s.VideoWorkerClient != nil && s.UploadReplayToVideoWorker(tunnel, info, dstReplayFilePath) {
logger.Infof("Upload replay file to video worker: %s", dstReplayFilePath)
_ = os.Remove(dstReplayFilePath)
return nil
}
s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadStart, model.EmptyLifecycleLog)
defaultStorage := storage.ServerStorage{StorageType: "server", JmsService: s.JmsService}
logger.Infof("Upload record file: %s, type: %s", dstReplayFilePath, storageType)
targetName := strings.Join([]string{tunnel.Created.Format(recordDirTimeFormat),
tunnel.ID + ReplayFileNameSuffix}, "/")
if replayStorage := storage.NewReplayStorage(s.JmsService, replayConfig); replayStorage != nil {
if err = replayStorage.Upload(dstReplayFilePath, targetName); err != nil {
logger.Errorf("Upload replay failed: %s", err)
logger.Errorf("Upload replay by type %s failed, try use default", storageType)
err = defaultStorage.Upload(dstReplayFilePath, targetName)
}
} else {
err = defaultStorage.Upload(dstReplayFilePath, targetName)
}
// 上传文件
if err != nil {
logger.Errorf("Upload replay failed: %s", err.Error())
replayErrReason = model.SessionReplayErrUploadFailed
reason := model.SessionLifecycleLog{Reason: err.Error()}
s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadFailure, reason)
return err
}
// 上传成功,删除压缩文件
defer os.Remove(dstReplayFilePath)
// 通知core上传完成
err = s.JmsService.FinishReply(tunnel.ID)
s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadSuccess, model.EmptyLifecycleLog)
return err
}
}

func (s *Server) RecordLifecycleLog(sid string, event model.LifecycleEvent, logObj model.SessionLifecycleLog) {
if err := s.JmsService.RecordSessionLifecycleLog(sid, event, logObj); err != nil {
logger.Errorf("Record session %s lifecycle %s log err: %s", sid, event, err)
Expand Down
2 changes: 0 additions & 2 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type TunnelSession struct {
ConnectedFailedCallback func(err error) error `json:"-"`
DisConnectedCallback func() error `json:"-"`

FinishReplayCallback func(guacd.ClientInformation) error `json:"-"`

ReleaseAppletAccount func() error `json:"-"`

ModelSession *model.Session `json:"-"`
Expand Down
16 changes: 14 additions & 2 deletions pkg/tunnel/replay_part_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,12 @@ func (p *PartUploader) uploadToStorage(uploadPath string) {
return
}
//defaultStorage := storage.ServerStorage{StorageType: "server", JmsService: p.apiClient}
p.RecordLifecycleLog(model.ReplayUploadStart, model.EmptyLifecycleLog)
replayStorage := p.GetStorage()
storageType := replayStorage.TypeName()
dateRoot := p.replayMeta.DateStart.Format(recordDirTimeFormat)
targetRoot := strings.Join([]string{dateRoot, p.SessionId}, "/")
logger.Debugf("PartUploader %s upload replay files: %v", p.SessionId, uploadFiles)
logger.Infof("PartUploader %s upload replay files: %v, type: %s", p.SessionId, uploadFiles, storageType)
for _, uploadFile := range uploadFiles {
if uploadFile.IsDir() {
continue
Expand All @@ -211,6 +213,8 @@ func (p *PartUploader) uploadToStorage(uploadPath string) {
targetFile := strings.Join([]string{targetRoot, uploadFile.Name()}, "/")
if err1 := replayStorage.Upload(uploadFilePath, targetFile); err1 != nil {
logger.Errorf("PartUploader %s upload file %s error: %v", p.SessionId, uploadFilePath, err1)
reason := model.SessionLifecycleLog{Reason: err1.Error()}
p.RecordLifecycleLog(model.ReplayUploadFailure, reason)
return
}
logger.Debugf("PartUploader %s upload file %s success", p.SessionId, uploadFilePath)
Expand All @@ -219,15 +223,23 @@ func (p *PartUploader) uploadToStorage(uploadPath string) {
logger.Errorf("PartUploader %s finish replay error: %v", p.SessionId, err)
return
}

p.RecordLifecycleLog(model.ReplayUploadSuccess, model.EmptyLifecycleLog)
logger.Infof("PartUploader %s upload replay success", p.SessionId)
if err := os.RemoveAll(p.RootPath); err != nil {
if err = os.RemoveAll(p.RootPath); err != nil {
logger.Errorf("PartUploader %s remove root path %s error: %v", p.SessionId, p.RootPath, err)
return
}
logger.Infof("PartUploader %s remove root path %s success", p.SessionId, p.RootPath)

}

func (p *PartUploader) RecordLifecycleLog(event model.LifecycleEvent, logObj model.SessionLifecycleLog) {
if err := p.ApiClient.RecordSessionLifecycleLog(p.SessionId, event, logObj); err != nil {
logger.Errorf("Record session %s lifecycle %s log err: %s", p.SessionId, event, err)
}
}

func ReadInstruction(r *bufio.Reader) (guacd.Instruction, error) {
var ret strings.Builder
for {
Expand Down
3 changes: 0 additions & 3 deletions pkg/tunnel/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,6 @@ func (g *GuacamoleTunnelServer) Connect(ctx *gin.Context) {
if err = tunnelSession.DisConnectedCallback(); err != nil {
logger.Errorf("Session DisConnectedCallback err: %+v", err)
}
if err = tunnelSession.FinishReplayCallback(info); err != nil {
logger.Errorf("Session Replay upload err: %+v", err)
}
logger.Infof("Session[%s] disconnect", sessionId)
}

Expand Down

0 comments on commit 01f731a

Please sign in to comment.