Files: ade6f81f573808634650dd12382576ad44b97c1d / tag / obs.js
5580 bytesRaw
1 | var { Value, Set, computed } = require('mutant') |
2 | var pull = require('pull-stream') |
3 | var nest = require('depnest') |
4 | var ref = require('ssb-ref') |
5 | var set = require('lodash/set') |
6 | var unset = require('lodash/unset') |
7 | var get = require('lodash/get') |
8 | var isEmpty = require('lodash/isEmpty') |
9 | |
// Dependencies this module asks for: one `sbot.pull.stream` provider.
// NOTE(review): 'first' is presumably the resolution strategy of the module
// loader that consumes `needs` — confirm against its documentation.
exports.needs = nest({ 'sbot.pull.stream': 'first' })
13 | |
14 | exports.gives = nest({ |
15 | 'tag.obs': [ |
16 | 'taggedMessages', |
17 | 'messageTags', |
18 | 'messageTagsFrom', |
19 | 'messageTaggers', |
20 | 'allTagsFrom', |
21 | 'allTags' |
22 | ] |
23 | }) |
24 | |
25 | exports.create = function(api) { |
26 | var tagsCache = {} |
27 | var messagesCache = {} |
28 | var cacheLoading = false |
29 | var sync = Value(false) |
30 | |
31 | return nest({ |
32 | 'tag.obs': { |
33 | taggedMessages, |
34 | messageTags, |
35 | messageTagsFrom, |
36 | messageTaggers, |
37 | allTagsFrom, |
38 | allTags |
39 | } |
40 | }) |
41 | |
/**
 * Observable list of message ids that `author` currently has under `tagId`.
 *
 * @param {string} author - feed id; must satisfy `ref.isFeed`
 * @param {string} tagId - tag id; must satisfy `ref.isLink`
 * @returns computed observable (with `.sync` attached) resolving to an array
 *   of message ids whose cached value is truthy
 * @throws {Error} 'Requires an ssb ref!' when either argument is invalid
 */
function taggedMessages(author, tagId) {
  const hasValidRefs = ref.isFeed(author) && ref.isLink(tagId)
  if (!hasValidRefs) throw new Error('Requires an ssb ref!')

  const authorTags = getObs(author, tagsCache)
  const messages = computed([authorTags, tagId], getTaggedMessages)
  return withSync(messages)
}
46 | |
/**
 * Observable list of tag ids that have at least one entry on `msgId`.
 *
 * @param {string} msgId - message id; must satisfy `ref.isLink`
 * @returns computed observable (with `.sync` attached) resolving to an array
 *   of tag ids whose cached entry is non-empty
 * @throws {Error} 'Requires an ssb ref!' when `msgId` is invalid
 */
function messageTags(msgId) {
  const isValid = ref.isLink(msgId)
  if (!isValid) throw new Error('Requires an ssb ref!')

  const messageState = getObs(msgId, messagesCache)
  const tags = computed(messageState, getMessageTags)
  return withSync(tags)
}
51 | |
/**
 * Observable list of tag ids that `author` has applied to `msgId`.
 *
 * @param {string} msgId - message id; must satisfy `ref.isLink`
 * @param {string} author - feed id; must satisfy `ref.isFeed`
 * @returns computed observable (with `.sync` attached) resolving to an array
 *   of tag ids
 * @throws {Error} 'Requires an ssb ref!' when either argument is invalid
 */
function messageTagsFrom(msgId, author) {
  const hasValidRefs = ref.isLink(msgId) && ref.isFeed(author)
  if (!hasValidRefs) throw new Error('Requires an ssb ref!')

  const messageState = getObs(msgId, messagesCache)
  const tags = computed([messageState, author], getMessageTagsFrom)
  return withSync(tags)
}
56 | |
/**
 * Observable list of feed ids that have applied `tagId` to `msgId`.
 *
 * @param {string} msgId - message id; must satisfy `ref.isLink`
 * @param {string} tagId - tag id; must satisfy `ref.isLink`
 * @returns computed observable (with `.sync` attached) resolving to an array
 *   of feed ids whose cached value is truthy
 * @throws {Error} 'Requires an ssb ref!' when either argument is invalid
 */
function messageTaggers(msgId, tagId) {
  const hasValidRefs = ref.isLink(msgId) && ref.isLink(tagId)
  if (!hasValidRefs) throw new Error('Requires an ssb ref!')

  const messageState = getObs(msgId, messagesCache)
  const taggers = computed([messageState, tagId], getMessageTaggers)
  return withSync(taggers)
}
61 | |
/**
 * Observable list of every tag id present in the cached state of `author`.
 * The list is the raw key set of that state, so it also contains tags whose
 * entry is currently an empty object.
 *
 * @param {string} author - feed id; must satisfy `ref.isFeed`
 * @returns computed observable (with `.sync` attached) resolving to an array
 *   of tag ids
 * @throws {Error} 'Requires an ssb ref!' when `author` is invalid
 */
function allTagsFrom(author) {
  const isValid = ref.isFeed(author)
  if (!isValid) throw new Error('Requires an ssb ref!')

  const authorTags = getObs(author, tagsCache)
  const tagIds = computed(authorTags, Object.keys)
  return withSync(tagIds)
}
66 | |
/**
 * Returns the set that `getAllTags` builds from the per-author cache, with
 * the shared `sync` observable attached. Starts the cache load if it has not
 * been started yet.
 *
 * NOTE(review): `getAllTags` reads the cache once, at call time, and nothing
 * in this file adds to the returned set afterwards — it looks like tags that
 * arrive later are not reflected. Confirm whether that is intended.
 *
 * @returns the set produced by `getAllTags`, with `.sync` attached
 */
function allTags() {
  const authorCache = getCache(tagsCache)
  const tags = getAllTags(authorCache)
  return withSync(tags)
}
70 | |
/**
 * Attaches the shared `sync` observable to `obs` as `obs.sync`.
 *
 * @param obs - observable to decorate (mutated in place)
 * @returns the same `obs` object
 */
function withSync(obs) {
  return Object.assign(obs, { sync })
}
75 | |
/**
 * Returns the observable stored for `id` in `lookup`, creating an empty one
 * (`Value({})`) on first access. Also triggers the one-time cache load.
 *
 * @param {string} id - key to look up; must satisfy `ref.isLink`
 * @param {Object} lookup - cache object mapping ids to observables
 * @returns the observable stored under `lookup[id]`
 * @throws {Error} 'Requires an ssb ref!' when `id` is invalid
 */
function getObs(id, lookup) {
  if (!ref.isLink(id)) throw new Error('Requires an ssb ref!')

  // getCache starts the cache load on first use and hands `lookup` back.
  const cache = getCache(lookup)
  if (!cache[id]) {
    cache[id] = Value({})
  }
  return cache[id]
}
87 | |
/**
 * Returns `lookup` unchanged, starting the cache load first if no load has
 * been started yet.
 *
 * @param {Object} lookup - cache object to hand back
 * @returns {Object} the same `lookup` object
 */
function getCache(lookup) {
  if (cacheLoading) return lookup

  // Flip the flag before loading so the load is only ever started once.
  cacheLoading = true
  loadCache()
  return lookup
}
95 | |
/**
 * Merges `values` into the observable state stored for `id` in `lookup`.
 *
 * `values` has the shape { tag: { key: value } }. For each tag:
 *  - if the incoming entry is empty and the stored entry is either missing
 *    or non-empty, the stored entry is replaced with `{}`;
 *  - otherwise each key whose value differs from the stored one is written
 *    when truthy and removed when falsy.
 *
 * The current state object is mutated in place and pushed back through
 * `set` only when something actually changed.
 *
 * @param {string} id - cache key (validated by `getObs`)
 * @param {Object} values - incoming { tag: { key: value } } changes
 * @param {Object} lookup - cache object holding the observable for `id`
 */
function update(id, values, lookup) {
  const obs = getObs(id, lookup)
  const current = obs()
  let dirty = false

  for (const tag in values) {
    const incoming = values[tag]
    const previous = current[tag]

    const shouldReset = isEmpty(incoming) && (previous === undefined || !isEmpty(previous))
    if (shouldReset) {
      set(current, [ tag ], {})
      dirty = true
      continue
    }

    for (const key in incoming) {
      const next = get(values, [ tag, key ])
      if (next === get(current, [ tag, key ])) continue

      if (next) {
        set(current, [ tag, key ], next)
      } else {
        unset(current, [ tag, key ])
      }
      dirty = true
    }
  }

  if (dirty) {
    obs.set(current)
  }
}
127 | |
/**
 * Subscribes to the live tags stream and mirrors it into `tagsCache`
 * (author -> tag -> message) and `messagesCache` (message -> tag -> author).
 *
 * While `sync` is false, the next item is treated as a full snapshot shaped
 * { author: { tag: { message: value } } }; after processing it `sync` is set
 * to true. Every later item is treated as a single change and is applied only
 * if it carries a valid `tagKey`, `author` and `message`; other items are
 * ignored.
 * NOTE(review): presumably `sbot.tags.stream` emits the snapshot first —
 * confirm against the plugin that provides it.
 *
 * Stream errors are reported through the drain completion callback instead
 * of being dropped silently.
 */
function loadCache() {
  pull(
    api.sbot.pull.stream(sbot => sbot.tags.stream({ live: true })),
    pull.drain(item => {
      if (!sync()) {
        // populate tags observable cache
        const messageLookup = {}
        for (const author in item) {
          update(author, item[author], tagsCache)

          // invert author -> tag -> message into message -> tag -> author
          for (const tag in item[author]) {
            for (const message in item[author][tag]) {
              set(messageLookup, [message, tag, author], item[author][tag][message])
            }
          }
        }

        // populate messages observable cache
        for (const message in messageLookup) {
          update(message, messageLookup[message], messagesCache)
        }

        // This branch only runs while sync is false, so no re-check is needed.
        sync.set(true)
      } else if (item && ref.isLink(item.tagKey) && ref.isFeed(item.author) && ref.isLink(item.message)) {
        // handle realtime updates: a falsy value makes `update` drop the entry
        const { tagKey, author, message, tagged, timestamp } = item
        const value = tagged ? timestamp : false
        update(author, { [tagKey]: { [message]: value } }, tagsCache)
        update(message, { [tagKey]: { [author]: value } }, messagesCache)
      }
    }, err => {
      // drain calls this when the stream ends; `err` is only set on failure.
      if (err) console.error('tag.obs: tags stream ended with an error', err)
    })
  )
}
168 | } |
169 | |
/**
 * Lists the message ids stored under `lookup[key]` whose value is truthy.
 *
 * @param {Object} lookup - { tag: { message: value } } state
 * @param {string} key - tag id to read
 * @returns {string[]} matching message ids; empty when the tag is absent
 */
