Skip to content

Commit

Permalink
Merge pull request #4572 from filecoin-project/feat/data-transfer-utils
Browse files Browse the repository at this point in the history
Flesh out data transfer features
  • Loading branch information
magik6k authored Oct 26, 2020
2 parents 8fe7db1 + b98b2d4 commit a37c372
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 7 deletions.
2 changes: 2 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ type FullNode interface {
ClientDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)
// ClientRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
ClientRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientRetrieveTryRestartInsufficientFunds attempts to restart stalled retrievals on a given payment channel
// which are stuck due to insufficient funds
ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error
Expand Down
6 changes: 6 additions & 0 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"context"
"time"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/piecestore"
Expand Down Expand Up @@ -81,6 +83,10 @@ type StorageMiner interface {
MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error)
MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)
// MinerRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error

DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
DealsList(ctx context.Context) ([]MarketDeal, error)
Expand Down
15 changes: 15 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ type FullNodeStruct struct {
ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
ClientDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
ClientRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"`
ClientCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"`
ClientRetrieveTryRestartInsufficientFunds func(ctx context.Context, paymentChannel address.Address) error `perm:"write"`

StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"`
Expand Down Expand Up @@ -284,6 +285,8 @@ type StorageMinerStruct struct {
MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"`
MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
MarketRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"`
MarketCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"`

PledgeSector func(context.Context) error `perm:"write"`

Expand Down Expand Up @@ -580,6 +583,10 @@ func (c *FullNodeStruct) ClientRestartDataTransfer(ctx context.Context, transfer
return c.Internal.ClientRestartDataTransfer(ctx, transferID, otherPeer, isInitiator)
}

func (c *FullNodeStruct) ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
return c.Internal.ClientCancelDataTransfer(ctx, transferID, otherPeer, isInitiator)
}

func (c *FullNodeStruct) ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error {
return c.Internal.ClientRetrieveTryRestartInsufficientFunds(ctx, paymentChannel)
}
Expand Down Expand Up @@ -1316,6 +1323,14 @@ func (c *StorageMinerStruct) MarketDataTransferUpdates(ctx context.Context) (<-c
return c.Internal.MarketDataTransferUpdates(ctx)
}

func (c *StorageMinerStruct) MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
return c.Internal.MarketRestartDataTransfer(ctx, transferID, otherPeer, isInitiator)
}

func (c *StorageMinerStruct) MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
return c.Internal.MarketCancelDataTransfer(ctx, transferID, otherPeer, isInitiator)
}

func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error {
return c.Internal.DealsImportData(ctx, dealPropCid, file)
}
Expand Down
76 changes: 72 additions & 4 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ var clientCmd = &cli.Command{
WithCategory("util", clientInfoCmd),
WithCategory("util", clientListTransfers),
WithCategory("util", clientRestartTransfer),
WithCategory("util", clientCancelTransfer),
},
}

Expand Down Expand Up @@ -1698,6 +1699,66 @@ var clientRestartTransfer = &cli.Command{
},
}

var clientCancelTransfer = &cli.Command{
Name: "cancel-transfer",
Usage: "Force cancel a data transfer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peerid",
Usage: "narrow to transfer with specific peer",
},
&cli.BoolFlag{
Name: "initiator",
Usage: "specify only transfers where peer is/is not initiator",
Value: true,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64)
if err != nil {
return fmt.Errorf("Error reading transfer ID: %w", err)
}
transferID := datatransfer.TransferID(transferUint)
initiator := cctx.Bool("initiator")
var other peer.ID
if pidstr := cctx.String("peerid"); pidstr != "" {
p, err := peer.Decode(pidstr)
if err != nil {
return err
}
other = p
} else {
channels, err := api.ClientListDataTransfers(ctx)
if err != nil {
return err
}
found := false
for _, channel := range channels {
if channel.IsInitiator == initiator && channel.TransferID == transferID {
other = channel.OtherPeer
found = true
break
}
}
if !found {
return errors.New("unable to find matching data transfer")
}
}

return api.ClientCancelDataTransfer(ctx, transferID, other, initiator)
},
}

