diff --git a/client/httpapi/api.go b/client/httpapi/api.go new file mode 100644 index 00000000000..17a289cc597 --- /dev/null +++ b/client/httpapi/api.go @@ -0,0 +1,183 @@ +package httpapi + +import ( + "fmt" + "io/ioutil" + gohttp "net/http" + "os" + "path" + "strings" + + iface "github.com/ipfs/interface-go-ipfs-core" + caopts "github.com/ipfs/interface-go-ipfs-core/options" + homedir "github.com/mitchellh/go-homedir" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" +) + +const ( + DefaultPathName = ".ipfs" + DefaultPathRoot = "~/" + DefaultPathName + DefaultApiFile = "api" + EnvDir = "IPFS_PATH" +) + +// HttpApi implements github.com/ipfs/interface-go-ipfs-core/CoreAPI using +// IPFS HTTP API. +// +// For interface docs see +// https://godoc.org/github.com/ipfs/interface-go-ipfs-core#CoreAPI +type HttpApi struct { + url string + httpcli gohttp.Client + + applyGlobal func(*RequestBuilder) +} + +// NewLocalApi tries to construct new HttpApi instance communicating with local +// IPFS daemon +// +// Daemon api address is pulled from the $IPFS_PATH/api file. +// If $IPFS_PATH env var is not present, it defaults to ~/.ipfs +func NewLocalApi() (iface.CoreAPI, error) { + baseDir := os.Getenv(EnvDir) + if baseDir == "" { + baseDir = DefaultPathRoot + } + + return NewPathApi(baseDir) +} + +// NewPathApi constructs new HttpApi by pulling api address from specified +// ipfspath. Api file should be located at $ipfspath/api +func NewPathApi(ipfspath string) (iface.CoreAPI, error) { + a, err := ApiAddr(ipfspath) + if err != nil { + if os.IsNotExist(err) { + err = nil + } + return nil, err + } + return NewApi(a) +} + +// ApiAddr reads api file in specified ipfs path +func ApiAddr(ipfspath string) (ma.Multiaddr, error) { + baseDir, err := homedir.Expand(ipfspath) + if err != nil { + return nil, err + } + + apiFile := path.Join(baseDir, DefaultApiFile) + + api, err := ioutil.ReadFile(apiFile) + if err != nil { + return nil, err + } + + return ma.NewMultiaddr(strings.TrimSpace(string(api))) +} + +// NewApi constructs HttpApi with specified endpoint +func NewApi(a ma.Multiaddr) (*HttpApi, error) { + c := &gohttp.Client{ + Transport: &gohttp.Transport{ + Proxy: gohttp.ProxyFromEnvironment, + DisableKeepAlives: true, + }, + } + + return NewApiWithClient(a, c) +} + +// NewApiWithClient constructs HttpApi with specified endpoint and custom http client +func NewApiWithClient(a ma.Multiaddr, c *gohttp.Client) (*HttpApi, error) { + _, url, err := manet.DialArgs(a) + if err != nil { + return nil, err + } + + if a, err := ma.NewMultiaddr(url); err == nil { + _, host, err := manet.DialArgs(a) + if err == nil { + url = host + } + } + + api := &HttpApi{ + url: url, + httpcli: *c, + applyGlobal: func(*RequestBuilder) {}, + } + + // We don't support redirects. + api.httpcli.CheckRedirect = func(_ *gohttp.Request, _ []*gohttp.Request) error { + return fmt.Errorf("unexpected redirect") + } + + return api, nil +} + +func (api *HttpApi) WithOptions(opts ...caopts.ApiOption) (iface.CoreAPI, error) { + options, err := caopts.ApiOptions(opts...) + if err != nil { + return nil, err + } + + subApi := *api + subApi.applyGlobal = func(req *RequestBuilder) { + if options.Offline { + req.Option("offline", options.Offline) + } + } + + return &subApi, nil +} + +func (api *HttpApi) request(command string, args ...string) *RequestBuilder { + return &RequestBuilder{ + command: command, + args: args, + shell: api, + } +} + +func (api *HttpApi) Unixfs() iface.UnixfsAPI { + return (*UnixfsAPI)(api) +} + +func (api *HttpApi) Block() iface.BlockAPI { + return (*BlockAPI)(api) +} + +func (api *HttpApi) Dag() iface.APIDagService { + return (*HttpDagServ)(api) +} + +func (api *HttpApi) Name() iface.NameAPI { + return (*NameAPI)(api) +} + +func (api *HttpApi) Key() iface.KeyAPI { + return (*KeyAPI)(api) +} + +func (api *HttpApi) Pin() iface.PinAPI { + return (*PinAPI)(api) +} + +func (api *HttpApi) Object() iface.ObjectAPI { + return (*ObjectAPI)(api) +} + +func (api *HttpApi) Dht() iface.DhtAPI { + return (*DhtAPI)(api) +} + +func (api *HttpApi) Swarm() iface.SwarmAPI { + return (*SwarmAPI)(api) +} + +func (api *HttpApi) PubSub() iface.PubSubAPI { + return (*PubsubAPI)(api) +} diff --git a/client/httpapi/api_test.go b/client/httpapi/api_test.go new file mode 100644 index 00000000000..df45c15af50 --- /dev/null +++ b/client/httpapi/api_test.go @@ -0,0 +1,213 @@ +package httpapi + +import ( + "context" + "io/ioutil" + gohttp "net/http" + "os" + "strconv" + "sync" + "testing" + + "github.com/ipfs/interface-go-ipfs-core" + "github.com/ipfs/interface-go-ipfs-core/tests" + local "github.com/ipfs/iptb-plugins/local" + "github.com/ipfs/iptb/testbed" + "github.com/ipfs/iptb/testbed/interfaces" + ma "github.com/multiformats/go-multiaddr" +) + +const parallelSpeculativeNodes = 15 // 15 seems to work best + +func init() { + _, err := testbed.RegisterPlugin(testbed.IptbPlugin{ + From: "", + NewNode: local.NewNode, + GetAttrList: local.GetAttrList, + GetAttrDesc: local.GetAttrDesc, + PluginName: local.PluginName, + BuiltIn: true, + }, false) + if err != nil { + panic(err) + } +} + +type NodeProvider struct { + simple <-chan func(context.Context) ([]iface.CoreAPI, error) +} + +func newNodeProvider(ctx context.Context) *NodeProvider { + simpleNodes := make(chan func(context.Context) ([]iface.CoreAPI, error), parallelSpeculativeNodes) + + np := &NodeProvider{ + simple: simpleNodes, + } + + // start basic nodes speculatively in parallel + for i := 0; i < parallelSpeculativeNodes; i++ { + go func() { + for { + ctx, cancel := context.WithCancel(ctx) + + snd, err := np.makeAPISwarm(ctx, false, 1) + + res := func(ctx context.Context) ([]iface.CoreAPI, error) { + if err != nil { + return nil, err + } + + go func() { + <-ctx.Done() + cancel() + }() + + return snd, nil + } + + select { + case simpleNodes <- res: + case <-ctx.Done(): + return + } + } + }() + } + + return np +} + +func (np *NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, n int) ([]iface.CoreAPI, error) { + if !fullIdentity && n == 1 { + return (<-np.simple)(ctx) + } + return np.makeAPISwarm(ctx, fullIdentity, n) +} + +func (NodeProvider) makeAPISwarm(ctx context.Context, fullIdentity bool, n int) ([]iface.CoreAPI, error) { + + dir, err := ioutil.TempDir("", "httpapi-tb-") + if err != nil { + return nil, err + } + + tb := testbed.NewTestbed(dir) + + specs, err := testbed.BuildSpecs(tb.Dir(), n, "localipfs", nil) + if err != nil { + return nil, err + } + + if err := testbed.WriteNodeSpecs(tb.Dir(), specs); err != nil { + return nil, err + } + + nodes, err := tb.Nodes() + if err != nil { + return nil, err + } + + apis := make([]iface.CoreAPI, n) + + wg := sync.WaitGroup{} + zero := sync.WaitGroup{} + + wg.Add(len(nodes)) + zero.Add(1) + errs := make(chan error, len(nodes)) + + for i, nd := range nodes { + go func(i int, nd testbedi.Core) { + defer wg.Done() + + if _, err := nd.Init(ctx, "--empty-repo"); err != nil { + errs <- err + return + } + + if _, err := nd.RunCmd(ctx, nil, "ipfs", "config", "--json", "Experimental.FilestoreEnabled", "true"); err != nil { + errs <- err + return + } + + if _, err := nd.Start(ctx, true, "--enable-pubsub-experiment", "--offline="+strconv.FormatBool(n == 1)); err != nil { + errs <- err + return + } + + if i > 0 { + zero.Wait() + if err := nd.Connect(ctx, nodes[0]); err != nil { + errs <- err + return + } + } else { + zero.Done() + } + + addr, err := nd.APIAddr() + if err != nil { + errs <- err + return + } + + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + errs <- err + return + } + + c := &gohttp.Client{ + Transport: &gohttp.Transport{ + Proxy: gohttp.ProxyFromEnvironment, + DisableKeepAlives: true, + DisableCompression: true, + }, + } + apis[i], err = NewApiWithClient(maddr, c) + if err != nil { + errs <- err + return + } + + // empty node is pinned even with --empty-repo, we don't want that + emptyNode, err := iface.ParsePath("/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn") + if err != nil { + errs <- err + return + } + if err := apis[i].Pin().Rm(ctx, emptyNode); err != nil { + errs <- err + return + } + }(i, nd) + } + + wg.Wait() + + go func() { + <-ctx.Done() + + defer os.Remove(dir) + + defer func() { + for _, nd := range nodes { + _ = nd.Stop(context.Background()) + } + }() + }() + + select { + case err = <-errs: + default: + } + + return apis, err +} + +func TestHttpApi(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tests.TestApi(newNodeProvider(ctx))(t) +} diff --git a/client/httpapi/apifile.go b/client/httpapi/apifile.go new file mode 100644 index 00000000000..a8eb0de1af2 --- /dev/null +++ b/client/httpapi/apifile.go @@ -0,0 +1,254 @@ +package httpapi + +import ( + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipfs-files" + "github.com/ipfs/interface-go-ipfs-core" +) + +const forwardSeekLimit = 1 << 14 //16k + +func (api *UnixfsAPI) Get(ctx context.Context, p iface.Path) (files.Node, error) { + if p.Mutable() { // use resolved path in case we are dealing with IPNS / MFS + var err error + p, err = api.core().ResolvePath(ctx, p) + if err != nil { + return nil, err + } + } + + var stat struct { + Hash string + Type string + Size int64 // unixfs size + } + err := api.core().request("files/stat", p.String()).Exec(ctx, &stat) + if err != nil { + return nil, err + } + + switch stat.Type { + case "file": + return api.getFile(ctx, p, stat.Size) + case "directory": + return api.getDir(ctx, p, stat.Size) + default: + return nil, fmt.Errorf("unsupported file type '%s'", stat.Type) + } +} + +type apiFile struct { + ctx context.Context + core *HttpApi + size int64 + path iface.Path + + r *Response + at int64 +} + +func (f *apiFile) reset() error { + if f.r != nil { + f.r.Cancel() + } + req := f.core.request("cat", f.path.String()) + if f.at != 0 { + req.Option("offset", f.at) + } + resp, err := req.Send(f.ctx) + if err != nil { + return err + } + if resp.Error != nil { + return resp.Error + } + f.r = resp + return nil +} + +func (f *apiFile) Read(p []byte) (int, error) { + n, err := f.r.Output.Read(p) + if n > 0 { + f.at += int64(n) + } + return n, err +} + +func (f *apiFile) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekEnd: + offset = f.size + offset + case io.SeekCurrent: + offset = f.at + offset + } + if f.at == offset { //noop + return offset, nil + } + + if f.at < offset && offset-f.at < forwardSeekLimit { //forward skip + r, err := io.CopyN(ioutil.Discard, f.r.Output, offset-f.at) + + f.at += r + return f.at, err + } + f.at = offset + return f.at, f.reset() +} + +func (f *apiFile) Close() error { + if f.r != nil { + return f.r.Cancel() + } + return nil +} + +func (f *apiFile) Size() (int64, error) { + return f.size, nil +} + +func (api *UnixfsAPI) getFile(ctx context.Context, p iface.Path, size int64) (files.Node, error) { + f := &apiFile{ + ctx: ctx, + core: api.core(), + size: size, + path: p, + } + + return f, f.reset() +} + +type apiIter struct { + ctx context.Context + core *UnixfsAPI + + err error + + dec *json.Decoder + curFile files.Node + cur lsLink +} + +func (it *apiIter) Err() error { + return it.err +} + +func (it *apiIter) Name() string { + return it.cur.Name +} + +func (it *apiIter) Next() bool { + if it.ctx.Err() != nil { + it.err = it.ctx.Err() + return false + } + + var out lsOutput + if err := it.dec.Decode(&out); err != nil { + if err != io.EOF { + it.err = err + } + return false + } + + if len(out.Objects) != 1 { + it.err = fmt.Errorf("ls returned more objects than expected (%d)", len(out.Objects)) + return false + } + + if len(out.Objects[0].Links) != 1 { + it.err = fmt.Errorf("ls returned more links than expected (%d)", len(out.Objects[0].Links)) + return false + } + + it.cur = out.Objects[0].Links[0] + c, err := cid.Parse(it.cur.Hash) + if err != nil { + it.err = err + return false + } + + switch it.cur.Type { + case iface.THAMTShard: + fallthrough + case iface.TMetadata: + fallthrough + case iface.TDirectory: + it.curFile, err = it.core.getDir(it.ctx, iface.IpfsPath(c), int64(it.cur.Size)) + if err != nil { + it.err = err + return false + } + case iface.TFile: + it.curFile, err = it.core.getFile(it.ctx, iface.IpfsPath(c), int64(it.cur.Size)) + if err != nil { + it.err = err + return false + } + default: + it.err = fmt.Errorf("file type %d not supported", it.cur.Type) + return false + } + return true +} + +func (it *apiIter) Node() files.Node { + return it.curFile +} + +type apiDir struct { + ctx context.Context + core *UnixfsAPI + size int64 + path iface.Path + + dec *json.Decoder +} + +func (d *apiDir) Close() error { + return nil +} + +func (d *apiDir) Size() (int64, error) { + return d.size, nil +} + +func (d *apiDir) Entries() files.DirIterator { + return &apiIter{ + ctx: d.ctx, + core: d.core, + dec: d.dec, + } +} + +func (api *UnixfsAPI) getDir(ctx context.Context, p iface.Path, size int64) (files.Node, error) { + resp, err := api.core().request("ls", p.String()). + Option("resolve-size", true). + Option("stream", true).Send(ctx) + + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + d := &apiDir{ + ctx: ctx, + core: api, + size: size, + path: p, + + dec: json.NewDecoder(resp.Output), + } + + return d, nil +} + +var _ files.File = &apiFile{} +var _ files.Directory = &apiDir{} diff --git a/client/httpapi/block.go b/client/httpapi/block.go new file mode 100644 index 00000000000..fd4d9bab971 --- /dev/null +++ b/client/httpapi/block.go @@ -0,0 +1,124 @@ +package httpapi + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + + "github.com/ipfs/go-cid" + "github.com/ipfs/interface-go-ipfs-core" + caopts "github.com/ipfs/interface-go-ipfs-core/options" + mh "github.com/multiformats/go-multihash" +) + +type BlockAPI HttpApi + +type blockStat struct { + Key string + BSize int `json:"Size"` + + cid cid.Cid +} + +func (s *blockStat) Size() int { + return s.BSize +} + +func (s *blockStat) Path() iface.ResolvedPath { + return iface.IpldPath(s.cid) +} + +func (api *BlockAPI) Put(ctx context.Context, r io.Reader, opts ...caopts.BlockPutOption) (iface.BlockStat, error) { + options, _, err := caopts.BlockPutOptions(opts...) + if err != nil { + return nil, err + } + + mht, ok := mh.Codes[options.MhType] + if !ok { + return nil, fmt.Errorf("unknowm mhType %d", options.MhType) + } + + req := api.core().request("block/put"). + Option("mhtype", mht). + Option("mhlen", options.MhLength). + Option("format", options.Codec). + Option("pin", options.Pin). + FileBody(r) + + var out blockStat + if err := req.Exec(ctx, &out); err != nil { + return nil, err + } + out.cid, err = cid.Parse(out.Key) + if err != nil { + return nil, err + } + + return &out, nil +} + +func (api *BlockAPI) Get(ctx context.Context, p iface.Path) (io.Reader, error) { + resp, err := api.core().request("block/get", p.String()).Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + //TODO: make get return ReadCloser to avoid copying + defer resp.Close() + b := new(bytes.Buffer) + if _, err := io.Copy(b, resp.Output); err != nil { + return nil, err + } + + return b, nil +} + +func (api *BlockAPI) Rm(ctx context.Context, p iface.Path, opts ...caopts.BlockRmOption) error { + options, err := caopts.BlockRmOptions(opts...) + if err != nil { + return err + } + + removedBlock := struct { + Hash string `json:",omitempty"` + Error string `json:",omitempty"` + }{} + + req := api.core().request("block/rm"). + Option("force", options.Force). + Arguments(p.String()) + + if err := req.Exec(ctx, &removedBlock); err != nil { + return err + } + + if removedBlock.Error != "" { + return errors.New(removedBlock.Error) + } + + return nil +} + +func (api *BlockAPI) Stat(ctx context.Context, p iface.Path) (iface.BlockStat, error) { + var out blockStat + err := api.core().request("block/stat", p.String()).Exec(ctx, &out) + if err != nil { + return nil, err + } + out.cid, err = cid.Parse(out.Key) + if err != nil { + return nil, err + } + + return &out, nil +} + +func (api *BlockAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/httpapi/dag.go b/client/httpapi/dag.go new file mode 100644 index 00000000000..669b5f893ad --- /dev/null +++ b/client/httpapi/dag.go @@ -0,0 +1,127 @@ +package httpapi + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + + "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipld-format" + "github.com/ipfs/interface-go-ipfs-core" + "github.com/ipfs/interface-go-ipfs-core/options" +) + +type httpNodeAdder HttpApi +type HttpDagServ httpNodeAdder +type pinningHttpNodeAdder httpNodeAdder + +func (api *HttpDagServ) Get(ctx context.Context, c cid.Cid) (format.Node, error) { + r, err := api.core().Block().Get(ctx, iface.IpldPath(c)) + if err != nil { + return nil, err + } + + data, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + + blk, err := blocks.NewBlockWithCid(data, c) + if err != nil { + return nil, err + } + + return format.DefaultBlockDecoder.Decode(blk) +} + +func (api *HttpDagServ) GetMany(ctx context.Context, cids []cid.Cid) <-chan *format.NodeOption { + out := make(chan *format.NodeOption) + + for _, c := range cids { + // TODO: Consider limiting concurrency of this somehow + go func(c cid.Cid) { + n, err := api.Get(ctx, c) + + select { + case out <- &format.NodeOption{Node: n, Err: err}: + case <-ctx.Done(): + } + }(c) + } + return out +} + +func (api *httpNodeAdder) add(ctx context.Context, nd format.Node, pin bool) error { + c := nd.Cid() + prefix := c.Prefix() + format := cid.CodecToStr[prefix.Codec] + if prefix.Version == 0 { + format = "v0" + } + + stat, err := api.core().Block().Put(ctx, bytes.NewReader(nd.RawData()), + options.Block.Hash(prefix.MhType, prefix.MhLength), + options.Block.Format(format), + options.Block.Pin(pin)) + if err != nil { + return err + } + if !stat.Path().Cid().Equals(c) { + return fmt.Errorf("cids didn't match - local %s, remote %s", c.String(), stat.Path().Cid().String()) + } + return nil +} + +func (api *httpNodeAdder) addMany(ctx context.Context, nds []format.Node, pin bool) error { + for _, nd := range nds { + // TODO: optimize + if err := api.add(ctx, nd, pin); err != nil { + return err + } + } + return nil +} + +func (api *HttpDagServ) AddMany(ctx context.Context, nds []format.Node) error { + return (*httpNodeAdder)(api).addMany(ctx, nds, false) +} + +func (api *HttpDagServ) Add(ctx context.Context, nd format.Node) error { + return (*httpNodeAdder)(api).add(ctx, nd, false) +} + +func (api *pinningHttpNodeAdder) Add(ctx context.Context, nd format.Node) error { + return (*httpNodeAdder)(api).add(ctx, nd, true) +} + +func (api *pinningHttpNodeAdder) AddMany(ctx context.Context, nds []format.Node) error { + return (*httpNodeAdder)(api).addMany(ctx, nds, true) +} + +func (api *HttpDagServ) Pinning() format.NodeAdder { + return (*pinningHttpNodeAdder)(api) +} + +func (api *HttpDagServ) Remove(ctx context.Context, c cid.Cid) error { + return api.core().Block().Rm(ctx, iface.IpldPath(c)) //TODO: should we force rm? +} + +func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error { + for _, c := range cids { + // TODO: optimize + if err := api.Remove(ctx, c); err != nil { + return err + } + } + return nil +} + +func (api *httpNodeAdder) core() *HttpApi { + return (*HttpApi)(api) +} + +func (api *HttpDagServ) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/httpapi/dht.go b/client/httpapi/dht.go new file mode 100644 index 00000000000..dc7dd6beaf6 --- /dev/null +++ b/client/httpapi/dht.go @@ -0,0 +1,114 @@ +package httpapi + +import ( + "context" + "encoding/json" + + "github.com/ipfs/interface-go-ipfs-core" + caopts "github.com/ipfs/interface-go-ipfs-core/options" + "github.com/libp2p/go-libp2p-peer" + "github.com/libp2p/go-libp2p-peerstore" + notif "github.com/libp2p/go-libp2p-routing/notifications" +) + +type DhtAPI HttpApi + +func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peerstore.PeerInfo, error) { + var out struct { + Type notif.QueryEventType + Responses []peerstore.PeerInfo + } + resp, err := api.core().request("dht/findpeer", p.Pretty()).Send(ctx) + if err != nil { + return peerstore.PeerInfo{}, err + } + if resp.Error != nil { + return peerstore.PeerInfo{}, resp.Error + } + defer resp.Close() + dec := json.NewDecoder(resp.Output) + for { + if err := dec.Decode(&out); err != nil { + return peerstore.PeerInfo{}, err + } + if out.Type == notif.FinalPeer { + return out.Responses[0], nil + } + } +} + +func (api *DhtAPI) FindProviders(ctx context.Context, p iface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peerstore.PeerInfo, error) { + options, err := caopts.DhtFindProvidersOptions(opts...) + if err != nil { + return nil, err + } + + rp, err := api.core().ResolvePath(ctx, p) + if err != nil { + return nil, err + } + + resp, err := api.core().request("dht/findprovs", rp.Cid().String()). + Option("num-providers", options.NumProviders). + Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + res := make(chan peerstore.PeerInfo) + + go func() { + defer resp.Close() + defer close(res) + dec := json.NewDecoder(resp.Output) + + for { + var out struct { + Extra string + Type notif.QueryEventType + Responses []peerstore.PeerInfo + } + + if err := dec.Decode(&out); err != nil { + return // todo: handle this somehow + } + if out.Type == notif.QueryError { + return // usually a 'not found' error + // todo: handle other errors + } + if out.Type == notif.Provider { + for _, pi := range out.Responses { + select { + case res <- pi: + case <-ctx.Done(): + return + } + } + } + } + }() + + return res, nil +} + +func (api *DhtAPI) Provide(ctx context.Context, p iface.Path, opts ...caopts.DhtProvideOption) error { + options, err := caopts.DhtProvideOptions(opts...) + if err != nil { + return err + } + + rp, err := api.core().ResolvePath(ctx, p) + if err != nil { + return err + } + + return api.core().request("dht/provide", rp.Cid().String()). + Option("recursive", options.Recursive). + Exec(ctx, nil) +} + +func (api *DhtAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/httpapi/key.go b/client/httpapi/key.go new file mode 100644 index 00000000000..a16c30d8ef8 --- /dev/null +++ b/client/httpapi/key.go @@ -0,0 +1,123 @@ +package httpapi + +import ( + "context" + "errors" + + "github.com/ipfs/interface-go-ipfs-core" + caopts "github.com/ipfs/interface-go-ipfs-core/options" + "github.com/libp2p/go-libp2p-peer" +) + +type KeyAPI HttpApi + +type keyOutput struct { + JName string `json:"Name"` + Id string + + pid peer.ID +} + +func (k *keyOutput) Name() string { + return k.JName +} + +func (k *keyOutput) Path() iface.Path { + p, _ := iface.ParsePath("/ipns/" + k.Id) + return p +} + +func (k *keyOutput) ID() peer.ID { + return k.pid +} + +func (api *KeyAPI) Generate(ctx context.Context, name string, opts ...caopts.KeyGenerateOption) (iface.Key, error) { + options, err := caopts.KeyGenerateOptions(opts...) + if err != nil { + return nil, err + } + + var out keyOutput + err = api.core().request("key/gen", name). + Option("type", options.Algorithm). + Option("size", options.Size). + Exec(ctx, &out) + if err != nil { + return nil, err + } + out.pid, err = peer.IDB58Decode(out.Id) + return &out, err +} + +func (api *KeyAPI) Rename(ctx context.Context, oldName string, newName string, opts ...caopts.KeyRenameOption) (iface.Key, bool, error) { + options, err := caopts.KeyRenameOptions(opts...) + if err != nil { + return nil, false, err + } + + var out struct { + Was string + Now string + Id string + Overwrite bool + } + err = api.core().request("key/rename", oldName, newName). + Option("force", options.Force). + Exec(ctx, &out) + if err != nil { + return nil, false, err + } + + id := &keyOutput{JName: out.Now, Id: out.Id} + id.pid, err = peer.IDB58Decode(id.Id) + return id, out.Overwrite, err +} + +func (api *KeyAPI) List(ctx context.Context) ([]iface.Key, error) { + var out struct{ Keys []*keyOutput } + if err := api.core().request("key/list").Exec(ctx, &out); err != nil { + return nil, err + } + + res := make([]iface.Key, len(out.Keys)) + for i, k := range out.Keys { + var err error + k.pid, err = peer.IDB58Decode(k.Id) + if err != nil { + return nil, err + } + res[i] = k + } + + return res, nil +} + +func (api *KeyAPI) Self(ctx context.Context) (iface.Key, error) { + var id struct{ ID string } + if err := api.core().request("id").Exec(ctx, &id); err != nil { + return nil, err + } + + var err error + out := keyOutput{JName: "self", Id: id.ID} + out.pid, err = peer.IDB58Decode(out.Id) + return &out, err +} + +func (api *KeyAPI) Remove(ctx context.Context, name string) (iface.Key, error) { + var out struct{ Keys []keyOutput } + if err := api.core().request("key/rm", name).Exec(ctx, &out); err != nil { + return nil, err + } + if len(out.Keys) != 1 { + return nil, errors.New("got unexpected number of keys back") + } + + var err error + out.Keys[0].pid, err = peer.IDB58Decode(out.Keys[0].Id) + return &out.Keys[0], err +} + +func (api *KeyAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/httpapi/name.go b/client/httpapi/name.go new file mode 100644 index 00000000000..b848aa81978 --- /dev/null +++ b/client/httpapi/name.go @@ -0,0 +1,139 @@ +package httpapi + +import ( + "context" + "encoding/json" + "fmt" + "io" + + "github.com/ipfs/interface-go-ipfs-core" + caopts "github.com/ipfs/interface-go-ipfs-core/options" + "github.com/ipfs/interface-go-ipfs-core/options/namesys" +) + +type NameAPI HttpApi + +type ipnsEntry struct { + JName string `json:"Name"` + JValue string `json:"Value"` + + path iface.Path +} + +func (e *ipnsEntry) Name() string { + return e.JName +} + +func (e *ipnsEntry) Value() iface.Path { + return e.path +} + +func (api *NameAPI) Publish(ctx context.Context, p iface.Path, opts ...caopts.NamePublishOption) (iface.IpnsEntry, error) { + options, err := caopts.NamePublishOptions(opts...) + if err != nil { + return nil, err + } + + req := api.core().request("name/publish", p.String()). + Option("key", options.Key). + Option("allow-offline", options.AllowOffline). + Option("lifetime", options.ValidTime). + Option("resolve", false) + + if options.TTL != nil { + req.Option("ttl", options.TTL) + } + + var out ipnsEntry + if err := req.Exec(ctx, &out); err != nil { + return nil, err + } + out.path, err = iface.ParsePath(out.JValue) + return &out, err +} + +func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.NameResolveOption) (<-chan iface.IpnsResult, error) { + options, err := caopts.NameResolveOptions(opts...) + if err != nil { + return nil, err + } + + ropts := nsopts.ProcessOpts(options.ResolveOpts) + if ropts.Depth != nsopts.DefaultDepthLimit && ropts.Depth != 1 { + return nil, fmt.Errorf("Name.Resolve: depth other than 1 or %d not supported", nsopts.DefaultDepthLimit) + } + + req := api.core().request("name/resolve", name). + Option("nocache", !options.Cache). + Option("recursive", ropts.Depth != 1). + Option("dht-record-count", ropts.DhtRecordCount). + Option("dht-timeout", ropts.DhtTimeout). + Option("stream", true) + resp, err := req.Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + res := make(chan iface.IpnsResult) + + go func() { + defer close(res) + defer resp.Close() + + dec := json.NewDecoder(resp.Output) + + for { + var out struct{ Path string } + err := dec.Decode(&out) + if err == io.EOF { + return + } + var ires iface.IpnsResult + if err == nil { + ires.Path, err = iface.ParsePath(out.Path) + } + + select { + case res <- ires: + case <-ctx.Done(): + } + if err != nil { + return + } + } + }() + + return res, nil +} + +func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.NameResolveOption) (iface.Path, error) { + options, err := caopts.NameResolveOptions(opts...) + if err != nil { + return nil, err + } + + ropts := nsopts.ProcessOpts(options.ResolveOpts) + if ropts.Depth != nsopts.DefaultDepthLimit && ropts.Depth != 1 { + return nil, fmt.Errorf("Name.Resolve: depth other than 1 or %d not supported", nsopts.DefaultDepthLimit) + } + + req := api.core().request("name/resolve", name). + Option("nocache", !options.Cache). + Option("recursive", ropts.Depth != 1). + Option("dht-record-count", ropts.DhtRecordCount). + Option("dht-timeout", ropts.DhtTimeout) + + var out struct{ Path string } + if err := req.Exec(ctx, &out); err != nil { + return nil, err + } + + return iface.ParsePath(out.Path) +} + +func (api *NameAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/httpapi/object.go b/client/httpapi/object.go new file mode 100644 index 00000000000..5a06f74d99a --- /dev/null +++ b/client/httpapi/object.go @@ -0,0 +1,261 @@ +package httpapi + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" + dag "github.com/ipfs/go-merkledag" + ft "github.com/ipfs/go-unixfs" + "github.com/ipfs/interface-go-ipfs-core" + caopts "github.com/ipfs/interface-go-ipfs-core/options" +) + +type ObjectAPI HttpApi + +type objectOut struct { + Hash string +} + +func (api *ObjectAPI) New(ctx context.Context, opts ...caopts.ObjectNewOption) (ipld.Node, error) { + options, err := caopts.ObjectNewOptions(opts...) + if err != nil { + return nil, err + } + + var n ipld.Node + switch options.Type { + case "empty": + n = new(dag.ProtoNode) + case "unixfs-dir": + n = ft.EmptyDirNode() + default: + return nil, fmt.Errorf("unknown object type: %s", options.Type) + } + + return n, nil +} + +func (api *ObjectAPI) Put(ctx context.Context, r io.Reader, opts ...caopts.ObjectPutOption) (iface.ResolvedPath, error) { + options, err := caopts.ObjectPutOptions(opts...) + if err != nil { + return nil, err + } + + var out objectOut + err = api.core().request("object/put"). + Option("inputenc", options.InputEnc). + Option("datafieldenc", options.DataType). + Option("pin", options.Pin). + FileBody(r). + Exec(ctx, &out) + if err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return iface.IpfsPath(c), nil +} + +func (api *ObjectAPI) Get(ctx context.Context, p iface.Path) (ipld.Node, error) { + r, err := api.core().Block().Get(ctx, p) + if err != nil { + return nil, err + } + b, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + + return merkledag.DecodeProtobuf(b) +} + +func (api *ObjectAPI) Data(ctx context.Context, p iface.Path) (io.Reader, error) { + resp, err := api.core().request("object/data", p.String()).Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + //TODO: make Data return ReadCloser to avoid copying + defer resp.Close() + b := new(bytes.Buffer) + if _, err := io.Copy(b, resp.Output); err != nil { + return nil, err + } + + return b, nil +} + +func (api *ObjectAPI) Links(ctx context.Context, p iface.Path) ([]*ipld.Link, error) { + var out struct { + Links []struct { + Name string + Hash string + Size uint64 + } + } + if err := api.core().request("object/links", p.String()).Exec(ctx, &out); err != nil { + return nil, err + } + res := make([]*ipld.Link, len(out.Links)) + for i, l := range out.Links { + c, err := cid.Parse(l.Hash) + if err != nil { + return nil, err + } + + res[i] = &ipld.Link{ + Cid: c, + Name: l.Name, + Size: l.Size, + } + } + + return res, nil +} + +func (api *ObjectAPI) Stat(ctx context.Context, p iface.Path) (*iface.ObjectStat, error) { + var out struct { + Hash string + NumLinks int + BlockSize int + LinksSize int + DataSize int + CumulativeSize int + } + if err := api.core().request("object/stat", p.String()).Exec(ctx, &out); err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return &iface.ObjectStat{ + Cid: c, + NumLinks: out.NumLinks, + BlockSize: out.BlockSize, + LinksSize: out.LinksSize, + DataSize: out.DataSize, + CumulativeSize: out.CumulativeSize, + }, nil +} + +func (api *ObjectAPI) AddLink(ctx context.Context, base iface.Path, name string, child iface.Path, opts ...caopts.ObjectAddLinkOption) (iface.ResolvedPath, error) { + options, err := caopts.ObjectAddLinkOptions(opts...) + if err != nil { + return nil, err + } + + var out objectOut + err = api.core().request("object/patch/add-link", base.String(), name, child.String()). + Option("create", options.Create). + Exec(ctx, &out) + if err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return iface.IpfsPath(c), nil +} + +func (api *ObjectAPI) RmLink(ctx context.Context, base iface.Path, link string) (iface.ResolvedPath, error) { + var out objectOut + err := api.core().request("object/patch/rm-link", base.String(), link). + Exec(ctx, &out) + if err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return iface.IpfsPath(c), nil +} + +func (api *ObjectAPI) AppendData(ctx context.Context, p iface.Path, r io.Reader) (iface.ResolvedPath, error) { + var out objectOut + err := api.core().request("object/patch/append-data", p.String()). + FileBody(r). + Exec(ctx, &out) + if err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return iface.IpfsPath(c), nil +} + +func (api *ObjectAPI) SetData(ctx context.Context, p iface.Path, r io.Reader) (iface.ResolvedPath, error) { + var out objectOut + err := api.core().request("object/patch/set-data", p.String()). + FileBody(r). + Exec(ctx, &out) + if err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return iface.IpfsPath(c), nil +} + +type change struct { + Type iface.ChangeType + Path string + Before cid.Cid + After cid.Cid +} + +func (api *ObjectAPI) Diff(ctx context.Context, a iface.Path, b iface.Path) ([]iface.ObjectChange, error) { + var out struct { + Changes []change + } + if err := api.core().request("object/diff", a.String(), b.String()).Exec(ctx, &out); err != nil { + return nil, err + } + res := make([]iface.ObjectChange, len(out.Changes)) + for i, ch := range out.Changes { + res[i] = iface.ObjectChange{ + Type: ch.Type, + Path: ch.Path, + } + if ch.Before != cid.Undef { + res[i].Before = iface.IpfsPath(ch.Before) + } + if ch.After != cid.Undef { + res[i].After = iface.IpfsPath(ch.After) + } + } + return res, nil +} + +func (api *ObjectAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/httpapi/path.go b/client/httpapi/path.go new file mode 100644 index 00000000000..8c819121ad1 --- /dev/null +++ b/client/httpapi/path.go @@ -0,0 +1,52 @@ +package httpapi + +import ( + "context" + + cid "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + ipfspath "github.com/ipfs/go-path" + "github.com/ipfs/interface-go-ipfs-core" +) + +func (api *HttpApi) ResolvePath(ctx context.Context, path iface.Path) (iface.ResolvedPath, error) { + var out struct { + Cid cid.Cid + RemPath string + } + + //TODO: this is hacky, fixing https://github.com/ipfs/go-ipfs/issues/5703 would help + + var err error + if path.Namespace() == "ipns" { + if path, err = api.Name().Resolve(ctx, path.String()); err != nil { + return nil, err + } + } + + if err := api.request("dag/resolve", path.String()).Exec(ctx, &out); err != nil { + return nil, err + } + + // TODO: + ipath, err := ipfspath.FromSegments("/"+path.Namespace()+"/", out.Cid.String(), out.RemPath) + if err != nil { + return nil, err + } + + root, err := cid.Parse(ipfspath.Path(path.String()).Segments()[1]) + if err != nil { + return nil, err + } + + return iface.NewResolvedPath(ipath, out.Cid, root, out.RemPath), nil +} + +func (api *HttpApi) ResolveNode(ctx context.Context, p iface.Path) (ipld.Node, error) { + rp, err := api.ResolvePath(ctx, p) + if err != nil { + return nil, err + } + + return api.Dag().Get(ctx, rp.Cid()) +} diff --git a/client/httpapi/pin.go b/client/httpapi/pin.go new file mode 100644 index 00000000000..0111d626a09 --- /dev/null +++ b/client/httpapi/pin.go @@ -0,0 +1,183 @@ +package httpapi + +import ( + "context" + "encoding/json" + + "github.com/ipfs/go-cid" + "github.com/ipfs/interface-go-ipfs-core" + caopts "github.com/ipfs/interface-go-ipfs-core/options" + "github.com/pkg/errors" +) + +type PinAPI HttpApi + +type pinRefKeyObject struct { + Type string +} + +type pinRefKeyList struct { + Keys map[string]pinRefKeyObject +} + +type pin struct { + path iface.ResolvedPath + typ string +} + +func (p *pin) Path() iface.ResolvedPath { + return p.path +} + +func (p *pin) Type() string { + return p.typ +} + +func (api *PinAPI) Add(ctx context.Context, p iface.Path, opts ...caopts.PinAddOption) error { + options, err := caopts.PinAddOptions(opts...) + if err != nil { + return err + } + + return api.core().request("pin/add", p.String()). + Option("recursive", options.Recursive).Exec(ctx, nil) +} + +func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]iface.Pin, error) { + options, err := caopts.PinLsOptions(opts...) + if err != nil { + return nil, err + } + + var out pinRefKeyList + err = api.core().request("pin/ls"). + Option("type", options.Type).Exec(ctx, &out) + if err != nil { + return nil, err + } + + pins := make([]iface.Pin, 0, len(out.Keys)) + for hash, p := range out.Keys { + c, err := cid.Parse(hash) + if err != nil { + return nil, err + } + pins = append(pins, &pin{typ: p.Type, path: iface.IpldPath(c)}) + } + + return pins, nil +} + +func (api *PinAPI) Rm(ctx context.Context, p iface.Path, opts ...caopts.PinRmOption) error { + options, err := caopts.PinRmOptions(opts...) + if err != nil { + return err + } + + return api.core().request("pin/rm", p.String()). + Option("recursive", options.Recursive). + Exec(ctx, nil) +} + +func (api *PinAPI) Update(ctx context.Context, from iface.Path, to iface.Path, opts ...caopts.PinUpdateOption) error { + options, err := caopts.PinUpdateOptions(opts...) + if err != nil { + return err + } + + return api.core().request("pin/update"). + Option("unpin", options.Unpin).Exec(ctx, nil) +} + +type pinVerifyRes struct { + ok bool + badNodes []iface.BadPinNode +} + +func (r *pinVerifyRes) Ok() bool { + return r.ok +} + +func (r *pinVerifyRes) BadNodes() []iface.BadPinNode { + return r.badNodes +} + +type badNode struct { + err error + cid cid.Cid +} + +func (n *badNode) Path() iface.ResolvedPath { + return iface.IpldPath(n.cid) +} + +func (n *badNode) Err() error { + return n.err +} + +func (api *PinAPI) Verify(ctx context.Context) (<-chan iface.PinStatus, error) { + resp, err := api.core().request("pin/verify").Option("verbose", true).Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + res := make(chan iface.PinStatus) + + go func() { + defer resp.Close() + defer close(res) + dec := json.NewDecoder(resp.Output) + for { + var out struct { + Cid string + Ok bool + + BadNodes []struct { + Cid string + Err string + } + } + if err := dec.Decode(&out); err != nil { + return // todo: handle non io.EOF somehow + } + + badNodes := make([]iface.BadPinNode, len(out.BadNodes)) + for i, n := range out.BadNodes { + c, err := cid.Decode(n.Cid) + if err != nil { + badNodes[i] = &badNode{ + cid: c, + err: err, + } + continue + } + + if n.Err != "" { + err = errors.New(n.Err) + } + badNodes[i] = &badNode{ + cid: c, + err: err, + } + } + + select { + case res <- &pinVerifyRes{ + ok: out.Ok, + + badNodes: badNodes, + }: + case <-ctx.Done(): + return + } + } + }() + + return res, nil +} + +func (api *PinAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/httpapi/pubsub.go b/client/httpapi/pubsub.go new file mode 100644 index 00000000000..2ac04b53ce3 --- /dev/null +++ b/client/httpapi/pubsub.go @@ -0,0 +1,170 @@ +package httpapi + +import ( + "bytes" + "context" + "encoding/json" + "io" + + "github.com/ipfs/interface-go-ipfs-core" + caopts "github.com/ipfs/interface-go-ipfs-core/options" + "github.com/libp2p/go-libp2p-peer" +) + +type PubsubAPI HttpApi + +func (api *PubsubAPI) Ls(ctx context.Context) ([]string, error) { + var out struct { + Strings []string + } + + if err := api.core().request("pubsub/ls").Exec(ctx, &out); err != nil { + return nil, err + } + + return out.Strings, nil +} + +func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) { + options, err := caopts.PubSubPeersOptions(opts...) + if err != nil { + return nil, err + } + + var out struct { + Strings []string + } + + if err := api.core().request("pubsub/peers", options.Topic).Exec(ctx, &out); err != nil { + return nil, err + } + + res := make([]peer.ID, len(out.Strings)) + for i, sid := range out.Strings { + id, err := peer.IDB58Decode(sid) + if err != nil { + return nil, err + } + res[i] = id + } + return res, nil +} + +func (api *PubsubAPI) Publish(ctx context.Context, topic string, message []byte) error { + return api.core().request("pubsub/pub", topic). + FileBody(bytes.NewReader(message)). + Exec(ctx, nil) +} + +type pubsubSub struct { + messages chan pubsubMessage + + done chan struct{} + rcloser func() error +} + +type pubsubMessage struct { + JFrom []byte `json:"from,omitempty"` + JData []byte `json:"data,omitempty"` + JSeqno []byte `json:"seqno,omitempty"` + JTopicIDs []string `json:"topicIDs,omitempty"` + + from peer.ID + err error +} + +func (msg *pubsubMessage) From() peer.ID { + return msg.from +} + +func (msg *pubsubMessage) Data() []byte { + return msg.JData +} + +func (msg *pubsubMessage) Seq() []byte { + return msg.JSeqno +} + +func (msg *pubsubMessage) Topics() []string { + return msg.JTopicIDs +} + +func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) { + select { + case msg, ok := <-s.messages: + if !ok { + return nil, io.EOF + } + if msg.err != nil { + return nil, msg.err + } + var err error + msg.from, err = peer.IDFromBytes(msg.JFrom) + return &msg, err + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (iface.PubSubSubscription, error) { + options, err := caopts.PubSubSubscribeOptions(opts...) + if err != nil { + return nil, err + } + + resp, err := api.core().request("pubsub/sub", topic). + Option("discover", options.Discover).Send(ctx) + + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + sub := &pubsubSub{ + messages: make(chan pubsubMessage), + done: make(chan struct{}), + rcloser: func() error { + return resp.Cancel() + }, + } + + dec := json.NewDecoder(resp.Output) + + go func() { + defer close(sub.messages) + + for { + var msg pubsubMessage + if err := dec.Decode(&msg); err != nil { + if err == io.EOF { + return + } + msg.err = err + } + + select { + case sub.messages <- msg: + case <-sub.done: + return + case <-ctx.Done(): + return + } + } + }() + + return sub, nil +} + +func (s *pubsubSub) Close() error { + if s.done != nil { + close(s.done) + s.done = nil + } + return s.rcloser() +} + +func (api *PubsubAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/httpapi/request.go b/client/httpapi/request.go new file mode 100644 index 00000000000..58c61ac6780 --- /dev/null +++ b/client/httpapi/request.go @@ -0,0 +1,34 @@ +package httpapi + +import ( + "context" + "io" + "strings" +) + +type Request struct { + ApiBase string + Command string + Args []string + Opts map[string]string + Body io.Reader + Headers map[string]string +} + +func NewRequest(ctx context.Context, url, command string, args ...string) *Request { + if !strings.HasPrefix(url, "http") { + url = "http://" + url + } + + opts := map[string]string{ + "encoding": "json", + "stream-channels": "true", + } + return &Request{ + ApiBase: url + "/api/v0", + Command: command, + Args: args, + Opts: opts, + Headers: make(map[string]string), + } +} diff --git a/client/httpapi/requestbuilder.go b/client/httpapi/requestbuilder.go new file mode 100644 index 00000000000..2ffed7a0ae7 --- /dev/null +++ b/client/httpapi/requestbuilder.go @@ -0,0 +1,114 @@ +package httpapi + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "strconv" + "strings" + + "github.com/ipfs/go-ipfs-files" +) + +// RequestBuilder is an IPFS commands request builder. +type RequestBuilder struct { + command string + args []string + opts map[string]string + headers map[string]string + body io.Reader + + shell *HttpApi +} + +// Arguments adds the arguments to the args. +func (r *RequestBuilder) Arguments(args ...string) *RequestBuilder { + r.args = append(r.args, args...) + return r +} + +// BodyString sets the request body to the given string. +func (r *RequestBuilder) BodyString(body string) *RequestBuilder { + return r.Body(strings.NewReader(body)) +} + +// BodyBytes sets the request body to the given buffer. +func (r *RequestBuilder) BodyBytes(body []byte) *RequestBuilder { + return r.Body(bytes.NewReader(body)) +} + +// Body sets the request body to the given reader. +func (r *RequestBuilder) Body(body io.Reader) *RequestBuilder { + r.body = body + return r +} + +// FileBody sets the request body to the given reader wrapped into multipartreader. +func (r *RequestBuilder) FileBody(body io.Reader) *RequestBuilder { + pr, _ := files.NewReaderPathFile("/dev/stdin", ioutil.NopCloser(body), nil) + d := files.NewMapDirectory(map[string]files.Node{"": pr}) + r.body = files.NewMultiFileReader(d, false) + + return r +} + +// Option sets the given option. +func (r *RequestBuilder) Option(key string, value interface{}) *RequestBuilder { + var s string + switch v := value.(type) { + case bool: + s = strconv.FormatBool(v) + case string: + s = v + case []byte: + s = string(v) + default: + // slow case. + s = fmt.Sprint(value) + } + if r.opts == nil { + r.opts = make(map[string]string, 1) + } + r.opts[key] = s + return r +} + +// Header sets the given header. +func (r *RequestBuilder) Header(name, value string) *RequestBuilder { + if r.headers == nil { + r.headers = make(map[string]string, 1) + } + r.headers[name] = value + return r +} + +// Send sends the request and return the response. +func (r *RequestBuilder) Send(ctx context.Context) (*Response, error) { + r.shell.applyGlobal(r) + + req := NewRequest(ctx, r.shell.url, r.command, r.args...) + req.Opts = r.opts + req.Headers = r.headers + req.Body = r.body + return req.Send(&r.shell.httpcli) +} + +// Exec sends the request a request and decodes the response. +func (r *RequestBuilder) Exec(ctx context.Context, res interface{}) error { + httpRes, err := r.Send(ctx) + if err != nil { + return err + } + + if res == nil { + lateErr := httpRes.Close() + if httpRes.Error != nil { + return httpRes.Error + } + return lateErr + } + + return httpRes.decode(res) +} diff --git a/client/httpapi/response.go b/client/httpapi/response.go new file mode 100644 index 00000000000..339c7365861 --- /dev/null +++ b/client/httpapi/response.go @@ -0,0 +1,168 @@ +package httpapi + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/ipfs/go-ipfs-files" + "io" + "io/ioutil" + "mime" + "net/http" + "net/url" + "os" +) + +type trailerReader struct { + resp *http.Response +} + +func (r *trailerReader) Read(b []byte) (int, error) { + n, err := r.resp.Body.Read(b) + if err != nil { + if e := r.resp.Trailer.Get("X-Stream-Error"); e != "" { + err = errors.New(e) + } + } + return n, err +} + +func (r *trailerReader) Close() error { + return r.resp.Body.Close() +} + +type Response struct { + Output io.ReadCloser + Error *Error +} + +func (r *Response) Close() error { + if r.Output != nil { + + // drain output (response body) + _, err1 := io.Copy(ioutil.Discard, r.Output) + err2 := r.Output.Close() + if err1 != nil { + return err1 + } + return err2 + } + return nil +} + +// Cancel aborts running request (without draining request body) +func (r *Response) Cancel() error { + if r.Output != nil { + return r.Output.Close() + } + + return nil +} + +// Decode reads request body and decodes it as json +func (r *Response) decode(dec interface{}) error { + if r.Error != nil { + return r.Error + } + + err := json.NewDecoder(r.Output).Decode(dec) + err2 := r.Close() + if err != nil { + return err + } + + return err2 +} + +type Error struct { + Command string + Message string + Code int +} + +func (e *Error) Error() string { + var out string + if e.Code != 0 { + out = fmt.Sprintf("%s%d: ", out, e.Code) + } + return out + e.Message +} + +func (r *Request) Send(c *http.Client) (*Response, error) { + url := r.getURL() + req, err := http.NewRequest("POST", url, r.Body) + if err != nil { + return nil, err + } + + // Add any headers that were supplied via the RequestBuilder. + for k, v := range r.Headers { + req.Header.Add(k, v) + } + + if fr, ok := r.Body.(*files.MultiFileReader); ok { + req.Header.Set("Content-Type", "multipart/form-data; boundary="+fr.Boundary()) + req.Header.Set("Content-Disposition", "form-data; name=\"files\"") + } + + resp, err := c.Do(req) + if err != nil { + return nil, err + } + + contentType, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + if err != nil { + return nil, err + } + + nresp := new(Response) + + nresp.Output = &trailerReader{resp} + if resp.StatusCode >= http.StatusBadRequest { + e := &Error{ + Command: r.Command, + } + switch { + case resp.StatusCode == http.StatusNotFound: + e.Message = "command not found" + case contentType == "text/plain": + out, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Fprintf(os.Stderr, "ipfs-shell: warning! response (%d) read error: %s\n", resp.StatusCode, err) + } + e.Message = string(out) + case contentType == "application/json": + if err = json.NewDecoder(resp.Body).Decode(e); err != nil { + fmt.Fprintf(os.Stderr, "ipfs-shell: warning! response (%d) unmarshall error: %s\n", resp.StatusCode, err) + } + default: + fmt.Fprintf(os.Stderr, "ipfs-shell: warning! unhandled response (%d) encoding: %s", resp.StatusCode, contentType) + out, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Fprintf(os.Stderr, "ipfs-shell: response (%d) read error: %s\n", resp.StatusCode, err) + } + e.Message = fmt.Sprintf("unknown ipfs-shell error encoding: %q - %q", contentType, out) + } + nresp.Error = e + nresp.Output = nil + + // drain body and close + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + } + + return nresp, nil +} + +func (r *Request) getURL() string { + + values := make(url.Values) + for _, arg := range r.Args { + values.Add("arg", arg) + } + for k, v := range r.Opts { + values.Add(k, v) + } + + return fmt.Sprintf("%s/%s?%s", r.ApiBase, r.Command, values.Encode()) +} diff --git a/client/httpapi/swarm.go b/client/httpapi/swarm.go new file mode 100644 index 00000000000..0814debee3c --- /dev/null +++ b/client/httpapi/swarm.go @@ -0,0 +1,187 @@ +package httpapi + +import ( + "context" + "time" + + "github.com/ipfs/interface-go-ipfs-core" + inet "github.com/libp2p/go-libp2p-net" + "github.com/libp2p/go-libp2p-peer" + "github.com/libp2p/go-libp2p-peerstore" + "github.com/libp2p/go-libp2p-protocol" + "github.com/multiformats/go-multiaddr" +) + +type SwarmAPI HttpApi + +func (api *SwarmAPI) Connect(ctx context.Context, pi peerstore.PeerInfo) error { + pidma, err := multiaddr.NewComponent("p2p", pi.ID.Pretty()) + if err != nil { + return err + } + + saddrs := make([]string, len(pi.Addrs)) + for i, addr := range pi.Addrs { + saddrs[i] = addr.Encapsulate(pidma).String() + } + + return api.core().request("swarm/connect", saddrs...).Exec(ctx, nil) +} + +func (api *SwarmAPI) Disconnect(ctx context.Context, addr multiaddr.Multiaddr) error { + return api.core().request("swarm/disconnect", addr.String()).Exec(ctx, nil) +} + +type connInfo struct { + addr multiaddr.Multiaddr + peer peer.ID + latency time.Duration + muxer string + direction inet.Direction + streams []protocol.ID +} + +func (c *connInfo) ID() peer.ID { + return c.peer +} + +func (c *connInfo) Address() multiaddr.Multiaddr { + return c.addr +} + +func (c *connInfo) Direction() inet.Direction { + return c.direction +} + +func (c *connInfo) Latency() (time.Duration, error) { + return c.latency, nil +} + +func (c *connInfo) Streams() ([]protocol.ID, error) { + return c.streams, nil +} + +func (api *SwarmAPI) Peers(ctx context.Context) ([]iface.ConnectionInfo, error) { + var resp struct { + Peers []struct { + Addr string + Peer string + Latency time.Duration + Muxer string + Direction inet.Direction + Streams []struct { + Protocol string + } + } + } + + err := api.core().request("swarm/peers"). + Option("streams", true). + Option("latency", true). + Exec(ctx, &resp) + if err != nil { + return nil, err + } + + res := make([]iface.ConnectionInfo, len(resp.Peers)) + for i, conn := range resp.Peers { + out := &connInfo{ + latency: conn.Latency, + muxer: conn.Muxer, + direction: conn.Direction, + } + + out.peer, err = peer.IDB58Decode(conn.Peer) + if err != nil { + return nil, err + } + + out.addr, err = multiaddr.NewMultiaddr(conn.Addr) + if err != nil { + return nil, err + } + + out.streams = make([]protocol.ID, len(conn.Streams)) + for i, p := range conn.Streams { + out.streams[i] = protocol.ID(p.Protocol) + } + + res[i] = out + } + + return res, nil +} + +func (api *SwarmAPI) KnownAddrs(ctx context.Context) (map[peer.ID][]multiaddr.Multiaddr, error) { + var out struct { + Addrs map[string][]string + } + if err := api.core().request("swarm/addrs").Exec(ctx, &out); err != nil { + return nil, err + } + res := map[peer.ID][]multiaddr.Multiaddr{} + for spid, saddrs := range out.Addrs { + addrs := make([]multiaddr.Multiaddr, len(saddrs)) + + for i, addr := range saddrs { + a, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + addrs[i] = a + } + + pid, err := peer.IDB58Decode(spid) + if err != nil { + return nil, err + } + + res[pid] = addrs + } + + return res, nil +} + +func (api *SwarmAPI) LocalAddrs(ctx context.Context) ([]multiaddr.Multiaddr, error) { + var out struct { + Strings []string + } + + if err := api.core().request("swarm/addrs/local").Exec(ctx, &out); err != nil { + return nil, err + } + + res := make([]multiaddr.Multiaddr, len(out.Strings)) + for i, addr := range out.Strings { + ma, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + res[i] = ma + } + return res, nil +} + +func (api *SwarmAPI) ListenAddrs(ctx context.Context) ([]multiaddr.Multiaddr, error) { + var out struct { + Strings []string + } + + if err := api.core().request("swarm/addrs/listen").Exec(ctx, &out); err != nil { + return nil, err + } + + res := make([]multiaddr.Multiaddr, len(out.Strings)) + for i, addr := range out.Strings { + ma, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + res[i] = ma + } + return res, nil +} + +func (api *SwarmAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/httpapi/unixfs.go b/client/httpapi/unixfs.go new file mode 100644 index 00000000000..1f340b6573a --- /dev/null +++ b/client/httpapi/unixfs.go @@ -0,0 +1,227 @@ +package httpapi + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipfs-files" + "github.com/ipfs/go-ipld-format" + "github.com/ipfs/interface-go-ipfs-core" + caopts "github.com/ipfs/interface-go-ipfs-core/options" + mh "github.com/multiformats/go-multihash" +) + +type addEvent struct { + Name string + Hash string `json:",omitempty"` + Bytes int64 `json:",omitempty"` + Size string `json:",omitempty"` +} + +type UnixfsAPI HttpApi + +func (api *UnixfsAPI) Add(ctx context.Context, f files.Node, opts ...caopts.UnixfsAddOption) (iface.ResolvedPath, error) { + options, _, err := caopts.UnixfsAddOptions(opts...) + if err != nil { + return nil, err + } + + mht, ok := mh.Codes[options.MhType] + if !ok { + return nil, fmt.Errorf("unknowm mhType %d", options.MhType) + } + + req := api.core().request("add"). + Option("hash", mht). + Option("chunker", options.Chunker). + Option("cid-version", options.CidVersion). + Option("fscache", options.FsCache). + Option("hidden", options.Hidden). + Option("inline", options.Inline). + Option("inline-limit", options.InlineLimit). + Option("nocopy", options.NoCopy). + Option("only-hash", options.OnlyHash). + Option("pin", options.Pin). + Option("silent", options.Silent). + Option("stdin-name", options.StdinName). + Option("wrap-with-directory", options.Wrap). + Option("progress", options.Progress) + + if options.RawLeavesSet { + req.Option("raw-leaves", options.RawLeaves) + } + + switch options.Layout { + case caopts.BalancedLayout: + // noop, default + case caopts.TrickleLayout: + req.Option("trickle", true) + } + + switch c := f.(type) { + case files.Directory: + req.Body(files.NewMultiFileReader(c, false)) + case files.File: + d := files.NewMapDirectory(map[string]files.Node{"": c}) // unwrapped on the other side + req.Body(files.NewMultiFileReader(d, false)) + } + + var out addEvent + resp, err := req.Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + defer resp.Output.Close() + dec := json.NewDecoder(resp.Output) +loop: + for { + var evt addEvent + switch err := dec.Decode(&evt); err { + case nil: + case io.EOF: + break loop + default: + return nil, err + } + out = evt + + if options.Events != nil { + ifevt := &iface.AddEvent{ + Name: out.Name, + Size: out.Size, + Bytes: out.Bytes, + } + + if out.Hash != "" { + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + ifevt.Path = iface.IpfsPath(c) + } + + select { + case options.Events <- ifevt: + case <-ctx.Done(): + return nil, ctx.Err() + } + } + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return iface.IpfsPath(c), nil +} + +type lsLink struct { + Name, Hash string + Size uint64 + Type iface.FileType +} + +type lsObject struct { + Hash string + Links []lsLink +} + +type lsOutput struct { + Objects []lsObject +} + +func (api *UnixfsAPI) Ls(ctx context.Context, p iface.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.LsLink, error) { + options, err := caopts.UnixfsLsOptions(opts...) + if err != nil { + return nil, err + } + + resp, err := api.core().request("ls", p.String()). + Option("resolve-type", options.ResolveChildren). + Option("size", options.ResolveChildren). + Option("stream", true). + Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + dec := json.NewDecoder(resp.Output) + out := make(chan iface.LsLink) + + go func() { + defer resp.Close() + defer close(out) + + for { + var link lsOutput + if err := dec.Decode(&link); err != nil { + if err == io.EOF { + return + } + select { + case out <- iface.LsLink{Err: err}: + case <-ctx.Done(): + } + return + } + + if len(link.Objects) != 1 { + select { + case out <- iface.LsLink{Err: errors.New("unexpected Objects len")}: + case <-ctx.Done(): + } + return + } + + if len(link.Objects[0].Links) != 1 { + select { + case out <- iface.LsLink{Err: errors.New("unexpected Links len")}: + case <-ctx.Done(): + } + return + } + + l0 := link.Objects[0].Links[0] + + c, err := cid.Decode(l0.Hash) + if err != nil { + select { + case out <- iface.LsLink{Err: err}: + case <-ctx.Done(): + } + return + } + + select { + case out <- iface.LsLink{ + Link: &format.Link{ + Cid: c, + Name: l0.Name, + Size: l0.Size, + }, + Size: l0.Size, + Type: l0.Type, + }: + case <-ctx.Done(): + } + } + }() + + return out, nil +} + +func (api *UnixfsAPI) core() *HttpApi { + return (*HttpApi)(api) +}