Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polygon/p2p: Add blk/s and bytes/s to periodic log #9976

Merged
merged 15 commits into from
Apr 27, 2024
83 changes: 48 additions & 35 deletions polygon/p2p/fetcher_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ type FetcherConfig struct {

type Fetcher interface {
// FetchHeaders fetches [start,end) headers from a peer. Blocks until data is received.
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error)
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, int, error)
taratorio marked this conversation as resolved.
Show resolved Hide resolved
// FetchBodies fetches block bodies for the given headers from a peer. Blocks until data is received.
FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error)
FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, int, error)
// FetchBlocks fetches headers and bodies for a given [start, end) range from a peer and
// assembles them into blocks. Blocks until data is received.
FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Block, error)
FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Block, int, error)
}

func NewFetcher(
Expand Down Expand Up @@ -63,9 +63,18 @@ type fetcher struct {
requestIdGenerator RequestIdGenerator
}

func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) {
type FetcherResponseTypes interface {
shohamc1 marked this conversation as resolved.
Show resolved Hide resolved
*types.Header | *types.Body
}

type FetcherResponse[T FetcherResponseTypes] struct {
shohamc1 marked this conversation as resolved.
Show resolved Hide resolved
Data []T
TotalSize int
}

func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, int, error) {
if start >= end {
return nil, &ErrInvalidFetchHeadersRange{
return nil, 0, &ErrInvalidFetchHeadersRange{
start: start,
end: end,
}
Expand All @@ -82,6 +91,7 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
if amount%eth.MaxHeadersServe > 0 {
numChunks++
}
totalHeadersSize := 0

headers := make([]*types.Header, 0, amount)
for chunkNum := uint64(0); chunkNum < numChunks; chunkNum++ {
Expand All @@ -91,30 +101,32 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
// a node may not respond with all MaxHeadersServe in 1 response,
// so we keep on consuming from last received number (akin to consuming a paging api)
// until we have all headers of the chunk or the peer stopped returning headers
headersChunk, err := fetchWithRetry(f.config, func() ([]*types.Header, error) {
headersChunk, err := fetchWithRetry(f.config, func() (*FetcherResponse[*types.Header], error) {
return f.fetchHeaders(ctx, chunkStart, chunkEnd, peerId)
})
if err != nil {
return nil, err
return nil, 0, err
}
if len(headersChunk) == 0 {
if len(headersChunk.Data) == 0 {
break
}

headers = append(headers, headersChunk...)
chunkStart += uint64(len(headersChunk))
headers = append(headers, headersChunk.Data...)
chunkStart += uint64(len(headersChunk.Data))
totalHeadersSize += headersChunk.TotalSize
}
}

if err := f.validateHeadersResponse(headers, start, amount); err != nil {
return nil, err
return nil, 0, err
}

return headers, nil
return headers, totalHeadersSize, nil
}

func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) {
func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, int, error) {
var bodies []*types.Body
totalBodiesSize := 0

for len(headers) > 0 {
// Note: we always request MaxBodiesServe for optimal response sizes (fully utilising the 2 MB soft limit).
Expand All @@ -128,43 +140,44 @@ func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peer
headersChunk = headers
}

bodiesChunk, err := fetchWithRetry(f.config, func() ([]*types.Body, error) {
bodiesChunk, err := fetchWithRetry(f.config, func() (*FetcherResponse[*types.Body], error) {
return f.fetchBodies(ctx, headersChunk, peerId)
})
if err != nil {
return nil, err
return nil, 0, err
}
if len(bodiesChunk) == 0 {
return nil, NewErrMissingBodies(headers)
if len(bodiesChunk.Data) == 0 {
return nil, 0, NewErrMissingBodies(headers)
}

bodies = append(bodies, bodiesChunk...)
headers = headers[len(bodiesChunk):]
bodies = append(bodies, bodiesChunk.Data...)
headers = headers[len(bodiesChunk.Data):]
totalBodiesSize += bodiesChunk.TotalSize
}

return bodies, nil
return bodies, totalBodiesSize, nil
}

func (f *fetcher) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) ([]*types.Block, error) {
headers, err := f.FetchHeaders(ctx, start, end, peerId)
func (f *fetcher) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) ([]*types.Block, int, error) {
headers, headerSize, err := f.FetchHeaders(ctx, start, end, peerId)
if err != nil {
return nil, err
return nil, 0, err
}

bodies, err := f.FetchBodies(ctx, headers, peerId)
bodies, bodiesSize, err := f.FetchBodies(ctx, headers, peerId)
if err != nil {
return nil, err
return nil, 0, err
}

blocks := make([]*types.Block, len(headers))
for i, header := range headers {
blocks[i] = types.NewBlockFromNetwork(header, bodies[i])
}

return blocks, nil
return blocks, headerSize + bodiesSize, nil
}

func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) ([]*types.Header, error) {
func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) (*FetcherResponse[*types.Header], error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -195,12 +208,12 @@ func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *P
return nil, err
}

message, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockHeaders(peerId, requestId))
message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockHeaders(peerId, requestId))
if err != nil {
return nil, err
}

return message.BlockHeadersPacket, nil
return &FetcherResponse[*types.Header]{message.BlockHeadersPacket, messageSize}, nil
}

