Skip to content

Commit

Permalink
feat: support multi-thread inference (#762)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz committed Aug 19, 2023
1 parent 6821049 commit 80542ed
Show file tree
Hide file tree
Showing 16 changed files with 210 additions and 81 deletions.
114 changes: 114 additions & 0 deletions base/sizeof/size.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2020 gorse Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sizeof

import (
"math"
"reflect"
)

// DeepSize reports the size of v in bytes, as reflect.Size, but also including
// all recursive substructures of v via maps, pointers, and slices. If v
// contains any cycles, the size of each pointer (re)introducing the cycle is
// included but the acyclic substructure is counted only once.
//
// Only values whose size and structure can be obtained by the reflect package
// are counted. Some values have components that are not visible by
// reflection, and so are not counted or may be undercounted. In particular:
//
// The space occupied by code and data, reachable through variables captured in
// the closure of a function pointer, are not counted. A value of function type
// is counted only as a pointer.
//
// The unused buckets of a map cannot be inspected by the reflect package.
// Their size is estimated by assuming unfilled slots contain zeroes of their
// type.
//
// The unused capacity of the array under a slice is estimated by assuming the
// unused slots contain zeroes of their type. It is possible they contain non
// zero values from sharing or reslicing, but without explicitly reslicing the
// reflect package cannot touch them.
func DeepSize(v any) int {
	visited := make(map[uintptr]bool)
	return int(valueSize(reflect.ValueOf(v), visited))
}

// valueSize reports the size in bytes of v together with everything reachable
// from it through pointers, slices, and maps. seen records pointer targets
// already counted so that cyclic structures are traversed only once; each
// pointer that (re)introduces a cycle still contributes its own word via base.
func valueSize(v reflect.Value, seen map[uintptr]bool) uintptr {
	base := v.Type().Size()
	switch v.Kind() {
	case reflect.Ptr:
		p := v.Pointer()
		if !seen[p] && !v.IsNil() {
			seen[p] = true
			return base + valueSize(v.Elem(), seen)
		}

	case reflect.Slice:
		n := v.Len()
		for i := 0; i < n; i++ {
			base += valueSize(v.Index(i), seen)
		}

		// Account for the parts of the array not covered by this slice. Since
		// we can't get the values directly, assume they're zeroes. That may be
		// incorrect, in which case we may underestimate.
		//
		// Fix: the spare capacity holds elements, so it is charged at the
		// element size. The previous code used v.Type().Size(), which is the
		// size of the slice header itself, overstating the estimate.
		if c := v.Cap(); c > n {
			base += v.Type().Elem().Size() * uintptr(c-n)
		}

	case reflect.Map:
		// A map m has len(m) / 6.5 buckets, rounded up to a power of two, and
		// a minimum of one bucket. Each bucket is 16 bytes + 8*(keysize + valsize).
		//
		// We can't tell which keys are in which bucket by reflection, however,
		// so here we count the 16-byte header for each bucket, and then just add
		// in the computed key and value sizes.
		nb := uintptr(math.Pow(2, math.Ceil(math.Log(float64(v.Len())/6.5)/math.Log(2))))
		if nb == 0 {
			nb = 1
		}
		base = 16 * nb
		for _, key := range v.MapKeys() {
			base += valueSize(key, seen)
			base += valueSize(v.MapIndex(key), seen)
		}

		// We have nb buckets of 8 slots each, and v.Len() slots are filled.
		// The remaining slots we will assume contain zero key/value pairs.
		zk := v.Type().Key().Size()  // a zero key
		zv := v.Type().Elem().Size() // a zero value
		base += (8*nb - uintptr(v.Len())) * (zk + zv)

	case reflect.Struct:
		// Chase pointer and slice fields and add the size of their members.
		// NOTE(review): map and string fields are counted at header size only
		// here (their contents are not chased), a possible undercount.
		for i := 0; i < v.NumField(); i++ {
			f := v.Field(i)
			switch f.Kind() {
			case reflect.Ptr:
				p := f.Pointer()
				if !seen[p] && !f.IsNil() {
					seen[p] = true
					base += valueSize(f.Elem(), seen)
				}
			case reflect.Slice:
				base += valueSize(f, seen)
			}
		}

	case reflect.String:
		// The two-word header plus one byte per byte of the backing array.
		return base + uintptr(v.Len())

	}
	return base
}
34 changes: 34 additions & 0 deletions base/sizeof/size_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2020 gorse Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sizeof

import (
"testing"
)

// TestDeepSize checks that introducing a pointer cycle does not change the
// reported size: the cycle-forming pointer is already counted in the acyclic
// layout, and the cyclic substructure must be counted only once.
func TestDeepSize(t *testing.T) {
	type node struct {
		Value int
		Next  *node
	}

	n := &node{Value: 25}
	want := DeepSize(n)
	n.Next = n // close the cycle back onto the same node
	got := DeepSize(n)
	if got != want {
		t.Errorf("Cyclic size: got %d, want %d", got, want)
	}
}
8 changes: 7 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ type TracingConfig struct {
}

type ExperimentalConfig struct {
EnableDeepLearning bool `mapstructure:"enable_deep_learning"`
EnableDeepLearning bool `mapstructure:"enable_deep_learning"`
DeepLearningBatchSize int `mapstructure:"deep_learning_batch_size"`
}

func GetDefaultConfig() *Config {
Expand Down Expand Up @@ -248,6 +249,9 @@ func GetDefaultConfig() *Config {
Exporter: "jaeger",
Sampler: "always",
},
Experimental: ExperimentalConfig{
DeepLearningBatchSize: 128,
},
}
}

