Skip to content

Commit

Permalink
tx-submission: put common types in one place
Browse files Browse the repository at this point in the history
This allows us to have just one tracer for tx-submission decision logic.
  • Loading branch information
coot committed Sep 17, 2024
1 parent aaf1b52 commit d223e7c
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.Limits
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Inbound.Types
import Ouroboros.Network.TxSubmission.Inbound.Types (ProcessedTxCount (..),
TxSubmissionMempoolWriter (..), TraceTxSubmissionInbound (..),
TxSubmissionProtocolError (..))
import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..),
TxSubmissionMempoolReader (..))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,86 +34,9 @@ import Ouroboros.Network.DeltaQ (PeerGSV (..), defaultGSV,
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Inbound.Policy
import Ouroboros.Network.TxSubmission.Inbound.State
import Ouroboros.Network.TxSubmission.Inbound.Types


-- | Decision made by the decision logic. Each peer will receive a 'Decision'.
--
-- /note:/ it is rather non-standard to represent a choice between requesting
-- `txid`s and `tx`'s as a product rather than a sum type. The client will
-- need to download `tx`s first and then send a request for more txids (and
-- acknowledge some `txid`s). Due to pipelining each client will request
-- decision from the decision logic quite often (every two pipelined requests),
-- but with this design a decision once taken will make the peer non-active
-- (e.g. it won't be returned by `filterActivePeers`) for longer, and thus the
-- expensive `makeDecision` computation will not need to take that peer into
-- account.
--
data TxDecision txid tx = TxDecision {
txdTxIdsToAcknowledge :: !NumTxIdsToAck,
-- ^ txid's to acknowledge

txdTxIdsToRequest :: !NumTxIdsToReq,
-- ^ number of txid's to request

txdPipelineTxIds :: !Bool,
-- ^ the tx-submission protocol only allows to pipeline `txid`'s requests
-- if we have non-acknowledged `txid`s.

txdTxsToRequest :: !(Set txid),
-- ^ txid's to download.

txdTxsToMempool :: ![tx]
-- ^ list of `tx`s to submit to the mempool.
}
deriving (Show, Eq)

-- | A non-commutative semigroup instance.
--
-- /note:/ this instance must be consistent with `pickTxsToDownload` and how
-- `PeerTxState` is updated. It is designed to work with `TMergeVar`s.
--
instance Ord txid => Semigroup (TxDecision txid tx) where
TxDecision { txdTxIdsToAcknowledge,
txdTxIdsToRequest,
txdPipelineTxIds = _ignored,
txdTxsToRequest,
txdTxsToMempool }
<>
TxDecision { txdTxIdsToAcknowledge = txdTxIdsToAcknowledge',
txdTxIdsToRequest = txdTxIdsToRequest',
txdPipelineTxIds = txdPipelineTxIds',
txdTxsToRequest = txdTxsToRequest',
txdTxsToMempool = txdTxsToMempool' }
=
TxDecision { txdTxIdsToAcknowledge = txdTxIdsToAcknowledge + txdTxIdsToAcknowledge',
txdTxIdsToRequest = txdTxIdsToRequest + txdTxIdsToRequest',
txdPipelineTxIds = txdPipelineTxIds',
txdTxsToRequest = txdTxsToRequest <> txdTxsToRequest',
txdTxsToMempool = txdTxsToMempool ++ txdTxsToMempool'
}

-- | A no-op decision.
emptyTxDecision :: TxDecision txid tx
emptyTxDecision = TxDecision {
txdTxIdsToAcknowledge = 0,
txdTxIdsToRequest = 0,
txdPipelineTxIds = False,
txdTxsToRequest = Set.empty,
txdTxsToMempool = []
}

data SharedDecisionContext peeraddr txid tx = SharedDecisionContext {
-- TODO: check how to access it.
sdcPeerGSV :: !(Map peeraddr PeerGSV),

sdcSharedTxState :: !(SharedTxState peeraddr txid tx)
}
deriving Show

--
-- Decision Logic
--

-- | Make download decisions.
--
makeDecisions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ module Ouroboros.Network.TxSubmission.Inbound.Registry
, newTxChannelsVar
, PeerTxAPI (..)
, decisionLogicThread
, DebugTxLogic (..)
, withPeer
) where

Expand All @@ -38,6 +37,7 @@ import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Inbound.Decision
import Ouroboros.Network.TxSubmission.Inbound.Policy
import Ouroboros.Network.TxSubmission.Inbound.State
import Ouroboros.Network.TxSubmission.Inbound.Types
import Ouroboros.Network.TxSubmission.Mempool.Reader

-- | Communication channels between `TxSubmission` client mini-protocol and
Expand Down Expand Up @@ -75,10 +75,6 @@ data PeerTxAPI m txid tx = PeerTxAPI {
}


data TraceDecision peeraddr txid tx =
TraceDecisions (Map peeraddr (TxDecision txid tx))
deriving (Eq, Show)

-- | A bracket function which registers / de-registers a new peer in
-- `SharedTxStateVar` and `PeerTxStateVar`s, which exposes `PeerTxStateAPI`.
-- `PeerTxStateAPI` is only safe inside the `withPeer` scope.
Expand All @@ -92,7 +88,7 @@ withPeer
, Ord peeraddr
, Show peeraddr
)
=> Tracer m (DebugSharedTxState peeraddr txid tx)
=> Tracer m (TraceTxLogic peeraddr txid tx)
-> TxChannelsVar m peeraddr txid tx
-> SharedTxStateVar m peeraddr txid tx
-> TxSubmissionMempoolReader txid tx idx m
Expand Down Expand Up @@ -213,13 +209,6 @@ withPeer tracer
collectTxs tracer sharedStateVar peeraddr txids txs


-- | TODO: reorganise modules so there's just one `Debug` tracer.
data DebugTxLogic peeraddr txid tx =
DebugTxLogicSharedTxState (SharedTxState peeraddr txid tx)
| DebugTxLogicDecisions (Map peeraddr (TxDecision txid tx))
deriving Show


decisionLogicThread
:: forall m peeraddr txid tx.
( MonadDelay m
Expand All @@ -230,7 +219,7 @@ decisionLogicThread
, Ord peeraddr
, Ord txid
)
=> Tracer m (DebugTxLogic peeraddr txid tx)
=> Tracer m (TraceTxLogic peeraddr txid tx)
-> TxDecisionPolicy
-> STM m (Map peeraddr PeerGSV)
-> TxChannelsVar m peeraddr txid tx
Expand Down Expand Up @@ -259,8 +248,8 @@ decisionLogicThread tracer policy readGSVVar txChannelsVar sharedStateVar = do
let (sharedState, decisions) = makeDecisions policy sharedCtx activePeers
writeTVar sharedStateVar sharedState
return (decisions, sharedState)
traceWith tracer (DebugTxLogicSharedTxState st)
traceWith tracer (DebugTxLogicDecisions decisions)
traceWith tracer (TraceSharedTxState "decisionLogicThread" st)
traceWith tracer (TraceTxDecisions decisions)
TxChannels { txChannelMap } <- readMVar txChannelsVar
traverse_
(\(mvar, d) -> modifyMVarWithDefault_ mvar d (\d' -> pure (d' <> d)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import Network.TypedProtocol.Pipelined

import Control.Monad (unless)
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.TxSubmission.Inbound.Decision (TxDecision (..))
import Ouroboros.Network.TxSubmission.Inbound.Registry (PeerTxAPI (..))
import Ouroboros.Network.TxSubmission.Inbound.Types

Expand Down
152 changes: 5 additions & 147 deletions ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ module Ouroboros.Network.TxSubmission.Inbound.State
, receivedTxIds
, collectTxs
, acknowledgeTxIds
-- * Debug output
, DebugSharedTxState (..)
-- * Internals, only exported for testing purposes:
, RefCountDiff (..)
, updateRefCounts
Expand All @@ -37,63 +35,16 @@ import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as StrictSeq
import Data.Set (Set)
import Data.Set qualified as Set
import GHC.Generics (Generic)

import NoThunks.Class (NoThunks (..))

import GHC.Stack (HasCallStack)
import Ouroboros.Network.Protocol.TxSubmission2.Type (NumTxIdsToAck (..),
NumTxIdsToReq (..))
import Ouroboros.Network.SizeInBytes (SizeInBytes (..))
import Ouroboros.Network.TxSubmission.Inbound.Policy
import Ouroboros.Network.TxSubmission.Inbound.Types
import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..))


