Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dev: optimized tty plugin and AIS #112

Merged
merged 5 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 78 additions & 6 deletions device/generic_ais_txrx_device.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package device

import (
"bufio"
"fmt"
"net"
"strings"

"github.com/hootrhino/rulex/glogger"
"github.com/hootrhino/rulex/typex"
"github.com/hootrhino/rulex/utils"
)
Expand Down Expand Up @@ -53,14 +59,16 @@ type _AISDevicePacket struct {

// --------------------------------------------------------------------------------------------------
type _AISDeviceConfig struct {
Host string `json:"host" validate:"required" title:"服务地址"`
Port int `json:"port" validate:"required" title:"服务端口"`
Mode string `json:"mode"` // TCP UDP UART
Host string `json:"host" validate:"required"`
Port int `json:"port" validate:"required"`
}
type AISDevice struct {
typex.XStatus
status typex.DeviceState
mainConfig _AISDeviceConfig
RuleEngine typex.RuleX
status typex.DeviceState
mainConfig _AISDeviceConfig
RuleEngine typex.RuleX
tcpListener net.Listener // TCP 接收端
}

/*
Expand All @@ -71,7 +79,11 @@ type AISDevice struct {
func NewAISDevice(e typex.RuleX) typex.XDevice {
aisd := new(AISDevice)
aisd.RuleEngine = e
aisd.mainConfig = _AISDeviceConfig{}
aisd.mainConfig = _AISDeviceConfig{
Mode: "TCP",
Host: "0.0.0.0",
Port: 2600,
}
return aisd
}

Expand All @@ -90,6 +102,15 @@ func (aisd *AISDevice) Start(cctx typex.CCTX) error {
aisd.Ctx = cctx.Ctx
aisd.CancelCTX = cctx.CancelCTX
//
listener, err := net.Listen("tcp",
fmt.Sprintf("%s%v", aisd.mainConfig.Host, aisd.mainConfig.Port))
if err != nil {
return err
}
aisd.tcpListener = listener
glogger.GLogger.Infof("AIS TCP server started on http://%s:%v",
aisd.mainConfig.Host, aisd.mainConfig.Port)
go aisd.handleConnect(listener)
return nil
}

Expand All @@ -112,6 +133,9 @@ func (aisd *AISDevice) Status() typex.DeviceState {
func (aisd *AISDevice) Stop() {
aisd.status = typex.DEV_DOWN
aisd.CancelCTX()
if aisd.tcpListener != nil {
aisd.tcpListener.Close()
}
}

// 设备属性,是一系列属性描述
Expand Down Expand Up @@ -147,3 +171,51 @@ func (aisd *AISDevice) OnDCACall(UUID string, Command string, Args interface{})
func (aisd *AISDevice) OnCtrl(cmd []byte, args []byte) ([]byte, error) {
return []byte{}, nil
}

//--------------------------------------------------------------------------------------------------
// 内部
//--------------------------------------------------------------------------------------------------
/*
*
* 处理连接
*
*/
func (aisd *AISDevice) handleConnect(listener net.Listener) {
for {
select {
case <-aisd.Ctx.Done():
{
return
}
default:
{
}
}
tcpcon, err := listener.Accept()
if err != nil {
glogger.GLogger.Error(err)
continue
}
go aisd.handleIO(tcpcon)
}

}

/*
*
* 数据处理
*
*/
func (aisd *AISDevice) handleIO(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)

s, err := reader.ReadString('\n')
if err != nil {
glogger.GLogger.Error(err)
return
}
if strings.HasSuffix(s, "\r\n") {
glogger.GLogger.Debug("Received data:", s)
}
}
96 changes: 96 additions & 0 deletions device/generic_camera_stream_linux_mipsle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package device

import (
"fmt"

"github.com/hootrhino/rulex/typex"
)

// RTSP URL格式= rtsp://<username>:<password>@<ip>:<port>,
type _MainConfig struct {
MaxThread int `json:"maxThread"` // 最大连接数, 防止连接过多导致摄像头拉流失败
InputMode string `json:"inputMode"` // 视频输入模式:RTSP | LOCAL
Device string `json:"device"` // 本地视频设备路径,在输入模式=LOCAL时生效
RtspUrl string `json:"rtspUrl"` // 远程视频设备地址,在输入模式=RTSP时生效
OutputMode string `json:"outputMode"` // 输出模式:JPEG_STREAM | RTSP_STREAM
OutputAddr string `json:"outputAddr"` // 输出地址, 格式为: "Ip:Port",例如127.0.0.1:7890
}

// 摄像头
type videoCamera struct {
typex.XStatus
status typex.DeviceState
mainConfig _MainConfig
}

func NewVideoCamera(e typex.RuleX) typex.XDevice {
videoCamera := new(videoCamera)
videoCamera.RuleEngine = e
videoCamera.status = typex.DEV_DOWN
videoCamera.mainConfig = _MainConfig{
Device: "video0",
RtspUrl: "rtsp://127.0.0.1",
InputMode: "LOCAL",
OutputMode: "JPEG_STREAM",
}
return videoCamera
}

// 初始化 通常用来获取设备的配置
func (vc *videoCamera) Init(devId string, configMap map[string]interface{}) error {
return fmt.Errorf("video camera not support windows")
}

