Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

update/files add interface #306

Merged
merged 7 commits into from
Jun 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ let nodes = []

function startNode (num, done) {
createTempNode(num, (err, node) => {
if (err) throw err
if (err) {
throw err
}

const api = new API(node.repo.path())
nodes.push(api)
Expand All @@ -23,7 +25,7 @@ gulp.task('libnode:start', (done) => {
parallel([
(cb) => startNode(7, cb),
(cb) => startNode(8, cb),
(cb) => startNode(9, cb)
(cb) => startNode(13, cb)
], done)
})

Expand Down
10 changes: 6 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"form-data": "^1.0.0-rc3",
"gulp": "^3.9.1",
"idb-plus-blob-store": "^1.1.2",
"interface-ipfs-core": "^0.1.5",
"interface-ipfs-core": "^0.2.2",
"left-pad": "^1.1.0",
"lodash": "^4.11.2",
"mocha": "^2.5.1",
Expand All @@ -66,14 +66,15 @@
"fs-blob-store": "^5.2.1",
"glob": "^7.0.3",
"hapi": "^13.4.1",
"ipfs-api": "^4.1.0",
"ipfs-bitswap": "^0.4.1",
"ipfs-api": "^5.0.1",
"ipfs-block": "^0.3.0",
"ipfs-block-service": "^0.4.0",
"ipfs-merkle-dag": "^0.6.0",
"ipfs-multipart": "^0.1.0",
"ipfs-repo": "^0.8.0",
"ipfs-unixfs-engine": "^0.8.0",
"ipfs-unixfs-engine": "^0.9.0",
"isstream": "^0.1.2",
"joi": "^8.0.5",
"libp2p-ipfs": "^0.11.0",
"libp2p-ipfs-browser": "^0.10.0",
Expand All @@ -92,7 +93,8 @@
"run-parallel-limit": "^1.0.3",
"run-series": "^1.1.4",
"run-waterfall": "^1.1.3",
"temp": "^0.8.3"
"temp": "^0.8.3",
"through2": "^2.0.1"
},
"contributors": [
"Andrew de Andrade <andrew@deandrade.com.br>",
Expand Down
4 changes: 2 additions & 2 deletions src/cli/commands/files/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ module.exports = Command.extend({
if (!fs.statSync(element).isDirectory()) {
i.write({
path: element.substring(index + 1, element.length),
stream: fs.createReadStream(element)
content: fs.createReadStream(element)
})
}
callback()
Expand All @@ -86,7 +86,7 @@ module.exports = Command.extend({
} else {
rs = fs.createReadStream(inPath)
inPath = inPath.substring(inPath.lastIndexOf('/') + 1, inPath.length)
filePair = {path: inPath, stream: rs}
filePair = {path: inPath, content: rs}
i.write(filePair)
i.end()
}
Expand Down
2 changes: 1 addition & 1 deletion src/cli/commands/files/cat.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ module.exports = Command.extend({
throw (err)
}
res.on('data', (data) => {
data.stream.pipe(process.stdout)
data.content.pipe(process.stdout)
})
Copy link
Member Author

@daviddias daviddias Jun 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be changed to make res itself the stream instead, once the interface-ipfs-core spec for cat is merged too

//cc @nginnever

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, changes to make cat return just a stream are in PR 256

})
})
Expand Down
4 changes: 2 additions & 2 deletions src/cli/commands/files/get.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ function fileHandler (result, dir) {
const dirPath = path.join(dir, file.path)
// Check to see if the result is a directory
if (file.dir === false) {
file.stream.pipe(fs.createWriteStream(dirPath))
file.content.pipe(fs.createWriteStream(dirPath))
} else {
ensureDir(dirPath, (err) => {
if (err) {
Expand All @@ -64,7 +64,7 @@ function fileHandler (result, dir) {
throw err
}

file.stream.pipe(fs.createWriteStream(dirPath))
file.content.pipe(fs.createWriteStream(dirPath))
})
}
}
Expand Down
99 changes: 82 additions & 17 deletions src/core/ipfs/files.js
Original file line number Diff line number Diff line change
@@ -1,40 +1,105 @@
'use strict'

const Importer = require('ipfs-unixfs-engine').importer
const Exporter = require('ipfs-unixfs-engine').exporter
const unixfsEngine = require('ipfs-unixfs-engine')
const Importer = unixfsEngine.Importer
const Exporter = unixfsEngine.Exporter
const UnixFS = require('ipfs-unixfs')
const through = require('through2')
const isStream = require('isstream')
const promisify = require('promisify-es6')
const Duplex = require('stream').Duplex
const multihashes = require('multihashes')

module.exports = function files (self) {
return {
add: (arr, callback) => {
if (typeof arr === 'function') {
callback = arr
arr = undefined
createAddStream: (callback) => {
const i = new Importer(self._dagS)
const ds = new Duplex({ objectMode: true })

ds._read = (n) => {}
ds._write = (file, enc, next) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this passthrough needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a transform, on the way up

i.write(file)
next()
}
if (callback === undefined) {

ds.end = () => {
i.end()
}

let counter = 0

i.on('data', (file) => {
counter++
self.object.get(file.multihash, (err, node) => {
if (err) {
return ds.emit('error', err)
}
ds.push({path: file.path, node: node})
counter--
})
})

i.on('end', () => {
function canFinish () {
if (counter === 0) {
ds.push(null)
} else {
setTimeout(canFinish, 100)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this 100ms timeout loop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because getting the object is an async operation, the underlying stream from the importer won't wait for us to finish pushing all the objects to the upper stream — it would emit them all and then the 'end' event immediately. This avoids that race condition.

}
}
canFinish()
})

callback(null, ds)
},
add: promisify((data, callback) => {
// Buffer input
if (Buffer.isBuffer(data)) {
data = [{
path: '',
content: data
}]
}
// Readable stream input
if (isStream.isReadable(data)) {
data = [{
path: '',
content: data
}]
}
if (!callback || typeof callback !== 'function') {
callback = function noop () {}
}
if (arr === undefined) {
return new Importer(self._dagS)
if (!Array.isArray(data)) {
return callback(new Error('"data" must be an array of { path: string, content: Buffer|Readable } or Buffer or Readable'))
}

const i = new Importer(self._dagS)
const res = []

i.on('data', (info) => {
res.push(info)
})

i.once('end', () => {
// Transform file info tuples to DAGNodes
i.pipe(through.obj((info, enc, next) => {
const mh = multihashes.toB58String(info.multihash)
self._dagS.get(mh, (err, node) => {
if (err) return callback(err)
var obj = {
path: info.path || mh,
node: node
}
res.push(obj)
next()
})
}, (done) => {
callback(null, res)
})
}))

arr.forEach((tuple) => {
data.forEach((tuple) => {
i.write(tuple)
})

i.end()
},
}),

cat: (hash, callback) => {
self._dagS.get(hash, (err, fetchedNode) => {
if (err) {
Expand Down
2 changes: 1 addition & 1 deletion src/core/ipfs/init.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ module.exports = function init (self) {
const rs = new Readable()
rs.push(fs.readFileSync(element))
rs.push(null)
const filePair = {path: addPath, stream: rs}
const filePair = {path: addPath, content: rs}
i.write(filePair)
}
callback()
Expand Down
2 changes: 1 addition & 1 deletion src/http-api/resources/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ exports.cat = {
}).code(500)
}
stream.on('data', (data) => {
return reply(data.stream)
return reply(data.content)
})
})
}
Expand Down
2 changes: 1 addition & 1 deletion test/cli/test-bitswap.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const createTempNode = require('../utils/temp-node')
const repoPath = require('./index').repoPath

describe('bitswap', function () {
this.timeout(20000)
this.timeout(40000)
const env = _.clone(process.env)
env.IPFS_PATH = repoPath

Expand Down
9 changes: 5 additions & 4 deletions test/core/both/test-bitswap.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ describe('bitswap', () => {
// need timeout so we wait for identify to happen
// in the browsers
connectNodesSingle(node2, node1, cb)
}, 100)
}, 300)
], done)
}

Expand All @@ -101,15 +101,16 @@ describe('bitswap', () => {
})

afterEach((done) => {
setTimeout(() => ipfs.goOffline(done), 500)
// ipfs.goOffline(done)
setTimeout(() => ipfs.goOffline(done), 1500)
})

it('2 peers', (done) => {
const block = makeBlock()
let node
series([
// 0. Start node
(cb) => addNode(9, (err, _ipfs) => {
(cb) => addNode(13, (err, _ipfs) => {
node = _ipfs
cb(err)
}),
Expand Down Expand Up @@ -196,7 +197,7 @@ describe('bitswap', () => {
ipfs.files.cat(hash, (err, res) => {
expect(err).to.not.exist
res.on('file', (data) => {
data.stream.pipe(bl((err, bldata) => {
data.content.pipe(bl((err, bldata) => {
expect(err).to.not.exist
expect(bldata.toString()).to.equal('I love IPFS <3')
cb()
Expand Down
69 changes: 12 additions & 57 deletions test/core/both/test-files.js
Original file line number Diff line number Diff line change
@@ -1,64 +1,19 @@
/* eslint-env mocha */
'use strict'

const bl = require('bl')
const expect = require('chai').expect
const Readable = require('stream').Readable
const bs58 = require('bs58')

const test = require('interface-ipfs-core')
const IPFS = require('../../../src/core')

describe('files', () => {
let ipfs

before((done) => {
ipfs = new IPFS(require('../../utils/repo-path'))
ipfs.load(done)
})

it('add', (done) => {
const buffered = new Buffer('some data')
const rs = new Readable()
rs.push(buffered)
rs.push(null)
const arr = []
const filePair = {path: 'data.txt', stream: rs}
arr.push(filePair)
ipfs.files.add(arr, (err, res) => {
expect(err).to.not.exist
expect(res[0].path).to.equal('data.txt')
expect(res[0].size).to.equal(17)
expect(bs58.encode(res[0].multihash).toString()).to.equal('QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS')
done()
const common = {
setup: function (cb) {
const ipfs = new IPFS(require('../../utils/repo-path'))
ipfs.load(() => {
cb(null, ipfs)
})
})
},
teardown: function (cb) {
cb()
}
}

it('cat', (done) => {
const hash = 'QmT78zSuBmuS4z925WZfrqQ1qHaJ56DQaTfyMUF7F8ff5o'
ipfs.files.cat(hash, (err, res) => {
expect(err).to.not.exist
res.on('data', (data) => {
data.stream.pipe(bl((err, bldata) => {
expect(err).to.not.exist
expect(bldata.toString()).to.equal('hello world\n')
done()
}))
})
})
})

it('get', (done) => {
// TODO create non-trival get test
const hash = 'QmT78zSuBmuS4z925WZfrqQ1qHaJ56DQaTfyMUF7F8ff5o'
ipfs.files.get(hash, (err, res) => {
expect(err).to.not.exist
res.on('data', (data) => {
data.stream.pipe(bl((err, bldata) => {
expect(err).to.not.exist
expect(bldata.toString()).to.equal('hello world\n')
done()
}))
})
})
})
})
test.files(common)
2 changes: 1 addition & 1 deletion test/core/node-only/test-swarm.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const parallel = require('run-parallel')
const createTempNode = require('../../utils/temp-node')

describe('swarm', function () {
this.timeout(20 * 1000)
this.timeout(40 * 1000)

let nodeA
let nodeB
Expand Down
2 changes: 1 addition & 1 deletion test/http-api/test-files.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module.exports = (httpAPI) => {
describe('api', () => {
let api

it('api', () => {
before(() => {
api = httpAPI.server.select('API')
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for done


Expand Down