git ssb

3+

Dominic / ssb-blobs



Tree: a569cc80c75f15b0204f3af8a330e5866dd83389

Files: a569cc80c75f15b0204f3af8a330e5866dd83389 / inject.js

9182 bytesRaw
// Run the whole module in strict mode.
'use strict'
/**
 * Report whether an object has no enumerable keys.
 * Inherited enumerable keys count too, because a `for...in` walk is used.
 *
 * @param {Object} o - object to inspect (null/undefined are treated as empty).
 * @returns {boolean} true when the `for...in` walk visits nothing.
 */
function isEmpty (o) {
  var empty = true
  for (var key in o) {
    // one key is enough to know the answer
    empty = false
    break
  }
  return empty
}
6
/**
 * Check that a value is a number with no fractional part.
 *
 * @param {*} i - value to test.
 * @returns {boolean} result of `Number.isInteger(i)`; false for strings,
 *   NaN, Infinity and anything that is not a number.
 */
function isInteger (i) {
  return Number.isInteger(i)
}
10
// Local alias for the built-in array test.
var isArray = Array.isArray
12
13var Notify = require('pull-notify')
14var pull = require('pull-stream')
15var isBlobId = require('ssb-ref').isBlob
16
// Size constants, in bytes.
var MB = 1024 * 1024
var MAX_SIZE = 5 * MB // 5 MiB

// Shared do-nothing function.
function noop () {}
20
/**
 * Make a shallow copy of an object.
 * Every enumerable key visited by `for...in` (inherited ones included)
 * becomes an own property of the returned plain object.
 *
 * @param {Object} obj - object to copy.
 * @returns {Object} a new plain object holding the same key/value pairs.
 */
function clone (obj) {
  var copy = {}
  for (var key in obj) {
    copy[key] = obj[key]
  }
  return copy
}
27
/**
 * Count the enumerable keys of an object (as visited by `for...in`,
 * so inherited enumerable keys are included).
 *
 * @param {Object} obj - object whose keys are counted.
 * @returns {number} number of keys visited.
 */
function count (obj) {
  var total = 0
  for (var key in obj) {
    total += 1
  }
  return total
}
33
/**
 * Create a pull-stream through that intercepts aborts.
 *
 * Normal reads are forwarded to the upstream `read` unchanged. When the
 * downstream aborts, the upstream is NOT called; `abortCb(abort, cb)` is
 * invoked instead and is responsible for answering `cb`.
 *
 * @param {function(*, function)} abortCb - handler called with the abort
 *   value and the downstream callback.
 * @returns {function(function): function} a through: takes the upstream
 *   `read` and returns the wrapped `read`.
 */
