This repository has been archived by the owner on Jun 27, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 53
/
dagreader.go
498 lines (418 loc) · 15.2 KB
/
dagreader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
package io
import (
"bytes"
"context"
"errors"
"io"
ipld "github.com/ipfs/go-ipld-format"
mdag "github.com/ipfs/go-merkledag"
unixfs "github.com/ipfs/go-unixfs"
)
// Common errors
var (
	// ErrIsDir is returned by NewDagReader when the given node is a UnixFS
	// directory (either a plain directory or a HAMT shard), which cannot be
	// read as a file.
	//
	// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/io.ErrIsDir
	ErrIsDir = errors.New("this dag node is a directory")
	// ErrCantReadSymlinks is returned by NewDagReader when the given node
	// is a UnixFS symlink.
	//
	// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/io.ErrCantReadSymlinks
	ErrCantReadSymlinks = errors.New("cannot currently read symlinks")
	// ErrUnkownNodeType is returned by NewDagReader when the given node is
	// neither a *mdag.RawNode nor a *mdag.ProtoNode. The misspelling in the
	// identifier is part of the exported API.
	//
	// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/io.ErrUnkownNodeType
	ErrUnkownNodeType = errors.New("unknown node type")
	// ErrSeekNotSupported is returned by Seek when an internal node does
	// not carry one child size hint per link, so the target child cannot
	// be located.
	//
	// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/io.ErrSeekNotSupported
	ErrSeekNotSupported = errors.New("file does not support seeking")
)
// TODO: Rename the `DagReader` interface, this doesn't read *any* DAG, just
// DAGs with UnixFS node (and it *belongs* to the `unixfs` package). Some
// alternatives: `FileReader`, `UnixFSFileReader`, `UnixFSReader`.
// A DagReader provides read-only read and seek access to a unixfs file.
// Different implementations of readers are used for the different
// types of unixfs/protobuf-encoded nodes.
//
// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/io.DagReader
type DagReader interface {
	// ReadSeekCloser supplies the standard Read, Seek, Close and WriteTo
	// methods.
	ReadSeekCloser
	// Size returns the total size of the file data.
	Size() uint64
	// CtxReadFull reads into the given buffer using the supplied context
	// for the operation, returning the number of bytes read.
	CtxReadFull(context.Context, []byte) (int, error)
}
// A ReadSeekCloser groups the standard library interfaces needed to read
// (io.Reader), copy out (io.WriterTo), seek (io.Seeker) and close
// (io.Closer) a stream.
//
// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/io.ReadSeekCloser
type ReadSeekCloser interface {
	io.Reader
	io.Seeker
	io.Closer
	io.WriterTo
}
// NewDagReader builds a reader over the file data rooted at `n`, fetching
// any further nodes it needs through `serv`.
//
// Accepted roots are raw nodes and protobuf nodes holding a UnixFS file or
// raw entry. A UnixFS metadata node is followed through its first link,
// which must resolve to a protobuf node. Directories (plain or HAMT
// sharded) yield ErrIsDir, symlinks yield ErrCantReadSymlinks, any other
// UnixFS type yields unixfs.ErrUnrecognizedType, and any other node
// implementation yields ErrUnkownNodeType.
//
// The returned reader works on a cancellable context derived from `ctx`;
// calling Close on the reader cancels it.
//
// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/io.NewDagReader
func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagReader, error) {
	var fileSize uint64

	switch typed := n.(type) {
	case *mdag.RawNode:
		// A raw node is its own payload.
		fileSize = uint64(len(typed.RawData()))

	case *mdag.ProtoNode:
		meta, err := unixfs.FSNodeFromBytes(typed.Data())
		if err != nil {
			return nil, err
		}

		switch meta.Type() {
		case unixfs.TFile, unixfs.TRaw:
			fileSize = meta.FileSize()

		case unixfs.TDirectory, unixfs.THAMTShard:
			// Directories cannot be read as files.
			return nil, ErrIsDir

		case unixfs.TSymlink:
			return nil, ErrCantReadSymlinks

		case unixfs.TMetadata:
			// The metadata node wraps the real file: it is expected to
			// be the first (and required) link.
			links := typed.Links()
			if len(links) == 0 {
				return nil, errors.New("incorrectly formatted metadata object")
			}

			target, err := links[0].GetNode(ctx, serv)
			if err != nil {
				return nil, err
			}

			protoTarget, isProto := target.(*mdag.ProtoNode)
			if !isProto {
				return nil, mdag.ErrNotProtobuf
			}
			return NewDagReader(ctx, protoTarget, serv)

		default:
			return nil, unixfs.ErrUnrecognizedType
		}

	default:
		return nil, ErrUnkownNodeType
	}

	// The reader owns a cancellable child context so that `Close` can
	// abort fetches started on its behalf.
	readerCtx, stop := context.WithCancel(ctx)
	walker := ipld.NewWalker(readerCtx, ipld.NewNavigableIPLDNode(n, serv))

	reader := &dagReader{
		dagWalker: walker,
		size:      fileSize,
		rootNode:  n,
		ctx:       readerCtx,
		cancel:    stop,
		serv:      serv,
	}
	return reader, nil
}
// dagReader provides a way to easily read the data contained in a dag.
// It is the DagReader implementation returned by NewDagReader.
type dagReader struct {

	// Structure to perform the DAG iteration and search, the reader
	// just needs to add logic to the `Visitor` callback passed to
	// `Iterate` and `Seek`.
	dagWalker *ipld.Walker

	// Buffer with the data extracted from the current node being visited.
	// To avoid revisiting a node to complete a (potential) partial read
	// (or read after seek) the node's data is fully extracted in a single
	// `saveNodeData` operation. It is nil whenever there is no pending
	// data left over from a visited node.
	currentNodeData *bytes.Reader

	// Total size of the file, computed once in `NewDagReader`.
	// Implements the `Size()` API.
	size uint64

	// Current offset for the read head within the DAG file.
	offset int64

	// Root node of the DAG, stored to re-create the `dagWalker` (effectively
	// re-setting the position of the reader, used during `Seek`).
	rootNode ipld.Node

	// Context passed to the `dagWalker`, the `cancel` function is used to
	// cancel read operations (cancelling requested child node promises,
	// see `ipld.NavigableIPLDNode.FetchChild` for details).
	ctx    context.Context
	cancel func()

	// Passed to the `dagWalker` that will use it to request nodes.
	// TODO: Revisit name.
	serv ipld.NodeGetter
}
// Size returns the total size of the data from the DAG structured file,
// as computed when the reader was created. It does not depend on the
// current read position.
func (dr *dagReader) Size() uint64 {
	return dr.size
}
// Read implements the `io.Reader` interface through the `CtxReadFull`
// method using the DAG reader's internal context (the one cancelled by
// `Close`). Like `CtxReadFull`, it tries to fill `b` completely rather
// than returning after the first available chunk.
func (dr *dagReader) Read(b []byte) (int, error) {
	return dr.CtxReadFull(dr.ctx, b)
}
// CtxReadFull fills `out` with file data starting at the current read
// position, using `ctx` for any node fetches the walk triggers.
//
// It keeps walking the DAG until `out` is full, the DAG is exhausted or
// an error occurs. It returns the number of bytes copied together with
// nil when `out` was filled (or the walk was paused), `io.EOF` when the
// end of the DAG was reached, or the error that interrupted the walk.
func (dr *dagReader) CtxReadFull(ctx context.Context, out []byte) (n int, err error) {
	// Child node promises are fetched with the walker's context (see
	// `ipld.NavigableIPLDNode.FetchChild`), so point it at the caller's.
	dr.dagWalker.SetContext(ctx)

	// Drain whatever is left of the previously visited leaf first.
	// TODO: Move this check inside `readNodeDataBuffer`?
	if dr.currentNodeData != nil {
		n = dr.readNodeDataBuffer(out)
		if n == len(out) {
			// Nothing more is needed, skip the traversal entirely.
			return n, nil
		}
	}

	// Visitor invoked by the walker on every node it reaches.
	visit := func(visited ipld.NavigableNode) error {
		leaf := ipld.ExtractIPLDNode(visited)

		// Nodes with links are internal and hold no file data
		// (see the `balanced` package for more details).
		if len(leaf.Links()) > 0 {
			return nil
		}

		// Stage the leaf's data in the internal buffer: each node is
		// visited only once, so anything not consumed now must be kept
		// for a later call.
		//
		// TODO: We could check if the entire node's data can fit in the
		// remaining `out` buffer free space to skip this intermediary step.
		if saveErr := dr.saveNodeData(leaf); saveErr != nil {
			return saveErr
		}

		n += dr.readNodeDataBuffer(out[n:])
		if n == len(out) {
			// `out` is full: ask the walker to stop where it is.
			dr.dagWalker.Pause()
		}
		return nil
	}

	switch err = dr.dagWalker.Iterate(visit); err {
	case nil:
		return n, nil
	case ipld.EndOfDag:
		// The whole file has been traversed.
		return n, io.EOF
	default:
		// Anything else came from the visitor or the walker itself.
		return n, err
	}
}
// saveNodeData extracts the file data held by the UnixFS `node` and stages
// it in `currentNodeData`, from where it can be copied out (`Read`,
// `WriteTo`) or positioned into (`Seek`). On extraction failure the
// buffer is left untouched and the error is returned.
func (dr *dagReader) saveNodeData(node ipld.Node) error {
	data, err := unixfs.ReadUnixFSNodeData(node)
	if err == nil {
		dr.currentNodeData = bytes.NewReader(data)
	}
	return err
}
// readNodeDataBuffer copies as much of the staged `currentNodeData` as
// fits into `out`, advances the reader's `offset` by that amount and
// returns it. Once the staged data is exhausted the buffer is dropped
// (set to nil) so later calls know a new node has to be visited.
//
// The caller must ensure `currentNodeData` is not nil.
func (dr *dagReader) readNodeDataBuffer(out []byte) int {
	// The error is deliberately ignored: the source is an in-memory
	// `bytes.Reader`, and reaching the end of one node's data is not the
	// end of the file. Exhaustion is detected through `Len` below instead.
	copied, _ := dr.currentNodeData.Read(out)

	// TODO: Should `offset` be incremented here or in the calling function?
	// (Doing it here saves LoC but may be confusing as it's more hidden).
	dr.offset += int64(copied)

	if remaining := dr.currentNodeData.Len(); remaining == 0 {
		// Fully consumed, nothing left for later `Read` calls.
		dr.currentNodeData = nil
	}

	return copied
}
// writeNodeDataBuffer drains the unread remainder of `currentNodeData`
// into `w`. It is the `io.Writer` counterpart of `readNodeDataBuffer`.
//
// It returns the number of bytes `w` accepted and any error reported by
// the underlying `bytes.Reader.WriteTo` call. The reader's `offset` is
// advanced by the returned count even when the write fails part-way:
// `bytes.Reader.WriteTo` has already moved its own position past those
// bytes, so skipping the update would leave `offset` out of sync with the
// data actually handed out (and with the count `WriteTo` reports).
//
// The caller must ensure `currentNodeData` is not nil.
//
// TODO: Check what part of the logic between the two functions
// can be extracted away.
func (dr *dagReader) writeNodeDataBuffer(w io.Writer) (int64, error) {
	n, err := dr.currentNodeData.WriteTo(w)

	// Account for every byte that left the buffer, successful or not.
	dr.offset += n

	if dr.currentNodeData.Len() == 0 {
		// Signal that the buffer was consumed (for later `Read` calls).
		// This shouldn't return an EOF error as it's just the end of a
		// single node's data, not the entire DAG.
		dr.currentNodeData = nil
	}

	return n, err
}
// WriteTo streams the rest of the file, from the current read position to
// the end of the DAG, into `w`. As with `bytes.Reader.WriteTo`, the start
// point is wherever earlier `Read`/`Seek` calls left the reader.
//
// It returns the number of bytes written and the first error encountered.
// Reaching the end of the DAG is the normal outcome and is reported as a
// nil error.
//
// TODO: This implementation is very similar to `CtxReadFull`,
// the common parts should be abstracted away.
func (dr *dagReader) WriteTo(w io.Writer) (n int64, err error) {
	// Child node promises are fetched with the reader's own context
	// (see `ipld.NavigableIPLDNode.FetchChild` for details).
	dr.dagWalker.SetContext(dr.ctx)

	// Flush whatever is left of the previously visited leaf first.
	if dr.currentNodeData != nil {
		if n, err = dr.writeNodeDataBuffer(w); err != nil {
			return n, err
		}
	}

	// Visitor invoked by the walker on every node it reaches.
	visit := func(visited ipld.NavigableNode) error {
		leaf := ipld.ExtractIPLDNode(visited)

		// Nodes with links are internal and hold no file data
		// (see the `balanced` package for more details).
		if len(leaf.Links()) > 0 {
			return nil
		}

		// Stage the leaf's data so that, should the write be cut short,
		// the remainder is still available to later calls (each node is
		// visited only once during `Iterate`).
		if saveErr := dr.saveNodeData(leaf); saveErr != nil {
			return saveErr
		}

		count, writeErr := dr.writeNodeDataBuffer(w)
		n += count
		return writeErr
	}

	err = dr.dagWalker.Iterate(visit)
	if err == ipld.EndOfDag {
		// The whole file was written out.
		err = nil
	}
	return n, err
}
// Close the reader (cancelling fetch node operations requested with
// the internal context, that is, `Read` calls but not `CtxReadFull`
// with user-supplied contexts). It does so by invoking the `cancel`
// function of the context created in `NewDagReader` and always returns
// nil.
func (dr *dagReader) Close() error {
	dr.cancel()
	return nil
}
// Seek implements `io.Seeker` seeking to a given offset in the DAG file,
// it matches the standard unix `seek`. It moves the position of the internal
// `dagWalker` and may also leave a `currentNodeData` buffer loaded in case
// the seek is performed to the middle of the data in a node.
//
// `io.SeekCurrent` and `io.SeekEnd` are translated into an absolute
// position and handled as `io.SeekStart`. A negative absolute position is
// rejected with an error and the reader is left where it was. An unknown
// `whence` is rejected as well.
//
// If the DAG search itself fails the reader is rewound to the start of
// the file and `0` is returned with the error, so that the reported
// position, the internal `offset` and the `dagWalker` always agree.
// Without this a later `Seek(0, io.SeekStart)` would take the "already
// there" shortcut while the walker is still part-way into the DAG.
//
// TODO: Support seeking from the current position (relative seek)
// through the `dagWalker` in `io.SeekCurrent`.
func (dr *dagReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
		if offset < 0 {
			return dr.offset, errors.New("invalid offset")
		}

		if offset == dr.offset {
			// Already at the requested `offset`, nothing to do.
			return offset, nil
		}

		// Amount left to seek.
		left := offset

		// Seek from the beginning of the DAG.
		dr.resetPosition()

		// Shortcut seeking to the beginning, we're already there.
		if offset == 0 {
			return 0, nil
		}

		// Use the internal reader's context to fetch the child node promises
		// (see `ipld.NavigableIPLDNode.FetchChild` for details).
		dr.dagWalker.SetContext(dr.ctx)
		// TODO: Performance: we could adjust here `preloadSize` of
		// `ipld.NavigableIPLDNode` also, when seeking we only want
		// to fetch one child at a time.

		// Seek the DAG by calling the provided `Visitor` function on every
		// node the `dagWalker` descends to while searching which can be
		// either an internal or leaf node. In the internal node case, check
		// the child node sizes and set the corresponding child index to go
		// down to next. In the leaf case (last visit of the search), if there
		// is still an amount `left` to seek do it inside the node's data
		// saved in the `currentNodeData` buffer, leaving it ready for a `Read`
		// call.
		err := dr.dagWalker.Seek(func(visitedNode ipld.NavigableNode) error {
			node := ipld.ExtractIPLDNode(visitedNode)

			if len(node.Links()) == 0 {
				// Leaf node, seek inside its data. The search stops here.
				if err := dr.saveNodeData(node); err != nil {
					return err
				}

				// The corner case of a DAG consisting only of a single (leaf)
				// node should make no difference here. In that case, where the
				// node doesn't have a parent UnixFS node with size hints, this
				// implementation would allow this `Seek` to be called with an
				// argument larger than the buffer size which normally wouldn't
				// happen (because we would skip the node based on the size
				// hint) but that would just mean that a future `CtxReadFull`
				// call would read no data from the `currentNodeData` buffer.
				// TODO: Re-check this reasoning.
				_, err := dr.currentNodeData.Seek(left, io.SeekStart)
				return err
			}

			// Internal node, should be a `mdag.ProtoNode` containing a
			// `unixfs.FSNode` (see the `balanced` package for more details).
			fsNode, err := unixfs.ExtractFSNode(node)
			if err != nil {
				return err
			}

			// If there aren't enough size hints don't seek: without one
			// size per link the child holding `offset` can't be located.
			if fsNode.NumChildren() != len(node.Links()) {
				return ErrSeekNotSupported
			}

			// Internal nodes have no data, so just iterate through the
			// sizes of its children (advancing the child index of the
			// `dagWalker`) to find where we need to go down to next in
			// the search.
			for {
				childSize := fsNode.BlockSize(int(dr.dagWalker.ActiveChildIndex()))

				if childSize > uint64(left) {
					// This child's data contains the position requested
					// in `offset`, go down this child.
					return nil
				}

				// Else, skip this child.
				left -= int64(childSize)
				nextErr := dr.dagWalker.NextChild()
				if nextErr == ipld.ErrNextNoChild {
					// No more child nodes available, nothing to do,
					// the `Seek` will stop on its own.
					return nil
				} else if nextErr != nil {
					// Pass along any other errors (that may in future
					// implementations be returned by `Next`) to stop
					// the search.
					return nextErr
				}
			}
		})
		if err != nil {
			// The walker may have been left anywhere in the DAG (and a
			// leaf buffer may be loaded); rewind so the state matches the
			// position `0` being reported.
			dr.resetPosition()
			return 0, err
		}

		dr.offset = offset
		return dr.offset, nil

	case io.SeekCurrent:
		if offset == 0 {
			return dr.offset, nil
		}

		// TODO: Performance. This can be improved supporting relative
		// searches in the `Walker` (see `Walker.Seek`).
		return dr.Seek(dr.offset+offset, io.SeekStart)

	case io.SeekEnd:
		return dr.Seek(int64(dr.Size())+offset, io.SeekStart)

	default:
		return 0, errors.New("invalid whence")
	}
}
// resetPosition rewinds the reader to the start of the file: it installs
// a fresh `dagWalker` rooted at `rootNode`, zeroes `offset` and drops any
// partially consumed node data held in `currentNodeData`. It is used by
// the `SeekStart` case of `Seek`.
func (dr *dagReader) resetPosition() {
	// TODO: This could be avoided (along with storing the `dr.rootNode` and
	// `dr.serv` just for this call) if `Reset` is supported in the `Walker`.
	root := ipld.NewNavigableIPLDNode(dr.rootNode, dr.serv)
	dr.dagWalker = ipld.NewWalker(dr.ctx, root)

	dr.offset = 0
	dr.currentNodeData = nil
}