inject.js

function isEmpty (o) {
  for(var k in o) return false
  return true
}

function isInteger (i) {
  return Number.isInteger(i)
}

var isArray = Array.isArray

var Notify = require('pull-notify')
var pull = require('pull-stream')
var isBlobId = require('ssb-ref').isBlob

var MB = 1024*1024
var MAX_SIZE = 5*MB
function noop () {}

function clone (obj) {
  var o = {}
  for(var k in obj)
    o[k] = obj[k]
  return o
}

function count(obj) {
  var c = 0
  for(var k in obj) c++
  return c
}

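//pull-stream through that passes reads straight along but lets the caller
//intercept the abort (used below to tidy up a peer's want stream).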
function onAbort(abortCb) {
  return function (read) {
    return function (abort, cb) {
      if (abort) abortCb(abort, cb)
      else read(null, cb)
    }
  }
}

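//inject wraps a raw blob store (`blobs`) with peer replication.
//`set` persists the set of blobs being pushed, and `opts` tunes behaviour:
//sympathy (how many hops out a want is still honoured), stingy (only serve
//blobs we are pushing), legacy (support the old has/changes protocol),
//pushy (how many peers must have a pushed blob before we stop announcing
//it), and max (the largest blob we will fetch).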
module.exports = function inject (blobs, set, name, opts) {
  opts = opts || {}
  //sympathy controls how many hops away a want can originate and still be replicated
  var sympathy = opts.sympathy == null ? 3 : opts.sympathy | 0
  var stingy = opts.stingy === true
  var legacy = opts.legacy !== false
  var pushy = opts.pushy || 3
  var max = opts.max || MAX_SIZE

  var notify = Notify()
  var pushed = Notify()

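  //peers:     peer id -> rpc object for each connected peer
  //want:      blob id -> negative hop count for blobs we are after
  //push:      blob id -> {peer id: size} for blobs we are broadcasting
  //waiting:   blob id -> callbacks waiting for that blob to arrive
  //getting:   blob id -> the peer we are currently downloading it from
  //available: peer id -> {blob id: size} that peer has advertised
  //streams:   peer id -> live stream of our wants going to that peer
  //send:      the batch of updates about to be broadcast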
  var peers = {}
  var want = {}, push = {}, waiting = {}, getting = {}
  var available = {}, streams = {}
  var send = {}, timer

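  //record the want locally if hops is negative, otherwise clear it, then
  //broadcast {hash: hops} (negative hop count or positive size) to every
  //connected peer's want stream.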
  function queue (hash, hops) {
    if(hops < 0)
      want[hash] = hops
    else
      delete want[hash]

    send[hash] = hops
    var _send = send
    send = {}
    notify(_send)
  }

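  //find some connected peer that has advertised this blob (under the size
  //limit), and return its id.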
  function isAvailable(id) {
    for(var peer in peers)
      if(available[peer] && available[peer][id] < max && peers[peer])
        return peer
  }

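  //download a blob from a peer into the local store. if the transfer fails,
  //retry from any other peer that has it.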
  function get (peer, id, name) {
    if(getting[id] || !peers[peer]) return

    getting[id] = peer
    var source = peers[peer].blobs.get({key: id, max: max})
    pull(source, blobs.add(id, function (err, _id) {
      delete getting[id]
      if(err) {
        if(available[peer]) delete available[peer][id]
        //check if another peer has this.
        //if so get it from them.
        if(peer = isAvailable(id)) get(peer, id, name)
      }
    }))
  }

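  //register a want relayed from a peer. ignore it if it has already travelled
  //further than our sympathy allows; otherwise queue it, and start fetching
  //right away if some peer already has the blob.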
  function wants (peer, id, hops) {
    if(Math.abs(hops) > sympathy) return //sorry!
    if(!want[id] || want[id] < hops) {
      want[id] = hops
      queue(id, hops)
      if(peer = isAvailable(id)) {
        get(peer, id)
      }
    }
  }

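  //watch the local store: whenever a blob is added, announce its size,
  //drop the want, and call back anyone waiting on it.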
  pull(
    blobs.ls({old: false, meta: true}),
    pull.drain(function (data) {
      queue(data.id, data.size)
      delete want[data.id]
      if(waiting[data.id])
        while(waiting[data.id].length)
          waiting[data.id].shift()(null, true)
    })
  )

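  //note that a peer has a blob. count it towards any push in progress
  //(and stop pushing once `pushy` peers have it), and start fetching it
  //if it is one we want.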
  function has(peer_id, id, size) {
    if('string' !== typeof peer_id) throw new Error('peer must be string id')
    available[peer_id] = available[peer_id] || {}
    available[peer_id][id] = size
    //if we are broadcasting this blob,
    //mark that this peer has it.
    //once `pushy` peers have it, we can stop broadcasting.
    if(push[id]) {
      push[id][peer_id] = size
      if(count(push[id]) >= pushy) {
        var data = {key: id, peers: push[id]}
        set.remove(id)
        delete push[id]; pushed(data)
      }
    }
    if(want[id] && !getting[id] && size < max) get(peer_id, id)
  }

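  //handle one want/has message from a peer: negative numbers are wants
  //(answered with our size if we have the blob, otherwise relayed as our own
  //want), positive numbers record what the peer has. cb(null, res) fires with
  //our sizes once every lookup has returned (only if the message contained at
  //least one want).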
  function process (data, peer, cb) {
    var n = 0, res = {}
    for(var id in data) (function (id) {
      if(isBlobId(id) && isInteger(data[id])) {
        if(data[id] < 0 && (opts.stingy !== true || push[id])) { //interpret as "WANT"
          n++
          //check whether we already *HAVE* this file.
          //respond with its size, if we do.
          blobs.size(id, function (err, size) { //XXX
            if(size) res[id] = size
            else wants(peer, id, data[id] - 1)
            next()
          })
        }
        else if(data[id] > 0) { //interpret as "HAS"
          has(peer, id, data[id])
        }
      }
    }(id))

    function next () {
      if(--n) return
      cb(null, res)
    }
  }

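  //forget everything we know about a disconnected peer.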
  function dead (peer_id) {
    delete peers[peer_id]
    delete available[peer_id]
    delete streams[peer_id]
  }

  //LEGACY LEGACY LEGACY
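  //fall back to the old protocol: follow the peer's blobs.changes() feed to
  //learn what they have, and poll peer.blobs.has() with the ids we want.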
  function legacySync (peer) {
    if(!legacy) return

    var drain //we need to keep a reference to drain
              //so we can abort it when we get an error.
    function hasLegacy (hashes) {
      var ary = Object.keys(hashes).filter(function (k) {
        return hashes[k] < 0
      })
      if(ary.length)
        peer.blobs.has(ary, function (err, haves) {
          if(err) drain.abort(err) //ERROR: abort this stream.
          else haves.forEach(function (have, i) {
            if(have) has(peer.id, ary[i], have)
          })
        })
    }

    function notPeer (err) {
      if(err) dead(peer.id)
    }

    drain = pull.drain(function (hash) {
      has(peer.id, hash, true)
    }, notPeer)

    pull(peer.blobs.changes(), drain)

    hasLegacy(want)

    //a stream of hashes
    pull(notify.listen(), pull.drain(hasLegacy, notPeer))
  }
  //LEGACY LEGACY LEGACY

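  //the outgoing want stream for one peer: a live stream of {id: hops|size}
  //updates, primed with our current wants plus the blobs we are pushing.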
  function createWantStream (id) {
    if(!streams[id]) {
      streams[id] = notify.listen()

      //merge in ids we are pushing.
      var w = clone(want)
      for(var k in push) w[k] = -1
      streams[id].push(w)
    }
    return pull(streams[id], onAbort(function (err, cb) {
      streams[id] = false
      cb(err)
    }))
  }

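  //the incoming side: drain a peer's want/has messages and reply with the
  //sizes of blobs we already have. if the stream errors before any data
  //arrives, assume an old peer and fall back to legacy sync.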
  function wantSink (peer) {
    createWantStream(peer.id) //set streams[peer.id]

    var modern = false
    return pull.drain(function (data) {
      modern = true
      //respond with list of blobs you already have,
      process(data, peer.id, function (err, has_data) {
        //(if you have any)
        if(!isEmpty(has_data) && streams[peer.id]) streams[peer.id].push(has_data)
      })
    }, function (err) {
      //on error, fall back to legacy mode (if enabled) for peers that never
      //sent a modern message.
      if(err && !modern) {
        streams[peer.id] = false
        if(legacy) legacySync(peer)
      }
      //if un-peering could be handled another way, the legacy handling
      //could be refactored out of sight.
      else if(peers[peer.id] == peer) {
        delete peers[peer.id]
        delete available[peer.id]
        delete streams[peer.id]
      }
    })
  }

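  //the object returned below is the api exposed locally (and, in part, over
  //rpc): the underlying blob store plus want/push replication.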
  var self
  return self = {
    //id: name,
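    //has: answer whether blobs are stored locally. in legacy mode, a remote
    //rpc call to has doubles as a want request from that peer.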
    has: function (hash, cb) {
      if(isArray(hash)) {
        for(var i = 0; i < hash.length; i++)
          if(!isBlobId(hash[i]))
            return cb(new Error('invalid hash:'+hash[i]))
      }
      else if(!isBlobId(hash))
        return cb(new Error('invalid hash:'+hash))

      if(!legacy) {
        blobs.has.call(this, hash, cb)
      }
      else {
        //LEGACY LEGACY LEGACY
        if(this === self || !this || this === global) { // a local call
          return blobs.has.call(this, hash, cb)
        }
        //ELSE, interpret remote calls to has as a WANT request.
        //handling this by calling process (which calls size())
        //avoids a race condition in the tests.
        //(and avoids doubling the number of calls)
        var a = Array.isArray(hash) ? hash : [hash]
        var o = {}
        a.forEach(function (h) { o[h] = -1 })
        //since this is always "has", process will never use the second arg.
        process(o, null, function (err, res) {
          var a = []; for(var k in o) a.push(res[k] > 0)
          cb(null, Array.isArray(hash) ? a : a[0])
        })
        //LEGACY LEGACY LEGACY
      }
    },
    size: blobs.size,
    get: blobs.get,
    getSlice: blobs.getSlice,
    add: blobs.add,
    rm: blobs.rm,
    ls: blobs.ls,
    changes: function () {
      //XXX for bandwidth-sensitive peers, don't tell them about blobs we aren't trying to push.
      return pull(
        blobs.ls({old: false, meta: false}),
        pull.filter(function (id) {
          return !stingy || push[id]
        })
      )
    },
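    //want: wait until a blob is available locally. cb(null, true) fires once
    //the blob has been stored (or immediately if we already have it); in the
    //meantime the want is broadcast to peers.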
    want: function (hash, cb) {
      if(!isBlobId(hash))
        return cb(new Error('invalid hash:'+hash))
      //always broadcast wants immediately, because of race condition
      //between has and adding a blob (needed to pass test/async.js)
      if(blobs.isEmptyHash(hash)) return cb(null, true)
      var id = isAvailable(hash)

      if(waiting[hash])
        waiting[hash].push(cb)
      else {
        waiting[hash] = [cb]
        blobs.size(hash, function (err, size) {
          if(size != null) {
            while(waiting[hash].length)
              waiting[hash].shift()(null, true)
            delete waiting[hash]
          }
        })
      }

      if(!id && waiting[hash]) queue(hash, -1)

      if(id) return get(id, hash)
    },
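    //push: keep announcing a blob until `pushy` peers have fetched it. the id
    //is persisted via `set`, removed once enough peers have it, and then
    //emitted on the pushed() stream.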
    push: function (id, cb) {
      //also store the id to push.
      if(!isBlobId(id))
        return cb(new Error('invalid hash:'+id))

      push[id] = push[id] || {}
      queue(id, -1)
      set.add(id, cb)
    },
    pushed: function () {
      return pushed.listen()
    },
    createWants: function () {
      return createWantStream(this.id)
    },
    //private api. used for testing. not exposed over rpc.
    _wantSink: wantSink,
    _onConnect: function (other, name) {
      peers[other.id] = other
      //sending of our wants starts once we know
      //that they are not legacy style.
      //process is called when wantSink
      //doesn't immediately get an error.
      pull(other.blobs.createWants(), wantSink(other))
    }
  }
}