Files: 73d53a413c5991111416c31c1d615c89aefe88af / tag / obs / tagged.js
3611 bytesRaw
1 | var { Value, computed } = require('mutant') |
2 | var pull = require('pull-stream') |
3 | var nest = require('depnest') |
4 | var ref = require('ssb-ref') |
5 | var set = require('lodash/set') |
6 | |
7 | exports.needs = nest({ |
8 | 'sbot.pull.stream': 'first' |
9 | }) |
10 | |
11 | exports.gives = nest({ |
12 | 'tag.obs': [ |
13 | 'taggedMessages', |
14 | 'messageTags', |
15 | 'allTagsFrom' |
16 | ] |
17 | }) |
18 | |
19 | exports.create = function(api) { |
20 | var tagsCache = null |
21 | var messagesCache = null |
22 | var sync = Value(false) |
23 | |
24 | return nest({ |
25 | 'tag.obs': { |
26 | taggedMessages, |
27 | messageTags, |
28 | allTagsFrom |
29 | } |
30 | }) |
31 | |
32 | function taggedMessages(author, tagId) { |
33 | if (!ref.isLink(author) || !ref.isLink(tagId)) throw new Error('Requires an ssb ref!') |
34 | return withSync(computed([get(author, tagsCache), tagId], getTaggedMessages)) |
35 | } |
36 | |
37 | function messageTags(msgId, tagId) { |
38 | if (!ref.isLink(tagId) || !ref.isLink(msgId)) throw new Error('Requires an ssb ref!') |
39 | return withSync(computed([get(msgId, messagesCache), tagId], getMessageTags)) |
40 | } |
41 | |
42 | function allTagsFrom(author) { |
43 | if (!ref.isLink(author)) throw new Error('Requires an ssb ref!') |
44 | return withSync(computed(get(author, tagsCache), lookup => Object.keys(lookup))) |
45 | } |
46 | |
47 | function withSync(obs) { |
48 | obs.sync = sync |
49 | return obs |
50 | } |
51 | |
52 | function get(id, lookup) { |
53 | if (!ref.isLink(id)) throw new Error('Requires an ssb ref!') |
54 | load() |
55 | if (!lookup[id]) { |
56 | lookup[id] = Value({}) |
57 | } |
58 | return lookup[id] |
59 | } |
60 | |
61 | function load() { |
62 | if (!tagsCache) { |
63 | tagsCache = {} |
64 | messagesCache = {} |
65 | pull( |
66 | api.sbot.pull.stream(sbot => sbot.tags.stream({ live: true })), |
67 | pull.drain(item => { |
68 | if (!sync()) { |
69 | // populate tags observable cache |
70 | const messageLookup = {} |
71 | for (const author in item) { |
72 | update(author, item[author], tagsCache) |
73 | |
74 | // generate message lookup |
75 | for (const tag in item[author]) { |
76 | for (const message in item[author][tag]) { |
77 | set(messageLookup, [message, tag, author], item[message][author][tag]) |
78 | } |
79 | } |
80 | } |
81 | |
82 | // populate messages observable cache |
83 | for (const message in messageLookup) { |
84 | update(message, messageLookup[message], messagesCache) |
85 | } |
86 | |
87 | if (!sync()) { |
88 | sync.set(true) |
89 | } |
90 | } else if (item && ref.isLink(item.tag) && ref.isLink(item.author) && ref.isLink(item.message)) { |
91 | // handle realtime updates |
92 | const { tag, author, message, tagged, timestamp } = item |
93 | update(author, { [tag]: { [message]: timestamp } }, tagsCache) |
94 | update(message, { [tag]: { [author]: timestamp } }, messagesCache) |
95 | } |
96 | }) |
97 | ) |
98 | } |
99 | } |
100 | } |
101 | |
102 | function update(id, values, lookup) { |
103 | const state = get(id, lookup) |
104 | const lastState = state() |
105 | var changed = false |
106 | |
107 | for (const tag in values) { |
108 | for (const key in values[tag]) { |
109 | if (values[tag][key] !== lastState[tag][key]) { |
110 | lastState[tag][key] = values[tag][key] |
111 | changed = true |
112 | } |
113 | } |
114 | } |
115 | |
116 | if (changed) { |
117 | state.set(lastState) |
118 | } |
119 | } |
120 | |
121 | function getTaggedMessages(lookup, key) { |
122 | const messages = [] |
123 | for (const msg in lookup[key]) { |
124 | if (lookup[key][msg].tagged) { |
125 | messages.push(msg) |
126 | } |
127 | } |
128 | return messages |
129 | } |
130 | |
131 | function getMessageTags(lookup, tagId) { |
132 | const tags = {} |
133 | for (const author in lookup[tagId]) { |
134 | if (lookup[tagId][author]) { |
135 | if (!tags[tagId]) tags[tagId] = {} |
136 | tags[tagId][author] = lookup[tagId][author] |
137 | } |
138 | } |
139 | return tags |
140 | } |
141 |
Built with git-ssb-web