Skip to content

Commit

Permalink
tx-submission: debug tracer for SharedTxState
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed Jul 30, 2024
1 parent 188be50 commit 992a7de
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import Control.Concurrent.Class.MonadMVar.Strict
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer (..), traceWith)

import Data.Foldable (foldl', traverse_)
import Data.Map.Strict (Map)
Expand Down Expand Up @@ -84,15 +85,17 @@ withPeer
, Ord peeraddr
, Show peeraddr
)
=> TxChannelsVar m peeraddr txid tx
=> Tracer m (DebugSharedTxState peeraddr txid tx)
-> TxChannelsVar m peeraddr txid tx
-> SharedTxStateVar m peeraddr txid tx
-> TxSubmissionMempoolReader txid tx idx m
-> peeraddr
-- ^ new peer
-> (PeerTxAPI m txid tx -> m a)
-- ^ callback which gives access to `PeerTxStateAPI`
-> m a
withPeer channelsVar
withPeer tracer
channelsVar
sharedStateVar
TxSubmissionMempoolReader { mempoolGetSnapshot }
peeraddr io =
Expand Down Expand Up @@ -188,7 +191,8 @@ withPeer channelsVar
-- TODO: hide this inside `receivedTxIds` so it's run in the same STM
-- transaction.
mempoolSnapshot <- atomically mempoolGetSnapshot
receivedTxIds sharedStateVar
receivedTxIds tracer
sharedStateVar
mempoolSnapshot
peeraddr
numTxIdsToReq
Expand All @@ -202,7 +206,7 @@ withPeer channelsVar
-- ^ received txs
-> m ()
handleReceivedTxs txids txs =
collectTxs sharedStateVar peeraddr txids txs
collectTxs tracer sharedStateVar peeraddr txids txs


decisionLogicThread
Expand All @@ -213,20 +217,21 @@ decisionLogicThread
, Ord peeraddr
, Ord txid
)
=> TxDecisionPolicy
=> Tracer m (DebugSharedTxState peeraddr txid tx)
-> TxDecisionPolicy
-> StrictTVar m (Map peeraddr PeerGSV)
-> TxChannelsVar m peeraddr txid tx
-> SharedTxStateVar m peeraddr txid tx
-> m Void
decisionLogicThread policy gsvVar txChannelsVar sharedStateVar = go
decisionLogicThread tracer policy gsvVar txChannelsVar sharedStateVar = go
where
go :: m Void
go = do
-- We rate limit the decision making process, it could overwhelm the CPU
-- if there are too many inbound connections.
threadDelay 0.005 -- 5ms

decisions <- atomically do
(decisions, st) <- atomically do
sharedCtx <-
SharedDecisionContext
<$> readTVar gsvVar
Expand All @@ -238,7 +243,8 @@ decisionLogicThread policy gsvVar txChannelsVar sharedStateVar = go

let (sharedState, decisions) = makeDecisions policy sharedCtx activePeers
writeTVar sharedStateVar sharedState
return decisions
return (decisions, sharedState)
traceWith tracer (DebugSharedTxState st)
TxChannels { txChannelMap } <- readMVar txChannelsVar
traverse_
(\(mvar, d) -> modifyMVar_ mvar (\d' -> pure (d' <> d)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ module Ouroboros.Network.TxSubmission.Inbound.State
, collectTxs
, acknowledgeTxIds
, hasTxIdsToAcknowledge
-- * Debug output
, DebugSharedTxState (..)
-- * Internals, only exported for testing purposes:
, RefCountDiff (..)
, updateRefCounts
Expand All @@ -25,6 +27,7 @@ module Ouroboros.Network.TxSubmission.Inbound.State

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Tracer (Tracer, traceWith)

import Data.Foldable (fold, foldl', toList)
import Data.Map.Merge.Strict qualified as Map
Expand Down Expand Up @@ -534,7 +537,8 @@ newSharedTxStateVar = newTVarIO SharedTxState { peerTxStates = Map.empty,
receivedTxIds
:: forall m peeraddr idx tx txid.
(MonadSTM m, Ord txid, Ord peeraddr)
=> SharedTxStateVar m peeraddr txid tx
=> Tracer m (DebugSharedTxState peeraddr txid tx)
-> SharedTxStateVar m peeraddr txid tx
-> MempoolSnapshot txid tx idx
-> peeraddr
-> NumTxIdsToReq
Expand All @@ -545,9 +549,10 @@ receivedTxIds
-> Map txid SizeInBytes
-- ^ received `txid`s with sizes
-> m ()
receivedTxIds sharedVar MempoolSnapshot{mempoolHasTx} peeraddr reqNo txidsSeq txidsMap =
atomically $
modifyTVar sharedVar (receivedTxIdsImpl mempoolHasTx peeraddr reqNo txidsSeq txidsMap)
receivedTxIds tracer sharedVar MempoolSnapshot{mempoolHasTx} peeraddr reqNo txidsSeq txidsMap = do
st <- atomically $
stateTVar sharedVar ((\a -> (a,a)) . receivedTxIdsImpl mempoolHasTx peeraddr reqNo txidsSeq txidsMap)
traceWith tracer (DebugSharedTxState st)


-- | Include received `tx`s in `SharedTxState`. Return number of `txids`
Expand All @@ -556,14 +561,25 @@ receivedTxIds sharedVar MempoolSnapshot{mempoolHasTx} peeraddr reqNo txidsSeq tx
collectTxs
:: forall m peeraddr tx txid.
(MonadSTM m, Ord txid, Ord peeraddr)
=> SharedTxStateVar m peeraddr txid tx
=> Tracer m (DebugSharedTxState peeraddr txid tx)
-> SharedTxStateVar m peeraddr txid tx
-> peeraddr
-> Set txid -- ^ set of requested txids
-> Map txid tx -- ^ received txs
-> m ()
-- ^ number of txids to be acknowledged and txs to be added to the
-- mempool
collectTxs sharedVar peeraddr txidsRequested txsMap =
atomically $
modifyTVar sharedVar
(collectTxsImpl peeraddr txidsRequested txsMap)
collectTxs tracer sharedVar peeraddr txidsRequested txsMap = do
st <- atomically $
stateTVar sharedVar
((\a -> (a,a)) . collectTxsImpl peeraddr txidsRequested txsMap)
traceWith tracer (DebugSharedTxState st)

--
--
--

-- | Debug tracer.
--
newtype DebugSharedTxState peeraddr txid tx = DebugSharedTxState (SharedTxState peeraddr txid tx)
deriving Show

0 comments on commit 992a7de

Please sign in to comment.