var clientListTransfers = &cli.Command{
Name: "list-transfers",
Usage: "List ongoing data transfers for deals",
Expand All @@ -1715,6 +1776,10 @@ var clientListTransfers = &cli.Command{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
&cli.BoolFlag{
Name: "show-failed",
Usage: "show failed/cancelled transfers",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
Expand All @@ -1732,7 +1797,7 @@ var clientListTransfers = &cli.Command{
completed := cctx.Bool("completed")
color := cctx.Bool("color")
watch := cctx.Bool("watch")

showFailed := cctx.Bool("show-failed")
if watch {
channelUpdates, err := api.ClientDataTransferUpdates(ctx)
if err != nil {
Expand All @@ -1744,7 +1809,7 @@ var clientListTransfers = &cli.Command{

tm.MoveCursor(1, 1)

OutputDataTransferChannels(tm.Screen, channels, completed, color)
OutputDataTransferChannels(tm.Screen, channels, completed, color, showFailed)

tm.Flush()

Expand All @@ -1769,13 +1834,13 @@ var clientListTransfers = &cli.Command{
}
}
}
OutputDataTransferChannels(os.Stdout, channels, completed, color)
OutputDataTransferChannels(os.Stdout, channels, completed, color, showFailed)
return nil
},
}

// OutputDataTransferChannels generates table output for a list of channels
func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool) {
func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool, showFailed bool) {
sort.Slice(channels, func(i, j int) bool {
return channels[i].TransferID < channels[j].TransferID
})
Expand All @@ -1785,6 +1850,9 @@ func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChann
if !completed && channel.Status == datatransfer.Completed {
continue
}
if !showFailed && (channel.Status == datatransfer.Failed || channel.Status == datatransfer.Cancelled) {
continue
}
if channel.IsSender {
sendingChannels = append(sendingChannels, channel)
} else {
Expand Down
135 changes: 132 additions & 3 deletions cmd/lotus-storage-miner/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"errors"
"fmt"
"io"
"os"
Expand All @@ -13,8 +14,10 @@ import (

tm "github.com/buger/goterm"
"github.com/docker/go-units"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -569,6 +572,128 @@ var dataTransfersCmd = &cli.Command{
Usage: "Manage data transfers",
Subcommands: []*cli.Command{
transfersListCmd,
marketRestartTransfer,
marketCancelTransfer,
},
}

var marketRestartTransfer = &cli.Command{
Name: "restart",
Usage: "Force restart a stalled data transfer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peerid",
Usage: "narrow to transfer with specific peer",
},
&cli.BoolFlag{
Name: "initiator",
Usage: "specify only transfers where peer is/is not initiator",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)

transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64)
if err != nil {
return fmt.Errorf("Error reading transfer ID: %w", err)
}
transferID := datatransfer.TransferID(transferUint)
initiator := cctx.Bool("initiator")
var other peer.ID
if pidstr := cctx.String("peerid"); pidstr != "" {
p, err := peer.Decode(pidstr)
if err != nil {
return err
}
other = p
} else {
channels, err := nodeApi.MarketListDataTransfers(ctx)
if err != nil {
return err
}
found := false
for _, channel := range channels {
if channel.IsInitiator == initiator && channel.TransferID == transferID {
other = channel.OtherPeer
found = true
break
}
}
if !found {
return errors.New("unable to find matching data transfer")
}
}

return nodeApi.MarketRestartDataTransfer(ctx, transferID, other, initiator)
},
}

var marketCancelTransfer = &cli.Command{
Name: "cancel",
Usage: "Force cancel a data transfer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peerid",
Usage: "narrow to transfer with specific peer",
},
&cli.BoolFlag{
Name: "initiator",
Usage: "specify only transfers where peer is/is not initiator",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)

transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64)
if err != nil {
return fmt.Errorf("Error reading transfer ID: %w", err)
}
transferID := datatransfer.TransferID(transferUint)
initiator := cctx.Bool("initiator")
var other peer.ID
if pidstr := cctx.String("peerid"); pidstr != "" {
p, err := peer.Decode(pidstr)
if err != nil {
return err
}
other = p
} else {
channels, err := nodeApi.MarketListDataTransfers(ctx)
if err != nil {
return err
}
found := false
for _, channel := range channels {
if channel.IsInitiator == initiator && channel.TransferID == transferID {
other = channel.OtherPeer
found = true
break
}
}
if !found {
return errors.New("unable to find matching data transfer")
}
}

return nodeApi.MarketCancelDataTransfer(ctx, transferID, other, initiator)
},
}

Expand All @@ -589,6 +714,10 @@ var transfersListCmd = &cli.Command{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
&cli.BoolFlag{
Name: "show-failed",
Usage: "show failed/cancelled transfers",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx)
Expand All @@ -606,7 +735,7 @@ var transfersListCmd = &cli.Command{
completed := cctx.Bool("completed")
color := cctx.Bool("color")
watch := cctx.Bool("watch")

showFailed := cctx.Bool("show-failed")
if watch {
channelUpdates, err := api.MarketDataTransferUpdates(ctx)
if err != nil {
Expand All @@ -618,7 +747,7 @@ var transfersListCmd = &cli.Command{

tm.MoveCursor(1, 1)

lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color)
lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color, showFailed)

tm.Flush()

Expand All @@ -643,7 +772,7 @@ var transfersListCmd = &cli.Command{
}
}
}
lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color)
lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color, showFailed)
return nil
},
}
Loading

0 comments on commit a37c372

Please sign in to comment.