var pull = require('pull-stream') var defer = require('pull-defer') var { onceTrue } = require('mutant') var ref = require('ssb-ref') var Reconnect = require('pull-reconnect') var createClient = require('ssb-client') var createFeed = require('ssb-feed') var nest = require('depnest') var Value = require('mutant/value') var ssbKeys = require('ssb-keys') exports.needs = nest({ 'config.sync.load': 'first', 'keys.sync.load': 'first', 'sbot.obs.connectionStatus': 'first', 'sbot.hook.publish': 'map' }) exports.gives = { sbot: { sync: { cache: true }, async: { get: true, publish: true, addBlob: true, gossipConnect: true }, pull: { log: true, userFeed: true, messagesByType: true, feed: true, links: true, backlinks: true, stream: true }, obs: { connectionStatus: true, connection: true, connectedPeers: true, localPeers: true } } } exports.create = function (api) { const config = api.config.sync.load() const keys = api.keys.sync.load() var cache = {} var sbot = null var connection = Value() var connectionStatus = Value() var connectedPeers = Value([]) var localPeers = Value([]) setInterval(refreshPeers, 1e3) var rec = Reconnect(function (isConn) { function notify (value) { isConn(value); connectionStatus.set(value) } createClient(keys, config, function (err, _sbot) { if (err) { return notify(err) } sbot = _sbot sbot.on('closed', function () { sbot = null connection.set(null) notify(new Error('closed')) }) connection.set(sbot) notify() refreshPeers() }) }) var internal = { getLatest: rec.async(function (id, cb) { sbot.getLatest(id, cb) }), add: rec.async(function (msg, cb) { sbot.add(msg, cb) }) } var feed = createFeed(internal, keys, {remote: true}) return { sbot: { sync: { cache: () => cache }, async: { get: rec.async(function (key, cb) { if (typeof cb !== 'function') { throw new Error('cb must be function') } if (cache[key]) cb(null, cache[key]) else { sbot.get(key, function (err, value) { if (err) return cb(err) runHooks({key, value}) cb(null, value) }) } }), publish: rec.async((content, cb) => { if (content.recps) { content = ssbKeys.box(content, content.recps.map(e => { return ref.isFeed(e) ? e : e.link })) } else if (content.mentions) { content.mentions.forEach(mention => { if (ref.isBlob(mention.link)) { sbot.blobs.push(mention.link, err => { if (err) console.error(err) }) } }) } if (sbot) { // instant updating of interface (just incase sbot is busy) runHooks({ publishing: true, timestamp: Date.now(), value: { timestamp: Date.now(), author: keys.id, content } }) } feed.add(content, (err, msg) => { if (err) console.error(err) else if (!cb) console.log(msg) cb && cb(err, msg) }) }), addBlob: rec.async((stream, cb) => { return pull( stream, Hash(function (err, id) { if (err) return cb(err) // completely UGLY hack to tell when the blob has been sucessfully written... var start = Date.now() var n = 5 next() function next () { setTimeout(function () { sbot.blobs.has(id, function (_, has) { if (has) return cb(null, id) if (n--) next() else cb(new Error('write failed')) }) }, Date.now() - start) } }), sbot.blobs.add() ) }), gossipConnect: rec.async(function (opts, cb) { sbot.gossip.connect(opts, cb) }) }, pull: { backlinks: rec.source(query => { return sbot.backlinks.read(query) }), userFeed: rec.source(opts => { return sbot.createUserStream(opts) }), messagesByType: rec.source(opts => { return sbot.messagesByType(opts) }), feed: rec.source(function (opts) { return pull( sbot.createFeedStream(opts), pull.through(runHooks) ) }), log: rec.source(opts => { return pull( sbot.createLogStream(opts), pull.through(runHooks) ) }), links: rec.source(function (query) { return sbot.links(query) }), stream: function (fn) { var stream = defer.source() onceTrue(connection, function (connection) { stream.resolve(fn(connection)) }) return stream } }, obs: { connectionStatus: (listener) => connectionStatus(listener), connection, connectedPeers: () => connectedPeers, localPeers: () => localPeers } } } // scoped function runHooks (msg) { if (msg.publishing) { api.sbot.hook.publish(msg) } else if (!cache[msg.key]) { // cache[msg.key] = msg.value // api.sbot.hook.feed(msg) } } function refreshPeers () { if (sbot) { sbot.gossip.peers((err, peers) => { if (err) return console.error(err) connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key)) localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key)) }) } } } function Hash (onHash) { var buffers = [] return pull.through(function (data) { buffers.push(typeof data === 'string' ? new Buffer(data, 'utf8') : data ) }, function (err) { if (err && !onHash) throw err var b = buffers.length > 1 ? Buffer.concat(buffers) : buffers[0] var h = '&' + ssbKeys.hash(b) onHash && onHash(err, h) }) }