var pull = require('pull-stream') var paramap = require('pull-paramap') var asyncMemo = require('asyncmemo') var issueSchemas = require('./lib/schemas') var multicb = require('multicb') function Cache(fn, ssb) { return asyncMemo(fn) return function (key, cb) { ac.get(key, cb) } } function truncate(str, len) { return str.length > len ? str.substr(0, len) + '...' : str } exports.name = 'issues' exports.manifest = { get: 'async', list: 'source', new: 'async', edit: 'async', close: 'async', reopen: 'async', getMention: 'sync', isStatusChanged: 'sync' } exports.schemas = issueSchemas function isStatusChanged(msg, issue) { var mention = getMention(msg, issue) return mention ? mention.open : null } function getMention(msg, issue) { var c = msg.value.content if (msg.key == issue.id || c.issue == issue.id || c.link == issue.id) if (c.open != null) return c if (c.issues) { var mention for (var i = 0; i < c.issues.length; i++) { mention = getMention({value: { timestamp: msg.value.timestamp, author: msg.value.author, content: c.issues[i] }}, issue) if (mention) return mention } } } function isMsgIdRange(opts) { return ( (opts.gt && opts.gt[0] === '%') || (opts.lt && opts.lt[0] === '%') || (opts.gte && opts.gte[0] === '%') || (opts.lte && opts.lte[0] === '%') ) } function passthrough(read) { return read } function optsToRangeFilter(opts) { if (!opts || ( opts.gt == null || opts.lt == null || opts.gte == null || opts.lte == null )) return passthrough return pull.filter(function (msg) { return (opts.gt == null || msg.timestamp > opts.gt) && (opts.lt == null || msg.timestamp < opts.lt) && (opts.gte == null || msg.timestamp >= opts.gte) && (opts.lte == null || msg.timestamp <= opts.lte) }) } function optsToRange(opts) { var range = {} var defined = false if (opts.gt != null) defined = true, range.$gt = opts.gt if (opts.lt != null) defined = true, range.$lt = opts.lt if (opts.gte != null) defined = true, range.$gte = opts.gt if (opts.lte != null) defined = true, range.$lte = opts.lt return defined ? range : undefined } exports.init = function (ssb) { var ssbGet = asyncMemo(ssb.get) var liveStreams = [] var getIssue = asyncMemo(function (id, cb) { var issue = { labels: [] } var labelCounts = {} var issueMsg function addLabel(id) { var count = labelCounts[id] || 0 labelCounts[id] = ++count if (count === 1) { var i = issue.labels.indexOf(id) if (i === -1) issue.labels.push(id) } } function removeLabel(id) { var count = labelCounts[id] || 0 labelCounts[id] = --count if (count === 0) { var i = issue.labels.indexOf(id) if (i > -1) issue.labels.splice(i, 1) } } ssbGet(id, function (err, msg) { msg = {key: id, value: msg} if (err) return cb(err) issueMsg = msg issue.id = msg.key issue.msg = msg issue.author = msg.value.author var c = msg.value.content issue.project = c.project issue.text = c.text || c.title || JSON.stringify(msg, null, 2) issue.created_at = issue.updated_at = msg.value.timestamp if (Array.isArray(c.labels)) c.labels.forEach(addLabel) if (c.project) ssbGet(c.project, gotProjectMsg) else getLinks() }) function gotProjectMsg(err, msg) { if (err) return cb(err) issue.projectAuthor = msg.author getLinks() } function getLinks() { var now = Date.now() // compute the result from the past data pull( ssb.links({dest: id, reverse: true, values: true, old: true, live: false, sync: false}), pull.drain(onOldMsg, onOldEnd) ) // keep the results up-to-date in the future var read = ssb.links({dest: id, values: true, old: false, live: true, sync: false}) liveStreams.push(read) pull( read, pull.drain(onNewMsg, onNewEnd) ) } function onOldMsg(msg) { if (!msg.value) return var c = msg.value.content // handle updates to issue if (msg.key == id || c.issue == id || c.link == id) { if (c.open != null && issue.open == null) issue.open = c.open if (c.title != null && issue.title == null) issue.title = c.title if (c.labels) { if (Array.isArray(c.labels.add)) c.labels.add.forEach(addLabel) if (Array.isArray(c.labels.remove)) c.labels.remove.forEach(removeLabel) } if (msg.value.timestamp > issue.updated_at) issue.updated_at = msg.value.timestamp } // handle updates via mention if (c.issues) { for (var i = 0; i < c.issues.length; i++) { if (c.type === 'issue-label') { if (c.issues[i] === id) addLabel(msg.key) } else { onOldMsg({value: { timestamp: msg.value.timestamp, author: msg.value.author, content: c.issues[i] }}) } } } checkReady() } function onNewMsg(msg) { if (!msg.value) return var c = msg.value.content // handle updates to issue if (msg.key == id || c.issue == id || c.link == id) { if (c.open != null) issue.open = c.open if (c.title != null) issue.title = c.title if (msg.value.timestamp > issue.updated_at) issue.updated_at = msg.value.timestamp if (c.labels) { if (Array.isArray(c.labels.add)) c.labels.add.forEach(addLabel) if (Array.isArray(c.labels.remove)) c.labels.remove.forEach(removeLabel) } } // handle updates via mention if (c.issues) { for (var i = 0; i < c.issues.length; i++) onNewMsg({value: { timestamp: msg.value.timestamp, author: msg.value.author, content: c.issues[i] }}) } } function checkReady() { // call back once all the issue properties are set if (issue.open != null && issue.title != null) { var _cb = cb delete cb _cb(null, issue) } } function onOldEnd(err) { if (err) { if (cb) cb(err) else console.error(err) return } // process the root message last onOldMsg(issueMsg) // if callback hasn't been called yet, the issue is missing a field if (cb) { if (issue.open == null) issue.open = true if (issue.title == null) issue.title = truncate(issue.text.split('\n')[0], 250) || issue.id checkReady() } } function onNewEnd(err) { if (err) { if (cb) cb(err) else console.error(err) } } }) function deinit(cb) { var done = multicb() // cancel all live streams liveStreams.forEach(function (read) { read(true, done()) }) done(cb) } function listIssues(opts) { opts.type = 'issue' return pull( opts.project && !isMsgIdRange(opts) ? ( ssb.backlinks ? ssb.backlinks.read({ reverse: opts.reverse, live: opts.live, query: [{$filter: { value: {content: {type: opts.type}}, dest: opts.project, rts: optsToRange(opts) }}] }) : pull( ssb.links({ reverse: opts.reverse, live: opts.live, dest: opts.project, values: true, rel: 'project' }), pull.filter(function (msg) { return msg.value.content.type === opts.type }), optsToRangeFilter(opts) ) ) : ssb.messagesByType(opts), pull.unique('key'), pull.filter(function (msg) { return (!opts.project || opts.project == msg.value.content.project) && (!opts.author || opts.author == msg.value.author) }), paramap(function (msg, cb) { getIssue(msg.key, cb) }, 8), pull.filter(opts.open != null && function (pr) { return pr.open == opts.open }) ) } function editIssue(id, opts, cb) { var msg try { ssb.publish(issueSchemas.edit(id, opts), cb) } catch(e) { return cb(e) } } function closeIssue(id, cb) { var msg try { ssb.publish(issueSchemas.close(id), cb) } catch(e) { return cb(e) } } function reopenIssue(id, cb) { var msg try { msg = issueSchemas.reopen(id) } catch(e) { return cb(e) } ssb.publish(msg, cb) } function newIssue(opts, cb) { var msg try { msg = issueSchemas.new(opts.project, opts.title, opts.text) } catch(e) { return cb(e) } ssb.publish(msg, function (err, msg) { if (err) return cb(err) getIssue(msg.key, cb) }) } return { deinit: deinit, get: getIssue, list: listIssues, new: newIssue, edit: editIssue, close: closeIssue, reopen: reopenIssue, getMention: getMention, isStatusChanged: isStatusChanged } }