update/files add interface #306
Changes from all commits
@@ -1,40 +1,105 @@
 'use strict'

-const Importer = require('ipfs-unixfs-engine').importer
-const Exporter = require('ipfs-unixfs-engine').exporter
+const unixfsEngine = require('ipfs-unixfs-engine')
+const Importer = unixfsEngine.Importer
+const Exporter = unixfsEngine.Exporter
 const UnixFS = require('ipfs-unixfs')
+const through = require('through2')
+const isStream = require('isstream')
+const promisify = require('promisify-es6')
+const Duplex = require('stream').Duplex
+const multihashes = require('multihashes')

 module.exports = function files (self) {
   return {
-    add: (arr, callback) => {
-      if (typeof arr === 'function') {
-        callback = arr
-        arr = undefined
-      }
+    createAddStream: (callback) => {
+      const i = new Importer(self._dagS)
+      const ds = new Duplex({ objectMode: true })
+
+      ds._read = (n) => {}
+      ds._write = (file, enc, next) => {
Review comment: Why is this passthrough needed?

Reply: It is a transform, on the way up.
+        i.write(file)
+        next()
+      }
-      if (callback === undefined) {
+
+      ds.end = () => {
+        i.end()
+      }
+
+      let counter = 0
+
+      i.on('data', (file) => {
+        counter++
+        self.object.get(file.multihash, (err, node) => {
+          if (err) {
+            return ds.emit('error', err)
+          }
+          ds.push({path: file.path, node: node})
+          counter--
+        })
+      })
+
+      i.on('end', () => {
+        function canFinish () {
+          if (counter === 0) {
+            ds.push(null)
+          } else {
+            setTimeout(canFinish, 100)
Review comment: Why do we need this 100ms timeout loop?

Reply: Because getting the object is an async operation, and the underlying stream from the importer won't wait for us to push all the objects to the upper stream before emitting its 'end' event. The counter plus this retry loop avoids a race condition.
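(For illustration only, not part of this PR: the same race could be closed without polling by recording that the importer has ended and finishing from whichever pending get callback completes last. A sketch, using the same ds, i, and self as above:)

let counter = 0
let ended = false

function maybeFinish () {
  // close the readable side only once the importer is done
  // AND every pending object.get has called back
  if (ended && counter === 0) {
    ds.push(null)
  }
}

i.on('data', (file) => {
  counter++
  self.object.get(file.multihash, (err, node) => {
    if (err) return ds.emit('error', err)
    ds.push({path: file.path, node: node})
    counter--
    maybeFinish()
  })
})

i.on('end', () => {
  ended = true
  maybeFinish()
})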
+          }
+        }
+        canFinish()
+      })
+
+      callback(null, ds)
+    },
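For context, a minimal usage sketch of the new createAddStream API (assuming an initialized js-ipfs node named ipfs; the file name and content are made up):

ipfs.files.createAddStream((err, addStream) => {
  if (err) throw err

  // the duplex stream emits { path, node } objects on the way out...
  addStream.on('data', (file) => {
    console.log('added', file.path)
  })
  addStream.on('end', () => {
    console.log('all files imported')
  })

  // ...and accepts { path, content } tuples on the way in
  addStream.write({ path: 'hello.txt', content: new Buffer('hello world\n') })
  addStream.end()
})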
+    add: promisify((data, callback) => {
+      // Buffer input
+      if (Buffer.isBuffer(data)) {
+        data = [{
+          path: '',
+          content: data
+        }]
+      }
+      // Readable stream input
+      if (isStream.isReadable(data)) {
+        data = [{
+          path: '',
+          content: data
+        }]
+      }
+      if (!callback || typeof callback !== 'function') {
+        callback = function noop () {}
+      }
-      if (arr === undefined) {
-        return new Importer(self._dagS)
+      if (!Array.isArray(data)) {
+        return callback(new Error('"data" must be an array of { path: string, content: Buffer|Readable } or Buffer or Readable'))
       }

       const i = new Importer(self._dagS)
       const res = []

-      i.on('data', (info) => {
-        res.push(info)
-      })
-
-      i.once('end', () => {
+      // Transform file info tuples to DAGNodes
+      i.pipe(through.obj((info, enc, next) => {
+        const mh = multihashes.toB58String(info.multihash)
+        self._dagS.get(mh, (err, node) => {
+          if (err) return callback(err)
+          var obj = {
+            path: info.path || mh,
+            node: node
+          }
+          res.push(obj)
+          next()
+        })
+      }, (done) => {
+        callback(null, res)
+      }))

-      arr.forEach((tuple) => {
+      data.forEach((tuple) => {
         i.write(tuple)
       })

       i.end()
-    },
+    }),
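A quick sketch of what the input normalization above allows (ipfs and the contents are illustrative):

// a bare Buffer is wrapped as [{ path: '', content: data }],
// so the resulting entry is keyed by its base58 multihash
ipfs.files.add(new Buffer('hello world\n'), (err, res) => {
  if (err) throw err
  console.log(res[0].path, res[0].node)
})

// an array of { path, content } tuples is passed through as-is
ipfs.files.add([
  { path: 'data.txt', content: new Buffer('some data') }
], (err, res) => {
  if (err) throw err
  res.forEach((file) => console.log('added', file.path))
})

Because the function is wrapped in promisify-es6, calling it without a callback returns a promise instead.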

     cat: (hash, callback) => {
       self._dagS.get(hash, (err, fetchedNode) => {
         if (err) {
@@ -1,64 +1,19 @@
 /* eslint-env mocha */
 'use strict'

-const bl = require('bl')
-const expect = require('chai').expect
-const Readable = require('stream').Readable
-const bs58 = require('bs58')
+const test = require('interface-ipfs-core')
 const IPFS = require('../../../src/core')

-describe('files', () => {
-  let ipfs
-
-  before((done) => {
-    ipfs = new IPFS(require('../../utils/repo-path'))
-    ipfs.load(done)
-  })
-
-  it('add', (done) => {
-    const buffered = new Buffer('some data')
-    const rs = new Readable()
-    rs.push(buffered)
-    rs.push(null)
-    const arr = []
-    const filePair = {path: 'data.txt', stream: rs}
-    arr.push(filePair)
-    ipfs.files.add(arr, (err, res) => {
-      expect(err).to.not.exist
-      expect(res[0].path).to.equal('data.txt')
-      expect(res[0].size).to.equal(17)
-      expect(bs58.encode(res[0].multihash).toString()).to.equal('QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS')
-      done()
+const common = {
+  setup: function (cb) {
+    const ipfs = new IPFS(require('../../utils/repo-path'))
+    ipfs.load(() => {
+      cb(null, ipfs)
+    })
-    })
-  })
+  },
+  teardown: function (cb) {
+    cb()
+  }
+}

-  it('cat', (done) => {
-    const hash = 'QmT78zSuBmuS4z925WZfrqQ1qHaJ56DQaTfyMUF7F8ff5o'
-    ipfs.files.cat(hash, (err, res) => {
-      expect(err).to.not.exist
-      res.on('data', (data) => {
-        data.stream.pipe(bl((err, bldata) => {
-          expect(err).to.not.exist
-          expect(bldata.toString()).to.equal('hello world\n')
-          done()
-        }))
-      })
-    })
-  })
-
-  it('get', (done) => {
-    // TODO create non-trival get test
-    const hash = 'QmT78zSuBmuS4z925WZfrqQ1qHaJ56DQaTfyMUF7F8ff5o'
-    ipfs.files.get(hash, (err, res) => {
-      expect(err).to.not.exist
-      res.on('data', (data) => {
-        data.stream.pipe(bl((err, bldata) => {
-          expect(err).to.not.exist
-          expect(bldata.toString()).to.equal('hello world\n')
-          done()
-        }))
-      })
-    })
-  })
-})
+
+test.files(common)
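The shared suite consumes the setup/teardown factory roughly like this (a simplified sketch of the interface-ipfs-core pattern, not its actual source):

module.exports.files = (common) => {
  describe('.files', () => {
    let ipfs

    before((done) => {
      // common.setup hands back a ready node, however it was constructed
      common.setup((err, node) => {
        if (err) return done(err)
        ipfs = node
        done()
      })
    })

    after((done) => {
      common.teardown(done)
    })

    it('add a Buffer', (done) => {
      ipfs.files.add(new Buffer('hello'), (err, res) => {
        // assertions shared by every implementation of the interface
        done(err)
      })
    })
  })
}

The point of the switch is that every implementation of the interface (js-ipfs, js-ipfs-api) runs the exact same files spec instead of maintaining its own copy of these tests.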
@@ -9,7 +9,7 @@ module.exports = (httpAPI) => {
   describe('api', () => {
     let api

-    it('api', () => {
+    before(() => {
       api = httpAPI.server.select('API')
     })
Review comment: no need for done
Review comment: This will be changed to make res the stream itself, once the interface-ipfs-core tests for cat are merged as well. //cc @nginnever

Reply: Yup, changes to make cat return just a stream are in PR 256.