From 34747ef6447ecb72241b7ce9a34e8d574d1b7e8f Mon Sep 17 00:00:00 2001 From: "duanyi.aster" Date: Mon, 8 Jul 2024 17:27:23 +0800 Subject: [PATCH] opt: limit buffer pool size --- README.md | 3 +++ README_ZH_CN.md | 3 +++ ast/encode.go | 22 +++++++++++++--------- internal/decoder/api/stream.go | 10 ++++++++-- internal/encoder/encode_race.go | 18 +++++++++++++++++- internal/encoder/encoder.go | 30 +++++++++++++++++++----------- internal/encoder/stream.go | 17 ++++++++--------- internal/rt/fastmem.go | 11 +++++++++-- option/option.go | 12 +++++++++--- 9 files changed, 89 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index f3c736352..62a2fdfaa 100644 --- a/README.md +++ b/README.md @@ -466,6 +466,9 @@ For better performance, in previous case the `ast.Visitor` will be the better ch But `ast.Visitor` is not a very handy API. You might need to write a lot of code to implement your visitor and carefully maintain the tree hierarchy during decoding. Please read the comments in [ast/visitor.go](https://github.com/bytedance/sonic/blob/main/ast/visitor.go) carefully if you decide to use this API. +### Buffer Size +Sonic use memory pool in many places like `encoder.Encode`, `ast.Node.MarshalJSON` to improve performace, which may produce more memory usage (in-use) when server's load is high. See [issue 614](https://github.com/bytedance/sonic/issues/614). Therefore, we introduce some options to let user control the behavior of memory pool. See [option](https://pkg.go.dev/github.com/bytedance/sonic@v1.11.9/option#pkg-variables) package. + ## Community Sonic is a subproject of [CloudWeGo](https://www.cloudwego.io/). We are committed to building a cloud native ecosystem. diff --git a/README_ZH_CN.md b/README_ZH_CN.md index d0341ab72..2f7a4b6a4 100644 --- a/README_ZH_CN.md +++ b/README_ZH_CN.md @@ -464,6 +464,9 @@ go someFunc(user) 但是,`ast.Visitor` 并不是一个很易用的 API。你可能需要写大量的代码去实现自己的 `ast.Visitor`,并且需要在解析过程中仔细维护树的层级。如果你决定要使用这个 API,请先仔细阅读 [ast/visitor.go](https://github.com/bytedance/sonic/blob/main/ast/visitor.go) 中的注释。 +### 缓冲区大小 +Sonic在许多地方使用内存池,如`encoder.Encode`, `ast.Node.MarshalJSON`等来提高性能,这可能会在服务器负载高时产生更多的内存使用(in-use)。参见[issue 614](https://github.com/bytedance/sonic/issues/614)。因此,我们引入了一些选项来让用户配置内存池的行为。参见[option](https://pkg.go.dev/github.com/bytedance/sonic@v1.11.9/option#pkg-variables)包。 + ## 社区 Sonic 是 [CloudWeGo](https://www.cloudwego.io/) 下的一个子项目。我们致力于构建云原生生态系统。 diff --git a/ast/encode.go b/ast/encode.go index b5bcd4c2d..564b96305 100644 --- a/ast/encode.go +++ b/ast/encode.go @@ -21,10 +21,7 @@ import ( "unicode/utf8" "github.com/bytedance/sonic/internal/rt" -) - -const ( - _MaxBuffer = 1024 // 1KB buffer size + "github.com/bytedance/sonic/option" ) func quoteString(e *[]byte, s string) { @@ -100,10 +97,14 @@ func (self *Node) MarshalJSON() ([]byte, error) { freeBuffer(buf) return nil, err } - - ret := make([]byte, len(*buf)) - copy(ret, *buf) - freeBuffer(buf) + var ret []byte + if !rt.CanSizeResue(cap(*buf)) { + ret = *buf + } else { + ret = make([]byte, len(*buf)) + copy(ret, *buf) + freeBuffer(buf) + } return ret, err } @@ -111,12 +112,15 @@ func newBuffer() *[]byte { if ret := bytesPool.Get(); ret != nil { return ret.(*[]byte) } else { - buf := make([]byte, 0, _MaxBuffer) + buf := make([]byte, 0, option.DefaultAstBufferSize) return &buf } } func freeBuffer(buf *[]byte) { + if !rt.CanSizeResue(cap(*buf)) { + return + } *buf = (*buf)[:0] bytesPool.Put(buf) } diff --git a/internal/decoder/api/stream.go b/internal/decoder/api/stream.go index 2542fe88d..2bc522c7a 100644 --- a/internal/decoder/api/stream.go +++ b/internal/decoder/api/stream.go @@ -47,6 +47,12 @@ var bufPool = sync.Pool{ }, } +func freeBytes(buf []byte) { + if rt.CanSizeResue(cap(buf)) { + bufPool.Put(buf[:0]) + } +} + // NewStreamDecoder adapts to encoding/json.NewDecoder API. // // NewStreamDecoder returns a new decoder that reads from r. @@ -105,7 +111,7 @@ func (self *StreamDecoder) Decode(val interface{}) (err error) { // no remain valid bytes, thus we just recycle buffer mem := self.buf self.buf = nil - bufPool.Put(mem[:0]) + freeBytes(mem) } else { // println("keep") // remain undecoded bytes, move them onto head @@ -178,7 +184,7 @@ func (self *StreamDecoder) setErr(err error) { self.err = err mem := self.buf[:0] self.buf = nil - bufPool.Put(mem) + freeBytes(mem) } func (self *StreamDecoder) peek() (byte, error) { diff --git a/internal/encoder/encode_race.go b/internal/encoder/encode_race.go index 352a5120c..db4ccf9c1 100644 --- a/internal/encoder/encode_race.go +++ b/internal/encoder/encode_race.go @@ -25,7 +25,15 @@ import ( func helpDetectDataRace(val interface{}) { - _, _ = json.Marshal(val) + var out []byte + defer func() { + if v := recover(); v != nil { + // NOTICE: help user to locate where panic occurs + println("panic when encoding on: ", truncate(out)) + panic(v) + } + }() + out, _ = json.Marshal(val) } func encodeIntoCheckRace(buf *[]byte, val interface{}, opts Options) error { @@ -34,3 +42,11 @@ func encodeIntoCheckRace(buf *[]byte, val interface{}, opts Options) error { helpDetectDataRace(val) return err } + +func truncate(json []byte) string { + if len(json) <= 256 { + return rt.Mem2Str(json) + } else { + return rt.Mem2Str(json[len(json)-256:]) + } +} diff --git a/internal/encoder/encoder.go b/internal/encoder/encoder.go index 4cd03f803..fc58c76d3 100644 --- a/internal/encoder/encoder.go +++ b/internal/encoder/encoder.go @@ -188,10 +188,14 @@ func Encode(val interface{}, opts Options) ([]byte, error) { } /* make a copy of the result */ - ret = make([]byte, len(*buf)) - copy(ret, *buf) - - vars.FreeBytes(buf) + if rt.CanSizeResue(cap(*buf)) { + ret = make([]byte, len(*buf)) + copy(ret, *buf) + vars.FreeBytes(buf) + } else { + ret = *buf + } + /* return the buffer into pool */ return ret, nil } @@ -265,21 +269,25 @@ func EncodeIndented(val interface{}, prefix string, indent string, opts Options) /* indent the JSON */ buf = vars.NewBuffer() err = json.Indent(buf, *out, prefix, indent) + vars.FreeBytes(out) /* check for errors */ if err != nil { - vars.FreeBytes(out) vars.FreeBuffer(buf) return nil, err } /* copy to the result buffer */ - ret := make([]byte, buf.Len()) - copy(ret, buf.Bytes()) - - /* return the buffers into pool */ - vars.FreeBytes(out) - vars.FreeBuffer(buf) + var ret []byte + if rt.CanSizeResue(cap(buf.Bytes())) { + ret = make([]byte, buf.Len()) + copy(ret, buf.Bytes()) + /* return the buffers into pool */ + vars.FreeBuffer(buf) + } else { + ret = buf.Bytes() + } + return ret, nil } diff --git a/internal/encoder/stream.go b/internal/encoder/stream.go index 45333436c..c2d026a0c 100644 --- a/internal/encoder/stream.go +++ b/internal/encoder/stream.go @@ -38,11 +38,10 @@ func NewStreamEncoder(w io.Writer) *StreamEncoder { // Encode encodes interface{} as JSON to io.Writer func (enc *StreamEncoder) Encode(val interface{}) (err error) { - buf := vars.NewBytes() - out := *buf + out := vars.NewBytes() /* encode into the buffer */ - err = EncodeInto(&out, val, enc.Opts) + err = EncodeInto(out, val, enc.Opts) if err != nil { goto free_bytes } @@ -50,7 +49,7 @@ func (enc *StreamEncoder) Encode(val interface{}) (err error) { if enc.indent != "" || enc.prefix != "" { /* indent the JSON */ buf := vars.NewBuffer() - err = json.Indent(buf, out, enc.prefix, enc.indent) + err = json.Indent(buf, *out, enc.prefix, enc.indent) if err != nil { vars.FreeBuffer(buf) goto free_bytes @@ -71,9 +70,10 @@ func (enc *StreamEncoder) Encode(val interface{}) (err error) { } else { /* copy into io.Writer */ var n int - for len(out) > 0 { - n, err = enc.w.Write(out) - out = out[n:] + buf := *out + for len(buf) > 0 { + n, err = enc.w.Write(buf) + buf = buf[n:] if err != nil { goto free_bytes } @@ -86,7 +86,6 @@ func (enc *StreamEncoder) Encode(val interface{}) (err error) { } free_bytes: - *buf = out - vars.FreeBytes(buf) + vars.FreeBytes(out) return err } diff --git a/internal/rt/fastmem.go b/internal/rt/fastmem.go index 60e96887b..508be4765 100644 --- a/internal/rt/fastmem.go +++ b/internal/rt/fastmem.go @@ -17,8 +17,10 @@ package rt import ( - `unsafe` - `reflect` + "reflect" + "unsafe" + + "github.com/bytedance/sonic/option" ) //go:nosplit @@ -146,3 +148,8 @@ func MoreStack(size uintptr) func Add(ptr unsafe.Pointer, off uintptr) unsafe.Pointer { return unsafe.Pointer(uintptr(ptr) + off) } + +// CanSizeResue +func CanSizeResue(cap int) bool { + return cap <= int(option.LimitBufferSize) +} diff --git a/option/option.go b/option/option.go index 71527cdf0..4d9965260 100644 --- a/option/option.go +++ b/option/option.go @@ -18,10 +18,17 @@ package option var ( // DefaultDecoderBufferSize is the initial buffer size of StreamDecoder - DefaultDecoderBufferSize uint = 128 * 1024 + DefaultDecoderBufferSize uint = 4 * 1024 // DefaultEncoderBufferSize is the initial buffer size of Encoder - DefaultEncoderBufferSize uint = 128 * 1024 + DefaultEncoderBufferSize uint = 4 * 1024 + + // DefaultAstBufferSize is the initial buffer size of ast.Node.MarshalJSON() + DefaultAstBufferSize uint = 4 * 1024 + + // LimitBufferSize indicates the max pool buffer size, in case of OOM. + // See issue https://github.com/bytedance/sonic/issues/614 + LimitBufferSize uint = 1024 * 1024 ) // CompileOptions includes all options for encoder or decoder compiler. @@ -83,4 +90,3 @@ func WithCompileMaxInlineDepth(depth int) CompileOption { o.MaxInlineDepth = depth } } - \ No newline at end of file