Skip to content

Commit

Permalink
feat(lotus-miner): add data transfer list cmd
Browse files Browse the repository at this point in the history
add equivalent command to list data transfers on miner side, extract common functionality for reuse
  • Loading branch information
hannahhoward committed Aug 20, 2020
1 parent 885f357 commit 0086f76
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 38 deletions.
2 changes: 2 additions & 0 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type StorageMiner interface {
MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error)
MarketSetRetrievalAsk(ctx context.Context, rask *retrievalmarket.Ask) error
MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error)
MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)

DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error)
Expand Down
10 changes: 10 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ type StorageMinerStruct struct {
MarketGetAsk func(ctx context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
MarketSetRetrievalAsk func(ctx context.Context, rask *retrievalmarket.Ask) error `perm:"admin"`
MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"`
MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`

PledgeSector func(context.Context) error `perm:"write"`

Expand Down Expand Up @@ -1080,6 +1082,14 @@ func (c *StorageMinerStruct) MarketGetRetrievalAsk(ctx context.Context) (*retrie
return c.Internal.MarketGetRetrievalAsk(ctx)
}

func (c *StorageMinerStruct) MarketListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
return c.Internal.MarketListDataTransfers(ctx)
}

func (c *StorageMinerStruct) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
return c.Internal.MarketDataTransferUpdates(ctx)
}

func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error {
return c.Internal.DealsImportData(ctx, dealPropCid, file)
}
Expand Down
33 changes: 33 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"encoding/json"
"fmt"

"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
Expand Down Expand Up @@ -117,3 +118,35 @@ type DataTransferChannel struct {
OtherPeer peer.ID
Transferred uint64
}

// NewDataTransferChannel constructs an API DataTransferChannel type from full channel state snapshot and a host id
func NewDataTransferChannel(hostID peer.ID, channelState datatransfer.ChannelState) DataTransferChannel {
channel := DataTransferChannel{
TransferID: channelState.TransferID(),
Status: channelState.Status(),
BaseCID: channelState.BaseCID(),
IsSender: channelState.Sender() == hostID,
Message: channelState.Message(),
}
stringer, ok := channelState.Voucher().(fmt.Stringer)
if ok {
channel.Voucher = stringer.String()
} else {
voucherJSON, err := json.Marshal(channelState.Voucher())
if err != nil {
channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error()
} else {
channel.Voucher = string(voucherJSON)
}
}
if channel.IsSender {
channel.IsInitiator = !channelState.IsPull()
channel.Transferred = channelState.Sent()
channel.OtherPeer = channelState.Recipient()
} else {
channel.IsInitiator = channelState.IsPull()
channel.Transferred = channelState.Received()
channel.OtherPeer = channelState.Sender()
}
return channel
}
9 changes: 5 additions & 4 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,7 @@ var clientListTransfers = &cli.Command{

tm.MoveCursor(1, 1)

outputChannels(tm.Screen, channels, completed, color)
OutputDataTransferChannels(tm.Screen, channels, completed, color)

tm.Flush()

Expand All @@ -1279,12 +1279,13 @@ var clientListTransfers = &cli.Command{
}
}
}
outputChannels(os.Stdout, channels, completed, color)
OutputDataTransferChannels(os.Stdout, channels, completed, color)
return nil
},
}

func outputChannels(out io.Writer, channels []api.DataTransferChannel, completed bool, color bool) {
// OutputDataTransferChannels generates table output for a list of channels
func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool) {
sort.Slice(channels, func(i, j int) bool {
return channels[i].TransferID < channels[j].TransferID
})
Expand Down Expand Up @@ -1346,7 +1347,7 @@ func channelStatusString(useColor bool, status datatransfer.Status) string {
}
}

func toChannelOutput(useColor bool, otherPartyColumn string, channel api.DataTransferChannel) map[string]interface{} {
func toChannelOutput(useColor bool, otherPartyColumn string, channel lapi.DataTransferChannel) map[string]interface{} {
rootCid := channel.BaseCID.String()
rootCid = "..." + rootCid[len(rootCid)-8:]

Expand Down
1 change: 1 addition & 0 deletions cmd/lotus-storage-miner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func main() {
lcli.WithCategory("chain", infoCmd),
lcli.WithCategory("market", storageDealsCmd),
lcli.WithCategory("market", retrievalDealsCmd),
lcli.WithCategory("market", dataTransfersCmd),
lcli.WithCategory("storage", sectorsCmd),
lcli.WithCategory("storage", provingCmd),
lcli.WithCategory("storage", storageCmd),
Expand Down
85 changes: 85 additions & 0 deletions cmd/lotus-storage-miner/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"text/tabwriter"
"time"

tm "github.com/buger/goterm"
"github.com/docker/go-units"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
Expand Down Expand Up @@ -506,3 +507,87 @@ var setSealDurationCmd = &cli.Command{
return nodeApi.SectorSetExpectedSealDuration(ctx, time.Duration(delay))
},
}

var dataTransfersCmd = &cli.Command{
Name: "data-transfers",
Usage: "Manage data transfers",
Subcommands: []*cli.Command{
transfersListCmd,
},
}

var transfersListCmd = &cli.Command{
Name: "list",
Usage: "List ongoing data transfers for this miner",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
Value: true,
},
&cli.BoolFlag{
Name: "completed",
Usage: "show completed data transfers",
},
&cli.BoolFlag{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)

channels, err := api.MarketListDataTransfers(ctx)
if err != nil {
return err
}

completed := cctx.Bool("completed")
color := cctx.Bool("color")
watch := cctx.Bool("watch")

if watch {
channelUpdates, err := api.MarketDataTransferUpdates(ctx)
if err != nil {
return err
}

for {
tm.Clear() // Clear current screen

tm.MoveCursor(1, 1)

lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color)

tm.Flush()

select {
case <-ctx.Done():
return nil
case channelUpdate := <-channelUpdates:
var found bool
for i, existing := range channels {
if existing.TransferID == channelUpdate.TransferID &&
existing.OtherPeer == channelUpdate.OtherPeer &&
existing.IsSender == channelUpdate.IsSender &&
existing.IsInitiator == channelUpdate.IsInitiator {
channels[i] = channelUpdate
found = true
break
}
}
if !found {
channels = append(channels, channelUpdate)
}
}
}
}
lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color)
return nil
},
}
36 changes: 2 additions & 34 deletions node/impl/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"context"
"encoding/json"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -773,7 +772,7 @@ func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferCh

apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
for _, channelState := range inProgressChannels {
apiChannels = append(apiChannels, toAPIChannel(a.Host.ID(), channelState))
apiChannels = append(apiChannels, api.NewDataTransferChannel(a.Host.ID(), channelState))
}

return apiChannels, nil
Expand All @@ -783,7 +782,7 @@ func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTra
channels := make(chan api.DataTransferChannel)

unsub := a.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
channel := toAPIChannel(a.Host.ID(), channelState)
channel := api.NewDataTransferChannel(a.Host.ID(), channelState)
select {
case <-ctx.Done():
case channels <- channel:
Expand All @@ -797,34 +796,3 @@ func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTra

return channels, nil
}

func toAPIChannel(hostID peer.ID, channelState datatransfer.ChannelState) api.DataTransferChannel {
channel := api.DataTransferChannel{
TransferID: channelState.TransferID(),
Status: channelState.Status(),
BaseCID: channelState.BaseCID(),
IsSender: channelState.Sender() == hostID,
Message: channelState.Message(),
}
stringer, ok := channelState.Voucher().(fmt.Stringer)
if ok {
channel.Voucher = stringer.String()
} else {
voucherJSON, err := json.Marshal(channelState.Voucher())
if err != nil {
channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error()
} else {
channel.Voucher = string(voucherJSON)
}
}
if channel.IsSender {
channel.IsInitiator = !channelState.IsPull()
channel.Transferred = channelState.Sent()
channel.OtherPeer = channelState.Recipient()
} else {
channel.IsInitiator = channelState.IsPull()
channel.Transferred = channelState.Received()
channel.OtherPeer = channelState.Sender()
}
return channel
}
37 changes: 37 additions & 0 deletions node/impl/storminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"strconv"
"time"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -51,6 +53,8 @@ type StorageMinerAPI struct {
StorageMgr *sectorstorage.Manager `optional:"true"`
IStorageMgr sectorstorage.SectorManager
*stores.Index
DataTransfer dtypes.ProviderDataTransfer
Host host.Host

ConsiderOnlineStorageDealsConfigFunc dtypes.ConsiderOnlineStorageDealsConfigFunc
SetConsiderOnlineStorageDealsConfigFunc dtypes.SetConsiderOnlineStorageDealsConfigFunc
Expand Down Expand Up @@ -354,6 +358,39 @@ func (sm *StorageMinerAPI) MarketGetRetrievalAsk(ctx context.Context) (*retrieva
return sm.RetrievalProvider.GetAsk(), nil
}

func (sm *StorageMinerAPI) MarketListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
inProgressChannels, err := sm.DataTransfer.InProgressChannels(ctx)
if err != nil {
return nil, err
}

apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
for _, channelState := range inProgressChannels {
apiChannels = append(apiChannels, api.NewDataTransferChannel(sm.Host.ID(), channelState))
}

return apiChannels, nil
}

func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
channels := make(chan api.DataTransferChannel)

unsub := sm.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
channel := api.NewDataTransferChannel(sm.Host.ID(), channelState)
select {
case <-ctx.Done():
case channels <- channel:
}
})

go func() {
defer unsub()
<-ctx.Done()
}()

return channels, nil
}

func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error) {
return sm.StorageProvider.ListDeals(ctx)
}
Expand Down

0 comments on commit 0086f76

Please sign in to comment.