From 98297cef4d601319b5c03128f83d03330f25b299 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 22 Oct 2020 13:40:26 -0700 Subject: [PATCH 1/2] feat(data-transfer): fill in utils --- api/api_full.go | 2 + api/api_storage.go | 6 ++ api/apistruct/struct.go | 15 ++++ cli/client.go | 76 ++++++++++++++++- cmd/lotus-storage-miner/market.go | 135 +++++++++++++++++++++++++++++- node/impl/client/client.go | 8 ++ node/impl/storminer.go | 17 ++++ 7 files changed, 252 insertions(+), 7 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index e2025f58164..c7625e3734d 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -312,6 +312,8 @@ type FullNode interface { ClientDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) // ClientRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer ClientRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error + // ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer + ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error // ClientRetrieveTryRestartInsufficientFunds attempts to restart stalled retrievals on a given payment channel // which are stuck due to insufficient funds ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error diff --git a/api/api_storage.go b/api/api_storage.go index 529224f6ea4..54f8e8780ef 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -5,7 +5,9 @@ import ( "context" "time" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/piecestore" @@ -81,6 +83,10 @@ type StorageMiner interface { MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error) MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) + // MinerRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer + MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error + // ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer + MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error DealsList(ctx context.Context) ([]MarketDeal, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index e8e4ee33c7e..76c5c536999 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -172,6 +172,7 @@ type FullNodeStruct struct { ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"` ClientDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"` ClientRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"` + ClientCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"` ClientRetrieveTryRestartInsufficientFunds func(ctx context.Context, paymentChannel address.Address) error `perm:"write"` StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"` @@ -284,6 +285,8 @@ type StorageMinerStruct struct { MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"` MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"` MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"` + MarketRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"` + MarketCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"` PledgeSector func(context.Context) error `perm:"write"` @@ -568,6 +571,10 @@ func (c *FullNodeStruct) ClientRestartDataTransfer(ctx context.Context, transfer return c.Internal.ClientRestartDataTransfer(ctx, transferID, otherPeer, isInitiator) } +func (c *FullNodeStruct) ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error { + return c.Internal.ClientCancelDataTransfer(ctx, transferID, otherPeer, isInitiator) +} + func (c *FullNodeStruct) ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error { return c.Internal.ClientRetrieveTryRestartInsufficientFunds(ctx, paymentChannel) } @@ -1304,6 +1311,14 @@ func (c *StorageMinerStruct) MarketDataTransferUpdates(ctx context.Context) (<-c return c.Internal.MarketDataTransferUpdates(ctx) } +func (c *StorageMinerStruct) MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error { + return c.Internal.MarketRestartDataTransfer(ctx, transferID, otherPeer, isInitiator) +} + +func (c *StorageMinerStruct) MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error { + return c.Internal.MarketCancelDataTransfer(ctx, transferID, otherPeer, isInitiator) +} + func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error { return c.Internal.DealsImportData(ctx, dealPropCid, file) } diff --git a/cli/client.go b/cli/client.go index 83c45255521..4331251298b 100644 --- a/cli/client.go +++ b/cli/client.go @@ -91,6 +91,7 @@ var clientCmd = &cli.Command{ WithCategory("util", clientInfoCmd), WithCategory("util", clientListTransfers), WithCategory("util", clientRestartTransfer), + WithCategory("util", clientCancelTransfer), }, } @@ -1694,6 +1695,66 @@ var clientRestartTransfer = &cli.Command{ }, } +var clientCancelTransfer = &cli.Command{ + Name: "cancel-transfer", + Usage: "Force cancel a data transfer", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "peerid", + Usage: "narrow to transfer with specific peer", + }, + &cli.BoolFlag{ + Name: "initiator", + Usage: "specify only transfers where peer is/is not initiator", + Value: true, + }, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Args().Present() { + return cli.ShowCommandHelp(cctx, cctx.Command.Name) + } + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64) + if err != nil { + return fmt.Errorf("Error reading transfer ID: %w", err) + } + transferID := datatransfer.TransferID(transferUint) + initiator := cctx.Bool("initiator") + var other peer.ID + if pidstr := cctx.String("peerid"); pidstr != "" { + p, err := peer.Decode(pidstr) + if err != nil { + return err + } + other = p + } else { + channels, err := api.ClientListDataTransfers(ctx) + if err != nil { + return err + } + found := false + for _, channel := range channels { + if channel.IsInitiator == initiator && channel.TransferID == transferID { + other = channel.OtherPeer + found = true + break + } + } + if !found { + return errors.New("unable to find matching data transfer") + } + } + + return api.ClientCancelDataTransfer(ctx, transferID, other, initiator) + }, +} + var clientListTransfers = &cli.Command{ Name: "list-transfers", Usage: "List ongoing data transfers for deals", @@ -1711,6 +1772,10 @@ var clientListTransfers = &cli.Command{ Name: "watch", Usage: "watch deal updates in real-time, rather than a one time list", }, + &cli.BoolFlag{ + Name: "show-failed", + Usage: "show failed/cancelled transfers", + }, }, Action: func(cctx *cli.Context) error { api, closer, err := GetFullNodeAPI(cctx) @@ -1728,7 +1793,7 @@ var clientListTransfers = &cli.Command{ completed := cctx.Bool("completed") color := cctx.Bool("color") watch := cctx.Bool("watch") - + showFailed := cctx.Bool("show-failed") if watch { channelUpdates, err := api.ClientDataTransferUpdates(ctx) if err != nil { @@ -1740,7 +1805,7 @@ var clientListTransfers = &cli.Command{ tm.MoveCursor(1, 1) - OutputDataTransferChannels(tm.Screen, channels, completed, color) + OutputDataTransferChannels(tm.Screen, channels, completed, color, showFailed) tm.Flush() @@ -1765,13 +1830,13 @@ var clientListTransfers = &cli.Command{ } } } - OutputDataTransferChannels(os.Stdout, channels, completed, color) + OutputDataTransferChannels(os.Stdout, channels, completed, color, showFailed) return nil }, } // OutputDataTransferChannels generates table output for a list of channels -func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool) { +func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool, showFailed bool) { sort.Slice(channels, func(i, j int) bool { return channels[i].TransferID < channels[j].TransferID }) @@ -1781,6 +1846,9 @@ func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChann if !completed && channel.Status == datatransfer.Completed { continue } + if !showFailed && (channel.Status == datatransfer.Failed || channel.Status == datatransfer.Cancelled) { + continue + } if channel.IsSender { sendingChannels = append(sendingChannels, channel) } else { diff --git a/cmd/lotus-storage-miner/market.go b/cmd/lotus-storage-miner/market.go index bb1ebd9ec5c..be4a529e982 100644 --- a/cmd/lotus-storage-miner/market.go +++ b/cmd/lotus-storage-miner/market.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "errors" "fmt" "io" "os" @@ -13,8 +14,10 @@ import ( tm "github.com/buger/goterm" "github.com/docker/go-units" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil/cidenc" + "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multibase" "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -569,6 +572,128 @@ var dataTransfersCmd = &cli.Command{ Usage: "Manage data transfers", Subcommands: []*cli.Command{ transfersListCmd, + marketRestartTransfer, + marketCancelTransfer, + }, +} + +var marketRestartTransfer = &cli.Command{ + Name: "restart", + Usage: "Force restart a stalled data transfer", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "peerid", + Usage: "narrow to transfer with specific peer", + }, + &cli.BoolFlag{ + Name: "initiator", + Usage: "specify only transfers where peer is/is not initiator", + Value: false, + }, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Args().Present() { + return cli.ShowCommandHelp(cctx, cctx.Command.Name) + } + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64) + if err != nil { + return fmt.Errorf("Error reading transfer ID: %w", err) + } + transferID := datatransfer.TransferID(transferUint) + initiator := cctx.Bool("initiator") + var other peer.ID + if pidstr := cctx.String("peerid"); pidstr != "" { + p, err := peer.Decode(pidstr) + if err != nil { + return err + } + other = p + } else { + channels, err := nodeApi.MarketListDataTransfers(ctx) + if err != nil { + return err + } + found := false + for _, channel := range channels { + if channel.IsInitiator == initiator && channel.TransferID == transferID { + other = channel.OtherPeer + found = true + break + } + } + if !found { + return errors.New("unable to find matching data transfer") + } + } + + return nodeApi.MarketRestartDataTransfer(ctx, transferID, other, initiator) + }, +} + +var marketCancelTransfer = &cli.Command{ + Name: "cancel", + Usage: "Force cancel a data transfer", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "peerid", + Usage: "narrow to transfer with specific peer", + }, + &cli.BoolFlag{ + Name: "initiator", + Usage: "specify only transfers where peer is/is not initiator", + Value: false, + }, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Args().Present() { + return cli.ShowCommandHelp(cctx, cctx.Command.Name) + } + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64) + if err != nil { + return fmt.Errorf("Error reading transfer ID: %w", err) + } + transferID := datatransfer.TransferID(transferUint) + initiator := cctx.Bool("initiator") + var other peer.ID + if pidstr := cctx.String("peerid"); pidstr != "" { + p, err := peer.Decode(pidstr) + if err != nil { + return err + } + other = p + } else { + channels, err := nodeApi.MarketListDataTransfers(ctx) + if err != nil { + return err + } + found := false + for _, channel := range channels { + if channel.IsInitiator == initiator && channel.TransferID == transferID { + other = channel.OtherPeer + found = true + break + } + } + if !found { + return errors.New("unable to find matching data transfer") + } + } + + return nodeApi.MarketCancelDataTransfer(ctx, transferID, other, initiator) }, } @@ -589,6 +714,10 @@ var transfersListCmd = &cli.Command{ Name: "watch", Usage: "watch deal updates in real-time, rather than a one time list", }, + &cli.BoolFlag{ + Name: "show-failed", + Usage: "show failed/cancelled transfers", + }, }, Action: func(cctx *cli.Context) error { api, closer, err := lcli.GetStorageMinerAPI(cctx) @@ -606,7 +735,7 @@ var transfersListCmd = &cli.Command{ completed := cctx.Bool("completed") color := cctx.Bool("color") watch := cctx.Bool("watch") - + showFailed := cctx.Bool("show-failed") if watch { channelUpdates, err := api.MarketDataTransferUpdates(ctx) if err != nil { @@ -618,7 +747,7 @@ var transfersListCmd = &cli.Command{ tm.MoveCursor(1, 1) - lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color) + lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color, showFailed) tm.Flush() @@ -643,7 +772,7 @@ var transfersListCmd = &cli.Command{ } } } - lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color) + lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color, showFailed) return nil }, } diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 0cd1648261d..c166548e39c 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -858,6 +858,14 @@ func (a *API) ClientRestartDataTransfer(ctx context.Context, transferID datatran return a.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID}) } +func (a *API) ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error { + selfPeer := a.Host.ID() + if isInitiator { + return a.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID}) + } + return a.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID}) +} + func newDealInfo(v storagemarket.ClientDeal) api.DealInfo { return api.DealInfo{ ProposalCid: v.ProposalCid, diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 6090e8a58c4..74b3f78f3fe 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -10,6 +10,7 @@ import ( "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -400,6 +401,22 @@ func (sm *StorageMinerAPI) MarketListDataTransfers(ctx context.Context) ([]api.D return apiChannels, nil } +func (sm *StorageMinerAPI) MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error { + selfPeer := sm.Host.ID() + if isInitiator { + return sm.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID}) + } + return sm.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID}) +} + +func (sm *StorageMinerAPI) MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error { + selfPeer := sm.Host.ID() + if isInitiator { + return sm.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID}) + } + return sm.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID}) +} + func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) { channels := make(chan api.DataTransferChannel) From b98b2d4310d8b9e4403b4068a0573c66665aadc4 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 23 Oct 2020 18:35:01 -0700 Subject: [PATCH 2/2] docs(docsgen): update api docs --- documentation/en/api-methods.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/documentation/en/api-methods.md b/documentation/en/api-methods.md index c8244037515..df06a60246f 100644 --- a/documentation/en/api-methods.md +++ b/documentation/en/api-methods.md @@ -32,6 +32,7 @@ * [ChainTipSetWeight](#ChainTipSetWeight) * [Client](#Client) * [ClientCalcCommP](#ClientCalcCommP) + * [ClientCancelDataTransfer](#ClientCancelDataTransfer) * [ClientDataTransferUpdates](#ClientDataTransferUpdates) * [ClientDealSize](#ClientDealSize) * [ClientFindData](#ClientFindData) @@ -848,6 +849,23 @@ Response: } ``` +### ClientCancelDataTransfer +ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer + + +Perms: write + +Inputs: +```json +[ + 3, + "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", + true +] +``` + +Response: `{}` + ### ClientDataTransferUpdates There are not yet any comments for this method.