Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix: remove libp2p-record for pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Dec 1, 2018
1 parent 9bb8816 commit 62f25a6
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 30 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
"cids": "~0.5.5",
"class-is": "^1.1.0",
"datastore-core": "~0.6.0",
"datastore-pubsub": "ipfs/js-datastore-pubsub#feat/encode-record-store-keys",
"datastore-pubsub": "~0.1.1",
"debug": "^4.1.0",
"deep-extend": "~0.6.0",
"err-code": "^1.1.2",
Expand Down Expand Up @@ -120,7 +120,7 @@
"ipld-ethereum": "^2.0.1",
"ipld-git": "~0.2.2",
"ipld-zcash": "~0.1.6",
"ipns": "~0.3.0",
"ipns": "~0.4.2",
"is-ipfs": "~0.4.7",
"is-pull-stream": "~0.0.0",
"is-stream": "^1.1.0",
Expand Down
14 changes: 8 additions & 6 deletions src/core/components/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const errPubsubDisabled = () => {
return errCode(new Error('pubsub experiment is not enabled'), 'ERR_PUBSUB_DISABLED')
}

const pubsubEnabled = (options) => options.EXPERIMENTAL.pubsub || options.EXPERIMENTAL.ipnsPubsub

module.exports = function pubsub (self) {
return {
subscribe: (topic, handler, options, callback) => {
Expand All @@ -16,7 +18,7 @@ module.exports = function pubsub (self) {
options = {}
}

if (!self._options.EXPERIMENTAL.pubsub) {
if (!pubsubEnabled(self._options)) {
return callback
? setImmediate(() => callback(errPubsubDisabled()))
: Promise.reject(errPubsubDisabled())
Expand All @@ -37,7 +39,7 @@ module.exports = function pubsub (self) {
},

unsubscribe: (topic, handler, callback) => {
if (!self._options.EXPERIMENTAL.pubsub) {
if (!pubsubEnabled(self._options)) {
return callback
? setImmediate(() => callback(errPubsubDisabled()))
: Promise.reject(errPubsubDisabled())
Expand All @@ -53,28 +55,28 @@ module.exports = function pubsub (self) {
},

publish: promisify((topic, data, callback) => {
if (!self._options.EXPERIMENTAL.pubsub) {
if (!pubsubEnabled(self._options)) {
return setImmediate(() => callback(errPubsubDisabled()))
}
self._libp2pNode.pubsub.publish(topic, data, callback)
}),

ls: promisify((callback) => {
if (!self._options.EXPERIMENTAL.pubsub) {
if (!pubsubEnabled(self._options)) {
return setImmediate(() => callback(errPubsubDisabled()))
}
self._libp2pNode.pubsub.ls(callback)
}),

peers: promisify((topic, callback) => {
if (!self._options.EXPERIMENTAL.pubsub) {
if (!pubsubEnabled(self._options)) {
return setImmediate(() => callback(errPubsubDisabled()))
}
self._libp2pNode.pubsub.peers(topic, callback)
}),

setMaxListeners (n) {
if (!self._options.EXPERIMENTAL.pubsub) {
if (!pubsubEnabled(self._options)) {
throw errPubsubDisabled()
}
self._libp2pNode.pubsub.setMaxListeners(n)
Expand Down
20 changes: 4 additions & 16 deletions src/core/ipns/publisher.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const PeerId = require('peer-id')
const Record = require('libp2p-record').Record
const { Key } = require('interface-datastore')
const series = require('async/series')
const errcode = require('err-code')
Expand Down Expand Up @@ -97,19 +96,17 @@ class IpnsPublisher {
return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY'))
}

let rec
let entryData
try {
// Marshal record
const entryData = ipns.marshal(entry)
// Marshal to libp2p record
rec = new Record(key.toBuffer(), entryData)
entryData = ipns.marshal(entry)
} catch (err) {
log.error(err)
return callback(err)
}

// Add record to routing (buffer key)
this._routing.put(key.toBuffer(), rec.serialize(), (err, res) => {
this._routing.put(key.toBuffer(), entryData, (err, res) => {
if (err) {
const errMsg = `ipns record for ${key.toString()} could not be stored in the routing`

Expand Down Expand Up @@ -137,17 +134,8 @@ class IpnsPublisher {
return callback(errcode(new Error(errMsg), 'ERR_UNDEFINED_PARAMETER'))
}

let rec
try {
// Marshal to libp2p record
rec = new Record(key.toBuffer(), publicKey.bytes)
} catch (err) {
log.error(err)
return callback(err)
}

// Add public key to routing (buffer key)
this._routing.put(key.toBuffer(), rec.serialize(), (err, res) => {
this._routing.put(key.toBuffer(), publicKey.bytes, (err, res) => {
if (err) {
const errMsg = `public key for ${key.toString()} could not be stored in the routing`

Expand Down
4 changes: 1 addition & 3 deletions src/core/ipns/resolver.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const ipns = require('ipns')
const Record = require('libp2p-record').Record
const PeerId = require('peer-id')
const errcode = require('err-code')

Expand Down Expand Up @@ -119,8 +118,7 @@ class IpnsResolver {

let ipnsEntry
try {
const record = Record.deserialize(res)
ipnsEntry = ipns.unmarshal(record.value)
ipnsEntry = ipns.unmarshal(res)
} catch (err) {
const errMsg = `found ipns record that we couldn't convert to a value`

Expand Down
23 changes: 21 additions & 2 deletions src/core/ipns/routing/offline-datastore.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const { Key } = require('interface-datastore')
const Record = require('libp2p-record').Record
const { encodeBase32 } = require('./utils')

const errcode = require('err-code')
Expand Down Expand Up @@ -48,7 +49,10 @@ class OfflineDatastore {
return callback(errcode(new Error(errMsg), 'ERR_GENERATING_ROUTING_KEY'))
}

this._repo.datastore.put(routingKey, value, callback)
// Marshal to libp2p record as the DHT does
let record = new Record(key, value)

this._repo.datastore.put(routingKey, record.serialize(), callback)
}

/**
Expand Down Expand Up @@ -76,7 +80,22 @@ class OfflineDatastore {
return callback(errcode(new Error(errMsg), 'ERR_GENERATING_ROUTING_KEY'))
}

this._repo.datastore.get(routingKey, callback)
this._repo.datastore.get(routingKey, (err, res) => {
if (err) {
return callback(err)
}

// Unmarshal libp2p record as the DHT does
let record
try {
record = Record.deserialize(res)
} catch (err) {
log.error(err)
return callback(err)
}

callback(null, record.value)
})
}

// encode key properly - base32(/ipns/{cid})
Expand Down
1 change: 0 additions & 1 deletion src/core/ipns/routing/pubsub-datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class IpnsPubsubDatastore {
}

// Modify subscription key to have a proper encoding
// Without this, the utf-8 encoding gets the key broken
_handleSubscriptionKey (key, callback) {
const subscriber = this._subscriptions[key]

Expand Down

0 comments on commit 62f25a6

Please sign in to comment.