Files: 6fd3ee341a376214993d0363c4c72161700b7fbc / lib / replicate-with-progress.js
8278 bytes · Raw
1 | |
2 | var pull = require('pull-stream') |
3 | var para = require('pull-paramap') |
4 | var Notify = require('pull-notify') |
5 | var Cat = require('pull-cat') |
6 | var Debounce = require('observ-debounce') |
7 | var mdm = require('mdmanifest') |
8 | var apidoc = require('scuttlebot/lib/apidocs').replicate |
9 | var MutantToPull = require('./mutant-to-pull') |
10 | var {Struct, Dict} = require('mutant') |
11 | |
12 | var Pushable = require('pull-pushable') |
13 | |
14 | // compatibility function for old implementations of `latestSequence` |
/**
 * Normalise a `latestSequence` result to a plain sequence number.
 *
 * @param {number|{sequence: number}|null|undefined} s - either a bare
 *   sequence number or an object carrying a `sequence` property.
 * @returns {number} `s` itself when it is a number, `s.sequence` when it is
 *   an object, and 0 when nothing was supplied (the same value callers use
 *   when the lookup fails).
 */
function toSeq (s) {
  // A missing value used to throw a TypeError on `s.sequence`; treat it as
  // "no messages known" instead.
  if (s == null) return 0
  return typeof s === 'number' ? s : s.sequence
}
18 | |
19 | module.exports = { |
20 | name: 'replicate', |
21 | version: '2.0.0', |
22 | manifest: mdm.manifest(apidoc), |
23 | init: function (sbot, config) { |
24 | var debounce = Debounce(200) |
25 | var listeners = {} |
26 | var newPeer = Notify() |
27 | |
28 | // keep track of sync progress and provide to client |
29 | |
30 | var start = null |
31 | var count = 0 |
32 | var rate = 0 |
33 | var toSend = {} |
34 | var peerHas = {} |
35 | var pendingPeer = {} |
36 | |
37 | window.pendingPeer = pendingPeer |
38 | |
39 | var syncStatus = Struct({ |
40 | type: 'global', |
41 | incomplete: 0, |
42 | pending: 0, |
43 | pendingPeers: Dict({}, {fixedIndexing: true}), |
44 | feeds: null, |
45 | rate: 0 |
46 | }) |
47 | |
48 | window.syncStatus = syncStatus |
49 | |
50 | debounce(function () { |
51 | var totalPending = 0 |
52 | var feeds = Object.keys(toSend).length |
53 | var peers = {} |
54 | var pendingFeeds = new Set() |
55 | |
56 | Object.keys(pendingPeer).forEach(function (peerId) { |
57 | if (pendingPeer[peerId]) { |
58 | totalPending += 1 |
59 | |
60 | Object.keys(toSend).forEach(function (feedId) { |
61 | if (peerHas[peerId] && peerHas[peerId][feedId]) { |
62 | if (peerHas[peerId][feedId] > toSend[feedId]) { |
63 | pendingFeeds.add(feedId) |
64 | } |
65 | } |
66 | }) |
67 | |
68 | peers[peerId] = pendingPeer[peerId] |
69 | } |
70 | }) |
71 | |
72 | syncStatus.set({ |
73 | incomplete: pendingFeeds.size, |
74 | feeds: syncStatus.loadedFriends ? feeds : null, |
75 | pendingPeers: peers, |
76 | pending: totalPending, |
77 | rate: rate |
78 | }, {merge: true}) |
79 | }) |
80 | |
81 | pull( |
82 | sbot.createLogStream({old: false, live: true, sync: false, keys: false}), |
83 | pull.drain(function (e) { |
84 | // track writes per second, mainly used for developing initial sync. |
85 | if (!start) start = Date.now() |
86 | var time = (Date.now() - start) / 1000 |
87 | if (time >= 1) { |
88 | rate = count / time |
89 | start = Date.now() |
90 | count = 0 |
91 | } |
92 | var pushable = listeners[e.author] |
93 | |
94 | if (pushable && pushable.sequence === e.sequence) { |
95 | pushable.sequence++ |
96 | pushable.forEach(function (p) { |
97 | p.push(e) |
98 | }) |
99 | } |
100 | count++ |
101 | addPeer({id: e.author, sequence: e.sequence}) |
102 | }) |
103 | ) |
104 | |
105 | sbot.createHistoryStream.hook(function (fn, args) { |
106 | var upto = args[0] || {} |
107 | var seq = upto.sequence || upto.seq |
108 | |
109 | if (this._emit) this._emit('call:createHistoryStream', args[0]) |
110 | |
111 | // if we are calling this locally, skip cleverness |
112 | if (this === sbot) return fn.call(this, upto) |
113 | |
114 | // keep track of each requested value, per feed / per peer. |
115 | peerHas[this.id] = peerHas[this.id] || {} |
116 | peerHas[this.id][upto.id] = seq - 1 |
117 | |
118 | debounce.set() |
119 | |
120 | // handle creating lots of history streams efficiently. |
121 | // maybe this could be optimized in map-filter-reduce queries instead? |
122 | if (toSend[upto.id] == null || (seq > toSend[upto.id])) { |
123 | upto.old = false |
124 | if (!upto.live) return pull.empty() |
125 | var pushable = listeners[upto.id] = listeners[upto.id] || [] |
126 | var p = Pushable(function () { |
127 | var i = pushable.indexOf(p) |
128 | pushable.splice(i, 1) |
129 | }) |
130 | pushable.push(p) |
131 | pushable.sequence = seq |
132 | return p |
133 | } |
134 | return fn.call(this, upto) |
135 | }) |
136 | |
137 | // collect the IDs of feeds we want to request |
138 | var opts = config.replication || {} |
139 | opts.hops = opts.hops || 3 |
140 | opts.dunbar = opts.dunbar || 150 |
141 | opts.live = true |
142 | opts.meta = true |
143 | |
144 | function localPeers () { |
145 | if (!sbot.gossip) return |
146 | sbot.gossip.peers().forEach(function (e) { |
147 | if (e.source === 'local' && toSend[e.key] == null) { |
148 | sbot.latestSequence(e.key, function (err, seq) { |
149 | addPeer({id: e.key, sequence: err ? 0 : toSeq(seq)}) |
150 | }) |
151 | } |
152 | }) |
153 | } |
154 | |
155 | // also request local peers. |
156 | if (sbot.gossip) { |
157 | // if we have the gossip plugin active, then include new local peers |
158 | // so that you can put a name to someone on your local network. |
159 | var int = setInterval(localPeers, 1000) |
160 | if (int.unref) int.unref() |
161 | localPeers() |
162 | } |
163 | |
164 | function loadedFriends () { |
165 | syncStatus.loadedFriends = true |
166 | debounce.set() |
167 | } |
168 | |
169 | function addPeer (upto) { |
170 | if (upto.sync) return loadedFriends() |
171 | if (!upto.id) return console.log('invalid', upto) |
172 | |
173 | if (toSend[upto.id] == null) { |
174 | toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0) |
175 | newPeer({ id: upto.id, sequence: toSend[upto.id], type: 'new' }) |
176 | debounce.set() |
177 | } else { |
178 | toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0) |
179 | } |
180 | |
181 | debounce.set() |
182 | } |
183 | |
184 | // create read-streams for the desired feeds |
185 | pull( |
186 | sbot.friends.createFriendStream(opts), |
187 | // filter out duplicates, and also keep track of what we expect to receive |
188 | // lookup the latest sequence from each user |
189 | para(function (data, cb) { |
190 | if (data.sync) return cb(null, data) |
191 | var id = data.id || data |
192 | sbot.latestSequence(id, function (err, seq) { |
193 | cb(null, { |
194 | id: id, sequence: err ? 0 : toSeq(seq) |
195 | }) |
196 | }) |
197 | }, 32), |
198 | pull.drain(addPeer, loadedFriends) |
199 | ) |
200 | |
201 | function upto (opts) { |
202 | opts = opts || {} |
203 | var ary = Object.keys(toSend).map(function (k) { |
204 | return { id: k, sequence: toSend[k] } |
205 | }) |
206 | if (opts.live) { |
207 | return Cat([pull.values(ary), pull.once({sync: true}), newPeer.listen()]) |
208 | } |
209 | |
210 | return pull.values(ary) |
211 | } |
212 | |
213 | sbot.on('rpc:connect', function (rpc) { |
214 | // this is the cli client, just ignore. |
215 | if (rpc.id === sbot.id) return |
216 | // check for local peers, or manual connections. |
217 | localPeers() |
218 | sbot.emit('replicate:start', rpc) |
219 | rpc.on('closed', function () { |
220 | sbot.emit('replicate:finish', toSend) |
221 | }) |
222 | pull( |
223 | upto({live: opts.live}), |
224 | pull.drain(function (upto) { |
225 | if (upto.sync) return |
226 | var last = (upto.sequence || upto.seq || 0) |
227 | pendingPeer[rpc.id] = (pendingPeer[rpc.id] || 0) + 1 |
228 | debounce.set() |
229 | |
230 | pull( |
231 | rpc.createHistoryStream({ |
232 | id: upto.id, |
233 | seq: last + 1, |
234 | live: false, |
235 | keys: false |
236 | }), |
237 | pull.through((msg) => { |
238 | start = Math.max(start, msg.sequence) |
239 | }), |
240 | sbot.createWriteStream(function () { |
241 | // TODO: do something with the error |
242 | // this seems to be thrown fairly regularly whenever something weird happens to the stream |
243 | |
244 | pendingPeer[rpc.id] -= 1 |
245 | debounce.set() |
246 | |
247 | // all synched, now lets keep watching for live changes |
248 | // need to handle this separately because there is no {sync: true} event with HistoryStream |
249 | // and we want to notify the client that sync has completed |
250 | |
251 | pull( |
252 | rpc.createHistoryStream({ |
253 | id: upto.id, |
254 | seq: last + 1, |
255 | sequence: last + 1, // HACK: some clients won't stream if we don't specify this as sequence |
256 | live: true, |
257 | keys: false |
258 | }), |
259 | sbot.createWriteStream(function () { |
260 | // TODO: handle error |
261 | }) |
262 | ) |
263 | }) |
264 | ) |
265 | }, function (err) { |
266 | if (err) { |
267 | sbot.emit('log:error', ['replication', rpc.id, 'error', err]) |
268 | } |
269 | }) |
270 | ) |
271 | }) |
272 | |
273 | return { |
274 | changes: function () { |
275 | return MutantToPull(syncStatus) |
276 | }, |
277 | upto: upto |
278 | } |
279 | } |
280 | } |
281 |
Built with git-ssb-web