diff --git a/api/api.go b/api/api.go index d6d7441e0..cc067d49c 100644 --- a/api/api.go +++ b/api/api.go @@ -34,20 +34,20 @@ type Boost interface { Net // MethodGroup: Boost - BoostIndexerAnnounceAllDeals(ctx context.Context) error //perm:admin - BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string) (*ProviderDealRejectionInfo, error) //perm:admin - BoostDeal(ctx context.Context, dealUuid uuid.UUID) (*smtypes.ProviderDealState, error) //perm:admin - BoostDealBySignedProposalCid(ctx context.Context, proposalCid cid.Cid) (*smtypes.ProviderDealState, error) //perm:admin - BoostDummyDeal(context.Context, smtypes.DealParams) (*ProviderDealRejectionInfo, error) //perm:admin - BoostDagstoreRegisterShard(ctx context.Context, key string) error //perm:admin - BoostDagstoreDestroyShard(ctx context.Context, key string) error //perm:admin - BoostDagstoreInitializeShard(ctx context.Context, key string) error //perm:admin - BoostDagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) //perm:admin - BoostDagstoreRecoverShard(ctx context.Context, key string) error //perm:admin - BoostDagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin - BoostDagstorePiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) //perm:read - BoostDagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:admin - BoostMakeDeal(context.Context, smtypes.DealParams) (*ProviderDealRejectionInfo, error) //perm:write + BoostIndexerAnnounceAllDeals(ctx context.Context) error //perm:admin + BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (*ProviderDealRejectionInfo, error) //perm:admin + BoostDeal(ctx context.Context, dealUuid uuid.UUID) (*smtypes.ProviderDealState, error) //perm:admin + BoostDealBySignedProposalCid(ctx context.Context, proposalCid cid.Cid) (*smtypes.ProviderDealState, error) //perm:admin + BoostDummyDeal(context.Context, smtypes.DealParams) (*ProviderDealRejectionInfo, error) //perm:admin + BoostDagstoreRegisterShard(ctx context.Context, key string) error //perm:admin + BoostDagstoreDestroyShard(ctx context.Context, key string) error //perm:admin + BoostDagstoreInitializeShard(ctx context.Context, key string) error //perm:admin + BoostDagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) //perm:admin + BoostDagstoreRecoverShard(ctx context.Context, key string) error //perm:admin + BoostDagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin + BoostDagstorePiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) //perm:read + BoostDagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:admin + BoostMakeDeal(context.Context, smtypes.DealParams) (*ProviderDealRejectionInfo, error) //perm:write // MethodGroup: Blockstore BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte, error) //perm:read diff --git a/api/proxy_gen.go b/api/proxy_gen.go index e32b865cc..31b9e6183 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -70,7 +70,7 @@ type BoostStruct struct { BoostMakeDeal func(p0 context.Context, p1 smtypes.DealParams) (*ProviderDealRejectionInfo, error) `perm:"write"` - BoostOfflineDealWithData func(p0 context.Context, p1 uuid.UUID, p2 string) (*ProviderDealRejectionInfo, error) `perm:"admin"` + BoostOfflineDealWithData func(p0 context.Context, p1 uuid.UUID, 
p2 string, p3 bool) (*ProviderDealRejectionInfo, error) `perm:"admin"` DealsConsiderOfflineRetrievalDeals func(p0 context.Context) (bool, error) `perm:"admin"` @@ -452,14 +452,14 @@ func (s *BoostStub) BoostMakeDeal(p0 context.Context, p1 smtypes.DealParams) (*P return nil, ErrNotSupported } -func (s *BoostStruct) BoostOfflineDealWithData(p0 context.Context, p1 uuid.UUID, p2 string) (*ProviderDealRejectionInfo, error) { +func (s *BoostStruct) BoostOfflineDealWithData(p0 context.Context, p1 uuid.UUID, p2 string, p3 bool) (*ProviderDealRejectionInfo, error) { if s.Internal.BoostOfflineDealWithData == nil { return nil, ErrNotSupported } - return s.Internal.BoostOfflineDealWithData(p0, p1, p2) + return s.Internal.BoostOfflineDealWithData(p0, p1, p2, p3) } -func (s *BoostStub) BoostOfflineDealWithData(p0 context.Context, p1 uuid.UUID, p2 string) (*ProviderDealRejectionInfo, error) { +func (s *BoostStub) BoostOfflineDealWithData(p0 context.Context, p1 uuid.UUID, p2 string, p3 bool) (*ProviderDealRejectionInfo, error) { return nil, ErrNotSupported } diff --git a/build/openrpc/boost.json.gz b/build/openrpc/boost.json.gz index e35629efc..fb230f3d9 100644 Binary files a/build/openrpc/boost.json.gz and b/build/openrpc/boost.json.gz differ diff --git a/cmd/boostd/import_data.go b/cmd/boostd/import_data.go index 72c12dc85..92aed947e 100644 --- a/cmd/boostd/import_data.go +++ b/cmd/boostd/import_data.go @@ -17,6 +17,13 @@ var importDataCmd = &cli.Command{ Name: "import-data", Usage: "Import data for offline deal made with Boost", ArgsUsage: " or ", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "delete-after-import", + Usage: "whether to delete the data for the offline deal after the deal has been added to a sector", + Value: false, + }, + }, Action: func(cctx *cli.Context) error { if cctx.Args().Len() < 2 { return fmt.Errorf("must specify proposal CID / deal UUID and file path") @@ -58,7 +65,12 @@ var importDataCmd = &cli.Command{ defer closer() // If the user has supplied a signed proposal cid + deleteAfterImport := cctx.Bool("delete-after-import") if proposalCid != nil { + if deleteAfterImport { + return fmt.Errorf("legacy deal data cannot be automatically deleted after import (only new deals)") + } + // Look up the deal in the boost database deal, err := napi.BoostDealBySignedProposalCid(cctx.Context, *proposalCid) if err != nil { @@ -74,6 +86,7 @@ var importDataCmd = &cli.Command{ if err != nil { return fmt.Errorf("couldnt import v1.1.0 deal, or find boost deal: %w", err) } + fmt.Printf("Offline deal import for v1.1.0 deal %s scheduled for execution\n", proposalCid.String()) return nil } @@ -83,7 +96,7 @@ var importDataCmd = &cli.Command{ } // Deal proposal by deal uuid (v1.2.0 deal) - rej, err := napi.BoostOfflineDealWithData(cctx.Context, dealUuid, filePath) + rej, err := napi.BoostOfflineDealWithData(cctx.Context, dealUuid, filePath, deleteAfterImport) if err != nil { return fmt.Errorf("failed to execute offline deal: %w", err) } diff --git a/db/deals.go b/db/deals.go index ef365e5eb..9e31a6550 100644 --- a/db/deals.go +++ b/db/deals.go @@ -62,6 +62,7 @@ func newDealAccessor(db *sql.DB, deal *types.ProviderDealState) *dealAccessor { "PieceSize": &fielddef.FieldDef{F: &deal.ClientDealProposal.Proposal.PieceSize}, "VerifiedDeal": &fielddef.FieldDef{F: &deal.ClientDealProposal.Proposal.VerifiedDeal}, "IsOffline": &fielddef.FieldDef{F: &deal.IsOffline}, + "CleanupData": &fielddef.FieldDef{F: &deal.CleanupData}, "ClientAddress": &fielddef.AddrFieldDef{F: &deal.ClientDealProposal.Proposal.Client}, 
"ProviderAddress": &fielddef.AddrFieldDef{F: &deal.ClientDealProposal.Proposal.Provider}, "Label": &fielddef.LabelFieldDef{F: &deal.ClientDealProposal.Proposal.Label}, diff --git a/db/fixtures.go b/db/fixtures.go index 956c5efe4..75566c59b 100644 --- a/db/fixtures.go +++ b/db/fixtures.go @@ -54,9 +54,10 @@ func GenerateNDeals(count int) ([]types.ProviderDealState, error) { return nil, err } deal := types.ProviderDealState{ - DealUuid: uuid.New(), - CreatedAt: time.Now(), - IsOffline: true, + DealUuid: uuid.New(), + CreatedAt: time.Now(), + IsOffline: true, + CleanupData: false, ClientDealProposal: market.ClientDealProposal{ Proposal: market.DealProposal{ PieceCID: testutil.GenerateCid(), diff --git a/db/migrations/20230330111514_deals_cleanup_data.sql b/db/migrations/20230330111514_deals_cleanup_data.sql new file mode 100644 index 000000000..dc63f0144 --- /dev/null +++ b/db/migrations/20230330111514_deals_cleanup_data.sql @@ -0,0 +1,10 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE Deals + ADD CleanupData BOOL; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +SELECT 'down SQL query'; +-- +goose StatementEnd diff --git a/db/migrations/20230330111524_deals_cleanup_data.go b/db/migrations/20230330111524_deals_cleanup_data.go new file mode 100644 index 000000000..24c02ffb8 --- /dev/null +++ b/db/migrations/20230330111524_deals_cleanup_data.go @@ -0,0 +1,23 @@ +package migrations + +import ( + "database/sql" + "github.com/pressly/goose/v3" +) + +func init() { + goose.AddMigration(upDealsCleanupData, downDealsCleanupData) +} + +func upDealsCleanupData(tx *sql.Tx) error { + _, err := tx.Exec("UPDATE Deals SET CleanupData = NOT IsOffline") + if err != nil { + return err + } + return nil +} + +func downDealsCleanupData(tx *sql.Tx) error { + // This code is executed when the migration is rolled back. 
+ return nil +} diff --git a/db/migrations_tests/deals_announce_to_ipni_test.go b/db/migrations_tests/deals_announce_to_ipni_test.go index 086235fcc..2a766e828 100644 --- a/db/migrations_tests/deals_announce_to_ipni_test.go +++ b/db/migrations_tests/deals_announce_to_ipni_test.go @@ -27,20 +27,31 @@ func TestDealAnnounceToIPNI(t *testing.T) { deals, err := db.GenerateNDeals(1) req.NoError(err) - // Insert the deals in DB - err = dealsDB.Insert(ctx, &deals[0]) - require.NoError(t, err) + // Insert the deal into the DB + deal := deals[0] + _, err = sqldb.Exec(`INSERT INTO Deals ("ID", "CreatedAt", "DealProposalSignature", "PieceCID", "PieceSize", + "VerifiedDeal", "IsOffline", "ClientAddress", "ProviderAddress","Label", "StartEpoch", "EndEpoch", + "StoragePricePerEpoch", "ProviderCollateral", "ClientCollateral", "ClientPeerID", "DealDataRoot", + "InboundFilePath", "TransferType", "TransferParams", "TransferSize", "ChainDealID", "PublishCID", + "SectorID", "Offset", "Length", "Checkpoint", "CheckpointAt", "Error", "Retry", "SignedProposalCID", + "FastRetrieval") + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`, + deal.DealUuid, deal.CreatedAt, []byte("test"), deal.ClientDealProposal.Proposal.PieceCID.String(), + deal.ClientDealProposal.Proposal.PieceSize, deal.ClientDealProposal.Proposal.VerifiedDeal, deal.IsOffline, + deal.ClientDealProposal.Proposal.Client.String(), deal.ClientDealProposal.Proposal.Provider.String(), "test", + deal.ClientDealProposal.Proposal.StartEpoch, deal.ClientDealProposal.Proposal.EndEpoch, deal.ClientDealProposal.Proposal.StoragePricePerEpoch.Uint64(), + deal.ClientDealProposal.Proposal.ProviderCollateral.Int64(), deal.ClientDealProposal.Proposal.ClientCollateral.Uint64(), deal.ClientPeerID.String(), + deal.DealDataRoot.String(), deal.InboundFilePath, deal.Transfer.Type, deal.Transfer.Params, deal.Transfer.Size, deal.ChainDealID, + deal.PublishCID.String(), deal.SectorID, deal.Offset, deal.Length, deal.Checkpoint.String(), deal.CheckpointAt, deal.Err, deal.Retry, []byte("test"), + deal.FastRetrieval) - // Get deal state - dealState, err := dealsDB.ByID(ctx, deals[0].DealUuid) - require.NoError(t, err) - require.False(t, dealState.AnnounceToIPNI) + req.NoError(err) - //Run migration - req.NoError(goose.UpByOne(sqldb, ".")) + // Run migration + req.NoError(goose.Up(sqldb, ".")) - // Check the deal state again - dealState, err = dealsDB.ByID(ctx, deals[0].DealUuid) + // Get the deal state + dealState, err := dealsDB.ByID(ctx, deals[0].DealUuid) require.NoError(t, err) require.True(t, dealState.AnnounceToIPNI) } diff --git a/db/migrations_tests/deals_cleanup_data_test.go b/db/migrations_tests/deals_cleanup_data_test.go new file mode 100644 index 000000000..c3767c823 --- /dev/null +++ b/db/migrations_tests/deals_cleanup_data_test.go @@ -0,0 +1,61 @@ +package migrations_tests + +import ( + "context" + "testing" + + "github.com/filecoin-project/boost/db" + "github.com/filecoin-project/boost/db/migrations" + "github.com/pressly/goose/v3" + "github.com/stretchr/testify/require" +) + +func TestDealCleanupData(t *testing.T) { + req := require.New(t) + ctx := context.Background() + + sqldb := db.CreateTestTmpDB(t) + req.NoError(db.CreateAllBoostTables(ctx, sqldb, sqldb)) + + // Run migrations up to the one that adds the CleanupData field to Deals + goose.SetBaseFS(migrations.EmbedMigrations) + req.NoError(goose.SetDialect("sqlite3")) + req.NoError(goose.UpTo(sqldb, ".", 20230330111514)) + + // Generate 1 deal + dealsDB := db.NewDealsDB(sqldb) + deals, 
err := db.GenerateNDeals(1) + req.NoError(err) + + // Insert the deal into the DB + deal := deals[0] + deal.IsOffline = false + _, err = sqldb.Exec(`INSERT INTO Deals ("ID", "CreatedAt", "DealProposalSignature", "PieceCID", "PieceSize", + "VerifiedDeal", "IsOffline", "ClientAddress", "ProviderAddress","Label", "StartEpoch", "EndEpoch", + "StoragePricePerEpoch", "ProviderCollateral", "ClientCollateral", "ClientPeerID", "DealDataRoot", + "InboundFilePath", "TransferType", "TransferParams", "TransferSize", "ChainDealID", "PublishCID", + "SectorID", "Offset", "Length", "Checkpoint", "CheckpointAt", "Error", "Retry", "SignedProposalCID", + "FastRetrieval", "AnnounceToIPNI") + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`, + deal.DealUuid, deal.CreatedAt, []byte("test"), deal.ClientDealProposal.Proposal.PieceCID.String(), + deal.ClientDealProposal.Proposal.PieceSize, deal.ClientDealProposal.Proposal.VerifiedDeal, deal.IsOffline, + deal.ClientDealProposal.Proposal.Client.String(), deal.ClientDealProposal.Proposal.Provider.String(), "test", + deal.ClientDealProposal.Proposal.StartEpoch, deal.ClientDealProposal.Proposal.EndEpoch, deal.ClientDealProposal.Proposal.StoragePricePerEpoch.Uint64(), + deal.ClientDealProposal.Proposal.ProviderCollateral.Int64(), deal.ClientDealProposal.Proposal.ClientCollateral.Uint64(), deal.ClientPeerID.String(), + deal.DealDataRoot.String(), deal.InboundFilePath, deal.Transfer.Type, deal.Transfer.Params, deal.Transfer.Size, deal.ChainDealID, + deal.PublishCID.String(), deal.SectorID, deal.Offset, deal.Length, deal.Checkpoint.String(), deal.CheckpointAt, deal.Err, deal.Retry, []byte("test"), + deal.FastRetrieval, deal.AnnounceToIPNI) + + req.NoError(err) + + // Run migration + req.NoError(goose.Up(sqldb, ".")) + + // Get the deal state + dealState, err := dealsDB.ByID(ctx, deals[0].DealUuid) + require.NoError(t, err) + + // Expect CleanupData to be true because the migration should set + // CleanupData to be the opposite of IsOffline + require.True(t, dealState.CleanupData) +} diff --git a/documentation/en/api-v1-methods.md b/documentation/en/api-v1-methods.md index b752e996a..d355958e9 100644 --- a/documentation/en/api-v1-methods.md +++ b/documentation/en/api-v1-methods.md @@ -380,6 +380,7 @@ Response: } }, "IsOffline": true, + "CleanupData": true, "ClientPeerID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "DealDataRoot": { "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" @@ -447,6 +448,7 @@ Response: } }, "IsOffline": true, + "CleanupData": true, "ClientPeerID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "DealDataRoot": { "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" @@ -601,7 +603,8 @@ Inputs: ```json [ "07070707-0707-0707-0707-070707070707", - "string value" + "string value", + true ] ``` diff --git a/gql/resolver_piece.go b/gql/resolver_piece.go index 55597f62f..b4f8038d2 100644 --- a/gql/resolver_piece.go +++ b/gql/resolver_piece.go @@ -185,7 +185,7 @@ func (r *resolver) PieceStatus(ctx context.Context, args struct{ PieceCid string } bd.Message = dl.Checkpoint.String() - // Only check the unseal state if the deal has already been added to a piece + // Only check the unseal state if the deal has already been added to a sector st := &sealStatus{IsUnsealed: false} if dl.Checkpoint >= dealcheckpoints.AddedPiece { isUnsealed, err := r.sa.IsUnsealed(ctx, dl.SectorID, dl.Offset.Unpadded(), dl.Length.Unpadded()) diff --git a/gql/schema.graphql b/gql/schema.graphql index 
f6780f96d..411c08071 100644 --- a/gql/schema.graphql +++ b/gql/schema.graphql @@ -70,6 +70,7 @@ type Deal { ChainDealID: Uint64! PublishCid: String! IsOffline: Boolean! + CleanupData: Boolean! Transfer: TransferParams! TransferSamples: [TransferPoint]! IsTransferStalled: Boolean! diff --git a/itests/dummydeal_offline_test.go b/itests/dummydeal_offline_test.go index ecb87070b..d4302d842 100644 --- a/itests/dummydeal_offline_test.go +++ b/itests/dummydeal_offline_test.go @@ -38,7 +38,7 @@ func TestDummydealOffline(t *testing.T) { res := dealRes.Result require.NoError(t, err) require.True(t, res.Accepted) - res, err = f.Boost.BoostOfflineDealWithData(context.Background(), offlineDealUuid, carFilepath) + res, err = f.Boost.BoostOfflineDealWithData(context.Background(), offlineDealUuid, carFilepath, false) require.NoError(t, err) require.True(t, res.Accepted) err = f.WaitForDealAddedToSector(offlineDealUuid) diff --git a/node/impl/boost.go b/node/impl/boost.go index 7353b9327..5d2927966 100644 --- a/node/impl/boost.go +++ b/node/impl/boost.go @@ -145,8 +145,8 @@ func (sm *BoostAPI) BoostIndexerAnnounceAllDeals(ctx context.Context) error { return sm.IndexProvider.IndexerAnnounceAllDeals(ctx) } -func (sm *BoostAPI) BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string) (*api.ProviderDealRejectionInfo, error) { - res, err := sm.StorageProvider.ImportOfflineDealData(ctx, dealUuid, filePath) +func (sm *BoostAPI) BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (*api.ProviderDealRejectionInfo, error) { + res, err := sm.StorageProvider.ImportOfflineDealData(ctx, dealUuid, filePath, delAfterImport) return res, err } diff --git a/react/src/DealDetail.js b/react/src/DealDetail.js index cc68126a2..9d895d3f5 100644 --- a/react/src/DealDetail.js +++ b/react/src/DealDetail.js @@ -267,6 +267,10 @@ export function DealDetail(props) { Inbound File Path {deal.InboundFilePath} + + Delete After Add Piece + {deal.CleanupData ? 'Yes' : 'No'} + {deal.Sector.ID > 0 ? 
( <> diff --git a/react/src/gql.js b/react/src/gql.js index 58377b889..450190ac6 100644 --- a/react/src/gql.js +++ b/react/src/gql.js @@ -81,6 +81,7 @@ const DealsListQuery = gql` AnnounceToIPNI KeepUnsealedCopy IsOffline + CleanupData Err Retry Message @@ -166,6 +167,7 @@ const DealSubscription = gql` ChainDealID PublishCid IsOffline + CleanupData Checkpoint CheckpointAt AnnounceToIPNI diff --git a/storagemarket/deal_execution.go b/storagemarket/deal_execution.go index c37ad3a83..ba2d145d9 100644 --- a/storagemarket/deal_execution.go +++ b/storagemarket/deal_execution.go @@ -217,9 +217,9 @@ func (p *Provider) execDealUptoAddPiece(ctx context.Context, deal *types.Provide } // as deal has already been handed to the sealer, we can remove the inbound file and reclaim the tagged space - if !deal.IsOffline { + if deal.CleanupData { _ = os.Remove(deal.InboundFilePath) - p.dealLogger.Infow(deal.DealUuid, "removed inbound file as deal handed to sealer", "path", deal.InboundFilePath) + p.dealLogger.Infow(deal.DealUuid, "removed piece data from disk as deal has been added to a sector", "path", deal.InboundFilePath) } if err := p.untagStorageSpaceAfterSealing(ctx, deal); err != nil { // If there's an error untagging storage space we should still try to continue, diff --git a/storagemarket/provider.go b/storagemarket/provider.go index e6aa5bd3f..79faa2eeb 100644 --- a/storagemarket/provider.go +++ b/storagemarket/provider.go @@ -239,10 +239,10 @@ func (p *Provider) GetAsk() *storagemarket.SignedStorageAsk { // ImportOfflineDealData is called when the Storage Provider imports data for // an offline deal (the deal must already have been proposed by the client) -func (p *Provider) ImportOfflineDealData(ctx context.Context, dealUuid uuid.UUID, filePath string) (pi *api.ProviderDealRejectionInfo, err error) { - p.dealLogger.Infow(dealUuid, "import data for offline deal", "filepath", filePath) +func (p *Provider) ImportOfflineDealData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (pi *api.ProviderDealRejectionInfo, err error) { + p.dealLogger.Infow(dealUuid, "import data for offline deal", "filepath", filePath, "delete after import", delAfterImport) - // db should already have a deal with this uuid as the deal proposal should have been agreed before hand + // db should already have a deal with this uuid as the deal proposal should have been made beforehand ds, err := p.dealsDB.ByID(p.ctx, dealUuid) if err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -258,6 +258,7 @@ func (p *Provider) ImportOfflineDealData(ctx context.Context, dealUuid uuid.UUID } ds.InboundFilePath = filePath + ds.CleanupData = delAfterImport resp, err := p.checkForDealAcceptance(ctx, ds, true) if err != nil { @@ -265,12 +266,12 @@ func (p *Provider) ImportOfflineDealData(ctx context.Context, dealUuid uuid.UUID return nil, fmt.Errorf("failed to send deal for acceptance: %w", err) } - // if there was an error, we don't return a rejection reason + // if there was an error, we just return the error message (there is no rejection reason) if resp.err != nil { return nil, fmt.Errorf("failed to accept deal: %w", resp.err) } - // return rejection reason as provider has rejected the deal. 
+ // return rejection reason as provider has rejected the deal if !resp.ri.Accepted { p.dealLogger.Infow(dealUuid, "deal execution rejected by provider", "reason", resp.ri.Reason) return resp.ri, nil @@ -297,6 +298,7 @@ func (p *Provider) ExecuteDeal(ctx context.Context, dp *types.DealParams, client DealDataRoot: dp.DealDataRoot, Transfer: dp.Transfer, IsOffline: dp.IsOffline, + CleanupData: !dp.IsOffline, Retry: smtypes.DealRetryAuto, FastRetrieval: !dp.RemoveUnsealedCopy, AnnounceToIPNI: !dp.SkipIPNIAnnounce, diff --git a/storagemarket/provider_offline_test.go b/storagemarket/provider_offline_test.go index 46a395d39..dc50b37c9 100644 --- a/storagemarket/provider_offline_test.go +++ b/storagemarket/provider_offline_test.go @@ -2,6 +2,7 @@ package storagemarket import ( "context" + "fmt" "testing" "github.com/libp2p/go-libp2p/core/peer" @@ -31,7 +32,7 @@ func TestSimpleOfflineDealHappy(t *testing.T) { td.waitForAndAssert(t, ctx, dealcheckpoints.Accepted) // import data for offline deal - require.NoError(t, td.executeAndSubscribeImportOfflineDeal()) + require.NoError(t, td.executeAndSubscribeImportOfflineDeal(false)) // unblock commp -> wait for Transferred checkpoint td.unblockCommp() @@ -79,8 +80,36 @@ func TestOfflineDealInsufficientProviderFunds(t *testing.T) { // expect that when the deal data is imported, the import will fail because // there are not enough funds for the deal - pi, err = td.ph.Provider.ImportOfflineDealData(context.Background(), td.params.DealUUID, td.carv2FilePath) + pi, err = td.ph.Provider.ImportOfflineDealData(context.Background(), td.params.DealUUID, td.carv2FilePath, false) require.NoError(t, err) require.False(t, pi.Accepted) require.Contains(t, pi.Reason, "insufficient funds") } + +func TestOfflineDealDataCleanup(t *testing.T) { + ctx := context.Background() + + for _, delAfterImport := range []bool{true, false} { + t.Run(fmt.Sprintf("delete after import: %t", delAfterImport), func(t *testing.T) { + harness := NewHarness(t) + harness.Start(t, ctx) + defer harness.Stop() + + // first make an offline deal proposal + td := harness.newDealBuilder(t, 1, withOfflineDeal()).withAllMinerCallsNonBlocking().build() + + // execute deal + require.NoError(t, td.executeAndSubscribe()) + + // wait for Accepted checkpoint + td.waitForAndAssert(t, ctx, dealcheckpoints.Accepted) + + // import the deal data + require.NoError(t, td.executeAndSubscribeImportOfflineDeal(delAfterImport)) + + // check whether the deal data was removed after add piece + td.waitForAndAssert(t, ctx, dealcheckpoints.AddedPiece) + harness.EventuallyAssertNoTagged(t, ctx) + }) + } +} diff --git a/storagemarket/provider_test.go b/storagemarket/provider_test.go index 61f90b1ef..37eff0abf 100644 --- a/storagemarket/provider_test.go +++ b/storagemarket/provider_test.go @@ -592,7 +592,7 @@ func TestOfflineDealRestartAfterManualRecoverableErrors(t *testing.T) { require.NoError(t, err) // execute deal - err = td.executeAndSubscribeImportOfflineDeal() + err = td.executeAndSubscribeImportOfflineDeal(false) require.NoError(t, err) // expect recoverable error with retry type Manual @@ -1218,10 +1218,11 @@ func (h *ProviderHarness) AssertEventuallyDealCleanedup(t *testing.T, ctx contex return false } - // the deal inbound file should no longer exist if it is an online deal - if !dp.IsOffline { + // the deal inbound file should no longer exist if it is an online deal, + // or if it is an offline deal with the delete after import flag set + if dbState.CleanupData { _, statErr := os.Stat(dbState.InboundFilePath) - return 
statErr != nil + return os.IsNotExist(statErr) } return true }, 5*time.Second, 200*time.Millisecond) @@ -1822,11 +1823,18 @@ func (ph *ProviderHarness) newDealBuilder(t *testing.T, seed int, opts ...dealPr RemoveUnsealedCopy: true, } + // Create a copy of the car file so that if the original car file gets + // cleaned up after the deal is added to a sector, we still have a copy + // we can use to compare with the contents of the unsealed file. + carFileCopyPath := carFilePath + ".copy" + err = copyFile(carFilePath, carFileCopyPath) + require.NoError(tbuilder.t, err) td := &testDeal{ - ph: tbuilder.ph, - params: dealParams, - carv2FilePath: carFilePath, - carv2FileName: name, + ph: tbuilder.ph, + params: dealParams, + carv2FilePath: carFilePath, + carv2CopyFilePath: carFileCopyPath, + carv2FileName: name, } publishCid := testutil.GenerateCid() @@ -2053,18 +2061,19 @@ func (tbuilder *testDealBuilder) buildAnnounce() *testDealBuilder { } type testDeal struct { - ph *ProviderHarness - params *types.DealParams - carv2FilePath string - carv2FileName string - stubOutput *smtestutil.StubbedMinerOutput - sub event.Subscription + ph *ProviderHarness + params *types.DealParams + carv2FilePath string + carv2FileName string + carv2CopyFilePath string + stubOutput *smtestutil.StubbedMinerOutput + sub event.Subscription tBuilder *testDealBuilder } -func (td *testDeal) executeAndSubscribeImportOfflineDeal() error { - pi, err := td.ph.Provider.ImportOfflineDealData(context.Background(), td.params.DealUUID, td.carv2FilePath) +func (td *testDeal) executeAndSubscribeImportOfflineDeal(delAfterImport bool) error { + pi, err := td.ph.Provider.ImportOfflineDealData(context.Background(), td.params.DealUUID, td.carv2FilePath, delAfterImport) if err != nil { return err } @@ -2195,7 +2204,7 @@ func (td *testDeal) waitForAndAssert(t *testing.T, ctx context.Context, cp dealc case dealcheckpoints.PublishConfirmed: td.ph.AssertPublishConfirmed(t, ctx, td.params, td.stubOutput) case dealcheckpoints.AddedPiece: - td.ph.AssertPieceAdded(t, ctx, td.params, td.stubOutput, td.carv2FilePath) + td.ph.AssertPieceAdded(t, ctx, td.params, td.stubOutput, td.carv2CopyFilePath) case dealcheckpoints.IndexedAndAnnounced: td.ph.AssertDealIndexed(t, ctx, td.params, td.stubOutput) default: @@ -2224,7 +2233,7 @@ func (td *testDeal) unblockAddPiece() { } func (td *testDeal) assertPieceAdded(t *testing.T, ctx context.Context) { - td.ph.AssertPieceAdded(t, ctx, td.params, td.stubOutput, td.carv2FilePath) + td.ph.AssertPieceAdded(t, ctx, td.params, td.stubOutput, td.carv2CopyFilePath) } func (td *testDeal) assertDealFailedTransferNonRecoverable(t *testing.T, ctx context.Context, errStr string) { @@ -2273,3 +2282,17 @@ type mockSignatureVerifier struct { func (m *mockSignatureVerifier) VerifySignature(ctx context.Context, sig acrypto.Signature, addr address.Address, input []byte) (bool, error) { return m.valid, m.err } + +func copyFile(source string, dest string) error { + input, err := os.ReadFile(source) + if err != nil { + return err + } + + err = os.WriteFile(dest, input, 0644) + if err != nil { + return err + } + + return nil +} diff --git a/storagemarket/storagespace/storagespace.go b/storagemarket/storagespace/storagespace.go index 521e9b8b4..7ba73ec14 100644 --- a/storagemarket/storagespace/storagespace.go +++ b/storagemarket/storagespace/storagespace.go @@ -13,7 +13,7 @@ type Status struct { TotalAvailable uint64 // The number of bytes reserved for accepted deals Tagged uint64 - // The number of bytes that have been downloaded and are 
waiting to be added to a piece + // The number of bytes that have been downloaded and are waiting to be added to a sector Staged uint64 // The number of bytes that are not tagged Free uint64 diff --git a/storagemarket/types/deal_state.go b/storagemarket/types/deal_state.go index 092536b0d..d109ec05c 100644 --- a/storagemarket/types/deal_state.go +++ b/storagemarket/types/deal_state.go @@ -24,6 +24,9 @@ type ProviderDealState struct { // IsOffline is true for offline deals i.e. deals where the actual data to be stored by the SP is sent out of band // and not via an online data transfer. IsOffline bool + // CleanupData indicates whether to remove the data for a deal after the deal has been added to a sector. + // This is always true for online deals, and can be set as a flag for offline deals. + CleanupData bool // ClientPeerID is the Clients libp2p Peer ID. ClientPeerID peer.ID