Skip to content

Commit

Permalink
stream: use index and fd to indicate io streams rather than name
Browse files Browse the repository at this point in the history
  • Loading branch information
criyle committed Feb 5, 2024
1 parent 9fe356f commit 9966f49
Show file tree
Hide file tree
Showing 11 changed files with 380 additions and 348 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ A REST service to run program in restricted environment (Listening on `localhost
- /file/:fileId GET downloads file from go judge (in memory), returns file content
- /file/:fileId DELETE delete file specified by fileId
- /ws WebSocket for /run
- TODO: /stream WebSocket for stream run
- /version gets build git version (e.g. `v1.4.0`) together with runtime information (go version, os, platform)
- /config gets some configuration (e.g. `fileStorePath`, `runnerConfig`) together with some supported features

Expand Down Expand Up @@ -752,6 +753,29 @@ Due to limitation of GO runtime, the memory will not return to OS automatically,
- `-force-gc-target` default `20m`, the minimal size to trigger GC
- `-force-gc-interval` default `5s`, the interval to check memory usage

### WebSocket Stream Interface

Websocket stream interface is used to run command interactively with inputs and outputs pumping from the command. All message is transmitted in binary format for maximum compatibility.

```text
+--------+--------+---...
| type | payload ...
+--------|--------+---...
request:
type =
1 - request (payload = JSON encoded request)
2 - resize (payload = JSON encoded resize request)
3 - input (payload = 1 byte (4-bit index + 4-bit fd), followed by content)
4 - cancel (no payload)
response:
type =
1 - response (payload = JSON encoded response)
2 - output (payload = 1 byte (4-bit index + 4-bit fd), followed by content)
```

Any incomplete / invalid message will be treated as error.

### Benchmark

By `wrk` with `t.lua`: `wrk -s t.lua -c 1 -t 1 -d 30s --latency http://localhost:5050/run`.
Expand Down
25 changes: 14 additions & 11 deletions cmd/go-judge-shell/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,18 @@ func (w *grpcWrapper) Send(req *stream.Request) error {
w.sc.Send(convertPBRequest(req.Request))
case req.Input != nil:
w.sc.Send(&pb.StreamRequest{Request: &pb.StreamRequest_ExecInput{ExecInput: &pb.StreamRequest_Input{
Name: req.Input.Name,
Index: uint32(req.Input.Index),
Fd: uint32(req.Input.Fd),
Content: req.Input.Content,
}}})
case req.Resize != nil:
w.sc.Send(&pb.StreamRequest{Request: &pb.StreamRequest_ExecResize{ExecResize: &pb.StreamRequest_Resize{
Name: req.Resize.Name,
Rows: uint32(req.Resize.Rows),
Cols: uint32(req.Resize.Cols),
X: uint32(req.Resize.X),
Y: uint32(req.Resize.Y),
Index: uint32(req.Resize.Index),
Fd: uint32(req.Resize.Fd),
Rows: uint32(req.Resize.Rows),
Cols: uint32(req.Resize.Cols),
X: uint32(req.Resize.X),
Y: uint32(req.Resize.Y),
}}})
case req.Cancel != nil:
w.sc.Send(&pb.StreamRequest{Request: &pb.StreamRequest_ExecCancel{}})
Expand All @@ -73,7 +75,8 @@ func (w *grpcWrapper) Recv() (*stream.Response, error) {
switch i := resp.Response.(type) {
case *pb.StreamResponse_ExecOutput:
return &stream.Response{Output: &stream.OutputResponse{
Name: i.ExecOutput.Name,
Index: int(i.ExecOutput.Index),
Fd: int(i.ExecOutput.Fd),
Content: i.ExecOutput.Content,
}}, nil
case *pb.StreamResponse_ExecResponse:
Expand Down Expand Up @@ -227,10 +230,10 @@ func convertPBFile(i model.CmdFile) *pb.Request_File {
return &pb.Request_File{File: &pb.Request_File_Cached{Cached: &pb.Request_CachedFile{FileID: *i.FileID}}}
case i.Name != nil && i.Max != nil:
return &pb.Request_File{File: &pb.Request_File_Pipe{Pipe: &pb.Request_PipeCollector{Name: *i.Name, Max: *i.Max, Pipe: i.Pipe}}}
case i.StreamIn != nil:
return &pb.Request_File{File: &pb.Request_File_StreamIn{StreamIn: &pb.Request_StreamInput{Name: *i.StreamIn}}}
case i.StreamOut != nil:
return &pb.Request_File{File: &pb.Request_File_StreamOut{StreamOut: &pb.Request_StreamOutput{Name: *i.StreamOut}}}
case i.StreamIn:
return &pb.Request_File{File: &pb.Request_File_StreamIn{}}
case i.StreamOut:
return &pb.Request_File{File: &pb.Request_File_StreamOut{}}
}
return nil
}
Expand Down
19 changes: 6 additions & 13 deletions cmd/go-judge-shell/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,15 @@ func main() {
log.Printf("finished: %+v %v", r, err)
}

func stringPointer(s string) *string {
return &s
}

func run(sc Stream, args []string) (*model.Response, error) {
req := model.Request{
Cmd: []model.Cmd{{
Args: args,
Env: env,
Files: []*model.CmdFile{
{StreamIn: stringPointer("stdin")},
{StreamOut: stringPointer("stdout")},
{StreamOut: stringPointer("stderr")},
{StreamIn: true},
{StreamOut: true},
{StreamOut: true},
},
CPULimit: uint64(cpuLimit.Nanoseconds()),
ClockLimit: uint64(sessionLimit.Nanoseconds()),
Expand Down Expand Up @@ -104,7 +100,6 @@ func run(sc Stream, args []string) (*model.Response, error) {
if err == io.EOF {
sendCh <- &stream.Request{
Input: &stream.InputRequest{
Name: "stdin",
Content: []byte("\004"),
},
}
Expand All @@ -127,7 +122,6 @@ func run(sc Stream, args []string) (*model.Response, error) {
}
sendCh <- &stream.Request{
Input: &stream.InputRequest{
Name: "stdin",
Content: buf[:n],
},
}
Expand All @@ -141,7 +135,6 @@ func run(sc Stream, args []string) (*model.Response, error) {
for range sigCh {
sendCh <- &stream.Request{
Input: &stream.InputRequest{
Name: "stdin",
Content: []byte("\003"),
},
}
Expand All @@ -159,10 +152,10 @@ func run(sc Stream, args []string) (*model.Response, error) {
}
switch {
case sr.Output != nil:
switch sr.Output.Name {
case "stdout":
switch sr.Output.Fd {
case 1:
os.Stdout.Write(sr.Output.Content)
case "stderr":
case 2:
os.Stderr.Write(sr.Output.Content)
}
case sr.Response != nil:
Expand Down
1 change: 0 additions & 1 deletion cmd/go-judge-shell/shell_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func handleSizeChange(sendCh chan *stream.Request) {
}
sendCh <- &stream.Request{
Resize: &stream.ResizeRequest{
Name: "stdin",
Rows: int(winSize.Rows),
Cols: int(winSize.Cols),
X: int(winSize.X),
Expand Down
21 changes: 12 additions & 9 deletions cmd/go-judge/grpc_executor/grpc_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func (sw *streamWrapper) Send(r stream.Response) error {
res.Response = &pb.StreamResponse_ExecResponse{ExecResponse: resp}
case r.Output != nil:
res.Response = &pb.StreamResponse_ExecOutput{ExecOutput: &pb.StreamResponse_Output{
Name: r.Output.Name,
Index: uint32(r.Output.Index),
Fd: uint32(r.Output.Fd),
Content: r.Output.Content,
}}
}
Expand All @@ -44,16 +45,18 @@ func (sw *streamWrapper) Recv() (*stream.Request, error) {
return &stream.Request{Request: convertPBStreamRequest(i.ExecRequest)}, nil
case *pb.StreamRequest_ExecInput:
return &stream.Request{Input: &stream.InputRequest{
Name: i.ExecInput.Name,
Index: int(i.ExecInput.Index),
Fd: int(i.ExecInput.Fd),
Content: i.ExecInput.Content,
}}, nil
case *pb.StreamRequest_ExecResize:
return &stream.Request{Resize: &stream.ResizeRequest{
Name: i.ExecResize.Name,
Rows: int(i.ExecResize.Rows),
Cols: int(i.ExecResize.Cols),
X: int(i.ExecResize.X),
Y: int(i.ExecResize.Y),
Index: int(i.ExecResize.Index),
Fd: int(i.ExecResize.Fd),
Rows: int(i.ExecResize.Rows),
Cols: int(i.ExecResize.Cols),
X: int(i.ExecResize.X),
Y: int(i.ExecResize.Y),
}}, nil
case *pb.StreamRequest_ExecCancel:
return &stream.Request{Cancel: &struct{}{}}, nil
Expand Down Expand Up @@ -142,9 +145,9 @@ func convertPBStreamFile(i *pb.Request_File) model.CmdFile {
case *pb.Request_File_Pipe:
return model.CmdFile{Name: &c.Pipe.Name, Max: &c.Pipe.Max, Pipe: c.Pipe.Pipe}
case *pb.Request_File_StreamIn:
return model.CmdFile{StreamIn: &c.StreamIn.Name}
return model.CmdFile{StreamIn: true}
case *pb.Request_File_StreamOut:
return model.CmdFile{StreamOut: &c.StreamOut.Name}
return model.CmdFile{StreamOut: true}
}
return model.CmdFile{}
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/go-judge/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type CmdFile struct {
Name *string `json:"name"`
Max *int64 `json:"max"`
Symlink *string `json:"symlink"`
StreamIn *string `json:"streamIn"`
StreamOut *string `json:"streamOut"`
StreamIn bool `json:"streamIn"`
StreamOut bool `json:"streamOut"`
Pipe bool `json:"pipe"`
}

Expand Down
30 changes: 12 additions & 18 deletions cmd/go-judge/stream/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ var (
)

type fileStreamIn struct {
name string
index int
fd int
r io.ReadCloser
w *io.PipeWriter
tty *os.File
Expand All @@ -35,9 +36,9 @@ func (f *fileStreamInReader) TTY(tty *os.File) {
close(f.fi.done)
}

func newFileStreamIn(name string, hasTTY bool) *fileStreamIn {
func newFileStreamIn(index, fd int, hasTTY bool) *fileStreamIn {
r, w := io.Pipe()
fi := &fileStreamIn{name: name, w: w, done: make(chan struct{}), hasTTY: hasTTY}
fi := &fileStreamIn{index: index, fd: fd, w: w, done: make(chan struct{}), hasTTY: hasTTY}
fi.r = &fileStreamInReader{r, fi}
return fi
}
Expand All @@ -50,10 +51,6 @@ func (f *fileStreamIn) GetTTY() *os.File {
return f.tty
}

func (f *fileStreamIn) Name() string {
return f.name
}

func (f *fileStreamIn) Write(b []byte) (int, error) {
return f.w.Write(b)
}
Expand All @@ -63,7 +60,7 @@ func (f *fileStreamIn) EnvFile(fs filestore.FileStore) (envexec.File, error) {
}

func (f *fileStreamIn) String() string {
return fmt.Sprintf("fileStreamIn:%s", f.name)
return fmt.Sprintf("fileStreamIn:(index:%d,fd:%d)", f.index, f.fd)
}

func (f *fileStreamIn) Close() error {
Expand All @@ -72,18 +69,15 @@ func (f *fileStreamIn) Close() error {
}

type fileStreamOut struct {
name string
r *io.PipeReader
w *io.PipeWriter
index int
fd int
r *io.PipeReader
w *io.PipeWriter
}

func newFileStreamOut(name string) *fileStreamOut {
func newFileStreamOut(index, fd int) *fileStreamOut {
r, w := io.Pipe()
return &fileStreamOut{name: name, r: r, w: w}
}

func (f *fileStreamOut) Name() string {
return f.name
return &fileStreamOut{index: index, fd: fd, r: r, w: w}
}

func (f *fileStreamOut) Read(b []byte) (int, error) {
Expand All @@ -95,7 +89,7 @@ func (f *fileStreamOut) EnvFile(fs filestore.FileStore) (envexec.File, error) {
}

func (f *fileStreamOut) String() string {
return fmt.Sprintf("fileStreamOut:%s", f.name)
return fmt.Sprintf("fileStreamOut:(index:%d,fd:%d)", f.index, f.fd)
}

func (f *fileStreamOut) Close() error {
Expand Down
Loading

0 comments on commit 9966f49

Please sign in to comment.