// 启动, 设备的工作进程
func (vc *videoCamera) Start(cctx typex.CCTX) error {
vc.status = typex.DEV_UP
return nil
}

func (vc *videoCamera) OnRead(cmd []byte, data []byte) (int, error) {
return 0, nil
}

func (vc *videoCamera) OnWrite(cmd []byte, data []byte) (int, error) {
return 0, nil
}

/*
*
* 外部指令,未来可以实现一些对摄像头的操作,例如拍照等
*
*/
func (vc *videoCamera) OnCtrl(cmd []byte, args []byte) ([]byte, error) {
return nil, nil
}

// 设备当前状态
func (vc *videoCamera) Status() typex.DeviceState {
return vc.status
}

func (vc *videoCamera) Stop() {
vc.status = typex.DEV_STOP
vc.CancelCTX()
}

func (vc *videoCamera) Property() []typex.DeviceProperty {
return []typex.DeviceProperty{}
}

func (vc *videoCamera) Details() *typex.Device {
return vc.RuleEngine.GetDevice(vc.PointId)

}

func (vc *videoCamera) SetState(_ typex.DeviceState) {

}

func (vc *videoCamera) Driver() typex.XExternalDriver {
return nil
}

func (vc *videoCamera) OnDCACall(_ string, _ string, _ interface{}) typex.DCAResult {
return typex.DCAResult{}
}
12 changes: 3 additions & 9 deletions gen_info.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,16 @@ HASH=$(git rev-list --tags --max-count=1)
## Gen Version
#######################################################################
cat >./typex/version.go <<EOF
//
// Warning:
// This file is generated by go compiler, don't change it!!!
// Build on: $(cat /etc/issue)
//
// This file is generated by go compiler, don't change it!!!
package typex

import "fmt"

type Version struct {
Version string
ReleaseTime string
}

func (v Version) String() string {
return fmt.Sprintf("{\"releaseTime\":\"%s\",\"version\":\"%s\"}", v.ReleaseTime, v.Version)
Arch string
Dist string
}

var DefaultVersion = Version{
Expand Down
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/hootrhino/rulex/engine"
"github.com/hootrhino/rulex/glogger"
"github.com/hootrhino/rulex/typex"
"github.com/hootrhino/rulex/utils"
)

func init() {
Expand All @@ -28,6 +29,12 @@ func init() {
}
}
}()
dist, err := utils.GetOSDistribution()
if err != nil {
panic(err)
}
typex.DefaultVersion.Dist = dist
typex.DefaultVersion.Arch = runtime.GOOS + "-" + runtime.GOARCH
}

/*
Expand Down
19 changes: 8 additions & 11 deletions plugin/http_server/system_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,14 @@ func System(c *gin.Context, hh *HttpApiServer, e typex.RuleX) {
runtime.ReadMemStats(&m)
ip, err0 := utils.HostNameI()
hardWareInfo := map[string]interface{}{
"version": e.Version().Version,
"diskInfo": calculateDiskInfo(diskInfo),
"systemMem": bToMb(m.Sys),
"allocMem": bToMb(m.Alloc),
"totalMem": bToMb(m.TotalAlloc),
"cpuPercent": calculateCpuPercent(cpuPercent),
"osArch": runtime.GOOS + "-" + runtime.GOARCH,
"osDist": func() string {
v, _ := utils.GetOSDistribution()
return v
}(),
"version": e.Version().Version,
"diskInfo": calculateDiskInfo(diskInfo),
"systemMem": bToMb(m.Sys),
"allocMem": bToMb(m.Alloc),
"totalMem": bToMb(m.TotalAlloc),
"cpuPercent": calculateCpuPercent(cpuPercent),
"osArch": e.Version().Arch,
"osDist": e.Version().Dist,
"startedTime": StartedTime,
"ip": func() []string {
if err0 != nil {
Expand Down
7 changes: 1 addition & 6 deletions plugin/mqtt_server/mqtt_server_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,7 @@ func (s *MqttServer) Service(arg typex.ServiceArg) typex.ServiceResult {
}
}
}
default:
{
return typex.ServiceResult{Out: []string{"unsupported args:" + arg.Name}}
}
}
return typex.ServiceResult{Out: []string{"kick out success"}}
}
return typex.ServiceResult{Out: []string{"no such service name:" + arg.Name}}
return typex.ServiceResult{Out: []Client{Client{}}}
}
1 change: 0 additions & 1 deletion plugin/sensor_server/sensor_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/hootrhino/rulex/glogger"
"github.com/hootrhino/rulex/typex"
"github.com/hootrhino/rulex/utils"
"gopkg.in/ini.v1"
)

Expand Down
2 changes: 1 addition & 1 deletion plugin/ttyd_terminal/ttyd_terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (tty *WebTTYPlugin) Start(typex.RuleX) error {
// tty.ttydCmd.Stdout = glogger.GLogger.Out
// tty.ttydCmd.Stderr = glogger.GLogger.Out
if err1 := tty.ttydCmd.Start(); err1 != nil {
glogger.GLogger.Info("cmd.Start error: %v", err1)
glogger.GLogger.Infof("cmd.Start error: %v", err1)
return err1
}
go func(cmd *exec.Cmd) {
Expand Down
Loading