Expand Down Expand Up @@ -522,6 +526,8 @@ func setDefault() {
// [tracing]
viper.SetDefault("tracing.exporter", defaultConfig.Tracing.Exporter)
viper.SetDefault("tracing.sampler", defaultConfig.Tracing.Sampler)
// [experimental]
viper.SetDefault("experimental.deep_learning_batch_size", defaultConfig.Experimental.DeepLearningBatchSize)
}

type configBinding struct {
Expand Down
3 changes: 3 additions & 0 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,6 @@ ratio = 1

# Enable deep learning recommenders. The default value is false.
enable_deep_learning = false

# Batch size for deep learning recommenders. The default value is 128.
deep_learning_batch_size = 128
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ func TestUnmarshal(t *testing.T) {
assert.Equal(t, "http://localhost:14268/api/traces", config.Tracing.CollectorEndpoint)
assert.Equal(t, "always", config.Tracing.Sampler)
assert.Equal(t, 1.0, config.Tracing.Ratio)
// [experimental]
assert.Equal(t, 128, config.Experimental.DeepLearningBatchSize)
})
}
}
Expand Down
8 changes: 6 additions & 2 deletions master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (
"github.com/zhenghaoz/gorse/base/log"
"github.com/zhenghaoz/gorse/base/parallel"
"github.com/zhenghaoz/gorse/base/progress"
"github.com/zhenghaoz/gorse/base/sizeof"
"github.com/zhenghaoz/gorse/base/task"
"github.com/zhenghaoz/gorse/config"
"github.com/zhenghaoz/gorse/model"
"github.com/zhenghaoz/gorse/model/click"
"github.com/zhenghaoz/gorse/model/ranking"
"github.com/zhenghaoz/gorse/protocol"
Expand Down Expand Up @@ -157,7 +159,9 @@ func NewMaster(cfg *config.Config, cacheFile string, managedMode bool) *Master {
// enable deep learning
if cfg.Experimental.EnableDeepLearning {
log.Logger().Debug("enable deep learning")
m.ClickModel = click.NewDeepFM(nil)
m.ClickModel = click.NewDeepFM(model.Params{
model.BatchSize: cfg.Experimental.DeepLearningBatchSize,
})
}

return m
Expand Down Expand Up @@ -202,7 +206,7 @@ func (m *Master) Serve() {
RankingPrecision.Set(float64(m.clickScore.Precision))
RankingRecall.Set(float64(m.clickScore.Recall))
RankingAUC.Set(float64(m.clickScore.AUC))
MemoryInUseBytesVec.WithLabelValues("ranking_model").Set(float64(m.ClickModel.Bytes()))
MemoryInUseBytesVec.WithLabelValues("ranking_model").Set(float64(sizeof.DeepSize(m.ClickModel)))
}

// create cluster meta cache
Expand Down
7 changes: 4 additions & 3 deletions master/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/zhenghaoz/gorse/base/parallel"
"github.com/zhenghaoz/gorse/base/progress"
"github.com/zhenghaoz/gorse/base/search"
"github.com/zhenghaoz/gorse/base/sizeof"
"github.com/zhenghaoz/gorse/base/task"
"github.com/zhenghaoz/gorse/config"
"github.com/zhenghaoz/gorse/model/click"
Expand Down Expand Up @@ -191,8 +192,8 @@ func (m *Master) runLoadDatasetTask() error {
clickDataset = nil
m.clickDataMutex.Unlock()
LoadDatasetStepSecondsVec.WithLabelValues("split_click_dataset").Set(time.Since(startTime).Seconds())
MemoryInUseBytesVec.WithLabelValues("ranking_train_set").Set(float64(m.clickTrainSet.Bytes()))
MemoryInUseBytesVec.WithLabelValues("ranking_test_set").Set(float64(m.clickTestSet.Bytes()))
MemoryInUseBytesVec.WithLabelValues("ranking_train_set").Set(float64(sizeof.DeepSize(m.clickTrainSet)))
MemoryInUseBytesVec.WithLabelValues("ranking_test_set").Set(float64(sizeof.DeepSize(m.clickTestSet)))

LoadDatasetTotalSeconds.Set(time.Since(initialStartTime).Seconds())
return nil
Expand Down Expand Up @@ -1153,7 +1154,7 @@ func (t *FitClickModelTask) run(ctx context.Context, j *task.JobsAllocator) erro
RankingPrecision.Set(float64(score.Precision))
RankingRecall.Set(float64(score.Recall))
RankingAUC.Set(float64(score.AUC))
MemoryInUseBytesVec.WithLabelValues("ranking_model").Set(float64(t.ClickModel.Bytes()))
MemoryInUseBytesVec.WithLabelValues("ranking_model").Set(float64(sizeof.DeepSize(t.ClickModel)))
if err := t.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.LastFitRankingModelTime), time.Now())); err != nil {
log.Logger().Error("failed to write meta", zap.Error(err))
}
Expand Down
9 changes: 0 additions & 9 deletions model/click/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,6 @@ func (dataset *Dataset) ItemCount() int {
return int(dataset.Index.CountItems())
}

