Files: 340ef7cdcc9255c17889664920a134eca6611486 / inject.js
8858 bytesRaw
1 | |
/**
 * Report whether an object has no enumerable keys.
 *
 * Uses `for...in`, so inherited enumerable keys count as content and
 * `null`/`undefined` are treated as empty.
 *
 * @param {object} o - value to inspect
 * @returns {boolean} true when no enumerable key is found
 */
function isEmpty (o) {
  var sawKey = false
  for (var key in o) {
    sawKey = true
    break
  }
  return !sawKey
}
6 | |
/**
 * Report whether a value is an integer number.
 *
 * No coercion is performed: numeric strings, `null`, `NaN` and the
 * infinities are all rejected.
 *
 * @param {*} i - value to test
 * @returns {boolean} true only for finite numbers without a fractional part
 */
function isInteger (i) {
  if (typeof i !== 'number') return false
  if (!isFinite(i)) return false
  return Math.floor(i) === i
}
10 | |
// Local shorthand for the built-in array check.
var isArray = function isArray (value) {
  return Array.isArray(value)
}
12 | |
13 | var Notify = require('pull-notify') |
14 | var pull = require('pull-stream') |
15 | var isBlobId = require('ssb-ref').isBlob |
16 | |
// Number of bytes in one mebibyte (2^20).
var MB = 1 << 20
// Five mebibytes, expressed in bytes.
var MAX_SIZE = MB * 5

// Placeholder callback that does nothing.
function noop () {}
20 | |
/**
 * Make a shallow copy of an object's enumerable properties.
 *
 * Values are copied by reference; enumerable keys found through `for...in`
 * (including inherited ones) end up as own properties of the copy.
 *
 * @param {object} obj - object to copy (null/undefined yield an empty object)
 * @returns {object} a new plain object
 */
function clone (obj) {
  var copy = {}
  for (var key in obj) {
    copy[key] = obj[key]
  }
  return copy
}
27 | |
/**
 * Count the enumerable keys of an object.
 *
 * @param {object} obj - object whose keys are counted via `for...in`
 * @returns {number} number of enumerable keys (0 for null/undefined)
 */
