Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to cleanup data for offline deals after add piece #1341

Merged
merged 1 commit into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ type Boost interface {
Net

// MethodGroup: Boost
BoostIndexerAnnounceAllDeals(ctx context.Context) error //perm:admin
BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string) (*ProviderDealRejectionInfo, error) //perm:admin
BoostDeal(ctx context.Context, dealUuid uuid.UUID) (*smtypes.ProviderDealState, error) //perm:admin
BoostDealBySignedProposalCid(ctx context.Context, proposalCid cid.Cid) (*smtypes.ProviderDealState, error) //perm:admin
BoostDummyDeal(context.Context, smtypes.DealParams) (*ProviderDealRejectionInfo, error) //perm:admin
BoostDagstoreRegisterShard(ctx context.Context, key string) error //perm:admin
BoostDagstoreDestroyShard(ctx context.Context, key string) error //perm:admin
BoostDagstoreInitializeShard(ctx context.Context, key string) error //perm:admin
BoostDagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) //perm:admin
BoostDagstoreRecoverShard(ctx context.Context, key string) error //perm:admin
BoostDagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin
BoostDagstorePiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) //perm:read
BoostDagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:admin
BoostMakeDeal(context.Context, smtypes.DealParams) (*ProviderDealRejectionInfo, error) //perm:write
BoostIndexerAnnounceAllDeals(ctx context.Context) error //perm:admin
BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (*ProviderDealRejectionInfo, error) //perm:admin
BoostDeal(ctx context.Context, dealUuid uuid.UUID) (*smtypes.ProviderDealState, error) //perm:admin
BoostDealBySignedProposalCid(ctx context.Context, proposalCid cid.Cid) (*smtypes.ProviderDealState, error) //perm:admin
BoostDummyDeal(context.Context, smtypes.DealParams) (*ProviderDealRejectionInfo, error) //perm:admin
BoostDagstoreRegisterShard(ctx context.Context, key string) error //perm:admin
BoostDagstoreDestroyShard(ctx context.Context, key string) error //perm:admin
BoostDagstoreInitializeShard(ctx context.Context, key string) error //perm:admin
BoostDagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) //perm:admin
BoostDagstoreRecoverShard(ctx context.Context, key string) error //perm:admin
BoostDagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin
BoostDagstorePiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) //perm:read
BoostDagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:admin
BoostMakeDeal(context.Context, smtypes.DealParams) (*ProviderDealRejectionInfo, error) //perm:write