func (dataset *Dataset) Bytes() int {
var bytes uintptr
bytes += uintptr(dataset.Index.Bytes())
bytes += uintptr(dataset.Users.Bytes())
bytes += uintptr(dataset.Items.Bytes())
bytes += uintptr(dataset.Target.Bytes())
return int(bytes)
}

// Count returns the number of samples.
func (dataset *Dataset) Count() int {
if dataset.Users.Len() != dataset.Items.Len() {
Expand Down
12 changes: 4 additions & 8 deletions model/click/deepfm.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,6 @@ func (fm *DeepFM) Unmarshal(r io.Reader) error {
return nil
}

func (fm *DeepFM) Bytes() int {
return 0
}

func (fm *DeepFM) Complexity() int {
return 0
}

func (fm *DeepFM) build() {
// init Adam optimizer variables
fm.m_v = zeros(fm.numFeatures, fm.nFactors)
Expand Down Expand Up @@ -707,6 +699,10 @@ func (fm *DeepFM) Clone() FactorizationMachine {
}
}

// Spawn returns a per-worker instance of the model by delegating to Clone.
// NOTE(review): assumes Clone yields a copy that is safe for use by a
// separate inference worker — confirm against Clone's implementation.
func (fm *DeepFM) Spawn() FactorizationMachine {
	return fm.Clone()
}

func zeros(a, b int) [][]float32 {
retVal := make([][]float32, a)
for i := range retVal {
Expand Down
27 changes: 11 additions & 16 deletions model/click/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ type FactorizationMachine interface {
InternalPredict(x []int32, values []float32) float32
Fit(ctx context.Context, trainSet *Dataset, testSet *Dataset, config *FitConfig) Score
Marshal(w io.Writer) error
Bytes() int
}

type BatchInference interface {
Expand All @@ -139,6 +138,10 @@ type FactorizationMachineCloner interface {
Clone() FactorizationMachine
}

// FactorizationMachineSpawner is implemented by factorization machines that
// can produce an instance of themselves for use by another worker.
type FactorizationMachineSpawner interface {
	// Spawn returns a model instance for use by a separate worker.
	Spawn() FactorizationMachine
}

type BaseFactorizationMachine struct {
model.BaseModel
Index UnifiedIndex
Expand Down Expand Up @@ -527,21 +530,6 @@ func (fm *FM) Init(trainSet *Dataset) {
fm.BaseFactorizationMachine.Init(trainSet)
}

func (fm *FM) Bytes() int {
// The memory usage of FM consists of:
// 1. struct
// 2. float32 in fm.W
// 3. slice in fm.V
// 4. UnifiedIndex
bytes := reflect.TypeOf(fm).Elem().Size()
bytes += reflect.TypeOf(fm.W).Elem().Size() * uintptr(len(fm.W))
if len(fm.V) > 0 {
bytes += reflect.TypeOf(fm.V).Elem().Size() * uintptr(len(fm.V))
bytes += reflect.TypeOf(fm.V).Elem().Elem().Size() * uintptr(len(fm.V)) * uintptr(fm.nFactors)
}
return int(bytes) + fm.Index.Bytes()
}

func MarshalModel(w io.Writer, m FactorizationMachine) error {
// write header
var err error
Expand Down Expand Up @@ -601,6 +589,13 @@ func Clone(m FactorizationMachine) FactorizationMachine {
}
}

// Spawn obtains a model instance for another worker: models implementing
// FactorizationMachineSpawner supply their own instance, while every other
// model is returned unchanged.
func Spawn(m FactorizationMachine) FactorizationMachine {
	spawner, ok := m.(FactorizationMachineSpawner)
	if !ok {
		return m
	}
	return spawner.Spawn()
}

// Marshal model into byte stream.
func (fm *FM) Marshal(w io.Writer) error {
// write params
Expand Down
4 changes: 0 additions & 4 deletions model/click/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,6 @@ func (searcher *ModelSearcher) GetBestModel() (FactorizationMachine, Score) {
return searcher.bestModel, searcher.bestScore
}

func (searcher *ModelSearcher) Complexity() int {
return searcher.numTrials * searcher.numEpochs
}

func (searcher *ModelSearcher) Fit(ctx context.Context, trainSet, valSet *Dataset, j *task.JobsAllocator) error {
log.Logger().Info("click model search",
zap.Int("n_users", trainSet.UserCount()),
Expand Down
8 changes: 0 additions & 8 deletions model/click/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,6 @@ type mockFactorizationMachineForSearch struct {
model.BaseModel
}

func (m *mockFactorizationMachineForSearch) Complexity() int {
panic("implement me")
}

func (m *mockFactorizationMachineForSearch) Bytes() int {
panic("implement me")
}

// Marshal is not exercised by the search tests and deliberately panics so any
// accidental call fails loudly.
func (m *mockFactorizationMachineForSearch) Marshal(_ io.Writer) error {
	panic("implement me")
}
Expand Down
Loading

0 comments on commit 80542ed

Please sign in to comment.