Files: 43af34efb145e2705ff168ae3e6f3cc6832314ac / message-cache.js
3110 bytesRaw
1 | // pull-stream drains that receive updates from update-stream |
2 | // and keep a cache of observable message objects |
3 | const pull = require('pull-stream') |
4 | const MutantDict = require('mutant/dict') |
5 | const MutantArray = require('mutant/array') |
6 | |
7 | // TODO: this does not account for messages changing branches! |
8 | |
/**
 * Build a drain (via updateObservableMessages) that turns every message it
 * receives into a MutantDict, caches it by message key and files it under the
 * branch named in `value.content.branch` (or 'ROOTS' when there is none).
 *
 * @param {Object} [opts] - currently unused; kept so existing callers keep working.
 * @returns {Function} the drain returned by updateObservableMessages, extended with
 *   `getChildrenObservable(parentId)` (the MutantArray of that branch, or undefined)
 *   and `getMessageObservable(msgId)` (the cached MutantDict, or undefined).
 */
function cacheAndIndex(opts) {
  // Both dictionaries are keyed by strings taken from messages. They have no
  // prototype so that keys such as 'constructor' or '__proto__' are ordinary
  // entries instead of resolving to inherited Object.prototype members.
  const branches = Object.create(null)
  const messages = Object.create(null)

  const ret = updateObservableMessages(null, {
    makeObservable: kv => {
      const observable = MutantDict(kv)
      messages[kv.key] = observable
      return observable
    },
    updateObservable: (child, kv) => {
      child.set(kv)
    },
    getContainer: kv => {
      // kv.value may be missing (the drain itself checks for that), so do not
      // assume it is there; such messages are looked up under 'ROOTS'.
      const content = kv.value && kv.value.content
      const branch = content && content.branch || 'ROOTS'
      if (!branches[branch]) {
        branches[branch] = MutantArray()
      }
      return branches[branch]
    }
  })
  pull(ret)

  ret.getChildrenObservable = parentId => branches[parentId]
  ret.getMessageObservable = msgId => messages[msgId]
  return ret
}
40 | |
/**
 * Create a pull.drain sink that applies update-stream events to a collection
 * of observable children.
 *
 * Every incoming `kv` is matched by `kv.key` against the `id` of the children
 * in its container:
 *  - `kv.type === 'del'` removes the child (and forgets its revisions),
 *  - `kv.type === 'revert'` sets `kv.revision` to the revision seen before the
 *    current one, then continues like a normal update,
 *  - otherwise the child is created with `makeObservable` on first sight
 *    (replacing the draft named in `value.content['from-draft']`, if any) and
 *    then passed to `updateObservable`.
 *
 * @param {Object|null} container - collection providing find/push/delete that
 *   holds the children; when falsy, `opts.getContainer(kv)` picks one per message.
 * @param {Object} opts
 * @param {Function} opts.makeObservable - `(kv) => child`; required.
 * @param {Function} opts.updateObservable - `(child, kv) => void`; required.
 * @param {Function} [opts.getContainer] - `(kv) => container`; required when
 *   `container` is not given.
 * @param {Function} [cb] - passed to pull.drain as its end callback; the
 *   default logs the error, if any.
 * @returns the sink created by pull.drain.
 * @throws {Error} when a required option is missing. The drain itself throws
 *   when a revert does not name the revision currently recorded for the key.
 */
function updateObservableMessages(container, opts, cb) {
  opts = opts || {}
  const makeObservable = opts.makeObservable
  const updateObservable = opts.updateObservable
  if (!makeObservable) throw new Error('You need to pass makeObservable')
  if (!updateObservable) throw new Error('You need to pass updateObservable')
  if (!container && typeof opts.getContainer !== 'function') {
    throw new Error('You need to pass a container or getContainer')
  }

  // Needed to implement revert, because update-stream doesn't give us the
  // revision we revert to. Prototype-less so that message keys can never
  // collide with inherited Object.prototype members.
  const currentRevisions = Object.create(null)
  const previousRevisions = Object.create(null)

  // Drop all revision bookkeeping for a child that no longer exists.
  const forgetRevisions = id => {
    delete currentRevisions[id]
    delete previousRevisions[id]
  }

  const onEnd = cb || (err => {
    if (err) console.error('updateObservableMessages: stream ended', err)
  })

  return pull.drain(kv => {
    const children = container || opts.getContainer(kv)
    const {key, value} = kv
    // do we have a child for that revRoot yet?
    let child = children.find(x => x.id === key)

    // Is this a request to remove a draft?
    if (kv.type === 'del') {
      if (child) {
        children.delete(child)
      } else {
        console.error('Request to delete non-existing child', key)
      }
      // Otherwise a child recreated under the same key would inherit
      // revisions of the deleted one.
      forgetRevisions(key)
      return
    }

    if (kv.type === 'revert') {
      if (kv.remove !== currentRevisions[key]) throw new Error('revert to unknow revision')
      kv.revision = previousRevisions[key]
    }

    if (!child) {
      if (!value) return console.error('Trying to make a node without a value. This is bad.')
      child = makeObservable(kv)
      child.id = key
      // if this is a new child that was just created from a draft,
      // make sure to get rid of the draft
      const fromDraft = value.content && value.content['from-draft']
      if (fromDraft) {
        forgetRevisions(fromDraft)
        const draft = children.find(x => x.id === fromDraft)
        if (draft) children.delete(draft)
      }
      children.push(child)
    }

    // Remember the revision we are moving away from so a later revert can
    // return to it.
    if (currentRevisions[key] !== kv.revision) {
      previousRevisions[key] = currentRevisions[key]
      currentRevisions[key] = kv.revision
    }

    updateObservable(child, kv)
  }, onEnd)
}
94 | |
95 | module.exports = { |
96 | cacheAndIndex, |
97 | updateObservableMessages |
98 | } |
99 |
Built with git-ssb-web