Files: 0b68762e851dab73abc45fa60add0d96d3b5c7ac / channel / obs / recent.js
2371 bytesRaw
1 | var nest = require('depnest') |
2 | var pull = require('pull-stream') |
3 | |
4 | var { Value, Dict, Struct, computed, resolve, throttle } = require('mutant') |
5 | |
6 | exports.needs = nest({ |
7 | 'sbot.pull.backlinks': 'first' |
8 | }) |
9 | |
10 | exports.gives = nest({ |
11 | 'channel.obs.recent': true |
12 | }) |
13 | |
14 | exports.create = function (api) { |
15 | var recentChannels = null |
16 | var channelsLookup = null |
17 | |
18 | return nest({ |
19 | 'channel.obs.recent': function () { |
20 | load() |
21 | return recentChannels |
22 | } |
23 | }) |
24 | |
25 | function load () { |
26 | if (!recentChannels) { |
27 | var sync = Value(false) |
28 | channelsLookup = Dict() |
29 | |
30 | pull( |
31 | api.sbot.pull.backlinks({ |
32 | old: false, |
33 | live: true, |
34 | query: [ |
35 | {$filter: { |
36 | dest: {$prefix: '#'} |
37 | }} |
38 | ] |
39 | }), |
40 | pull.drain(msg => { |
41 | var obs = channelsLookup.get(msg.dest) |
42 | if (!obs) { |
43 | obs = ChannelRef(msg.dest) |
44 | channelsLookup.put(msg.dest, obs) |
45 | } |
46 | obs.set({ |
47 | id: msg.dest, |
48 | updatedAt: Math.max(resolve(obs.updatedAt), msg.timestamp), |
49 | count: resolve(obs.count) + 1 |
50 | }) |
51 | }) |
52 | ) |
53 | |
54 | pull( |
55 | api.sbot.pull.backlinks({ |
56 | query: [ |
57 | {$filter: { |
58 | dest: {$prefix: '#'} |
59 | }}, |
60 | {$reduce: { |
61 | id: 'dest', |
62 | updatedAt: {$max: 'timestamp'}, |
63 | count: {$count: true} |
64 | }} |
65 | ] |
66 | }), |
67 | pull.drain((item) => { |
68 | if (item.sync) { |
69 | sync.set(true) |
70 | } else if (item.id && item.id.startsWith('#')) { |
71 | var name = item.id |
72 | var channel = channelsLookup.get(name) |
73 | if (!channel) { |
74 | channel = ChannelRef(name) |
75 | channelsLookup.put(name, channel) |
76 | } |
77 | channel.set(item) |
78 | } |
79 | }, (err) => { |
80 | if (err) throw err |
81 | sync.set(true) |
82 | }) |
83 | ) |
84 | recentChannels = computed(throttle(channelsLookup, 1000), (lookup) => { |
85 | var values = Object.keys(lookup).map(x => lookup[x]).sort((a, b) => b.updatedAt - a.updatedAt).map(x => x.id.slice(1)) |
86 | return values |
87 | }) |
88 | recentChannels.sync = sync |
89 | } |
90 | } |
91 | } |
92 | |
93 | function ChannelRef (id) { |
94 | return Struct({ |
95 | id, |
96 | updatedAt: Value(0), |
97 | count: Value(0) |
98 | }, {merge: true}) |
99 | } |
100 |
Built with git-ssb-web