From 65dd9ff3723583b26ddf3c06245e845323e2df97 Mon Sep 17 00:00:00 2001 From: zhenghaoz Date: Sun, 4 Jun 2023 12:27:14 +0800 Subject: [PATCH] fix(worker): concurrent map iteration and map write (cherry picked from commit aa83c5b023e1f5b8b84adfd873e0758e8866d65d) --- config/config.go | 2 ++ worker/worker_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/config/config.go b/config/config.go index bcabd903f..cc818c045 100644 --- a/config/config.go +++ b/config/config.go @@ -334,6 +334,7 @@ func (config *Config) OfflineRecommendDigest(option ...DigestOption) string { }) var builder strings.Builder + config.Recommend.Offline.Lock() builder.WriteString(fmt.Sprintf("%v-%v-%v-%v-%v-%v-%v-%v", config.Recommend.Offline.ExploreRecommend, config.Recommend.Offline.EnableLatestRecommend, @@ -344,6 +345,7 @@ func (config *Config) OfflineRecommendDigest(option ...DigestOption) string { options.enableRanking, config.Recommend.Replacement.EnableReplacement, )) + config.Recommend.Offline.UnLock() if config.Recommend.Offline.EnablePopularRecommend { builder.WriteString(fmt.Sprintf("-%v", config.Recommend.Popular.PopularWindow)) } diff --git a/worker/worker_test.go b/worker/worker_test.go index f5809c42b..86158285d 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -727,6 +727,55 @@ func TestWorker_Sync(t *testing.T) { done <- struct{}{} } +func TestWorker_SyncRecommend(t *testing.T) { + cfg := config.GetDefaultConfig() + cfg.Recommend.Offline.ExploreRecommend = map[string]float64{"popular": 0.5} + master := newMockMaster(t) + master.meta.Config = marshal(t, cfg) + go master.Start(t) + address := <-master.addr + conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials())) + assert.NoError(t, err) + worker := &Worker{ + Settings: config.NewSettings(), + jobs: 1, + testMode: true, + masterClient: protocol.NewMasterClient(conn), + syncedChan: parallel.NewConditionChannel(), + ticker: time.NewTicker(time.Minute), + } + worker.Sync() + + stopSync := make(chan struct{}) + go func() { + for { + select { + case <-stopSync: + return + default: + worker.Sync() + } + } + }() + + stopRecommend := make(chan struct{}) + go func() { + for { + select { + case <-stopRecommend: + return + default: + worker.Settings.Config.OfflineRecommendDigest() + } + } + }() + + time.Sleep(time.Second) + stopSync <- struct{}{} + stopRecommend <- struct{}{} + master.Stop() +} + type mockFactorizationMachine struct { click.BaseFactorizationMachine }