git ssb

10+

Matt McKegg / patchwork



Tree: 2150c96ae49e67fdcb638a88455af2c3ae8c9c53

Files: 2150c96ae49e67fdcb638a88455af2c3ae8c9c53 / plugs / channel / obs / recent.js

1963 bytesRaw
1// uses lib/flumeview-channels
2
3var nest = require('depnest')
4var pull = require('pull-stream')
5
6var { Value, Dict, Struct, computed, resolve, throttle } = require('mutant')
7
8exports.needs = nest({
9 'sbot.pull.stream': 'first'
10})
11
12exports.gives = nest({
13 'channel.obs.recent': true
14})
15
16exports.create = function (api) {
17 var recentChannels = null
18 var channelsLookup = null
19
20 return nest({
21 'channel.obs.recent': function () {
22 load()
23 return recentChannels
24 }
25 })
26
27 function load () {
28 if (!recentChannels) {
29 var sync = Value(false)
30 channelsLookup = Dict()
31
32 pull(
33 api.sbot.pull.stream(sbot => sbot.channels.stream({live: true})),
34 pull.drain(msg => {
35 if (!sync()) {
36 channelsLookup.transaction(() => {
37 for (var channel in msg) {
38 var obs = ChannelRef(channel)
39 obs.set({
40 id: channel,
41 updatedAt: msg[channel].timestamp,
42 count: msg[channel].count
43 })
44 channelsLookup.put(channel, obs)
45 }
46 sync.set(true)
47 })
48 } else {
49 var obs = channelsLookup.get(msg.channel)
50 if (!obs) {
51 obs = ChannelRef(msg.dest)
52 channelsLookup.put(msg.dest, obs)
53 }
54 obs.set({
55 id: msg.channel,
56 updatedAt: Math.max(resolve(obs.updatedAt), msg.timestamp),
57 count: resolve(obs.count) + 1
58 })
59 }
60 })
61 )
62
63 recentChannels = computed(throttle(channelsLookup, 1000), (lookup) => {
64 var values = Object.keys(lookup).map(x => lookup[x]).sort((a, b) => b.updatedAt - a.updatedAt).map(x => x.id)
65 return values
66 })
67 recentChannels.sync = sync
68 }
69 }
70}
71
72function ChannelRef (id) {
73 return Struct({
74 id,
75 updatedAt: Value(0),
76 count: Value(0)
77 }, {merge: true})
78}
79

Built with git-ssb-web