git ssb

4+

Dominic / scuttlebot



Commit 95940b043d869f534221ddc08a73b412b5024be2

pass test/random.js at least, but not in one round trip, hmm...

Dominic Tarr committed on 5/24/2017, 11:26:16 AM
Parent: c1997c8a400cf294934a5e97a93d35b3705dec55

Files changed

plugins/replicate/index.jschanged
plugins/replicate/legacy.jschanged
plugins/replicate/index.jsView
@@ -3,24 +3,33 @@
33 var Legacy = require('./legacy')
44 var mdm = require('mdmanifest')
55 var apidoc = require('../../lib/apidocs').replicate
66 var Notify = require('pull-notify')
7+var pull = require('pull-stream')
78
89 module.exports = {
910 name: 'replicate',
1011 version: '2.0.0',
1112 manifest: mdm.manifest(apidoc),
1213 //replicate: replicate,
1314 init: function (sbot, config) {
14-
15+// var change = Obv()
1516 var notify = Notify(), upto
16- if(!config.replicate || config.replicate.legacy !== false)
17- upto = Legacy.call(this, sbot, notify, config)
17+ if(!config.replicate || config.replicate.legacy !== false) {
18+ var replicate = Legacy.call(this, sbot, notify, config)
1819
19- return {
20- changes: notify.listen,
21- upto: upto,
20+ pull(
21+ sbot.friends.createFriendStream({live: true, meta: false}),
22+ // filter out duplicates, and also keep track of what we expect to receive
23+ // lookup the latest sequence from each user
24+ // TODO: use paramap?
25+ pull.drain(function (id) {
26+ if(id.sync) return
27+ replicate.request(id)
28+ })
29+ )
30+
31+ return replicate
2232 }
2333 }
2434 }
2535
26-
plugins/replicate/legacy.jsView
@@ -4,8 +4,10 @@
44 var Notify = require('pull-notify')
55 var Cat = require('pull-cat')
66 var Debounce = require('observ-debounce')
77 var deepEqual = require('deep-equal')
8+var Obv = require('obv')
9+var isFeed = require('ssb-ref').isFeed
810
911 var Pushable = require('pull-pushable')
1012
1113 // compatibility function for old implementations of `latestSequence`
@@ -68,13 +70,34 @@
6870 var start = null
6971 var count = 0
7072 var rate = 0
7173 var loadedFriends = false
72- var toSend = {}
74+ var toSend// = {}
7375 var peerHas = {}
7476 var pendingFeedsForPeer = {}
7577 var lastProgress = null
7678
79+ var newPeers = Notify()
80+ var replicate = {}
81+
82+ function request (id) {
83+ if(!replicate[id]) {
84+ replicate[id] = true
85+ newPeers({id:id, sequence: toSend[id] || 0})
86+ }
87+ }
88+
89+ sbot.getVectorClock(function (err, clock) {
90+ if(err) throw err
91+ toSend = clock
92+ })
93+
94+ sbot.post(function (msg) {
95+ //this should be part of ssb.getVectorClock
96+ toSend[msg.value.author] = msg.value.sequence
97+ debounce.set()
98+ })
99+
77100 debounce(function () {
78101 // only list loaded feeds once we know about all of them!
79102 var feeds = loadedFriends ? Object.keys(toSend).length : null
80103 var legacyProgress = 0
@@ -152,16 +175,24 @@
152175 p.push(e)
153176 })
154177 }
155178 count ++
156- addPeer({id: e.author, sequence: e.sequence})
179+//remove, using getVectorClock handles this already.
180+// addPeer({id: e.author, sequence: e.sequence})
157181 })
158182 )
159183
184+ var chs = sbot.createHistoryStream
185+
186+// sbot.createHistoryStream = function (opts) {
187+// console.log("CREATE HISTORY STREAM")
188+// if(this._emit) this._emit('call:createHistoryStream', args[0])
189+// return chs.call(this, opts)
190+// }
191+//
160192 sbot.createHistoryStream.hook(function (fn, args) {
161193 var upto = args[0] || {}
162194 var seq = upto.sequence || upto.seq
163-
164195 if(this._emit) this._emit('call:createHistoryStream', args[0])
165196
166197 //if we are calling this locally, skip cleverness
167198 if(this===sbot) return fn.call(this, upto)
@@ -171,9 +202,9 @@
171202 peerHas[this.id][upto.id] = seq - 1 // peer requests +1 from actual last seq
172203
173204 debounce.set()
174205
175- //handle creating lots of histor streams efficiently.
206+ //handle creating lots of history streams efficiently.
176207 //maybe this could be optimized in map-filter-reduce queries instead?
177208 if(toSend[upto.id] == null || (seq > toSend[upto.id])) {
178209 upto.old = false
179210 if(!upto.live) return pull.empty()
@@ -195,8 +226,10 @@
195226 opts.dunbar = opts.dunbar || 150
196227 opts.live = true
197228 opts.meta = true
198229
230+ //XXX policy about replicating specific peers should be outside
231+ //of this plugin.
199232 function localPeers () {
200233 if(!sbot.gossip) return
201234 sbot.gossip.peers().forEach(function (e) {
202235 if (e.source === 'local' && toSend[e.key] == null) {
@@ -214,54 +247,59 @@
214247 var int = setInterval(localPeers, 1000)
215248 if(int.unref) int.unref()
216249 localPeers()
217250 }
251+ //XXX ^
218252
219253 function friendsLoaded () {
220254 loadedFriends = true
221255 debounce.set()
222256 }
223257
224- function addPeer (upto) {
225- if(upto.sync) return friendsLoaded()
226- if(!upto.id) return console.log('invalid', upto)
258+// function addPeer (upto) {
259+// if(upto.sync) return friendsLoaded()
260+// if(!upto.id) return console.log('invalid', upto)
261+//
262+// if(toSend[upto.id] == null) {
263+// toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0)
264+// newPeer({id: upto.id, sequence: toSend[upto.id] , type: 'new' })
265+// } else {
266+// toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0)
267+// }
268+//
269+// debounce.set()
270+// }
271+//
227272
228- if(toSend[upto.id] == null) {
229- toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0)
230- newPeer({id: upto.id, sequence: toSend[upto.id] , type: 'new' })
231- } else {
232- toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0)
233- }
234-
235- debounce.set()
236- }
237-
238-
239273 // create read-streams for the desired feeds
240- pull(
241- sbot.friends.createFriendStream(opts),
242- // filter out duplicates, and also keep track of what we expect to receive
243- // lookup the latest sequence from each user
244- // TODO: use paramap?
245- pull.asyncMap(function (data, cb) {
246- if(data.sync) return cb(null, data)
247- var id = data.id || data
248- sbot.latestSequence(id, function (err, seq) {
249- cb(null, {
250- id: id, sequence: err ? 0 : toSeq(seq)
251- })
252- })
253- }, 32),
254- pull.drain(addPeer, friendsLoaded)
255- )
274+// pull(
275+// sbot.friends.createFriendStream(opts),
276+// // filter out duplicates, and also keep track of what we expect to receive
277+// // lookup the latest sequence from each user
278+// // TODO: use paramap?
279+// pull.asyncMap(function (data, cb) {
280+// if(data.sync) return cb(null, data)
281+// var id = data.id || data
282+// sbot.latestSequence(id, function (err, seq) {
283+// cb(null, {
284+// id: id, sequence: err ? 0 : toSeq(seq)
285+// })
286+// })
287+// }, 32),
288+// pull.drain(addPeer, friendsLoaded)
289+// )
256290
257291 function upto (opts) {
258292 opts = opts || {}
259- var ary = Object.keys(toSend).map(function (k) {
260- return { id: k, sequence: toSend[k] }
293+ var ary = Object.keys(replicate).map(function (k) {
294+ return { id: k, sequence: toSend[k]||0 }
261295 })
262296 if(opts.live)
263- return Cat([pull.values(ary), pull.once({sync: true}), newPeer.listen()])
297+ return Cat([
298+ pull.values(ary),
299+ pull.once({sync: true}),
300+ newPeer.listen()
301+ ])
264302
265303 return pull.values(ary)
266304 }
267305
@@ -274,9 +312,8 @@
274312
275313 var drain
276314
277315 function replicate(upto, cb) {
278- console.log(sbot.id.substring(0, 10), upto)
279316 pendingFeedsForPeer[rpc.id] = pendingFeedsForPeer[rpc.id] || new Set()
280317 pendingFeedsForPeer[rpc.id].add(upto.id)
281318
282319 debounce.set()
@@ -289,9 +326,9 @@
289326 sbot.createWriteStream(function (err) {
290327 if(err && !(err.message in errorsSeen)) {
291328 errorsSeen[err.message] = true
292329 if(err.message in streamErrors) {
293- cb(err)
330+ cb && cb(err)
294331 if(err.message === 'unexpected end of parent stream') {
295332 if (err instanceof Error) {
296333 // stream closed okay locally
297334 } else {
@@ -300,10 +337,12 @@
300337 rpc.close(err)
301338 }
302339 }
303340 } else {
304- console.error('Error replicating with ' + rpc.id + ':\n ',
305- err.stack)
341+ console.error(
342+ 'Error replicating with ' + rpc.id + ':\n ',
343+ err.stack
344+ )
306345 }
307346 }
308347
309348 pendingFeedsForPeer[rpc.id].delete(upto.id)
@@ -324,28 +363,39 @@
324363 function fallback () {
325364 //if we are not configured to use EBT, then fallback to createHistoryStream
326365 if(replicate_self) return
327366 replicated_self = true
328- sbot.latestSequence(sbot.id, function (err, seq) {
329- replicate({
330- id: sbot.id, sequence: err ? 0 : toSeq(seq)
331- }, function () {})
332- })
367+// console.log('replicate_self', sbot.id, toSend)
368+ replicate({id: sbot.id, sequence: toSend[sbot.id] || 0})
369+// sbot.latestSequence(sbot.id, function (err, seq) {
370+// replicate({
371+// id: sbot.id, sequence: err ? 0 : toSeq(seq)
372+// }, function () {})
373+// })
333374 }
375+
334376 //trigger this if ebt.replicate fails...
335377 rpc.once('call:createHistoryStream', next)
336378
379+ var started = false
337380 function next () {
381+ if(started) return
382+ started = true
338383 sbot.emit('replicate:start', rpc)
339384
340385 rpc.on('closed', function () {
386+ console.log(toSend)
341387 sbot.emit('replicate:finish', toSend)
342388 })
343389
390+ //make sure we wait until the clock is loaded
344391 pull(
345392 upto({live: opts.live}),
346393 drain = pull.drain(function (upto) {
347394 if(upto.sync) return
395+ if(!isFeed(upto.id)) throw new Error('expected feed!')
396+ if(!Number.isInteger(upto.sequence)) throw new Error('expected sequence!')
397+
348398 if(upto.id == sbot.id && replicate_self) return replicate_self = true
349399 replicate(upto, function (err) {
350400 drain.abort()
351401 })
@@ -356,10 +406,17 @@
356406 )
357407
358408 }
359409 })
360- return upto
410+
411+ return {
412+ request: request,
413+ upto: upto,
414+ changes: notify.listen
415+ }
416+// return upto
361417 }
362418
363419
364420
365421
422+

Built with git-ssb-web