Skip to content

Commit

Permalink
polygon/p2p: Add blk/s and bytes/s to periodic log (#9976)
Browse files Browse the repository at this point in the history
  • Loading branch information
shohamc1 authored Apr 27, 2024
1 parent 7ac8b10 commit d450357
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 105 deletions.
102 changes: 63 additions & 39 deletions polygon/p2p/fetcher_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ type FetcherConfig struct {

type Fetcher interface {
// FetchHeaders fetches [start,end) headers from a peer. Blocks until data is received.
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error)
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error)
// FetchBodies fetches block bodies for the given headers from a peer. Blocks until data is received.
FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error)
FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error)
// FetchBlocks fetches headers and bodies for a given [start, end) range from a peer and
// assembles them into blocks. Blocks until data is received.
FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Block, error)
FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error)
}

func NewFetcher(
Expand Down Expand Up @@ -63,9 +63,14 @@ type fetcher struct {
requestIdGenerator RequestIdGenerator
}

func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) {
type FetcherResponse[T any] struct {
Data T
TotalSize int
}

func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
if start >= end {
return nil, &ErrInvalidFetchHeadersRange{
return FetcherResponse[[]*types.Header]{}, &ErrInvalidFetchHeadersRange{
start: start,
end: end,
}
Expand All @@ -82,6 +87,7 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
if amount%eth.MaxHeadersServe > 0 {
numChunks++
}
totalHeadersSize := 0

headers := make([]*types.Header, 0, amount)
for chunkNum := uint64(0); chunkNum < numChunks; chunkNum++ {
Expand All @@ -91,30 +97,35 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
// a node may not respond with all MaxHeadersServe in 1 response,
// so we keep on consuming from last received number (akin to consuming a paging api)
// until we have all headers of the chunk or the peer stopped returning headers
headersChunk, err := fetchWithRetry(f.config, func() ([]*types.Header, error) {
headersChunk, err := fetchWithRetry(f.config, func() (FetcherResponse[[]*types.Header], error) {
return f.fetchHeaders(ctx, chunkStart, chunkEnd, peerId)
})
if err != nil {
return nil, err
return FetcherResponse[[]*types.Header]{}, err
}
if len(headersChunk) == 0 {
if len(headersChunk.Data) == 0 {
break
}

headers = append(headers, headersChunk...)
chunkStart += uint64(len(headersChunk))
headers = append(headers, headersChunk.Data...)
chunkStart += uint64(len(headersChunk.Data))
totalHeadersSize += headersChunk.TotalSize
}
}

if err := f.validateHeadersResponse(headers, start, amount); err != nil {
return nil, err
return FetcherResponse[[]*types.Header]{}, err
}

return headers, nil
return FetcherResponse[[]*types.Header]{
Data: headers,
TotalSize: totalHeadersSize,
}, nil
}

func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) {
func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) {
var bodies []*types.Body
totalBodiesSize := 0

for len(headers) > 0 {
// Note: we always request MaxBodiesServe for optimal response sizes (fully utilising the 2 MB soft limit).
Expand All @@ -128,43 +139,50 @@ func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peer
headersChunk = headers
}

bodiesChunk, err := fetchWithRetry(f.config, func() ([]*types.Body, error) {
bodiesChunk, err := fetchWithRetry(f.config, func() (*FetcherResponse[[]*types.Body], error) {
return f.fetchBodies(ctx, headersChunk, peerId)
})
if err != nil {
return nil, err
return FetcherResponse[[]*types.Body]{}, err
}
if len(bodiesChunk) == 0 {
return nil, NewErrMissingBodies(headers)
if len(bodiesChunk.Data) == 0 {
return FetcherResponse[[]*types.Body]{}, NewErrMissingBodies(headers)
}

bodies = append(bodies, bodiesChunk...)
headers = headers[len(bodiesChunk):]
bodies = append(bodies, bodiesChunk.Data...)
headers = headers[len(bodiesChunk.Data):]
totalBodiesSize += bodiesChunk.TotalSize
}

return bodies, nil
return FetcherResponse[[]*types.Body]{
Data: bodies,
TotalSize: totalBodiesSize,
}, nil
}

func (f *fetcher) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) ([]*types.Block, error) {
func (f *fetcher) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) {
headers, err := f.FetchHeaders(ctx, start, end, peerId)
if err != nil {
return nil, err
return FetcherResponse[[]*types.Block]{}, err
}

bodies, err := f.FetchBodies(ctx, headers, peerId)
bodies, err := f.FetchBodies(ctx, headers.Data, peerId)
if err != nil {
return nil, err
return FetcherResponse[[]*types.Block]{}, err
}

blocks := make([]*types.Block, len(headers))
for i, header := range headers {
blocks[i] = types.NewBlockFromNetwork(header, bodies[i])
blocks := make([]*types.Block, len(headers.Data))
for i, header := range headers.Data {
blocks[i] = types.NewBlockFromNetwork(header, bodies.Data[i])
}

return blocks, nil
return FetcherResponse[[]*types.Block]{
Data: blocks,
TotalSize: headers.TotalSize + bodies.TotalSize,
}, nil
}

func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) ([]*types.Header, error) {
func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -192,15 +210,18 @@ func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *P
},
})
if err != nil {
return nil, err
return FetcherResponse[[]*types.Header]{}, err
}

message, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockHeaders(peerId, requestId))
message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockHeaders(peerId, requestId))
if err != nil {
return nil, err
return FetcherResponse[[]*types.Header]{}, err
}