data PeerTxState txid tx = PeerTxState {
-- | Those transactions (by their identifier) that the client has told
-- us about, and which we have not yet acknowledged. This is kept in
-- the order in which the client gave them to us. This is the same order
-- in which we submit them to the mempool (or for this example, the final
-- result order). It is also the order we acknowledge in.
--
unacknowledgedTxIds :: !(StrictSeq txid),

-- | Set of known transaction ids which can be requested from this peer.
--
availableTxIds :: !(Map txid SizeInBytes),

-- | The number of transaction identifiers that we have requested but
-- which have not yet been replied to. We need to track this it keep
-- our requests within the limit on the number of unacknowledged txids.
--
requestedTxIdsInflight :: !NumTxIdsToReq,

-- | The size in bytes of transactions that we have requested but which
-- have not yet been replied to. We need to track this it keep our
-- requests within the limit on the number of unacknowledged txids.
--
requestedTxsInflightSize :: !SizeInBytes,

-- | The set of requested `txid`s.
--
requestedTxsInflight :: !(Set txid),

-- | A subset of `unacknowledgedTxIds` which were unknown to the peer.
-- We need to track these `txid`s since they need to be acknowledged.
--
-- We track these `txid` per peer, rather than in `bufferedTxs` map,
-- since that could potentially lead to corrupting the node, not being
-- able to download a `tx` which is needed & available from other nodes.
--
unknownTxs :: !(Set txid)
}
deriving (Eq, Show, Generic)

