Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: support to upload and write part replay #355

Merged
merged 1 commit into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func registerRouter(jmsService *service.JMService, tunnelService *tunnel.Guacamo

wsGroup.Group("/token").Use(
middleware.SessionAuth(jmsService)).GET("/", tunnelService.Connect)

}

{
Expand Down Expand Up @@ -316,9 +317,11 @@ func registerRouter(jmsService *service.JMService, tunnelService *tunnel.Guacamo
func bootstrap(jmsService *service.JMService) {
replayDir := config.GlobalConfig.RecordPath
ftpFilePath := config.GlobalConfig.FTPFilePath
sessionDir := config.GlobalConfig.SessionFolderPath
allRemainFiles := scanRemainReplay(jmsService, replayDir)
go uploadRemainReplay(jmsService, allRemainFiles)
go uploadRemainFTPFile(jmsService, ftpFilePath)
go uploadRemainSessionPartReplay(jmsService, sessionDir)
}

func uploadRemainFTPFile(jmsService *service.JMService, fileStoreDir string) {
Expand Down Expand Up @@ -434,6 +437,30 @@ func scanRemainReplay(jmsService *service.JMService, replayDir string) map[strin
return allRemainFiles
}

func uploadRemainSessionPartReplay(jmsService *service.JMService, sessionDir string) {
sessions, err := os.ReadDir(sessionDir)
if err != nil {
logger.Errorf("Read session dir failed: %s", err)
return
}
terminalConf, _ := jmsService.GetTerminalConfig()
for _, sessionEntry := range sessions {
sessionId := sessionEntry.Name()
if !common.ValidUUIDString(sessionId) {
continue
}
sessionRootPath := filepath.Join(sessionDir, sessionId)
uploader := tunnel.PartUploader{
RootPath: sessionRootPath,
SessionId: sessionId,
ApiClient: jmsService,
TermCfg: &terminalConf,
}
uploader.Start()
logger.Infof("Upload remain session part replay %s finish", sessionId)
}
}

func GetStatusData(tunnelCache *tunnel.GuaTunnelCacheManager) interface{} {
sids := tunnelCache.RangeActiveSessionIds()
payload := model.HeartbeatData{
Expand Down
11 changes: 10 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
LogDirPath string
AccessKeyFilePath string
CertsFolderPath string
SessionFolderPath string

Name string `mapstructure:"NAME"`
CoreHost string `mapstructure:"CORE_HOST"`
Expand Down Expand Up @@ -56,6 +57,8 @@ type Config struct {
IgnoreVerifyCerts bool `mapstructure:"IGNORE_VERIFY_CERTS"`
PandaHost string `mapstructure:"PANDA_HOST"`
EnablePanda bool `mapstructure:"ENABLE_PANDA"`

ReplayMaxSize int `mapstructure:"REPLAY_MAX_SIZE"`
}

func (c *Config) SelectGuacdAddr() string {
Expand All @@ -81,14 +84,15 @@ func getDefaultConfig() Config {
dataFolderPath := filepath.Join(rootPath, "data")
driveFolderPath := filepath.Join(dataFolderPath, "drive")
recordFolderPath := filepath.Join(dataFolderPath, "replays")
sessionsPath := filepath.Join(dataFolderPath, "sessions")
ftpFileFolderPath := filepath.Join(dataFolderPath, "ftp_files")
LogDirPath := filepath.Join(dataFolderPath, "logs")
keyFolderPath := filepath.Join(dataFolderPath, "keys")
CertsFolderPath := filepath.Join(dataFolderPath, "certs")
accessKeyFilePath := filepath.Join(keyFolderPath, ".access_key")

folders := []string{dataFolderPath, driveFolderPath, recordFolderPath,
keyFolderPath, LogDirPath, CertsFolderPath}
keyFolderPath, LogDirPath, CertsFolderPath, sessionsPath}
for i := range folders {
if err := EnsureDirExist(folders[i]); err != nil {
log.Fatalf("Create folder failed: %s", err.Error())
Expand All @@ -103,6 +107,7 @@ func getDefaultConfig() Config {
DrivePath: driveFolderPath,
CertsFolderPath: CertsFolderPath,
AccessKeyFilePath: accessKeyFilePath,
SessionFolderPath: sessionsPath,
CoreHost: "http://localhost:8080",
BootstrapToken: "",
BindHost: "0.0.0.0",
Expand All @@ -116,10 +121,14 @@ func getDefaultConfig() Config {
EnableRemoteAPPCopyPaste: false,
CleanDriveScheduleTime: 1,
PandaHost: "http://localhost:9001",
ReplayMaxSize: defaultMaxSize,
}

}

// 300MB
const defaultMaxSize = 1024 * 1024 * 300

func EnsureDirExist(path string) error {
if !haveDir(path) {
if err := os.MkdirAll(path, os.ModePerm); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/guacd/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,13 @@ func (conf *Configuration) UnSetParameter(name string) {
func (conf *Configuration) GetParameter(name string) string {
return conf.Parameters[name]
}

func (conf *Configuration) Clone() Configuration {
newConf := NewConfiguration()
newConf.ConnectionID = conf.ConnectionID
newConf.Protocol = conf.Protocol
for k, v := range conf.Parameters {
newConf.Parameters[k] = v
}
return newConf
}
11 changes: 11 additions & 0 deletions pkg/guacd/information.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,14 @@ func (info *ClientInformation) ExtraConfig() map[string]string {
}
return ret
}

func (info *ClientInformation) Clone() ClientInformation {
return ClientInformation{
OptimalScreenWidth: info.OptimalScreenWidth,
OptimalScreenHeight: info.OptimalScreenHeight,
OptimalResolution: info.OptimalResolution,
ImageMimetypes: []string{"image/jpeg", "image/png", "image/webp"},
Timezone: info.Timezone,
KeyboardLayout: info.KeyboardLayout,
}
}
2 changes: 0 additions & 2 deletions pkg/guacd/instruction_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package guacd

import (
"errors"
"testing"
)

Expand All @@ -28,7 +27,6 @@ func TestValidateInstructionString(t *testing.T) {
for i := range tests {
ins, err := ParseInstructionString(tests[i])
if err != nil {
t.Log(errors.As(err, &ErrInstructionBadDigit))
t.Log(err)
continue
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/guacd/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,12 @@ const (
WolBroadcastAddr = "wol-broadcast-addr"
WolWaitTime = "wol-wait-time"
)

const (
READONLY = "read-only"
)

const (
BoolFalse = "false"
BoolTrue = "true"
)
1 change: 1 addition & 0 deletions pkg/guacd/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func NewTunnel(address string, config Configuration, info ClientInformation) (tu

tunnel.uuid = ready.Args[0]
tunnel.IsOpen = true
tunnel.Config = config
return tunnel, nil
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/jms-sdk-go/model/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,25 @@ const (
UnKnown ReplayVersion = ""
Version2 ReplayVersion = "2"
Version3 ReplayVersion = "3"
Version5 ReplayVersion = "5"
)

const (
SuffixReplayGz = ".replay.gz"
SuffixCastGz = ".cast.gz"
SuffixReplayGz = ".replay.gz"
SuffixCastGz = ".cast.gz"
SuffixPartReplay = ".part.gz"
SuffixReplayJson = ".replay.json"
)

var SuffixMap = map[ReplayVersion]string{
Version2: SuffixReplayGz,
Version3: SuffixCastGz,
var SuffixVersionMap = map[string]ReplayVersion{
SuffixPartReplay: Version5,
SuffixReplayJson: Version5,
SuffixReplayGz: Version2,
SuffixCastGz: Version3,
}

func ParseReplayVersion(gzFile string, defaultValue ReplayVersion) ReplayVersion {
for version, suffix := range SuffixMap {
for suffix, version := range SuffixVersionMap {
if strings.HasSuffix(gzFile, suffix) {
return version
}
Expand Down
44 changes: 22 additions & 22 deletions pkg/session/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ func (r RDPConfiguration) GetGuacdConfiguration() guacd.Configuration {
conf.SetParameter(guacd.RDPDomain, adDomain)
}

// 设置 录像路径
if r.TerminalConfig.ReplayStorage.TypeName != "null" {
recordDirPath := filepath.Join(config.GlobalConfig.RecordPath,
r.Created.Format(recordDirTimeFormat))
conf.SetParameter(guacd.RecordingPath, recordDirPath)
conf.SetParameter(guacd.CreateRecordingPath, BoolTrue)
conf.SetParameter(guacd.RecordingName, r.SessionId)
}
//// 设置 录像路径
//if r.TerminalConfig.ReplayStorage.TypeName != "null" {
// recordDirPath := filepath.Join(config.GlobalConfig.RecordPath,
// r.Created.Format(recordDirTimeFormat))
// conf.SetParameter(guacd.RecordingPath, recordDirPath)
// conf.SetParameter(guacd.CreateRecordingPath, BoolTrue)
// conf.SetParameter(guacd.RecordingName, r.SessionId)
//}

// display 相关
{
Expand Down Expand Up @@ -181,13 +181,13 @@ func (r VNCConfiguration) GetGuacdConfiguration() guacd.Configuration {
conf.SetParameter(guacd.VNCAutoretry, "3")
}
// 设置存储
replayCfg := r.TerminalConfig.ReplayStorage
if replayCfg.TypeName != "null" {
recordDirPath := filepath.Join(config.GlobalConfig.RecordPath, r.Created.Format(recordDirTimeFormat))
conf.SetParameter(guacd.RecordingPath, recordDirPath)
conf.SetParameter(guacd.CreateRecordingPath, BoolTrue)
conf.SetParameter(guacd.RecordingName, r.SessionId)
}
//replayCfg := r.TerminalConfig.ReplayStorage
//if replayCfg.TypeName != "null" {
// recordDirPath := filepath.Join(config.GlobalConfig.RecordPath, r.Created.Format(recordDirTimeFormat))
// conf.SetParameter(guacd.RecordingPath, recordDirPath)
// conf.SetParameter(guacd.CreateRecordingPath, BoolTrue)
// conf.SetParameter(guacd.RecordingName, r.SessionId)
//}
{
for key, value := range VNCDisplay.GetDisplayParams() {
conf.SetParameter(key, value)
Expand Down Expand Up @@ -247,13 +247,13 @@ func (r VirtualAppConfiguration) GetGuacdConfiguration() guacd.Configuration {
conf.SetParameter(guacd.VNCAutoretry, "10")
}
// 设置存储
replayCfg := r.TerminalConfig.ReplayStorage
if replayCfg.TypeName != "null" {
recordDirPath := filepath.Join(config.GlobalConfig.RecordPath, r.Created.Format(recordDirTimeFormat))
conf.SetParameter(guacd.RecordingPath, recordDirPath)
conf.SetParameter(guacd.CreateRecordingPath, BoolTrue)
conf.SetParameter(guacd.RecordingName, r.SessionId)
}
//replayCfg := r.TerminalConfig.ReplayStorage
//if replayCfg.TypeName != "null" {
// recordDirPath := filepath.Join(config.GlobalConfig.RecordPath, r.Created.Format(recordDirTimeFormat))
// conf.SetParameter(guacd.RecordingPath, recordDirPath)
// conf.SetParameter(guacd.CreateRecordingPath, BoolTrue)
// conf.SetParameter(guacd.RecordingName, r.SessionId)
//}
{
for key, value := range VNCDisplay.GetDisplayParams() {
conf.SetParameter(key, value)
Expand Down
87 changes: 1 addition & 86 deletions pkg/session/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ package session
import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/gin-gonic/gin"

"lion/pkg/common"
"lion/pkg/config"
"lion/pkg/guacd"
"lion/pkg/jms-sdk-go/model"
"lion/pkg/jms-sdk-go/service"
Expand Down Expand Up @@ -281,11 +277,11 @@ func (s *Server) Create(ctx *gin.Context, opts ...TunnelOption) (sess TunnelSess
AccountID: opt.Account.ID,
Comment: comment,
}
sess.ModelSession = &jmsSession
sess.ConnectedCallback = s.RegisterConnectedCallback(jmsSession)
sess.ConnectedSuccessCallback = s.RegisterConnectedSuccessCallback(jmsSession)
sess.ConnectedFailedCallback = s.RegisterConnectedFailedCallback(jmsSession)
sess.DisConnectedCallback = s.RegisterDisConnectedCallback(jmsSession)
sess.FinishReplayCallback = s.RegisterFinishReplayCallback(sess)
sess.ReleaseAppletAccount = func() error {
if opt.appletOpt != nil {
return s.JmsService.ReleaseAppletAccount(opt.appletOpt.ID)
Expand Down Expand Up @@ -367,87 +363,6 @@ func (s *Server) UploadReplayToVideoWorker(tunnel TunnelSession, info guacd.Clie
return true
}

func (s *Server) RegisterFinishReplayCallback(tunnel TunnelSession) func(guacd.ClientInformation) error {
return func(info guacd.ClientInformation) error {
replayConfig := tunnel.TerminalConfig.ReplayStorage
storageType := replayConfig.TypeName
if storageType == "null" {
logger.Error("录像存储设置为 null,无存储")
reason := model.SessionLifecycleLog{Reason: string(model.ReasonErrNullStorage)}
s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadFailure, reason)
return nil
}
var replayErrReason model.ReplayError

defer func() {
if replayErrReason != "" {
if err1 := s.JmsService.SessionReplayFailed(tunnel.ID, replayErrReason); err1 != nil {
logger.Errorf("Update %s replay status %s failed err: %s", tunnel.ID, replayErrReason, err1)
}
}
}()

recordDirPath := filepath.Join(config.GlobalConfig.RecordPath,
tunnel.Created.Format(recordDirTimeFormat))
originReplayFilePath := filepath.Join(recordDirPath, tunnel.ID)
dstReplayFilePath := originReplayFilePath + ReplayFileNameSuffix
fi, err := os.Stat(originReplayFilePath)
if err != nil {
replayErrReason = model.SessionReplayErrConnectFailed
return err
}
if fi.Size() < 1024 {
logger.Error("录像文件小于1024字节,可判断连接失败,未能产生有效的录像文件")
_ = os.Remove(originReplayFilePath)
replayErrReason = model.SessionReplayErrConnectFailed
return s.JmsService.SessionFailed(tunnel.ID, replayErrReason)
}
// 压缩文件
err = common.CompressToGzipFile(originReplayFilePath, dstReplayFilePath)
if err != nil {
logger.Error("压缩文件失败: ", err)
replayErrReason = model.SessionReplayErrCreatedFailed
return err
}
// 压缩完成则删除源文件
defer os.Remove(originReplayFilePath)

if s.VideoWorkerClient != nil && s.UploadReplayToVideoWorker(tunnel, info, dstReplayFilePath) {
logger.Infof("Upload replay file to video worker: %s", dstReplayFilePath)
_ = os.Remove(dstReplayFilePath)
return nil
}
s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadStart, model.EmptyLifecycleLog)
defaultStorage := storage.ServerStorage{StorageType: "server", JmsService: s.JmsService}
logger.Infof("Upload record file: %s, type: %s", dstReplayFilePath, storageType)
targetName := strings.Join([]string{tunnel.Created.Format(recordDirTimeFormat),
tunnel.ID + ReplayFileNameSuffix}, "/")
if replayStorage := storage.NewReplayStorage(s.JmsService, replayConfig); replayStorage != nil {
if err = replayStorage.Upload(dstReplayFilePath, targetName); err != nil {
logger.Errorf("Upload replay failed: %s", err)
logger.Errorf("Upload replay by type %s failed, try use default", storageType)
err = defaultStorage.Upload(dstReplayFilePath, targetName)
}
} else {
err = defaultStorage.Upload(dstReplayFilePath, targetName)
}
// 上传文件
if err != nil {
logger.Errorf("Upload replay failed: %s", err.Error())
replayErrReason = model.SessionReplayErrUploadFailed
reason := model.SessionLifecycleLog{Reason: err.Error()}
s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadFailure, reason)
return err
}
// 上传成功,删除压缩文件
defer os.Remove(dstReplayFilePath)
// 通知core上传完成
err = s.JmsService.FinishReply(tunnel.ID)
s.RecordLifecycleLog(tunnel.ID, model.ReplayUploadSuccess, model.EmptyLifecycleLog)
return err
}
}

func (s *Server) RecordLifecycleLog(sid string, event model.LifecycleEvent, logObj model.SessionLifecycleLog) {
if err := s.JmsService.RecordSessionLifecycleLog(sid, event, logObj); err != nil {
logger.Errorf("Record session %s lifecycle %s log err: %s", sid, event, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ type TunnelSession struct {
ConnectedFailedCallback func(err error) error `json:"-"`
DisConnectedCallback func() error `json:"-"`

FinishReplayCallback func(guacd.ClientInformation) error `json:"-"`

ReleaseAppletAccount func() error `json:"-"`

ModelSession *model.Session `json:"-"`
}

const (
Expand Down
Loading