git ssb

10+

Matt McKegg / patchwork



Tree: 6b5fb10bffe7df13233ba65f8a3631d6121ed3cd

Files: 6b5fb10bffe7df13233ba65f8a3631d6121ed3cd / lib / replicate-with-progress.js

8077 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var para = require('pull-paramap')
4var Notify = require('pull-notify')
5var Cat = require('pull-cat')
6var Debounce = require('observ-debounce')
7var mdm = require('mdmanifest')
8var apidoc = require('scuttlebot/lib/apidocs').replicate
9var MutantToPull = require('./mutant-to-pull')
10var {Struct, Dict} = require('mutant')
11
12var Pushable = require('pull-pushable')
13
14// compatibility function for old implementations of `latestSequence`
15function toSeq (s) {
16 return typeof s === 'number' ? s : s.sequence
17}
18
19module.exports = {
20 name: 'replicate',
21 version: '2.0.0',
22 manifest: mdm.manifest(apidoc),
23 init: function (sbot, config) {
24 var debounce = Debounce(200)
25 var listeners = {}
26 var newPeer = Notify()
27
28 // keep track of sync progress and provide to client
29
30 var start = null
31 var count = 0
32 var rate = 0
33 var toSend = {}
34 var peerHas = {}
35 var pendingPeer = {}
36
37 window.pendingPeer = pendingPeer
38
39 var syncStatus = Struct({
40 type: 'global',
41 incomplete: 0,
42 pendingCount: 0,
43 pendingPeers: Dict({}, {fixedIndexing: true}),
44 feeds: null,
45 rate: 0
46 })
47
48 window.syncStatus = syncStatus
49
50 debounce(function () {
51 var incomplete = 0
52 var totalPending = 0
53 var feeds = Object.keys(toSend).length
54 var peers = {}
55
56 Object.keys(pendingPeer).forEach(function (peerId) {
57 if (pendingPeer[peerId]) {
58 totalPending += 1
59
60 if (Object.keys(toSend).some(function (feedId) {
61 if (peerHas[peerId] && peerHas[peerId][feedId]) {
62 return peerHas[peerId][feedId] > toSend[feedId]
63 }
64 })) {
65 incomplete += 1
66 }
67
68 peers[peerId] = pendingPeer[peerId]
69 }
70 })
71
72 syncStatus.set({
73 incomplete: incomplete,
74 feeds: syncStatus.loadedFriends ? feeds : null,
75 pendingPeers: peers,
76 pending: totalPending,
77 rate: rate
78 }, {merge: true})
79 })
80
81 pull(
82 sbot.createLogStream({old: false, live: true, sync: false, keys: false}),
83 pull.drain(function (e) {
84 // track writes per second, mainly used for developing initial sync.
85 if (!start) start = Date.now()
86 var time = (Date.now() - start) / 1000
87 if (time >= 1) {
88 rate = count / time
89 start = Date.now()
90 count = 0
91 }
92 var pushable = listeners[e.author]
93
94 if (pushable && pushable.sequence === e.sequence) {
95 pushable.sequence++
96 pushable.forEach(function (p) {
97 p.push(e)
98 })
99 }
100 count++
101 addPeer({id: e.author, sequence: e.sequence})
102 })
103 )
104
105 sbot.createHistoryStream.hook(function (fn, args) {
106 var upto = args[0] || {}
107 var seq = upto.sequence || upto.seq
108
109 if (this._emit) this._emit('call:createHistoryStream', args[0])
110
111 // if we are calling this locally, skip cleverness
112 if (this === sbot) return fn.call(this, upto)
113
114 // keep track of each requested value, per feed / per peer.
115 peerHas[this.id] = peerHas[this.id] || {}
116 peerHas[this.id][upto.id] = seq - 1
117
118 debounce.set()
119
120 // handle creating lots of history streams efficiently.
121 // maybe this could be optimized in map-filter-reduce queries instead?
122 if (toSend[upto.id] == null || (seq > toSend[upto.id])) {
123 upto.old = false
124 if (!upto.live) return pull.empty()
125 var pushable = listeners[upto.id] = listeners[upto.id] || []
126 var p = Pushable(function () {
127 var i = pushable.indexOf(p)
128 pushable.splice(i, 1)
129 })
130 pushable.push(p)
131 pushable.sequence = upto.sequence
132 return p
133 }
134 return fn.call(this, upto)
135 })
136
137 // collect the IDs of feeds we want to request
138 var opts = config.replication || {}
139 opts.hops = opts.hops || 3
140 opts.dunbar = opts.dunbar || 150
141 opts.live = true
142 opts.meta = true
143
144 function localPeers () {
145 if (!sbot.gossip) return
146 sbot.gossip.peers().forEach(function (e) {
147 if (toSend[e.key] == null) {
148 addPeer({id: e.key, sequence: 0})
149 }
150 })
151 }
152
153 // also request local peers.
154 if (sbot.gossip) {
155 // if we have the gossip plugin active, then include new local peers
156 // so that you can put a name to someone on your local network.
157 var int = setInterval(localPeers, 1000)
158 if (int.unref) int.unref()
159 localPeers()
160 }
161
162 function loadedFriends () {
163 console.log('>>>> loaded friends')
164 syncStatus.loadedFriends = true
165 debounce.set()
166 }
167
168 function addPeer (upto) {
169 if (upto.sync) return loadedFriends()
170 if (!upto.id) return console.log('invalid', upto)
171
172 if (toSend[upto.id] == null) {
173 toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0)
174 newPeer({ id: upto.id, sequence: toSend[upto.id], type: 'new' })
175 debounce.set()
176 } else {
177 toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0)
178 }
179
180 debounce.set()
181 }
182
183 // create read-streams for the desired feeds
184 pull(
185 sbot.friends.createFriendStream(opts),
186 // filter out duplicates, and also keep track of what we expect to receive
187 // lookup the latest sequence from each user
188 para(function (data, cb) {
189 if (data.sync) return cb(null, data)
190 var id = data.id || data
191 sbot.latestSequence(id, function (err, seq) {
192 cb(null, {
193 id: id, sequence: err ? 0 : toSeq(seq)
194 })
195 })
196 }, 32),
197 pull.drain(addPeer, loadedFriends)
198 )
199
200 function upto (opts) {
201 opts = opts || {}
202 var ary = Object.keys(toSend).map(function (k) {
203 return { id: k, sequence: toSend[k] }
204 })
205 if (opts.live) {
206 return Cat([pull.values(ary), pull.once({sync: true}), newPeer.listen()])
207 }
208
209 return pull.values(ary)
210 }
211
212 sbot.on('rpc:connect', function (rpc) {
213 // this is the cli client, just ignore.
214 if (rpc.id === sbot.id) return
215 // check for local peers, or manual connections.
216 localPeers()
217 sbot.emit('replicate:start', rpc)
218 rpc.on('closed', function () {
219 sbot.emit('replicate:finish', toSend)
220 })
221 pull(
222 upto({live: opts.live}),
223 pull.drain(function (upto) {
224 if (upto.sync) return
225 var last = (upto.sequence || upto.seq || 0)
226 pendingPeer[rpc.id] = (pendingPeer[rpc.id] || 0) + 1
227 debounce.set()
228
229 pull(
230 rpc.createHistoryStream({
231 id: upto.id,
232 seq: last + 1,
233 live: false,
234 keys: false
235 }),
236 pull.through((msg) => {
237 start = Math.max(start, msg.sequence)
238 }),
239 sbot.createWriteStream(function () {
240 // TODO: do something with the error
241 // this seems to be thrown fairly regularly whenever something weird happens to the stream
242
243 pendingPeer[rpc.id] -= 1
244 debounce.set()
245
246 // all synched, now lets keep watching for live changes
247 // need to handle this separately because there is no {sync: true} event with HistoryStream
248 // and we want to notify the client that sync has completed
249
250 pull(
251 rpc.createHistoryStream({
252 id: upto.id,
253 seq: last + 1,
254 live: true,
255 keys: false
256 }),
257 sbot.createWriteStream(function () {
258 // TODO: handle error
259 })
260 )
261 })
262 )
263 }, function (err) {
264 if (err) {
265 sbot.emit('log:error', ['replication', rpc.id, 'error', err])
266 }
267 })
268 )
269 })
270
271 return {
272 changes: function () {
273 return MutantToPull(syncStatus)
274 },
275 upto: upto
276 }
277 }
278}
279

Built with git-ssb-web