Files: 073f7adfddd31252faca6d3c503e202ad1aef088 / tag / obs / tagged.js
4673 bytesRaw
1 | var { Value, Set, computed } = require('mutant') |
2 | var pull = require('pull-stream') |
3 | var nest = require('depnest') |
4 | var ref = require('ssb-ref') |
5 | var set = require('lodash/set') |
6 | var unset = require('lodash/unset') |
7 | var get = require('lodash/get') |
8 | var isEmpty = require('lodash/isEmpty') |
9 | |
// The only external dependency: `sbot.pull.stream`, which loadCache() calls
// to open the live tag stream.
exports.needs = nest('sbot.pull.stream', 'first')
13 | |
14 | exports.gives = nest({ |
15 | 'tag.obs': [ |
16 | 'taggedMessages', |
17 | 'messageTags', |
18 | 'allTagsFrom', |
19 | 'allTags' |
20 | ] |
21 | }) |
22 | |
23 | exports.create = function(api) { |
24 | var tagsCache = {} |
25 | var messagesCache = {} |
26 | var cacheLoading = false |
27 | var sync = Value(false) |
28 | |
29 | return nest({ |
30 | 'tag.obs': { |
31 | taggedMessages, |
32 | messageTags, |
33 | allTagsFrom, |
34 | allTags |
35 | } |
36 | }) |
37 | |
38 | function taggedMessages(author, tagId) { |
39 | if (!ref.isFeed(author) || !ref.isLink(tagId)) throw new Error('Requires an ssb ref!') |
40 | return withSync(computed([getObs(author, tagsCache), tagId], getTaggedMessages)) |
41 | } |
42 | |
43 | function messageTags(msgId) { |
44 | if (!ref.isLink(msgId)) throw new Error('Requires an ssb ref!') |
45 | return withSync(computed(getObs(msgId, messagesCache), getMessageTags)) |
46 | } |
47 | |
48 | function allTagsFrom(author) { |
49 | if (!ref.isFeed(author)) throw new Error('Requires an ssb ref!') |
50 | return withSync(computed(getObs(author, tagsCache), Object.keys)) |
51 | } |
52 | |
53 | function allTags() { |
54 | return withSync(getAllTags(getCache(tagsCache))) |
55 | } |
56 | |
57 | function withSync(obs) { |
58 | obs.sync = sync |
59 | return obs |
60 | } |
61 | |
62 | function getObs(id, lookup) { |
63 | if (!ref.isLink(id)) throw new Error('Requires an ssb ref!') |
64 | if (!cacheLoading) { |
65 | cacheLoading = true |
66 | loadCache() |
67 | } |
68 | if (!lookup[id]) { |
69 | lookup[id] = Value({}) |
70 | } |
71 | return lookup[id] |
72 | } |
73 | |
74 | function getCache(lookup) { |
75 | if (!cacheLoading) { |
76 | cacheLoading = true |
77 | loadCache() |
78 | } |
79 | return lookup |
80 | } |
81 | |
82 | function update(id, values, lookup) { |
83 | const state = getObs(id, lookup) |
84 | const lastState = state() |
85 | var changed = false |
86 | |
87 | for (const tag in values) { |
88 | const lastTag = lastState[tag] |
89 | const isUnusedTag = isEmpty(values[tag]) && (lastTag === undefined || !isEmpty(lastTag)) |
90 | if (isUnusedTag) { |
91 | set(lastState, [ tag ], {}) |
92 | changed = true |
93 | continue |
94 | } |
95 | for (const key in values[tag]) { |
96 | const value = get(values, [ tag, key ]) |
97 | const lastValue = get(lastState, [ tag, key ]) |
98 | if (value !== lastValue) { |
99 | if (value) { |
100 | set(lastState, [ tag, key ], value) |
101 | } else { |
102 | unset(lastState, [ tag, key ]) |
103 | } |
104 | changed = true |
105 | } |
106 | } |
107 | } |
108 | |
109 | if (changed) { |
110 | state.set(lastState) |
111 | } |
112 | } |
113 | |
114 | function loadCache() { |
115 | pull( |
116 | api.sbot.pull.stream(sbot => sbot.tags.stream({ live: true })), |
117 | pull.drain(item => { |
118 | if (!sync()) { |
119 | // populate tags observable cache |
120 | const messageLookup = {} |
121 | for (const author in item) { |
122 | update(author, item[author], tagsCache) |
123 | |
124 | // generate message lookup |
125 | for (const tag in item[author]) { |
126 | for (const message in item[author][tag]) { |
127 | set(messageLookup, [message, tag, author], item[author][tag][message]) |
128 | } |
129 | } |
130 | } |
131 | |
132 | // populate messages observable cache |
133 | for (const message in messageLookup) { |
134 | update(message, messageLookup[message], messagesCache) |
135 | } |
136 | |
137 | if (!sync()) { |
138 | sync.set(true) |
139 | } |
140 | } else if (item && ref.isLink(item.tagKey) && ref.isFeed(item.author) && ref.isLink(item.message)) { |
141 | // handle realtime updates |
142 | const { tagKey, author, message, tagged, timestamp } = item |
143 | if (tagged) { |
144 | update(author, { [tagKey]: { [message]: timestamp } }, tagsCache) |
145 | update(message, { [tagKey]: { [author]: timestamp } }, messagesCache) |
146 | } else { |
147 | update(author, { [tagKey]: { [message]: false } }, tagsCache) |
148 | update(message, { [tagKey]: { [author]: false } }, messagesCache) |
149 | } |
150 | } |
151 | }) |
152 | ) |
153 | } |
154 | } |
155 | |
/**
 * Lists the message ids stored under one tag whose value is truthy.
 *
 * @param {Object} lookup - map of tag key -> { [messageId]: value }.
 * @param {string} key - tag key to read from `lookup`.
 * @returns {string[]} message ids with a truthy value; empty when the tag
 *   has no entry in `lookup`.
 */
function getTaggedMessages(lookup, key) {
  const byMessage = lookup[key]
  const tagged = []
  for (const messageId in byMessage) {
    // Falsy values mark messages that are not (or no longer) tagged.
    if (!byMessage[messageId]) continue
    tagged.push(messageId)
  }
  return tagged
}
165 | |
/**
 * Lists the tag keys in `lookup` whose value is non-empty (per lodash `isEmpty`).
 *
 * @param {Object} lookup - map of tag key -> map of entries for that tag.
 * @returns {string[]} tag keys that still have at least one entry.
 */
function getMessageTags(lookup) {
  const activeTags = []
  for (const tagKey in lookup) {
    // A tag with an empty entry map is no longer applied.
    if (isEmpty(lookup[tagKey])) continue
    activeTags.push(tagKey)
  }
  return activeTags
}
175 | |
/**
 * Collects every tag key found in the current state of each observable in `lookup`.
 *
 * @param {Object} lookup - map of author id -> observable (called with no
 *   arguments to read its current state, an object keyed by tag).
 * @returns {Object} a new mutant `Set` holding the tag keys seen at call time.
 */
function getAllTags(lookup) {
  const collected = Set([])
  for (const authorId in lookup) {
    const tagsOfAuthor = lookup[authorId]()
    for (const tagKey in tagsOfAuthor) {
      collected.add(tagKey)
    }
  }
  return collected
}
186 |
Built with git-ssb-web