-
Notifications
You must be signed in to change notification settings - Fork 112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(dot/sync): improve worker pool #4258
base: development
Are you sure you want to change the base?
Conversation
Created as a draft for two reasons:
|
987cb6f
to
4c0d5cb
Compare
The main difference in the worker pool API is that SubmitBatch() does not block until the whole batch has been processed. Instead, it returns an ID which can be used to retrieve the current state of the batch. In addition, Results() returns a channel over which task results are sent as they become available. The main improvement this brings is increased concurrency, since results can be processed before the whole batch has been completed. What has not changed is the overall flow of the Strategy interface; getting a new batch of tasks with NextActions() and processing the results with Process(). Closes #4232
4c0d5cb
to
c875d08
Compare
workerPool: NewWorkerPool(WorkerPoolConfig{ | ||
MaxRetries: 5, | ||
// TODO: This should depend on the actual configuration of the currently used sync strategy. | ||
Capacity: defaultNumOfTasks * 10, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why times 10?
ShowMetrics() | ||
IsSynced() bool | ||
} | ||
|
||
type syncTask struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please move this to fullsync.go
since it is specific for that strategy?
} | ||
|
||
func (s *syncTask) ID() TaskID { | ||
return TaskID(s.request.String()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this is a good ID since it is the string representation of the request, if we send the same request for multiple peers we are gonna get the same ID here.
What if we just generate an UUID?
Also, we can add a String()
for the string representation that could be useful for debugging purposes
@@ -119,6 +135,11 @@ func NewSyncService(cfgs ...ServiceConfig) *SyncService { | |||
waitPeersDuration: waitPeersDefaultTimeout, | |||
stopCh: make(chan struct{}), | |||
seenBlockSyncRequests: lrucache.NewLRUCache[common.Hash, uint](100), | |||
workerPool: NewWorkerPool(WorkerPoolConfig{ | |||
MaxRetries: 5, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you move the magic numbers to a const? 😄
}, | ||
}) | ||
} | ||
task, ok := result.Task.(*syncTask) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we define result.Task
over a generic? so we can skip this casting?
|
||
continue | ||
} | ||
request := task.request.(*messages.BlockRequestMessage) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can skip this cast if you change the syncTask.request
from messages.P2PMessage
to *messages.BlockRequestMessage
since (if i'm not wrong) this is the only type we are expecting
The main difference in the worker pool API is that
SubmitBatch()
does not block until the whole batch has been processed. Instead, it returns an ID which can be used to retrieve the current state of the batch. In addition,Results()
returns a channel over which task results are sent as they become available.The main improvement this brings is increased concurrency, since results can be processed before the whole batch has been completed.
What has not changed is the overall flow of the Strategy interface; getting a new batch of tasks with
NextActions()
and processing the results withProcess()
.Changes
dot/sync/worker_pool.go
SyncService
to the API changes of the new worker poolTests
go test github.com/ChainSafe/gossamer/dot/sync
Issues
Closes #4232