Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: add limit on reusing buffer size #664

Merged
merged 3 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,9 @@ For better performance, in previous case the `ast.Visitor` will be the better ch
But `ast.Visitor` is not a very handy API. You might need to write a lot of code to implement your visitor and carefully maintain the tree hierarchy during decoding. Please read the comments in [ast/visitor.go](https://github.com/bytedance/sonic/blob/main/ast/visitor.go) carefully if you decide to use this API.
### Buffer Size
Sonic use memory pool in many places like `encoder.Encode`, `ast.Node.MarshalJSON` to improve performace, which may produce more memory usage (in-use) when server's load is high. See [issue 614](https://github.com/bytedance/sonic/issues/614). Therefore, we introduce some options to let user control the behavior of memory pool. See [option](https://pkg.go.dev/github.com/bytedance/sonic@v1.11.9/option#pkg-variables) package.

## Community

Sonic is a subproject of [CloudWeGo](https://www.cloudwego.io/). We are committed to building a cloud native ecosystem.
3 changes: 3 additions & 0 deletions README_ZH_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,9 @@ go someFunc(user)

但是,`ast.Visitor` 并不是一个很易用的 API。你可能需要写大量的代码去实现自己的 `ast.Visitor`,并且需要在解析过程中仔细维护树的层级。如果你决定要使用这个 API,请先仔细阅读 [ast/visitor.go](https://github.com/bytedance/sonic/blob/main/ast/visitor.go) 中的注释。

### 缓冲区大小
Sonic在许多地方使用内存池,如`encoder.Encode`, `ast.Node.MarshalJSON`等来提高性能,这可能会在服务器负载高时产生更多的内存使用(in-use)。参见[issue 614](https://github.com/bytedance/sonic/issues/614)。因此,我们引入了一些选项来让用户配置内存池的行为。参见[option](https://pkg.go.dev/github.com/bytedance/sonic@v1.11.9/option#pkg-variables)包。

## 社区

Sonic 是 [CloudWeGo](https://www.cloudwego.io/) 下的一个子项目。我们致力于构建云原生生态系统。
22 changes: 13 additions & 9 deletions ast/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ import (
"unicode/utf8"

"github.com/bytedance/sonic/internal/rt"
)

const (
_MaxBuffer = 1024 // 1KB buffer size
"github.com/bytedance/sonic/option"
)

func quoteString(e *[]byte, s string) {
Expand Down Expand Up @@ -100,23 +97,30 @@ func (self *Node) MarshalJSON() ([]byte, error) {
freeBuffer(buf)
return nil, err
}

ret := make([]byte, len(*buf))
copy(ret, *buf)
freeBuffer(buf)
var ret []byte
if !rt.CanSizeResue(cap(*buf)) {
ret = *buf
} else {
ret = make([]byte, len(*buf))
liuq19 marked this conversation as resolved.
Show resolved Hide resolved
copy(ret, *buf)
freeBuffer(buf)
}
return ret, err
}

func newBuffer() *[]byte {
if ret := bytesPool.Get(); ret != nil {
return ret.(*[]byte)
} else {
buf := make([]byte, 0, _MaxBuffer)
buf := make([]byte, 0, option.DefaultAstBufferSize)
return &buf
}
}

func freeBuffer(buf *[]byte) {
if !rt.CanSizeResue(cap(*buf)) {
return
}
*buf = (*buf)[:0]
bytesPool.Put(buf)
}
Expand Down
10 changes: 8 additions & 2 deletions internal/decoder/api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ var bufPool = sync.Pool{
},
}

func freeBytes(buf []byte) {
if rt.CanSizeResue(cap(buf)) {
bufPool.Put(buf[:0])
}
}

// NewStreamDecoder adapts to encoding/json.NewDecoder API.
//
// NewStreamDecoder returns a new decoder that reads from r.
Expand Down Expand Up @@ -105,7 +111,7 @@ func (self *StreamDecoder) Decode(val interface{}) (err error) {
// no remain valid bytes, thus we just recycle buffer
mem := self.buf
self.buf = nil
bufPool.Put(mem[:0])
freeBytes(mem)
} else {
// println("keep")
// remain undecoded bytes, move them onto head
Expand Down Expand Up @@ -178,7 +184,7 @@ func (self *StreamDecoder) setErr(err error) {
self.err = err
mem := self.buf[:0]
self.buf = nil
bufPool.Put(mem)
freeBytes(mem)
}

func (self *StreamDecoder) peek() (byte, error) {
Expand Down
20 changes: 19 additions & 1 deletion internal/encoder/encode_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@ package encoder

import (
`encoding/json`

`github.com/bytedance/sonic/internal/rt`
)


func helpDetectDataRace(val interface{}) {
_, _ = json.Marshal(val)
var out []byte
defer func() {
if v := recover(); v != nil {
// NOTICE: help user to locate where panic occurs
println("panic when encoding on: ", truncate(out))
panic(v)
}
}()
out, _ = json.Marshal(val)
}

func encodeIntoCheckRace(buf *[]byte, val interface{}, opts Options) error {
Expand All @@ -34,3 +44,11 @@ func encodeIntoCheckRace(buf *[]byte, val interface{}, opts Options) error {
helpDetectDataRace(val)
return err
}

func truncate(json []byte) string {
if len(json) <= 256 {
return rt.Mem2Str(json)
} else {
return rt.Mem2Str(json[len(json)-256:])
}
}
30 changes: 19 additions & 11 deletions internal/encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,14 @@ func Encode(val interface{}, opts Options) ([]byte, error) {
}

/* make a copy of the result */
ret = make([]byte, len(*buf))
copy(ret, *buf)

vars.FreeBytes(buf)
if rt.CanSizeResue(cap(*buf)) {
ret = make([]byte, len(*buf))
copy(ret, *buf)
vars.FreeBytes(buf)
} else {
ret = *buf
}

/* return the buffer into pool */
return ret, nil
}
Expand Down Expand Up @@ -269,21 +273,25 @@ func EncodeIndented(val interface{}, prefix string, indent string, opts Options)
/* indent the JSON */
buf = vars.NewBuffer()
err = json.Indent(buf, *out, prefix, indent)
vars.FreeBytes(out)

/* check for errors */
if err != nil {
vars.FreeBytes(out)
vars.FreeBuffer(buf)
return nil, err
}

/* copy to the result buffer */
ret := make([]byte, buf.Len())
copy(ret, buf.Bytes())

/* return the buffers into pool */
vars.FreeBytes(out)
vars.FreeBuffer(buf)
var ret []byte
if rt.CanSizeResue(cap(buf.Bytes())) {
ret = make([]byte, buf.Len())
copy(ret, buf.Bytes())
/* return the buffers into pool */
vars.FreeBuffer(buf)
} else {
ret = buf.Bytes()
}

return ret, nil
}

Expand Down
17 changes: 8 additions & 9 deletions internal/encoder/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,18 @@ func NewStreamEncoder(w io.Writer) *StreamEncoder {

// Encode encodes interface{} as JSON to io.Writer
func (enc *StreamEncoder) Encode(val interface{}) (err error) {
buf := vars.NewBytes()
out := *buf
out := vars.NewBytes()

/* encode into the buffer */
err = EncodeInto(&out, val, enc.Opts)
err = EncodeInto(out, val, enc.Opts)
if err != nil {
goto free_bytes
}

if enc.indent != "" || enc.prefix != "" {
/* indent the JSON */
buf := vars.NewBuffer()
err = json.Indent(buf, out, enc.prefix, enc.indent)
err = json.Indent(buf, *out, enc.prefix, enc.indent)
if err != nil {
vars.FreeBuffer(buf)
goto free_bytes
Expand All @@ -71,9 +70,10 @@ func (enc *StreamEncoder) Encode(val interface{}) (err error) {
} else {
/* copy into io.Writer */
var n int
for len(out) > 0 {
n, err = enc.w.Write(out)
out = out[n:]
buf := *out
for len(buf) > 0 {
n, err = enc.w.Write(buf)
buf = buf[n:]
if err != nil {
goto free_bytes
}
Expand All @@ -86,7 +86,6 @@ func (enc *StreamEncoder) Encode(val interface{}) (err error) {
}

free_bytes:
*buf = out
vars.FreeBytes(buf)
vars.FreeBytes(out)
return err
}
12 changes: 8 additions & 4 deletions internal/encoder/vars/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,10 @@ func NewBuffer() *bytes.Buffer {
}

func FreeBytes(p *[]byte) {
(*p) = (*p)[:0]
bytesPool.Put(p)
if rt.CanSizeResue(cap(*p)) {
(*p) = (*p)[:0]
bytesPool.Put(p)
}
}

func FreeStack(p *Stack) {
Expand All @@ -129,8 +131,10 @@ func FreeStack(p *Stack) {
}

func FreeBuffer(p *bytes.Buffer) {
p.Reset()
bufferPool.Put(p)
if rt.CanSizeResue(cap(p.Bytes())) {
p.Reset()
bufferPool.Put(p)
}
}

var (
Expand Down
11 changes: 9 additions & 2 deletions internal/rt/fastmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package rt

import (
`unsafe`
`reflect`
"reflect"
"unsafe"

"github.com/bytedance/sonic/option"
)

//go:nosplit
Expand Down Expand Up @@ -146,3 +148,8 @@ func MoreStack(size uintptr)
func Add(ptr unsafe.Pointer, off uintptr) unsafe.Pointer {
return unsafe.Pointer(uintptr(ptr) + off)
}

// CanSizeResue
func CanSizeResue(cap int) bool {
return cap <= int(option.LimitBufferSize)
}
12 changes: 9 additions & 3 deletions option/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ package option

var (
// DefaultDecoderBufferSize is the initial buffer size of StreamDecoder
DefaultDecoderBufferSize uint = 128 * 1024
DefaultDecoderBufferSize uint = 4 * 1024

// DefaultEncoderBufferSize is the initial buffer size of Encoder
DefaultEncoderBufferSize uint = 128 * 1024
DefaultEncoderBufferSize uint = 4 * 1024

// DefaultAstBufferSize is the initial buffer size of ast.Node.MarshalJSON()
DefaultAstBufferSize uint = 4 * 1024

// LimitBufferSize indicates the max pool buffer size, in case of OOM.
// See issue https://github.com/bytedance/sonic/issues/614
LimitBufferSize uint = 1024 * 1024
liuq19 marked this conversation as resolved.
Show resolved Hide resolved
)

// CompileOptions includes all options for encoder or decoder compiler.
Expand Down Expand Up @@ -83,4 +90,3 @@ func WithCompileMaxInlineDepth(depth int) CompileOption {
o.MaxInlineDepth = depth
}
}

Loading