return message.BlockHeadersPacket, nil
return FetcherResponse[[]*types.Header]{
Data: message.BlockHeadersPacket,
TotalSize: messageSize,
}, nil
}

func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount uint64) error {
Expand Down Expand Up @@ -234,7 +255,7 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount
return nil
}

func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) {
func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (*FetcherResponse[[]*types.Body], error) {
// cleanup for the chan message observer
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -266,7 +287,7 @@ func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peer
return nil, err
}

message, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockBodies(peerId, requestId))
message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockBodies(peerId, requestId))
if err != nil {
return nil, err
}
Expand All @@ -275,7 +296,10 @@ func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peer
return nil, err
}

return message.BlockBodiesPacket, nil
return &FetcherResponse[[]*types.Body]{
Data: message.BlockBodiesPacket,
TotalSize: messageSize,
}, nil
}

func (f *fetcher) validateBodies(bodies []*types.Body, headers []*types.Header) error {
Expand Down Expand Up @@ -318,21 +342,21 @@ func awaitResponse[TPacket any](
timeout time.Duration,
messages chan *DecodedInboundMessage[TPacket],
filter func(*DecodedInboundMessage[TPacket]) bool,
) (TPacket, error) {
) (TPacket, int, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

for {
select {
case <-ctx.Done():
var nilPacket TPacket
return nilPacket, fmt.Errorf("await %v response interrupted: %w", reflect.TypeOf(nilPacket), ctx.Err())
return nilPacket, 0, fmt.Errorf("await %v response interrupted: %w", reflect.TypeOf(nilPacket), ctx.Err())
case message := <-messages:
if filter(message) {
continue
}

return message.Decoded, nil
return message.Decoded, len(message.Data), nil
}
}
}
Expand Down
35 changes: 19 additions & 16 deletions polygon/p2p/fetcher_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ func TestFetcherFetchHeaders(t *testing.T) {
test.mockSentryStreams(mockRequestResponse)
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 3, peerId)
headersData := headers.Data
require.NoError(t, err)
require.Len(t, headers, 2)
require.Equal(t, uint64(1), headers[0].Number.Uint64())
require.Equal(t, uint64(2), headers[1].Number.Uint64())
require.Len(t, headersData, 2)
require.Equal(t, uint64(1), headersData[0].Number.Uint64())
require.Equal(t, uint64(2), headersData[1].Number.Uint64())
})
}

Expand Down Expand Up @@ -103,10 +104,11 @@ func TestFetcherFetchHeadersWithChunking(t *testing.T) {
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2)
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 2000, peerId)
headersData := headers.Data
require.NoError(t, err)
require.Len(t, headers, 1999)
require.Equal(t, uint64(1), headers[0].Number.Uint64())
require.Equal(t, uint64(1999), headers[len(headers)-1].Number.Uint64())
require.Len(t, headersData, 1999)
require.Equal(t, uint64(1), headersData[0].Number.Uint64())
require.Equal(t, uint64(1999), headersData[len(headersData)-1].Number.Uint64())
})
}

Expand Down Expand Up @@ -156,7 +158,7 @@ func TestFetcherFetchHeadersResponseTimeout(t *testing.T) {
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 11, peerId)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

