Skip to content

Commit

Permalink
Merge pull request #3191 from filecoin-project/feat/miner-data-transfers
Browse files Browse the repository at this point in the history
Add data transfer list command to miner
  • Loading branch information
magik6k authored Aug 20, 2020
2 parents e035e29 + 0086f76 commit cd82f79
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 38 deletions.
2 changes: 2 additions & 0 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type StorageMiner interface {
MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error)
MarketSetRetrievalAsk(ctx context.Context, rask *retrievalmarket.Ask) error
MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error)
MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)

DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error)
Expand Down
10 changes: 10 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ type StorageMinerStruct struct {
MarketGetAsk func(ctx context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
MarketSetRetrievalAsk func(ctx context.Context, rask *retrievalmarket.Ask) error `perm:"admin"`
MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"`
MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`

PledgeSector func(context.Context) error `perm:"write"`

Expand Down Expand Up @@ -1080,6 +1082,14 @@ func (c *StorageMinerStruct) MarketGetRetrievalAsk(ctx context.Context) (*retrie
return c.Internal.MarketGetRetrievalAsk(ctx)
}

func (c *StorageMinerStruct) MarketListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
return c.Internal.MarketListDataTransfers(ctx)
}

func (c *StorageMinerStruct) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
return c.Internal.MarketDataTransferUpdates(ctx)
}

func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error {
return c.Internal.DealsImportData(ctx, dealPropCid, file)
}
Expand Down
33 changes: 33 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"encoding/json"
"fmt"

"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
Expand Down Expand Up @@ -117,3 +118,35 @@ type DataTransferChannel struct {
OtherPeer peer.ID
Transferred uint64
}

// NewDataTransferChannel constructs an API DataTransferChannel type from full channel state snapshot and a host id
func NewDataTransferChannel(hostID peer.ID, channelState datatransfer.ChannelState) DataTransferChannel {
channel := DataTransferChannel{
TransferID: channelState.TransferID(),
Status: channelState.Status(),
BaseCID: channelState.BaseCID(),
IsSender: channelState.Sender() == hostID,
Message: channelState.Message(),
}
stringer, ok := channelState.Voucher().(fmt.Stringer)
if ok {
channel.Voucher = stringer.String()
} else {
voucherJSON, err := json.Marshal(channelState.Voucher())
if err != nil {
channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error()
} else {
channel.Voucher = string(voucherJSON)
}
}
if channel.IsSender {
channel.IsInitiator = !channelState.IsPull()
channel.Transferred = channelState.Sent()
channel.OtherPeer = channelState.Recipient()
} else {
channel.IsInitiator = channelState.IsPull()
channel.Transferred = channelState.Received()
channel.OtherPeer = channelState.Sender()
}
return channel
}
9 changes: 5 additions & 4 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,7 @@ var clientListTransfers = &cli.Command{

tm.MoveCursor(1, 1)

outputChannels(tm.Screen, channels, completed, color)
OutputDataTransferChannels(tm.Screen, channels, completed, color)

tm.Flush()

Expand All @@ -1279,12 +1279,13 @@ var clientListTransfers = &cli.Command{
}
}
}
outputChannels(os.Stdout, channels, completed, color)
OutputDataTransferChannels(os.Stdout, channels, completed, color)
return nil
},
}

func outputChannels(out io.Writer, channels []api.DataTransferChannel, completed bool, color bool) {
// OutputDataTransferChannels generates table output for a list of channels
func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool) {
sort.Slice(channels, func(i, j int) bool {
return channels[i].TransferID < channels[j].TransferID
})
Expand Down Expand Up @@ -1346,7 +1347,7 @@ func channelStatusString(useColor bool, status datatransfer.Status) string {
}
}

func toChannelOutput(useColor bool, otherPartyColumn string, channel api.DataTransferChannel) map[string]interface{} {
func toChannelOutput(useColor bool, otherPartyColumn string, channel lapi.DataTransferChannel) map[string]interface{} {
rootCid := channel.BaseCID.String()
rootCid = "..." + rootCid[len(rootCid)-8:]

Expand Down
1 change: 1 addition & 0 deletions cmd/lotus-storage-miner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func main() {
lcli.WithCategory("chain", infoCmd),
lcli.WithCategory("market", storageDealsCmd),
lcli.WithCategory("market", retrievalDealsCmd),
lcli.WithCategory("market", dataTransfersCmd),
lcli.WithCategory("storage", sectorsCmd),
lcli.WithCategory("storage", provingCmd),
lcli.WithCategory("storage", storageCmd),
Expand Down
85 changes: 85 additions & 0 deletions cmd/lotus-storage-miner/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"text/tabwriter"
"time"

tm "github.com/buger/goterm"
"github.com/docker/go-units"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
Expand Down Expand Up @@ -506,3 +507,87 @@ var setSealDurationCmd = &cli.Command{
return nodeApi.SectorSetExpectedSealDuration(ctx, time.Duration(delay))
},
}

var dataTransfersCmd = &cli.Command{
Name: "data-transfers",
Usage: "Manage data transfers",
Subcommands: []*cli.Command{
transfersListCmd,
},
}

var transfersListCmd = &cli.Command{
Name: "list",
Usage: "List ongoing data transfers for this miner",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
Value: true,
},
&cli.BoolFlag{
Name: "completed",
Usage: "show completed data transfers",
},
&cli.BoolFlag{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)

channels, err := api.MarketListDataTransfers(ctx)
if err != nil {
return err
}

completed := cctx.Bool("completed")
color := cctx.Bool("color")
watch := cctx.Bool("watch")

if watch {
channelUpdates, err := api.MarketDataTransferUpdates(ctx)
if err != nil {
return err
}

for {
tm.Clear() // Clear current screen

tm.MoveCursor(1, 1)

lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color)

tm.Flush()

select {
case <-ctx.Done():
return nil
case channelUpdate := <-channelUpdates:
var found bool
for i, existing := range channels {
if existing.TransferID == channelUpdate.TransferID &&
existing.OtherPeer == channelUpdate.OtherPeer &&
existing.IsSender == channelUpdate.IsSender &&
existing.IsInitiator == channelUpdate.IsInitiator {
channels[i] = channelUpdate
found = true
break
}
}
if !found {
channels = append(channels, channelUpdate)
}
}
}
}
lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color)
return nil
},
}
36 changes: 2 additions & 34 deletions node/impl/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"context"
"encoding/json"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -773,7 +772,7 @@ func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferCh

apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
for _, channelState := range inProgressChannels {
apiChannels = append(apiChannels, toAPIChannel(a.Host.ID(), channelState))
apiChannels = append(apiChannels, api.NewDataTransferChannel(a.Host.ID(), channelState))
}

return apiChannels, nil
Expand All @@ -783,7 +782,7 @@ func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTra
channels := make(chan api.DataTransferChannel)

unsub := a.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
channel := toAPIChannel(a.Host.ID(), channelState)
channel := api.NewDataTransferChannel(a.Host.ID(), channelState)
select {
case <-ctx.Done():
case channels <- channel:
Expand All @@ -797,34 +796,3 @@ func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTra

return channels, nil
}

func toAPIChannel(hostID peer.ID, channelState datatransfer.ChannelState) api.DataTransferChannel {
channel := api.DataTransferChannel{
TransferID: channelState.TransferID(),
Status: channelState.Status(),
BaseCID: channelState.BaseCID(),
IsSender: channelState.Sender() == hostID,
Message: channelState.Message(),
}
stringer, ok := channelState.Voucher().(fmt.Stringer)
if ok {
channel.Voucher = stringer.String()
} else {
voucherJSON, err := json.Marshal(channelState.Voucher())
if err != nil {
channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error()
} else {
channel.Voucher = string(voucherJSON)
}
}
if channel.IsSender {
channel.IsInitiator = !channelState.IsPull()
channel.Transferred = channelState.Sent()
channel.OtherPeer = channelState.Recipient()
} else {
channel.IsInitiator = channelState.IsPull()
channel.Transferred = channelState.Received()
channel.OtherPeer = channelState.Sender()
}
return channel
}
37 changes: 37 additions & 0 deletions node/impl/storminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"strconv"
"time"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -51,6 +53,8 @@ type StorageMinerAPI struct {
StorageMgr *sectorstorage.Manager `optional:"true"`
IStorageMgr sectorstorage.SectorManager
*stores.Index
DataTransfer dtypes.ProviderDataTransfer
Host host.Host

ConsiderOnlineStorageDealsConfigFunc dtypes.ConsiderOnlineStorageDealsConfigFunc
SetConsiderOnlineStorageDealsConfigFunc dtypes.SetConsiderOnlineStorageDealsConfigFunc
Expand Down Expand Up @@ -354,6 +358,39 @@ func (sm *StorageMinerAPI) MarketGetRetrievalAsk(ctx context.Context) (*retrieva
return sm.RetrievalProvider.GetAsk(), nil
}

func (sm *StorageMinerAPI) MarketListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
inProgressChannels, err := sm.DataTransfer.InProgressChannels(ctx)
if err != nil {
return nil, err
}

apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
for _, channelState := range inProgressChannels {
apiChannels = append(apiChannels, api.NewDataTransferChannel(sm.Host.ID(), channelState))
}

return apiChannels, nil
}

func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
channels := make(chan api.DataTransferChannel)

unsub := sm.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
channel := api.NewDataTransferChannel(sm.Host.ID(), channelState)
select {
case <-ctx.Done():
case channels <- channel:
}
})

go func() {
defer unsub()
<-ctx.Done()
}()

return channels, nil
}

func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error) {
return sm.StorageProvider.ListDeals(ctx)
}
Expand Down

0 comments on commit cd82f79

Please sign in to comment.