var pull = require('pull-stream') var paramap = require('pull-paramap') var asyncMemo = require('asyncmemo') var issueSchemas = require('./lib/schemas') var multicb = require('multicb') function Cache(fn, ssb) { return asyncMemo(fn) return function (key, cb) { ac.get(key, cb) } } function isUpdateValid(issue, msg) { return msg.value.author == issue.author || msg.value.author == issue.projectAuthor } function truncate(str, len) { return str.length > len ? str.substr(0, len) + '...' : str } exports.name = 'issues' exports.manifest = { get: 'async', list: 'source', new: 'async', edit: 'async', close: 'async', reopen: 'async', getMention: 'sync', isStatusChanged: 'sync' } exports.schemas = issueSchemas function isStatusChanged(msg, issue) { var mention = getMention(msg, issue) return mention ? mention.open : null } function getMention(msg, issue) { var c = msg.value.content if (msg.key == issue.id || c.issue == issue.id || c.link == issue.id) if (c.open != null) return c if (c.issues) { var mention for (var i = 0; i < c.issues.length; i++) { mention = getMention({value: { timestamp: msg.value.timestamp, author: msg.value.author, content: c.issues[i] }}, issue) if (mention) return mention } } } exports.init = function (ssb) { var ssbGet = asyncMemo(ssb.get) var liveStreams = [] var getIssue = asyncMemo(function (id, cb) { var issue = {} var issueMsg ssbGet(id, function (err, msg) { msg = {key: id, value: msg} if (err) return cb(err) issueMsg = msg issue.id = msg.key issue.msg = msg issue.author = msg.value.author var c = msg.value.content issue.project = c.project issue.text = String(c.text) issue.created_at = issue.updated_at = msg.value.timestamp if (c.project) ssbGet(c.project, gotProjectMsg) else getLinks() }) function gotProjectMsg(err, msg) { if (err) return cb(err) issue.projectAuthor = msg.author getLinks() } function getLinks() { var now = Date.now() // compute the result from the past data pull( ssb.links({dest: id, reverse: true, values: true, old: true, live: false, sync: false}), pull.drain(onOldMsg, onOldEnd) ) // keep the results up-to-date in the future var read = ssb.links({dest: id, values: true, old: false, live: true, sync: false}) liveStreams.push(read) pull( read, pull.drain(onNewMsg, onNewEnd) ) } function onOldMsg(msg) { if (!msg.value || !isUpdateValid(issue, msg)) return var c = msg.value.content // handle updates to issue if (msg.key == id || c.issue == id || c.link == id) { if (c.open != null && issue.open == null) issue.open = c.open if (c.title != null && issue.title == null) issue.title = c.title if (msg.value.timestamp > issue.updated_at) issue.updated_at = msg.value.timestamp } // handle updates via mention if (c.issues) { for (var i = 0; i < c.issues.length; i++) onOldMsg({value: { timestamp: msg.value.timestamp, author: msg.value.author, content: c.issues[i] }}) } checkReady() } function onNewMsg(msg) { if (!msg.value || !isUpdateValid(issue, msg)) return var c = msg.value.content // handle updates to issue if (msg.key == id || c.issue == id || c.link == id) { if (c.open != null) issue.open = c.open if (c.title != null) issue.title = c.title if (msg.value.timestamp > issue.updated_at) issue.updated_at = msg.value.timestamp } // handle updates via mention if (c.issues) { for (var i = 0; i < c.issues.length; i++) onNewMsg({value: { timestamp: msg.value.timestamp, author: msg.value.author, content: c.issues[i] }}) } } function checkReady() { // call back once all the issue properties are set if (issue.open != null && issue.title != null) { var _cb = cb delete cb _cb(null, issue) } } function onOldEnd(err) { if (err) { if (cb) cb(err) else console.error(err) return } // process the root message last onOldMsg(issueMsg) // if callback hasn't been called yet, the issue is missing a field if (cb) { if (issue.open == null) issue.open = true if (issue.title == null) issue.title = truncate(issue.text, 40) || issue.id checkReady() } } function onNewEnd(err) { if (err) { if (cb) cb(err) else console.error(err) } } }) function deinit(cb) { var done = multicb() // cancel all live streams liveStreams.forEach(function (read) { read(true, done()) }) done(cb) } function listIssues(opts) { opts.type = 'issue' return pull( // TODO: use links2 for this ssb.messagesByType(opts), pull.unique('key'), pull.filter(function (msg) { return (!opts.project || opts.project == msg.value.content.project) && (!opts.author || opts.author == msg.value.author) }), paramap(function (msg, cb) { getIssue(msg.key, cb) }, 8), pull.filter(opts.open != null && function (pr) { return pr.open == opts.open }) ) } function editIssue(id, opts, cb) { var msg try { ssb.publish(issueSchemas.edit(id, opts), cb) } catch(e) { return cb(e) } } function closeIssue(id, cb) { var msg try { ssb.publish(issueSchemas.close(id), cb) } catch(e) { return cb(e) } } function reopenIssue(id, cb) { var msg try { msg = issueSchemas.reopen(id) } catch(e) { return cb(e) } ssb.publish(msg, cb) } function newIssue(opts, cb) { var msg try { msg = issueSchemas.new(opts.project, opts.title, opts.text) } catch(e) { return cb(e) } ssb.publish(msg, function (err, msg) { if (err) return cb(err) getIssue(msg.key, cb) }) } return { deinit: deinit, get: getIssue, list: listIssues, new: newIssue, edit: editIssue, close: closeIssue, reopen: reopenIssue, getMention: getMention, isStatusChanged: isStatusChanged } }