diff --git a/gql/resolver_dealpublish.go b/gql/resolver_dealpublish.go index 051b294fb..e47fc4d33 100644 --- a/gql/resolver_dealpublish.go +++ b/gql/resolver_dealpublish.go @@ -8,7 +8,9 @@ import ( "github.com/filecoin-project/boost/gql/types" cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/google/uuid" "github.com/graph-gophers/graphql-go" + "github.com/ipfs/go-cid" ) // basicDealResolver just has simple types (as opposed to dealResolver which @@ -32,6 +34,7 @@ type basicDealResolver struct { } type dealPublishResolver struct { + ManualPSD bool Start graphql.Time Period int32 MaxDealsPerMsg int32 @@ -151,6 +154,7 @@ func (r *resolver) DealPublish(ctx context.Context) (*dealPublishResolver, error } return &dealPublishResolver{ + ManualPSD: r.publisher.ManualPSD(), Deals: basicDeals, Period: int32(pending.PublishPeriod.Seconds()), Start: graphql.Time{Time: pending.PublishPeriodStart}, @@ -158,8 +162,39 @@ func (r *resolver) DealPublish(ctx context.Context) (*dealPublishResolver, error }, nil } -// mutation: dealPublishNow(): bool func (r *resolver) DealPublishNow(ctx context.Context) (bool, error) { r.publisher.ForcePublishPendingDeals() return true, nil } + +// mutation: publishPendingDeals([ID!]!): [ID!]! +func (r *resolver) PublishPendingDeals(ctx context.Context, args struct{ IDs []graphql.ID }) ([]graphql.ID, error) { + var pcids []cid.Cid + uuidToPcid := make(map[cid.Cid]uuid.UUID) + var ret []graphql.ID + + for _, id := range args.IDs { + dealId, err := toUuid(id) + if err != nil { + return nil, err + } + deal, err := r.dealsDB.ByID(ctx, dealId) + if err != nil { + return nil, fmt.Errorf("failed to get deal details from DB %s: %w", dealId.String(), err) + } + signedProp, err := cborutil.AsIpld(&deal.ClientDealProposal) + if err != nil { + return nil, fmt.Errorf("error in generating proposal cid for deal %s: %w", dealId.String(), err) + } + pcid := signedProp.Cid() + uuidToPcid[pcid] = dealId + pcids = append(pcids, pcid) + } + + publishedCids := r.publisher.PublishQueuedDeals(pcids) + for _, c := range publishedCids { + ret = append(ret, graphql.ID(uuidToPcid[c].String())) + } + + return ret, nil +} diff --git a/gql/schema.graphql b/gql/schema.graphql index 79b0b544e..dc31e3ef3 100644 --- a/gql/schema.graphql +++ b/gql/schema.graphql @@ -412,6 +412,7 @@ type FundsLog { } type DealPublish { + ManualPSD: Boolean! Period: Int! Start: Time! MaxDealsPerMsg: Int! @@ -609,6 +610,9 @@ type RootMutation { """Update the Storage Ask (price of doing a storage deal)""" storageAskUpdate(update: StorageAskUpdate!): Boolean! + + """Publish the deal for the supplied deal UUIDs""" + publishPendingDeals(ids: [ID!]!): [ID!]! } type RootSubscription { diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go index 949b7d3f2..74d8a7d19 100644 --- a/markets/storageadapter/dealpublisher.go +++ b/markets/storageadapter/dealpublisher.go @@ -7,6 +7,7 @@ import ( "sync" "time" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/ipfs/go-cid" "go.uber.org/fx" "golang.org/x/xerrors" @@ -53,12 +54,13 @@ type DealPublisher struct { ctx context.Context Shutdown context.CancelFunc + manualPSD bool maxDealsPerPublishMsg uint64 publishPeriod time.Duration publishSpec *api.MessageSendSpec lk sync.Mutex - pending []*pendingDeal + pending map[cid.Cid]*pendingDeal cancelWaitForMoreDeals context.CancelFunc publishPeriodStart time.Time startEpochSealingBuffer abi.ChainEpoch @@ -77,12 +79,19 @@ type publishResult struct { err error } -func newPendingDeal(ctx context.Context, deal market.ClientDealProposal) *pendingDeal { +func newPendingDeal(ctx context.Context, deal market.ClientDealProposal) (*pendingDeal, cid.Cid, error) { + + // Generate unique identifier for the deal + signedProp, err := cborutil.AsIpld(&deal) + if err != nil { + return nil, cid.Undef, fmt.Errorf("failed to compute signed deal proposal ipld node: %w", err) + } + return &pendingDeal{ ctx: ctx, deal: deal, Result: make(chan publishResult), - } + }, signedProp.Cid(), nil } type PublishMsgConfig struct { @@ -94,6 +103,10 @@ type PublishMsgConfig struct { MaxDealsPerMsg uint64 // Minimum start epoch buffer to give time for sealing of sector with deal StartEpochSealingBuffer uint64 + // When set to true, the user is responsible for publishing deals manually. + // The values of MaxDealsPerMsg and Period will be ignored, and deals will + // remain in the pending state until manually published. + ManualDealPublish bool } func NewDealPublisher( @@ -133,6 +146,8 @@ func newDealPublisher( publishPeriod: publishMsgCfg.Period, startEpochSealingBuffer: abi.ChainEpoch(publishMsgCfg.StartEpochSealingBuffer), publishSpec: publishSpec, + manualPSD: publishMsgCfg.ManualDealPublish, + pending: make(map[cid.Cid]*pendingDeal), } } @@ -172,10 +187,13 @@ func (p *DealPublisher) ForcePublishPendingDeals() { } func (p *DealPublisher) Publish(ctx context.Context, deal market.ClientDealProposal) (cid.Cid, error) { - pdeal := newPendingDeal(ctx, deal) + pdeal, pcid, err := newPendingDeal(ctx, deal) + if err != nil { + return cid.Undef, fmt.Errorf("failed create pending deal: %w", err) + } // Add the deal to the queue - p.processNewDeal(pdeal) + p.processNewDeal(pdeal, pcid) // Wait for the deal to be submitted select { @@ -186,7 +204,7 @@ func (p *DealPublisher) Publish(ctx context.Context, deal market.ClientDealPropo } } -func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) { +func (p *DealPublisher) processNewDeal(pdeal *pendingDeal, pcid cid.Cid) { p.lk.Lock() defer p.lk.Unlock() @@ -205,10 +223,15 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) { } // Add the new deal to the queue - p.pending = append(p.pending, pdeal) + p.pending[pcid] = pdeal log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)", pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg) + // Return from here if manual PSD is enabled + if p.manualPSD { + return + } + // If the maximum number of deals per message has been reached or we're not batching, send a // publish message if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg || p.publishPeriod == 0 { @@ -257,6 +280,7 @@ func (p *DealPublisher) waitForMoreDeals() { } func (p *DealPublisher) publishAllDeals() { + // If the timeout hasn't yet been cancelled, cancel it if p.cancelWaitForMoreDeals != nil { p.cancelWaitForMoreDeals() @@ -266,10 +290,13 @@ func (p *DealPublisher) publishAllDeals() { // Filter out any deals that have been cancelled p.filterCancelledDeals() - deals := p.pending - p.pending = nil // Send the publish message + var deals []*pendingDeal + for _, deal := range p.pending { + deals = append(deals, deal) + } + p.pending = make(map[cid.Cid]*pendingDeal) go p.publishReady(deals) } @@ -434,12 +461,38 @@ func pieceCids(deals []market.ClientDealProposal) string { // filter out deals that have been cancelled func (p *DealPublisher) filterCancelledDeals() { - filtered := p.pending[:0] - for _, pd := range p.pending { + for c, pd := range p.pending { if pd.ctx.Err() != nil { - continue + delete(p.pending, c) } - filtered = append(filtered, pd) } - p.pending = filtered +} + +func (p *DealPublisher) PublishQueuedDeals(deals []cid.Cid) []cid.Cid { + p.lk.Lock() + defer p.lk.Unlock() + var ret []cid.Cid + var toPublish []*pendingDeal + + p.filterCancelledDeals() + + for _, c := range deals { + if p.pending[c] != nil { + toPublish = append(toPublish, p.pending[c]) + ret = append(ret, c) + delete(p.pending, c) + } else { + log.Debugf("failed to find the proposal %s in pending deals", c) + } + } + + log.Infof("publishing deal proposals: %s", ret) + + // Send the publish message + go p.publishReady(toPublish) + return ret +} + +func (p *DealPublisher) ManualPSD() bool { + return p.manualPSD } diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go index c3204423b..7b2b182fc 100644 --- a/markets/storageadapter/dealpublisher_test.go +++ b/markets/storageadapter/dealpublisher_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/ipfs/go-cid" "github.com/raulk/clock" "github.com/stretchr/testify/require" @@ -232,6 +233,69 @@ func TestForcePublish(t *testing.T) { checkPublishedDeals(t, dpapi, dealsToPublish, []int{2}) } +func TestPublishPendingDeals(t *testing.T) { + dpapi := newDPAPI(t) + + // Create a deal publisher + publishPeriod := time.Hour + dp := newDealPublisher(dpapi, nil, PublishMsgConfig{ + Period: publishPeriod, + MaxDealsPerMsg: 10, + ManualDealPublish: true, + }, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)}) + + // Queue three deals for publishing, one with a cancelled context + // 1. Regular deal + publishDeal(t, dp, 0, false, false) + // 2. Deal with cancelled context + publishDeal(t, dp, 0, true, false) + // 3. Regular deal + publishDeal(t, dp, 0, false, false) + // 4. Regular deal + publishDeal(t, dp, 0, false, false) + + // Allow a moment for them to be queued + build.Clock.Sleep(10 * time.Millisecond) + + // Should be three deals in the pending deals list + // (deal with cancelled context is ignored) + pendingInfo := dp.PendingDeals() + require.Len(t, pendingInfo.Deals, 3) + + var pcids []cid.Cid + props := pendingInfo.Deals + for _, p := range props { + signedProp, err := cborutil.AsIpld(&p) + require.NoError(t, err) + pcids = append(pcids, signedProp.Cid()) + } + + toPublish := pcids[1:] + pending := []cid.Cid{pcids[0]} + + // Send an additional CID not present in publisher + c, err := cid.Decode("bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4") + require.NoError(t, err) + + // Publish three pending deals and verify all deals whose context has not expired have been published + publishedDeals := dp.PublishQueuedDeals(append(toPublish, c)) + require.Equal(t, toPublish, publishedDeals) + + // Should be one remaining pending deal + pendingInfo1 := dp.PendingDeals() + var ppcids []cid.Cid + require.Len(t, pendingInfo1.Deals, 1) + for _, p := range pendingInfo1.Deals { + signedProp, err := cborutil.AsIpld(&p) + require.NoError(t, err) + ppcids = append(ppcids, signedProp.Cid()) + } + require.Equal(t, pending, ppcids) + + // Make sure the expected deals were published + checkPublishedDeals(t, dpapi, props[1:], []int{2}) +} + func publishDeal(t *testing.T, dp *DealPublisher, invalid int, ctxCancelled bool, expired bool) markettypes.ClientDealProposal { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) diff --git a/node/builder.go b/node/builder.go index 7bd6aeeb8..6c7e5ff46 100644 --- a/node/builder.go +++ b/node/builder.go @@ -629,6 +629,7 @@ func ConfigBoost(cfg *config.Boost) Option { Period: time.Duration(cfg.LotusDealmaking.PublishMsgPeriod), MaxDealsPerMsg: cfg.LotusDealmaking.MaxDealsPerPublishMsg, StartEpochSealingBuffer: cfg.LotusDealmaking.StartEpochSealingBuffer, + ManualDealPublish: cfg.Dealmaking.ManualDealPublish, })), Override(new(sealer.Unsealer), From(new(lotus_modules.MinerStorageService))), diff --git a/node/config/def.go b/node/config/def.go index d7e20f6d8..5385611b0 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -142,6 +142,7 @@ func DefaultBoost() *Boost { SealingPipelineCacheTimeout: Duration(30 * time.Second), FundsTaggingEnabled: true, EnableLegacyStorageDeals: false, + ManualDealPublish: false, BitswapPublicAddresses: []string{}, }, diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index b2fccb06f..cc5ff5746 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -426,6 +426,14 @@ for any other deal.`, Comment: `Whether to enable legacy deals on the Boost node or not. We recommend keeping them disabled. These will be completely deprecated soon.`, }, + { + Name: "ManualDealPublish", + Type: "bool", + + Comment: `When set to true, the user is responsible for publishing deals manually. +The values of MaxDealsPerPublishMsg and PublishMsgPeriod will be +ignored, and deals will remain in the pending state until manually published.`, + }, }, "FeeConfig": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index 5201f2756..49f7f111f 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -272,6 +272,11 @@ type DealmakingConfig struct { // Whether to enable legacy deals on the Boost node or not. We recommend keeping // them disabled. These will be completely deprecated soon. EnableLegacyStorageDeals bool + + // When set to true, the user is responsible for publishing deals manually. + // The values of MaxDealsPerPublishMsg and PublishMsgPeriod will be + // ignored, and deals will remain in the pending state until manually published. + ManualDealPublish bool } type ContractDealsConfig struct { diff --git a/react/src/DealPublish.js b/react/src/DealPublish.js index 98dff692a..194a1b154 100644 --- a/react/src/DealPublish.js +++ b/react/src/DealPublish.js @@ -40,6 +40,29 @@ function DealPublishContent() { var publishTime = moment(data.dealPublish.Start).add(period) var deals = data.dealPublish.Deals + + if (data.dealPublish.ManualPSD) { + return
+ {deals.length ? ( + <> +

+ {deals.length} deal{deals.length === 1 ? '' : 's'} pending to be published +

+ +
+
Publish Now
+
+ + ) : null} + +
Note: Manual deal publishing is enabled in config: deals will not be published automatically, the Storage Provider must publish deals manually
+ + { deals.length ? : ( +

There are no deals in the batch publish queue

+ ) } +
+ } + return
{deals.length ? ( <> diff --git a/react/src/gql.js b/react/src/gql.js index 2cac3b96d..0bd823464 100644 --- a/react/src/gql.js +++ b/react/src/gql.js @@ -685,6 +685,7 @@ const FundsLogsQuery = gql` const DealPublishQuery = gql` query AppDealPublishQuery { dealPublish { + ManualPSD Start Period MaxDealsPerMsg @@ -764,6 +765,12 @@ const StorageAskQuery = gql` } `; +const PublishPendingDealsMutation = gql` + mutation AppPublishPendingDealMutation($ids: [ID!]!) { + publishPendingDeals(ids: $ids) + } +`; + export { gqlClient, EpochQuery, @@ -807,4 +814,5 @@ export { SealingPipelineQuery, Libp2pAddrInfoQuery, StorageAskQuery, + PublishPendingDealsMutation, }