Commit 6df42424aee05290381c9ec078709f8303a89ae7
use ssb-contacts for `contact.obs`
Matt McKegg committed on 6/12/2017, 1:15:07 PMParent: 51b10fcae8f88e89444a0c702a5a50bcaf49c157
Files changed
contact/obs.js | changed |
contact/obs.js | ||
---|---|---|
@@ -1,106 +1,130 @@ | ||
1 | 1 … | var nest = require('depnest') |
2 | -var MutantPullReduce = require('mutant-pull-reduce') | |
2 … | +var {Value, onceTrue, computed} = require('mutant') | |
3 … | +var defer = require('pull-defer') | |
4 … | +var pull = require('pull-stream') | |
3 | 5 … | var ref = require('ssb-ref') |
4 | 6 … | |
5 | 7 … | exports.needs = nest({ |
6 | - 'sbot.pull.query': 'first', | |
7 | - 'keys.sync.id': 'first' | |
8 … | + 'sbot.obs.connection': 'first' | |
8 | 9 … | }) |
9 | 10 … | |
10 | 11 … | exports.gives = nest({ |
11 | 12 … | 'contact.obs': ['following', 'followers'], |
12 | 13 … | 'sbot.hook.publish': true |
13 | 14 … | }) |
14 | 15 … | |
15 | 16 … | exports.create = function (api) { |
16 | - var followingCache = {} | |
17 | - var followerCache = {} | |
17 … | + var cacheLoading = false | |
18 … | + var cache = {} | |
19 … | + var sync = Value(false) | |
18 | 20 … | |
19 | 21 … | return nest({ |
20 | - 'contact.obs': { following, followers }, | |
22 … | + 'contact.obs': { | |
23 … | + following: (id) => get(id).following, | |
24 … | + followers: (id) => get(id).followers | |
25 … | + }, | |
21 | 26 … | 'sbot.hook.publish': function (msg) { |
22 | - if (isContact(msg) && msg.timestamp) { | |
23 | - var author = msg.value.author | |
24 | - var contact = msg.value.content.contact | |
25 | - var following = msg.value.content.following | |
26 | - var from = followingCache[author] | |
27 | - var to = followerCache[contact] | |
28 | - if (from) from.push({id: contact, value: following, timestamp: msg.timestamp}) | |
29 | - if (to) to.push({id: author, value: following, timestamp: msg.timestamp}) | |
27 … | + if (isContact(msg)) { | |
28 … | + var source = msg.value.author | |
29 … | + var dest = msg.value.content.contact | |
30 … | + if (typeof msg.value.content.following === 'boolean') { | |
31 … | + get(source).push({ | |
32 … | + following: { | |
33 … | + [dest]: [msg.value.content] | |
34 … | + } | |
35 … | + }) | |
36 … | + get(dest).push({ | |
37 … | + followers: { | |
38 … | + [source]: [msg.value.content] | |
39 … | + } | |
40 … | + }) | |
41 … | + } | |
30 | 42 … | } |
31 | 43 … | } |
32 | 44 … | }) |
33 | 45 … | |
34 | - function following (id) { | |
35 | - if (!ref.isFeed(id)) throw new Error('a feed id must be specified') | |
36 | - if (!followingCache[id]) { | |
37 | - followingCache[id] = reduce(api.sbot.pull.query({ | |
38 | - query: [ | |
39 | - makeQuery(id, { $prefix: '@' }), | |
40 | - {'$map': { | |
41 | - id: ['value', 'content', 'contact'], | |
42 | - value: ['value', 'content', 'following'], | |
43 | - timestamp: 'timestamp' | |
44 | - }} | |
45 | - ], | |
46 | - live: true | |
47 | - })) | |
48 | - } | |
49 | - return followingCache[id] | |
46 … | + function loadCache () { | |
47 … | + pull( | |
48 … | + StreamWhenConnected(api.sbot.obs.connection, sbot => sbot.contacts.stream({live: true})), | |
49 … | + pull.drain(item => { | |
50 … | + for (var target in item) { | |
51 … | + if (ref.isFeed(target)) { | |
52 … | + get(target).push(item[target]) | |
53 … | + } | |
54 … | + } | |
55 … | + | |
56 … | + if (!sync()) { | |
57 … | + sync.set(true) | |
58 … | + } | |
59 … | + }) | |
60 … | + ) | |
50 | 61 … | } |
51 | 62 … | |
52 | - function followers (id) { | |
53 | - if (!ref.isFeed(id)) throw new Error('a feed id must be specified') | |
54 | - if (!followerCache[id]) { | |
55 | - followerCache[id] = reduce(api.sbot.pull.query({ | |
56 | - query: [ | |
57 | - makeQuery({ $prefix: '@' }, id), | |
58 | - {'$map': { | |
59 | - id: ['value', 'author'], | |
60 | - value: ['value', 'content', 'following'], | |
61 | - timestamp: 'timestamp' | |
62 | - }} | |
63 | - ], | |
64 | - live: true | |
65 | - })) | |
63 … | + function get (id) { | |
64 … | + if (!ref.isFeed(id)) throw new Error('Contact state requires an id!') | |
65 … | + if (!cacheLoading) { | |
66 … | + cacheLoading = true | |
67 … | + loadCache() | |
66 | 68 … | } |
67 | - return followerCache[id] | |
69 … | + if (!cache[id]) { | |
70 … | + cache[id] = Contact(api, id, sync) | |
71 … | + } | |
72 … | + return cache[id] | |
68 | 73 … | } |
69 | 74 … | } |
70 | 75 … | |
71 | -function reduce (stream) { | |
72 | - var newestValues = {} | |
73 | - return MutantPullReduce(stream, (result, item) => { | |
74 | - if (!ref.isFeed(item.id)) return result // invalid message, skip this item | |
75 | - newestValues[item.id] = newestValues[item.id] || 0 | |
76 | - if (newestValues[item.id] < item.timestamp) { | |
77 | - newestValues[item.id] = item.timestamp | |
78 | - if (item.value != null) { | |
79 | - if (item.value) { | |
80 | - result.add(item.id) | |
81 | - } else { | |
82 | - result.delete(item.id) | |
76 … | +function Contact (api, id, sync) { | |
77 … | + var state = Value({}) | |
78 … | + return { | |
79 … | + following: computedIds(state, 'following', true, sync), | |
80 … | + followers: computedIds(state, 'followers', true, sync), | |
81 … | + push: function (values) { | |
82 … | + var lastState = state() | |
83 … | + var changed = false | |
84 … | + for (var key in values) { | |
85 … | + var valuesForKey = lastState[key] = lastState[key] || {} | |
86 … | + for (var dest in values[key]) { | |
87 … | + var value = values[key][dest] | |
88 … | + if (!valuesForKey[dest] || value[1] > valuesForKey[dest][1] || !values[1] || !valuesForKey[dest[1]]) { | |
89 … | + valuesForKey[dest] = value | |
90 … | + changed = true | |
91 … | + } | |
83 | 92 … | } |
84 | 93 … | } |
94 … | + if (changed) { | |
95 … | + state.set(lastState) | |
96 … | + } | |
85 | 97 … | } |
86 | - return result | |
87 | - }, { | |
88 | - startValue: new Set() | |
89 | - }) | |
98 … | + } | |
90 | 99 … | } |
91 | 100 … | |
92 | -function makeQuery (a, b) { | |
93 | - return {'$filter': { | |
94 | - value: { | |
95 | - author: a, | |
96 | - content: { | |
97 | - type: 'contact', | |
98 | - contact: b | |
101 … | +function computedIds (state, key, compare, sync) { | |
102 … | + var obs = computed([state, 'following', true], getIds) | |
103 … | + obs.sync = sync | |
104 … | + return obs | |
105 … | +} | |
106 … | + | |
107 … | +function getIds (state, key, compare) { | |
108 … | + var result = new Set() | |
109 … | + if (state[key]) { | |
110 … | + for (var dest in state[key]) { | |
111 … | + if (state[key][dest][0] === compare) { | |
112 … | + result.add(dest) | |
99 | 113 … | } |
100 | 114 … | } |
101 | - }} | |
115 … | + } | |
116 … | + | |
117 … | + return result | |
102 | 118 … | } |
103 | 119 … | |
104 | 120 … | function isContact (msg) { |
105 | 121 … | return msg.value && msg.value.content && msg.value.content.type === 'contact' |
106 | 122 … | } |
123 … | + | |
124 … | +function StreamWhenConnected (connection, fn) { | |
125 … | + var stream = defer.source() | |
126 … | + onceTrue(connection, function (connection) { | |
127 … | + stream.resolve(fn(connection)) | |
128 … | + }) | |
129 … | + return stream | |
130 … | +} |
Built with git-ssb-web