Files: d2b5693b01842e3376dae3c3cf5c579e03e490c4 / plugins / replicate / legacy.js
10086 bytesRaw
1 | var pull = require('pull-stream') |
2 | var pullNext = require('pull-next') |
3 | var para = require('pull-paramap') |
4 | var Notify = require('pull-notify') |
5 | var Cat = require('pull-cat') |
6 | var Debounce = require('observ-debounce') |
7 | var deepEqual = require('deep-equal') |
8 | var Obv = require('obv') |
9 | var isFeed = require('ssb-ref').isFeed |
10 | var Pushable = require('pull-pushable') |
11 | var detectSync = require('../../lib/detect-sync') |
12 | |
13 | // compatibility function for old implementations of `latestSequence` |
14 | function toSeq (s) { |
15 | return 'number' === typeof s ? s : s.sequence |
16 | } |
17 | |
18 | function last (a) { return a[a.length - 1] } |
19 | |
20 | // if one of these shows up in a replication stream, the stream is dead |
21 | var streamErrors = { |
22 | 'unexpected end of parent stream': true, // stream closed okay |
23 | 'unexpected hangup': true, // stream closed probably okay |
24 | 'read EHOSTUNREACH': true, |
25 | 'read ECONNRESET': true, |
26 | 'read ENETDOWN': true, |
27 | 'read ETIMEDOUT': true, |
28 | 'write ECONNRESET': true, |
29 | 'write EPIPE': true, |
30 | 'stream is closed': true, // rpc method called after stream ended |
31 | } |
32 | |
33 | module.exports = function (ssbServer, notify, config) { |
34 | var debounce = Debounce(200) |
35 | var listeners = {} |
36 | var newPeers = Notify() |
37 | |
38 | var start = null |
39 | var count = 0 |
40 | var rate = 0 |
41 | var toSend = {} |
42 | var peerHas = {} |
43 | var pendingFeedsForPeer = {} |
44 | var lastProgress = null |
45 | |
46 | var replicate = {} |
47 | |
48 | function request (id, unfollow) { |
49 | if(unfollow === false) { |
50 | if(replicate[id]) { |
51 | delete replicate[id] |
52 | newPeers({id:id, sequence: -1}) |
53 | } |
54 | } |
55 | else if(!replicate[id]) { |
56 | replicate[id] = true |
57 | newPeers({id:id, sequence: toSend[id] || 0}) |
58 | } |
59 | } |
60 | |
61 | ssbServer.getVectorClock(function (err, clock) { |
62 | if(err) throw err |
63 | toSend = clock |
64 | }) |
65 | |
66 | ssbServer.post(function (msg) { |
67 | //this should be part of ssb.getVectorClock |
68 | toSend[msg.value.author] = msg.value.sequence |
69 | debounce.set() |
70 | }) |
71 | |
72 | debounce(function () { |
73 | // only list loaded feeds once we know about all of them! |
74 | var feeds = Object.keys(toSend).length |
75 | var legacyProgress = 0 |
76 | var legacyTotal = 0 |
77 | |
78 | var pendingFeeds = new Set() |
79 | var pendingPeers = {} |
80 | var legacyToRecv = {} |
81 | |
82 | Object.keys(pendingFeedsForPeer).forEach(function (peerId) { |
83 | if (pendingFeedsForPeer[peerId] && pendingFeedsForPeer[peerId].size) { |
84 | Object.keys(toSend).forEach(function (feedId) { |
85 | if (peerHas[peerId] && peerHas[peerId][feedId]) { |
86 | if (peerHas[peerId][feedId] > toSend[feedId]) { |
87 | pendingFeeds.add(feedId) |
88 | } |
89 | } |
90 | }) |
91 | pendingPeers[peerId] = pendingFeedsForPeer[peerId].size |
92 | } |
93 | }) |
94 | |
95 | for (var k in toSend) { |
96 | legacyProgress += toSend[k] |
97 | } |
98 | |
99 | for (var id in peerHas) { |
100 | for (var k in peerHas[id]) { |
101 | legacyToRecv[k] = Math.max(peerHas[id][k], legacyToRecv[k] || 0) |
102 | } |
103 | } |
104 | |
105 | for (var k in legacyToRecv) { |
106 | if (toSend[k] !== null) { |
107 | legacyTotal += legacyToRecv[k] |
108 | } |
109 | } |
110 | |
111 | var progress = { |
112 | id: ssbServer.id, |
113 | rate, // rate of messages written to ssbServer |
114 | feeds, // total number of feeds we want to replicate |
115 | pendingPeers, // number of pending feeds per peer |
116 | incompleteFeeds: pendingFeeds.size, // number of feeds with pending messages to download |
117 | |
118 | // LEGACY: Preserving old api. Needed for test/random.js to pass |
119 | progress: legacyProgress, |
120 | total: legacyTotal |
121 | } |
122 | |
123 | if (!deepEqual(progress, lastProgress)) { |
124 | lastProgress = progress |
125 | notify(progress) |
126 | } |
127 | }) |
128 | |
129 | pull( |
130 | ssbServer.createLogStream({old: false, live: true, sync: false, keys: false}), |
131 | pull.drain(function (e) { |
132 | //track writes per second, mainly used for developing initial sync. |
133 | if(!start) start = Date.now() |
134 | var time = (Date.now() - start)/1000 |
135 | if(time >= 1) { |
136 | rate = count / time |
137 | start = Date.now() |
138 | count = 0 |
139 | } |
140 | var pushable = listeners[e.author] |
141 | |
142 | if(pushable && pushable.sequence == e.sequence) { |
143 | pushable.sequence ++ |
144 | pushable.forEach(function (p) { |
145 | p.push(e) |
146 | }) |
147 | } |
148 | count ++ |
149 | }) |
150 | ) |
151 | |
152 | var chs = ssbServer.createHistoryStream |
153 | |
154 | ssbServer.createHistoryStream.hook(function (fn, args) { |
155 | var upto = args[0] || {} |
156 | var seq = upto.sequence || upto.seq |
157 | if(this._emit) this._emit('call:createHistoryStream', args[0]) |
158 | |
159 | //if we are calling this locally, skip cleverness |
160 | if(this===ssbServer) return fn.call(this, upto) |
161 | |
162 | // keep track of each requested value, per feed / per peer. |
163 | peerHas[this.id] = peerHas[this.id] || {} |
164 | peerHas[this.id][upto.id] = seq - 1 // peer requests +1 from actual last seq |
165 | |
166 | debounce.set() |
167 | |
168 | //handle creating lots of history streams efficiently. |
169 | //maybe this could be optimized in map-filter-reduce queries instead? |
170 | if(toSend[upto.id] == null || (seq > toSend[upto.id])) { |
171 | upto.old = false |
172 | if(!upto.live) return pull.empty() |
173 | var pushable = listeners[upto.id] = listeners[upto.id] || [] |
174 | var p = Pushable(function () { |
175 | var i = pushable.indexOf(p) |
176 | pushable.splice(i, 1) |
177 | }) |
178 | pushable.push(p) |
179 | pushable.sequence = seq |
180 | return p |
181 | } |
182 | return fn.call(this, upto) |
183 | }) |
184 | |
185 | // collect the IDs of feeds we want to request |
186 | var opts = config.replication || {} |
187 | opts.hops = opts.hops || 3 |
188 | opts.dunbar = opts.dunbar || 150 |
189 | opts.live = true |
190 | opts.meta = true |
191 | |
192 | //XXX policy about replicating specific peers should be outside |
193 | //of this plugin. |
194 | function localPeers () { |
195 | if(!ssbServer.gossip) return |
196 | ssbServer.gossip.peers().forEach(function (e) { |
197 | if (e.source === 'local') |
198 | request(e.key) |
199 | }) |
200 | } |
201 | |
202 | //also request local peers. |
203 | if (ssbServer.gossip) { |
204 | // if we have the gossip plugin active, then include new local peers |
205 | // so that you can put a name to someone on your local network. |
206 | var int = setInterval(localPeers, 1000) |
207 | if(int.unref) int.unref() |
208 | localPeers() |
209 | } |
210 | //XXX ^ |
211 | |
212 | function upto (opts) { |
213 | opts = opts || {} |
214 | var ary = Object.keys(replicate).map(function (k) { |
215 | return { id: k, sequence: toSend[k]||0 } |
216 | }) |
217 | if(opts.live) |
218 | return Cat([ |
219 | pull.values(ary), |
220 | pull.once({sync: true}), |
221 | newPeers.listen() |
222 | ]) |
223 | |
224 | return pull.values(ary) |
225 | } |
226 | |
227 | ssbServer.on('rpc:connect', function(rpc) { |
228 | // this is the cli client, just ignore. |
229 | if(rpc.id === ssbServer.id) return |
230 | if (!ssbServer.ready()) return |
231 | |
232 | var errorsSeen = {} |
233 | //check for local peers, or manual connections. |
234 | localPeers() |
235 | |
236 | var drain |
237 | |
238 | function replicate(upto, cb) { |
239 | pendingFeedsForPeer[rpc.id] = pendingFeedsForPeer[rpc.id] || new Set() |
240 | pendingFeedsForPeer[rpc.id].add(upto.id) |
241 | |
242 | debounce.set() |
243 | |
244 | var sync = false |
245 | |
246 | pull( |
247 | rpc.createHistoryStream({ |
248 | id: upto.id, |
249 | seq: (upto.sequence || upto.seq || 0) + 1, |
250 | live: true, |
251 | keys: false |
252 | }), |
253 | |
254 | pull.through(detectSync(rpc.id, upto, toSend, peerHas, function () { |
255 | sync = true |
256 | if (pendingFeedsForPeer[rpc.id]) { |
257 | // this peer has finished syncing, remove from progress |
258 | pendingFeedsForPeer[rpc.id].delete(upto.id) |
259 | debounce.set() |
260 | } |
261 | })), |
262 | |
263 | ssbServer.createWriteStream(function (err) { |
264 | if(err && !(err.message in errorsSeen)) { |
265 | errorsSeen[err.message] = true |
266 | if(err.message in streamErrors) { |
267 | cb && cb(err) |
268 | if(err.message === 'unexpected end of parent stream') { |
269 | if (err instanceof Error) { |
270 | // stream closed okay locally |
271 | } else { |
272 | // pre-emptively destroy the stream, assuming the other |
273 | // end is packet-stream 2.0.0 sending end messages. |
274 | rpc.close(err) |
275 | } |
276 | } |
277 | } else { |
278 | console.error( |
279 | 'Error replicating with ' + rpc.id + ':\n ', |
280 | err.stack |
281 | ) |
282 | } |
283 | } |
284 | |
285 | // if stream closes, remove from pending progress |
286 | if (pendingFeedsForPeer[rpc.id]) { |
287 | pendingFeedsForPeer[rpc.id].delete(upto.id) |
288 | debounce.set() |
289 | } |
290 | }) |
291 | ) |
292 | } |
293 | |
294 | var replicate_self = false |
295 | //if replicate.fallback is enabled |
296 | //then wait for the fallback event before |
297 | //starting to replicate by this strategy. |
298 | if(config.replicate && config.replicate.fallback) |
299 | rpc.once('fallback:replicate', fallback) |
300 | else |
301 | fallback() |
302 | |
303 | function fallback () { |
304 | //if we are not configured to use EBT, then fallback to createHistoryStream |
305 | if(replicate_self) return |
306 | replicate_self = true |
307 | replicate({id: ssbServer.id, sequence: toSend[ssbServer.id] || 0}) |
308 | } |
309 | |
310 | //trigger this if ebt.replicate fails... |
311 | rpc.once('call:createHistoryStream', next) |
312 | |
313 | var started = false |
314 | function next () { |
315 | if(started) return |
316 | started = true |
317 | ssbServer.emit('replicate:start', rpc) |
318 | |
319 | rpc.on('closed', function () { |
320 | ssbServer.emit('replicate:finish', toSend) |
321 | |
322 | // if we disconnect from a peer, remove it from sync progress |
323 | delete pendingFeedsForPeer[rpc.id] |
324 | debounce.set() |
325 | }) |
326 | |
327 | //make sure we wait until the clock is loaded |
328 | pull( |
329 | upto({live: opts.live}), |
330 | drain = pull.drain(function (upto) { |
331 | if(upto.sync) return |
332 | if(!isFeed(upto.id)) throw new Error('expected feed!') |
333 | if(!Number.isInteger(upto.sequence)) throw new Error('expected sequence!') |
334 | |
335 | if(upto.id == ssbServer.id && replicate_self) return replicate_self = true |
336 | replicate(upto, function (err) { |
337 | drain.abort() |
338 | }) |
339 | }, function (err) { |
340 | if(err && err !== true) |
341 | ssbServer.emit('log:error', ['replication', rpc.id, 'error', err]) |
342 | }) |
343 | ) |
344 | |
345 | } |
346 | }) |
347 | |
348 | return { |
349 | request: request, |
350 | upto: upto, |
351 | changes: notify.listen |
352 | } |
353 | } |
354 | |
355 |
Built with git-ssb-web