// MethodGroup: Blockstore
BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte, error) //perm:read
Expand Down
8 changes: 4 additions & 4 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified build/openrpc/boost.json.gz
Binary file not shown.
15 changes: 14 additions & 1 deletion cmd/boostd/import_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ var importDataCmd = &cli.Command{
Name: "import-data",
Usage: "Import data for offline deal made with Boost",
ArgsUsage: "<proposal CID> <file> or <deal UUID> <file>",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "delete-after-import",
Usage: "whether to delete the data for the offline deal after the deal has been added to a sector",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() < 2 {
return fmt.Errorf("must specify proposal CID / deal UUID and file path")
Expand Down Expand Up @@ -58,7 +65,12 @@ var importDataCmd = &cli.Command{
defer closer()

// If the user has supplied a signed proposal cid
deleteAfterImport := cctx.Bool("delete-after-import")
if proposalCid != nil {
if deleteAfterImport {
return fmt.Errorf("legacy deal data cannot be automatically deleted after import (only new deals)")
}

// Look up the deal in the boost database
deal, err := napi.BoostDealBySignedProposalCid(cctx.Context, *proposalCid)
if err != nil {
Expand All @@ -74,6 +86,7 @@ var importDataCmd = &cli.Command{
if err != nil {
return fmt.Errorf("couldnt import v1.1.0 deal, or find boost deal: %w", err)
}

fmt.Printf("Offline deal import for v1.1.0 deal %s scheduled for execution\n", proposalCid.String())
return nil
}
Expand All @@ -83,7 +96,7 @@ var importDataCmd = &cli.Command{
}

// Deal proposal by deal uuid (v1.2.0 deal)
rej, err := napi.BoostOfflineDealWithData(cctx.Context, dealUuid, filePath)
rej, err := napi.BoostOfflineDealWithData(cctx.Context, dealUuid, filePath, deleteAfterImport)
if err != nil {
return fmt.Errorf("failed to execute offline deal: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions db/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func newDealAccessor(db *sql.DB, deal *types.ProviderDealState) *dealAccessor {
"PieceSize": &fielddef.FieldDef{F: &deal.ClientDealProposal.Proposal.PieceSize},
"VerifiedDeal": &fielddef.FieldDef{F: &deal.ClientDealProposal.Proposal.VerifiedDeal},
"IsOffline": &fielddef.FieldDef{F: &deal.IsOffline},
"CleanupData": &fielddef.FieldDef{F: &deal.CleanupData},
"ClientAddress": &fielddef.AddrFieldDef{F: &deal.ClientDealProposal.Proposal.Client},
"ProviderAddress": &fielddef.AddrFieldDef{F: &deal.ClientDealProposal.Proposal.Provider},
"Label": &fielddef.LabelFieldDef{F: &deal.ClientDealProposal.Proposal.Label},
Expand Down
7 changes: 4 additions & 3 deletions db/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ func GenerateNDeals(count int) ([]types.ProviderDealState, error) {
return nil, err
}
deal := types.ProviderDealState{
DealUuid: uuid.New(),
CreatedAt: time.Now(),
IsOffline: true,
DealUuid: uuid.New(),
CreatedAt: time.Now(),
IsOffline: true,
CleanupData: false,
ClientDealProposal: market.ClientDealProposal{
Proposal: market.DealProposal{
PieceCID: testutil.GenerateCid(),
Expand Down
10 changes: 10 additions & 0 deletions db/migrations/20230330111514_deals_cleanup_data.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE Deals
ADD CleanupData BOOL;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
SELECT 'down SQL query';
-- +goose StatementEnd
23 changes: 23 additions & 0 deletions db/migrations/20230330111524_deals_cleanup_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package migrations

import (
"database/sql"
"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigration(upDealsCleanupData, downDealsCleanupData)
}

func upDealsCleanupData(tx *sql.Tx) error {
_, err := tx.Exec("UPDATE Deals SET CleanupData = NOT IsOffline")
if err != nil {
return err
}
return nil
}

func downDealsCleanupData(tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return nil
}
33 changes: 22 additions & 11 deletions db/migrations_tests/deals_announce_to_ipni_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,31 @@ func TestDealAnnounceToIPNI(t *testing.T) {
deals, err := db.GenerateNDeals(1)
req.NoError(err)

// Insert the deals in DB
err = dealsDB.Insert(ctx, &deals[0])
require.NoError(t, err)
// Insert the deal into the DB
deal := deals[0]
_, err = sqldb.Exec(`INSERT INTO Deals ("ID", "CreatedAt", "DealProposalSignature", "PieceCID", "PieceSize",
"VerifiedDeal", "IsOffline", "ClientAddress", "ProviderAddress","Label", "StartEpoch", "EndEpoch",
"StoragePricePerEpoch", "ProviderCollateral", "ClientCollateral", "ClientPeerID", "DealDataRoot",
"InboundFilePath", "TransferType", "TransferParams", "TransferSize", "ChainDealID", "PublishCID",
"SectorID", "Offset", "Length", "Checkpoint", "CheckpointAt", "Error", "Retry", "SignedProposalCID",
"FastRetrieval")
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`,
deal.DealUuid, deal.CreatedAt, []byte("test"), deal.ClientDealProposal.Proposal.PieceCID.String(),
deal.ClientDealProposal.Proposal.PieceSize, deal.ClientDealProposal.Proposal.VerifiedDeal, deal.IsOffline,
deal.ClientDealProposal.Proposal.Client.String(), deal.ClientDealProposal.Proposal.Provider.String(), "test",
deal.ClientDealProposal.Proposal.StartEpoch, deal.ClientDealProposal.Proposal.EndEpoch, deal.ClientDealProposal.Proposal.StoragePricePerEpoch.Uint64(),
deal.ClientDealProposal.Proposal.ProviderCollateral.Int64(), deal.ClientDealProposal.Proposal.ClientCollateral.Uint64(), deal.ClientPeerID.String(),
deal.DealDataRoot.String(), deal.InboundFilePath, deal.Transfer.Type, deal.Transfer.Params, deal.Transfer.Size, deal.ChainDealID,
deal.PublishCID.String(), deal.SectorID, deal.Offset, deal.Length, deal.Checkpoint.String(), deal.CheckpointAt, deal.Err, deal.Retry, []byte("test"),
deal.FastRetrieval)

// Get deal state
dealState, err := dealsDB.ByID(ctx, deals[0].DealUuid)
require.NoError(t, err)
require.False(t, dealState.AnnounceToIPNI)
req.NoError(err)

//Run migration
req.NoError(goose.UpByOne(sqldb, "."))
// Run migration
req.NoError(goose.Up(sqldb, "."))

// Check the deal state again
dealState, err = dealsDB.ByID(ctx, deals[0].DealUuid)
// Get the deal state
dealState, err := dealsDB.ByID(ctx, deals[0].DealUuid)
require.NoError(t, err)
require.True(t, dealState.AnnounceToIPNI)
}
61 changes: 61 additions & 0 deletions db/migrations_tests/deals_cleanup_data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package migrations_tests

import (
"context"
"testing"

"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/db/migrations"
"github.com/pressly/goose/v3"
"github.com/stretchr/testify/require"
)

func TestDealCleanupData(t *testing.T) {
req := require.New(t)
ctx := context.Background()

sqldb := db.CreateTestTmpDB(t)
req.NoError(db.CreateAllBoostTables(ctx, sqldb, sqldb))

// Run migrations up to the one that adds the CleanupData field to Deals
goose.SetBaseFS(migrations.EmbedMigrations)
req.NoError(goose.SetDialect("sqlite3"))
req.NoError(goose.UpTo(sqldb, ".", 20230330111514))

// Generate 1 deal
dealsDB := db.NewDealsDB(sqldb)
deals, err := db.GenerateNDeals(1)
req.NoError(err)

// Insert the deal into the DB
deal := deals[0]
deal.IsOffline = false
_, err = sqldb.Exec(`INSERT INTO Deals ("ID", "CreatedAt", "DealProposalSignature", "PieceCID", "PieceSize",
"VerifiedDeal", "IsOffline", "ClientAddress", "ProviderAddress","Label", "StartEpoch", "EndEpoch",
"StoragePricePerEpoch", "ProviderCollateral", "ClientCollateral", "ClientPeerID", "DealDataRoot",
"InboundFilePath", "TransferType", "TransferParams", "TransferSize", "ChainDealID", "PublishCID",
"SectorID", "Offset", "Length", "Checkpoint", "CheckpointAt", "Error", "Retry", "SignedProposalCID",
"FastRetrieval", "AnnounceToIPNI")
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`,
deal.DealUuid, deal.CreatedAt, []byte("test"), deal.ClientDealProposal.Proposal.PieceCID.String(),
deal.ClientDealProposal.Proposal.PieceSize, deal.ClientDealProposal.Proposal.VerifiedDeal, deal.IsOffline,
deal.ClientDealProposal.Proposal.Client.String(), deal.ClientDealProposal.Proposal.Provider.String(), "test",
deal.ClientDealProposal.Proposal.StartEpoch, deal.ClientDealProposal.Proposal.EndEpoch, deal.ClientDealProposal.Proposal.StoragePricePerEpoch.Uint64(),
deal.ClientDealProposal.Proposal.ProviderCollateral.Int64(), deal.ClientDealProposal.Proposal.ClientCollateral.Uint64(), deal.ClientPeerID.String(),
deal.DealDataRoot.String(), deal.InboundFilePath, deal.Transfer.Type, deal.Transfer.Params, deal.Transfer.Size, deal.ChainDealID,
deal.PublishCID.String(), deal.SectorID, deal.Offset, deal.Length, deal.Checkpoint.String(), deal.CheckpointAt, deal.Err, deal.Retry, []byte("test"),
deal.FastRetrieval, deal.AnnounceToIPNI)

req.NoError(err)

// Run migration
req.NoError(goose.Up(sqldb, "."))

// Get the deal state
dealState, err := dealsDB.ByID(ctx, deals[0].DealUuid)
require.NoError(t, err)

// Expect CleanupData to be true because the migration should set
// CleanupData to be the opposite of IsOffline
require.True(t, dealState.CleanupData)
}
5 changes: 4 additions & 1 deletion documentation/en/api-v1-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ Response:
}
},
"IsOffline": true,
"CleanupData": true,
"ClientPeerID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"DealDataRoot": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
Expand Down Expand Up @@ -447,6 +448,7 @@ Response:
}
},
"IsOffline": true,
"CleanupData": true,
"ClientPeerID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"DealDataRoot": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
Expand Down Expand Up @@ -601,7 +603,8 @@ Inputs:
```json
[
"07070707-0707-0707-0707-070707070707",
"string value"
"string value",
true
]
```

Expand Down
2 changes: 1 addition & 1 deletion gql/resolver_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (r *resolver) PieceStatus(ctx context.Context, args struct{ PieceCid string
}
bd.Message = dl.Checkpoint.String()

// Only check the unseal state if the deal has already been added to a piece
// Only check the unseal state if the deal has already been added to a sector
st := &sealStatus{IsUnsealed: false}
if dl.Checkpoint >= dealcheckpoints.AddedPiece {
isUnsealed, err := r.sa.IsUnsealed(ctx, dl.SectorID, dl.Offset.Unpadded(), dl.Length.Unpadded())
Expand Down
1 change: 1 addition & 0 deletions gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Deal {
ChainDealID: Uint64!
PublishCid: String!
IsOffline: Boolean!
CleanupData: Boolean!
Transfer: TransferParams!
TransferSamples: [TransferPoint]!
IsTransferStalled: Boolean!
Expand Down
2 changes: 1 addition & 1 deletion itests/dummydeal_offline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDummydealOffline(t *testing.T) {
res := dealRes.Result
require.NoError(t, err)
require.True(t, res.Accepted)
res, err = f.Boost.BoostOfflineDealWithData(context.Background(), offlineDealUuid, carFilepath)
res, err = f.Boost.BoostOfflineDealWithData(context.Background(), offlineDealUuid, carFilepath, false)
require.NoError(t, err)
require.True(t, res.Accepted)
err = f.WaitForDealAddedToSector(offlineDealUuid)
Expand Down
4 changes: 2 additions & 2 deletions node/impl/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func (sm *BoostAPI) BoostIndexerAnnounceAllDeals(ctx context.Context) error {
return sm.IndexProvider.IndexerAnnounceAllDeals(ctx)
}

func (sm *BoostAPI) BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string) (*api.ProviderDealRejectionInfo, error) {
res, err := sm.StorageProvider.ImportOfflineDealData(ctx, dealUuid, filePath)
func (sm *BoostAPI) BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (*api.ProviderDealRejectionInfo, error) {
res, err := sm.StorageProvider.ImportOfflineDealData(ctx, dealUuid, filePath, delAfterImport)
return res, err
}

Expand Down
4 changes: 4 additions & 0 deletions react/src/DealDetail.js
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ export function DealDetail(props) {
<th>Inbound File Path</th>
<td>{deal.InboundFilePath}</td>
</tr>
<tr>
<th>Delete After Add Piece</th>
<td>{deal.CleanupData ? 'Yes' : 'No'}</td>
</tr>
{deal.Sector.ID > 0 ? (
<>
<tr>
Expand Down
2 changes: 2 additions & 0 deletions react/src/gql.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const DealsListQuery = gql`
AnnounceToIPNI
KeepUnsealedCopy
IsOffline
CleanupData
Err
Retry
Message
Expand Down Expand Up @@ -166,6 +167,7 @@ const DealSubscription = gql`
ChainDealID
PublishCid
IsOffline
CleanupData
Checkpoint
CheckpointAt
AnnounceToIPNI
Expand Down
4 changes: 2 additions & 2 deletions storagemarket/deal_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,9 @@ func (p *Provider) execDealUptoAddPiece(ctx context.Context, deal *types.Provide
}

// as deal has already been handed to the sealer, we can remove the inbound file and reclaim the tagged space
if !deal.IsOffline {
if deal.CleanupData {
_ = os.Remove(deal.InboundFilePath)
p.dealLogger.Infow(deal.DealUuid, "removed inbound file as deal handed to sealer", "path", deal.InboundFilePath)
p.dealLogger.Infow(deal.DealUuid, "removed piece data from disk as deal has been added to a sector", "path", deal.InboundFilePath)
}
if err := p.untagStorageSpaceAfterSealing(ctx, deal); err != nil {
// If there's an error untagging storage space we should still try to continue,
Expand Down
Loading