Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
Merge pull request #306 from ipfs/update/files-add-interface
Browse files Browse the repository at this point in the history
update/files add interface
  • Loading branch information
dignifiedquire committed Jun 6, 2016
2 parents bddefb1 + 20d880c commit bf56200
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 94 deletions.
6 changes: 4 additions & 2 deletions gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ let nodes = []

function startNode (num, done) {
createTempNode(num, (err, node) => {
if (err) throw err
if (err) {
throw err
}

const api = new API(node.repo.path())
nodes.push(api)
Expand All @@ -23,7 +25,7 @@ gulp.task('libnode:start', (done) => {
parallel([
(cb) => startNode(7, cb),
(cb) => startNode(8, cb),
(cb) => startNode(9, cb)
(cb) => startNode(13, cb)
], done)
})

Expand Down
10 changes: 6 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"form-data": "^1.0.0-rc3",
"gulp": "^3.9.1",
"idb-plus-blob-store": "^1.1.2",
"interface-ipfs-core": "^0.1.5",
"interface-ipfs-core": "^0.2.2",
"left-pad": "^1.1.0",
"lodash": "^4.11.2",
"mocha": "^2.5.1",
Expand All @@ -66,14 +66,15 @@
"fs-blob-store": "^5.2.1",
"glob": "^7.0.3",
"hapi": "^13.4.1",
"ipfs-api": "^4.1.0",
"ipfs-bitswap": "^0.4.1",
"ipfs-api": "^5.0.1",
"ipfs-block": "^0.3.0",
"ipfs-block-service": "^0.4.0",
"ipfs-merkle-dag": "^0.6.0",
"ipfs-multipart": "^0.1.0",
"ipfs-repo": "^0.8.0",
"ipfs-unixfs-engine": "^0.8.0",
"ipfs-unixfs-engine": "^0.9.0",
"isstream": "^0.1.2",
"joi": "^8.0.5",
"libp2p-ipfs": "^0.11.0",
"libp2p-ipfs-browser": "^0.10.0",
Expand All @@ -92,7 +93,8 @@
"run-parallel-limit": "^1.0.3",
"run-series": "^1.1.4",
"run-waterfall": "^1.1.3",
"temp": "^0.8.3"
"temp": "^0.8.3",
"through2": "^2.0.1"
},
"contributors": [
"Andrew de Andrade <andrew@deandrade.com.br>",
Expand Down
4 changes: 2 additions & 2 deletions src/cli/commands/files/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ module.exports = Command.extend({
if (!fs.statSync(element).isDirectory()) {
i.write({
path: element.substring(index + 1, element.length),
stream: fs.createReadStream(element)
content: fs.createReadStream(element)
})
}
callback()
Expand All @@ -86,7 +86,7 @@ module.exports = Command.extend({
} else {
rs = fs.createReadStream(inPath)
inPath = inPath.substring(inPath.lastIndexOf('/') + 1, inPath.length)
filePair = {path: inPath, stream: rs}
filePair = {path: inPath, content: rs}
i.write(filePair)
i.end()
}
Expand Down
2 changes: 1 addition & 1 deletion src/cli/commands/files/cat.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ module.exports = Command.extend({
throw (err)
}
res.on('data', (data) => {
data.stream.pipe(process.stdout)
data.content.pipe(process.stdout)
})
})
})
Expand Down
4 changes: 2 additions & 2 deletions src/cli/commands/files/get.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ function fileHandler (result, dir) {
const dirPath = path.join(dir, file.path)
// Check to see if the result is a directory
if (file.dir === false) {
file.stream.pipe(fs.createWriteStream(dirPath))
file.content.pipe(fs.createWriteStream(dirPath))
} else {
ensureDir(dirPath, (err) => {
if (err) {
Expand All @@ -64,7 +64,7 @@ function fileHandler (result, dir) {
throw err
}

file.stream.pipe(fs.createWriteStream(dirPath))
file.content.pipe(fs.createWriteStream(dirPath))
})
}
}
Expand Down
99 changes: 82 additions & 17 deletions src/core/ipfs/files.js
Original file line number Diff line number Diff line change
@@ -1,40 +1,105 @@
'use strict'

const Importer = require('ipfs-unixfs-engine').importer
const Exporter = require('ipfs-unixfs-engine').exporter
const unixfsEngine = require('ipfs-unixfs-engine')
const Importer = unixfsEngine.Importer
const Exporter = unixfsEngine.Exporter
const UnixFS = require('ipfs-unixfs')
const through = require('through2')
const isStream = require('isstream')
const promisify = require('promisify-es6')
const Duplex = require('stream').Duplex
const multihashes = require('multihashes')

module.exports = function files (self) {
return {
add: (arr, callback) => {
if (typeof arr === 'function') {
callback = arr
arr = undefined
createAddStream: (callback) => {
const i = new Importer(self._dagS)
const ds = new Duplex({ objectMode: true })

ds._read = (n) => {}
ds._write = (file, enc, next) => {
i.write(file)
next()
}
if (callback === undefined) {

ds.end = () => {
i.end()
}

let counter = 0

i.on('data', (file) => {
counter++
self.object.get(file.multihash, (err, node) => {
if (err) {
return ds.emit('error', err)
}
ds.push({path: file.path, node: node})
counter--
})
})

i.on('end', () => {
function canFinish () {
if (counter === 0) {
ds.push(null)
} else {
setTimeout(canFinish, 100)
}
}
canFinish()
})

This comment has been minimized.

Copy link
@deltab

deltab Jun 6, 2016

What happens if there is an error (sync or async) and counter doesn't get decremented, hence never becomes zero? Does canFinish get repeated forever?

callback(null, ds)

This comment has been minimized.

Copy link
@deltab

deltab Jun 6, 2016

Is this expected to become async at some point?

},
add: promisify((data, callback) => {
// Buffer input
if (Buffer.isBuffer(data)) {
data = [{
path: '',
content: data
}]
}
// Readable stream input
if (isStream.isReadable(data)) {
data = [{
path: '',
content: data
}]
}
if (!callback || typeof callback !== 'function') {
callback = function noop () {}
}
if (arr === undefined) {
return new Importer(self._dagS)
if (!Array.isArray(data)) {
return callback(new Error('"data" must be an array of { path: string, content: Buffer|Readable } or Buffer or Readable'))
}

const i = new Importer(self._dagS)
const res = []

i.on('data', (info) => {
res.push(info)
})

i.once('end', () => {
// Transform file info tuples to DAGNodes
i.pipe(through.obj((info, enc, next) => {
const mh = multihashes.toB58String(info.multihash)
self._dagS.get(mh, (err, node) => {
if (err) return callback(err)
var obj = {
path: info.path || mh,
node: node
}
res.push(obj)
next()
})
}, (done) => {
callback(null, res)
})
}))

arr.forEach((tuple) => {
data.forEach((tuple) => {
i.write(tuple)
})

i.end()
},
}),

cat: (hash, callback) => {
self._dagS.get(hash, (err, fetchedNode) => {
if (err) {
Expand Down
2 changes: 1 addition & 1 deletion src/core/ipfs/init.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ module.exports = function init (self) {
const rs = new Readable()
rs.push(fs.readFileSync(element))
rs.push(null)
const filePair = {path: addPath, stream: rs}
const filePair = {path: addPath, content: rs}
i.write(filePair)
}
callback()
Expand Down
2 changes: 1 addition & 1 deletion src/http-api/resources/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ exports.cat = {
}).code(500)
}
stream.on('data', (data) => {
return reply(data.stream)
return reply(data.content)
})
})
}
Expand Down
2 changes: 1 addition & 1 deletion test/cli/test-bitswap.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const createTempNode = require('../utils/temp-node')
const repoPath = require('./index').repoPath

describe('bitswap', function () {
this.timeout(20000)
this.timeout(40000)
const env = _.clone(process.env)
env.IPFS_PATH = repoPath

Expand Down
9 changes: 5 additions & 4 deletions test/core/both/test-bitswap.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ describe('bitswap', () => {
// need timeout so we wait for identify to happen
// in the browsers
connectNodesSingle(node2, node1, cb)
}, 100)
}, 300)
], done)
}

Expand All @@ -101,15 +101,16 @@ describe('bitswap', () => {
})

afterEach((done) => {
setTimeout(() => ipfs.goOffline(done), 500)
// ipfs.goOffline(done)
setTimeout(() => ipfs.goOffline(done), 1500)
})

it('2 peers', (done) => {
const block = makeBlock()
let node
series([
// 0. Start node
(cb) => addNode(9, (err, _ipfs) => {
(cb) => addNode(13, (err, _ipfs) => {
node = _ipfs
cb(err)
}),
Expand Down Expand Up @@ -196,7 +197,7 @@ describe('bitswap', () => {
ipfs.files.cat(hash, (err, res) => {
expect(err).to.not.exist
res.on('file', (data) => {
data.stream.pipe(bl((err, bldata) => {
data.content.pipe(bl((err, bldata) => {
expect(err).to.not.exist
expect(bldata.toString()).to.equal('I love IPFS <3')
cb()
Expand Down
69 changes: 12 additions & 57 deletions test/core/both/test-files.js
Original file line number Diff line number Diff line change
@@ -1,64 +1,19 @@
/* eslint-env mocha */
'use strict'

const bl = require('bl')
const expect = require('chai').expect
const Readable = require('stream').Readable
const bs58 = require('bs58')

const test = require('interface-ipfs-core')
const IPFS = require('../../../src/core')

describe('files', () => {
let ipfs

before((done) => {
ipfs = new IPFS(require('../../utils/repo-path'))
ipfs.load(done)
})

it('add', (done) => {
const buffered = new Buffer('some data')
const rs = new Readable()
rs.push(buffered)
rs.push(null)
const arr = []
const filePair = {path: 'data.txt', stream: rs}
arr.push(filePair)
ipfs.files.add(arr, (err, res) => {
expect(err).to.not.exist
expect(res[0].path).to.equal('data.txt')
expect(res[0].size).to.equal(17)
expect(bs58.encode(res[0].multihash).toString()).to.equal('QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS')
done()
const common = {
setup: function (cb) {
const ipfs = new IPFS(require('../../utils/repo-path'))
ipfs.load(() => {
cb(null, ipfs)
})
})
},
teardown: function (cb) {
cb()
}
}

it('cat', (done) => {
const hash = 'QmT78zSuBmuS4z925WZfrqQ1qHaJ56DQaTfyMUF7F8ff5o'
ipfs.files.cat(hash, (err, res) => {
expect(err).to.not.exist
res.on('data', (data) => {
data.stream.pipe(bl((err, bldata) => {
expect(err).to.not.exist
expect(bldata.toString()).to.equal('hello world\n')
done()
}))
})
})
})

it('get', (done) => {
// TODO create non-trival get test
const hash = 'QmT78zSuBmuS4z925WZfrqQ1qHaJ56DQaTfyMUF7F8ff5o'
ipfs.files.get(hash, (err, res) => {
expect(err).to.not.exist
res.on('data', (data) => {
data.stream.pipe(bl((err, bldata) => {
expect(err).to.not.exist
expect(bldata.toString()).to.equal('hello world\n')
done()
}))
})
})
})
})
test.files(common)
2 changes: 1 addition & 1 deletion test/core/node-only/test-swarm.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const parallel = require('run-parallel')
const createTempNode = require('../../utils/temp-node')

describe('swarm', function () {
this.timeout(20 * 1000)
this.timeout(40 * 1000)

let nodeA
let nodeB
Expand Down
2 changes: 1 addition & 1 deletion test/http-api/test-files.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module.exports = (httpAPI) => {
describe('api', () => {
let api

it('api', () => {
before(() => {
api = httpAPI.server.select('API')
})

Expand Down

0 comments on commit bf56200

Please sign in to comment.