Files: 5b58e9504759e8b1ff2db82c197c436adcf91d37 / contact / obs.js
3099 bytesRaw
1 | var nest = require('depnest') |
2 | var {Value, onceTrue, computed} = require('mutant') |
3 | var defer = require('pull-defer') |
4 | var pull = require('pull-stream') |
5 | var ref = require('ssb-ref') |
6 | |
// Dependencies this module asks for. `create` reads the resolved value as
// `api.sbot.obs.connection`.
const requestedDependencies = {
  'sbot.obs.connection': 'first'
}

exports.needs = nest(requestedDependencies)
10 | |
// Entry points this module provides; each one is implemented in the object
// returned by `create`.
const providedEntries = {
  'contact.obs': ['following', 'followers'],
  'sbot.hook.publish': true
}

exports.gives = nest(providedEntries)
15 | |
16 | exports.create = function (api) { |
17 | var cacheLoading = false |
18 | var cache = {} |
19 | var sync = Value(false) |
20 | |
21 | return nest({ |
22 | 'contact.obs': { |
23 | following: (id) => get(id).following, |
24 | followers: (id) => get(id).followers |
25 | }, |
26 | 'sbot.hook.publish': function (msg) { |
27 | if (isContact(msg)) { |
28 | var source = msg.value.author |
29 | var dest = msg.value.content.contact |
30 | if (typeof msg.value.content.following === 'boolean') { |
31 | get(source).push({ |
32 | following: { |
33 | [dest]: [msg.value.content] |
34 | } |
35 | }) |
36 | get(dest).push({ |
37 | followers: { |
38 | [source]: [msg.value.content] |
39 | } |
40 | }) |
41 | } |
42 | } |
43 | } |
44 | }) |
45 | |
46 | function loadCache () { |
47 | pull( |
48 | StreamWhenConnected(api.sbot.obs.connection, sbot => sbot.contacts.stream({live: true})), |
49 | pull.drain(item => { |
50 | for (var target in item) { |
51 | if (ref.isFeed(target)) { |
52 | get(target).push(item[target]) |
53 | } |
54 | } |
55 | |
56 | if (!sync()) { |
57 | sync.set(true) |
58 | } |
59 | }) |
60 | ) |
61 | } |
62 | |
63 | function get (id) { |
64 | if (!ref.isFeed(id)) throw new Error('Contact state requires an id!') |
65 | if (!cacheLoading) { |
66 | cacheLoading = true |
67 | loadCache() |
68 | } |
69 | if (!cache[id]) { |
70 | cache[id] = Contact(api, id, sync) |
71 | } |
72 | return cache[id] |
73 | } |
74 | } |
75 | |
/**
 * Holds the follow state of a single feed and exposes it as observables.
 *
 * State shape: `{ following: { [id]: entry }, followers: { [id]: entry } }`
 * where `entry[0]` is the follow flag read by `getIds` and `entry[1]`, when
 * present, is used here as an ordering value (larger wins).
 *
 * @param {Object} api - unused here; kept for interface compatibility.
 * @param {string} id - unused here; kept for interface compatibility.
 * @param {Function} sync - observable attached to the returned observables
 *   as `.sync`.
 * @returns {{following: Function, followers: Function, push: Function}}
 */
function Contact (api, id, sync) {
  var state = Value({})
  return {
    following: computedIds(state, 'following', true, sync),
    followers: computedIds(state, 'followers', true, sync),
    /**
     * Merges `values` into the state. An incoming entry replaces the stored
     * one when there is no stored entry, when it is newer, or when either
     * side has no ordering value to compare.
     */
    push: function (values) {
      var lastState = state()
      var changed = false
      for (var key in values) {
        var valuesForKey = lastState[key] = lastState[key] || {}
        for (var dest in values[key]) {
          var value = values[key][dest]
          var existing = valuesForKey[dest]
          // Compare the entry's own ordering value (`value[1]`), not a
          // property of the whole update, and index the stored entry as
          // `[dest][1]` so stale updates are actually rejected.
          if (!existing || value[1] > existing[1] || !value[1] || !existing[1]) {
            valuesForKey[dest] = value
            changed = true
          }
        }
      }
      // The state object is mutated in place; set() is what notifies
      // listeners, so only call it when something was replaced.
      if (changed) {
        state.set(lastState)
      }
    }
  }
}
100 | |
/**
 * Builds an observable set of ids derived from `state[key]` via `getIds`.
 *
 * @param {Function} state - observable holding the contact state object.
 * @param {string} key - state property to read ('following' / 'followers').
 * @param {*} compare - value an entry's first element must strictly equal
 *   for its id to be included.
 * @param {Function} sync - observable exposed on the result as `.sync`.
 * @returns {Function} the computed observable, with `.sync` attached.
 */
function computedIds (state, key, compare, sync) {
  // Forward the caller's `compare` instead of a hard-coded `true`, so the
  // parameter is honoured.
  var obs = computed([state, key, compare], getIds)
  obs.sync = sync
  return obs
}
106 | |
/**
 * Collects the ids under `state[key]` whose entry's first element strictly
 * equals `compare`.
 *
 * @param {Object} state - contact state object.
 * @param {string} key - property of `state` to scan.
 * @param {*} compare - value matched against `entry[0]` with `===`.
 * @returns {Set<string>} matching ids; empty when `state[key]` is falsy.
 */
function getIds (state, key, compare) {
  const ids = new Set()
  const entries = state[key]
  if (!entries) return ids

  for (const id in entries) {
    const flag = entries[id][0]
    if (flag === compare) ids.add(id)
  }
  return ids
}
119 | |
/**
 * Tells whether `msg` carries content of type 'contact'.
 *
 * @param {Object} msg - message object; `msg.value.content.type` is checked.
 * @returns {*} `true`/`false` when `msg.value.content` is present,
 *   otherwise the falsy `msg.value` or `msg.value.content` that stopped
 *   the check.
 */
function isContact (msg) {
  const value = msg.value
  if (!value) return value

  const content = value.content
  if (!content) return content

  return content.type === 'contact'
}
123 | |
/**
 * Returns a deferred source immediately and resolves it with `fn(value)`
 * when `onceTrue` invokes its callback for `connection`.
 *
 * @param {Function} connection - observable handed to `onceTrue`.
 * @param {Function} fn - receives the value passed by `onceTrue` and
 *   returns the stream to resolve the deferred source with.
 * @returns {Object} the deferred source.
 */
function StreamWhenConnected (connection, fn) {
  const deferred = defer.source()

  onceTrue(connection, (resolvedConnection) => {
    const source = fn(resolvedConnection)
    deferred.resolve(source)
  })

  return deferred
}
131 |
Built with git-ssb-web