Commit b148361cb34214f26c1e35030d95aa785ca1a5e2
Merge pull request #21 from ssbc/with-flume-indexes
Use flume indexes/reducers for likes, backlinks, channels, aboutsMatt McKegg authored on 6/12/2017, 1:31:00 PM
GitHub committed on 6/12/2017, 1:31:00 PM
Parent: d5f37c13cd95a2e321a1e51db1e64eef746ee0ea
Parent: b0afea5e68a6a31afda1eeb38830433a9beedd2b
Files changed
about/obs.js | changed |
channel/obs/recent.js | changed |
channel/obs/subscribed.js | changed |
contact/async.js | changed |
contact/obs.js | changed |
feed/pull/channel.js | changed |
index.js | changed |
message/html/backlinks.js | changed |
message/obs/backlinks.js | changed |
message/obs/likes.js | changed |
package.json | changed |
sbot.js | changed |
about/obs.js | ||
---|---|---|
@@ -1,17 +1,17 @@ | ||
1 | -var {Value, Struct, Dict, computed} = require('mutant') | |
2 | -var pullPause = require('pull-pause') | |
1 … | +var {Value, computed, onceTrue} = require('mutant') | |
2 … | +var defer = require('pull-defer') | |
3 | 3 … | var pull = require('pull-stream') |
4 | -var msgs = require('ssb-msgs') | |
5 | 4 … | var nest = require('depnest') |
6 | 5 … | var ref = require('ssb-ref') |
7 | 6 … | var colorHash = new (require('color-hash'))() |
8 | 7 … | |
9 | 8 … | exports.needs = nest({ |
10 | - 'sbot.pull.query': 'first', | |
9 … | + 'sbot.obs.connection': 'first', | |
11 | 10 … | 'blob.sync.url': 'first', |
12 | 11 … | 'keys.sync.id': 'first' |
13 | 12 … | }) |
13 … | + | |
14 | 14 … | exports.gives = nest({ |
15 | 15 … | 'about.obs': [ |
16 | 16 … | 'name', |
17 | 17 … | 'description', |
@@ -19,10 +19,9 @@ | ||
19 | 19 … | 'imageUrl', |
20 | 20 … | 'names', |
21 | 21 … | 'images', |
22 | 22 … | 'color' |
23 | - ], | |
24 | - 'sbot.hook.feed': true | |
23 … | + ] | |
25 | 24 … | }) |
26 | 25 … | |
27 | 26 … | exports.create = function (api) { |
28 | 27 … | var sync = Value(false) |
@@ -30,176 +29,143 @@ | ||
30 | 29 … | var cacheLoading = false |
31 | 30 … | |
32 | 31 … | return nest({ |
33 | 32 … | 'about.obs': { |
34 | - name: (id) => get(id).displayName, | |
33 … | + name: (id) => get(id).name, | |
35 | 34 … | description: (id) => get(id).description, |
36 | 35 … | image: (id) => get(id).image, |
37 | 36 … | imageUrl: (id) => get(id).imageUrl, |
38 | - | |
39 | 37 … | names: (id) => get(id).names, |
40 | 38 … | images: (id) => get(id).images, |
41 | 39 … | color: (id) => computed(id, (id) => colorHash.hex(id)) |
42 | - }, | |
43 | - 'sbot.hook.feed': function (msg) { | |
44 | - if (isAbout(msg) && msg.timestamp) { | |
45 | - var target = msg.value.content.about | |
46 | - var from = cache[target] | |
47 | - if (from) { | |
48 | - from.push({ | |
49 | - author: msg.value.author, | |
50 | - timestamp: msg.timestamp, | |
51 | - name: msg.value.content.name, | |
52 | - image: msg.value.content.image, | |
53 | - description: msg.value.content.description | |
54 | - }) | |
55 | - } | |
56 | - } | |
57 | 40 … | } |
58 | 41 … | }) |
59 | 42 … | |
60 | 43 … | function get (id) { |
44 … | + if (!ref.isFeed(id)) throw new Error('About requires an id!') | |
61 | 45 … | if (!cacheLoading) { |
62 | 46 … | cacheLoading = true |
63 | 47 … | loadCache() |
64 | 48 … | } |
65 | 49 … | if (!cache[id]) { |
66 | - cache[id] = About(api, id, sync) | |
50 … | + cache[id] = About(api, id) | |
67 | 51 … | } |
68 | 52 … | return cache[id] |
69 | 53 … | } |
70 | 54 … | |
71 | 55 … | function loadCache () { |
72 | 56 … | pull( |
73 | - api.sbot.pull.query({ | |
74 | - query: [ | |
75 | - {$filter: { | |
76 | - value: { | |
77 | - content: { | |
78 | - type: 'about' | |
79 | - } | |
80 | - } | |
81 | - }}, | |
82 | - {$map: { | |
83 | - timestamp: 'timestamp', | |
84 | - author: ['value', 'author'], | |
85 | - id: ['value', 'content', 'about'], | |
86 | - name: ['value', 'content', 'name'], | |
87 | - image: ['value', 'content', 'image'], | |
88 | - description: ['value', 'content', 'description'] | |
89 | - }} | |
90 | - ], | |
91 | - live: true | |
92 | - }), | |
93 | - pull.drain( | |
94 | - msg => { | |
95 | - if (msg.sync) { | |
96 | - sync.set(true) | |
97 | - } else if (msgs.isLink(msg.id, 'feed')) { | |
98 | - get(msg.id).push(msg) | |
57 … | + StreamWhenConnected(api.sbot.obs.connection, sbot => sbot.about.stream({live: true})), | |
58 … | + pull.drain(item => { | |
59 … | + for (var target in item) { | |
60 … | + if (ref.isFeed(target)) { | |
61 … | + get(target).push(item[target]) | |
99 | 62 … | } |
100 | - }, | |
101 | - () => sync.set(true) | |
102 | - ) | |
63 … | + } | |
64 … | + | |
65 … | + if (!sync()) { | |
66 … | + sync.set(true) | |
67 … | + } | |
68 … | + }) | |
103 | 69 … | ) |
104 | 70 … | } |
105 | 71 … | } |
106 | 72 … | |
107 | -function About (api, id, sync) { | |
108 | - if (!ref.isLink(id)) throw new Error('About requires an id!') | |
109 | - | |
110 | - var pauser = pullPause((paused) => {}) | |
111 | - | |
73 … | +function About (api, id) { | |
112 | 74 … | // transparent image |
113 | 75 … | var fallbackImageUrl = 'data:image/gif;base64,R0lGODlhAQABAPAAAP///wAAACH5BAEAAAAALAAAAAABAAEAAAICRAEAOw==' |
114 | 76 … | |
77 … | + var state = Value({}) | |
115 | 78 … | var yourId = api.keys.sync.id() |
116 | - var lastestTimestamps = {} | |
79 … | + var image = computed([state, 'image', id, yourId], socialValue) | |
80 … | + var name = computed([state, 'name', id, yourId, id.slice(1, 10)], socialValue) | |
81 … | + var description = computed([state, 'description', id, yourId], socialValue) | |
117 | 82 … | |
118 | - var obs = Struct({ | |
119 | - assignedNames: Dict(), | |
120 | - assignedImages: Dict(), | |
121 | - assignedDescriptions: Dict() | |
122 | - }, { | |
123 | - onListen: pauser.resume, | |
124 | - onUnlisten: pauser.pause | |
125 | - }) | |
126 | - | |
127 | - obs.sync = computed([sync, obs], (v) => v) | |
128 | - obs.displayName = computed([obs.assignedNames, id, yourId, id.slice(1, 10)], socialValue) | |
129 | - obs.description = computed([obs.assignedDescriptions, id, yourId], socialValue) | |
130 | - obs.image = computed([obs.assignedImages, id, yourId], socialValue) | |
131 | - | |
132 | - obs.names = computed(obs.assignedNames, indexByValue) | |
133 | - obs.images = computed(obs.assignedImages, indexByValue) | |
134 | - | |
135 | - obs.imageUrl = computed(obs.image, (blobId) => { | |
136 | - if (blobId) { | |
137 | - return api.blob.sync.url(blobId) | |
138 | - } else { | |
139 | - return fallbackImageUrl | |
140 | - } | |
141 | - }) | |
142 | - | |
143 | - obs.push = push | |
144 | - | |
145 | - return obs | |
146 | - | |
147 | - // scoped | |
148 | - | |
149 | - function push (msg) { | |
150 | - if (!lastestTimestamps[msg.author]) { | |
151 | - lastestTimestamps[msg.author] = { | |
152 | - name: 0, image: 0, description: 0 | |
83 … | + return { | |
84 … | + name, | |
85 … | + image, | |
86 … | + description, | |
87 … | + names: computed([state, 'name', id, yourId, id.slice(1, 10)], allValues), | |
88 … | + images: computed([state, 'image', id, yourId], allValues), | |
89 … | + descriptions: computed([state, 'description', id, yourId], allValues), | |
90 … | + imageUrl: computed(image, (blobId) => { | |
91 … | + if (blobId) { | |
92 … | + return api.blob.sync.url(blobId) | |
93 … | + } else { | |
94 … | + return fallbackImageUrl | |
153 | 95 … | } |
154 | - } | |
155 | - if (msg.name && lastestTimestamps[msg.author].name < msg.timestamp) { | |
156 | - lastestTimestamps[msg.author].name = msg.timestamp | |
157 | - obs.assignedNames.put(msg.author, msg.name) | |
158 | - } | |
159 | - if (msg.image && lastestTimestamps[msg.author].image < msg.timestamp) { | |
160 | - lastestTimestamps[msg.author].image = msg.timestamp | |
161 | - var obj = msgs.link(msg.image, 'blob') | |
162 | - if (obj && obj.link) { | |
163 | - obs.assignedImages.put(msg.author, obj.link) | |
96 … | + }), | |
97 … | + push: function (values) { | |
98 … | + var lastState = state() | |
99 … | + var changed = false | |
100 … | + for (var key in values) { | |
101 … | + var valuesForKey = lastState[key] = lastState[key] || {} | |
102 … | + for (var author in values[key]) { | |
103 … | + var value = values[key][author] | |
104 … | + if (!valuesForKey[author] || value[1] > valuesForKey[author][1]) { | |
105 … | + valuesForKey[author] = value | |
106 … | + changed = true | |
107 … | + } | |
108 … | + } | |
164 | 109 … | } |
110 … | + if (changed) { | |
111 … | + state.set(lastState) | |
112 … | + } | |
165 | 113 … | } |
166 | - if (msg.description && lastestTimestamps[msg.author].description < msg.timestamp) { | |
167 | - lastestTimestamps[msg.author].description = msg.timestamp | |
168 | - obs.assignedDescriptions.put(msg.author, msg.description) | |
169 | - } | |
170 | 114 … | } |
171 | 115 … | } |
172 | 116 … | |
173 | -function socialValue (lookup, id, yourId, fallback) { | |
174 | - return lookup[yourId] || lookup[id] || highestRank(lookup) || fallback || null | |
117 … | +function socialValue (lookup, key, id, yourId, fallback) { | |
118 … | + var result = lookup[key] ? getValue(lookup[key][yourId]) || getValue(lookup[key][id]) || highestRank(lookup[key]) : null | |
119 … | + if (result != null) { | |
120 … | + return result | |
121 … | + } else { | |
122 … | + return fallback || null | |
123 … | + } | |
175 | 124 … | } |
176 | 125 … | |
126 … | +function allValues (lookup, key, id, yourId) { | |
127 … | + var values = {} | |
128 … | + for (var author in lookup[key]) { | |
129 … | + var value = getValue(lookup[key][author]) | |
130 … | + if (value != null) { | |
131 … | + values[value] = values[value] || [] | |
132 … | + values[value].push(author) | |
133 … | + } | |
134 … | + } | |
135 … | + return values | |
136 … | +} | |
137 … | + | |
177 | 138 … | function highestRank (lookup) { |
178 | - var indexed = indexByValue(lookup) | |
139 … | + var counts = {} | |
179 | 140 … | var highestCount = 0 |
180 | 141 … | var currentHighest = null |
181 | - Object.keys(indexed).forEach((item) => { | |
182 | - var count = indexed[item].length | |
183 | - if (count > highestCount) { | |
184 | - highestCount = count | |
185 | - currentHighest = item | |
142 … | + for (var key in lookup) { | |
143 … | + var value = getValue(lookup[key]) | |
144 … | + if (value != null) { | |
145 … | + counts[value] = (counts[value] || 0) + 1 | |
146 … | + if (counts[value] > highestCount) { | |
147 … | + currentHighest = value | |
148 … | + highestCount = counts[value] | |
149 … | + } | |
186 | 150 … | } |
187 | - }) | |
151 … | + } | |
188 | 152 … | return currentHighest |
189 | 153 … | } |
190 | 154 … | |
191 | -function indexByValue (lookup) { | |
192 | - var result = {} | |
193 | - Object.keys(lookup).forEach((key) => { | |
194 | - var value = lookup[key] | |
195 | - if (!result[value]) { | |
196 | - result[value] = [] | |
155 … | +function getValue (item) { | |
156 … | + if (item && item[0]) { | |
157 … | + if (typeof item[0] === 'string') { | |
158 … | + return item[0] | |
159 … | + } else if (item[0] && item[0].link && ref.isLink(item[0].link)) { | |
160 … | + return item[0].link | |
197 | 161 … | } |
198 | - result[value].push(key) | |
199 | - }) | |
200 | - return result | |
162 … | + } | |
201 | 163 … | } |
202 | 164 … | |
203 | -function isAbout (msg) { | |
204 | - return msg.value && msg.value.content && msg.value.content.type === 'about' | |
165 … | +function StreamWhenConnected (connection, fn) { | |
166 … | + var stream = defer.source() | |
167 … | + onceTrue(connection, function (connection) { | |
168 … | + stream.resolve(fn(connection)) | |
169 … | + }) | |
170 … | + return stream | |
205 | 171 … | } |
channel/obs/recent.js | ||
---|---|---|
@@ -1,41 +1,99 @@ | ||
1 | 1 … | var nest = require('depnest') |
2 | -var { Value, Dict, Struct, computed } = require('mutant') | |
2 … | +var pull = require('pull-stream') | |
3 | 3 … | |
4 … | +var { Value, Dict, Struct, computed, resolve } = require('mutant') | |
5 … | + | |
6 … | +exports.needs = nest({ | |
7 … | + 'sbot.pull.backlinks': 'first' | |
8 … | +}) | |
9 … | + | |
4 | 10 … | exports.gives = nest({ |
5 | - 'sbot.hook.feed': true, | |
6 | 11 … | 'channel.obs.recent': true |
7 | 12 … | }) |
8 | 13 … | |
9 | 14 … | exports.create = function (api) { |
10 | - var channelsLookup = Dict() | |
15 … | + var recentChannels = null | |
16 … | + var channelsLookup = null | |
11 | 17 … | |
12 | - var recentChannels = computed(channelsLookup, (lookup) => { | |
13 | - var values = Object.keys(lookup).map(x => lookup[x]).sort((a, b) => b.updatedAt - a.updatedAt).map(x => x.id) | |
14 | - return values | |
15 | - }, {nextTick: true}) | |
18 … | + return nest({ | |
19 … | + 'channel.obs.recent': function () { | |
20 … | + load() | |
21 … | + return recentChannels | |
22 … | + } | |
23 … | + }) | |
16 | 24 … | |
17 | - return nest({ | |
18 | - 'sbot.hook.feed': (msg) => { | |
19 | - if (msg.key && msg.value && msg.value.content) { | |
20 | - var c = msg.value.content | |
21 | - if (c.type === 'post' && typeof c.channel === 'string') { | |
22 | - var name = c.channel.trim() | |
23 | - if (name && name.length < 30) { | |
25 … | + function load () { | |
26 … | + if (!recentChannels) { | |
27 … | + var sync = Value(false) | |
28 … | + channelsLookup = Dict() | |
29 … | + | |
30 … | + pull( | |
31 … | + api.sbot.pull.backlinks({ | |
32 … | + old: false, | |
33 … | + live: true, | |
34 … | + query: [ | |
35 … | + {$filter: { | |
36 … | + dest: {$prefix: '#'} | |
37 … | + }} | |
38 … | + ] | |
39 … | + }), | |
40 … | + pull.drain(msg => { | |
41 … | + var obs = channelsLookup.get(msg.dest) | |
42 … | + if (!obs) { | |
43 … | + obs = ChannelRef(msg.dest) | |
44 … | + channelsLookup.put(msg.dest, obs) | |
45 … | + } | |
46 … | + obs.set({ | |
47 … | + id: msg.dest, | |
48 … | + updatedAt: Math.max(resolve(obs.updatedAt), msg.timestamp), | |
49 … | + count: resolve(obs.count) + 1 | |
50 … | + }) | |
51 … | + }) | |
52 … | + ) | |
53 … | + | |
54 … | + pull( | |
55 … | + api.sbot.pull.backlinks({ | |
56 … | + query: [ | |
57 … | + {$filter: { | |
58 … | + dest: {$prefix: '#'} | |
59 … | + }}, | |
60 … | + {$reduce: { | |
61 … | + id: 'dest', | |
62 … | + updatedAt: {$max: 'timestamp'}, | |
63 … | + count: {$count: true} | |
64 … | + }} | |
65 … | + ] | |
66 … | + }), | |
67 … | + pull.drain((item) => { | |
68 … | + if (item.sync) { | |
69 … | + sync.set(true) | |
70 … | + } else if (item.id && item.id.startsWith('#')) { | |
71 … | + var name = item.id | |
24 | 72 … | var channel = channelsLookup.get(name) |
25 | 73 … | if (!channel) { |
26 | - channel = Struct({ | |
27 | - id: name, | |
28 | - updatedAt: Value() | |
29 | - }) | |
74 … | + channel = ChannelRef(name) | |
30 | 75 … | channelsLookup.put(name, channel) |
31 | 76 … | } |
32 | - if (channel.updatedAt() < msg.timestamp) { | |
33 | - channel.updatedAt.set(msg.timestamp) | |
34 | - } | |
77 … | + channel.set(item) | |
35 | 78 … | } |
36 | - } | |
37 | - } | |
38 | - }, | |
39 | - 'channel.obs.recent': () => recentChannels | |
40 | - }) | |
79 … | + }, (err) => { | |
80 … | + if (err) throw err | |
81 … | + sync.set(true) | |
82 … | + }) | |
83 … | + ) | |
84 … | + recentChannels = computed(channelsLookup, (lookup) => { | |
85 … | + var values = Object.keys(lookup).map(x => lookup[x]).sort((a, b) => b.updatedAt - a.updatedAt).map(x => x.id.slice(1)) | |
86 … | + return values | |
87 … | + }, {nextTick: true}) | |
88 … | + recentChannels.sync = sync | |
89 … | + } | |
90 … | + } | |
41 | 91 … | } |
92 … | + | |
93 … | +function ChannelRef (id) { | |
94 … | + return Struct({ | |
95 … | + id, | |
96 … | + updatedAt: Value(0), | |
97 … | + count: Value(0) | |
98 … | + }, {merge: true}) | |
99 … | +} |
channel/obs/subscribed.js | ||
---|---|---|
@@ -11,18 +11,18 @@ | ||
11 | 11 … | }) |
12 | 12 … | |
13 | 13 … | exports.gives = nest({ |
14 | 14 … | 'channel.obs.subscribed': true, |
15 | - 'sbot.hook.feed': true | |
15 … | + 'sbot.hook.publish': true | |
16 | 16 … | }) |
17 | 17 … | |
18 | 18 … | exports.create = function (api) { |
19 | 19 … | var cache = {} |
20 | 20 … | var reducers = {} |
21 | 21 … | |
22 | 22 … | return nest({ |
23 | 23 … | 'channel.obs.subscribed': subscribed, |
24 | - 'sbot.hook.feed': function (msg) { | |
24 … | + 'sbot.hook.publish': function (msg) { | |
25 | 25 … | if (isChannelSubscription(msg)) { |
26 | 26 … | if (msg.value.content.channel && reducers[msg.value.author]) { |
27 | 27 … | reducers[msg.value.author].push(msg) |
28 | 28 … | } |
contact/async.js | ||
---|---|---|
@@ -1,10 +1,11 @@ | ||
1 | 1 … | var nest = require('depnest') |
2 | -var pull = require('pull-stream') | |
2 … | +var onceTrue = require('mutant/once-true') | |
3 … | +var resolve = require('mutant/resolve') | |
3 | 4 … | var ref = require('ssb-ref') |
4 | 5 … | |
5 | 6 … | exports.needs = nest({ |
6 | - 'sbot.pull.query': 'first', | |
7 … | + 'contact.obs.following': 'first', | |
7 | 8 … | 'sbot.async.publish': 'first' |
8 | 9 … | }) |
9 | 10 … | |
10 | 11 … | exports.gives = nest({ |
@@ -16,18 +17,13 @@ | ||
16 | 17 … | 'contact.async': {follow, unfollow, followerOf} |
17 | 18 … | }) |
18 | 19 … | |
19 | 20 … | function followerOf (source, dest, cb) { |
20 | - pull( | |
21 | - api.sbot.pull.query({query: [ | |
22 | - makeQuery(source, dest), | |
23 | - {$map: ['value', 'content', 'following']} | |
24 | - ]}), | |
25 | - pull.collect(function (err, ary) { | |
26 | - if (err) return cb(err) | |
27 | - else cb(null, ary.pop()) // will be true, or undefined/false | |
28 | - }) | |
29 | - ) | |
21 … | + var following = api.contact.obs.following(source) | |
22 … | + onceTrue(following.sync, () => { | |
23 … | + var value = resolve(following) | |
24 … | + cb(null, value && value.has(dest)) | |
25 … | + }) | |
30 | 26 … | } |
31 | 27 … | |
32 | 28 … | function follow (id, cb) { |
33 | 29 … | if (!ref.isFeed(id)) throw new Error('a feed id must be specified') |
@@ -46,17 +42,4 @@ | ||
46 | 42 … | following: false |
47 | 43 … | }, cb) |
48 | 44 … | } |
49 | 45 … | } |
50 | - | |
51 | -function makeQuery (a, b) { | |
52 | - return {'$filter': { | |
53 | - value: { | |
54 | - author: a, | |
55 | - content: { | |
56 | - type: 'contact', | |
57 | - contact: b, | |
58 | - following: true | |
59 | - } | |
60 | - } | |
61 | - }} | |
62 | -} |
contact/obs.js | ||
---|---|---|
@@ -1,106 +1,130 @@ | ||
1 | 1 … | var nest = require('depnest') |
2 | -var MutantPullReduce = require('mutant-pull-reduce') | |
2 … | +var {Value, onceTrue, computed} = require('mutant') | |
3 … | +var defer = require('pull-defer') | |
4 … | +var pull = require('pull-stream') | |
3 | 5 … | var ref = require('ssb-ref') |
4 | 6 … | |
5 | 7 … | exports.needs = nest({ |
6 | - 'sbot.pull.query': 'first', | |
7 | - 'keys.sync.id': 'first' | |
8 … | + 'sbot.obs.connection': 'first' | |
8 | 9 … | }) |
9 | 10 … | |
10 | 11 … | exports.gives = nest({ |
11 | 12 … | 'contact.obs': ['following', 'followers'], |
12 | - 'sbot.hook.feed': true | |
13 … | + 'sbot.hook.publish': true | |
13 | 14 … | }) |
14 | 15 … | |
15 | 16 … | exports.create = function (api) { |
16 | - var followingCache = {} | |
17 | - var followerCache = {} | |
17 … | + var cacheLoading = false | |
18 … | + var cache = {} | |
19 … | + var sync = Value(false) | |
18 | 20 … | |
19 | 21 … | return nest({ |
20 | - 'contact.obs': { following, followers }, | |
21 | - 'sbot.hook.feed': function (msg) { | |
22 | - if (isContact(msg) && msg.timestamp) { | |
23 | - var author = msg.value.author | |
24 | - var contact = msg.value.content.contact | |
25 | - var following = msg.value.content.following | |
26 | - var from = followingCache[author] | |
27 | - var to = followerCache[contact] | |
28 | - if (from) from.push({id: contact, value: following, timestamp: msg.timestamp}) | |
29 | - if (to) to.push({id: author, value: following, timestamp: msg.timestamp}) | |
22 … | + 'contact.obs': { | |
23 … | + following: (id) => get(id).following, | |
24 … | + followers: (id) => get(id).followers | |
25 … | + }, | |
26 … | + 'sbot.hook.publish': function (msg) { | |
27 … | + if (isContact(msg)) { | |
28 … | + var source = msg.value.author | |
29 … | + var dest = msg.value.content.contact | |
30 … | + if (typeof msg.value.content.following === 'boolean') { | |
31 … | + get(source).push({ | |
32 … | + following: { | |
33 … | + [dest]: [msg.value.content] | |
34 … | + } | |
35 … | + }) | |
36 … | + get(dest).push({ | |
37 … | + followers: { | |
38 … | + [source]: [msg.value.content] | |
39 … | + } | |
40 … | + }) | |
41 … | + } | |
30 | 42 … | } |
31 | 43 … | } |
32 | 44 … | }) |
33 | 45 … | |
34 | - function following (id) { | |
35 | - if (!ref.isFeed(id)) throw new Error('a feed id must be specified') | |
36 | - if (!followingCache[id]) { | |
37 | - followingCache[id] = reduce(api.sbot.pull.query({ | |
38 | - query: [ | |
39 | - makeQuery(id, { $prefix: '@' }), | |
40 | - {'$map': { | |
41 | - id: ['value', 'content', 'contact'], | |
42 | - value: ['value', 'content', 'following'], | |
43 | - timestamp: 'timestamp' | |
44 | - }} | |
45 | - ], | |
46 | - live: true | |
47 | - })) | |
48 | - } | |
49 | - return followingCache[id] | |
46 … | + function loadCache () { | |
47 … | + pull( | |
48 … | + StreamWhenConnected(api.sbot.obs.connection, sbot => sbot.contacts.stream({live: true})), | |
49 … | + pull.drain(item => { | |
50 … | + for (var target in item) { | |
51 … | + if (ref.isFeed(target)) { | |
52 … | + get(target).push(item[target]) | |
53 … | + } | |
54 … | + } | |
55 … | + | |
56 … | + if (!sync()) { | |
57 … | + sync.set(true) | |
58 … | + } | |
59 … | + }) | |
60 … | + ) | |
50 | 61 … | } |
51 | 62 … | |
52 | - function followers (id) { | |
53 | - if (!ref.isFeed(id)) throw new Error('a feed id must be specified') | |
54 | - if (!followerCache[id]) { | |
55 | - followerCache[id] = reduce(api.sbot.pull.query({ | |
56 | - query: [ | |
57 | - makeQuery({ $prefix: '@' }, id), | |
58 | - {'$map': { | |
59 | - id: ['value', 'author'], | |
60 | - value: ['value', 'content', 'following'], | |
61 | - timestamp: 'timestamp' | |
62 | - }} | |
63 | - ], | |
64 | - live: true | |
65 | - })) | |
63 … | + function get (id) { | |
64 … | + if (!ref.isFeed(id)) throw new Error('Contact state requires an id!') | |
65 … | + if (!cacheLoading) { | |
66 … | + cacheLoading = true | |
67 … | + loadCache() | |
66 | 68 … | } |
67 | - return followerCache[id] | |
69 … | + if (!cache[id]) { | |
70 … | + cache[id] = Contact(api, id, sync) | |
71 … | + } | |
72 … | + return cache[id] | |
68 | 73 … | } |
69 | 74 … | } |
70 | 75 … | |
71 | -function reduce (stream) { | |
72 | - var newestValues = {} | |
73 | - return MutantPullReduce(stream, (result, item) => { | |
74 | - if (!ref.isFeed(item.id)) return result // invalid message, skip this item | |
75 | - newestValues[item.id] = newestValues[item.id] || 0 | |
76 | - if (newestValues[item.id] < item.timestamp) { | |
77 | - newestValues[item.id] = item.timestamp | |
78 | - if (item.value != null) { | |
79 | - if (item.value) { | |
80 | - result.add(item.id) | |
81 | - } else { | |
82 | - result.delete(item.id) | |
76 … | +function Contact (api, id, sync) { | |
77 … | + var state = Value({}) | |
78 … | + return { | |
79 … | + following: computedIds(state, 'following', true, sync), | |
80 … | + followers: computedIds(state, 'followers', true, sync), | |
81 … | + push: function (values) { | |
82 … | + var lastState = state() | |
83 … | + var changed = false | |
84 … | + for (var key in values) { | |
85 … | + var valuesForKey = lastState[key] = lastState[key] || {} | |
86 … | + for (var dest in values[key]) { | |
87 … | + var value = values[key][dest] | |
88 … | + if (!valuesForKey[dest] || value[1] > valuesForKey[dest][1] || !values[1] || !valuesForKey[dest[1]]) { | |
89 … | + valuesForKey[dest] = value | |
90 … | + changed = true | |
91 … | + } | |
83 | 92 … | } |
84 | 93 … | } |
94 … | + if (changed) { | |
95 … | + state.set(lastState) | |
96 … | + } | |
85 | 97 … | } |
86 | - return result | |
87 | - }, { | |
88 | - startValue: new Set() | |
89 | - }) | |
98 … | + } | |
90 | 99 … | } |
91 | 100 … | |
92 | -function makeQuery (a, b) { | |
93 | - return {'$filter': { | |
94 | - value: { | |
95 | - author: a, | |
96 | - content: { | |
97 | - type: 'contact', | |
98 | - contact: b | |
101 … | +function computedIds (state, key, compare, sync) { | |
102 … | + var obs = computed([state, 'following', true], getIds) | |
103 … | + obs.sync = sync | |
104 … | + return obs | |
105 … | +} | |
106 … | + | |
107 … | +function getIds (state, key, compare) { | |
108 … | + var result = new Set() | |
109 … | + if (state[key]) { | |
110 … | + for (var dest in state[key]) { | |
111 … | + if (state[key][dest][0] === compare) { | |
112 … | + result.add(dest) | |
99 | 113 … | } |
100 | 114 … | } |
101 | - }} | |
115 … | + } | |
116 … | + | |
117 … | + return result | |
102 | 118 … | } |
103 | 119 … | |
104 | 120 … | function isContact (msg) { |
105 | 121 … | return msg.value && msg.value.content && msg.value.content.type === 'contact' |
106 | 122 … | } |
123 … | + | |
124 … | +function StreamWhenConnected (connection, fn) { | |
125 … | + var stream = defer.source() | |
126 … | + onceTrue(connection, function (connection) { | |
127 … | + stream.resolve(fn(connection)) | |
128 … | + }) | |
129 … | + return stream | |
130 … | +} |
feed/pull/channel.js | ||
---|---|---|
@@ -2,27 +2,26 @@ | ||
2 | 2 … | const extend = require('xtend') |
3 | 3 … | |
4 | 4 … | exports.gives = nest('feed.pull.channel') |
5 | 5 … | exports.needs = nest({ |
6 | - 'sbot.pull.query': 'first' | |
6 … | + 'sbot.pull.backlinks': 'first' | |
7 | 7 … | }) |
8 | 8 … | |
9 | 9 … | exports.create = function (api) { |
10 | 10 … | return nest('feed.pull.channel', function (channel) { |
11 | 11 … | if (typeof channel !== 'string') throw new Error('a channel name be specified') |
12 | 12 … | |
13 | 13 … | return function (opts) { |
14 | - var filter = {value: {content: { channel }}} | |
15 | - var query = {query: [ | |
16 | - {$filter: filter} | |
17 | - ]} | |
14 … | + var filter = {dest: `#${channel}`} | |
18 | 15 … | |
19 | 16 … | // HACK: handle lt |
20 | 17 … | if (opts.lt != null) { |
21 | - filter.timestamp = {$lt: opts.lt, $gte: 0, $le: 'hack around dominictarr/map-filter-reduce#1'} | |
18 … | + filter.timestamp = {$lt: opts.lt, $gte: 0} | |
22 | 19 … | delete opts.lt |
23 | 20 … | } |
24 | 21 … | |
25 | - return api.sbot.pull.query(extend(opts, query)) | |
22 … | + return api.sbot.pull.backlinks(extend(opts, {query: [ | |
23 … | + {$filter: filter} | |
24 … | + ]})) | |
26 | 25 … | } |
27 | 26 … | }) |
28 | 27 … | } |
index.js | ||
---|---|---|
@@ -1,8 +1,8 @@ | ||
1 | 1 … | const bulk = require('bulk-require') |
2 | 2 … | |
3 | 3 … | module.exports = { |
4 | 4 … | patchcore: bulk(__dirname, [ |
5 | - './!(index).js', | |
6 | - './!(node_modules|example)/**/*.js' | |
5 … | + './+(config|emoji|invite|keys|sbot).js', | |
6 … | + './+(about|blob|channel|contact|feed|lib|message)/**/*.js' | |
7 | 7 … | ]) |
8 | 8 … | } |
message/html/backlinks.js | ||
---|---|---|
@@ -2,8 +2,9 @@ | ||
2 | 2 … | const map = require('mutant/map') |
3 | 3 … | const computed = require('mutant/computed') |
4 | 4 … | const when = require('mutant/when') |
5 | 5 … | const nest = require('depnest') |
6 … | +const ref = require('ssb-ref') | |
6 | 7 … | |
7 | 8 … | exports.needs = nest({ |
8 | 9 … | 'message.obs.backlinks': 'first', |
9 | 10 … | 'message.obs.name': 'first', |
@@ -14,23 +15,37 @@ | ||
14 | 15 … | exports.gives = nest('message.html.backlinks') |
15 | 16 … | |
16 | 17 … | exports.create = function (api) { |
17 | 18 … | return nest('message.html.backlinks', function (msg) { |
19 … | + if (!ref.isMsg(msg.key)) return [] | |
18 | 20 … | var backlinks = api.message.obs.backlinks(msg.key) |
19 | - return when(computed(backlinks, hasItems), | |
21 … | + var references = computed([backlinks, msg], onlyReferences) | |
22 … | + return when(computed(references, hasItems), | |
20 | 23 … | h('MessageBacklinks', [ |
21 | 24 … | h('header', 'backlinks:'), |
22 | 25 … | h('ul', [ |
23 | - map(backlinks, (link) => { | |
26 … | + map(backlinks, (backlink) => { | |
24 | 27 … | return h('li', [ |
25 | - h('a -backlink', { href: link, title: link }, api.message.obs.name(link)) | |
28 … | + h('a -backlink', { href: backlink.id, title: backlink.id }, api.message.obs.name(backlink.id)) | |
26 | 29 … | ]) |
27 | 30 … | }) |
28 | 31 … | ]) |
29 | 32 … | ]) |
30 | 33 … | ) |
31 | 34 … | }) |
32 | 35 … | } |
33 | 36 … | |
34 | -function hasItems (items) { | |
37 … | +function onlyReferences (backlinks, msg) { | |
38 … | + return backlinks.filter(link => link.root !== msg.key && !includeOrEqual(link.branch, msg.key)) | |
39 … | +} | |
40 … | + | |
41 … | +function hasItems (items, msg) { | |
35 | 42 … | return (items && items.length) |
36 | 43 … | } |
44 … | + | |
45 … | +function includeOrEqual (valueOrArray, item) { | |
46 … | + if (Array.isArray(valueOrArray)) { | |
47 … | + return valueOrArray.includes(item) | |
48 … | + } else { | |
49 … | + return valueOrArray === item | |
50 … | + } | |
51 … | +} |
message/obs/backlinks.js | ||
---|---|---|
@@ -1,58 +1,40 @@ | ||
1 | 1 … | var nest = require('depnest') |
2 | -var MutantSet = require('mutant/set') | |
2 … | +var MutantPullReduce = require('mutant-pull-reduce') | |
3 | 3 … | |
4 | 4 … | exports.needs = nest({ |
5 | - 'message.sync.unbox': 'first' | |
5 … | + 'sbot.pull.backlinks': 'first' | |
6 | 6 … | }) |
7 | 7 … | |
8 | -exports.gives = nest({ | |
9 | - 'sbot.hook.feed': true, | |
10 | - 'message.obs.backlinks': true, | |
11 | - 'message.obs.forks': true | |
12 | -}) | |
8 … | +exports.gives = nest('message.obs.backlinks', true) | |
13 | 9 … | |
14 | 10 … | exports.create = function (api) { |
15 | - var mentionedLookup = {} | |
16 | - var rootLookup = {} | |
17 | - | |
18 | 11 … | return nest({ |
19 | - 'sbot.hook.feed': (msg) => { | |
20 | - if (msg.value && typeof msg.value.content === 'string') { | |
21 | - msg = api.message.sync.unbox(msg) | |
22 | - } | |
23 | - | |
24 | - if (msg && msg.value && msg.value.content) { | |
25 | - if (Array.isArray(msg.value.content.mentions)) { | |
26 | - msg.value.content.mentions.forEach(mention => { | |
27 | - var link = typeof mention === 'object' ? mention.link : mention | |
28 | - backlinks(link).add(msg.key) | |
29 | - }) | |
30 | - } | |
31 | - | |
32 | - var root = msg.value.content.root | |
33 | - var branch = msg.value.content.branch || root | |
34 | - | |
35 | - // fork or root reply | |
36 | - if (root && root === branch) { | |
37 | - forks(root).add(msg.key) | |
38 | - } | |
39 | - } | |
40 | - }, | |
41 | - 'message.obs.backlinks': (id) => backlinks(id), | |
42 | - 'message.obs.forks': (id) => forks(id) | |
12 … | + 'message.obs.backlinks': (id) => backlinks(id) | |
43 | 13 … | }) |
44 | 14 … | |
45 | 15 … | function backlinks (id) { |
46 | - if (!mentionedLookup[id]) { | |
47 | - mentionedLookup[id] = MutantSet() | |
48 | - } | |
49 | - return mentionedLookup[id] | |
16 … | + return MutantPullReduce(api.sbot.pull.backlinks({ | |
17 … | + query: [ | |
18 … | + {$filter: { | |
19 … | + dest: id | |
20 … | + }}, | |
21 … | + {$map: { | |
22 … | + dest: 'dest', | |
23 … | + id: 'key', | |
24 … | + timestamp: 'timestamp', | |
25 … | + type: ['value', 'content', 'type'], | |
26 … | + root: ['value', 'content', 'root'], | |
27 … | + branch: ['value', 'content', 'branch'], | |
28 … | + author: ['value', 'author'] | |
29 … | + }} | |
30 … | + ] | |
31 … | + }), (result, msg) => { | |
32 … | + if (msg.type !== 'vote' && msg.type !== 'about') { | |
33 … | + result.push(msg) | |
34 … | + } | |
35 … | + return result | |
36 … | + }, { | |
37 … | + startValue: [] | |
38 … | + }) | |
50 | 39 … | } |
51 | - | |
52 | - function forks (id) { | |
53 | - if (!rootLookup[id]) { | |
54 | - rootLookup[id] = MutantSet() | |
55 | - } | |
56 | - return rootLookup[id] | |
57 | - } | |
58 | 40 … | } |
message/obs/likes.js | ||
---|---|---|
@@ -1,21 +1,25 @@ | ||
1 | 1 … | var nest = require('depnest') |
2 | 2 … | var ref = require('ssb-ref') |
3 | -var { Value, computed } = require('mutant') | |
3 … | +var MutantPullReduce = require('mutant-pull-reduce') | |
4 … | +var SortedArray = require('sorted-array-functions') | |
4 | 5 … | |
6 … | +var { computed } = require('mutant') | |
7 … | + | |
5 | 8 … | exports.needs = nest({ |
6 | - 'message.sync.unbox': 'first' | |
9 … | + 'message.sync.unbox': 'first', | |
10 … | + 'sbot.pull.backlinks': 'first' | |
7 | 11 … | }) |
8 | 12 … | |
9 | 13 … | exports.gives = nest({ |
10 | - 'sbot.hook.feed': true, | |
14 … | + 'sbot.hook.publish': true, | |
11 | 15 … | 'message.obs.likes': true |
12 | 16 … | }) |
13 | 17 … | |
14 | 18 … | exports.create = function (api) { |
15 | - var likesLookup = {} | |
19 … | + var activeLikes = new Set() | |
16 | 20 … | return nest({ |
17 | - 'sbot.hook.feed': (msg) => { | |
21 … | + 'sbot.hook.publish': (msg) => { | |
18 | 22 … | if (!(msg && msg.value && msg.value.content)) return |
19 | 23 … | if (typeof msg.value.content === 'string') { |
20 | 24 … | msg = api.message.sync.unbox(msg) |
21 | 25 … | if (!msg) return |
@@ -24,33 +28,76 @@ | ||
24 | 28 … | var c = msg.value.content |
25 | 29 … | if (c.type !== 'vote') return |
26 | 30 … | if (!c.vote || !c.vote.link) return |
27 | 31 … | |
28 | - var likes = get(c.vote.link)() | |
29 | - var author = msg.value.author | |
30 | - if (!likes[author] || likes[author][1] < msg.timestamp) { | |
31 | - likes[author] = [c.vote.value > 0, msg.timestamp] | |
32 | - get(c.vote.link).set(likes) | |
33 | - } | |
32 … | + activeLikes.forEach((likes) => { | |
33 … | + if (likes.id === c.vote.link) { | |
34 … | + likes.push({ | |
35 … | + dest: c.vote.link, | |
36 … | + id: msg.key, | |
37 … | + expression: c.vote.expression, | |
38 … | + value: c.vote.value, | |
39 … | + timestamp: msg.value.timestamp, | |
40 … | + author: msg.value.author | |
41 … | + }) | |
42 … | + } | |
43 … | + }) | |
34 | 44 … | }, |
35 | 45 … | 'message.obs.likes': (id) => { |
36 | 46 … | if (!ref.isLink(id)) throw new Error('an id must be specified') |
37 | - return computed(get(id), getLikes) | |
47 … | + var obs = get(id) | |
48 … | + obs.id = id | |
49 … | + return computed(obs, getLikes, { | |
50 … | + // allow manual append for simulated realtime | |
51 … | + onListen: () => activeLikes.add(obs), | |
52 … | + onUnlisten: () => activeLikes.delete(obs) | |
53 … | + }) | |
38 | 54 … | } |
39 | 55 … | }) |
40 | 56 … | |
41 | 57 … | function get (id) { |
42 | - if (!likesLookup[id]) { | |
43 | - likesLookup[id] = Value({}) | |
44 | - } | |
45 | - return likesLookup[id] | |
58 … | + var likes = MutantPullReduce(api.sbot.pull.backlinks({ | |
59 … | + live: true, | |
60 … | + query: [ | |
61 … | + {$filter: { | |
62 … | + dest: id, | |
63 … | + value: { | |
64 … | + content: { | |
65 … | + type: 'vote', | |
66 … | + vote: { link: id } | |
67 … | + } | |
68 … | + } | |
69 … | + }}, | |
70 … | + {$map: { | |
71 … | + dest: 'dest', | |
72 … | + id: 'key', | |
73 … | + expression: ['value', 'content', 'vote', 'expression'], | |
74 … | + value: ['value', 'content', 'vote', 'value'], | |
75 … | + timestamp: 'timestamp', | |
76 … | + author: ['value', 'author'] | |
77 … | + }} | |
78 … | + ] | |
79 … | + }), (result, msg) => { | |
80 … | + if (!result[msg.author]) { | |
81 … | + result[msg.author] = [] | |
82 … | + } | |
83 … | + SortedArray.add(result[msg.author], msg, mostRecent) | |
84 … | + return result | |
85 … | + }, { | |
86 … | + startValue: [] | |
87 … | + }) | |
88 … | + return likes | |
46 | 89 … | } |
47 | 90 … | } |
48 | 91 … | |
49 | 92 … | function getLikes (likes) { |
50 | 93 … | return Object.keys(likes).reduce((result, id) => { |
51 | - if (likes[id][0]) { | |
94 … | + if (likes[id][0].value) { | |
52 | 95 … | result.push(id) |
53 | 96 … | } |
54 | 97 … | return result |
55 | 98 … | }, []) |
56 | 99 … | } |
100 … | + | |
101 … | +function mostRecent (a, b) { | |
102 … | + return b.timestamp - a.timestamp | |
103 … | +} |
package.json | ||
---|---|---|
@@ -45,14 +45,15 @@ | ||
45 | 45 … | "pull-cat": "^1.1.11", |
46 | 46 … | "pull-reconnect": "0.0.3", |
47 | 47 … | "pull-stream": "^3.5.0", |
48 | 48 … | "simple-mime": "^0.1.0", |
49 … | + "sorted-array-functions": "^1.0.0", | |
49 | 50 … | "split-buffer": "^1.0.0", |
50 | 51 … | "ssb-avatar": "^0.2.0", |
51 | 52 … | "ssb-client": "^4.4.0", |
52 | 53 … | "ssb-config": "^2.2.0", |
53 | 54 … | "ssb-feed": "^2.3.0", |
54 | - "ssb-keys": "^7.0.4", | |
55 … | + "ssb-keys": "^7.0.9", | |
55 | 56 … | "ssb-markdown": "^3.3.0", |
56 | 57 … | "ssb-marked": "^0.7.2", |
57 | 58 … | "ssb-ref": "^2.6.2", |
58 | 59 … | "ssb-sort": "^1.0.0", |
sbot.js | ||
---|---|---|
@@ -10,9 +10,9 @@ | ||
10 | 10 … | exports.needs = nest({ |
11 | 11 … | 'config.sync.load': 'first', |
12 | 12 … | 'keys.sync.load': 'first', |
13 | 13 … | 'sbot.obs.connectionStatus': 'first', |
14 | - 'sbot.hook.feed': 'map' | |
14 … | + 'sbot.hook.publish': 'map' | |
15 | 15 … | }) |
16 | 16 … | |
17 | 17 … | exports.gives = { |
18 | 18 … | sbot: { |
@@ -32,9 +32,10 @@ | ||
32 | 32 … | query: true, |
33 | 33 … | feed: true, |
34 | 34 … | links: true, |
35 | 35 … | search: true, |
36 | - replicateProgress: true | |
36 … | + replicateProgress: true, | |
37 … | + backlinks: true | |
37 | 38 … | }, |
38 | 39 … | obs: { |
39 | 40 … | connectionStatus: true, |
40 | 41 … | connection: true, |
@@ -174,8 +175,11 @@ | ||
174 | 175 … | pull: { |
175 | 176 … | query: rec.source(query => { |
176 | 177 … | return sbot.query.read(query) |
177 | 178 … | }), |
179 … | + backlinks: rec.source(query => { | |
180 … | + return sbot.backlinks.read(query) | |
181 … | + }), | |
178 | 182 … | userFeed: rec.source(opts => { |
179 | 183 … | return sbot.createUserStream(opts) |
180 | 184 … | }), |
181 | 185 … | messagesByType: rec.source(opts => { |
@@ -215,12 +219,12 @@ | ||
215 | 219 … | // scoped |
216 | 220 … | |
217 | 221 … | function runHooks (msg) { |
218 | 222 … | if (msg.publishing) { |
219 | - api.sbot.hook.feed(msg) | |
223 … | + api.sbot.hook.publish(msg) | |
220 | 224 … | } else if (!cache[msg.key]) { |
221 | - cache[msg.key] = msg.value | |
222 | - api.sbot.hook.feed(msg) | |
225 … | + // cache[msg.key] = msg.value | |
226 … | + // api.sbot.hook.feed(msg) | |
223 | 227 … | } |
224 | 228 … | } |
225 | 229 … | |
226 | 230 … | function refreshPeers () { |
Built with git-ssb-web