git ssb

1+

Dominic / pull-flood



Tree: 19b684ba319b74735187b3449321026e2a0127fd

Files: 19b684ba319b74735187b3449321026e2a0127fd / index.js

1334 bytesRaw
1var deepEqual = require('deep-equal')
2var Pushable = require('pull-pushable')
3var pull = require('pull-stream')
4
5module.exports = function (opts) {
6 var data = [], peers = {}
7 opts = opts || {}
8 var max = opts.max == null ? 100 : opts.max
9 function unique (msg) {
10 for(var i = 0; i < data.length; i++)
11 if(deepEqual(msg, data[i])) return false
12 return true
13 }
14 function broadcast(msg, not_id) {
15 for(var k in peers)
16 if(k != not_id)
17 peers[k].push(msg)
18 }
19
20 function update (msg, not_id) {
21 if(!unique(msg)) return
22 data.push(msg)
23 if(data.length > opts.max)
24 var _msg = data.shift()
25 opts.onMessage && opts.onMessage(msg)
26 if(_msg) opts.onDrop && opts.onDrop(_msg)
27 broadcast(msg, not_id)
28 }
29
30 return {
31 id: opts.id,
32 data: data,
33 append: function (msg) {
34 update(msg, null)
35 },
36 createStream: function (opts) {
37 var id = opts.id
38 if(peers[id]) {
39 peers[id].end()
40 delete peers[id]
41 }
42 var source = peers[id] = Pushable(function (err) {
43 if(peers[id] === source) delete peers[id]
44 })
45 return {
46 source: source,
47 sink: pull.drain(function (msg) {
48 update(msg, id)
49 }, function (err) {
50 if(peers[id] === source) delete peers[id]
51 })
52
53 }
54 }
55 }
56}
57
58

Built with git-ssb-web