instance ( NoThunks txid
, NoThunks tx
) => NoThunks (PeerTxState txid tx)


-- | Compute number of `txids` to request respecting `TxDecisionPolicy`; update
-- `PeerTxState`.
--
Expand Down Expand Up @@ -130,90 +81,6 @@ numTxIdsToRequest
unackedAndRequested = unacked + requestedTxIdsInflight
unacked = fromIntegral $ StrictSeq.length unacknowledgedTxIds


-- | Shared state of all `TxSubmission` clients.
--
-- New `txid` enters `unacknowledgedTxIds` it is also added to `availableTxIds`
-- and `referenceCounts` (see `acknowledgeTxIdsImpl`).
--
-- When a `txid` id is selected to be downloaded, it's added to
-- `requestedTxsInflightSize` (see
-- `Ouroboros.Network.TxSubmission.Inbound.Decision.pickTxsToDownload`).
--
-- When the request arrives, the `txid` is removed from `inflightTxs`. It
-- might be added to `unknownTxs` if the server didn't have that `txid`, or
-- it's added to `bufferedTxs` (see `collectTxsImpl`).
--
-- Whenever we choose `txid` to acknowledge (either in `acknowledtxsIdsImpl`,
-- `collectTxsImpl` or
-- `Ouroboros.Network.TxSubmission.Inbound.Decision.pickTxsToDownload`, we also
-- recalculate `referenceCounts` and only keep live `txid`s in other maps (e.g.
-- `availableTxIds`, `bufferedTxs`, `unknownTxs`).
--
data SharedTxState peeraddr txid tx = SharedTxState {

-- | Map of peer states.
--
-- /Invariant:/ for peeraddr's which are registered using `withPeer`,
-- there's always an entry in this map even if the set of `txid`s is
-- empty.
--
peerTxStates :: !(Map peeraddr (PeerTxState txid tx)),

-- | Set of transactions which are in-flight (have already been
-- requested) together with multiplicities (from how many peers it is
-- currently in-flight)
--
-- This set can intersect with `availableTxIds`.
--
inflightTxs :: !(Map txid Int),

-- | Overall size of all `tx`s in-flight.
--
inflightTxsSize :: !SizeInBytes,

-- | Map of `tx` which:
--
-- * were downloaded,
-- * are already in the mempool (`Nothing` is inserted in that case),
--
-- We only keep live `txid`, e.g. ones which `txid` is unacknowledged by
-- at least one peer.
--
-- /Note:/ `txid`s which `tx` were unknown by a peer are tracked
-- separately in `unknownTxs`.
--
-- /Note:/ previous implementation also needed to explicitly tracked
-- `txid`s which were already acknowledged, but are still unacknowledged.
-- In this implementation, this is done due to reference counting.
--
-- This map is useful to acknowledge `txid`s, it's basically taking the
-- longest prefix which contains entries in `bufferedTxs` or `unknownTxs`.
--
bufferedTxs :: !(Map txid (Maybe tx)),

-- | We track reference counts of all unacknowledged txids. Once the
-- count reaches 0, a tx is removed from `bufferedTxs`.
--
-- The `bufferedTx` map contains a subset of `txid` which
-- `referenceCounts` contains.
--
-- /Invariants:/
--
-- * the txid count is equal to multiplicity of txid in all
-- `unacknowledgedTxIds` sequences;
-- * @Map.keysSet bufferedTxs `Set.isSubsetOf` Map.keysSet referenceCounts@;
-- * all counts are positive integers.
--
referenceCounts :: !(Map txid Int)
}
deriving (Eq, Show, Generic)

instance ( NoThunks peeraddr
, NoThunks tx
, NoThunks txid
) => NoThunks (SharedTxState peeraddr txid tx)

--
-- Pure public API
--
Expand Down Expand Up @@ -550,7 +417,7 @@ newSharedTxStateVar = newTVarIO SharedTxState { peerTxStates = Map.empty,
receivedTxIds
:: forall m peeraddr idx tx txid.
(MonadSTM m, Ord txid, Ord peeraddr)
=> Tracer m (DebugSharedTxState peeraddr txid tx)
=> Tracer m (TraceTxLogic peeraddr txid tx)
-> SharedTxStateVar m peeraddr txid tx
-> STM m (MempoolSnapshot txid tx idx)
-> peeraddr
Expand All @@ -566,7 +433,7 @@ receivedTxIds tracer sharedVar getMempoolSnapshot peeraddr reqNo txidsSeq txidsM
st <- atomically $ do
MempoolSnapshot{mempoolHasTx} <- getMempoolSnapshot
stateTVar sharedVar ((\a -> (a,a)) . receivedTxIdsImpl mempoolHasTx peeraddr reqNo txidsSeq txidsMap)
traceWith tracer (DebugSharedTxState "receivedTxIds" st)
traceWith tracer (TraceSharedTxState "receivedTxIds" st)


-- | Include received `tx`s in `SharedTxState`. Return number of `txids`
Expand All @@ -575,7 +442,7 @@ receivedTxIds tracer sharedVar getMempoolSnapshot peeraddr reqNo txidsSeq txidsM
collectTxs
:: forall m peeraddr tx txid.
(MonadSTM m, Ord txid, Ord peeraddr)
=> Tracer m (DebugSharedTxState peeraddr txid tx)
=> Tracer m (TraceTxLogic peeraddr txid tx)
-> SharedTxStateVar m peeraddr txid tx
-> peeraddr
-> Set txid -- ^ set of requested txids
Expand All @@ -587,13 +454,4 @@ collectTxs tracer sharedVar peeraddr txidsRequested txsMap = do
st <- atomically $
stateTVar sharedVar
((\a -> (a,a)) . collectTxsImpl peeraddr txidsRequested txsMap)
traceWith tracer (DebugSharedTxState "collectTxs" st)

--
--
--

-- | Debug tracer.
--
data DebugSharedTxState peeraddr txid tx = DebugSharedTxState String (SharedTxState peeraddr txid tx)
deriving Show
traceWith tracer (TraceSharedTxState "collectTxs" st)
Loading

0 comments on commit d223e7c

Please sign in to comment.