git ssb

1+

Dominic / pull-flood



Tree: 044dcb33c78bf8c254d2769529899a9e7393ceb7

Files: 044dcb33c78bf8c254d2769529899a9e7393ceb7 / index.js

1408 bytesRaw
1var deepEqual = require('deep-equal')
2var Pushable = require('pull-pushable')
3var pull = require('pull-stream')
4var timestamp = require('monotonic-timestamp')
5
6module.exports = function (opts) {
7 var data = [], peers = {}
8 opts = opts || {}
9 var max = opts.max == null ? 100 : opts.max
10 function unique (msg) {
11 for(var i = 0; i < data.length; i++)
12 if(deepEqual(msg, data[i])) return false
13 return true
14 }
15 function broadcast(msg, not_id) {
16 for(var k in peers)
17 if(k != not_id)
18 peers[k].push(msg)
19 }
20
21 function update (msg, not_id) {
22 if(!unique(msg)) return
23 data.push({key: timestamp(), value: msg})
24 if(data.length > opts.max)
25 var _msg = data.shift()
26 opts.onMessage && opts.onMessage(msg)
27 if(_msg) opts.onDrop && opts.onDrop(_msg)
28 broadcast(msg, not_id)
29 }
30
31 return {
32 id: opts.id,
33 data: data,
34 append: function (msg) {
35 update(msg, null)
36 },
37 createStream: function (opts) {
38 var id = opts.id
39 if(peers[id]) {
40 peers[id].end()
41 delete peers[id]
42 }
43 var source = peers[id] = Pushable(function (err) {
44 if(peers[id] === source) delete peers[id]
45 })
46 return {
47 source: source,
48 sink: pull.drain(function (msg) {
49 update(msg, id)
50 }, function (err) {
51 if(peers[id] === source) delete peers[id]
52 })
53
54 }
55 }
56 }
57}
58
59

Built with git-ssb-web