Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: protocol debug logs using configured logger #736

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions protocol/blockfetch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
Logger: protoOptions.Logger,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
Expand All @@ -80,6 +81,8 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {

func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("starting protocol: %s", ProtocolName))
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -93,6 +96,8 @@ func (c *Client) Start() {
func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("stopping protocol: %s", ProtocolName))
msg := NewMsgClientDone()
err = c.SendMessage(msg)
})
Expand All @@ -101,6 +106,8 @@ func (c *Client) Stop() error {

// GetBlockRange starts an async process to fetch all blocks in the specified range (inclusive)
func (c *Client) GetBlockRange(start common.Point, end common.Point) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetBlockRange(start: %+v, end: %+v)", ProtocolName, start, end))
c.busyMutex.Lock()
c.blockUseCallback = true
msg := NewMsgRequestRange(start, end)
Expand All @@ -121,6 +128,8 @@ func (c *Client) GetBlockRange(start common.Point, end common.Point) error {

// GetBlock requests and returns a single block specified by the provided point
func (c *Client) GetBlock(point common.Point) (ledger.Block, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetBlock(point: %+v)", ProtocolName, point))
c.busyMutex.Lock()
c.blockUseCallback = false
msg := NewMsgRequestRange(point, point)
Expand All @@ -144,6 +153,8 @@ func (c *Client) GetBlock(point common.Point) (ledger.Block, error) {
}

func (c *Client) messageHandler(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeStartBatch:
Expand All @@ -165,17 +176,23 @@ func (c *Client) messageHandler(msg protocol.Message) error {
}

func (c *Client) handleStartBatch() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client start batch for %s", ProtocolName))
c.startBatchResultChan <- nil
return nil
}

func (c *Client) handleNoBlocks() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client no blocks found for %s", ProtocolName))
err := fmt.Errorf("block(s) not found")
c.startBatchResultChan <- err
return nil
}

func (c *Client) handleBlock(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client block found for %s", ProtocolName))
msg := msgGeneric.(*MsgBlock)
// Decode only enough to get the block type value
var wrappedBlock WrappedBlock
Expand All @@ -201,6 +218,8 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {
}

func (c *Client) handleBatchDone() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client batch done for %s", ProtocolName))
c.busyMutex.Unlock()
return nil
}
15 changes: 15 additions & 0 deletions protocol/blockfetch/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (s *Server) initProtocol() {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: s.protoOptions.Muxer,
Logger: s.protoOptions.Logger,
ErrorChan: s.protoOptions.ErrorChan,
Mode: s.protoOptions.Mode,
Role: protocol.ProtocolRoleServer,
Expand All @@ -59,16 +60,22 @@ func (s *Server) initProtocol() {
}

func (s *Server) NoBlocks() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s NoBlocks()", ProtocolName))
msg := NewMsgNoBlocks()
return s.SendMessage(msg)
}

func (s *Server) StartBatch() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s StartBatch()", ProtocolName))
msg := NewMsgStartBatch()
return s.SendMessage(msg)
}

func (s *Server) Block(blockType uint, blockData []byte) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s Block(blockType: %+v, blockData: %x)", ProtocolName, blockType, blockData))
wrappedBlock := WrappedBlock{
Type: blockType,
RawBlock: blockData,
Expand All @@ -82,11 +89,15 @@ func (s *Server) Block(blockType uint, blockData []byte) error {
}

func (s *Server) BatchDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s BatchDone()", ProtocolName))
msg := NewMsgBatchDone()
return s.SendMessage(msg)
}

func (s *Server) messageHandler(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeRequestRange:
Expand All @@ -104,6 +115,8 @@ func (s *Server) messageHandler(msg protocol.Message) error {
}

func (s *Server) handleRequestRange(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server request range for %s", ProtocolName))
if s.config == nil || s.config.RequestRangeFunc == nil {
return fmt.Errorf(
"received block-fetch RequestRange message but no callback function is defined",
Expand All @@ -118,6 +131,8 @@ func (s *Server) handleRequestRange(msg protocol.Message) error {
}

func (s *Server) handleClientDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server client done for %s", ProtocolName))
// Restart protocol
s.Protocol.Stop()
s.initProtocol()
Expand Down
69 changes: 46 additions & 23 deletions protocol/chainsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func NewClient(
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
Logger: protoOptions.Logger,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
Expand All @@ -115,6 +116,8 @@ func NewClient(

func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("starting protocol: %s", ProtocolName))
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -124,33 +127,12 @@ func (c *Client) Start() {
})
}

func (c *Client) messageHandler(msg protocol.Message) error {
var err error
switch msg.Type() {
case MessageTypeAwaitReply:
err = c.handleAwaitReply()
case MessageTypeRollForward:
err = c.handleRollForward(msg)
case MessageTypeRollBackward:
err = c.handleRollBackward(msg)
case MessageTypeIntersectFound:
err = c.handleIntersectFound(msg)
case MessageTypeIntersectNotFound:
err = c.handleIntersectNotFound(msg)
default:
err = fmt.Errorf(
"%s: received unexpected message type %d",
ProtocolName,
msg.Type(),
)
}
return err
}

// Stop transitions the protocol to the Done state. No more protocol operations will be possible afterward
func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("stopping protocol: %s", ProtocolName))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()
msg := NewMsgDone()
Expand All @@ -163,6 +145,8 @@ func (c *Client) Stop() error {

// GetCurrentTip returns the current chain tip
func (c *Client) GetCurrentTip() (*Tip, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetCurrentTip()", ProtocolName))
done := atomic.Bool{}
requestResultChan := make(chan Tip, 1)
requestErrorChan := make(chan error, 1)
Expand Down Expand Up @@ -220,6 +204,8 @@ func (c *Client) GetCurrentTip() (*Tip, error) {
func (c *Client) GetAvailableBlockRange(
intersectPoints []common.Point,
) (common.Point, common.Point, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetAvailableBlockRange(intersectPoints: %+v)", ProtocolName, intersectPoints))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()

Expand Down Expand Up @@ -293,6 +279,8 @@ func (c *Client) GetAvailableBlockRange(
// Sync begins a chain-sync operation using the provided intersect point(s). Incoming blocks will be delivered
// via the RollForward callback function specified in the protocol config
func (c *Client) Sync(intersectPoints []common.Point) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s Sync(intersectPoints: %+v)", ProtocolName, intersectPoints))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()
// Use origin if no intersect points were specified
Expand Down Expand Up @@ -441,11 +429,40 @@ func (c *Client) requestFindIntersect(
}
}

func (c *Client) messageHandler(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeAwaitReply:
err = c.handleAwaitReply()
case MessageTypeRollForward:
err = c.handleRollForward(msg)
case MessageTypeRollBackward:
err = c.handleRollBackward(msg)
case MessageTypeIntersectFound:
err = c.handleIntersectFound(msg)
case MessageTypeIntersectNotFound:
err = c.handleIntersectNotFound(msg)
default:
err = fmt.Errorf(
"%s: received unexpected message type %d",
ProtocolName,
msg.Type(),
)
}
return err
}

func (c *Client) handleAwaitReply() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client await reply for %s", ProtocolName))
return nil
}

func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client roll forward for %s", ProtocolName))
firstBlockChan := func() chan<- clientPointResult {
select {
case ch := <-c.wantFirstBlockChan:
Expand Down Expand Up @@ -554,6 +571,8 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
}

func (c *Client) handleRollBackward(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client roll backward for %s", ProtocolName))
msgRollBackward := msg.(*MsgRollBackward)
c.sendCurrentTip(msgRollBackward.Tip)
if len(c.wantFirstBlockChan) == 0 {
Expand All @@ -579,6 +598,8 @@ func (c *Client) handleRollBackward(msg protocol.Message) error {
}

func (c *Client) handleIntersectFound(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client intersect found for %s", ProtocolName))
msgIntersectFound := msg.(*MsgIntersectFound)
c.sendCurrentTip(msgIntersectFound.Tip)

Expand All @@ -591,6 +612,8 @@ func (c *Client) handleIntersectFound(msg protocol.Message) error {
}

func (c *Client) handleIntersectNotFound(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client intersect not found for %s", ProtocolName))
msgIntersectNotFound := msgGeneric.(*MsgIntersectNotFound)
c.sendCurrentTip(msgIntersectNotFound.Tip)

Expand Down
17 changes: 17 additions & 0 deletions protocol/chainsync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (s *Server) initProtocol() {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: s.protoOptions.Muxer,
Logger: s.protoOptions.Logger,
ErrorChan: s.protoOptions.ErrorChan,
Mode: s.protoOptions.Mode,
Role: protocol.ProtocolRoleServer,
Expand All @@ -77,16 +78,22 @@ func (s *Server) initProtocol() {
}

func (s *Server) RollBackward(point common.Point, tip Tip) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s RollBackward(point: %+v, tip: %+v)", ProtocolName, point, tip))
msg := NewMsgRollBackward(point, tip)
return s.SendMessage(msg)
}

func (s *Server) AwaitReply() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s AwaitReply()", ProtocolName))
msg := NewMsgAwaitReply()
return s.SendMessage(msg)
}

func (s *Server) RollForward(blockType uint, blockData []byte, tip Tip) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s Rollforward(blockType: %+v, blockData: %x, tip: %+v)", ProtocolName, blockType, blockData, tip))
if s.Mode() == protocol.ProtocolModeNodeToNode {
eraId := ledger.BlockToBlockHeaderTypeMap[blockType]
msg := NewMsgRollForwardNtN(
Expand All @@ -107,6 +114,8 @@ func (s *Server) RollForward(blockType uint, blockData []byte, tip Tip) error {
}

func (s *Server) messageHandler(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeRequestNext:
Expand All @@ -126,6 +135,10 @@ func (s *Server) messageHandler(msg protocol.Message) error {
}

func (s *Server) handleRequestNext() error {
// TODO: figure out why this one log message causes a panic (and only this one)
// during tests
// s.Protocol.Logger().
// Debug(fmt.Sprintf("handling server request next for %s", ProtocolName))
if s.config == nil || s.config.RequestNextFunc == nil {
return fmt.Errorf(
"received chain-sync RequestNext message but no callback function is defined",
Expand All @@ -135,6 +148,8 @@ func (s *Server) handleRequestNext() error {
}

func (s *Server) handleFindIntersect(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server find intersect for %s", ProtocolName))
if s.config == nil || s.config.FindIntersectFunc == nil {
return fmt.Errorf(
"received chain-sync FindIntersect message but no callback function is defined",
Expand Down Expand Up @@ -163,6 +178,8 @@ func (s *Server) handleFindIntersect(msg protocol.Message) error {
}

func (s *Server) handleDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server done for %s", ProtocolName))
// Restart protocol
s.Protocol.Stop()
s.initProtocol()
Expand Down
Loading