git ssb

3+

Dominic / ssb-blobs



Tree: 340ef7cdcc9255c17889664920a134eca6611486

Files: 340ef7cdcc9255c17889664920a134eca6611486 / inject.js

8858 bytesRaw
1'use strict'
2function isEmpty (o) {
3 for(var k in o) return false
4 return true
5}
6
7function isInteger (i) {
8 return Number.isInteger(i)
9}
10
11var isArray = Array.isArray
12
13var Notify = require('pull-notify')
14var pull = require('pull-stream')
15var isBlobId = require('ssb-ref').isBlob
16
17var MB = 1024*1024
18var MAX_SIZE = 5*MB
19function noop () {}
20
21function clone (obj) {
22 var o = {}
23 for(var k in obj)
24 o[k] = obj[k]
25 return o
26}
27
28function count(obj) {
29 var c = 0
30 for(var k in obj) c++
31 return c
32}
33
34module.exports = function inject (blobs, set, name, opts) {
35 opts = opts || {}
36 //sympathy controls whether you'll replicate
37 var sympathy = opts.sympathy == null ? 3 : opts.sympathy | 0
38 var stingy = opts.stingy === true
39 var legacy = opts.legacy !== false
40 var pushy = opts.pushy || 3
41 var max = opts.max || 5*1024*1024
42
43 var notify = Notify()
44 var pushed = Notify()
45
46 var peers = {}
47 var want = {}, push = {}, waiting = {}, getting = {}
48 var available = {}, streams = {}
49 var send = {}, timer
50
51 function queue (hash, hops) {
52 if(hops < 0)
53 want[hash] = hops
54 else
55 delete want[hash]
56
57 send[hash] = hops
58 var _send = send;
59 send = {}
60 notify(_send)
61 }
62
63 function isAvailable(id) {
64 for(var peer in peers)
65 if(available[peer] && available[peer][id] < max && peers[peer])
66 return peer
67 }
68
69 function get (peer, id, name) {
70 if(getting[id] || !peers[peer]) return
71
72 getting[id] = peer
73 var source = peers[peer].blobs.get({key: id, max: max})
74 pull(source, blobs.add(id, function (err, _id) {
75 delete getting[id]
76 if(err) {
77 if(available[peer]) delete available[peer][id]
78 //check if another peer has this.
79 //if so get it from them.
80 if(peer = isAvailable(id)) get(peer, id, name)
81 }
82 }))
83 }
84
85 function wants (peer, id, hops) {
86 if(Math.abs(hops) > sympathy) return //sorry!
87 if(!want[id] || want[id] < hops) {
88 want[id] = hops
89 queue(id, hops)
90 if(peer = isAvailable(id)) {
91 get(peer, id)
92 }
93 }
94 }
95
96 pull(
97 blobs.ls({old: false, meta: true}),
98 pull.drain(function (data) {
99 queue(data.id, data.size)
100 delete want[data.id]
101 if(waiting[data.id])
102 while(waiting[data.id].length)
103 waiting[data.id].shift()(null, true)
104 })
105 )
106
107 function has(peer_id, id, size) {
108 if('string' !== typeof peer_id) throw new Error('peer must be string id')
109 available[peer_id] = available[peer_id] || {}
110 available[peer_id][id] = size
111 //if we are broadcasting this blob,
112 //mark this peer has it.
113 //if N peers have it, we can stop broadcasting.
114 if(push[id]) {
115 push[id][peer_id] = size
116 if(count(push[id]) >= pushy) {
117 var data = {key: id, peers: push[id]}
118 set.remove(id)
119 delete push[id]; pushed(data)
120 }
121 }
122 if(want[id] && !getting[id] && size < max) get(peer_id, id)
123 }
124
125 function process (data, peer, cb) {
126 var n = 0, res = {}
127 for(var id in data) {
128 if(isBlobId(id) && isInteger(data[id])) {
129 if(data[id] <= 0 && (opts.stingy !== true || push[id])) { //interpret as "WANT"
130 n++
131 //check whether we already *HAVE* this file.
132 //respond with it's size, if we do.
133 blobs.size(id, function (err, size) { //XXX
134 if(size) res[id] = size
135 else wants(peer, id, data[id] - 1)
136 next()
137 })
138 }
139 else if(data[id] > 0) { //interpret as "HAS"
140 has(peer, id, data[id])
141 }
142 }
143 }
144
145 function next () {
146 if(--n) return
147 cb(null, res)
148 }
149 }
150
151 function dead (peer_id) {
152 delete peers[peer_id]
153 delete available[peer_id]
154 delete streams[peer_id]
155 }
156
157 //LEGACY LEGACY LEGACY
158 function legacySync (peer) {
159 if(!legacy) return
160
161 var drain //we need to keep a reference to drain
162 //so we can abort it when we get an error.
163 function hasLegacy (hashes) {
164 var ary = Object.keys(hashes).filter(function (k) {
165 return hashes[k] < 0
166 })
167 if(ary.length)
168 peer.blobs.has(ary, function (err, haves) {
169 if(err) drain.abort(err) //ERROR: abort this stream.
170 else haves.forEach(function (have, i) {
171 if(have) has(peer.id, ary[i], have)
172 })
173 })
174 }
175
176 function notPeer (err) {
177 if(err) dead(peer.id)
178 }
179
180 drain = pull.drain(function (hash) {
181 has(peer.id, hash, true)
182 }, notPeer)
183
184
185 pull(peer.blobs.changes(), drain)
186
187 hasLegacy(want)
188
189 //a stream of hashes
190 pull(notify.listen(), pull.drain(hasLegacy, notPeer))
191 }
192 //LEGACY LEGACY LEGACY
193
194 function createWantStream (id) {
195 if(!streams[id]) {
196 streams[id] = notify.listen()
197
198 //merge in ids we are pushing.
199 var w = clone(want)
200 for(var k in push) w[k] = -1
201 streams[id].push(w)
202
203 return streams[id]
204 }
205 return streams[id]
206 }
207
208 function wantSink (peer) {
209 createWantStream(peer.id) //set streams[peer.id]
210
211 var modern = false
212 return pull.drain(function (data) {
213 modern = true
214 //respond with list of blobs you already have,
215 process(data, peer.id, function (err, has_data) {
216 //(if you have any)
217 if(!isEmpty(has_data) && streams[peer.id]) streams[peer.id].push(has_data)
218 })
219 }, function (err) {
220 if(err && !modern) {
221 streams[peer.id] = false
222 if(legacy) legacySync(peer)
223 }
224 //if can handle unpeer another way,
225 //then can refactor legacy handling out of sight.
226
227 //handle error and fallback to legacy mode, if enabled.
228 else if(peers[peer.id] == peer) {
229 delete peers[peer.id]
230 delete available[peer.id]
231 delete streams[peer.id]
232 }
233 })
234 }
235
236 var self
237 return self = {
238 //id: name,
239 has: function (hash, cb) {
240 if(isArray(hash)) {
241 for(var i = 0; i < hash.length; i++)
242 if(!isBlobId(hash[i]))
243 return cb(new Error('invalid hash:'+hash[i]))
244 }
245 else if(!isBlobId(hash))
246 return cb(new Error('invalid hash:'+hash))
247
248 if(!legacy) {
249 blobs.has.call(this, hash, cb)
250 }
251 else {
252 //LEGACY LEGACY LEGACY
253 if(this === self || !this || this === global) { // a local call
254 return blobs.has.call(this, hash, cb)
255 }
256 //ELSE, interpret remote calls to has as a WANT request.
257 //handling this by calling process (which calls size())
258 //avoids a race condition in the tests.
259 //(and avoids doubling the number of calls)
260 var a = Array.isArray(hash) ? hash : [hash]
261 var o = {}
262 a.forEach(function (h) { o[h] = -1 })
263 //since this is always "has" process will never use the second arg.
264 process(o, null, function (err, res) {
265 var a = []; for(var k in o) a.push(res[k] > 0)
266 cb(null, Array.isArray(hash) ? a : a[0])
267 })
268 //LEGACY LEGACY LEGACY
269 }
270 },
271 size: blobs.size,
272 get: blobs.get,
273 getSlice: blobs.getSlice,
274 add: blobs.add,
275 rm: blobs.rm,
276 ls: blobs.ls,
277 changes: function () {
278 //XXX for bandwidth sensitive peers, don't tell them about blobs we arn't trying to push.
279 return pull(
280 blobs.ls({old: false, meta: false}),
281 pull.filter(function (id) {
282 return !stingy || push[id]
283 })
284 )
285 },
286 want: function (hash, cb) {
287 if(!isBlobId(hash))
288 return cb(new Error('invalid hash:'+hash))
289 //always broadcast wants immediately, because of race condition
290 //between has and adding a blob (needed to pass test/async.js)
291 var id = isAvailable(hash)
292 if(!id) queue(hash, -1)
293
294 if(waiting[hash])
295 waiting[hash].push(cb)
296 else {
297 waiting[hash] = [cb]
298 blobs.size(hash, function (err, has) {
299 if(has) {
300 while(waiting[hash].length)
301 waiting[hash].shift()(null, true)
302 delete waiting[hash]
303 }
304 })
305 }
306 if(id) return get(id, hash)
307 },
308 push: function (id, cb) {
309 //also store the id to push.
310 if(!isBlobId(id))
311 return cb(new Error('invalid hash:'+id))
312
313 push[id] = push[id] || {}
314 queue(id, -1)
315 set.add(id, cb)
316 },
317 pushed: function () {
318 return pushed.listen()
319 },
320 createWants: function () {
321 return createWantStream(this.id)
322 },
323 //private api. used for testing. not exposed over rpc.
324 _wantSink: wantSink,
325 _onConnect: function (other, name) {
326 peers[other.id] = other
327 //sending of your wants starts when you we know
328 //that they are not legacy style.
329 //process is called when wantSync
330 //doesn't immediately get an error.
331 pull(other.blobs.createWants(), wantSink(other))
332 }
333 }
334}
335
336
337
338
339

Built with git-ssb-web