function count (obj) {
  var total = 0
  for (var key in obj) {
    total += 1
  }
  return total
}
33 | |
34 | module.exports = function inject (blobs, set, name, opts) { |
35 | opts = opts || {} |
36 | //sympathy controls whether you'll replicate |
37 | var sympathy = opts.sympathy == null ? 3 : opts.sympathy | 0 |
38 | var stingy = opts.stingy === true |
39 | var legacy = opts.legacy !== false |
40 | var pushy = opts.pushy || 3 |
41 | var max = opts.max || 5*1024*1024 |
42 | |
43 | var notify = Notify() |
44 | var pushed = Notify() |
45 | |
46 | var peers = {} |
47 | var want = {}, push = {}, waiting = {}, getting = {} |
48 | var available = {}, streams = {} |
49 | var send = {}, timer |
50 | |
51 | function queue (hash, hops) { |
52 | if(hops < 0) |
53 | want[hash] = hops |
54 | else |
55 | delete want[hash] |
56 | |
57 | send[hash] = hops |
58 | var _send = send; |
59 | send = {} |
60 | notify(_send) |
61 | } |
62 | |
63 | function isAvailable(id) { |
64 | for(var peer in peers) |
65 | if(available[peer] && available[peer][id] < max && peers[peer]) |
66 | return peer |
67 | } |
68 | |
69 | function get (peer, id, name) { |
70 | if(getting[id] || !peers[peer]) return |
71 | |
72 | getting[id] = peer |
73 | var source = peers[peer].blobs.get({key: id, max: max}) |
74 | pull(source, blobs.add(id, function (err, _id) { |
75 | delete getting[id] |
76 | if(err) { |
77 | if(available[peer]) delete available[peer][id] |
78 | //check if another peer has this. |
79 | //if so get it from them. |
80 | if(peer = isAvailable(id)) get(peer, id, name) |
81 | } |
82 | })) |
83 | } |
84 | |
85 | function wants (peer, id, hops) { |
86 | if(Math.abs(hops) > sympathy) return //sorry! |
87 | if(!want[id] || want[id] < hops) { |
88 | want[id] = hops |
89 | queue(id, hops) |
90 | if(peer = isAvailable(id)) { |
91 | get(peer, id) |
92 | } |
93 | } |
94 | } |
95 | |
96 | pull( |
97 | blobs.ls({old: false, meta: true}), |
98 | pull.drain(function (data) { |
99 | queue(data.id, data.size) |
100 | delete want[data.id] |
101 | if(waiting[data.id]) |
102 | while(waiting[data.id].length) |
103 | waiting[data.id].shift()(null, true) |
104 | }) |
105 | ) |
106 | |
107 | function has(peer_id, id, size) { |
108 | if('string' !== typeof peer_id) throw new Error('peer must be string id') |
109 | available[peer_id] = available[peer_id] || {} |
110 | available[peer_id][id] = size |
111 | //if we are broadcasting this blob, |
112 | //mark this peer has it. |
113 | //if N peers have it, we can stop broadcasting. |
114 | if(push[id]) { |
115 | push[id][peer_id] = size |
116 | if(count(push[id]) >= pushy) { |
117 | var data = {key: id, peers: push[id]} |
118 | set.remove(id) |
119 | delete push[id]; pushed(data) |
120 | } |
121 | } |
122 | if(want[id] && !getting[id] && size < max) get(peer_id, id) |
123 | } |
124 | |
125 | function process (data, peer, cb) { |
126 | var n = 0, res = {} |
127 | for(var id in data) { |
128 | if(isBlobId(id) && isInteger(data[id])) { |
129 | if(data[id] <= 0 && (opts.stingy !== true || push[id])) { //interpret as "WANT" |
130 | n++ |
131 | //check whether we already *HAVE* this file. |
132 | //respond with it's size, if we do. |
133 | blobs.size(id, function (err, size) { //XXX |
134 | if(size) res[id] = size |
135 | else wants(peer, id, data[id] - 1) |
136 | next() |
137 | }) |
138 | } |
139 | else if(data[id] > 0) { //interpret as "HAS" |
140 | has(peer, id, data[id]) |
141 | } |
142 | } |
143 | } |
144 | |
145 | function next () { |
146 | if(--n) return |
147 | cb(null, res) |
148 | } |
149 | } |
150 | |
151 | function dead (peer_id) { |
152 | delete peers[peer_id] |
153 | delete available[peer_id] |
154 | delete streams[peer_id] |
155 | } |
156 | |
157 | //LEGACY LEGACY LEGACY |
158 | function legacySync (peer) { |
159 | if(!legacy) return |
160 | |
161 | var drain //we need to keep a reference to drain |
162 | //so we can abort it when we get an error. |
163 | function hasLegacy (hashes) { |
164 | var ary = Object.keys(hashes).filter(function (k) { |
165 | return hashes[k] < 0 |
166 | }) |
167 | if(ary.length) |
168 | peer.blobs.has(ary, function (err, haves) { |
169 | if(err) drain.abort(err) //ERROR: abort this stream. |
170 | else haves.forEach(function (have, i) { |
171 | if(have) has(peer.id, ary[i], have) |
172 | }) |
173 | }) |
174 | } |
175 | |
176 | function notPeer (err) { |
177 | if(err) dead(peer.id) |
178 | } |
179 | |
180 | drain = pull.drain(function (hash) { |
181 | has(peer.id, hash, true) |
182 | }, notPeer) |
183 | |
184 | |
185 | pull(peer.blobs.changes(), drain) |
186 | |
187 | hasLegacy(want) |
188 | |
189 | //a stream of hashes |
190 | pull(notify.listen(), pull.drain(hasLegacy, notPeer)) |
191 | } |
192 | //LEGACY LEGACY LEGACY |
193 | |
194 | function createWantStream (id) { |
195 | if(!streams[id]) { |
196 | streams[id] = notify.listen() |
197 | |
198 | //merge in ids we are pushing. |
199 | var w = clone(want) |
200 | for(var k in push) w[k] = -1 |
201 | streams[id].push(w) |
202 | |
203 | return streams[id] |
204 | } |
205 | return streams[id] |
206 | } |
207 | |
208 | function wantSink (peer) { |
209 | createWantStream(peer.id) //set streams[peer.id] |
210 | |
211 | var modern = false |
212 | return pull.drain(function (data) { |
213 | modern = true |
214 | //respond with list of blobs you already have, |
215 | process(data, peer.id, function (err, has_data) { |
216 | //(if you have any) |
217 | if(!isEmpty(has_data) && streams[peer.id]) streams[peer.id].push(has_data) |
218 | }) |
219 | }, function (err) { |
220 | if(err && !modern) { |
221 | streams[peer.id] = false |
222 | if(legacy) legacySync(peer) |
223 | } |
224 | //if can handle unpeer another way, |
225 | //then can refactor legacy handling out of sight. |
226 | |
227 | //handle error and fallback to legacy mode, if enabled. |
228 | else if(peers[peer.id] == peer) { |
229 | delete peers[peer.id] |
230 | delete available[peer.id] |
231 | delete streams[peer.id] |
232 | } |
233 | }) |
234 | } |
235 | |
236 | var self |
237 | return self = { |
238 | //id: name, |
239 | has: function (hash, cb) { |
240 | if(isArray(hash)) { |
241 | for(var i = 0; i < hash.length; i++) |
242 | if(!isBlobId(hash[i])) |
243 | return cb(new Error('invalid hash:'+hash[i])) |
244 | } |
245 | else if(!isBlobId(hash)) |
246 | return cb(new Error('invalid hash:'+hash)) |
247 | |
248 | if(!legacy) { |
249 | blobs.has.call(this, hash, cb) |
250 | } |
251 | else { |
252 | //LEGACY LEGACY LEGACY |
253 | if(this === self || !this || this === global) { // a local call |
254 | return blobs.has.call(this, hash, cb) |
255 | } |
256 | //ELSE, interpret remote calls to has as a WANT request. |
257 | //handling this by calling process (which calls size()) |
258 | //avoids a race condition in the tests. |
259 | //(and avoids doubling the number of calls) |
260 | var a = Array.isArray(hash) ? hash : [hash] |
261 | var o = {} |
262 | a.forEach(function (h) { o[h] = -1 }) |
263 | //since this is always "has" process will never use the second arg. |
264 | process(o, null, function (err, res) { |
265 | var a = []; for(var k in o) a.push(res[k] > 0) |
266 | cb(null, Array.isArray(hash) ? a : a[0]) |
267 | }) |
268 | //LEGACY LEGACY LEGACY |
269 | } |
270 | }, |
271 | size: blobs.size, |
272 | get: blobs.get, |
273 | getSlice: blobs.getSlice, |
274 | add: blobs.add, |
275 | rm: blobs.rm, |
276 | ls: blobs.ls, |
277 | changes: function () { |
278 | //XXX for bandwidth sensitive peers, don't tell them about blobs we arn't trying to push. |
279 | return pull( |
280 | blobs.ls({old: false, meta: false}), |
281 | pull.filter(function (id) { |
282 | return !stingy || push[id] |
283 | }) |
284 | ) |
285 | }, |
286 | want: function (hash, cb) { |
287 | if(!isBlobId(hash)) |
288 | return cb(new Error('invalid hash:'+hash)) |
289 | //always broadcast wants immediately, because of race condition |
290 | //between has and adding a blob (needed to pass test/async.js) |
291 | var id = isAvailable(hash) |
292 | if(!id) queue(hash, -1) |
293 | |
294 | if(waiting[hash]) |
295 | waiting[hash].push(cb) |
296 | else { |
297 | waiting[hash] = [cb] |
298 | blobs.size(hash, function (err, has) { |
299 | if(has) { |
300 | while(waiting[hash].length) |
301 | waiting[hash].shift()(null, true) |
302 | delete waiting[hash] |
303 | } |
304 | }) |
305 | } |
306 | if(id) return get(id, hash) |
307 | }, |
308 | push: function (id, cb) { |
309 | //also store the id to push. |
310 | if(!isBlobId(id)) |
311 | return cb(new Error('invalid hash:'+id)) |
312 | |
313 | push[id] = push[id] || {} |
314 | queue(id, -1) |
315 | set.add(id, cb) |
316 | }, |
317 | pushed: function () { |
318 | return pushed.listen() |
319 | }, |
320 | createWants: function () { |
321 | return createWantStream(this.id) |
322 | }, |
323 | //private api. used for testing. not exposed over rpc. |
324 | _wantSink: wantSink, |
325 | _onConnect: function (other, name) { |
326 | peers[other.id] = other |
327 | //sending of your wants starts when you we know |
328 | //that they are not legacy style. |
329 | //process is called when wantSync |
330 | //doesn't immediately get an error. |
331 | pull(other.blobs.createWants(), wantSink(other)) |
332 | } |
333 | } |
334 | } |
335 | |
336 | |
337 | |
338 | |
339 |
Built with git-ssb-web