var pull = require('pull-stream') var defer = require('pull-defer') var many = require('pull-many') var util = require('./util') module.exports = Contacts function Contacts(sbot) { if (!(this instanceof Contacts)) return new Contacts(sbot) this.sbot = sbot if (!sbot.links) { // No ssb-links: need to use ssb-friends singleton if (sbot.friends && typeof sbot.friends.graphStream === 'function') { this._startGraphStream(); } else { console.error('Missing ssb-friends or sbot.links') } } } Contacts.prototype._startGraphStream = function () { const graph = this.graph = {} let waiting = this._waiting = []; const self = this; pull( this.sbot.friends.graphStream({ old: true, live: true, }), pull.drain(function (obj) { for (const fromId in obj) { const map = obj[fromId] const map2 = graph[fromId] || (graph[fromId] = {}) for (const toId in map) { map2[toId] = map[toId] } } if (waiting) { delete self._waiting; delete waiting while (waiting.length) { waiting.shift()(); } } }, err => { if (waiting) { delete self._waiting; delete waiting while (waiting.length) { waiting.shift()(err) } } console.trace(err) }) ) } Contacts.prototype._createContactStream = function (source, dest) { if (this.sbot.links) return this._createContactStreamLinks(source, dest) if (this.graph) return this._createContactStreamGraph(source, dest) return pull.error(new Error('missing sbot.links or ssb-friends')) } Contacts.prototype._createContactStreamLinks = function (source, dest) { return pull( this.sbot.links({ source: source, dest: dest, rel: 'contact', values: true, reverse: true }), pull.filter(function (msg) { var c = msg && msg.value && msg.value.content return c && c.type === 'contact' && (!dest || c.contact === dest) }), pull.map(function (msg) { var c = msg && msg.value && msg.value.content return { source: msg.value.author, dest: c.contact, msg: msg, value: c.following ? true : c.flagged || c.blocking ? false : null } }), pull.unique(function (edge) { return edge.source + '-' + edge.dest }) ) } Contacts.prototype._createContactStreamGraph = function (source, dest) { if (this._waiting) { return u.readNext(cb => { this._waiting.push(err => { if (err) return cb(err) cb(this._createContactStreamGraph(source, dest)) }) }) } let edges = [] function mapFrom(source, dest, value) { return { source, dest, value: value >= 1 ? true : value == -1 ? false : null // Note: cannot add msg here, as it's not provided by ssb-friends } } if (source) { const from = this.graph[source] if (dest == null) { edges.push(mapFrom(source, dest, from[dest])) } else for (const d in from) { edges.push(mapFrom(source, d, from[d])) } } else if (dest) { for (const source in this.graph) { const from = this.graph[source] edges.push(mapFrom(source, dest, from[dest])) } } else { return pull.error(new TypeError('source or dest required')) } return pull.values(edges) } Contacts.prototype.createFollowsStream = function (id) { return pull( this._createContactStream(id, null), pull.filter('value'), pull.map('dest') ) } Contacts.prototype.createFollowersStream = function (id) { return pull( this._createContactStream(null, id), pull.filter('value'), pull.map('source') ) } Contacts.prototype.createFollowedFollowersStream = function (source, dest) { var follows = {}, followers = {} return pull( many([ this._createContactStream(source, null), this._createContactStream(null, dest) ]), pull.filter('value'), pull.map(function (edge) { if (edge.source === source) { if (followers[edge.dest]) { delete followers[edge.dest] return edge.dest } else { follows[edge.dest] = true } } else if (edge.dest === dest) { if (follows[edge.source]) { delete follows[edge.source] return edge.source } else { followers[edge.source] = true } } }), pull.filter() ) } Contacts.prototype.createFriendsStream = function (opts, endCb) { if (typeof opts === 'string') opts = {id: opts} var id = opts.id var msgIds = opts.msgIds var follows = {}, followers = {} var blocks = {}, blockers = {} var enemies = opts.enemies && {} return pull( many([ this._createContactStream(id, null), this._createContactStream(null, id) ]), pull.map(function (edge) { if (edge.value) { if (edge.source === id) { if (followers[edge.dest]) { var item2 = followers[edge.dest] delete followers[edge.dest] return msgIds ? {feed: edge.dest, msg: edge.msg, msg2: item2.msg} : edge.dest } else { follows[edge.dest] = msgIds ? {feed: edge.dest, msg: edge.msg} : edge.dest } } else if (edge.dest === id) { if (follows[edge.source]) { var item2 = follows[edge.source] delete follows[edge.source] return msgIds ? {feed: edge.source, msg: edge.msg, msg2: item2.msg} : edge.source } else { followers[edge.source] = msgIds ? {feed: edge.source, msg: edge.msg} : edge.source } } } else if (edge.value === false) { if (edge.source === id) { if (enemies && blockers[edge.dest]) { var item2 = blockers[edge.dest] delete blockers[edge.dest] enemies[edge.dest] = msgIds ? {feed: edge.dest, msg: edge.msg, msg2: item2.msg} : edge.dest } else { blocks[edge.dest] = msgIds ? {feed: edge.dest, msg: edge.msg} : edge.dest } } else if (edge.dest === id) { if (enemies && blocks[edge.source]) { var item2 = blocks[edge.source] delete blocks[edge.source] enemies[edge.source] = msgIds ? {feed: edge.source, msg: edge.msg, msg2: item2.msg} : edge.source } else { blockers[edge.source] = msgIds ? {feed: edge.source, msg: edge.msg} : edge.source } } } }), pull.filter(), endCb && function (read) { return function (abort, cb) { read(abort, function (end, data) { cb(end, data) if (end) endCb(end === true ? null : end, { followers: Object.values(followers), follows: Object.values(follows), blocks: Object.values(blocks), blockers: Object.values(blockers), enemies: Object.values(enemies), }) }) } } ) } Contacts.prototype.createContactStreams = function (opts) { var msgIds = opts.msgIds var follows = defer.source() var followers = defer.source() var blocks = defer.source() var blockers = defer.source() var enemies = defer.source() var friends = this.createFriendsStream(opts, function (err, more) { try { follows.resolve(err ? pull.error(err) : pull.values(more.follows)) followers.resolve(err ? pull.error(err) : pull.values(more.followers)) blocks.resolve(err ? pull.error(err) : pull.values(more.blocks)) blockers.resolve(err ? pull.error(err) : pull.values(more.blockers)) enemies.resolve(err ? pull.error(err) : pull.values(more.enemies)) } catch(e) { console.trace(e) } }) return { friends: friends, follows: follows, followers: followers, enemies: enemies, blocks: blocks, blockers: blockers, } } Contacts.prototype.get = function (source, dest, cb) { if (this.sbot.links) return pull( this.sbot.links({source, dest, rel: 'contact', reverse: true, values: true, meta: false, keys: false}), pull.filter(value => { const c = value && !value.private && value.content return c && c.type === 'contact' }), pull.take(1), pull.reduce((acc, value) => { // trinary logic from ssb-friends const c = value && value.content return c.following ? true : c.flagged || c.blocking ? false : null }, null, cb) ) if (this._waiting) { return this._waiting.push(err => { if (err) return cb(err) this.get(source, dest, cb) }) } const map = this.graph[source] const value = map && map[dest] cb(null, value >= 1 ? true : value == -1 ? false : null) }