diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 88861055c5a..010ec208bdb 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -16,12 +16,10 @@ import ( commands "github.com/ipfs/go-ipfs/core/commands" corehttp "github.com/ipfs/go-ipfs/core/corehttp" corerepo "github.com/ipfs/go-ipfs/core/corerepo" - "github.com/ipfs/go-ipfs/core/corerouting" nodeMount "github.com/ipfs/go-ipfs/fuse/node" fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" migrate "github.com/ipfs/go-ipfs/repo/fsrepo/migrations" - pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore" mprome "gx/ipfs/QmSk46nSD78YiuNojYMS8NW6hSCjH95JajqqzzoychZgef/go-metrics-prometheus" "gx/ipfs/QmX3QZ5jHEPidwUrymXV1iSCSUhdGxj15sm2gP4jKMef7B/client_golang/prometheus" "gx/ipfs/QmX3U3YXCQ6UYBxq2LVWF8dARS1hPUTEYLrSx654Qyxyw6/go-multiaddr-net" @@ -304,21 +302,8 @@ func daemonFunc(req cmds.Request, res cmds.Response) { } switch routingOption { case routingOptionSupernodeKwd: - servers, err := cfg.SupernodeRouting.ServerIPFSAddrs() - if err != nil { - res.SetError(err, cmds.ErrNormal) - repo.Close() // because ownership hasn't been transferred to the node - return - } - var infos []pstore.PeerInfo - for _, addr := range servers { - infos = append(infos, pstore.PeerInfo{ - ID: addr.ID(), - Addrs: []ma.Multiaddr{addr.Transport()}, - }) - } - - ncfg.Routing = corerouting.SupernodeClient(infos...) + res.SetError(errors.New("supernode routing was never fully implemented and has been removed"), cmds.ErrNormal) + return case routingOptionDHTClientKwd: ncfg.Routing = core.DHTClientOption case routingOptionDHTKwd: diff --git a/core/corerouting/core.go b/core/corerouting/core.go deleted file mode 100644 index 91d2adb04e6..00000000000 --- a/core/corerouting/core.go +++ /dev/null @@ -1,52 +0,0 @@ -package corerouting - -import ( - "errors" - - context "context" - core "github.com/ipfs/go-ipfs/core" - repo "github.com/ipfs/go-ipfs/repo" - supernode "github.com/ipfs/go-ipfs/routing/supernode" - gcproxy "github.com/ipfs/go-ipfs/routing/supernode/proxy" - routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing" - pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore" - ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore" - "gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host" -) - -// NB: DHT option is included in the core to avoid 1) because it's a sane -// default and 2) to avoid a circular dependency (it needs to be referenced in -// the core if it's going to be the default) - -var errServersMissing = errors.New("supernode routing client requires at least 1 server peer") - -// SupernodeServer returns a configuration for a routing server that stores -// routing records to the provided datastore. Only routing records are store in -// the datastore. -func SupernodeServer(recordSource ds.Datastore) core.RoutingOption { - return func(ctx context.Context, ph host.Host, dstore repo.Datastore) (routing.IpfsRouting, error) { - server, err := supernode.NewServer(recordSource, ph.Peerstore(), ph.ID()) - if err != nil { - return nil, err - } - proxy := &gcproxy.Loopback{ - Handler: server, - Local: ph.ID(), - } - ph.SetStreamHandler(gcproxy.ProtocolSNR, proxy.HandleStream) - return supernode.NewClient(proxy, ph, ph.Peerstore(), ph.ID()) - } -} - -// TODO doc -func SupernodeClient(remotes ...pstore.PeerInfo) core.RoutingOption { - return func(ctx context.Context, ph host.Host, dstore repo.Datastore) (routing.IpfsRouting, error) { - if len(remotes) < 1 { - return nil, errServersMissing - } - - proxy := gcproxy.Standard(ph, remotes) - ph.SetStreamHandler(gcproxy.ProtocolSNR, proxy.HandleStream) - return supernode.NewClient(proxy, ph, ph.Peerstore(), ph.ID()) - } -} diff --git a/routing/supernode/client.go b/routing/supernode/client.go deleted file mode 100644 index 6764f784fbf..00000000000 --- a/routing/supernode/client.go +++ /dev/null @@ -1,164 +0,0 @@ -package supernode - -import ( - "bytes" - "context" - "errors" - "time" - - proxy "github.com/ipfs/go-ipfs/routing/supernode/proxy" - - cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" - routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing" - pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore" - logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" - loggables "gx/ipfs/QmT4PgCNdv73hnFAqzHqwW44q7M9PWpykSswHDxndquZbc/go-libp2p-loggables" - dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb" - peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" - proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" - "gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host" - pb "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record/pb" -) - -var log = logging.Logger("supernode") - -type Client struct { - peerhost host.Host - peerstore pstore.Peerstore - proxy proxy.Proxy - local peer.ID -} - -// TODO take in datastore/cache -func NewClient(px proxy.Proxy, h host.Host, ps pstore.Peerstore, local peer.ID) (*Client, error) { - return &Client{ - proxy: px, - local: local, - peerstore: ps, - peerhost: h, - }, nil -} - -func (c *Client) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan pstore.PeerInfo { - logging.ContextWithLoggable(ctx, loggables.Uuid("findProviders")) - defer log.EventBegin(ctx, "findProviders", k).Done() - ch := make(chan pstore.PeerInfo) - go func() { - defer close(ch) - request := dhtpb.NewMessage(dhtpb.Message_GET_PROVIDERS, k.KeyString(), 0) - response, err := c.proxy.SendRequest(ctx, request) - if err != nil { - log.Debug(err) - return - } - for _, p := range dhtpb.PBPeersToPeerInfos(response.GetProviderPeers()) { - select { - case <-ctx.Done(): - log.Debug(ctx.Err()) - return - case ch <- *p: - } - } - }() - return ch -} - -func (c *Client) PutValue(ctx context.Context, k string, v []byte) error { - defer log.EventBegin(ctx, "putValue").Done() - r, err := makeRecord(c.peerstore, c.local, k, v) - if err != nil { - return err - } - pmes := dhtpb.NewMessage(dhtpb.Message_PUT_VALUE, string(k), 0) - pmes.Record = r - return c.proxy.SendMessage(ctx, pmes) // wrap to hide the remote -} - -func (c *Client) GetValue(ctx context.Context, k string) ([]byte, error) { - defer log.EventBegin(ctx, "getValue").Done() - msg := dhtpb.NewMessage(dhtpb.Message_GET_VALUE, string(k), 0) - response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote - if err != nil { - return nil, err - } - return response.Record.GetValue(), nil -} - -func (c *Client) GetValues(ctx context.Context, k string, _ int) ([]routing.RecvdVal, error) { - defer log.EventBegin(ctx, "getValue").Done() - msg := dhtpb.NewMessage(dhtpb.Message_GET_VALUE, string(k), 0) - response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote - if err != nil { - return nil, err - } - - return []routing.RecvdVal{ - { - Val: response.Record.GetValue(), - From: c.local, - }, - }, nil -} - -// Provide adds the given key 'k' to the content routing system. If 'brd' is -// true, it announces that content to the network. For the supernode client, -// setting 'brd' to false makes this call a no-op -func (c *Client) Provide(ctx context.Context, k *cid.Cid, brd bool) error { - if !brd { - return nil - } - defer log.EventBegin(ctx, "provide", k).Done() - msg := dhtpb.NewMessage(dhtpb.Message_ADD_PROVIDER, k.KeyString(), 0) - // FIXME how is connectedness defined for the local node - pri := []dhtpb.PeerRoutingInfo{ - { - PeerInfo: pstore.PeerInfo{ - ID: c.local, - Addrs: c.peerhost.Addrs(), - }, - }, - } - msg.ProviderPeers = dhtpb.PeerRoutingInfosToPBPeers(pri) - return c.proxy.SendMessage(ctx, msg) // TODO wrap to hide remote -} - -func (c *Client) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo, error) { - defer log.EventBegin(ctx, "findPeer", id).Done() - request := dhtpb.NewMessage(dhtpb.Message_FIND_NODE, string(id), 0) - response, err := c.proxy.SendRequest(ctx, request) // hide remote - if err != nil { - return pstore.PeerInfo{}, err - } - for _, p := range dhtpb.PBPeersToPeerInfos(response.GetCloserPeers()) { - if p.ID == id { - return *p, nil - } - } - return pstore.PeerInfo{}, errors.New("could not find peer") -} - -// creates and signs a record for the given key/value pair -func makeRecord(ps pstore.Peerstore, p peer.ID, k string, v []byte) (*pb.Record, error) { - blob := bytes.Join([][]byte{[]byte(k), v, []byte(p)}, []byte{}) - sig, err := ps.PrivKey(p).Sign(blob) - if err != nil { - return nil, err - } - return &pb.Record{ - Key: proto.String(string(k)), - Value: v, - Author: proto.String(string(p)), - Signature: sig, - }, nil -} - -func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) { - defer log.EventBegin(ctx, "ping", id).Done() - return time.Nanosecond, errors.New("supernode routing does not support the ping method") -} - -func (c *Client) Bootstrap(ctx context.Context) error { - return c.proxy.Bootstrap(ctx) -} - -var _ routing.IpfsRouting = &Client{} diff --git a/routing/supernode/proxy/loopback.go b/routing/supernode/proxy/loopback.go deleted file mode 100644 index 16480e65cf7..00000000000 --- a/routing/supernode/proxy/loopback.go +++ /dev/null @@ -1,59 +0,0 @@ -package proxy - -import ( - context "context" - inet "gx/ipfs/QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1/go-libp2p-net" - dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb" - peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" - ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io" -) - -// RequestHandler handles routing requests locally -type RequestHandler interface { - HandleRequest(ctx context.Context, p peer.ID, m *dhtpb.Message) *dhtpb.Message -} - -// Loopback forwards requests to a local handler -type Loopback struct { - Handler RequestHandler - Local peer.ID -} - -func (_ *Loopback) Bootstrap(ctx context.Context) error { - return nil -} - -// SendMessage intercepts local requests, forwarding them to a local handler -func (lb *Loopback) SendMessage(ctx context.Context, m *dhtpb.Message) error { - response := lb.Handler.HandleRequest(ctx, lb.Local, m) - if response != nil { - log.Warning("loopback handler returned unexpected message") - } - return nil -} - -// SendRequest intercepts local requests, forwarding them to a local handler -func (lb *Loopback) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) { - return lb.Handler.HandleRequest(ctx, lb.Local, m), nil -} - -func (lb *Loopback) HandleStream(s inet.Stream) { - defer s.Close() - pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) - var incoming dhtpb.Message - if err := pbr.ReadMsg(&incoming); err != nil { - s.Reset() - log.Debug(err) - return - } - ctx := context.TODO() - outgoing := lb.Handler.HandleRequest(ctx, s.Conn().RemotePeer(), &incoming) - - pbw := ggio.NewDelimitedWriter(s) - - if err := pbw.WriteMsg(outgoing); err != nil { - s.Reset() - log.Debug(err) - return - } -} diff --git a/routing/supernode/proxy/standard.go b/routing/supernode/proxy/standard.go deleted file mode 100644 index 09c78d69a8f..00000000000 --- a/routing/supernode/proxy/standard.go +++ /dev/null @@ -1,174 +0,0 @@ -package proxy - -import ( - "context" - "errors" - - inet "gx/ipfs/QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1/go-libp2p-net" - pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore" - kbucket "gx/ipfs/QmSAFA8v42u4gpJNy1tb7vW3JiiXiaYDC2b845c2RnNSJL/go-libp2p-kbucket" - logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" - loggables "gx/ipfs/QmT4PgCNdv73hnFAqzHqwW44q7M9PWpykSswHDxndquZbc/go-libp2p-loggables" - dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb" - peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" - ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io" - host "gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host" -) - -const ProtocolSNR = "/ipfs/supernoderouting" - -var log = logging.Logger("supernode/proxy") - -type Proxy interface { - Bootstrap(context.Context) error - HandleStream(inet.Stream) - SendMessage(ctx context.Context, m *dhtpb.Message) error - SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) -} - -type standard struct { - Host host.Host - - remoteInfos []pstore.PeerInfo // addr required for bootstrapping - remoteIDs []peer.ID // []ID is required for each req. here, cached for performance. -} - -func Standard(h host.Host, remotes []pstore.PeerInfo) Proxy { - var ids []peer.ID - for _, remote := range remotes { - ids = append(ids, remote.ID) - } - return &standard{h, remotes, ids} -} - -func (px *standard) Bootstrap(ctx context.Context) error { - var cxns []pstore.PeerInfo - for _, info := range px.remoteInfos { - if err := px.Host.Connect(ctx, info); err != nil { - continue - } - cxns = append(cxns, info) - } - if len(cxns) == 0 { - log.Error("unable to bootstrap to any supernode routers") - } else { - log.Infof("bootstrapped to %d supernode routers: %s", len(cxns), cxns) - } - return nil -} - -func (p *standard) HandleStream(s inet.Stream) { - // TODO(brian): Should clients be able to satisfy requests? - log.Error("supernode client received (dropped) a routing message from", s.Conn().RemotePeer()) - s.Reset() -} - -const replicationFactor = 2 - -// SendMessage sends message to each remote sequentially (randomized order), -// stopping after the first successful response. If all fail, returns the last -// error. -func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error { - var err error - var numSuccesses int - for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) { - if err = px.sendMessage(ctx, m, remote); err != nil { // careful don't re-declare err! - continue - } - numSuccesses++ - switch m.GetType() { - case dhtpb.Message_ADD_PROVIDER, dhtpb.Message_PUT_VALUE: - if numSuccesses < replicationFactor { - continue - } - } - return nil // success - } - return err // NB: returns the last error -} - -func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) (err error) { - e := log.EventBegin(ctx, "sendRoutingMessage", px.Host.ID(), remote, m) - defer func() { - if err != nil { - e.SetError(err) - } - e.Done() - }() - if err = px.Host.Connect(ctx, pstore.PeerInfo{ID: remote}); err != nil { - return err - } - s, err := px.Host.NewStream(ctx, remote, ProtocolSNR) - if err != nil { - return err - } - pbw := ggio.NewDelimitedWriter(s) - - err = pbw.WriteMsg(m) - if err == nil { - s.Close() - } else { - s.Reset() - } - return err -} - -// SendRequest sends the request to each remote sequentially (randomized order), -// stopping after the first successful response. If all fail, returns the last -// error. -func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) { - var err error - for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) { - var reply *dhtpb.Message - reply, err = px.sendRequest(ctx, m, remote) // careful don't redeclare err! - if err != nil { - continue - } - return reply, nil // success - } - return nil, err // NB: returns the last error -} - -func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (*dhtpb.Message, error) { - e := log.EventBegin(ctx, "sendRoutingRequest", px.Host.ID(), remote, logging.Pair("request", m)) - defer e.Done() - if err := px.Host.Connect(ctx, pstore.PeerInfo{ID: remote}); err != nil { - e.SetError(err) - return nil, err - } - s, err := px.Host.NewStream(ctx, remote, ProtocolSNR) - if err != nil { - e.SetError(err) - return nil, err - } - defer s.Close() - r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) - w := ggio.NewDelimitedWriter(s) - if err = w.WriteMsg(m); err != nil { - s.Reset() - e.SetError(err) - return nil, err - } - - response := &dhtpb.Message{} - if err = r.ReadMsg(response); err != nil { - s.Reset() - e.SetError(err) - return nil, err - } - // need ctx expiration? - if response == nil { - s.Reset() - err := errors.New("no response to request") - e.SetError(err) - return nil, err - } - e.Append(logging.Pair("response", response)) - e.Append(logging.Pair("uuid", loggables.Uuid("foo"))) - return response, nil -} - -func sortedByKey(peers []peer.ID, skey string) []peer.ID { - target := kbucket.ConvertKey(skey) - return kbucket.SortClosestPeers(peers, target) -} diff --git a/routing/supernode/server.go b/routing/supernode/server.go deleted file mode 100644 index 592cd0732f0..00000000000 --- a/routing/supernode/server.go +++ /dev/null @@ -1,202 +0,0 @@ -package supernode - -import ( - "context" - "errors" - "fmt" - - proxy "github.com/ipfs/go-ipfs/routing/supernode/proxy" - dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help" - - pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore" - dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb" - datastore "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore" - peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" - proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" - pb "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record/pb" -) - -// Server handles routing queries using a database backend -type Server struct { - local peer.ID - routingBackend datastore.Datastore - peerstore pstore.Peerstore - *proxy.Loopback // so server can be injected into client -} - -// NewServer creates a new Supernode routing Server -func NewServer(ds datastore.Datastore, ps pstore.Peerstore, local peer.ID) (*Server, error) { - s := &Server{local, ds, ps, nil} - s.Loopback = &proxy.Loopback{ - Handler: s, - Local: local, - } - return s, nil -} - -func (_ *Server) Bootstrap(ctx context.Context) error { - return nil -} - -// HandleLocalRequest implements the proxy.RequestHandler interface. This is -// where requests are received from the outside world. -func (s *Server) HandleRequest(ctx context.Context, p peer.ID, req *dhtpb.Message) *dhtpb.Message { - _, response := s.handleMessage(ctx, p, req) // ignore response peer. it's local. - return response -} - -func (s *Server) handleMessage( - ctx context.Context, p peer.ID, req *dhtpb.Message) (peer.ID, *dhtpb.Message) { - - defer log.EventBegin(ctx, "routingMessageReceived", req, p).Done() - - var response = dhtpb.NewMessage(req.GetType(), req.GetKey(), req.GetClusterLevel()) - switch req.GetType() { - - case dhtpb.Message_GET_VALUE: - rawRecord, err := getRoutingRecord(s.routingBackend, req.GetKey()) - if err != nil { - return "", nil - } - response.Record = rawRecord - return p, response - - case dhtpb.Message_PUT_VALUE: - // FIXME: verify complains that the peer's ID is not present in the - // peerstore. Mocknet problem? - // if err := verify(s.peerstore, req.GetRecord()); err != nil { - // log.Event(ctx, "validationFailed", req, p) - // return "", nil - // } - putRoutingRecord(s.routingBackend, req.GetKey(), req.GetRecord()) - return p, req - - case dhtpb.Message_FIND_NODE: - p := s.peerstore.PeerInfo(peer.ID(req.GetKey())) - pri := []dhtpb.PeerRoutingInfo{ - { - PeerInfo: p, - // Connectedness: TODO - }, - } - response.CloserPeers = dhtpb.PeerRoutingInfosToPBPeers(pri) - return p.ID, response - - case dhtpb.Message_ADD_PROVIDER: - for _, provider := range req.GetProviderPeers() { - providerID := peer.ID(provider.GetId()) - if providerID == p { - store := []*dhtpb.Message_Peer{provider} - storeProvidersToPeerstore(s.peerstore, p, store) - if err := putRoutingProviders(s.routingBackend, req.GetKey(), store); err != nil { - return "", nil - } - } else { - log.Event(ctx, "addProviderBadRequest", p, req) - } - } - return "", nil - - case dhtpb.Message_GET_PROVIDERS: - providers, err := getRoutingProviders(s.routingBackend, req.GetKey()) - if err != nil { - return "", nil - } - response.ProviderPeers = providers - return p, response - - case dhtpb.Message_PING: - return p, req - default: - } - return "", nil -} - -var _ proxy.RequestHandler = &Server{} -var _ proxy.Proxy = &Server{} - -func getRoutingRecord(ds datastore.Datastore, k string) (*pb.Record, error) { - dskey := dshelp.NewKeyFromBinary([]byte(k)) - val, err := ds.Get(dskey) - if err != nil { - return nil, err - } - recordBytes, ok := val.([]byte) - if !ok { - return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey) - } - var record pb.Record - if err := proto.Unmarshal(recordBytes, &record); err != nil { - return nil, errors.New("failed to unmarshal dht record from datastore") - } - return &record, nil -} - -func putRoutingRecord(ds datastore.Datastore, k string, value *pb.Record) error { - data, err := proto.Marshal(value) - if err != nil { - return err - } - dskey := dshelp.NewKeyFromBinary([]byte(k)) - // TODO namespace - return ds.Put(dskey, data) -} - -func putRoutingProviders(ds datastore.Datastore, k string, newRecords []*dhtpb.Message_Peer) error { - log.Event(context.Background(), "putRoutingProviders") - oldRecords, err := getRoutingProviders(ds, k) - if err != nil { - return err - } - mergedRecords := make(map[string]*dhtpb.Message_Peer) - for _, provider := range oldRecords { - mergedRecords[provider.GetId()] = provider // add original records - } - for _, provider := range newRecords { - mergedRecords[provider.GetId()] = provider // overwrite old record if new exists - } - var protomsg dhtpb.Message - protomsg.ProviderPeers = make([]*dhtpb.Message_Peer, 0, len(mergedRecords)) - for _, provider := range mergedRecords { - protomsg.ProviderPeers = append(protomsg.ProviderPeers, provider) - } - data, err := proto.Marshal(&protomsg) - if err != nil { - return err - } - return ds.Put(providerKey(k), data) -} - -func storeProvidersToPeerstore(ps pstore.Peerstore, p peer.ID, providers []*dhtpb.Message_Peer) { - for _, provider := range providers { - providerID := peer.ID(provider.GetId()) - if providerID != p { - log.Errorf("provider message came from third-party %s", p) - continue - } - for _, maddr := range provider.Addresses() { - // as a router, we want to store addresses for peers who have provided - ps.AddAddr(p, maddr, pstore.AddressTTL) - } - } -} - -func getRoutingProviders(ds datastore.Datastore, k string) ([]*dhtpb.Message_Peer, error) { - e := log.EventBegin(context.Background(), "getProviders") - defer e.Done() - var providers []*dhtpb.Message_Peer - if v, err := ds.Get(providerKey(k)); err == nil { - if data, ok := v.([]byte); ok { - var msg dhtpb.Message - if err := proto.Unmarshal(data, &msg); err != nil { - return nil, err - } - providers = append(providers, msg.GetProviderPeers()...) - } - } - return providers, nil -} - -func providerKey(k string) datastore.Key { - return datastore.KeyWithNamespaces([]string{"routing", "providers", k}) -} diff --git a/routing/supernode/server_test.go b/routing/supernode/server_test.go deleted file mode 100644 index 348fbe39268..00000000000 --- a/routing/supernode/server_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package supernode - -import ( - "testing" - - dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb" - datastore "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore" -) - -func TestPutProviderDoesntResultInDuplicates(t *testing.T) { - routingBackend := datastore.NewMapDatastore() - k := "foo" - put := []*dhtpb.Message_Peer{ - convPeer("bob", "127.0.0.1/tcp/4001"), - convPeer("alice", "10.0.0.10/tcp/4001"), - } - if err := putRoutingProviders(routingBackend, k, put); err != nil { - t.Fatal(err) - } - if err := putRoutingProviders(routingBackend, k, put); err != nil { - t.Fatal(err) - } - - got, err := getRoutingProviders(routingBackend, k) - if err != nil { - t.Fatal(err) - } - if len(got) != 2 { - t.Fatal("should be 2 values, but there are", len(got)) - } -} - -func convPeer(name string, addrs ...string) *dhtpb.Message_Peer { - var rawAddrs [][]byte - for _, addr := range addrs { - rawAddrs = append(rawAddrs, []byte(addr)) - } - return &dhtpb.Message_Peer{Id: &name, Addrs: rawAddrs} -} diff --git a/test/integration/grandcentral_test.go b/test/integration/grandcentral_test.go deleted file mode 100644 index df6ffdd9858..00000000000 --- a/test/integration/grandcentral_test.go +++ /dev/null @@ -1,180 +0,0 @@ -package integrationtest - -import ( - "bytes" - "errors" - "fmt" - "io" - "math" - "testing" - - context "context" - - core "github.com/ipfs/go-ipfs/core" - "github.com/ipfs/go-ipfs/core/corerouting" - "github.com/ipfs/go-ipfs/core/coreunix" - mock "github.com/ipfs/go-ipfs/core/mock" - ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2" - "github.com/ipfs/go-ipfs/thirdparty/unit" - pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore" - testutil "gx/ipfs/QmQgLZP9haZheimMHqqAjJh2LhRmNfEoZDfbtkpeMhi9xK/go-testutil" - mocknet "gx/ipfs/QmRQ76P5dgvxTujhfPsCRAG83rC15jgb1G9bKLuomuC6dQ/go-libp2p/p2p/net/mock" -) - -func TestSupernodeBootstrappedAddCat(t *testing.T) { - // create 8 supernode-routing bootstrap nodes - // create 2 supernode-routing clients both bootstrapped to the bootstrap nodes - // let the bootstrap nodes share a single datastore - // add a large file on one node then cat the file from the other - conf := testutil.LatencyConfig{ - NetworkLatency: 0, - RoutingLatency: 0, - BlockstoreLatency: 0, - } - if err := RunSupernodeBootstrappedAddCat(RandomBytes(100*unit.MB), conf); err != nil { - t.Fatal(err) - } -} - -func RunSupernodeBootstrappedAddCat(data []byte, conf testutil.LatencyConfig) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - servers, clients, err := InitializeSupernodeNetwork(ctx, 8, 2, conf) - if err != nil { - return err - } - for _, n := range append(servers, clients...) { - defer n.Close() - } - - adder := clients[0] - catter := clients[1] - - log.Info("adder is", adder.Identity) - log.Info("catter is", catter.Identity) - - keyAdded, err := coreunix.Add(adder, bytes.NewReader(data)) - if err != nil { - return err - } - - readerCatted, err := coreunix.Cat(ctx, catter, keyAdded) - if err != nil { - return err - } - - // verify - bufout := new(bytes.Buffer) - io.Copy(bufout, readerCatted) - if 0 != bytes.Compare(bufout.Bytes(), data) { - return errors.New("catted data does not match added data") - } - cancel() - return nil -} - -func InitializeSupernodeNetwork( - ctx context.Context, - numServers, numClients int, - conf testutil.LatencyConfig) ([]*core.IpfsNode, []*core.IpfsNode, error) { - - // create network - mn := mocknet.New(ctx) - - mn.SetLinkDefaults(mocknet.LinkOptions{ - Latency: conf.NetworkLatency, - Bandwidth: math.MaxInt32, - }) - - routingDatastore := ds2.ThreadSafeCloserMapDatastore() - var servers []*core.IpfsNode - for i := 0; i < numServers; i++ { - bootstrap, err := core.NewNode(ctx, &core.BuildCfg{ - Online: true, - Host: mock.MockHostOption(mn), - Routing: corerouting.SupernodeServer(routingDatastore), - }) - if err != nil { - return nil, nil, err - } - servers = append(servers, bootstrap) - } - - var bootstrapInfos []pstore.PeerInfo - for _, n := range servers { - info := n.Peerstore.PeerInfo(n.PeerHost.ID()) - bootstrapInfos = append(bootstrapInfos, info) - } - - var clients []*core.IpfsNode - for i := 0; i < numClients; i++ { - n, err := core.NewNode(ctx, &core.BuildCfg{ - Online: true, - Host: mock.MockHostOption(mn), - Routing: corerouting.SupernodeClient(bootstrapInfos...), - }) - if err != nil { - return nil, nil, err - } - clients = append(clients, n) - } - mn.LinkAll() - - bcfg := core.BootstrapConfigWithPeers(bootstrapInfos) - for _, n := range clients { - if err := n.Bootstrap(bcfg); err != nil { - return nil, nil, err - } - } - return servers, clients, nil -} - -func TestSupernodePutRecordGetRecord(t *testing.T) { - // create 8 supernode-routing bootstrap nodes - // create 2 supernode-routing clients both bootstrapped to the bootstrap nodes - // let the bootstrap nodes share a single datastore - // add a large file on one node then cat the file from the other - conf := testutil.LatencyConfig{ - NetworkLatency: 0, - RoutingLatency: 0, - BlockstoreLatency: 0, - } - if err := RunSupernodePutRecordGetRecord(conf); err != nil { - t.Fatal(err) - } -} - -func RunSupernodePutRecordGetRecord(conf testutil.LatencyConfig) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - servers, clients, err := InitializeSupernodeNetwork(ctx, 2, 2, conf) - if err != nil { - return err - } - for _, n := range append(servers, clients...) { - defer n.Close() - } - - putter := clients[0] - getter := clients[1] - - k := "key" - note := []byte("a note from putter") - - if err := putter.Routing.PutValue(ctx, k, note); err != nil { - return fmt.Errorf("failed to put value: %s", err) - } - - received, err := getter.Routing.GetValue(ctx, k) - if err != nil { - return fmt.Errorf("failed to get value: %s", err) - } - - if 0 != bytes.Compare(note, received) { - return errors.New("record doesn't match") - } - cancel() - return nil -} diff --git a/test/supernode_client/.gitignore b/test/supernode_client/.gitignore deleted file mode 100644 index c8d27e8f5c0..00000000000 --- a/test/supernode_client/.gitignore +++ /dev/null @@ -1 +0,0 @@ -.ipfs/ diff --git a/test/supernode_client/main.go b/test/supernode_client/main.go deleted file mode 100644 index b9ce47c8e93..00000000000 --- a/test/supernode_client/main.go +++ /dev/null @@ -1,245 +0,0 @@ -package main - -import ( - "bytes" - "flag" - "fmt" - "io" - "io/ioutil" - "log" - "math" - "os" - gopath "path" - "time" - - random "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random" - commands "github.com/ipfs/go-ipfs/commands" - core "github.com/ipfs/go-ipfs/core" - corehttp "github.com/ipfs/go-ipfs/core/corehttp" - corerouting "github.com/ipfs/go-ipfs/core/corerouting" - "github.com/ipfs/go-ipfs/core/coreunix" - "github.com/ipfs/go-ipfs/repo" - config "github.com/ipfs/go-ipfs/repo/config" - fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" - ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2" - "github.com/ipfs/go-ipfs/thirdparty/ipfsaddr" - unit "github.com/ipfs/go-ipfs/thirdparty/unit" - - context "context" - pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore" - logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" - ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr" -) - -var elog = logging.Logger("gc-client") - -var ( - cat = flag.Bool("cat", false, "else add") - seed = flag.Int64("seed", 1, "") - nBitsForKeypair = flag.Int("b", 1024, "number of bits for keypair (if repo is uninitialized)") -) - -func main() { - flag.Parse() - if err := run(); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - os.Exit(1) - } -} - -func run() error { - servers := config.DefaultSNRServers - fmt.Println("using gcr remotes:") - for _, p := range servers { - fmt.Println("\t", p) - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - cwd, err := os.Getwd() - if err != nil { - return err - } - repoPath := gopath.Join(cwd, config.DefaultPathName) - _ = ensureRepoInitialized(repoPath) - - repo, err := fsrepo.Open(repoPath) - if err != nil { // owned by node - return err - } - cfg, err := repo.Config() - if err != nil { - return err - } - - cfg.Bootstrap = servers - if err := repo.SetConfig(cfg); err != nil { - return err - } - - var addrs []ipfsaddr.IPFSAddr - for _, info := range servers { - addr, err := ipfsaddr.ParseString(info) - if err != nil { - return err - } - addrs = append(addrs, addr) - } - - var infos []pstore.PeerInfo - for _, addr := range addrs { - infos = append(infos, pstore.PeerInfo{ - ID: addr.ID(), - Addrs: []ma.Multiaddr{addr.Transport()}, - }) - } - - node, err := core.NewNode(ctx, &core.BuildCfg{ - Online: true, - Repo: repo, - Routing: corerouting.SupernodeClient(infos...), - }) - if err != nil { - return err - } - defer node.Close() - - opts := []corehttp.ServeOption{ - corehttp.CommandsOption(cmdCtx(node, repoPath)), - corehttp.GatewayOption(false), - } - - if *cat { - if err := runFileCattingWorker(ctx, node); err != nil { - return err - } - } else { - if err := runFileAddingWorker(node); err != nil { - return err - } - } - return corehttp.ListenAndServe(node, cfg.Addresses.API, opts...) -} - -func ensureRepoInitialized(path string) error { - if !fsrepo.IsInitialized(path) { - conf, err := config.Init(ioutil.Discard, *nBitsForKeypair) - if err != nil { - return err - } - if err := fsrepo.Init(path, conf); err != nil { - return err - } - } - return nil -} - -func sizeOfIthFile(i int64) int64 { - return (1 << uint64(i)) * unit.KB -} - -func runFileAddingWorker(n *core.IpfsNode) error { - errs := make(chan error) - go func() { - var i int64 - for i = 1; i < math.MaxInt32; i++ { - piper, pipew := io.Pipe() - go func() { - defer pipew.Close() - if err := random.WritePseudoRandomBytes(sizeOfIthFile(i), pipew, *seed); err != nil { - errs <- err - } - }() - k, err := coreunix.Add(n, piper) - if err != nil { - errs <- err - } - log.Println("added file", "seed", *seed, "#", i, "key", k, "size", unit.Information(sizeOfIthFile(i))) - time.Sleep(1 * time.Second) - } - }() - - var i int64 - for i = 0; i < math.MaxInt32; i++ { - err := <-errs - if err != nil { - log.Fatal(err) - } - } - - return nil -} - -func runFileCattingWorker(ctx context.Context, n *core.IpfsNode) error { - conf, err := config.Init(ioutil.Discard, *nBitsForKeypair) - if err != nil { - return err - } - - r := &repo.Mock{ - D: ds2.ThreadSafeCloserMapDatastore(), - C: *conf, - } - dummy, err := core.NewNode(ctx, &core.BuildCfg{ - Repo: r, - }) - if err != nil { - return err - } - - errs := make(chan error) - - go func() { - defer dummy.Close() - var i int64 = 1 - for { - buf := new(bytes.Buffer) - if err := random.WritePseudoRandomBytes(sizeOfIthFile(i), buf, *seed); err != nil { - errs <- err - } - // add to a dummy node to discover the key - k, err := coreunix.Add(dummy, bytes.NewReader(buf.Bytes())) - if err != nil { - errs <- err - } - e := elog.EventBegin(ctx, "cat", logging.LoggableF(func() map[string]interface{} { - return map[string]interface{}{ - "key": k, - "localPeer": n.Identity, - } - })) - if r, err := coreunix.Cat(ctx, n, k); err != nil { - e.Done() - log.Printf("failed to cat file. seed: %d #%d key: %s err: %s", *seed, i, k, err) - } else { - log.Println("found file", "seed", *seed, "#", i, "key", k, "size", unit.Information(sizeOfIthFile(i))) - io.Copy(ioutil.Discard, r) - e.Done() - log.Println("catted file", "seed", *seed, "#", i, "key", k, "size", unit.Information(sizeOfIthFile(i))) - i++ - } - time.Sleep(time.Second) - } - }() - - err = <-errs - if err != nil { - log.Fatal(err) - } - - return nil -} - -func cmdCtx(node *core.IpfsNode, repoPath string) commands.Context { - return commands.Context{ - Online: true, - ConfigRoot: repoPath, - LoadConfig: func(path string) (*config.Config, error) { - return node.Repo.Config() - }, - ConstructNode: func() (*core.IpfsNode, error) { - return node, nil - }, - } -}