git ssb

4+

Dominic / scuttlebot



Commit 707a39608eb88fea931761a44972491e378d34b3

Merge remote-tracking branch 'origin/flume-with-latest-legacy-replication'

Dominic Tarr committed on 6/9/2017, 10:55:44 PM
Parent: 18875b2a6630bee59ed985aafef99167cf8dfe00
Parent: 730020600eba585e4e9d9afce47283806715f12b

Files changed

lib/detect-sync.jsadded
plugins/replicate/legacy.jschanged
lib/detect-sync.jsView
@@ -1,0 +1,48 @@
1+module.exports = function detectSync (peerId, upto, toSend, peerHas, onSync) {
2+ // HACK: createHistoryStream does not emit sync event, so we don't
3+ // know when it switches to live. Do it manually!
4+
5+ var sync = false
6+ var last = (upto.sequence || upto.seq || 0)
7+
8+ // check sync after 500ms, hopefully we have the info from the peer by then
9+ setTimeout(function () {
10+ if (peerHas[peerId] && peerHas[peerId][upto.id] != null) {
11+ checkSync()
12+ } else {
13+ // if we get here, the peer hasn't yet asked for this feed, or is not responding
14+ // we can assume it doesn't have the feed, so lets call sync
15+ broadcastSync()
16+ }
17+ }, 500)
18+
19+ return function (msg) {
20+ if (msg.sync) {
21+ // surprise! This peer actually has a sync event!
22+ broadcastSync()
23+ return false
24+ }
25+
26+ last = msg.sequence
27+ checkSync()
28+ return true
29+ }
30+
31+ function checkSync () {
32+ if (!sync) {
33+ var availableSeq = peerHas[peerId] && peerHas[peerId][upto.id]
34+ if (availableSeq === last || availableSeq < toSend[upto.id]) {
35+ // we've reached the maximum sequence this server has told us it knows about
36+ // or we don't need anything from this server
37+ broadcastSync()
38+ }
39+ }
40+ }
41+
42+ function broadcastSync () {
43+ if (!sync) {
44+ sync = true
45+ onSync && onSync()
46+ }
47+ }
48+}
plugins/replicate/legacy.jsView
@@ -6,10 +6,10 @@
66 var Debounce = require('observ-debounce')
77 var deepEqual = require('deep-equal')
88 var Obv = require('obv')
99 var isFeed = require('ssb-ref').isFeed
10-
1110 var Pushable = require('pull-pushable')
11+var detectSync = require('../../lib/detect-sync')
1212
1313 // compatibility function for old implementations of `latestSequence`
1414 function toSeq (s) {
1515 return 'number' === typeof s ? s : s.sequence
@@ -29,40 +29,8 @@
2929 'write EPIPE': true,
3030 'stream is closed': true, // rpc method called after stream ended
3131 }
3232
33-function createHistoryStreamWithSync (rpc, upto, onSync) {
34- // HACK: createHistoryStream does not emit sync event, so we don't
35- // know when it switches to live. Do it manually!
36- var last = (upto.sequence || upto.seq || 0)
37- var state = null
38- return pullNext(function () {
39- if (!state) {
40- state = 'old'
41- return pull(
42- rpc.createHistoryStream({
43- id: upto.id,
44- seq: last + 1,
45- live: false,
46- keys: false
47- }),
48- pull.through(msg => {
49- last = Math.max(last, msg.sequence)
50- })
51- )
52- } else if (state === 'old') {
53- state = 'sync'
54- onSync && onSync(true)
55- return rpc.createHistoryStream({
56- id: upto.id,
57- seq: last + 1,
58- live: true,
59- keys: false
60- })
61- }
62- })
63-}
64-
6533 module.exports = function (sbot, notify, config) {
6634 var debounce = Debounce(200)
6735 var listeners = {}
6836 var newPeers = Notify()
@@ -279,12 +247,23 @@
279247
280248 debounce.set()
281249
282250 pull(
283- createHistoryStreamWithSync(rpc, upto, function onSync () {
284- pendingFeedsForPeer[rpc.id].delete(upto.id)
285- debounce.set()
251+ rpc.createHistoryStream({
252+ id: upto.id,
253+ seq: (upto.sequence || upto.seq || 0) + 1,
254+ live: true,
255+ keys: false
286256 }),
257+
258+ pull.through(detectSync(rpc.id, upto, toSend, peerHas, function () {
259+ if (pendingFeedsForPeer[rpc.id]) {
260+ // this peer has finished syncing, remove from progress
261+ pendingFeedsForPeer[rpc.id].delete(upto.id)
262+ debounce.set()
263+ }
264+ })),
265+
287266 sbot.createWriteStream(function (err) {
288267 if(err && !(err.message in errorsSeen)) {
289268 errorsSeen[err.message] = true
290269 if(err.message in streamErrors) {
@@ -305,10 +284,13 @@
305284 )
306285 }
307286 }
308287
309- pendingFeedsForPeer[rpc.id].delete(upto.id)
310- debounce.set()
288+ // if stream closes, remove from pending progress
289+ if (pendingFeedsForPeer[rpc.id]) {
290+ pendingFeedsForPeer[rpc.id].delete(upto.id)
291+ debounce.set()
292+ }
311293 })
312294 )
313295 }
314296
@@ -338,8 +320,12 @@
338320 sbot.emit('replicate:start', rpc)
339321
340322 rpc.on('closed', function () {
341323 sbot.emit('replicate:finish', toSend)
324+
325+ // if we disconnect from a peer, remove it from sync progress
326+ delete pendingFeedsForPeer[rpc.id]
327+ debounce.set()
342328 })
343329
344330 //make sure we wait until the clock is loaded
345331 pull(
@@ -367,8 +353,4 @@
367353 upto: upto,
368354 changes: notify.listen
369355 }
370356 }
371-
372-
373-
374-

Built with git-ssb-web