func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount uint64) error {
Expand Down Expand Up @@ -234,7 +247,7 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount
return nil
}

func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) {
func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (*FetcherResponse[*types.Body], error) {
// cleanup for the chan message observer
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -266,7 +279,7 @@ func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peer
return nil, err
}

message, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockBodies(peerId, requestId))
message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockBodies(peerId, requestId))
if err != nil {
return nil, err
}
Expand All @@ -275,7 +288,7 @@ func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peer
return nil, err
}

return message.BlockBodiesPacket, nil
return &FetcherResponse[*types.Body]{message.BlockBodiesPacket, messageSize}, nil
}

func (f *fetcher) validateBodies(bodies []*types.Body, headers []*types.Header) error {
Expand Down Expand Up @@ -318,21 +331,21 @@ func awaitResponse[TPacket any](
timeout time.Duration,
messages chan *DecodedInboundMessage[TPacket],
filter func(*DecodedInboundMessage[TPacket]) bool,
) (TPacket, error) {
) (TPacket, int, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

for {
select {
case <-ctx.Done():
var nilPacket TPacket
return nilPacket, fmt.Errorf("await %v response interrupted: %w", reflect.TypeOf(nilPacket), ctx.Err())
return nilPacket, 0, fmt.Errorf("await %v response interrupted: %w", reflect.TypeOf(nilPacket), ctx.Err())
case message := <-messages:
if filter(message) {
continue
}

return message.Decoded, nil
return message.Decoded, len(message.Data), nil
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions polygon/p2p/fetcher_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestFetcherFetchHeaders(t *testing.T) {
test := newFetcherTest(t, newMockRequestGenerator(requestId))
test.mockSentryStreams(mockRequestResponse)
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 3, peerId)
headers, _, err := test.fetcher.FetchHeaders(ctx, 1, 3, peerId)
require.NoError(t, err)
require.Len(t, headers, 2)
require.Equal(t, uint64(1), headers[0].Number.Uint64())
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestFetcherFetchHeadersWithChunking(t *testing.T) {
test := newFetcherTest(t, newMockRequestGenerator(requestId1, requestId2))
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2)
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 2000, peerId)
headers, _, err := test.fetcher.FetchHeaders(ctx, 1, 2000, peerId)
require.NoError(t, err)
require.Len(t, headers, 1999)
require.Equal(t, uint64(1), headers[0].Number.Uint64())
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestFetcherFetchHeadersResponseTimeout(t *testing.T) {
test := newFetcherTest(t, newMockRequestGenerator(requestId1, requestId2))
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2)
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 11, peerId)
headers, _, err := test.fetcher.FetchHeaders(ctx, 1, 11, peerId)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, headers)
})
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestFetcherFetchHeadersResponseTimeoutRetrySuccess(t *testing.T) {
test := newFetcherTest(t, newMockRequestGenerator(requestId1, requestId2, requestId3))
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2, mockRequestResponse3)
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 2000, peerId)
headers, _, err := test.fetcher.FetchHeaders(ctx, 1, 2000, peerId)
require.NoError(t, err)
require.Len(t, headers, 1999)
require.Equal(t, uint64(1), headers[0].Number.Uint64())
Expand All @@ -233,7 +233,7 @@ func TestFetcherErrInvalidFetchHeadersRange(t *testing.T) {
test := newFetcherTest(t, newMockRequestGenerator(1))
test.mockSentryStreams()
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 3, 1, PeerIdFromUint64(1))
headers, _, err := test.fetcher.FetchHeaders(ctx, 3, 1, PeerIdFromUint64(1))
var errInvalidFetchHeadersRange *ErrInvalidFetchHeadersRange
require.ErrorAs(t, err, &errInvalidFetchHeadersRange)
require.Equal(t, uint64(3), errInvalidFetchHeadersRange.start)
Expand Down Expand Up @@ -281,7 +281,7 @@ func TestFetcherFetchHeadersErrIncompleteResponse(t *testing.T) {
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2)
test.run(func(ctx context.Context, t *testing.T) {
var errIncompleteHeaders *ErrIncompleteHeaders
headers, err := test.fetcher.FetchHeaders(ctx, 1, 4, peerId)
headers, _, err := test.fetcher.FetchHeaders(ctx, 1, 4, peerId)
require.ErrorAs(t, err, &errIncompleteHeaders)
require.Equal(t, uint64(3), errIncompleteHeaders.LowestMissingBlockNum())
require.Nil(t, headers)
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestFetcherFetchBodies(t *testing.T) {
test := newFetcherTest(t, newMockRequestGenerator(requestId1, requestId2))
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2)
test.run(func(ctx context.Context, t *testing.T) {
bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
bodies, _, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
require.NoError(t, err)
require.Len(t, bodies, 2)
})
Expand Down Expand Up @@ -402,7 +402,7 @@ func TestFetcherFetchBodiesResponseTimeout(t *testing.T) {
test := newFetcherTest(t, newMockRequestGenerator(requestId1, requestId2))
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2)
test.run(func(ctx context.Context, t *testing.T) {
bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
bodies, _, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, bodies)
})
Expand Down Expand Up @@ -461,7 +461,7 @@ func TestFetcherFetchBodiesResponseTimeoutRetrySuccess(t *testing.T) {
test := newFetcherTest(t, newMockRequestGenerator(requestId1, requestId2))
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2)
test.run(func(ctx context.Context, t *testing.T) {
bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
bodies, _, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
require.NoError(t, err)
require.Len(t, bodies, 1)
})
Expand Down Expand Up @@ -492,7 +492,7 @@ func TestFetcherFetchBodiesErrMissingBodies(t *testing.T) {
test.mockSentryStreams(mockRequestResponse)
test.run(func(ctx context.Context, t *testing.T) {
var errMissingBlocks *ErrMissingBodies
bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
bodies, _, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
require.ErrorAs(t, err, &errMissingBlocks)
lowest, exists := errMissingBlocks.LowestMissingBlockNum()
require.Equal(t, uint64(1), lowest)
Expand Down
16 changes: 8 additions & 8 deletions polygon/p2p/fetcher_penalizing.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ type penalizingFetcher struct {
peerPenalizer PeerPenalizer
}

func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) {
headers, err := pf.Fetcher.FetchHeaders(ctx, start, end, peerId)
func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, int, error) {
headers, size, err := pf.Fetcher.FetchHeaders(ctx, start, end, peerId)
if err != nil {
return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyHeaders{}, &ErrNonSequentialHeaderNumbers{})
return nil, 0, pf.maybePenalize(ctx, peerId, err, &ErrTooManyHeaders{}, &ErrNonSequentialHeaderNumbers{})
}

return headers, nil
return headers, size, nil
}

func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) {
bodies, err := pf.Fetcher.FetchBodies(ctx, headers, peerId)
func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, int, error) {
bodies, size, err := pf.Fetcher.FetchBodies(ctx, headers, peerId)
if err != nil {
return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{})
return nil, 0, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{})
}