function getTaggedMessages(lookup, key) {
  const byMessage = lookup[key]
  const result = []
  for (const messageId in byMessage) {
    if (!byMessage[messageId]) continue
    result.push(messageId)
  }
  return result
}
179 | |
/**
 * Lists the tag ids in `lookup` whose entry is non-empty according to
 * `isEmpty`.
 *
 * @param {Object} lookup - { tag: { author: value } } state
 * @returns {string[]} tag ids with at least one entry
 */
function getMessageTags(lookup) {
  const result = []
  for (const tagId in lookup) {
    if (isEmpty(lookup[tagId])) continue
    result.push(tagId)
  }
  return result
}
189 | |
/**
 * Lists the tag ids in `lookup` that hold a truthy value for `author`.
 *
 * @param {Object} lookup - { tag: { author: value } } state
 * @param {string} author - feed id to look for
 * @returns {string[]} tag ids applied by `author`
 */
function getMessageTagsFrom(lookup, author) {
  const result = []
  for (const tagId in lookup) {
    const byAuthor = lookup[tagId]
    if (!byAuthor[author]) continue
    result.push(tagId)
  }
  return result
}
199 | |
/**
 * Lists the feed ids stored under `lookup[key]` whose value is truthy.
 *
 * @param {Object} lookup - { tag: { author: value } } state
 * @param {string} key - tag id to read
 * @returns {string[]} matching feed ids; empty when the tag is absent
 */
function getMessageTaggers(lookup, key) {
  const byAuthor = lookup[key]
  const result = []
  for (const feedId in byAuthor) {
    if (!byAuthor[feedId]) continue
    result.push(feedId)
  }
  return result
}
209 | |
/**
 * Collects every tag id found across all author observables in `lookup`
 * into a new set. Each observable is read once, when this function runs.
 *
 * @param {Object} lookup - { author: observable } cache; each observable
 *   resolves to a { tag: ... } object
 * @returns a new set (the `Set` imported at the top of this file)
 *   containing the tag ids
 */
function getAllTags(lookup) {
  const collected = Set([])
  for (const feedId in lookup) {
    const readAuthorTags = lookup[feedId]
    const tagsByAuthor = readAuthorTags()
    for (const tagId in tagsByAuthor) {
      collected.add(tagId)
    }
  }
  return collected
}
220 |
Built with git-ssb-web