var pull = require('pull-stream') var paramap = require('pull-paramap') var asyncMemo = require('asyncmemo') var Issues = require('ssb-issues') function readNext(fn) { var next return function (end, cb) { if (next) return next(end, cb) fn(function (err, _next) { if (err) return cb(err) next = _next next(null, cb) }) } } module.exports = { name: 'pull-requests', version: '1.0.0', manifest: { get: 'async', getRevs: 'async', list: 'source' }, schemas: require('./lib/schemas'), init: function (sbot) { var issues = Issues.init(sbot) var getMsg = asyncMemo(sbot.get) var getPullReq = asyncMemo(function (id, cb) { issues.get(id, function (err, issue) { if (err) return cb(err) var c = issue.msg.value.content issue.baseRepo = c.repo issue.baseBranch = c.branch issue.headRepo = c.head_repo issue.headBranch = c.head_branch cb(null, issue) }) }) function getBranchLastUpdate(repoId, branch, lte, cb) { // TODO: detect and skip updates from subsequent PR with same branch name getMsg(repoId, function (err, msg) { if (err) return cb(err) var repoAuthor = msg.author pull( sbot.links({ dest: repoId, source: repoAuthor, rel: 'repo', values: true, reverse: true }), pull.filter(function (link) { return link.value.content.type == 'git-update' }), pull.map(function (link) { return { timestamp: link.value.timestamp, rev: (link.value.content.refs || {})['refs/heads/' + branch] } }), pull.filter(function (update) { return update.rev && (lte ? update.timestamp <= lte : true) }), pull.take(1), pull.collect(function (err, links) { cb(err, links && links[0]) }) ) }) } function getRevs(prId, cb) { getPullReq(prId, function (err, pr) { if (err) return cb(err) // get the latest rev of the head branch before it was deleted or the // PR closed var lastTime = pr.open ? null : pr.updated_at getBranchLastUpdate(pr.headRepo, pr.headBranch, lastTime, function (err, headUpdate) { if (err) return cb(err) // get the rev of base when head was last updated getBranchLastUpdate(pr.baseRepo, pr.baseBranch, headUpdate.timestamp, function (err, baseUpdate) { if (err) return cb(err) cb(null, {base: baseUpdate.rev, head: headUpdate.rev}) }) }) }) } function listPullReqs(opts) { opts = opts || {} opts.type = 'pull-request' return pull( sbot.messagesByType(opts), pull.unique('key'), pull.filter(function (msg) { return (!opts.repo || opts.repo == msg.value.content.repo) && (!opts.headRepo || opts.headRepo == msg.value.content.head_repo) && (!opts.author || opts.author == msg.value.author) }), paramap(function (msg, cb) { getPullReq(msg.key, cb) }, 8), pull.filter(opts.open != null && function (pr) { return pr.open == opts.open }) ) } return { deinit: issues.deinit, get: getPullReq, getRevs: getRevs, list: listPullReqs } } }