plugins/replicate.jsView |
---|
| 1 … | +'use strict' |
| 2 … | +var pull = require('pull-stream') |
| 3 … | +var pullNext = require('pull-next') |
| 4 … | +var para = require('pull-paramap') |
| 5 … | +var Notify = require('pull-notify') |
| 6 … | +var Cat = require('pull-cat') |
| 7 … | +var Debounce = require('observ-debounce') |
| 8 … | +var mdm = require('mdmanifest') |
| 9 … | +var apidoc = require('../lib/apidocs').replicate |
| 10 … | +var deepEqual = require('deep-equal') |
| 11 … | + |
| 12 … | +var Pushable = require('pull-pushable') |
| 13 … | + |
| 14 … | + |
| 15 … | +function toSeq (s) { |
| 16 … | + return 'number' === typeof s ? s : s.sequence |
| 17 … | +} |
| 18 … | + |
| 19 … | + |
| 20 … | +var streamErrors = { |
| 21 … | + 'unexpected end of parent stream': true, |
| 22 … | + 'unexpected hangup': true, |
| 23 … | + 'read EHOSTUNREACH': true, |
| 24 … | + 'read ECONNRESET': true, |
| 25 … | + 'read ENETDOWN': true, |
| 26 … | + 'read ETIMEDOUT': true, |
| 27 … | + 'write ECONNRESET': true, |
| 28 … | + 'write EPIPE': true, |
| 29 … | + 'stream is closed': true |
| 30 … | +} |
| 31 … | + |
| 32 … | +module.exports = { |
| 33 … | + name: 'replicate', |
| 34 … | + version: '2.0.0', |
| 35 … | + manifest: mdm.manifest(apidoc), |
| 36 … | + |
| 37 … | + init: function (sbot, config) { |
| 38 … | + var debounce = Debounce(200) |
| 39 … | + var listeners = {} |
| 40 … | + var notify = Notify() |
| 41 … | + var newPeer = Notify() |
| 42 … | + |
| 43 … | + var start = null |
| 44 … | + var count = 0 |
| 45 … | + var rate = 0 |
| 46 … | + var loadedFriends = false |
| 47 … | + var toSend = {} |
| 48 … | + var peerHas = {} |
| 49 … | + var pendingFeedsForPeer = {} |
| 50 … | + var lastProgress = null |
| 51 … | + |
| 52 … | + debounce(function () { |
| 53 … | + |
| 54 … | + var feeds = loadedFriends ? Object.keys(toSend).length : null |
| 55 … | + var legacyProgress = 0 |
| 56 … | + var legacyTotal = 0 |
| 57 … | + |
| 58 … | + var pendingFeeds = new Set() |
| 59 … | + var pendingPeers = {} |
| 60 … | + var legacyToRecv = {} |
| 61 … | + |
| 62 … | + Object.keys(pendingFeedsForPeer).forEach(function (peerId) { |
| 63 … | + if (pendingFeedsForPeer[peerId] && pendingFeedsForPeer[peerId].size) { |
| 64 … | + Object.keys(toSend).forEach(function (feedId) { |
| 65 … | + if (peerHas[peerId] && peerHas[peerId][feedId]) { |
| 66 … | + if (peerHas[peerId][feedId] > toSend[feedId]) { |
| 67 … | + pendingFeeds.add(feedId) |
| 68 … | + } |
| 69 … | + } |
| 70 … | + }) |
| 71 … | + pendingPeers[peerId] = pendingFeedsForPeer[peerId].size |
| 72 … | + } |
| 73 … | + }) |
| 74 … | + |
| 75 … | + for (var k in toSend) { |
| 76 … | + legacyProgress += toSend[k] |
| 77 … | + } |
| 78 … | + |
| 79 … | + for (var id in peerHas) { |
| 80 … | + for (var k in peerHas[id]) { |
| 81 … | + legacyToRecv[k] = Math.max(peerHas[id][k], legacyToRecv[k] || 0) |
| 82 … | + } |
| 83 … | + } |
| 84 … | + |
| 85 … | + for (var k in legacyToRecv) { |
| 86 … | + if (toSend[k] !== null) { |
| 87 … | + legacyTotal += legacyToRecv[k] |
| 88 … | + } |
| 89 … | + } |
| 90 … | + |
| 91 … | + var progress = { |
| 92 … | + id: sbot.id, |
| 93 … | + rate, |
| 94 … | + feeds, |
| 95 … | + pendingPeers, |
| 96 … | + incompleteFeeds: pendingFeeds.size, |
| 97 … | + |
| 98 … | + |
| 99 … | + progress: legacyProgress, |
| 100 … | + total: legacyTotal |
| 101 … | + } |
| 102 … | + |
| 103 … | + if (!deepEqual(progress, lastProgress)) { |
| 104 … | + lastProgress = progress |
| 105 … | + notify(progress) |
| 106 … | + } |
| 107 … | + }) |
| 108 … | + |
| 109 … | + pull( |
| 110 … | + sbot.createLogStream({old: false, live: true, sync: false, keys: false}), |
| 111 … | + pull.drain(function (e) { |
| 112 … | + |
| 113 … | + if(!start) start = Date.now() |
| 114 … | + var time = (Date.now() - start)/1000 |
| 115 … | + if(time >= 1) { |
| 116 … | + rate = count / time |
| 117 … | + start = Date.now() |
| 118 … | + count = 0 |
| 119 … | + } |
| 120 … | + var pushable = listeners[e.author] |
| 121 … | + |
| 122 … | + if(pushable && pushable.sequence == e.sequence) { |
| 123 … | + pushable.sequence ++ |
| 124 … | + pushable.forEach(function (p) { |
| 125 … | + p.push(e) |
| 126 … | + }) |
| 127 … | + } |
| 128 … | + count ++ |
| 129 … | + addPeer({id: e.author, sequence: e.sequence}) |
| 130 … | + }) |
| 131 … | + ) |
| 132 … | + |
| 133 … | + sbot.createHistoryStream.hook(function (fn, args) { |
| 134 … | + var upto = args[0] || {} |
| 135 … | + var seq = upto.sequence || upto.seq |
| 136 … | + |
| 137 … | + if(this._emit) this._emit('call:createHistoryStream', args[0]) |
| 138 … | + |
| 139 … | + |
| 140 … | + if(this===sbot) return fn.call(this, upto) |
| 141 … | + |
| 142 … | + |
| 143 … | + peerHas[this.id] = peerHas[this.id] || {} |
| 144 … | + peerHas[this.id][upto.id] = seq - 1 |
| 145 … | + |
| 146 … | + debounce.set() |
| 147 … | + |
| 148 … | + |
| 149 … | + |
| 150 … | + if(toSend[upto.id] == null || (seq > toSend[upto.id])) { |
| 151 … | + upto.old = false |
| 152 … | + if(!upto.live) return pull.empty() |
| 153 … | + var pushable = listeners[upto.id] = listeners[upto.id] || [] |
| 154 … | + var p = Pushable(function () { |
| 155 … | + var i = pushable.indexOf(p) |
| 156 … | + pushable.splice(i, 1) |
| 157 … | + }) |
| 158 … | + pushable.push(p) |
| 159 … | + pushable.sequence = seq |
| 160 … | + return p |
| 161 … | + } |
| 162 … | + return fn.call(this, upto) |
| 163 … | + }) |
| 164 … | + |
| 165 … | + |
| 166 … | + var opts = config.replication || {} |
| 167 … | + opts.hops = opts.hops || 3 |
| 168 … | + opts.dunbar = opts.dunbar || 150 |
| 169 … | + opts.live = true |
| 170 … | + opts.meta = true |
| 171 … | + |
| 172 … | + function localPeers () { |
| 173 … | + if(!sbot.gossip) return |
| 174 … | + sbot.gossip.peers().forEach(function (e) { |
| 175 … | + if (e.source === 'local' && toSend[e.key] == null) { |
| 176 … | + sbot.latestSequence(e.key, function (err, seq) { |
| 177 … | + addPeer({id: e.key, sequence: err ? 0 : toSeq(seq)}) |
| 178 … | + }) |
| 179 … | + } |
| 180 … | + }) |
| 181 … | + } |
| 182 … | + |
| 183 … | + |
| 184 … | + if (sbot.gossip) { |
| 185 … | + |
| 186 … | + |
| 187 … | + var int = setInterval(localPeers, 1000) |
| 188 … | + if(int.unref) int.unref() |
| 189 … | + localPeers() |
| 190 … | + } |
| 191 … | + |
| 192 … | + function friendsLoaded () { |
| 193 … | + loadedFriends = true |
| 194 … | + debounce.set() |
| 195 … | + } |
| 196 … | + |
| 197 … | + function addPeer (upto) { |
| 198 … | + if(upto.sync) return friendsLoaded() |
| 199 … | + if(!upto.id) return console.log('invalid', upto) |
| 200 … | + |
| 201 … | + if(toSend[upto.id] == null) { |
| 202 … | + toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0) |
| 203 … | + newPeer({id: upto.id, sequence: toSend[upto.id] , type: 'new' }) |
| 204 … | + } else { |
| 205 … | + toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0) |
| 206 … | + } |
| 207 … | + |
| 208 … | + debounce.set() |
| 209 … | + } |
| 210 … | + |
| 211 … | + |
| 212 … | + |
| 213 … | + pull( |
| 214 … | + sbot.friends.createFriendStream(opts), |
| 215 … | + |
| 216 … | + |
| 217 … | + para(function (data, cb) { |
| 218 … | + if(data.sync) return cb(null, data) |
| 219 … | + var id = data.id || data |
| 220 … | + sbot.latestSequence(id, function (err, seq) { |
| 221 … | + cb(null, { |
| 222 … | + id: id, sequence: err ? 0 : toSeq(seq) |
| 223 … | + }) |
| 224 … | + }) |
| 225 … | + }, 32), |
| 226 … | + pull.drain(addPeer, friendsLoaded) |
| 227 … | + ) |
| 228 … | + |
| 229 … | + function upto (opts) { |
| 230 … | + opts = opts || {} |
| 231 … | + var ary = Object.keys(toSend).map(function (k) { |
| 232 … | + return { id: k, sequence: toSend[k] } |
| 233 … | + }) |
| 234 … | + if(opts.live) |
| 235 … | + return Cat([pull.values(ary), pull.once({sync: true}), newPeer.listen()]) |
| 236 … | + |
| 237 … | + return pull.values(ary) |
| 238 … | + } |
| 239 … | + |
| 240 … | + sbot.on('rpc:connect', function(rpc) { |
| 241 … | + |
| 242 … | + if(rpc.id === sbot.id) return |
| 243 … | + |
| 244 … | + |
| 245 … | + localPeers() |
| 246 … | + |
| 247 … | + var drain |
| 248 … | + sbot.emit('replicate:start', rpc) |
| 249 … | + rpc.on('closed', function () { |
| 250 … | + sbot.emit('replicate:finish', toSend) |
| 251 … | + |
| 252 … | + |
| 253 … | + delete pendingFeedsForPeer[rpc.id] |
| 254 … | + debounce.set() |
| 255 … | + }) |
| 256 … | + var errorsSeen = {} |
| 257 … | + pull( |
| 258 … | + upto({live: opts.live}), |
| 259 … | + drain = pull.drain(function (upto) { |
| 260 … | + if(upto.sync) return |
| 261 … | + |
| 262 … | + |
| 263 … | + pendingFeedsForPeer[rpc.id] = pendingFeedsForPeer[rpc.id] || new Set() |
| 264 … | + pendingFeedsForPeer[rpc.id].add(upto.id) |
| 265 … | + debounce.set() |
| 266 … | + |
| 267 … | + pull( |
| 268 … | + rpc.createHistoryStream({ |
| 269 … | + id: upto.id, |
| 270 … | + seq: (upto.sequence || upto.seq || 0) + 1, |
| 271 … | + live: true, |
| 272 … | + keys: false |
| 273 … | + }), |
| 274 … | + |
| 275 … | + |
| 276 … | + pull.through(detectSync(rpc.id, upto, toSend, peerHas, function () { |
| 277 … | + if (pendingFeedsForPeer[rpc.id]) { |
| 278 … | + pendingFeedsForPeer[rpc.id].delete(upto.id) |
| 279 … | + debounce.set() |
| 280 … | + } |
| 281 … | + })), |
| 282 … | + |
| 283 … | + sbot.createWriteStream(function (err) { |
| 284 … | + if(err && !(err.message in errorsSeen)) { |
| 285 … | + errorsSeen[err.message] = true |
| 286 … | + if(err.message in streamErrors) { |
| 287 … | + drain.abort() |
| 288 … | + if(err.message === 'unexpected end of parent stream') { |
| 289 … | + if (err instanceof Error) { |
| 290 … | + |
| 291 … | + } else { |
| 292 … | + |
| 293 … | + |
| 294 … | + rpc.close(err) |
| 295 … | + } |
| 296 … | + } |
| 297 … | + } else { |
| 298 … | + console.error('Error replicating with ' + rpc.id + ':\n ', |
| 299 … | + err.stack) |
| 300 … | + } |
| 301 … | + } |
| 302 … | + |
| 303 … | + |
| 304 … | + if (pendingFeedsForPeer[rpc.id]) { |
| 305 … | + pendingFeedsForPeer[rpc.id].delete(upto.id) |
| 306 … | + debounce.set() |
| 307 … | + } |
| 308 … | + }) |
| 309 … | + ) |
| 310 … | + |
| 311 … | + }, function (err) { |
| 312 … | + if(err && err !== true) |
| 313 … | + sbot.emit('log:error', ['replication', rpc.id, 'error', err]) |
| 314 … | + }) |
| 315 … | + ) |
| 316 … | + }) |
| 317 … | + |
| 318 … | + return { |
| 319 … | + changes: notify.listen, |
| 320 … | + upto: upto |
| 321 … | + } |
| 322 … | + } |
| 323 … | +} |
| 324 … | + |
| 325 … | +function detectSync (peerId, upto, toSend, peerHas, onSync) { |
| 326 … | + |
| 327 … | + |
| 328 … | + |
| 329 … | + var sync = false |
| 330 … | + var last = (upto.sequence || upto.seq || 0) |
| 331 … | + |
| 332 … | + |
| 333 … | + setTimeout(function () { |
| 334 … | + if (peerHas[peerId] && peerHas[peerId][upto.id] != null) { |
| 335 … | + checkSync() |
| 336 … | + } else { |
| 337 … | + |
| 338 … | + |
| 339 … | + broadcastSync() |
| 340 … | + } |
| 341 … | + }, 500) |
| 342 … | + |
| 343 … | + return function (msg) { |
| 344 … | + if (msg.sync) { |
| 345 … | + |
| 346 … | + broadcastSync() |
| 347 … | + return false |
| 348 … | + } |
| 349 … | + |
| 350 … | + last = msg.sequence |
| 351 … | + checkSync() |
| 352 … | + return true |
| 353 … | + } |
| 354 … | + |
| 355 … | + function checkSync () { |
| 356 … | + if (!sync) { |
| 357 … | + var availableSeq = peerHas[peerId] && peerHas[peerId][upto.id] |
| 358 … | + if (availableSeq === last || availableSeq < toSend[upto.id]) { |
| 359 … | + |
| 360 … | + |
| 361 … | + broadcastSync() |
| 362 … | + } |
| 363 … | + } |
| 364 … | + } |
| 365 … | + |
| 366 … | + function broadcastSync () { |
| 367 … | + if (!sync) { |
| 368 … | + sync = true |
| 369 … | + onSync && onSync() |
| 370 … | + } |
| 371 … | + } |
| 372 … | +} |