return bodies, nil
return bodies, size, nil
}

func (pf *penalizingFetcher) maybePenalize(ctx context.Context, peerId *PeerId, err error, penalizeErrs ...error) error {
Expand Down
8 changes: 4 additions & 4 deletions polygon/p2p/fetcher_penalizing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenErrTooManyHeaders(t
mockExpectPenalizePeer(t, test.sentryClient, peerId)
test.run(func(ctx context.Context, t *testing.T) {
var errTooManyHeaders *ErrTooManyHeaders
headers, err := test.penalizingFetcher.FetchHeaders(ctx, 1, 3, peerId)
headers, _, err := test.penalizingFetcher.FetchHeaders(ctx, 1, 3, peerId)
require.ErrorAs(t, err, &errTooManyHeaders)
require.Equal(t, 2, errTooManyHeaders.requested)
require.Equal(t, 5, errTooManyHeaders.received)
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenErrNonSequentialHead
mockExpectPenalizePeer(t, test.sentryClient, peerId)
test.run(func(ctx context.Context, t *testing.T) {
var errNonSequentialHeaderNumbers *ErrNonSequentialHeaderNumbers
headers, err := test.penalizingFetcher.FetchHeaders(ctx, 1, 4, peerId)
headers, _, err := test.penalizingFetcher.FetchHeaders(ctx, 1, 4, peerId)
require.ErrorAs(t, err, &errNonSequentialHeaderNumbers)
require.Equal(t, uint64(3), errNonSequentialHeaderNumbers.current)
require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.expected)
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenIncorrectOrigin(t *t
mockExpectPenalizePeer(t, test.sentryClient, peerId)
test.run(func(ctx context.Context, t *testing.T) {
var errNonSequentialHeaderNumbers *ErrNonSequentialHeaderNumbers
headers, err := test.penalizingFetcher.FetchHeaders(ctx, 1, 3, peerId)
headers, _, err := test.penalizingFetcher.FetchHeaders(ctx, 1, 3, peerId)
require.ErrorAs(t, err, &errNonSequentialHeaderNumbers)
require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.current)
require.Equal(t, uint64(1), errNonSequentialHeaderNumbers.expected)
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestPenalizingFetcherFetchBodiesShouldPenalizePeerWhenErrTooManyBodies(t *t
mockExpectPenalizePeer(t, test.sentryClient, peerId)
test.run(func(ctx context.Context, t *testing.T) {
var errTooManyBodies *ErrTooManyBodies
bodies, err := test.penalizingFetcher.FetchBodies(ctx, headers, peerId)
bodies, _, err := test.penalizingFetcher.FetchBodies(ctx, headers, peerId)
require.ErrorAs(t, err, &errTooManyBodies)
require.Equal(t, 1, errTooManyBodies.requested)
require.Equal(t, 2, errTooManyBodies.received)
Expand Down
Loading
Loading