diff --git a/cmd/ipfs/init.go b/cmd/ipfs/init.go index fc4563f4e9a..697f9516d52 100644 --- a/cmd/ipfs/init.go +++ b/cmd/ipfs/init.go @@ -121,7 +121,7 @@ func doInit(configRoot string, dspathOverride string, force bool, nBitsForKeypai func addTheWelcomeFile(conf *config.Config) error { // TODO extract this file creation operation into a function ctx, cancel := context.WithCancel(context.Background()) - nd, err := core.NewIpfsNode(ctx, conf, false) + nd, err := core.NewIPFSNode(ctx, core.Offline(conf)) if err != nil { return err } diff --git a/cmd/ipfs/main.go b/cmd/ipfs/main.go index 4d7f4350c54..b58b8bf0705 100644 --- a/cmd/ipfs/main.go +++ b/cmd/ipfs/main.go @@ -188,7 +188,7 @@ func (i *cmdInvocation) constructNodeFunc(ctx context.Context) func() (*core.Ipf // ok everything is good. set it on the invocation (for ownership) // and return it. - i.node, err = core.NewIpfsNode(ctx, cfg, cmdctx.Online) + i.node, err = core.NewIPFSNode(ctx, core.Standard(cfg, cmdctx.Online)) return i.node, err } } diff --git a/core/bootstrap.go b/core/bootstrap.go index 8f11596cae3..b2f8a59a2de 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -29,7 +29,7 @@ func superviseConnections(parent context.Context, h host.Host, route *dht.IpfsDHT, // TODO depend on abstract interface for testing purposes store peer.Peerstore, - peers []config.BootstrapPeer) error { + peers []peer.PeerInfo) error { for { ctx, _ := context.WithTimeout(parent, connectiontimeout) @@ -51,7 +51,7 @@ func bootstrap(ctx context.Context, h host.Host, r *dht.IpfsDHT, ps peer.Peerstore, - boots []config.BootstrapPeer) error { + bootstrapPeers []peer.PeerInfo) error { connectedPeers := h.Network().Peers() if len(connectedPeers) >= recoveryThreshold { @@ -66,17 +66,6 @@ func bootstrap(ctx context.Context, log.Event(ctx, "bootstrapStart", h.ID()) log.Debugf("%s bootstrapping to %d more nodes", h.ID(), numCxnsToCreate) - var bootstrapPeers []peer.PeerInfo - for _, bootstrap := range boots { - p, err := toPeer(bootstrap) - if err != nil { - log.Event(ctx, "bootstrapError", h.ID(), lgbl.Error(err)) - log.Errorf("%s bootstrap error: %s", h.ID(), err) - return err - } - bootstrapPeers = append(bootstrapPeers, p) - } - var notConnected []peer.PeerInfo for _, p := range bootstrapPeers { if h.Network().Connectedness(p.ID) != inet.Connected { diff --git a/core/core.go b/core/core.go index d847411e436..9408d76ff53 100644 --- a/core/core.go +++ b/core/core.go @@ -6,6 +6,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58" ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" + datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" bstore "github.com/jbenet/go-ipfs/blocks/blockstore" @@ -15,7 +16,7 @@ import ( exchange "github.com/jbenet/go-ipfs/exchange" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" - "github.com/jbenet/go-ipfs/exchange/offline" + offline "github.com/jbenet/go-ipfs/exchange/offline" mount "github.com/jbenet/go-ipfs/fuse/mount" merkledag "github.com/jbenet/go-ipfs/merkledag" namesys "github.com/jbenet/go-ipfs/namesys" @@ -28,9 +29,11 @@ import ( pin "github.com/jbenet/go-ipfs/pin" routing "github.com/jbenet/go-ipfs/routing" dht "github.com/jbenet/go-ipfs/routing/dht" + util "github.com/jbenet/go-ipfs/util" ds2 "github.com/jbenet/go-ipfs/util/datastore2" debugerror "github.com/jbenet/go-ipfs/util/debugerror" eventlog "github.com/jbenet/go-ipfs/util/eventlog" + lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" ) const IpnsValidatorTag = "ipns" @@ -38,33 +41,51 @@ const kSizeBlockstoreWriteCache = 100 var log = eventlog.Logger("core") +type mode int + +const ( + // zero value is not a valid mode, must be explicitly set + invalidMode mode = iota + offlineMode + onlineMode +) + // IpfsNode is IPFS Core module. It represents an IPFS instance. type IpfsNode struct { // Self - Config *config.Config // the node's configuration - Identity peer.ID // the local node's identity - PrivateKey ic.PrivKey // the local node's private Key - onlineMode bool // alternatively, offline + Identity peer.ID // the local node's identity - // Local node + // TODO abstract as repo.Repo + Config *config.Config // the node's configuration Datastore ds2.ThreadSafeDatastoreCloser // the local datastore - Pinning pin.Pinner // the pinning manager - Mounts Mounts // current mount state, if any. + + // Local node + Pinning pin.Pinner // the pinning manager + Mounts Mounts // current mount state, if any. // Services - Peerstore peer.Peerstore // storage for other Peer instances - PeerHost p2phost.Host // the network host (server+client) - Routing routing.IpfsRouting // the routing system. recommend ipfs-dht - Exchange exchange.Interface // the block exchange + strategy (bitswap) - Blockstore bstore.Blockstore // the block store (lower level) - Blocks *bserv.BlockService // the block service, get/add blocks. - DAG merkledag.DAGService // the merkle dag service, get/add objects. - Resolver *path.Resolver // the path resolution system - Namesys namesys.NameSystem // the name system, resolves paths to hashes - Diagnostics *diag.Diagnostics // the diagnostics service + Peerstore peer.Peerstore // storage for other Peer instances + Blockstore bstore.Blockstore // the block store (lower level) + Blocks *bserv.BlockService // the block service, get/add blocks. + DAG merkledag.DAGService // the merkle dag service, get/add objects. + Resolver *path.Resolver // the path resolution system + + // Online + PrivateKey ic.PrivKey // the local node's private Key + PeerHost p2phost.Host // the network host (server+client) + Routing routing.IpfsRouting // the routing system. recommend ipfs-dht + Exchange exchange.Interface // the block exchange + strategy (bitswap) + Namesys namesys.NameSystem // the name system, resolves paths to hashes + Diagnostics *diag.Diagnostics // the diagnostics service ctxgroup.ContextGroup + + // dht allows node to Bootstrap when dht is present + // TODO privatize before merging. This is here temporarily during the + // migration of the TestNet constructor + DHT *dht.IpfsDHT + mode mode } // Mounts defines what the node's mount state is. This should @@ -75,67 +96,99 @@ type Mounts struct { Ipns mount.Mount } -// NewIpfsNode constructs a new IpfsNode based on the given config. -func NewIpfsNode(ctx context.Context, cfg *config.Config, online bool) (n *IpfsNode, err error) { - success := false // flip to true after all sub-system inits succeed - defer func() { - if !success && n != nil { - n.Close() - } - }() - - if cfg == nil { - return nil, debugerror.Errorf("configuration required") - } +type ConfigOption func(ctx context.Context) (*IpfsNode, error) - n = &IpfsNode{ - onlineMode: online, - Config: cfg, +func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) { + node, err := option(ctx) + if err != nil { + return nil, err } - n.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, n.teardown) - ctx = n.ContextGroup.Context() - // setup Peerstore - n.Peerstore = peer.NewPeerstore() + // Need to make sure it's perfectly clear 1) which variables are expected + // to be initialized at this point, and 2) which variables will be + // initialized after this point. - // setup datastore. - if n.Datastore, err = makeDatastore(cfg.Datastore); err != nil { + node.Blocks, err = bserv.New(node.Blockstore, node.Exchange) + if err != nil { return nil, debugerror.Wrap(err) } - - // setup local peer ID (private key is loaded in online setup) - if err := n.loadID(); err != nil { - return nil, err + if node.Peerstore == nil { + node.Peerstore = peer.NewPeerstore() } - - n.Blockstore, err = bstore.WriteCached(bstore.NewBlockstore(n.Datastore), kSizeBlockstoreWriteCache) + node.DAG = merkledag.NewDAGService(node.Blocks) + node.Pinning, err = pin.LoadPinner(node.Datastore, node.DAG) if err != nil { - return nil, debugerror.Wrap(err) + node.Pinning = pin.NewPinner(node.Datastore, node.DAG) } + node.Resolver = &path.Resolver{DAG: node.DAG} + return node, nil +} + +func Offline(cfg *config.Config) ConfigOption { + return Standard(cfg, false) +} + +func Online(cfg *config.Config) ConfigOption { + return Standard(cfg, true) +} + +// DEPRECATED: use Online, Offline functions +func Standard(cfg *config.Config, online bool) ConfigOption { + return func(ctx context.Context) (n *IpfsNode, err error) { + + success := false // flip to true after all sub-system inits succeed + defer func() { + if !success && n != nil { + n.Close() + } + }() - // setup online services - if online { - if err := n.StartOnlineServices(); err != nil { - return nil, err // debugerror.Wraps. + if cfg == nil { + return nil, debugerror.Errorf("configuration required") + } + n = &IpfsNode{ + mode: func() mode { + if online { + return onlineMode + } + return offlineMode + }(), + Config: cfg, } - } else { - n.Exchange = offline.Exchange(n.Blockstore) - } - n.Blocks, err = bserv.New(n.Blockstore, n.Exchange) - if err != nil { - return nil, debugerror.Wrap(err) - } + n.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, n.teardown) + ctx = n.ContextGroup.Context() - n.DAG = merkledag.NewDAGService(n.Blocks) - n.Pinning, err = pin.LoadPinner(n.Datastore, n.DAG) - if err != nil { - n.Pinning = pin.NewPinner(n.Datastore, n.DAG) - } - n.Resolver = &path.Resolver{DAG: n.DAG} + // setup Peerstore + n.Peerstore = peer.NewPeerstore() + + // setup datastore. + if n.Datastore, err = makeDatastore(cfg.Datastore); err != nil { + return nil, debugerror.Wrap(err) + } + + // setup local peer ID (private key is loaded in online setup) + if err := n.loadID(); err != nil { + return nil, err + } + + n.Blockstore, err = bstore.WriteCached(bstore.NewBlockstore(n.Datastore), kSizeBlockstoreWriteCache) + if err != nil { + return nil, debugerror.Wrap(err) + } + + // setup online services + if online { + if err := n.StartOnlineServices(); err != nil { + return nil, err // debugerror.Wraps. + } + } else { + n.Exchange = offline.Exchange(n.Blockstore) + } - success = true - return n, nil + success = true + return n, nil + } } func (n *IpfsNode) StartOnlineServices() error { @@ -150,18 +203,22 @@ func (n *IpfsNode) StartOnlineServices() error { return err } - if err := n.startNetwork(); err != nil { - return err + peerhost, err := constructPeerHost(ctx, n.ContextGroup, n.Config, n.Identity, n.Peerstore) + if err != nil { + return debugerror.Wrap(err) } + n.PeerHost = peerhost // setup diagnostics service n.Diagnostics = diag.NewDiagnostics(n.Identity, n.PeerHost) // setup routing service - dhtRouting := dht.NewDHT(ctx, n.PeerHost, n.Datastore) - dhtRouting.Validators[IpnsValidatorTag] = namesys.ValidateIpnsRecord + dhtRouting, err := constructDHTRouting(ctx, n.ContextGroup, n.PeerHost, n.Datastore) + if err != nil { + return debugerror.Wrap(err) + } + n.DHT = dhtRouting n.Routing = dhtRouting - n.AddChildGroup(dhtRouting) // setup exchange service const alwaysSendToPeer = true // use YesManStrategy @@ -178,35 +235,17 @@ func (n *IpfsNode) StartOnlineServices() error { // an Exchange, Network, or Routing component and have the constructor // manage the wiring. In that scenario, this dangling function is a bit // awkward. - go superviseConnections(ctx, n.PeerHost, dhtRouting, n.Peerstore, n.Config.Bootstrap) - return nil -} - -func (n *IpfsNode) startNetwork() error { - ctx := n.Context() - - // setup the network - listenAddrs, err := listenAddresses(n.Config) - if err != nil { - return debugerror.Wrap(err) - } - // make sure we dont error out if our config includes some addresses we cant use. - listenAddrs = swarm.FilterAddrs(listenAddrs) - network, err := swarm.NewNetwork(ctx, listenAddrs, n.Identity, n.Peerstore) - if err != nil { - return debugerror.Wrap(err) - } - n.AddChildGroup(network.CtxGroup()) - n.PeerHost = p2pbhost.New(network) - - // explicitly set these as our listen addrs. - // (why not do it inside inet.NewNetwork? because this way we can - // listen on addresses without necessarily advertising those publicly.) - addrs, err := n.PeerHost.Network().InterfaceListenAddresses() - if err != nil { - return debugerror.Wrap(err) + var bootstrapPeers []peer.PeerInfo + for _, bootstrap := range n.Config.Bootstrap { + p, err := toPeer(bootstrap) + if err != nil { + log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err)) + log.Errorf("%s bootstrap error: %s", n.Identity, err) + return err + } + bootstrapPeers = append(bootstrapPeers, p) } - n.Peerstore.AddAddresses(n.Identity, addrs) + go superviseConnections(ctx, n.PeerHost, n.DHT, n.Peerstore, bootstrapPeers) return nil } @@ -218,7 +257,27 @@ func (n *IpfsNode) teardown() error { } func (n *IpfsNode) OnlineMode() bool { - return n.onlineMode + switch n.mode { + case onlineMode: + return true + default: + return false + } +} + +func (n *IpfsNode) Resolve(k util.Key) (*merkledag.Node, error) { + return (&path.Resolver{n.DAG}).ResolvePath(k.String()) +} + +// Bootstrap is undefined when node is not in OnlineMode +func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error { + + // TODO what should return value be when in offlineMode? + + if n.DHT != nil { + return bootstrap(ctx, n.PeerHost, n.DHT, n.Peerstore, peers) + } + return nil } func (n *IpfsNode) loadID() error { @@ -289,3 +348,36 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { return listen, nil } + +// isolates the complex initialization steps +func constructPeerHost(ctx context.Context, ctxg ctxgroup.ContextGroup, cfg *config.Config, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) { + listenAddrs, err := listenAddresses(cfg) + // make sure we dont error out if our config includes some addresses we cant use. + filteredAddrs := swarm.FilterAddrs(listenAddrs) + if err != nil { + return nil, debugerror.Wrap(err) + } + network, err := swarm.NewNetwork(ctx, filteredAddrs, id, ps) + if err != nil { + return nil, debugerror.Wrap(err) + } + ctxg.AddChildGroup(network.CtxGroup()) + + peerhost := p2pbhost.New(network) + // explicitly set these as our listen addrs. + // (why not do it inside inet.NewNetwork? because this way we can + // listen on addresses without necessarily advertising those publicly.) + addrs, err := peerhost.Network().InterfaceListenAddresses() + if err != nil { + return nil, debugerror.Wrap(err) + } + ps.AddAddresses(id, addrs) + return peerhost, nil +} + +func constructDHTRouting(ctx context.Context, ctxg ctxgroup.ContextGroup, host p2phost.Host, ds datastore.ThreadSafeDatastore) (*dht.IpfsDHT, error) { + dhtRouting := dht.NewDHT(ctx, host, ds) + dhtRouting.Validators[IpnsValidatorTag] = namesys.ValidateIpnsRecord + ctxg.AddChildGroup(dhtRouting) + return dhtRouting, nil +} diff --git a/core/core_test.go b/core/core_test.go index 619adbb508b..12c8e554f22 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -45,14 +45,14 @@ func TestInitialization(t *testing.T) { } for i, c := range good { - n, err := NewIpfsNode(ctx, c, false) + n, err := NewIPFSNode(ctx, Standard(c, false)) if n == nil || err != nil { t.Error("Should have constructed.", i, err) } } for i, c := range bad { - n, err := NewIpfsNode(ctx, c, false) + n, err := NewIPFSNode(ctx, Standard(c, false)) if n != nil || err == nil { t.Error("Should have failed to construct.", i) } diff --git a/core/io/add.go b/core/io/add.go new file mode 100644 index 00000000000..384cbba903e --- /dev/null +++ b/core/io/add.go @@ -0,0 +1,35 @@ +package core_io + +// TODO rename package to something that doesn't conflict with io/ioutil. +// Pretty names are hard to find. +// +// Candidates: +// +// go-ipfs/core/unix +// go-ipfs/core/io +// go-ipfs/core/ioutil +// go-ipfs/core/coreio +// go-ipfs/core/coreunix + +import ( + "io" + + core "github.com/jbenet/go-ipfs/core" + importer "github.com/jbenet/go-ipfs/importer" + chunk "github.com/jbenet/go-ipfs/importer/chunk" + u "github.com/jbenet/go-ipfs/util" +) + +func Add(n *core.IpfsNode, r io.Reader) (u.Key, error) { + // TODO more attractive function signature importer.BuildDagFromReader + dagNode, err := importer.BuildDagFromReader( + r, + n.DAG, + nil, + chunk.DefaultSplitter, + ) + if err != nil { + return "", err + } + return dagNode.Key() +} diff --git a/core/io/cat.go b/core/io/cat.go new file mode 100644 index 00000000000..0e0f2712a5a --- /dev/null +++ b/core/io/cat.go @@ -0,0 +1,28 @@ +package core_io + +// TODO rename package to something that doesn't conflict with io/ioutil. +// Pretty names are hard to find. +// +// Candidates: +// +// go-ipfs/core/unix +// go-ipfs/core/io +// go-ipfs/core/ioutil +// go-ipfs/core/coreio +// go-ipfs/core/coreunix + +import ( + "io" + + core "github.com/jbenet/go-ipfs/core" + uio "github.com/jbenet/go-ipfs/unixfs/io" + u "github.com/jbenet/go-ipfs/util" +) + +func Cat(n *core.IpfsNode, k u.Key) (io.Reader, error) { + dagNode, err := n.Resolve(k) + if err != nil { + return nil, err + } + return uio.NewDagReader(dagNode, n.DAG) +} diff --git a/test/epictest/addcat_test.go b/test/epictest/addcat_test.go index ff6e899db92..d3a99e9328a 100644 --- a/test/epictest/addcat_test.go +++ b/test/epictest/addcat_test.go @@ -11,14 +11,18 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" random "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random" + "github.com/jbenet/go-ipfs/core" + core_io "github.com/jbenet/go-ipfs/core/io" mocknet "github.com/jbenet/go-ipfs/p2p/net/mock" + "github.com/jbenet/go-ipfs/p2p/peer" errors "github.com/jbenet/go-ipfs/util/debugerror" + testutil "github.com/jbenet/go-ipfs/util/testutil" ) const kSeed = 1 func Test1KBInstantaneous(t *testing.T) { - conf := Config{ + conf := testutil.LatencyConfig{ NetworkLatency: 0, RoutingLatency: 0, BlockstoreLatency: 0, @@ -31,7 +35,7 @@ func Test1KBInstantaneous(t *testing.T) { func TestDegenerateSlowBlockstore(t *testing.T) { SkipUnlessEpic(t) - conf := Config{BlockstoreLatency: 50 * time.Millisecond} + conf := testutil.LatencyConfig{BlockstoreLatency: 50 * time.Millisecond} if err := AddCatPowers(conf, 128); err != nil { t.Fatal(err) } @@ -39,7 +43,7 @@ func TestDegenerateSlowBlockstore(t *testing.T) { func TestDegenerateSlowNetwork(t *testing.T) { SkipUnlessEpic(t) - conf := Config{NetworkLatency: 400 * time.Millisecond} + conf := testutil.LatencyConfig{NetworkLatency: 400 * time.Millisecond} if err := AddCatPowers(conf, 128); err != nil { t.Fatal(err) } @@ -47,7 +51,7 @@ func TestDegenerateSlowNetwork(t *testing.T) { func TestDegenerateSlowRouting(t *testing.T) { SkipUnlessEpic(t) - conf := Config{RoutingLatency: 400 * time.Millisecond} + conf := testutil.LatencyConfig{RoutingLatency: 400 * time.Millisecond} if err := AddCatPowers(conf, 128); err != nil { t.Fatal(err) } @@ -55,13 +59,13 @@ func TestDegenerateSlowRouting(t *testing.T) { func Test100MBMacbookCoastToCoast(t *testing.T) { SkipUnlessEpic(t) - conf := Config{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow() + conf := testutil.LatencyConfig{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow() if err := DirectAddCat(RandomBytes(100*1024*1024), conf); err != nil { t.Fatal(err) } } -func AddCatPowers(conf Config, megabytesMax int64) error { +func AddCatPowers(conf testutil.LatencyConfig, megabytesMax int64) error { var i int64 for i = 1; i < megabytesMax; i = i * 2 { fmt.Printf("%d MB\n", i) @@ -78,7 +82,7 @@ func RandomBytes(n int64) []byte { return data.Bytes() } -func DirectAddCat(data []byte, conf Config) error { +func DirectAddCat(data []byte, conf testutil.LatencyConfig) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() const numPeers = 2 @@ -99,24 +103,24 @@ func DirectAddCat(data []byte, conf Config) error { return errors.New("test initialization error") } - adder, err := makeCore(ctx, MocknetTestRepo(peers[0], mn.Host(peers[0]), conf)) + adder, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[0], mn.Host(peers[0]), conf))) if err != nil { return err } - catter, err := makeCore(ctx, MocknetTestRepo(peers[1], mn.Host(peers[1]), conf)) + catter, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[1], mn.Host(peers[1]), conf))) if err != nil { return err } - adder.Bootstrap(ctx, catter.ID()) - catter.Bootstrap(ctx, adder.ID()) + catter.Bootstrap(ctx, []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)}) + adder.Bootstrap(ctx, []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)}) - keyAdded, err := adder.Add(bytes.NewReader(data)) + keyAdded, err := core_io.Add(adder, bytes.NewReader(data)) if err != nil { return err } - readerCatted, err := catter.Cat(keyAdded) + readerCatted, err := core_io.Cat(catter, keyAdded) if err != nil { return err } diff --git a/test/epictest/bench_test.go b/test/epictest/bench_test.go index 323ad62658b..72c82871b70 100644 --- a/test/epictest/bench_test.go +++ b/test/epictest/bench_test.go @@ -1,8 +1,12 @@ package epictest -import "testing" +import ( + "testing" -func benchmarkAddCat(numBytes int64, conf Config, b *testing.B) { + testutil "github.com/jbenet/go-ipfs/util/testutil" +) + +func benchmarkAddCat(numBytes int64, conf testutil.LatencyConfig, b *testing.B) { b.StopTimer() b.SetBytes(numBytes) @@ -16,7 +20,7 @@ func benchmarkAddCat(numBytes int64, conf Config, b *testing.B) { } } -var instant = Config{}.All_Instantaneous() +var instant = testutil.LatencyConfig{}.All_Instantaneous() func BenchmarkInstantaneousAddCat1KB(b *testing.B) { benchmarkAddCat(1*KB, instant, b) } func BenchmarkInstantaneousAddCat1MB(b *testing.B) { benchmarkAddCat(1*MB, instant, b) } @@ -29,7 +33,7 @@ func BenchmarkInstantaneousAddCat64MB(b *testing.B) { benchmarkAddCat(64*MB, in func BenchmarkInstantaneousAddCat128MB(b *testing.B) { benchmarkAddCat(128*MB, instant, b) } func BenchmarkInstantaneousAddCat256MB(b *testing.B) { benchmarkAddCat(256*MB, instant, b) } -var routing = Config{}.Routing_Slow() +var routing = testutil.LatencyConfig{}.Routing_Slow() func BenchmarkRoutingSlowAddCat1MB(b *testing.B) { benchmarkAddCat(1*MB, routing, b) } func BenchmarkRoutingSlowAddCat2MB(b *testing.B) { benchmarkAddCat(2*MB, routing, b) } @@ -42,7 +46,7 @@ func BenchmarkRoutingSlowAddCat128MB(b *testing.B) { benchmarkAddCat(128*MB, rou func BenchmarkRoutingSlowAddCat256MB(b *testing.B) { benchmarkAddCat(256*MB, routing, b) } func BenchmarkRoutingSlowAddCat512MB(b *testing.B) { benchmarkAddCat(512*MB, routing, b) } -var network = Config{}.Network_NYtoSF() +var network = testutil.LatencyConfig{}.Network_NYtoSF() func BenchmarkNetworkSlowAddCat1MB(b *testing.B) { benchmarkAddCat(1*MB, network, b) } func BenchmarkNetworkSlowAddCat2MB(b *testing.B) { benchmarkAddCat(2*MB, network, b) } @@ -54,7 +58,7 @@ func BenchmarkNetworkSlowAddCat64MB(b *testing.B) { benchmarkAddCat(64*MB, netw func BenchmarkNetworkSlowAddCat128MB(b *testing.B) { benchmarkAddCat(128*MB, network, b) } func BenchmarkNetworkSlowAddCat256MB(b *testing.B) { benchmarkAddCat(256*MB, network, b) } -var hdd = Config{}.Blockstore_7200RPM() +var hdd = testutil.LatencyConfig{}.Blockstore_7200RPM() func BenchmarkBlockstoreSlowAddCat1MB(b *testing.B) { benchmarkAddCat(1*MB, hdd, b) } func BenchmarkBlockstoreSlowAddCat2MB(b *testing.B) { benchmarkAddCat(2*MB, hdd, b) } @@ -66,7 +70,7 @@ func BenchmarkBlockstoreSlowAddCat64MB(b *testing.B) { benchmarkAddCat(64*MB, h func BenchmarkBlockstoreSlowAddCat128MB(b *testing.B) { benchmarkAddCat(128*MB, hdd, b) } func BenchmarkBlockstoreSlowAddCat256MB(b *testing.B) { benchmarkAddCat(256*MB, hdd, b) } -var mixed = Config{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow() +var mixed = testutil.LatencyConfig{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow() func BenchmarkMixedAddCat1MBXX(b *testing.B) { benchmarkAddCat(1*MB, mixed, b) } func BenchmarkMixedAddCat2MBXX(b *testing.B) { benchmarkAddCat(2*MB, mixed, b) } diff --git a/test/epictest/core.go b/test/epictest/core.go index e89db1522c8..d5aef22f4a6 100644 --- a/test/epictest/core.go +++ b/test/epictest/core.go @@ -1,141 +1,31 @@ package epictest import ( - "io" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" - blockservice "github.com/jbenet/go-ipfs/blockservice" - exchange "github.com/jbenet/go-ipfs/exchange" + core "github.com/jbenet/go-ipfs/core" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" - importer "github.com/jbenet/go-ipfs/importer" - chunk "github.com/jbenet/go-ipfs/importer/chunk" - merkledag "github.com/jbenet/go-ipfs/merkledag" host "github.com/jbenet/go-ipfs/p2p/host" peer "github.com/jbenet/go-ipfs/p2p/peer" - path "github.com/jbenet/go-ipfs/path" dht "github.com/jbenet/go-ipfs/routing/dht" - uio "github.com/jbenet/go-ipfs/unixfs/io" - util "github.com/jbenet/go-ipfs/util" "github.com/jbenet/go-ipfs/util/datastore2" delay "github.com/jbenet/go-ipfs/util/delay" eventlog "github.com/jbenet/go-ipfs/util/eventlog" + testutil "github.com/jbenet/go-ipfs/util/testutil" ) var log = eventlog.Logger("epictest") -// TODO merge with core.IpfsNode -type core struct { - repo Repo - - blockService *blockservice.BlockService - blockstore blockstore.Blockstore - dag merkledag.DAGService - id peer.ID -} - -func (c *core) ID() peer.ID { - return c.repo.ID() -} - -func (c *core) Bootstrap(ctx context.Context, p peer.ID) error { - return c.repo.Bootstrap(ctx, p) -} - -func (c *core) Cat(k util.Key) (io.Reader, error) { - catterdag := c.dag - nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String()) - if err != nil { - return nil, err - } - return uio.NewDagReader(nodeCatted, catterdag) -} - -func (c *core) Add(r io.Reader) (util.Key, error) { - nodeAdded, err := importer.BuildDagFromReader( - r, - c.dag, - nil, - chunk.DefaultSplitter, - ) - if err != nil { - return "", err - } - return nodeAdded.Key() -} - -func makeCore(ctx context.Context, rf RepoFactory) (*core, error) { - repo, err := rf(ctx) - if err != nil { - return nil, err - } - - bss, err := blockservice.New(repo.Blockstore(), repo.Exchange()) - if err != nil { - return nil, err - } - - dag := merkledag.NewDAGService(bss) - // to make sure nothing is omitted, init each individual field and assign - // all at once at the bottom. - return &core{ - repo: repo, - blockService: bss, - dag: dag, - }, nil -} - -type RepoFactory func(ctx context.Context) (Repo, error) - -type Repo interface { - ID() peer.ID - Blockstore() blockstore.Blockstore - Exchange() exchange.Interface - - Bootstrap(ctx context.Context, peer peer.ID) error -} - -type repo struct { - // DHT, Exchange, Network,Datastore - bitSwapNetwork bsnet.BitSwapNetwork - blockstore blockstore.Blockstore - exchange exchange.Interface - datastore datastore.ThreadSafeDatastore - host host.Host - dht *dht.IpfsDHT - id peer.ID -} - -func (r *repo) ID() peer.ID { - return r.id -} - -func (c *repo) Bootstrap(ctx context.Context, p peer.ID) error { - return c.dht.Connect(ctx, p) -} - -func (r *repo) Datastore() datastore.ThreadSafeDatastore { - return r.datastore -} - -func (r *repo) Blockstore() blockstore.Blockstore { - return r.blockstore -} - -func (r *repo) Exchange() exchange.Interface { - return r.exchange -} - -func MocknetTestRepo(p peer.ID, h host.Host, conf Config) RepoFactory { - return func(ctx context.Context) (Repo, error) { +func MocknetTestRepo(p peer.ID, h host.Host, conf testutil.LatencyConfig) core.ConfigOption { + return func(ctx context.Context) (*core.IpfsNode, error) { const kWriteCacheElems = 100 const alwaysSendToPeer = true dsDelay := delay.Fixed(conf.BlockstoreLatency) - ds := sync.MutexWrap(datastore2.WithDelay(datastore.NewMapDatastore(), dsDelay)) + ds := datastore2.CloserWrap(sync.MutexWrap(datastore2.WithDelay(datastore.NewMapDatastore(), dsDelay))) log.Debugf("MocknetTestRepo: %s %s %s", p, h.ID(), h) dhtt := dht.NewDHT(ctx, h, ds) @@ -145,14 +35,15 @@ func MocknetTestRepo(p peer.ID, h host.Host, conf Config) RepoFactory { return nil, err } exch := bitswap.New(ctx, p, bsn, bstore, alwaysSendToPeer) - return &repo{ - bitSwapNetwork: bsn, - blockstore: bstore, - exchange: exch, - datastore: ds, - host: h, - dht: dhtt, - id: p, + return &core.IpfsNode{ + Peerstore: h.Peerstore(), + Blockstore: bstore, + Exchange: exch, + Datastore: ds, + PeerHost: h, + Routing: dhtt, + Identity: p, + DHT: dhtt, }, nil } } diff --git a/test/epictest/test_config.go b/test/epictest/test_config.go index e3d34110e83..619ed8ae41b 100644 --- a/test/epictest/test_config.go +++ b/test/epictest/test_config.go @@ -1,48 +1 @@ package epictest - -import "time" - -type Config struct { - BlockstoreLatency time.Duration - NetworkLatency time.Duration - RoutingLatency time.Duration -} - -func (c Config) All_Instantaneous() Config { - // Could use a zero value but whatever. Consistency of interface - c.NetworkLatency = 0 - c.RoutingLatency = 0 - c.BlockstoreLatency = 0 - return c -} - -func (c Config) Network_NYtoSF() Config { - c.NetworkLatency = 20 * time.Millisecond - return c -} - -func (c Config) Network_IntraDatacenter2014() Config { - c.NetworkLatency = 250 * time.Microsecond - return c -} - -func (c Config) Blockstore_FastSSD2014() Config { - const iops = 100000 - c.BlockstoreLatency = (1 / iops) * time.Second - return c -} - -func (c Config) Blockstore_SlowSSD2014() Config { - c.BlockstoreLatency = 150 * time.Microsecond - return c -} - -func (c Config) Blockstore_7200RPM() Config { - c.BlockstoreLatency = 8 * time.Millisecond - return c -} - -func (c Config) Routing_Slow() Config { - c.RoutingLatency = 200 * time.Millisecond - return c -} diff --git a/test/epictest/three_legged_cat_test.go b/test/epictest/three_legged_cat_test.go index 38147e426eb..b33b07077ef 100644 --- a/test/epictest/three_legged_cat_test.go +++ b/test/epictest/three_legged_cat_test.go @@ -7,12 +7,16 @@ import ( "testing" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + core "github.com/jbenet/go-ipfs/core" + core_io "github.com/jbenet/go-ipfs/core/io" mocknet "github.com/jbenet/go-ipfs/p2p/net/mock" + "github.com/jbenet/go-ipfs/p2p/peer" errors "github.com/jbenet/go-ipfs/util/debugerror" + testutil "github.com/jbenet/go-ipfs/util/testutil" ) func TestThreeLeggedCat(t *testing.T) { - conf := Config{ + conf := testutil.LatencyConfig{ NetworkLatency: 0, RoutingLatency: 0, BlockstoreLatency: 0, @@ -22,7 +26,7 @@ func TestThreeLeggedCat(t *testing.T) { } } -func RunThreeLeggedCat(data []byte, conf Config) error { +func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() const numPeers = 3 @@ -42,28 +46,28 @@ func RunThreeLeggedCat(data []byte, conf Config) error { if len(peers) < numPeers { return errors.New("test initialization error") } - adder, err := makeCore(ctx, MocknetTestRepo(peers[0], mn.Host(peers[0]), conf)) + adder, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[0], mn.Host(peers[0]), conf))) if err != nil { return err } - catter, err := makeCore(ctx, MocknetTestRepo(peers[1], mn.Host(peers[1]), conf)) + catter, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[1], mn.Host(peers[1]), conf))) if err != nil { return err } - bootstrap, err := makeCore(ctx, MocknetTestRepo(peers[2], mn.Host(peers[2]), conf)) + bootstrap, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[2], mn.Host(peers[2]), conf))) if err != nil { return err } + boostrapInfo := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID()) + adder.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo}) + catter.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo}) - adder.Bootstrap(ctx, bootstrap.ID()) - catter.Bootstrap(ctx, bootstrap.ID()) - - keyAdded, err := adder.Add(bytes.NewReader(data)) + keyAdded, err := core_io.Add(adder, bytes.NewReader(data)) if err != nil { return err } - readerCatted, err := catter.Cat(keyAdded) + readerCatted, err := core_io.Cat(catter, keyAdded) if err != nil { return err } diff --git a/util/testutil/latency_config.go b/util/testutil/latency_config.go new file mode 100644 index 00000000000..767b9a63874 --- /dev/null +++ b/util/testutil/latency_config.go @@ -0,0 +1,48 @@ +package testutil + +import "time" + +type LatencyConfig struct { + BlockstoreLatency time.Duration + NetworkLatency time.Duration + RoutingLatency time.Duration +} + +func (c LatencyConfig) All_Instantaneous() LatencyConfig { + // Could use a zero value but whatever. Consistency of interface + c.NetworkLatency = 0 + c.RoutingLatency = 0 + c.BlockstoreLatency = 0 + return c +} + +func (c LatencyConfig) Network_NYtoSF() LatencyConfig { + c.NetworkLatency = 20 * time.Millisecond + return c +} + +func (c LatencyConfig) Network_IntraDatacenter2014() LatencyConfig { + c.NetworkLatency = 250 * time.Microsecond + return c +} + +func (c LatencyConfig) Blockstore_FastSSD2014() LatencyConfig { + const iops = 100000 + c.BlockstoreLatency = (1 / iops) * time.Second + return c +} + +func (c LatencyConfig) Blockstore_SlowSSD2014() LatencyConfig { + c.BlockstoreLatency = 150 * time.Microsecond + return c +} + +func (c LatencyConfig) Blockstore_7200RPM() LatencyConfig { + c.BlockstoreLatency = 8 * time.Millisecond + return c +} + +func (c LatencyConfig) Routing_Slow() LatencyConfig { + c.RoutingLatency = 200 * time.Millisecond + return c +}