Expand Down Expand Up @@ -220,10 +222,11 @@ func TestFetcherFetchHeadersResponseTimeoutRetrySuccess(t *testing.T) {
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2, mockRequestResponse3)
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 2000, peerId)
headersData := headers.Data
require.NoError(t, err)
require.Len(t, headers, 1999)
require.Equal(t, uint64(1), headers[0].Number.Uint64())
require.Equal(t, uint64(1999), headers[len(headers)-1].Number.Uint64())
require.Len(t, headersData, 1999)
require.Equal(t, uint64(1), headersData[0].Number.Uint64())
require.Equal(t, uint64(1999), headersData[len(headersData)-1].Number.Uint64())
})
}

Expand All @@ -238,7 +241,7 @@ func TestFetcherErrInvalidFetchHeadersRange(t *testing.T) {
require.ErrorAs(t, err, &errInvalidFetchHeadersRange)
require.Equal(t, uint64(3), errInvalidFetchHeadersRange.start)
require.Equal(t, uint64(1), errInvalidFetchHeadersRange.end)
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

Expand Down Expand Up @@ -284,7 +287,7 @@ func TestFetcherFetchHeadersErrIncompleteResponse(t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 4, peerId)
require.ErrorAs(t, err, &errIncompleteHeaders)
require.Equal(t, uint64(3), errIncompleteHeaders.LowestMissingBlockNum())
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

Expand Down Expand Up @@ -365,7 +368,7 @@ func TestFetcherFetchBodies(t *testing.T) {
test.run(func(ctx context.Context, t *testing.T) {
bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
require.NoError(t, err)
require.Len(t, bodies, 2)
require.Len(t, bodies.Data, 2)
})
}

Expand Down Expand Up @@ -404,7 +407,7 @@ func TestFetcherFetchBodiesResponseTimeout(t *testing.T) {
test.run(func(ctx context.Context, t *testing.T) {
bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, bodies)
require.Nil(t, bodies.Data)
})
}

Expand Down Expand Up @@ -463,7 +466,7 @@ func TestFetcherFetchBodiesResponseTimeoutRetrySuccess(t *testing.T) {
test.run(func(ctx context.Context, t *testing.T) {
bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
require.NoError(t, err)
require.Len(t, bodies, 1)
require.Len(t, bodies.Data, 1)
})
}

Expand Down Expand Up @@ -497,7 +500,7 @@ func TestFetcherFetchBodiesErrMissingBodies(t *testing.T) {
lowest, exists := errMissingBlocks.LowestMissingBlockNum()
require.Equal(t, uint64(1), lowest)
require.True(t, exists)
require.Nil(t, bodies)
require.Nil(t, bodies.Data)
})
}

Expand Down
8 changes: 4 additions & 4 deletions polygon/p2p/fetcher_penalizing.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ type penalizingFetcher struct {
peerPenalizer PeerPenalizer
}

func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) {
func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
headers, err := pf.Fetcher.FetchHeaders(ctx, start, end, peerId)
if err != nil {
return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyHeaders{}, &ErrNonSequentialHeaderNumbers{})
return FetcherResponse[[]*types.Header]{}, pf.maybePenalize(ctx, peerId, err, &ErrTooManyHeaders{}, &ErrNonSequentialHeaderNumbers{})
}

return headers, nil
}

func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) {
func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) {
bodies, err := pf.Fetcher.FetchBodies(ctx, headers, peerId)
if err != nil {
return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{})
return FetcherResponse[[]*types.Body]{}, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{})
}

return bodies, nil
Expand Down
8 changes: 4 additions & 4 deletions polygon/p2p/fetcher_penalizing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenErrTooManyHeaders(t
require.ErrorAs(t, err, &errTooManyHeaders)
require.Equal(t, 2, errTooManyHeaders.requested)
require.Equal(t, 5, errTooManyHeaders.received)
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenErrNonSequentialHead
require.ErrorAs(t, err, &errNonSequentialHeaderNumbers)
require.Equal(t, uint64(3), errNonSequentialHeaderNumbers.current)
require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.expected)
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

Expand Down Expand Up @@ -119,7 +119,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenIncorrectOrigin(t *t
require.ErrorAs(t, err, &errNonSequentialHeaderNumbers)
require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.current)
require.Equal(t, uint64(1), errNonSequentialHeaderNumbers.expected)
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

Expand Down Expand Up @@ -154,7 +154,7 @@ func TestPenalizingFetcherFetchBodiesShouldPenalizePeerWhenErrTooManyBodies(t *t
require.ErrorAs(t, err, &errTooManyBodies)
require.Equal(t, 1, errTooManyBodies.requested)
require.Equal(t, 2, errTooManyBodies.received)
require.Nil(t, bodies)
require.Nil(t, bodies.Data)
})
}

Expand Down
Loading

0 comments on commit d450357

Please sign in to comment.