git ssb

10+

Matt McKegg / patchwork



Tree: bcadbcc3325b20b2d11e736b46940303044e756c

Files: bcadbcc3325b20b2d11e736b46940303044e756c / lib / replicate-with-progress.js

8278 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var para = require('pull-paramap')
4var Notify = require('pull-notify')
5var Cat = require('pull-cat')
6var Debounce = require('observ-debounce')
7var mdm = require('mdmanifest')
8var apidoc = require('scuttlebot/lib/apidocs').replicate
9var MutantToPull = require('./mutant-to-pull')
10var {Struct, Dict} = require('mutant')
11
12var Pushable = require('pull-pushable')
13
14// compatibility function for old implementations of `latestSequence`
15function toSeq (s) {
16 return typeof s === 'number' ? s : s.sequence
17}
18
19module.exports = {
20 name: 'replicate',
21 version: '2.0.0',
22 manifest: mdm.manifest(apidoc),
23 init: function (sbot, config) {
24 var debounce = Debounce(200)
25 var listeners = {}
26 var newPeer = Notify()
27
28 // keep track of sync progress and provide to client
29
30 var start = null
31 var count = 0
32 var rate = 0
33 var toSend = {}
34 var peerHas = {}
35 var pendingPeer = {}
36
37 window.pendingPeer = pendingPeer
38
39 var syncStatus = Struct({
40 type: 'global',
41 incomplete: 0,
42 pending: 0,
43 pendingPeers: Dict({}, {fixedIndexing: true}),
44 feeds: null,
45 rate: 0
46 })
47
48 window.syncStatus = syncStatus
49
50 debounce(function () {
51 var totalPending = 0
52 var feeds = Object.keys(toSend).length
53 var peers = {}
54 var pendingFeeds = new Set()
55
56 Object.keys(pendingPeer).forEach(function (peerId) {
57 if (pendingPeer[peerId]) {
58 totalPending += 1
59
60 Object.keys(toSend).forEach(function (feedId) {
61 if (peerHas[peerId] && peerHas[peerId][feedId]) {
62 if (peerHas[peerId][feedId] > toSend[feedId]) {
63 pendingFeeds.add(feedId)
64 }
65 }
66 })
67
68 peers[peerId] = pendingPeer[peerId]
69 }
70 })
71
72 syncStatus.set({
73 incomplete: pendingFeeds.size,
74 feeds: syncStatus.loadedFriends ? feeds : null,
75 pendingPeers: peers,
76 pending: totalPending,
77 rate: rate
78 }, {merge: true})
79 })
80
81 pull(
82 sbot.createLogStream({old: false, live: true, sync: false, keys: false}),
83 pull.drain(function (e) {
84 // track writes per second, mainly used for developing initial sync.
85 if (!start) start = Date.now()
86 var time = (Date.now() - start) / 1000
87 if (time >= 1) {
88 rate = count / time
89 start = Date.now()
90 count = 0
91 }
92 var pushable = listeners[e.author]
93
94 if (pushable && pushable.sequence === e.sequence) {
95 pushable.sequence++
96 pushable.forEach(function (p) {
97 p.push(e)
98 })
99 }
100 count++
101 addPeer({id: e.author, sequence: e.sequence})
102 })
103 )
104
105 sbot.createHistoryStream.hook(function (fn, args) {
106 var upto = args[0] || {}
107 var seq = upto.sequence || upto.seq
108
109 if (this._emit) this._emit('call:createHistoryStream', args[0])
110
111 // if we are calling this locally, skip cleverness
112 if (this === sbot) return fn.call(this, upto)
113
114 // keep track of each requested value, per feed / per peer.
115 peerHas[this.id] = peerHas[this.id] || {}
116 peerHas[this.id][upto.id] = seq - 1
117
118 debounce.set()
119
120 // handle creating lots of history streams efficiently.
121 // maybe this could be optimized in map-filter-reduce queries instead?
122 if (toSend[upto.id] == null || (seq > toSend[upto.id])) {
123 upto.old = false
124 if (!upto.live) return pull.empty()
125 var pushable = listeners[upto.id] = listeners[upto.id] || []
126 var p = Pushable(function () {
127 var i = pushable.indexOf(p)
128 pushable.splice(i, 1)
129 })
130 pushable.push(p)
131 pushable.sequence = seq
132 return p
133 }
134 return fn.call(this, upto)
135 })
136
137 // collect the IDs of feeds we want to request
138 var opts = config.replication || {}
139 opts.hops = opts.hops || 3
140 opts.dunbar = opts.dunbar || 150
141 opts.live = true
142 opts.meta = true
143
144 function localPeers () {
145 if (!sbot.gossip) return
146 sbot.gossip.peers().forEach(function (e) {
147 if (e.source === 'local' && toSend[e.key] == null) {
148 sbot.latestSequence(e.key, function (err, seq) {
149 addPeer({id: e.key, sequence: err ? 0 : toSeq(seq)})
150 })
151 }
152 })
153 }
154
155 // also request local peers.
156 if (sbot.gossip) {
157 // if we have the gossip plugin active, then include new local peers
158 // so that you can put a name to someone on your local network.
159 var int = setInterval(localPeers, 1000)
160 if (int.unref) int.unref()
161 localPeers()
162 }
163
164 function loadedFriends () {
165 syncStatus.loadedFriends = true
166 debounce.set()
167 }
168
169 function addPeer (upto) {
170 if (upto.sync) return loadedFriends()
171 if (!upto.id) return console.log('invalid', upto)
172
173 if (toSend[upto.id] == null) {
174 toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0)
175 newPeer({ id: upto.id, sequence: toSend[upto.id], type: 'new' })
176 debounce.set()
177 } else {
178 toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0)
179 }
180
181 debounce.set()
182 }
183
184 // create read-streams for the desired feeds
185 pull(
186 sbot.friends.createFriendStream(opts),
187 // filter out duplicates, and also keep track of what we expect to receive
188 // lookup the latest sequence from each user
189 para(function (data, cb) {
190 if (data.sync) return cb(null, data)
191 var id = data.id || data
192 sbot.latestSequence(id, function (err, seq) {
193 cb(null, {
194 id: id, sequence: err ? 0 : toSeq(seq)
195 })
196 })
197 }, 32),
198 pull.drain(addPeer, loadedFriends)
199 )
200
201 function upto (opts) {
202 opts = opts || {}
203 var ary = Object.keys(toSend).map(function (k) {
204 return { id: k, sequence: toSend[k] }
205 })
206 if (opts.live) {
207 return Cat([pull.values(ary), pull.once({sync: true}), newPeer.listen()])
208 }
209
210 return pull.values(ary)
211 }
212
213 sbot.on('rpc:connect', function (rpc) {
214 // this is the cli client, just ignore.
215 if (rpc.id === sbot.id) return
216 // check for local peers, or manual connections.
217 localPeers()
218 sbot.emit('replicate:start', rpc)
219 rpc.on('closed', function () {
220 sbot.emit('replicate:finish', toSend)
221 })
222 pull(
223 upto({live: opts.live}),
224 pull.drain(function (upto) {
225 if (upto.sync) return
226 var last = (upto.sequence || upto.seq || 0)
227 pendingPeer[rpc.id] = (pendingPeer[rpc.id] || 0) + 1
228 debounce.set()
229
230 pull(
231 rpc.createHistoryStream({
232 id: upto.id,
233 seq: last + 1,
234 live: false,
235 keys: false
236 }),
237 pull.through((msg) => {
238 start = Math.max(start, msg.sequence)
239 }),
240 sbot.createWriteStream(function () {
241 // TODO: do something with the error
242 // this seems to be thrown fairly regularly whenever something weird happens to the stream
243
244 pendingPeer[rpc.id] -= 1
245 debounce.set()
246
247 // all synched, now lets keep watching for live changes
248 // need to handle this separately because there is no {sync: true} event with HistoryStream
249 // and we want to notify the client that sync has completed
250
251 pull(
252 rpc.createHistoryStream({
253 id: upto.id,
254 seq: last + 1,
255 sequence: last + 1, // HACK: some clients won't stream if we don't specify this as sequence
256 live: true,
257 keys: false
258 }),
259 sbot.createWriteStream(function () {
260 // TODO: handle error
261 })
262 )
263 })
264 )
265 }, function (err) {
266 if (err) {
267 sbot.emit('log:error', ['replication', rpc.id, 'error', err])
268 }
269 })
270 )
271 })
272
273 return {
274 changes: function () {
275 return MutantToPull(syncStatus)
276 },
277 upto: upto
278 }
279 }
280}
281

Built with git-ssb-web