Fix panic in dagreader with raw nodes #3403

Merged: 5 commits, Nov 22, 2016
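
Why it panicked (a reconstruction from the diff below, not from a PR description): for *mdag.RawNode inputs, the old NewDagReader returned a *DagReader with only the buf field populated, so its ctx, cancel, promises, and pbdata fields stayed nil; calling Close() then invoked the nil cancel function and panicked. A minimal self-contained sketch of that failure mode, with illustrative names:

package main

// oldDagReader mirrors just the field of the pre-fix DagReader struct
// that matters here (see the dagreader.go diff below); the type name
// is illustrative, not from the repository.
type oldDagReader struct {
	cancel func() // left nil by the old raw-node branch of NewDagReader
}

func (dr *oldDagReader) Close() error {
	dr.cancel() // nil function call: runtime panic
	return nil
}

func main() {
	dr := &oldDagReader{} // what the old raw-node branch effectively built
	_ = dr.Close()        // panics with a nil pointer dereference
}

The fix makes DagReader an interface and gives raw nodes a complete implementation of their own (bufDagReader).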
core/coreunix/cat.go (1 addition, 1 deletion)
@@ -8,7 +8,7 @@ import (
uio "github.com/ipfs/go-ipfs/unixfs/io"
)

-func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (*uio.DagReader, error) {
+func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (uio.DagReader, error) {
dagNode, err := core.Resolve(ctx, n.Namesys, n.Resolver, path.Path(pstr))
if err != nil {
return nil, err
test/sharness/t0110-gateway.sh (10 additions, 0 deletions)
@@ -145,6 +145,16 @@ for cmd in "add" "block/put" "bootstrap" "config" "dht" "diag" "dns" "get" "id"
done
'

test_expect_success "create raw-leaves node" '
echo "This is RAW!" > rfile &&
echo "This is RAW!" | ipfs add --raw-leaves -q > rhash
'

test_expect_success "try fetching it from gateway" '
curl http://127.0.0.1:$port/ipfs/$(cat rhash) > ffile &&
test_cmp rfile ffile
'
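
For illustration only, the same round trip the sharness test performs, as a hedged Go sketch; the gateway port and the hash are supplied by the test environment, and fetchFromGateway is a hypothetical helper, not part of the PR:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

// fetchFromGateway mirrors the curl step above: GET a file from a local
// HTTP gateway by hash and return the body for comparison.
func fetchFromGateway(port, hash string) ([]byte, error) {
	resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%s/ipfs/%s", port, hash))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}

func main() {
	body, err := fetchFromGateway("8080", "zb2rh...") // port and hash are placeholders
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s", body) // expected: "This is RAW!"
}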

test_kill_ipfs_daemon

test_done
unixfs/archive/tar/writer.go (1 addition, 1 deletion)
@@ -64,7 +64,7 @@ func (w *Writer) writeFile(nd *mdag.ProtoNode, pb *upb.Data, fpath string) error
return err
}

-dagr := uio.NewDataFileReader(w.ctx, nd, pb, w.Dag)
+dagr := uio.NewPBFileReader(w.ctx, nd, pb, w.Dag)
if _, err := dagr.WriteTo(w.TarW); err != nil {
return err
}
Expand Down
unixfs/io/bufdagreader.go (41 additions, 0 deletions)
@@ -0,0 +1,41 @@
+package io
+
+import (
+"bytes"
+"context"
+"io"
+)
+
+type bufDagReader struct {
+*bytes.Reader
+}

Review comment (Member): Why don't we go ahead and replace all of the NewRSNC stuff with this now? It seems weird that they both serve roughly the same purpose.

Reply (Member, author): Yeah, the DagReader interface supersedes ReadSeekCloser (RSC), and bufDagReader is a nop-closer over a bytes.Reader.

+
+func NewBufDagReader(b []byte) *bufDagReader {
+return &bufDagReader{bytes.NewReader(b)}
+}
+
+var _ DagReader = (*bufDagReader)(nil)
+
+func (*bufDagReader) Close() error {
+return nil
+}
+
+func (rd *bufDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
+return rd.Read(b)
+}
+
+func (rd *bufDagReader) Offset() int64 {
+of, err := rd.Seek(0, io.SeekCurrent)
+if err != nil {
+panic("this should never happen " + err.Error())
+}
+return of
+}
+
+func (rd *bufDagReader) Size() uint64 {
+s := rd.Reader.Size()
+if s < 0 {
+panic("size smaller than 0 (impossible!!)")
+}
+return uint64(s)
+}
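
As the review thread above notes, bufDagReader supersedes the old readSeekNopCloser. A hedged usage sketch, assuming the PR-era import path github.com/ipfs/go-ipfs/unixfs/io; this is not code from the PR:

package main

import (
	"context"
	"fmt"

	uio "github.com/ipfs/go-ipfs/unixfs/io"
)

func main() {
	rd := uio.NewBufDagReader([]byte("This is RAW!"))

	buf := make([]byte, 4)
	n, err := rd.CtxReadFull(context.Background(), buf) // delegates to the wrapped bytes.Reader
	if err != nil {
		panic(err)
	}
	fmt.Println(n, string(buf[:n])) // 4 This
	fmt.Println(rd.Offset())        // 4: position after the read
	fmt.Println(rd.Size())          // 12: full length of the wrapped buffer
	_ = rd.Close()                  // a nop, per the review discussion
}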
unixfs/io/dagreader.go (8 additions, 251 deletions)
@@ -1,12 +1,10 @@
package io

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"

mdag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
@@ -20,34 +18,11 @@ var ErrIsDir = errors.New("this dag node is a directory")

var ErrCantReadSymlinks = errors.New("cannot currently read symlinks")

-// DagReader provides a way to easily read the data contained in a dag.
-type DagReader struct {
-serv mdag.DAGService
-
-// the node being read
-node *mdag.ProtoNode
-
-// cached protobuf structure from node.Data
-pbdata *ftpb.Data
-
-// the current data buffer to be read from
-// will either be a bytes.Reader or a child DagReader
-buf ReadSeekCloser
-
-// NodeGetters for each of 'nodes' child links
-promises []mdag.NodeGetter
-
-// the index of the child link currently being read from
-linkPosition int
-
-// current offset for the read head within the 'file'
-offset int64
-
-// Our context
-ctx context.Context
-
-// context cancel for children
-cancel func()
+type DagReader interface {
+ReadSeekCloser
+Size() uint64
+CtxReadFull(context.Context, []byte) (int, error)
+Offset() int64
}

type ReadSeekCloser interface {
@@ -59,12 +34,10 @@ type ReadSeekCloser interface {

// NewDagReader creates a new reader object that reads the data represented by
// the given node, using the passed-in DAGService for data retrieval
-func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (*DagReader, error) {
+func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (DagReader, error) {
switch n := n.(type) {
case *mdag.RawNode:
-return &DagReader{
-buf: NewRSNCFromBytes(n.RawData()),
-}, nil
+return NewBufDagReader(n.RawData()), nil
case *mdag.ProtoNode:
pb := new(ftpb.Data)
if err := proto.Unmarshal(n.Data(), pb); err != nil {
@@ -76,7 +49,7 @@ func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (*DagReader, error) {
// Don't allow reading directories
return nil, ErrIsDir
case ftpb.Data_File, ftpb.Data_Raw:
-return NewDataFileReader(ctx, n, pb, serv), nil
+return NewPBFileReader(ctx, n, pb, serv), nil
case ftpb.Data_Metadata:
if len(n.Links()) == 0 {
return nil, errors.New("incorrectly formatted metadata object")
@@ -100,219 +73,3 @@ func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (*DagReader, error) {
return nil, fmt.Errorf("unrecognized node type")
}
}

-func NewDataFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
-fctx, cancel := context.WithCancel(ctx)
-promises := mdag.GetDAG(fctx, serv, n)
-return &DagReader{
-node: n,
-serv: serv,
-buf: NewRSNCFromBytes(pb.GetData()),
-promises: promises,
-ctx: fctx,
-cancel: cancel,
-pbdata: pb,
-}
-}
-
-// precalcNextBuf follows the next link in line and loads it from the
-// DAGService, setting the next buffer to read from
-func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
-dr.buf.Close() // Just to make sure
-if dr.linkPosition >= len(dr.promises) {
-return io.EOF
-}
-
-nxt, err := dr.promises[dr.linkPosition].Get(ctx)
-if err != nil {
-return err
-}
-dr.linkPosition++
-
-switch nxt := nxt.(type) {
-case *mdag.ProtoNode:
-pb := new(ftpb.Data)
-err = proto.Unmarshal(nxt.Data(), pb)
-if err != nil {
-return fmt.Errorf("incorrectly formatted protobuf: %s", err)
-}
-
-switch pb.GetType() {
-case ftpb.Data_Directory:
-// A directory should not exist within a file
-return ft.ErrInvalidDirLocation
-case ftpb.Data_File:
-dr.buf = NewDataFileReader(dr.ctx, nxt, pb, dr.serv)
-return nil
-case ftpb.Data_Raw:
-dr.buf = NewRSNCFromBytes(pb.GetData())
-return nil
-case ftpb.Data_Metadata:
-return errors.New("shouldnt have had metadata object inside file")
-case ftpb.Data_Symlink:
-return errors.New("shouldnt have had symlink inside file")
-default:
-return ft.ErrUnrecognizedType
-}
-case *mdag.RawNode:
-dr.buf = NewRSNCFromBytes(nxt.RawData())
-return nil
-default:
-return errors.New("unrecognized node type in DagReader")
-}
-}
-
-// Size return the total length of the data from the DAG structured file.
-func (dr *DagReader) Size() uint64 {
-return dr.pbdata.GetFilesize()
-}
-
-// Read reads data from the DAG structured file
-func (dr *DagReader) Read(b []byte) (int, error) {
-return dr.CtxReadFull(dr.ctx, b)
-}
-
-// CtxReadFull reads data from the DAG structured file
-func (dr *DagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
-// If no cached buffer, load one
-total := 0
-for {
-// Attempt to fill bytes from cached buffer
-n, err := dr.buf.Read(b[total:])
-total += n
-dr.offset += int64(n)
-if err != nil {
-// EOF is expected
-if err != io.EOF {
-return total, err
-}
-}
-
-// If weve read enough bytes, return
-if total == len(b) {
-return total, nil
-}
-
-// Otherwise, load up the next block
-err = dr.precalcNextBuf(ctx)
-if err != nil {
-return total, err
-}
-}
-}
-
-func (dr *DagReader) WriteTo(w io.Writer) (int64, error) {
-// If no cached buffer, load one
-total := int64(0)
-for {
-// Attempt to write bytes from cached buffer
-n, err := dr.buf.WriteTo(w)
-total += n
-dr.offset += n
-if err != nil {
-if err != io.EOF {
-return total, err
-}
-}
-
-// Otherwise, load up the next block
-err = dr.precalcNextBuf(dr.ctx)
-if err != nil {
-if err == io.EOF {
-return total, nil
-}
-return total, err
-}
-}
-}
-
-func (dr *DagReader) Close() error {
-dr.cancel()
-return nil
-}
-
-func (dr *DagReader) Offset() int64 {
-return dr.offset
-}
-
-// Seek implements io.Seeker, and will seek to a given offset in the file
-// interface matches standard unix seek
-// TODO: check if we can do relative seeks, to reduce the amount of dagreader
-// recreations that need to happen.
-func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
-switch whence {
-case os.SEEK_SET:
-if offset < 0 {
-return -1, errors.New("Invalid offset")
-}
-
-// Grab cached protobuf object (solely to make code look cleaner)
-pb := dr.pbdata
-
-// left represents the number of bytes remaining to seek to (from beginning)
-left := offset
-if int64(len(pb.Data)) >= offset {
-// Close current buf to close potential child dagreader
-dr.buf.Close()
-dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])
-
-// start reading links from the beginning
-dr.linkPosition = 0
-dr.offset = offset
-return offset, nil
-} else {
-// skip past root block data
-left -= int64(len(pb.Data))
-}
-
-// iterate through links and find where we need to be
-for i := 0; i < len(pb.Blocksizes); i++ {
-if pb.Blocksizes[i] > uint64(left) {
-dr.linkPosition = i
-break
-} else {
-left -= int64(pb.Blocksizes[i])
-}
-}
-
-// start sub-block request
-err := dr.precalcNextBuf(dr.ctx)
-if err != nil {
-return 0, err
-}
-
-// set proper offset within child readseeker
-n, err := dr.buf.Seek(left, os.SEEK_SET)
-if err != nil {
-return -1, err
-}
-
-// sanity
-left -= n
-if left != 0 {
-return -1, errors.New("failed to seek properly")
-}
-dr.offset = offset
-return offset, nil
-case os.SEEK_CUR:
-// TODO: be smarter here
-noffset := dr.offset + offset
-return dr.Seek(noffset, os.SEEK_SET)
-case os.SEEK_END:
-noffset := int64(dr.pbdata.GetFilesize()) - offset
-return dr.Seek(noffset, os.SEEK_SET)
-default:
-return 0, errors.New("invalid whence")
-}
-}
-
-// readSeekNopCloser wraps a bytes.Reader to implement ReadSeekCloser
-type readSeekNopCloser struct {
-*bytes.Reader
-}
-
-func NewRSNCFromBytes(b []byte) ReadSeekCloser {
-return &readSeekNopCloser{bytes.NewReader(b)}
-}
-
-func (r *readSeekNopCloser) Close() error { return nil }
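
Because NewDagReader now returns the DagReader interface instead of *DagReader, callers like coreunix.Cat no longer care which concrete reader they get. A hedged caller-side sketch; readAll is a hypothetical helper and the import path is assumed from the PR-era tree:

package main

import (
	"context"
	"fmt"

	uio "github.com/ipfs/go-ipfs/unixfs/io"
)

// readAll drains any DagReader, raw-backed or protobuf-backed alike.
func readAll(ctx context.Context, dr uio.DagReader) ([]byte, error) {
	defer dr.Close() // safe for raw nodes too now that their reader has a real Close
	buf := make([]byte, dr.Size())
	_, err := dr.CtxReadFull(ctx, buf)
	return buf, err
}

func main() {
	data, err := readAll(context.Background(), uio.NewBufDagReader([]byte("This is RAW!")))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", data) // This is RAW!
}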
unixfs/io/dagreader_test.go (1 addition, 1 deletion)
@@ -236,7 +236,7 @@ func TestReaderSzie(t *testing.T) {
}
}

-func readByte(t testing.TB, reader *DagReader) byte {
+func readByte(t testing.TB, reader DagReader) byte {
out := make([]byte, 1)
c, err := reader.Read(out)