function onAbort (abortCb) {
  return function (read) {
    return function (abort, cb) {
      if (!abort) {
        read(null, cb)
        return
      }
      abortCb(abort, cb)
    }
  }
}
42
43module.exports = function inject (blobs, set, name, opts) {
44 opts = opts || {}
45 //sympathy controls whether you'll replicate
46 var sympathy = opts.sympathy == null ? 3 : opts.sympathy | 0
47 var stingy = opts.stingy === true
48 var legacy = opts.legacy !== false
49 var pushy = opts.pushy || 3
50 var max = opts.max || 5*1024*1024
51
52 var notify = Notify()
53 var pushed = Notify()
54
55 var peers = {}
56 var want = {}, push = {}, waiting = {}, getting = {}
57 var available = {}, streams = {}
58 var send = {}, timer
59
60 function queue (hash, hops) {
61 if(hops < 0)
62 want[hash] = hops
63 else
64 delete want[hash]
65
66 send[hash] = hops
67 var _send = send;
68 send = {}
69 notify(_send)
70 }
71
72 function isAvailable(id) {
73 for(var peer in peers)
74 if(available[peer] && available[peer][id] < max && peers[peer])
75 return peer
76 }
77
78 function get (peer, id, name) {
79 if(getting[id] || !peers[peer]) return
80
81 getting[id] = peer
82 var source = peers[peer].blobs.get({key: id, max: max})
83 pull(source, blobs.add(id, function (err, _id) {
84 delete getting[id]
85 if(err) {
86 if(available[peer]) delete available[peer][id]
87 //check if another peer has this.
88 //if so get it from them.
89 if(peer = isAvailable(id)) get(peer, id, name)
90 }
91 }))
92 }
93
94 function wants (peer, id, hops) {
95 if(Math.abs(hops) > sympathy) return //sorry!
96 if(!want[id] || want[id] < hops) {
97 want[id] = hops
98 queue(id, hops)
99 if(peer = isAvailable(id)) {
100 get(peer, id)
101 }
102 }
103 }
104
105 pull(
106 blobs.ls({old: false, meta: true}),
107 pull.drain(function (data) {
108 queue(data.id, data.size)
109 delete want[data.id]
110 if(waiting[data.id])
111 while(waiting[data.id].length)
112 waiting[data.id].shift()(null, true)
113 })
114 )
115
116 function has(peer_id, id, size) {
117 if('string' !== typeof peer_id) throw new Error('peer must be string id')
118 available[peer_id] = available[peer_id] || {}
119 available[peer_id][id] = size
120 //if we are broadcasting this blob,
121 //mark this peer has it.
122 //if N peers have it, we can stop broadcasting.
123 if(push[id]) {
124 push[id][peer_id] = size
125 if(count(push[id]) >= pushy) {
126 var data = {key: id, peers: push[id]}
127 set.remove(id)
128 delete push[id]; pushed(data)
129 }
130 }
131 if(want[id] && !getting[id] && size < max) get(peer_id, id)
132 }
133
134 function process (data, peer, cb) {
135 var n = 0, res = {}
136 for(var id in data) (function (id) {
137 if(isBlobId(id) && isInteger(data[id])) {
138 if(data[id] < 0 && (opts.stingy !== true || push[id])) { //interpret as "WANT"
139 n++
140 //check whether we already *HAVE* this file.
141 //respond with it's size, if we do.
142 blobs.size(id, function (err, size) { //XXX
143 if(size) res[id] = size
144 else wants(peer, id, data[id] - 1)
145 next()
146 })
147 }
148 else if(data[id] > 0) { //interpret as "HAS"
149 has(peer, id, data[id])
150 }
151 }
152 }(id))
153
154 function next () {
155 if(--n) return
156 cb(null, res)
157 }
158 }
159
160 function dead (peer_id) {
161 delete peers[peer_id]
162 delete available[peer_id]
163 delete streams[peer_id]
164 }
165
166 //LEGACY LEGACY LEGACY
167 function legacySync (peer) {
168 if(!legacy) return
169
170 var drain //we need to keep a reference to drain
171 //so we can abort it when we get an error.
172 function hasLegacy (hashes) {
173 var ary = Object.keys(hashes).filter(function (k) {
174 return hashes[k] < 0
175 })
176 if(ary.length)
177 peer.blobs.has(ary, function (err, haves) {
178 if(err) drain.abort(err) //ERROR: abort this stream.
179 else haves.forEach(function (have, i) {
180 if(have) has(peer.id, ary[i], have)
181 })
182 })
183 }
184
185 function notPeer (err) {
186 if(err) dead(peer.id)
187 }
188
189 drain = pull.drain(function (hash) {
190 has(peer.id, hash, true)
191 }, notPeer)
192
193
194 pull(peer.blobs.changes(), drain)
195
196 hasLegacy(want)
197
198 //a stream of hashes
199 pull(notify.listen(), pull.drain(hasLegacy, notPeer))
200 }
201 //LEGACY LEGACY LEGACY
202
203 function createWantStream (id) {
204 if(!streams[id]) {
205 streams[id] = notify.listen()
206
207 //merge in ids we are pushing.
208 var w = clone(want)
209 for(var k in push) w[k] = -1
210 streams[id].push(w)
211 }
212 return pull(streams[id], onAbort(function (err, cb) {
213 streams[id] = false
214 cb(err)
215 }))
216 }
217
218 function wantSink (peer) {
219 createWantStream(peer.id) //set streams[peer.id]
220
221 var modern = false
222 return pull.drain(function (data) {
223 modern = true
224 //respond with list of blobs you already have,
225 process(data, peer.id, function (err, has_data) {
226 //(if you have any)
227 if(!isEmpty(has_data) && streams[peer.id]) streams[peer.id].push(has_data)
228 })
229 }, function (err) {
230 if(err && !modern) {
231 streams[peer.id] = false
232 if(legacy) legacySync(peer)
233 }
234 //if can handle unpeer another way,
235 //then can refactor legacy handling out of sight.
236
237 //handle error and fallback to legacy mode, if enabled.
238 else if(peers[peer.id] == peer) {
239 delete peers[peer.id]
240 delete available[peer.id]
241 delete streams[peer.id]
242 }
243 })
244 }
245
246 var self
247 return self = {
248 //id: name,
249 has: function (hash, cb) {
250 if(isArray(hash)) {
251 for(var i = 0; i < hash.length; i++)
252 if(!isBlobId(hash[i]))
253 return cb(new Error('invalid hash:'+hash[i]))
254 }
255 else if(!isBlobId(hash))
256 return cb(new Error('invalid hash:'+hash))
257
258 if(!legacy) {
259 blobs.has.call(this, hash, cb)
260 }
261 else {
262 //LEGACY LEGACY LEGACY
263 if(this === self || !this || this === global) { // a local call
264 return blobs.has.call(this, hash, cb)
265 }
266 //ELSE, interpret remote calls to has as a WANT request.
267 //handling this by calling process (which calls size())
268 //avoids a race condition in the tests.
269 //(and avoids doubling the number of calls)
270 var a = Array.isArray(hash) ? hash : [hash]
271 var o = {}
272 a.forEach(function (h) { o[h] = -1 })
273 //since this is always "has" process will never use the second arg.
274 process(o, null, function (err, res) {
275 var a = []; for(var k in o) a.push(res[k] > 0)
276 cb(null, Array.isArray(hash) ? a : a[0])
277 })
278 //LEGACY LEGACY LEGACY
279 }
280 },
281 size: blobs.size,
282 get: blobs.get,
283 getSlice: blobs.getSlice,
284 add: blobs.add,
285 rm: blobs.rm,
286 ls: blobs.ls,
287 changes: function () {
288 //XXX for bandwidth sensitive peers, don't tell them about blobs we arn't trying to push.
289 return pull(
290 blobs.ls({old: false, meta: false}),
291 pull.filter(function (id) {
292 return !stingy || push[id]
293 })
294 )
295 },
296 want: function (hash, cb) {
297 if(!isBlobId(hash))
298 return cb(new Error('invalid hash:'+hash))
299 //always broadcast wants immediately, because of race condition
300 //between has and adding a blob (needed to pass test/async.js)
301 if(blobs.isEmptyHash(hash)) return cb(null, true)
302 var id = isAvailable(hash)
303
304 if(waiting[hash])
305 waiting[hash].push(cb)
306 else {
307 waiting[hash] = [cb]
308 blobs.size(hash, function (err, size) {
309 if(size != null) {
310 while(waiting[hash].length)
311 waiting[hash].shift()(null, true)
312 delete waiting[hash]
313 }
314 })
315 }
316
317 if(!id && waiting[hash]) queue(hash, -1)
318
319 if(id) return get(id, hash)
320 },
321 push: function (id, cb) {
322 //also store the id to push.
323 if(!isBlobId(id))
324 return cb(new Error('invalid hash:'+id))
325
326 push[id] = push[id] || {}
327 queue(id, -1)
328 set.add(id, cb)
329 },
330 pushed: function () {
331 return pushed.listen()
332 },
333 createWants: function () {
334 return createWantStream(this.id)
335 },
336 //private api. used for testing. not exposed over rpc.
337 _wantSink: wantSink,
338 _onConnect: function (other, name) {
339 peers[other.id] = other
340 //sending of your wants starts when you we know
341 //that they are not legacy style.
342 //process is called when wantSync
343 //doesn't immediately get an error.
344 pull(other.blobs.createWants(), wantSink(other))
345 }
346 }
347}
348
349
350
351

Built with git-ssb-web