git ssb

0+

Josiah / scuttle-tag



Tree: ade6f81f573808634650dd12382576ad44b97c1d

Files: ade6f81f573808634650dd12382576ad44b97c1d / tag / obs.js

5580 bytes · Raw
1var { Value, Set, computed } = require('mutant')
2var pull = require('pull-stream')
3var nest = require('depnest')
4var ref = require('ssb-ref')
5var set = require('lodash/set')
6var unset = require('lodash/unset')
7var get = require('lodash/get')
8var isEmpty = require('lodash/isEmpty')
9
// Dependencies requested from the host: `create` below calls
// `api.sbot.pull.stream(...)` to open the tags stream.
exports.needs = nest({ 'sbot.pull.stream': 'first' })
13
14exports.gives = nest({
15 'tag.obs': [
16 'taggedMessages',
17 'messageTags',
18 'messageTagsFrom',
19 'messageTaggers',
20 'allTagsFrom',
21 'allTags'
22 ]
23})
24
25exports.create = function(api) {
26 var tagsCache = {}
27 var messagesCache = {}
28 var cacheLoading = false
29 var sync = Value(false)
30
31 return nest({
32 'tag.obs': {
33 taggedMessages,
34 messageTags,
35 messageTagsFrom,
36 messageTaggers,
37 allTagsFrom,
38 allTags
39 }
40 })
41
42 function taggedMessages(author, tagId) {
43 if (!ref.isFeed(author) || !ref.isLink(tagId)) throw new Error('Requires an ssb ref!')
44 return withSync(computed([getObs(author, tagsCache), tagId], getTaggedMessages))
45 }
46
47 function messageTags(msgId) {
48 if (!ref.isLink(msgId)) throw new Error('Requires an ssb ref!')
49 return withSync(computed(getObs(msgId, messagesCache), getMessageTags))
50 }
51
52 function messageTagsFrom(msgId, author) {
53 if (!ref.isLink(msgId) || !ref.isFeed(author)) throw new Error('Requires an ssb ref!')
54 return withSync(computed([getObs(msgId, messagesCache), author], getMessageTagsFrom))
55 }
56
57 function messageTaggers(msgId, tagId) {
58 if (!ref.isLink(msgId) || !ref.isLink(tagId)) throw new Error('Requires an ssb ref!')
59 return withSync(computed([getObs(msgId, messagesCache), tagId], getMessageTaggers))
60 }
61
62 function allTagsFrom(author) {
63 if (!ref.isFeed(author)) throw new Error('Requires an ssb ref!')
64 return withSync(computed(getObs(author, tagsCache), Object.keys))
65 }
66
67 function allTags() {
68 return withSync(getAllTags(getCache(tagsCache)))
69 }
70
71 function withSync(obs) {
72 obs.sync = sync
73 return obs
74 }
75
76 function getObs(id, lookup) {
77 if (!ref.isLink(id)) throw new Error('Requires an ssb ref!')
78 if (!cacheLoading) {
79 cacheLoading = true
80 loadCache()
81 }
82 if (!lookup[id]) {
83 lookup[id] = Value({})
84 }
85 return lookup[id]
86 }
87
88 function getCache(lookup) {
89 if (!cacheLoading) {
90 cacheLoading = true
91 loadCache()
92 }
93 return lookup
94 }
95
96 function update(id, values, lookup) {
97 const state = getObs(id, lookup)
98 const lastState = state()
99 var changed = false
100
101 for (const tag in values) {
102 const lastTag = lastState[tag]
103 const isUnusedTag = isEmpty(values[tag]) && (lastTag === undefined || !isEmpty(lastTag))
104 if (isUnusedTag) {
105 set(lastState, [ tag ], {})
106 changed = true
107 continue
108 }
109 for (const key in values[tag]) {
110 const value = get(values, [ tag, key ])
111 const lastValue = get(lastState, [ tag, key ])
112 if (value !== lastValue) {
113 if (value) {
114 set(lastState, [ tag, key ], value)
115 } else {
116 unset(lastState, [ tag, key ])
117 }
118 changed = true
119 }
120 }
121 }
122
123 if (changed) {
124 state.set(lastState)
125 }
126 }
127
128 function loadCache() {
129 pull(
130 api.sbot.pull.stream(sbot => sbot.tags.stream({ live: true })),
131 pull.drain(item => {
132 if (!sync()) {
133 // populate tags observable cache
134 const messageLookup = {}
135 for (const author in item) {
136 update(author, item[author], tagsCache)
137
138 // generate message lookup
139 for (const tag in item[author]) {
140 for (const message in item[author][tag]) {
141 set(messageLookup, [message, tag, author], item[author][tag][message])
142 }
143 }
144 }
145
146 // populate messages observable cache
147 for (const message in messageLookup) {
148 update(message, messageLookup[message], messagesCache)
149 }
150
151 if (!sync()) {
152 sync.set(true)
153 }
154 } else if (item && ref.isLink(item.tagKey) && ref.isFeed(item.author) && ref.isLink(item.message)) {
155 // handle realtime updates
156 const { tagKey, author, message, tagged, timestamp } = item
157 if (tagged) {
158 update(author, { [tagKey]: { [message]: timestamp } }, tagsCache)
159 update(message, { [tagKey]: { [author]: timestamp } }, messagesCache)
160 } else {
161 update(author, { [tagKey]: { [message]: false } }, tagsCache)
162 update(message, { [tagKey]: { [author]: false } }, messagesCache)
163 }
164 }
165 })
166 )
167 }
168}
169
/**
 * Lists the message ids stored under one tag that have a truthy value.
 *
 * @param {Object} lookup - tag id -> { [msgId]: value }
 * @param {string} key - tag id to look up
 * @returns {string[]} message ids with a truthy value; empty when the tag
 *   (or the lookup itself) is missing
 */
