diff --git a/api/api.go b/api/api.go index 56e71b603..e75054a08 100644 --- a/api/api.go +++ b/api/api.go @@ -35,6 +35,8 @@ type Boost interface { // MethodGroup: Boost BoostIndexerAnnounceAllDeals(ctx context.Context) error //perm:admin BoostIndexerListMultihashes(ctx context.Context, proposalCid cid.Cid) ([]multihash.Multihash, error) //perm:admin + BoostIndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) //perm:admin + BoostIndexerAnnounceLatestHttp(ctx context.Context, urls []string) (cid.Cid, error) //perm:admin BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (*ProviderDealRejectionInfo, error) //perm:admin BoostDeal(ctx context.Context, dealUuid uuid.UUID) (*smtypes.ProviderDealState, error) //perm:admin BoostDealBySignedProposalCid(ctx context.Context, proposalCid cid.Cid) (*smtypes.ProviderDealState, error) //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index c50f44d42..bbdefb2ae 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -68,6 +68,10 @@ type BoostStruct struct { BoostIndexerAnnounceAllDeals func(p0 context.Context) error `perm:"admin"` + BoostIndexerAnnounceLatest func(p0 context.Context) (cid.Cid, error) `perm:"admin"` + + BoostIndexerAnnounceLatestHttp func(p0 context.Context, p1 []string) (cid.Cid, error) `perm:"admin"` + BoostIndexerListMultihashes func(p0 context.Context, p1 cid.Cid) ([]multihash.Multihash, error) `perm:"admin"` BoostMakeDeal func(p0 context.Context, p1 smtypes.DealParams) (*ProviderDealRejectionInfo, error) `perm:"write"` @@ -437,6 +441,28 @@ func (s *BoostStub) BoostIndexerAnnounceAllDeals(p0 context.Context) error { return ErrNotSupported } +func (s *BoostStruct) BoostIndexerAnnounceLatest(p0 context.Context) (cid.Cid, error) { + if s.Internal.BoostIndexerAnnounceLatest == nil { + return *new(cid.Cid), ErrNotSupported + } + return s.Internal.BoostIndexerAnnounceLatest(p0) +} + +func (s *BoostStub) BoostIndexerAnnounceLatest(p0 
context.Context) (cid.Cid, error) { + return *new(cid.Cid), ErrNotSupported +} + +func (s *BoostStruct) BoostIndexerAnnounceLatestHttp(p0 context.Context, p1 []string) (cid.Cid, error) { + if s.Internal.BoostIndexerAnnounceLatestHttp == nil { + return *new(cid.Cid), ErrNotSupported + } + return s.Internal.BoostIndexerAnnounceLatestHttp(p0, p1) +} + +func (s *BoostStub) BoostIndexerAnnounceLatestHttp(p0 context.Context, p1 []string) (cid.Cid, error) { + return *new(cid.Cid), ErrNotSupported +} + func (s *BoostStruct) BoostIndexerListMultihashes(p0 context.Context, p1 cid.Cid) ([]multihash.Multihash, error) { if s.Internal.BoostIndexerListMultihashes == nil { return *new([]multihash.Multihash), ErrNotSupported diff --git a/build/openrpc/boost.json.gz b/build/openrpc/boost.json.gz index aab22def9..bf0cce0a7 100644 Binary files a/build/openrpc/boost.json.gz and b/build/openrpc/boost.json.gz differ diff --git a/build/version.go b/build/version.go index 5c2f02cfc..dbc1ffc08 100644 --- a/build/version.go +++ b/build/version.go @@ -2,7 +2,7 @@ package build var CurrentCommit string -const BuildVersion = "1.7.3-rc1" +const BuildVersion = "1.7.3-rc3" func UserVersion() string { return BuildVersion + CurrentCommit diff --git a/cmd/boostd/import_data.go b/cmd/boostd/import_data.go index 92aed947e..bed8bcb33 100644 --- a/cmd/boostd/import_data.go +++ b/cmd/boostd/import_data.go @@ -67,9 +67,6 @@ var importDataCmd = &cli.Command{ // If the user has supplied a signed proposal cid deleteAfterImport := cctx.Bool("delete-after-import") if proposalCid != nil { - if deleteAfterImport { - return fmt.Errorf("legacy deal data cannot be automatically deleted after import (only new deals)") - } // Look up the deal in the boost database deal, err := napi.BoostDealBySignedProposalCid(cctx.Context, *proposalCid) @@ -80,6 +77,10 @@ var importDataCmd = &cli.Command{ return err } + if deleteAfterImport { + return fmt.Errorf("cannot find boost deal with proposal cid %s and legacy deal data cannot 
be automatically deleted after import (only new deals)", proposalCid) + } + // The deal is not in the boost database, try the legacy // markets datastore (v1.1.0 deal) err := napi.MarketImportDealData(cctx.Context, *proposalCid, filePath) diff --git a/cmd/boostd/index.go b/cmd/boostd/index.go index 3008ea184..903f3e0b8 100644 --- a/cmd/boostd/index.go +++ b/cmd/boostd/index.go @@ -14,6 +14,8 @@ var indexProvCmd = &cli.Command{ Subcommands: []*cli.Command{ indexProvAnnounceAllCmd, indexProvListMultihashesCmd, + indexProvAnnounceLatest, + indexProvAnnounceLatestHttp, }, } @@ -64,11 +66,63 @@ var indexProvListMultihashesCmd = &cli.Command{ if err != nil { return err } - + fmt.Printf("Found %d multihashes for deal with proposal cid %s:\n", len(mhs), propCid) for _, mh := range mhs { fmt.Println(" " + mh.String()) } + + return nil + }, +} + +var indexProvAnnounceLatest = &cli.Command{ + Name: "announce-latest", + Usage: "Re-publish the latest existing advertisement to pubsub", + Action: func(cctx *cli.Context) error { + ctx := lcli.ReqContext(cctx) + + napi, closer, err := bcli.GetBoostAPI(cctx) + if err != nil { + return err + } + defer closer() + + c, err := napi.BoostIndexerAnnounceLatest(ctx) + if err != nil { + return err + } + + fmt.Printf("Announced advertisement with cid %s\n", c) + return nil + }, +} + +var indexProvAnnounceLatestHttp = &cli.Command{ + Name: "announce-latest-http", + Usage: "Re-publish the latest existing advertisement to specific indexers over http", + Flags: []cli.Flag{ + &cli.StringSliceFlag{ + Name: "announce-url", + Usage: "The url(s) to announce to. 
If not specified, announces to the http urls in config", + Required: false, + }, + }, + Action: func(cctx *cli.Context) error { + ctx := lcli.ReqContext(cctx) + + napi, closer, err := bcli.GetBoostAPI(cctx) + if err != nil { + return err + } + defer closer() + + c, err := napi.BoostIndexerAnnounceLatestHttp(ctx, cctx.StringSlice("announce-url")) + if err != nil { + return err + } + + fmt.Printf("Announced advertisement to indexers over http with cid %s\n", c) return nil }, } diff --git a/cmd/boostd/init.go b/cmd/boostd/init.go index 76ffc7838..97ed8fa2a 100644 --- a/cmd/boostd/init.go +++ b/cmd/boostd/init.go @@ -493,7 +493,13 @@ func migrateMarketsConfig(cctx *cli.Context, mktsRepo lotus_repo.LockedRepo, boo // Clear the DAG store root dir config, because the DAG store is no longer configurable in Boost // (it is always at /dagstore rcfg.DAGStore.RootDir = "" - rcfg.IndexProvider = mktsCfg.IndexProvider + rcfg.IndexProvider = config.IndexProviderConfig{ + Enable: mktsCfg.IndexProvider.Enable, + EntriesCacheCapacity: mktsCfg.IndexProvider.EntriesCacheCapacity, + EntriesChunkSize: mktsCfg.IndexProvider.EntriesChunkSize, + TopicName: mktsCfg.IndexProvider.TopicName, + PurgeCacheOnStart: mktsCfg.IndexProvider.PurgeCacheOnStart, + } rcfg.IndexProvider.Enable = true // Enable index provider in Boost by default if fromMonolith { diff --git a/cmd/booster-http/http_test.go b/cmd/booster-http/http_test.go index 20bf4ecda..3fe058ef9 100644 --- a/cmd/booster-http/http_test.go +++ b/cmd/booster-http/http_test.go @@ -22,7 +22,7 @@ const testFile = "test/test_file" func TestNewHttpServer(t *testing.T) { // Create a new mock Http server ctrl := gomock.NewController(t) - httpServer := NewHttpServer("", 7777, mocks_booster_http.NewMockHttpServerApi(ctrl), nil) + httpServer := NewHttpServer("", "0.0.0.0", 7777, mocks_booster_http.NewMockHttpServerApi(ctrl), nil) err := httpServer.Start(context.Background()) require.NoError(t, err) waitServerUp(t, 7777) @@ -41,7 +41,7 @@ func 
TestHttpGzipResponse(t *testing.T) { // Create a new mock Http server with custom functions ctrl := gomock.NewController(t) mockHttpServer := mocks_booster_http.NewMockHttpServerApi(ctrl) - httpServer := NewHttpServer("", 7777, mockHttpServer, nil) + httpServer := NewHttpServer("", "0.0.0.0", 7777, mockHttpServer, nil) err := httpServer.Start(context.Background()) require.NoError(t, err) waitServerUp(t, 7777) @@ -99,7 +99,7 @@ func TestHttpInfo(t *testing.T) { // Create a new mock Http server ctrl := gomock.NewController(t) - httpServer := NewHttpServer("", 7777, mocks_booster_http.NewMockHttpServerApi(ctrl), nil) + httpServer := NewHttpServer("", "0.0.0.0", 7777, mocks_booster_http.NewMockHttpServerApi(ctrl), nil) err := httpServer.Start(context.Background()) require.NoError(t, err) waitServerUp(t, 7777) diff --git a/cmd/booster-http/mocks/mock_booster_http.go b/cmd/booster-http/mocks/mock_booster_http.go index 77cb30a93..2392f3ce9 100644 --- a/cmd/booster-http/mocks/mock_booster_http.go +++ b/cmd/booster-http/mocks/mock_booster_http.go @@ -1,8 +1,8 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: cmd/booster-http/server.go +// Source: server.go -// Package mock_main is a generated GoMock package. -package mock_main +// Package mocks_booster_http is a generated GoMock package. 
+package mocks_booster_http import ( context "context" diff --git a/cmd/booster-http/run.go b/cmd/booster-http/run.go index 255e123e4..5eed671d4 100644 --- a/cmd/booster-http/run.go +++ b/cmd/booster-http/run.go @@ -40,6 +40,12 @@ var runCmd = &cli.Command{ Usage: "the base path at which to run the web server", Value: "", }, + &cli.StringFlag{ + Name: "address", + Aliases: []string{"addr"}, + Usage: "the listen address for the web server", + Value: "0.0.0.0", + }, &cli.UintFlag{ Name: "port", Usage: "the port the web server listens on", @@ -207,6 +213,7 @@ var runCmd = &cli.Command{ sapi := serverApi{ctx: ctx, piecedirectory: pd, sa: sa} server := NewHttpServer( cctx.String("base-path"), + cctx.String("address"), cctx.Int("port"), sapi, opts, @@ -216,8 +223,8 @@ var runCmd = &cli.Command{ pd.Start(ctx) // Start the server - log.Infof("Starting booster-http node on port %d with base path '%s'", - cctx.Int("port"), cctx.String("base-path")) + log.Infof("Starting booster-http node on listen address %s and port %d with base path '%s'", + cctx.String("address"), cctx.Int("port"), cctx.String("base-path")) err = server.Start(ctx) if err != nil { return fmt.Errorf("starting http server: %w", err) diff --git a/cmd/booster-http/server.go b/cmd/booster-http/server.go index 03c38f6b7..94d33c04c 100644 --- a/cmd/booster-http/server.go +++ b/cmd/booster-http/server.go @@ -42,11 +42,12 @@ type apiVersion struct { } type HttpServer struct { - path string - port int - api HttpServerApi - opts HttpServerOptions - idxPage string + path string + listenAddr string + port int + api HttpServerApi + opts HttpServerOptions + idxPage string ctx context.Context cancel context.CancelFunc @@ -65,11 +66,11 @@ type HttpServerOptions struct { SupportedResponseFormats []string } -func NewHttpServer(path string, port int, api HttpServerApi, opts *HttpServerOptions) *HttpServer { +func NewHttpServer(path string, listenAddr string, port int, api HttpServerApi, opts *HttpServerOptions) *HttpServer { 
if opts == nil { opts = &HttpServerOptions{ServePieces: true} } - return &HttpServer{path: path, port: port, api: api, opts: *opts, idxPage: parseTemplate(*opts)} + return &HttpServer{path: path, listenAddr: listenAddr, port: port, api: api, opts: *opts, idxPage: parseTemplate(*opts)} } func (s *HttpServer) pieceBasePath() string { @@ -102,7 +103,7 @@ func (s *HttpServer) Start(ctx context.Context) error { handler.HandleFunc("/info", s.handleInfo) handler.Handle("/metrics", metrics.Exporter("booster_http")) // metrics s.server = &http.Server{ - Addr: fmt.Sprintf(":%d", s.port), + Addr: fmt.Sprintf("%s:%d", s.listenAddr, s.port), Handler: handler, // This context will be the parent of the context associated with all // incoming requests diff --git a/cmd/boostx/stats_cmd.go b/cmd/boostx/stats_cmd.go index 81ddced69..1b8775955 100644 --- a/cmd/boostx/stats_cmd.go +++ b/cmd/boostx/stats_cmd.go @@ -2,13 +2,14 @@ package main import ( "fmt" - "github.com/filecoin-project/boost/retrievalmarket/lp2pimpl" - transports_types "github.com/filecoin-project/boost/retrievalmarket/types" "regexp" "sort" "strings" "sync" + "github.com/filecoin-project/boost/retrievalmarket/lp2pimpl" + transports_types "github.com/filecoin-project/boost/retrievalmarket/types" + clinode "github.com/filecoin-project/boost/cli/node" "github.com/filecoin-project/boost/cmd" "github.com/filecoin-project/boostd-data/shared/cliutil" @@ -223,7 +224,7 @@ var statsCmd = &cli.Command{ fmt.Println("Total Boost nodes:", boostNodes) fmt.Println("Total Boost raw power:", boostRawBytePower) fmt.Println("Total Boost quality adj power:", boostQualityAdjPower) - fmt.Println("Total Lotus Markets nodes:", marketsNodes) + fmt.Println("Total Markets nodes:", marketsNodes) fmt.Println("Total SPs with minimum power: ", len(withMinPower)) fmt.Println("Total Indexer nodes:", indexerNodes) diff --git a/docker/devnet/boost/entrypoint.sh b/docker/devnet/boost/entrypoint.sh index 7716ef0c3..025a8d97e 100755 --- 
a/docker/devnet/boost/entrypoint.sh +++ b/docker/devnet/boost/entrypoint.sh @@ -40,6 +40,7 @@ if [ ! -f $BOOST_PATH/.init.boost ]; then echo Setting port in boost config... sed -i 's|ip4/0.0.0.0/tcp/0|ip4/0.0.0.0/tcp/50000|g' $BOOST_PATH/config.toml + sed -i 's|127.0.0.1|0.0.0.0|g' $BOOST_PATH/config.toml echo Done touch $BOOST_PATH/.init.boost diff --git a/documentation/en/api-v1-methods.md b/documentation/en/api-v1-methods.md index a1fb094bf..4bed0fd6b 100644 --- a/documentation/en/api-v1-methods.md +++ b/documentation/en/api-v1-methods.md @@ -23,6 +23,8 @@ * [BoostDealBySignedProposalCid](#boostdealbysignedproposalcid) * [BoostDummyDeal](#boostdummydeal) * [BoostIndexerAnnounceAllDeals](#boostindexerannouncealldeals) + * [BoostIndexerAnnounceLatest](#boostindexerannouncelatest) + * [BoostIndexerAnnounceLatestHttp](#boostindexerannouncelatesthttp) * [BoostIndexerListMultihashes](#boostindexerlistmultihashes) * [BoostMakeDeal](#boostmakedeal) * [BoostOfflineDealWithData](#boostofflinedealwithdata) @@ -560,6 +562,41 @@ Inputs: `null` Response: `{}` +### BoostIndexerAnnounceLatest + + +Perms: admin + +Inputs: `null` + +Response: +```json +{ + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" +} +``` + +### BoostIndexerAnnounceLatestHttp + + +Perms: admin + +Inputs: +```json +[ + [ + "string value" + ] +] +``` + +Response: +```json +{ + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" +} +``` + ### BoostIndexerListMultihashes diff --git a/go.mod b/go.mod index ca4eda9cf..79bed3db8 100644 --- a/go.mod +++ b/go.mod @@ -76,7 +76,7 @@ require ( github.com/ipld/go-car/v2 v2.7.0 github.com/ipld/go-ipld-prime v0.20.0 github.com/ipld/go-ipld-selector-text-lite v0.0.1 - github.com/ipni/index-provider v0.11.1 + github.com/ipni/index-provider v0.11.2 github.com/ipni/storetheindex v0.5.10 github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jpillora/backoff v1.0.0 diff --git a/go.sum b/go.sum index 
ce6f22c48..920982a8c 100644 --- a/go.sum +++ b/go.sum @@ -981,8 +981,8 @@ github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo= github.com/ipld/go-ipld-selector-text-lite v0.0.1 h1:lNqFsQpBHc3p5xHob2KvEg/iM5dIFn6iw4L/Hh+kS1Y= github.com/ipld/go-ipld-selector-text-lite v0.0.1/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM= -github.com/ipni/index-provider v0.11.1 h1:viNfSBvZA9G+Qe6/FGqfZtavnu4tTSfGUoWEECavqoI= -github.com/ipni/index-provider v0.11.1/go.mod h1:gB/wN4Mdz4MzikQubjyRRV97iS5BkD4FKB0U/bF/dY4= +github.com/ipni/index-provider v0.11.2 h1:nvykWK+/ncPTqHiuiJdXp/O0UF0V7iWesjHGKX//NYc= +github.com/ipni/index-provider v0.11.2/go.mod h1:gB/wN4Mdz4MzikQubjyRRV97iS5BkD4FKB0U/bF/dY4= github.com/ipni/storetheindex v0.5.10 h1:r97jIZsXPuwQvePJQuStu2a/kn+Zn8X4MAdA0rU2Pu4= github.com/ipni/storetheindex v0.5.10/go.mod h1:SJKFCnSx4X/4ekQuZvq8pVU/7tmxkEv632Qmgu3m2bQ= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= diff --git a/gql/resolver.go b/gql/resolver.go index 751cb4946..dd21dda01 100644 --- a/gql/resolver.go +++ b/gql/resolver.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math" + "time" "github.com/dustin/go-humanize" "github.com/filecoin-project/boost-gfm/piecestore" @@ -26,6 +27,7 @@ import ( "github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints" "github.com/filecoin-project/boost/transport" "github.com/filecoin-project/lotus/api/v1api" + "github.com/filecoin-project/lotus/build" lotus_repo "github.com/filecoin-project/lotus/node/repo" "github.com/google/uuid" "github.com/graph-gophers/graphql-go" @@ -531,14 +533,14 @@ func (dr *dealResolver) Retry() string { } func (dr *dealResolver) Message(ctx context.Context) string { - msg := dr.message(ctx, dr.ProviderDealState.Checkpoint) + msg := dr.message(ctx, 
dr.ProviderDealState.Checkpoint, dr.ProviderDealState.CheckpointAt) if dr.ProviderDealState.Retry != types.DealRetryFatal && dr.ProviderDealState.Err != "" { msg = "Paused at '" + msg + "': " + dr.ProviderDealState.Err } return msg } -func (dr *dealResolver) message(ctx context.Context, checkpoint dealcheckpoints.Checkpoint) string { +func (dr *dealResolver) message(ctx context.Context, checkpoint dealcheckpoints.Checkpoint, checkpointAt time.Time) string { switch checkpoint { case dealcheckpoints.Accepted: if dr.IsOffline { @@ -574,7 +576,9 @@ func (dr *dealResolver) message(ctx context.Context, checkpoint dealcheckpoints. case dealcheckpoints.Transferred: return "Ready to Publish" case dealcheckpoints.Published: - return "Awaiting Publish Confirmation" + elapsedEpochs := uint64(time.Since(checkpointAt).Seconds()) / build.BlockDelaySecs + confidenceEpochs := build.MessageConfidence * 2 + return fmt.Sprintf("Awaiting Publish Confirmation (%d/%d epochs)", elapsedEpochs, confidenceEpochs) case dealcheckpoints.PublishConfirmed: return "Adding to Sector" case dealcheckpoints.AddedPiece: diff --git a/gql/resolver_mpool.go b/gql/resolver_mpool.go index f60a4b9db..7e2db36f8 100644 --- a/gql/resolver_mpool.go +++ b/gql/resolver_mpool.go @@ -4,17 +4,16 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" + "reflect" "github.com/filecoin-project/lotus/chain/consensus" + cbg "github.com/whyrusleeping/cbor-gen" gqltypes "github.com/filecoin-project/boost/gql/types" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" - stbig "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/specs-actors/actors/builtin/multisig" ) type msg struct { @@ -65,22 +64,24 @@ func (r *resolver) Mpool(ctx context.Context, args struct{ Local bool }) ([]*msg } } - method := m.Message.Method.String() + var params string + methodName := m.Message.Method.String() toact, err := 
r.fullNode.StateGetActor(ctx, m.Message.To, types.EmptyTSK) if err == nil { - method = consensus.NewActorRegistry().Methods[toact.Code][m.Message.Method].Params.Name() - } - - var params string - paramsMsg, err := messageFromBytes(m.Message.Params) - if err != nil { - params = err.Error() - } else { - paramsBytes, err := json.MarshalIndent(paramsMsg, "", " ") - if err != nil { - params = err.Error() - } else { - params = string(paramsBytes) + method, ok := consensus.NewActorRegistry().Methods[toact.Code][m.Message.Method] + if ok { + methodName = method.Name + + params = string(m.Message.Params) + p, ok := reflect.New(method.Params.Elem()).Interface().(cbg.CBORUnmarshaler) + if ok { + if err := p.UnmarshalCBOR(bytes.NewReader(m.Message.Params)); err == nil { + b, err := json.MarshalIndent(p, "", " ") + if err == nil { + params = string(b) + } + } + } } } @@ -92,7 +93,7 @@ func (r *resolver) Mpool(ctx context.Context, args struct{ Local bool }) ([]*msg GasFeeCap: gqltypes.BigInt{Int: m.Message.GasFeeCap}, GasLimit: gqltypes.Uint64(uint64(m.Message.GasLimit)), GasPremium: gqltypes.BigInt{Int: m.Message.GasPremium}, - Method: method, + Method: methodName, Params: params, BaseFee: gqltypes.BigInt{Int: baseFee}, }) @@ -101,83 +102,6 @@ func (r *resolver) Mpool(ctx context.Context, args struct{ Local bool }) ([]*msg return gqlmsgs, nil } -func messageFromBytes(msgb []byte) (types.ChainMsg, error) { - // Signed - { - var msg types.SignedMessage - if err := msg.UnmarshalCBOR(bytes.NewReader(msgb)); err == nil { - return &msg, nil - } - } - - // Unsigned - { - var msg types.Message - if err := msg.UnmarshalCBOR(bytes.NewReader(msgb)); err == nil { - return &msg, nil - } - } - - // Multisig propose? 
- { - var pp multisig.ProposeParams - if err := pp.UnmarshalCBOR(bytes.NewReader(msgb)); err == nil { - i, err := address.NewIDAddress(0) - if err != nil { - return nil, err - } - - return &types.Message{ - // Hack(-ish) - Version: 0x6d736967, - From: i, - - To: pp.To, - Value: pp.Value, - - Method: pp.Method, - Params: pp.Params, - - GasFeeCap: stbig.Zero(), - GasPremium: stbig.Zero(), - }, nil - } - } - - // Encoded json??? - { - if msg, err := messageFromJson(msgb); err == nil { - return msg, nil - } - } - - return nil, errors.New("probably not a cbor-serialized message") -} - -func messageFromJson(msgb []byte) (types.ChainMsg, error) { - // Unsigned - { - var msg types.Message - if err := json.Unmarshal(msgb, &msg); err == nil { - if msg.To != address.Undef { - return &msg, nil - } - } - } - - // Signed - { - var msg types.SignedMessage - if err := json.Unmarshal(msgb, &msg); err == nil { - if msg.Message.To != address.Undef { - return &msg, nil - } - } - } - - return nil, errors.New("probably not a json-serialized message") -} - func mockMessages() []*types.SignedMessage { to0, _ := address.NewFromString("f01469945") from0, _ := address.NewFromString("f3uakndzne4lorwykinlitx2d2puuhgburvxw4dpkfskeofmzg33pm7okyzikqe2gzvaqj2k3hpunwayij6haa") diff --git a/indexprovider/wrapper.go b/indexprovider/wrapper.go index 773caf849..c8fe5095a 100644 --- a/indexprovider/wrapper.go +++ b/indexprovider/wrapper.go @@ -2,11 +2,9 @@ package indexprovider import ( "context" + "database/sql" "errors" "fmt" - "os" - "path/filepath" - "github.com/filecoin-project/boost-gfm/storagemarket" gfm_storagemarket "github.com/filecoin-project/boost-gfm/storagemarket" "github.com/filecoin-project/boost/db" @@ -16,6 +14,7 @@ import ( "github.com/filecoin-project/boost/sectorstatemgr" "github.com/filecoin-project/boost/storagemarket/types" "github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints" + bdtypes "github.com/filecoin-project/boostd-data/svc/types" 
"github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-state-types/abi" @@ -23,15 +22,21 @@ import ( "github.com/filecoin-project/lotus/node/repo" "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime" provider "github.com/ipni/index-provider" + "github.com/ipni/index-provider/engine" "github.com/ipni/index-provider/engine/xproviders" "github.com/ipni/index-provider/metadata" "github.com/libp2p/go-libp2p/core/crypto" - host "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multihash" "go.uber.org/fx" + "net/url" + "os" + "path/filepath" ) var log = logging.Logger("index-provider-wrapper") @@ -436,50 +441,133 @@ func (w *Wrapper) IndexerAnnounceAllDeals(ctx context.Context) error { return merr } +// While ingesting cids for each piece, if there is an error the indexer +// checks if the error contains the string "content not found": +// - if so, the indexer skips the piece and continues ingestion +// - if not, the indexer pauses ingestion +var ErrStringSkipAdIngest = "content not found" + +func skipError(err error) error { + return fmt.Errorf("%s: %s: %w", ErrStringSkipAdIngest, err.Error(), ipld.ErrNotExists{}) +} + +func (w *Wrapper) IndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) { + e, ok := w.prov.(*engine.Engine) + if !ok { + return cid.Undef, fmt.Errorf("index provider is disabled") + } + return e.PublishLatest(ctx) +} + +func (w *Wrapper) IndexerAnnounceLatestHttp(ctx context.Context, announceUrls []string) (cid.Cid, error) { + e, ok := w.prov.(*engine.Engine) + if !ok { + return cid.Undef, fmt.Errorf("index provider is disabled") + } + + if len(announceUrls) == 0 { + announceUrls = w.cfg.IndexProvider.Announce.DirectAnnounceURLs + } + + urls := make([]*url.URL, 0, 
len(announceUrls)) + for _, us := range announceUrls { + u, err := url.Parse(us) + if err != nil { + return cid.Undef, fmt.Errorf("parsing url %s: %w", us, err) + } + urls = append(urls, u) + } + return e.PublishLatestHTTP(ctx, urls...) +} + func (w *Wrapper) MultihashLister(ctx context.Context, prov peer.ID, contextID []byte) (provider.MultihashIterator, error) { - provideF := func(pieceCid cid.Cid) (provider.MultihashIterator, error) { + provideF := func(proposalCid cid.Cid, pieceCid cid.Cid) (provider.MultihashIterator, error) { ii, err := w.piecedirectory.GetIterableIndex(ctx, pieceCid) if err != nil { - return nil, fmt.Errorf("failed to get iterable index: %w", err) + e := fmt.Errorf("failed to get iterable index: %w", err) + if bdtypes.IsNotFound(err) { + // If it's a not found error, skip over this piece and continue ingesting + log.Infow("skipping ingestion: piece not found", "piece", pieceCid, "propCid", proposalCid, "err", e) + return nil, skipError(e) + } + + // Some other error, pause ingestion + log.Infow("pausing ingestion: error getting piece", "piece", pieceCid, "propCid", proposalCid, "err", e) + return nil, e } - // Check if there are any records in the iterator. If there are no - // records, the multihash lister expects us to return an error. + // Check if there are any records in the iterator. hasRecords := ii.ForEach(func(_ multihash.Multihash, _ uint64) error { return fmt.Errorf("has at least one record") }) if hasRecords == nil { - return nil, fmt.Errorf("no records found for piece %s", pieceCid) + // If there are no records, it's effectively the same as a not + // found error. Skip over this piece and continue ingesting. 
+ e := fmt.Errorf("no records found for piece %s", pieceCid) + log.Infow("skipping ingestion: piece has no records", "piece", pieceCid, "propCid", proposalCid, "err", e) + return nil, skipError(e) } mhi, err := provider.CarMultihashIterator(ii) if err != nil { - return nil, fmt.Errorf("failed to get mhiterator: %w", err) + // Bad index, skip over this piece and continue ingesting + err = fmt.Errorf("failed to get mhiterator: %w", err) + log.Infow("skipping ingestion", "piece", pieceCid, "propCid", proposalCid, "err", err) + return nil, skipError(err) } + + log.Debugw("returning piece iterator", "piece", pieceCid, "propCid", proposalCid, "err", err) return mhi, nil } // convert context ID to proposal Cid proposalCid, err := cid.Cast(contextID) if err != nil { - return nil, fmt.Errorf("failed to cast context ID to a cid") + // Bad contextID, skip over this piece and continue ingesting + err = fmt.Errorf("failed to cast context ID to a cid") + log.Infow("skipping ingestion", "proposalCid", proposalCid, "err", err) + return nil, skipError(err) } - // go from proposal cid -> piece cid by looking up deal in boost and if we can't find it there -> then markets - // check Boost deals DB + // Look up deal by proposal cid in the boost database. + // If we can't find it there check legacy markets DB. pds, boostErr := w.dealsDB.BySignedProposalCID(ctx, proposalCid) if boostErr == nil { + // Found the deal, get an iterator over the piece pieceCid := pds.ClientDealProposal.Proposal.PieceCID - return provideF(pieceCid) + return provideF(proposalCid, pieceCid) + } + + // Check if it's a "not found" error + if !errors.Is(boostErr, sql.ErrNoRows) { + // It's not a "not found" error: there was a problem accessing the + // database. Pause ingestion until the user can fix the DB. 
+ e := fmt.Errorf("getting deal with proposal cid %s from boost database: %w", proposalCid, boostErr) + log.Infow("pausing ingestion", "proposalCid", proposalCid, "err", e) + return nil, e } - // check in legacy markets + // Deal was not found in boost DB - check in legacy markets md, legacyErr := w.legacyProv.GetLocalDeal(proposalCid) if legacyErr == nil { - return provideF(md.Proposal.PieceCID) + // Found the deal, get an iterator over the piece + return provideF(proposalCid, md.Proposal.PieceCID) + } + + // Check if it's a "not found" error + if !errors.Is(legacyErr, datastore.ErrNotFound) { + // It's not a "not found" error: there was a problem accessing the + // legacy database. Pause ingestion until the user can fix the legacy DB. 
+ err = fmt.Errorf("deal with proposal cid %s not found", proposalCid) + log.Infow("skipping ingestion", "proposalCid", proposalCid, "err", err) + return nil, skipError(err) } func (w *Wrapper) AnnounceBoostDeal(ctx context.Context, deal *types.ProviderDealState) (cid.Cid, error) { diff --git a/node/config/def.go b/node/config/def.go index 656134488..fb7f8cd39 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -65,7 +65,7 @@ func DefaultBoost() *Boost { }, Graphql: GraphqlConfig{ - ListenAddress: "0.0.0.0", + ListenAddress: "127.0.0.1", Port: 8080, }, @@ -116,7 +116,7 @@ func DefaultBoost() *Boost { DealProposalLogDuration: Duration(time.Hour * 24), RetrievalLogDuration: Duration(time.Hour * 24), - StalledRetrievalTimeout: Duration(time.Minute * 30), + StalledRetrievalTimeout: Duration(time.Second * 30), RetrievalPricing: &lotus_config.RetrievalPricing{ Strategy: RetrievalPricingDefaultMode, @@ -189,7 +189,7 @@ func DefaultBoost() *Boost { MaxConcurrencyStorageCalls: 100, GCInterval: lotus_config.Duration(1 * time.Minute), }, - IndexProvider: lotus_config.IndexProviderConfig{ + IndexProvider: IndexProviderConfig{ Enable: true, EntriesCacheCapacity: 1024, EntriesChunkSize: 16384, @@ -197,6 +197,17 @@ func DefaultBoost() *Boost { // format: "/indexer/ingest/" TopicName: "", PurgeCacheOnStart: false, + + Announce: IndexProviderAnnounceConfig{ + AnnounceOverHttp: false, + DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce"}, + }, + + HttpPublisher: IndexProviderHttpPublisherConfig{ + Enabled: false, + PublicHostname: "", + Port: 3104, + }, }, } return cfg diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index f5f7f9eaf..cd6cdca47 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -99,7 +99,7 @@ your node if metadata log is disabled`, }, { Name: "IndexProvider", - Type: "lotus_config.IndexProviderConfig", + Type: "IndexProviderConfig", Comment: ``, }, @@ -433,7 +433,7 @@ for any other deal.`, Name: "ListenAddress", 
Type: "string", - Comment: `The ip address the GraphQL server will bind to. Default: 0.0.0.0`, + Comment: `The ip address the GraphQL server will bind to. Default: 127.0.0.1`, }, { Name: "Port", @@ -442,6 +442,102 @@ for any other deal.`, Comment: `The port that the graphql server listens on`, }, }, + "IndexProviderAnnounceConfig": []DocField{ + { + Name: "AnnounceOverHttp", + Type: "bool", + + Comment: `Make a direct announcement to a list of indexing nodes over http. +Note that announcements are already made over pubsub regardless +of this setting.`, + }, + { + Name: "DirectAnnounceURLs", + Type: "[]string", + + Comment: `The list of URLs of indexing nodes to announce to.`, + }, + }, + "IndexProviderConfig": []DocField{ + { + Name: "Enable", + Type: "bool", + + Comment: `Enable set whether to enable indexing announcement to the network and expose endpoints that +allow indexer nodes to process announcements. Enabled by default.`, + }, + { + Name: "EntriesCacheCapacity", + Type: "int", + + Comment: `EntriesCacheCapacity sets the maximum capacity to use for caching the indexing advertisement +entries. Defaults to 1024 if not specified. The cache is evicted using LRU policy. The +maximum storage used by the cache is a factor of EntriesCacheCapacity, EntriesChunkSize and +the length of multihashes being advertised. For example, advertising 128-bit long multihashes +with the default EntriesCacheCapacity, and EntriesChunkSize means the cache size can grow to +256MiB when full.`, + }, + { + Name: "EntriesChunkSize", + Type: "int", + + Comment: `EntriesChunkSize sets the maximum number of multihashes to include in a single entries chunk. +Defaults to 16384 if not specified. Note that chunks are chained together for indexing +advertisements that include more multihashes than the configured EntriesChunkSize.`, + }, + { + Name: "TopicName", + Type: "string", + + Comment: `TopicName sets the topic name on which the changes to the advertised content are announced. 
+If not explicitly specified, the topic name is automatically inferred from the network name +in following format: '/indexer/ingest/' +Defaults to empty, which implies the topic name is inferred from network name.`, + }, + { + Name: "PurgeCacheOnStart", + Type: "bool", + + Comment: `PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine +starts. By default, the cache is rehydrated from previously cached entries stored in +datastore if any is present.`, + }, + { + Name: "Announce", + Type: "IndexProviderAnnounceConfig", + + Comment: ``, + }, + { + Name: "HttpPublisher", + Type: "IndexProviderHttpPublisherConfig", + + Comment: ``, + }, + }, + "IndexProviderHttpPublisherConfig": []DocField{ + { + Name: "Enabled", + Type: "bool", + + Comment: `If not enabled, requests are served over graphsync instead.`, + }, + { + Name: "PublicHostname", + Type: "string", + + Comment: `Set the public hostname / IP for the index provider listener. +eg "82.129.73.111" +This is usually the same as the for the boost node.`, + }, + { + Name: "Port", + Type: "int", + + Comment: `Set the port on which to listen for index provider requests over HTTP. +Note that this port must be open on the firewall.`, + }, + }, "LocalIndexDirectoryConfig": []DocField{ { Name: "Yugabyte", diff --git a/node/config/types.go b/node/config/types.go index 30a1b0702..e7165631b 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -50,7 +50,7 @@ type Boost struct { LotusDealmaking lotus_config.DealmakingConfig LotusFees FeeConfig DAGStore lotus_config.DAGStoreConfig - IndexProvider lotus_config.IndexProviderConfig + IndexProvider IndexProviderConfig } func (b *Boost) GetDealmakingConfig() lotus_config.DealmakingConfig { @@ -74,7 +74,7 @@ type WalletsConfig struct { } type GraphqlConfig struct { - // The ip address the GraphQL server will bind to. Default: 0.0.0.0 + // The ip address the GraphQL server will bind to. 
Default: 127.0.0.1 ListenAddress string // The port that the graphql server listens on Port uint64 @@ -280,6 +280,62 @@ type ContractDealsConfig struct { From string } +type IndexProviderConfig struct { + // Enable set whether to enable indexing announcement to the network and expose endpoints that + // allow indexer nodes to process announcements. Enabled by default. + Enable bool + + // EntriesCacheCapacity sets the maximum capacity to use for caching the indexing advertisement + // entries. Defaults to 1024 if not specified. The cache is evicted using LRU policy. The + // maximum storage used by the cache is a factor of EntriesCacheCapacity, EntriesChunkSize and + // the length of multihashes being advertised. For example, advertising 128-bit long multihashes + // with the default EntriesCacheCapacity, and EntriesChunkSize means the cache size can grow to + // 256MiB when full. + EntriesCacheCapacity int + + // EntriesChunkSize sets the maximum number of multihashes to include in a single entries chunk. + // Defaults to 16384 if not specified. Note that chunks are chained together for indexing + // advertisements that include more multihashes than the configured EntriesChunkSize. + EntriesChunkSize int + + // TopicName sets the topic name on which the changes to the advertised content are announced. + // If not explicitly specified, the topic name is automatically inferred from the network name + // in following format: '/indexer/ingest/<network-name>' + // Defaults to empty, which implies the topic name is inferred from network name. + TopicName string + + // PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine + // starts. By default, the cache is rehydrated from previously cached entries stored in + // datastore if any is present. 
+ PurgeCacheOnStart bool + + Announce IndexProviderAnnounceConfig + + HttpPublisher IndexProviderHttpPublisherConfig +} + +type IndexProviderAnnounceConfig struct { + // Make a direct announcement to a list of indexing nodes over http. + // Note that announcements are already made over pubsub regardless + // of this setting. + AnnounceOverHttp bool + + // The list of URLs of indexing nodes to announce to. + DirectAnnounceURLs []string +} + +type IndexProviderHttpPublisherConfig struct { + // If not enabled, requests are served over graphsync instead. + Enabled bool + // Set the public hostname / IP for the index provider listener. + // eg "82.129.73.111" + // This is usually the same as the hostname for the boost node. + PublicHostname string + // Set the port on which to listen for index provider requests over HTTP. + // Note that this port must be open on the firewall. + Port int +} + type FeeConfig struct { // The maximum fee to pay when sending the PublishStorageDeals message MaxPublishDealsFee types.FIL diff --git a/node/impl/boost.go b/node/impl/boost.go index fb2d24d57..ca85cbae3 100644 --- a/node/impl/boost.go +++ b/node/impl/boost.go @@ -172,6 +172,14 @@ func (sm *BoostAPI) BoostIndexerListMultihashes(ctx context.Context, proposalCid } } +func (sm *BoostAPI) BoostIndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) { + return sm.IndexProvider.IndexerAnnounceLatest(ctx) +} + +func (sm *BoostAPI) BoostIndexerAnnounceLatestHttp(ctx context.Context, announceUrls []string) (cid.Cid, error) { + return sm.IndexProvider.IndexerAnnounceLatestHttp(ctx, announceUrls) +} + func (sm *BoostAPI) BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (*api.ProviderDealRejectionInfo, error) { res, err := sm.StorageProvider.ImportOfflineDealData(ctx, dealUuid, filePath, delAfterImport) return res, err diff --git a/node/modules/storageminer_idxprov.go b/node/modules/storageminer_idxprov.go index 9e3585672..0b41dc600 100644 --- 
a/node/modules/storageminer_idxprov.go +++ b/node/modules/storageminer_idxprov.go @@ -5,13 +5,14 @@ import ( "fmt" "github.com/filecoin-project/boost/build" "github.com/filecoin-project/boost/indexprovider" + "github.com/filecoin-project/boost/node/config" "github.com/filecoin-project/boost/node/modules/dtypes" "github.com/filecoin-project/boost/retrievalmarket/types" + "github.com/filecoin-project/boost/util" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/transport/graphsync" datatransferv2 "github.com/filecoin-project/go-data-transfer/v2" - "github.com/filecoin-project/lotus/node/config" lotus_dtypes "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -77,7 +78,8 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo "pid", marketHost.ID(), "topic", topicName, "retAddrs", marketHost.Addrs()) - // If announcements to the network are enabled, then set options for datatransfer publisher. + + // If announcements to the network are enabled, then set options for the publisher. var e *engine.Engine if cfg.Enable { // Join the indexer topic using the market's pubsub instance. Otherwise, the provider @@ -93,14 +95,36 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo // The extra data is required by the lotus-specific index-provider gossip message validators. 
ma := address.Address(maddr) opts = append(opts, - engine.WithPublisherKind(engine.DataTransferPublisher), - engine.WithDataTransfer(dtV1ToIndexerDT(dt, func() ipld.LinkSystem { - return *e.LinkSystem() - })), - engine.WithExtraGossipData(ma.Bytes()), engine.WithTopic(t), + engine.WithExtraGossipData(ma.Bytes()), ) - llog = llog.With("extraGossipData", ma, "publisher", "data-transfer") + if cfg.Announce.AnnounceOverHttp { + opts = append(opts, engine.WithDirectAnnounce(cfg.Announce.DirectAnnounceURLs...)) + } + + // Advertisements can be served over the data transfer protocol + // (on graphsync) or over HTTP + if cfg.HttpPublisher.Enabled { + announceAddr, err := util.ToHttpMultiaddr(cfg.HttpPublisher.PublicHostname, cfg.HttpPublisher.Port) + if err != nil { + return nil, fmt.Errorf("parsing HTTP Publisher hostname '%s' / port %d: %w", + cfg.HttpPublisher.PublicHostname, cfg.HttpPublisher.Port, err) + } + opts = append(opts, + engine.WithPublisherKind(engine.HttpPublisher), + engine.WithHttpPublisherListenAddr(fmt.Sprintf("0.0.0.0:%d", cfg.HttpPublisher.Port)), + engine.WithHttpPublisherAnnounceAddr(announceAddr.String()), + ) + llog = llog.With("publisher", "http", "announceAddr", announceAddr) + } else { + opts = append(opts, + engine.WithPublisherKind(engine.DataTransferPublisher), + engine.WithDataTransfer(dtV1ToIndexerDT(dt, func() ipld.LinkSystem { + return *e.LinkSystem() + })), + ) + llog = llog.With("extraGossipData", ma, "publisher", "data-transfer") + } } else { opts = append(opts, engine.WithPublisherKind(engine.NoPublisher)) llog = llog.With("publisher", "none") diff --git a/react/src/Mpool.css b/react/src/Mpool.css index 588ec5427..2a30e787e 100644 --- a/react/src/Mpool.css +++ b/react/src/Mpool.css @@ -117,4 +117,19 @@ left: 1.5em; border-left: 1px solid #000; height: 0.5em; +} + +.mpool .params{ + width: 1080px; + text-overflow: ellipsis; + cursor: pointer; + word-break: break-all; + overflow: hidden; + white-space: nowrap; +} + +.mpool 
.params:hover{ + overflow: visible; + white-space: normal; + width: auto; } \ No newline at end of file diff --git a/react/src/Mpool.js b/react/src/Mpool.js index 89950b346..c7b609afe 100644 --- a/react/src/Mpool.js +++ b/react/src/Mpool.js @@ -67,7 +67,9 @@ function MpoolMessage(props) { Params - {msg.Params} + +
{msg.Params}
+ Gas Fee Cap diff --git a/retrievalmarket/rtvllog/retrieval_log.go b/retrievalmarket/rtvllog/retrieval_log.go index c1f27e350..3711bf577 100644 --- a/retrievalmarket/rtvllog/retrieval_log.go +++ b/retrievalmarket/rtvllog/retrieval_log.go @@ -2,6 +2,7 @@ package rtvllog import ( "context" + "errors" "sync" "time" @@ -295,22 +296,36 @@ func (r *RetrievalLog) gcRetrievals(ctx context.Context) { continue } + var wg sync.WaitGroup for _, row := range rows { - chid := datatransfer.ChannelID{Initiator: row.PeerID, Responder: row.LocalPeerID, ID: row.TransferID} - // Try to cancel via unpaid graphsync first - err := r.gsur.CancelTransfer(ctx, row.TransferID, &row.PeerID) - - if err != nil { - // Attempt to terminate legacy, paid retrievals if we didnt cancel a free retrieval - err = r.dataTransfer.CloseDataTransferChannel(ctx, chid) - } - - if err != nil { - log.Debugw("error canceling retrieval", "dealID", row.DealID, "err", err) - } else { - log.Infof("Canceled retrieval %s, older than %s", row.DealID, r.stalledTimeout) + if row.TransferID <= 0 { + continue } + wg.Add(1) + go func(s RetrievalDealState) { + // Don't wait for more than 5 seconds for the cancel + // message to be sent when cancelling an unpaid retrieval + unpaidRtrvCtx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + defer wg.Done() + + // Try to cancel an unpaid retrieval with the given transfer id first + err := r.gsur.CancelTransfer(unpaidRtrvCtx, s.TransferID, &s.PeerID) + if err != nil && errors.Is(err, server.ErrRetrievalNotFound) { + // Couldn't find an unpaid retrieval with that id, try + // to cancel a legacy, paid retrieval + chid := datatransfer.ChannelID{Initiator: s.PeerID, Responder: s.LocalPeerID, ID: s.TransferID} + err = r.dataTransfer.CloseDataTransferChannel(ctx, chid) + } + + if err != nil { + log.Debugw("error canceling retrieval", "dealID", s.DealID, "err", err) + } else { + log.Infof("Canceled retrieval %s, older than %s", s.DealID, r.stalledTimeout) + } + }(row) 
} + wg.Wait() } } } diff --git a/retrievalmarket/server/channelstate.go b/retrievalmarket/server/channelstate.go index 2a7dbb4a7..1f1b48ed8 100644 --- a/retrievalmarket/server/channelstate.go +++ b/retrievalmarket/server/channelstate.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/filecoin-project/boost-gfm/retrievalmarket" + graphsync "github.com/filecoin-project/boost-graphsync" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" @@ -22,6 +23,7 @@ type retrievalState struct { retType RetrievalType cs *channelState mkts *retrievalmarket.ProviderDealState + gsReq graphsync.RequestID } func (r retrievalState) ChannelState() channelState { return *r.cs } diff --git a/retrievalmarket/server/gsunpaidretrieval.go b/retrievalmarket/server/gsunpaidretrieval.go index 46a42ce58..2f97a6289 100644 --- a/retrievalmarket/server/gsunpaidretrieval.go +++ b/retrievalmarket/server/gsunpaidretrieval.go @@ -30,6 +30,7 @@ import ( ) var log = logging.Logger("boostgs") +var ErrRetrievalNotFound = fmt.Errorf("no transfer found") var incomingReqExtensions = []graphsync.ExtensionName{ extension.ExtensionIncomingRequest1_1, @@ -175,13 +176,23 @@ func (g *GraphsyncUnpaidRetrieval) CancelTransfer(ctx context.Context, id datatr if state == nil { g.activeRetrievalsLk.Unlock() - return fmt.Errorf("no transfer with id %d", id) + return fmt.Errorf("failed to cancel with id %d: %w", id, ErrRetrievalNotFound) } rcpt := state.cs.recipient tID := state.cs.transferID + gsRequestID := state.gsReq g.activeRetrievalsLk.Unlock() + // tell GraphSync to cancel the request + if (gsRequestID != graphsync.RequestID{}) { + err := g.Cancel(ctx, gsRequestID) + if err != nil { + log.Info("unable to force close graphsync request %s: %s", tID, err) + } + } + + // send a message on data transfer err := g.dtnet.SendMessage(ctx, rcpt, message.CancelResponse(tID)) g.failTransfer(state, errors.New("transfer cancelled by provider")) @@ -325,6 +336,7 @@ func 
(g *GraphsyncUnpaidRetrieval) handleRetrievalDeal(peerID peer.ID, msg datat retType: retType, cs: cs, mkts: mktsState, + gsReq: request.ID(), } // Record the data transfer ID so that we can intercept future diff --git a/retrievalmarket/server/gsunpaidretrieval_test.go b/retrievalmarket/server/gsunpaidretrieval_test.go index 8b2395472..758bff251 100644 --- a/retrievalmarket/server/gsunpaidretrieval_test.go +++ b/retrievalmarket/server/gsunpaidretrieval_test.go @@ -319,6 +319,10 @@ func runRequestTest(t *testing.T, tc testCase) { } else { require.NoError(t, err) } + + // final verification -- the server has no active graphsync requests + stats := gsupr.Stats() + require.Equal(t, stats.IncomingRequests.Active, uint64(0)) } func createRetrievalProvider(ctx context.Context, t *testing.T, testData *tut.Libp2pTestData, pieceStore *tut.TestPieceStore, sectorAccessor *testnodes.TestSectorAccessor, dagstoreWrapper *tut.MockDagStoreWrapper, gs graphsync.GraphExchange, paymentAddress address.Address) retrievalmarket.RetrievalProvider { diff --git a/storagemarket/lp2pimpl/net.go b/storagemarket/lp2pimpl/net.go index aae92782d..e281921b2 100644 --- a/storagemarket/lp2pimpl/net.go +++ b/storagemarket/lp2pimpl/net.go @@ -23,6 +23,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + "go.uber.org/zap" ) var log = logging.Logger("boost-net") @@ -31,9 +32,19 @@ var propLog = logging.Logger("boost-prop") const DealProtocolv120ID = "/fil/storage/mk/1.2.0" const DealProtocolv121ID = "/fil/storage/mk/1.2.1" const DealStatusV12ProtocolID = "/fil/storage/status/1.2.0" + +// The time limit to read a message from the client when the client opens a stream const providerReadDeadline = 10 * time.Second + +// The time limit to write a response to the client const providerWriteDeadline = 10 * time.Second -const clientReadDeadline = 10 * time.Second + +// The time limit to wait for the provider to send a 
response to a client's request. +// This includes the time it takes for the provider to process the request and +// send a response. +const clientReadDeadline = 60 * time.Second + +// The time limit to write a message to the provider const clientWriteDeadline = 10 * time.Second // DealClientOption is an option for configuring the libp2p storage deal client @@ -195,34 +206,44 @@ func (p *DealProvider) Stop() { // Called when the client opens a libp2p stream with a new deal proposal func (p *DealProvider) handleNewDealStream(s network.Stream) { - defer s.Close() + start := time.Now() + reqLogUuid := uuid.New() + reqLog := log.With("reqlog-uuid", reqLogUuid.String(), "client-peer", s.Conn().RemotePeer()) + reqLog.Debugw("new deal proposal request") + + defer func() { + err := s.Close() + if err != nil { + reqLog.Infow("closing stream", "err", err) + } + reqLog.Debugw("handled deal proposal request", "duration", time.Since(start).String()) + }() // Set a deadline on reading from the stream so it doesn't hang _ = s.SetReadDeadline(time.Now().Add(providerReadDeadline)) - defer s.SetReadDeadline(time.Time{}) // nolint // Read the deal proposal from the stream var proposal types.DealParams err := proposal.UnmarshalCBOR(s) + _ = s.SetReadDeadline(time.Time{}) // Clear read deadline so conn doesn't get closed if err != nil { - log.Warnw("reading storage deal proposal from stream", "err", err) + reqLog.Warnw("reading storage deal proposal from stream", "err", err) return } - log.Infow("received deal proposal", "id", proposal.DealUUID, "client-peer", s.Conn().RemotePeer()) + reqLog = reqLog.With("id", proposal.DealUUID) + reqLog.Infow("received deal proposal") // Start executing the deal. // Note: This method just waits for the deal to be accepted, it doesn't // wait for deal execution to complete. 
+ startExec := time.Now() res, err := p.prov.ExecuteDeal(context.Background(), &proposal, s.Conn().RemotePeer()) + reqLog.Debugw("processed deal proposal accept") if err != nil { - log.Warnw("deal proposal failed", "id", proposal.DealUUID, "err", err, "reason", res.Reason) + reqLog.Warnw("deal proposal failed", "err", err, "reason", res.Reason) } - // Set a deadline on writing to the stream so it doesn't hang - _ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline)) - defer s.SetWriteDeadline(time.Time{}) // nolint - // Log the response propLog.Infow("send deal proposal response", "id", proposal.DealUUID, @@ -238,44 +259,60 @@ func (p *DealProvider) handleNewDealStream(s network.Stream) { "start epoch", proposal.ClientDealProposal.Proposal.StartEpoch, "end epoch", proposal.ClientDealProposal.Proposal.EndEpoch, "price per epoch", proposal.ClientDealProposal.Proposal.StoragePricePerEpoch, + "duration", time.Since(startExec).String(), ) _ = p.plDB.InsertLog(p.ctx, proposal, res.Accepted, res.Reason) //nolint:errcheck + // Set a deadline on writing to the stream so it doesn't hang + _ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline)) + defer s.SetWriteDeadline(time.Time{}) // nolint + // Write the response to the client err = cborutil.WriteCborRPC(s, &types.DealResponse{Accepted: res.Accepted, Message: res.Reason}) if err != nil { - log.Warnw("writing deal response", "id", proposal.DealUUID, "err", err) - return + reqLog.Warnw("writing deal response", "err", err) } } func (p *DealProvider) handleNewDealStatusStream(s network.Stream) { - defer s.Close() + start := time.Now() + reqLogUuid := uuid.New() + reqLog := log.With("reqlog-uuid", reqLogUuid.String(), "client-peer", s.Conn().RemotePeer()) + reqLog.Debugw("new deal status request") + + defer func() { + err := s.Close() + if err != nil { + reqLog.Infow("closing stream", "err", err) + } + reqLog.Debugw("handled deal status request", "duration", time.Since(start).String()) + }() + // Read the deal 
status request from the stream _ = s.SetReadDeadline(time.Now().Add(providerReadDeadline)) - defer s.SetReadDeadline(time.Time{}) // nolint - var req types.DealStatusRequest err := req.UnmarshalCBOR(s) + _ = s.SetReadDeadline(time.Time{}) // Clear read deadline so conn doesn't get closed if err != nil { - log.Warnw("reading deal status request from stream", "err", err) + reqLog.Warnw("reading deal status request from stream", "err", err) return } - log.Debugw("received deal status request", "id", req.DealUUID, "client-peer", s.Conn().RemotePeer()) + reqLog = reqLog.With("id", req.DealUUID) + reqLog.Debugw("received deal status request") - resp := p.getDealStatus(req) + resp := p.getDealStatus(req, reqLog) + reqLog.Debugw("processed deal status request") // Set a deadline on writing to the stream so it doesn't hang _ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline)) defer s.SetWriteDeadline(time.Time{}) // nolint if err := cborutil.WriteCborRPC(s, &resp); err != nil { - log.Errorw("failed to write deal status response", "err", err) - return + reqLog.Errorw("failed to write deal status response", "err", err) } } -func (p *DealProvider) getDealStatus(req types.DealStatusRequest) types.DealStatusResponse { +func (p *DealProvider) getDealStatus(req types.DealStatusRequest, reqLog *zap.SugaredLogger) types.DealStatusResponse { errResp := func(err string) types.DealStatusResponse { return types.DealStatusResponse{DealUUID: req.DealUUID, Error: err} } @@ -286,34 +323,34 @@ func (p *DealProvider) getDealStatus(req types.DealStatusRequest) types.DealStat } if err != nil { - log.Errorw("failed to fetch deal status", "err", err) + reqLog.Errorw("failed to fetch deal status", "err", err) return errResp("failed to fetch deal status") } // verify request signature uuidBytes, err := req.DealUUID.MarshalBinary() if err != nil { - log.Errorw("failed to serialize request deal UUID", "err", err) + reqLog.Errorw("failed to serialize request deal UUID", "err", err) return 
errResp("failed to serialize request deal UUID") } clientAddr := pds.ClientDealProposal.Proposal.Client addr, err := p.fullNode.StateAccountKey(p.ctx, clientAddr, chaintypes.EmptyTSK) if err != nil { - log.Errorw("failed to get account key for client addr", "client", clientAddr.String(), "err", err) + reqLog.Errorw("failed to get account key for client addr", "client", clientAddr.String(), "err", err) msg := fmt.Sprintf("failed to get account key for client addr %s", clientAddr.String()) return errResp(msg) } err = sigs.Verify(&req.Signature, addr, uuidBytes) if err != nil { - log.Warnw("signature verification failed", "err", err) + reqLog.Warnw("signature verification failed", "err", err) return errResp("signature verification failed") } signedPropCid, err := pds.SignedProposalCid() if err != nil { - log.Errorw("getting signed proposal cid", "err", err) + reqLog.Errorw("getting signed proposal cid", "err", err) return errResp("getting signed proposal cid") } @@ -321,7 +358,7 @@ func (p *DealProvider) getDealStatus(req types.DealStatusRequest) types.DealStat si, err := p.spApi.SectorsStatus(p.ctx, pds.SectorID, false) if err != nil { - log.Errorw("getting sector status from sealer", "err", err) + reqLog.Errorw("getting sector status from sealer", "err", err) return errResp("getting sector status from sealer") } diff --git a/util/addr.go b/util/addr.go new file mode 100644 index 000000000..f171cffc9 --- /dev/null +++ b/util/addr.go @@ -0,0 +1,26 @@ +package util + +import ( + "fmt" + ma "github.com/multiformats/go-multiaddr" + "net" + "strings" +) + +func ToHttpMultiaddr(hostname string, port int) (ma.Multiaddr, error) { + if hostname == "" { + return nil, fmt.Errorf("hostname is empty") + } + + var saddr string + if n := net.ParseIP(hostname); n != nil { + ipVersion := "ip4" + if strings.Contains(hostname, ":") { + ipVersion = "ip6" + } + saddr = fmt.Sprintf("/%s/%s/tcp/%d/http", ipVersion, hostname, port) + } else { + saddr = fmt.Sprintf("/dns/%s/tcp/%d/http", 
hostname, port) + } + return ma.NewMultiaddr(saddr) +} diff --git a/util/addr_test.go b/util/addr_test.go new file mode 100644 index 000000000..810020787 --- /dev/null +++ b/util/addr_test.go @@ -0,0 +1,44 @@ +package util + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestToHttpMultiaddr(t *testing.T) { + tcs := []struct { + hostname string + port int + expected string + expectErr bool + }{{ + hostname: "192.168.1.1", + port: 1234, + expected: "/ip4/192.168.1.1/tcp/1234/http", + }, { + hostname: "2001:db8::68", + port: 1234, + expected: "/ip6/2001:db8::68/tcp/1234/http", + }, { + hostname: "example.com", + port: 1234, + expected: "/dns/example.com/tcp/1234/http", + }, { + hostname: "", + port: 1234, + expected: "", + expectErr: true, + }} + + for _, tc := range tcs { + t.Run("", func(t *testing.T) { + ma, err := ToHttpMultiaddr(tc.hostname, tc.port) + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tc.expected, ma.String()) + } + }) + } +}