git ssb

1+

punkmonk.termux / mvd



forked from ev / mvd

Tree: 90dfad001a91b412a745a9cad0dccf4a3ef49dc2

Files: 90dfad001a91b412a745a9cad0dccf4a3ef49dc2 / plugins / replicate / legacy.js

10086 bytesRaw
1var pull = require('pull-stream')
2var pullNext = require('pull-next')
3var para = require('pull-paramap')
4var Notify = require('pull-notify')
5var Cat = require('pull-cat')
6var Debounce = require('observ-debounce')
7var deepEqual = require('deep-equal')
8var Obv = require('obv')
9var isFeed = require('ssb-ref').isFeed
10var Pushable = require('pull-pushable')
11var detectSync = require('../../lib/detect-sync')
12
/**
 * Compatibility shim for old implementations of `latestSequence`.
 *
 * @param {number|{sequence: *}} s - either a bare sequence number or an
 *   object carrying the number in its `sequence` property.
 * @returns {*} `s` itself when it is a number, otherwise `s.sequence`.
 */
function toSeq (s) {
  if (typeof s === 'number') return s
  return s.sequence
}
17
// Returns the final element of `a`, or undefined when `a` is empty.
function last (a) {
  var tail = a.length - 1
  return a[tail]
}
19
// If one of these messages shows up in a replication stream, the stream is
// dead.
//
// The table is queried with `message in streamErrors`, and `in` also walks
// the prototype chain. It is therefore built on a null prototype so that only
// the messages listed here match, never inherited keys such as 'constructor'
// or 'toString'.
var streamErrors = Object.assign(Object.create(null), {
  'unexpected end of parent stream': true, // stream closed okay
  'unexpected hangup': true, // stream closed probably okay
  'read EHOSTUNREACH': true,
  'read ECONNRESET': true,
  'read ENETDOWN': true,
  'read ETIMEDOUT': true,
  'write ECONNRESET': true,
  'write EPIPE': true,
  'stream is closed': true // rpc method called after stream ended
})
32
33module.exports = function (ssbServer, notify, config) {
34 var debounce = Debounce(200)
35 var listeners = {}
36 var newPeers = Notify()
37
38 var start = null
39 var count = 0
40 var rate = 0
41 var toSend = {}
42 var peerHas = {}
43 var pendingFeedsForPeer = {}
44 var lastProgress = null
45
46 var replicate = {}
47
/**
 * Add a feed to, or remove it from, the set of feeds being replicated and
 * announce the change on `newPeers`.
 *
 * @param {string} id - feed id.
 * @param {boolean} [unfollow] - pass exactly `false` to stop replicating the
 *   feed; any other value (including omitting it) starts replicating it.
 *
 * Nothing is announced when the feed is already in the requested state.
 * A removal is announced with sequence -1, an addition with the sequence
 * currently recorded in `toSend` (0 when there is none).
 */
function request (id, unfollow) {
  var wanted = unfollow !== false
  var tracked = Boolean(replicate[id])

  // already in the requested state: nothing to change, nothing to announce
  if (wanted === tracked) return

  if (wanted) {
    replicate[id] = true
    newPeers({id: id, sequence: toSend[id] || 0})
    return
  }

  delete replicate[id]
  newPeers({id: id, sequence: -1})
}
60
// Replace `toSend` with the server's vector clock once it has been read.
// A failure to read the clock is rethrown from the callback.
ssbServer.getVectorClock(function onClock (error, vectorClock) {
  if (error) throw error
  toSend = vectorClock
})

// Record the sequence of every message the server posts and schedule a
// progress update.
ssbServer.post(function onPost (msg) {
  //this should be part of ssb.getVectorClock
  var value = msg.value
  toSend[value.author] = value.sequence
  debounce.set()
})
71
// Debounced progress report: rebuilds the progress object from `toSend`,
// `peerHas` and `pendingFeedsForPeer`, and passes it to `notify` only when it
// differs (per deepEqual) from the previously notified report.
debounce(function () {
  // only list loaded feeds once we know about all of them!
  var knownFeedIds = Object.keys(toSend)
  var feeds = knownFeedIds.length

  var pendingFeeds = new Set()
  var pendingPeers = {}

  Object.keys(pendingFeedsForPeer).forEach(function (peerId) {
    var pending = pendingFeedsForPeer[peerId]
    // peers with nothing pending do not show up in the report
    if (!pending || !pending.size) return

    var theirs = peerHas[peerId]
    knownFeedIds.forEach(function (feedId) {
      // a feed is incomplete when the peer is known to be ahead of us on it
      if (theirs && theirs[feedId] && theirs[feedId] > toSend[feedId]) {
        pendingFeeds.add(feedId)
      }
    })
    pendingPeers[peerId] = pending.size
  })

  // legacy progress: sum of every sequence we hold
  var legacyProgress = 0
  for (var ownId in toSend) {
    legacyProgress += toSend[ownId]
  }

  // highest sequence any peer has reported, per feed
  var legacyToRecv = {}
  for (var remoteId in peerHas) {
    var reported = peerHas[remoteId]
    for (var feedKey in reported) {
      legacyToRecv[feedKey] = Math.max(reported[feedKey], legacyToRecv[feedKey] || 0)
    }
  }

  // legacy total: sum of those per-feed maxima
  // NOTE(review): a feed missing from `toSend` reads as undefined, not null,
  // so this check only excludes feeds explicitly set to null - confirm intent.
  var legacyTotal = 0
  for (var recvKey in legacyToRecv) {
    if (toSend[recvKey] !== null) {
      legacyTotal += legacyToRecv[recvKey]
    }
  }

  var progress = {
    id: ssbServer.id,
    rate: rate, // rate of messages written to ssbServer
    feeds: feeds, // total number of feeds we want to replicate
    pendingPeers: pendingPeers, // number of pending feeds per peer
    incompleteFeeds: pendingFeeds.size, // number of feeds with pending messages to download

    // LEGACY: Preserving old api. Needed for test/random.js to pass
    progress: legacyProgress,
    total: legacyTotal
  }

  if (deepEqual(progress, lastProgress)) return
  lastProgress = progress
  notify(progress)
})
128
// Follow the live log (new messages only). Each message updates the
// writes-per-second figure and is handed to any pushables registered in
// `listeners` for its author, provided it is the sequence they expect next.
pull(
  ssbServer.createLogStream({old: false, live: true, sync: false, keys: false}),
  pull.drain(function (msg) {
    //track writes per second, mainly used for developing initial sync.
    if (!start) start = Date.now()
    var elapsed = (Date.now() - start) / 1000
    if (elapsed >= 1) {
      // at least a second has passed: publish the rate and open a new window
      rate = count / elapsed
      start = Date.now()
      count = 0
    }

    var waiting = listeners[msg.author]
    if (waiting && waiting.sequence == msg.sequence) {
      waiting.sequence++
      waiting.forEach(function (sink) {
        sink.push(msg)
      })
    }

    count++
  })
)
151
// Hook createHistoryStream.
//
// Calls made on ssbServer itself go straight through to the wrapped
// function. For any other caller the hook:
//  - records, per caller id and feed id, the sequence the request implies
//    the caller already has (requested sequence minus one) in `peerHas`;
//  - when we have nothing for the feed, or the request starts beyond what we
//    have, skips the wrapped function: a non-live request gets an empty
//    stream, a live one gets a pushable registered in `listeners`;
//  - otherwise calls the wrapped function.
ssbServer.createHistoryStream.hook(function (fn, args) {
  var upto = args[0] || {}
  var seq = upto.sequence || upto.seq
  if(this._emit) this._emit('call:createHistoryStream', args[0])

  //if we are calling this locally, skip cleverness
  if(this===ssbServer) return fn.call(this, upto)

  // keep track of each requested value, per feed / per peer.
  // peer requests +1 from actual last seq.
  // A request without a usable sequence would give NaN here; NaN survives
  // Math.max and addition, so one such request would turn the legacy total
  // into NaN. Record 0 (peer has nothing we know of) instead.
  var has = seq - 1
  peerHas[this.id] = peerHas[this.id] || {}
  peerHas[this.id][upto.id] = Number.isFinite(has) ? has : 0

  debounce.set()

  //handle creating lots of history streams efficiently.
  //maybe this could be optimized in map-filter-reduce queries instead?
  if(toSend[upto.id] == null || (seq > toSend[upto.id])) {
    upto.old = false
    if(!upto.live) return pull.empty()
    var pushable = listeners[upto.id] = listeners[upto.id] || []
    var p = Pushable(function () {
      // on close, unregister this pushable. Guard the lookup: splice(-1, 1)
      // would remove the last (unrelated) listener if `p` is already gone.
      var i = pushable.indexOf(p)
      if(i !== -1) pushable.splice(i, 1)
    })
    pushable.push(p)
    pushable.sequence = seq
    return p
  }
  return fn.call(this, upto)
})
184
// collect the IDs of feeds we want to request
// Options come from `config.replication` (an empty object when absent);
// falsy `hops` / `dunbar` get defaults, `live` and `meta` are always true.
var opts = config.replication || {}
if (!opts.hops) opts.hops = 3
if (!opts.dunbar) opts.dunbar = 150
opts.live = true
opts.meta = true
191
//XXX policy about replicating specific peers should be outside
//of this plugin.
// Calls `request` with the key of every gossip peer whose `source` is
// 'local'. Does nothing when ssbServer has no `gossip`.
function localPeers () {
  var gossip = ssbServer.gossip
  if (!gossip) return

  gossip.peers()
    .filter(function (peer) { return peer.source === 'local' })
    .forEach(function (peer) { request(peer.key) })
}
201
//also request local peers.
if (ssbServer.gossip) {
  // if we have the gossip plugin active, then include new local peers
  // so that you can put a name to someone on your local network.
  // Re-scan once a second; the timer is unref'd when the runtime supports it.
  var localPeerTimer = setInterval(localPeers, 1000)
  if (localPeerTimer.unref) localPeerTimer.unref()
  localPeers()
}
//XXX ^
211
/**
 * Source stream of `{id, sequence}` for every feed in `replicate`, with the
 * sequence taken from `toSend` (0 when there is none).
 *
 * @param {{live?: boolean}} [opts] - when `opts.live` is truthy the snapshot
 *   is followed by a `{sync: true}` marker and then by whatever
 *   `newPeers.listen()` emits.
 * @returns a pull-stream source.
 */
function upto (opts) {
  var live = Boolean(opts && opts.live)

  var clock = []
  Object.keys(replicate).forEach(function (feedId) {
    clock.push({ id: feedId, sequence: toSend[feedId] || 0 })
  })

  var snapshot = pull.values(clock)
  if (!live) return snapshot

  return Cat([
    snapshot,
    pull.once({sync: true}),
    newPeers.listen()
  ])
}
226
// Per-connection replication over createHistoryStream.
//
// For every rpc connection other than our own client (and only once
// ssbServer.ready() is true) this:
//  - replicates our own feed, either immediately or, when
//    config.replicate.fallback is set, once the peer emits
//    'fallback:replicate';
//  - the first time 'call:createHistoryStream' is emitted on the connection,
//    starts replicating every feed produced by upto(), including feeds
//    announced later when opts.live is set.
ssbServer.on('rpc:connect', function(rpc) {
  // this is the cli client, just ignore.
  if(rpc.id === ssbServer.id) return
  if (!ssbServer.ready()) return

  // Messages of errors already handled on this connection. A Set rather than
  // a plain object queried with `in`, which would also report inherited keys
  // such as 'constructor' as already seen and silently drop those errors.
  var errorsSeen = new Set()
  //check for local peers, or manual connections.
  localPeers()

  var drain

  // true when `message` is listed in streamErrors itself (own key only)
  function isStreamError (message) {
    return Object.prototype.hasOwnProperty.call(streamErrors, message)
  }

  // remove a feed from this peer's pending set and refresh progress
  function clearPending (feedId) {
    if (pendingFeedsForPeer[rpc.id]) {
      pendingFeedsForPeer[rpc.id].delete(feedId)
      debounce.set()
    }
  }

  // Pipe the peer's history stream for one feed into our write stream.
  // `cb`, when given, is called with the error if the stream dies with one
  // of the known stream errors.
  function replicateFeed (upto, cb) {
    pendingFeedsForPeer[rpc.id] = pendingFeedsForPeer[rpc.id] || new Set()
    pendingFeedsForPeer[rpc.id].add(upto.id)

    debounce.set()

    pull(
      rpc.createHistoryStream({
        id: upto.id,
        seq: (upto.sequence || upto.seq || 0) + 1,
        live: true,
        keys: false
      }),

      pull.through(detectSync(rpc.id, upto, toSend, peerHas, function () {
        // this peer has finished syncing, remove from progress
        clearPending(upto.id)
      })),

      ssbServer.createWriteStream(function (err) {
        if(err && !errorsSeen.has(err.message)) {
          errorsSeen.add(err.message)
          if(isStreamError(err.message)) {
            cb && cb(err)
            if(err.message === 'unexpected end of parent stream' && !(err instanceof Error)) {
              // An Error instance means the stream closed okay locally.
              // Otherwise pre-emptively destroy the stream, assuming the
              // other end is packet-stream 2.0.0 sending end messages.
              rpc.close(err)
            }
          } else {
            // errors that are not Error instances have no stack; fall back
            // to the message, then to the value itself
            console.error(
              'Error replicating with ' + rpc.id + ':\n ',
              err.stack || err.message || err
            )
          }
        }

        // if stream closes, remove from pending progress
        clearPending(upto.id)
      })
    )
  }

  var replicate_self = false
  //if replicate.fallback is enabled
  //then wait for the fallback event before
  //starting to replicate by this strategy.
  if(config.replicate && config.replicate.fallback)
    rpc.once('fallback:replicate', fallback)
  else
    fallback()

  function fallback () {
    //if we are not configured to use EBT, then fallback to createHistoryStream
    if(replicate_self) return
    replicate_self = true
    replicateFeed({id: ssbServer.id, sequence: toSend[ssbServer.id] || 0})
  }

  var started = false

  //trigger this if ebt.replicate fails...
  rpc.once('call:createHistoryStream', next)

  function next () {
    if(started) return
    started = true
    ssbServer.emit('replicate:start', rpc)

    rpc.on('closed', function () {
      ssbServer.emit('replicate:finish', toSend)

      // if we disconnect from a peer, remove it from sync progress
      delete pendingFeedsForPeer[rpc.id]
      debounce.set()
    })

    //make sure we wait until the clock is loaded
    pull(
      upto({live: opts.live}),
      drain = pull.drain(function (feed) {
        if(feed.sync) return
        if(!isFeed(feed.id)) throw new Error('expected feed!')
        if(!Number.isInteger(feed.sequence)) throw new Error('expected sequence!')

        // A negative sequence announces that the feed is no longer wanted.
        // Without this guard it would open a fresh history stream from
        // seq 0 for the feed that was just unfollowed.
        if(feed.sequence < 0) return

        // our own feed is already being replicated by fallback()
        if(feed.id == ssbServer.id && replicate_self) return

        replicateFeed(feed, function (err) {
          // the connection is dead: stop requesting further feeds on it
          drain.abort()
        })
      }, function (err) {
        if(err && err !== true)
          ssbServer.emit('log:error', ['replication', rpc.id, 'error', err])
      })
    )

  }
})
347
348 return {
349 request: request,
350 upto: upto,
351 changes: notify.listen
352 }
353}
354
355

Built with git-ssb-web