function getTaggedMessages(lookup, key) {
  const tagged = lookup ? lookup[key] : undefined
  if (!tagged) return []
  // Object.keys limits the scan to own properties.
  return Object.keys(tagged).filter(msg => tagged[msg])
}
179
/**
 * Lists the tags that have at least one entry.
 *
 * @param {Object} lookup - tag id -> object of entries for that tag
 * @returns {string[]} tag ids whose entry object is not empty
 */
function getMessageTags(lookup) {
  const usedTags = []
  for (const tagId in lookup) {
    const entries = lookup[tagId]
    // Tags kept with an empty entry object are not reported.
    if (isEmpty(entries)) continue
    usedTags.push(tagId)
  }
  return usedTags
}
189
/**
 * Lists the tags under which `author` has a truthy entry.
 *
 * @param {Object} lookup - tag id -> { [author]: value }
 * @param {string} author - author id to look for
 * @returns {string[]} tag ids where `lookup[tag][author]` is truthy; tags
 *   whose entry is null/undefined are skipped instead of throwing
 */
function getMessageTagsFrom(lookup, author) {
  if (!lookup) return []
  return Object.keys(lookup).filter(tag => {
    const taggers = lookup[tag]
    return Boolean(taggers && taggers[author])
  })
}
199
/**
 * Lists the author ids stored under one tag that have a truthy value.
 *
 * @param {Object} lookup - tag id -> { [author]: value }
 * @param {string} key - tag id to look up
 * @returns {string[]} author ids with a truthy value; empty when the tag
 *   (or the lookup itself) is missing
 */
function getMessageTaggers(lookup, key) {
  const byAuthor = lookup ? lookup[key] : undefined
  if (!byAuthor) return []
  // Object.keys limits the scan to own properties.
  return Object.keys(byAuthor).filter(author => byAuthor[author])
}
209
/**
 * Collects every tag key found across all cached author states.
 *
 * `Set` here is the one imported from `mutant` at the top of the file, not
 * the built-in. The result reflects the states as they are at call time.
 *
 * @param {Object} lookup - id -> callable observable returning { [tagId]: ... }
 * @returns {Object} a mutant Set holding the tag ids
 */
function getAllTags(lookup) {
  const collected = Set([])
  for (const feedId in lookup) {
    // Each cache entry is an observable; calling it yields its current state.
    const tagMap = lookup[feedId]()
    for (const tagId in tagMap) {
      collected.add(tagId)
    }
  }
  return collected
}
220

Built with git-ssb-web