Files: 43af34efb145e2705ff168ae3e6f3cc6832314ac / update-stream.js
5141 bytesRaw
1 | const pull = require('pull-stream') |
2 | const pushable = require('pull-pushable') |
3 | const {isDraft} = require('./util') |
4 | const ssbsort = require('./ssb-sort')(links, breakTie) |
5 | |
/**
 * Enumerate the keys a message value links back to.
 *
 * The keys are taken from `value.content.revisionBranch` (a single key or an
 * array of keys), followed by `value.content['from-draft']` when it is set.
 *
 * @param {Object} value - message value; a falsy value yields no links
 * @param {Function} each - invoked once per linked key (called through
 *   Array.prototype.forEach, so it also receives index and array)
 */
function links(value, each) {
  if (!value) return
  const content = value.content
  const revBranches = (content && content.revisionBranch) || []
  // Always work on a copy: appending `from-draft` below must not modify the
  // message's own revisionBranch array (it would grow on every call).
  const targets = Array.isArray(revBranches) ? [...revBranches] : [revBranches]

  const fromDraft = content && content['from-draft']
  if (fromDraft) targets.push(fromDraft)

  targets.forEach(each)
}
16 | |
/**
 * Comparator used to order two key/value messages that are otherwise tied.
 *
 * The result is negative (i.e. `a` sorts before `b` under
 * Array.prototype.sort) when, checked in this order:
 *   1. `a` is a draft and `b` is not,
 *   2. `a` declares the larger timestamp,
 *   3. `a` has the lexicographically greater key.
 *
 * @param {{key: string, value: {timestamp: number}}} a
 * @param {{key: string, value: {timestamp: number}}} b
 * @returns {number} negative, positive, or 0 when both keys are equal
 */
function breakTie(a, b) {
  // draft status decides first
  const draftDelta = isDraft(b.key) - isDraft(a.key)
  if (draftDelta) return draftDelta

  // then the declared timestamp, which may be incorrect or a lie
  const timestampDelta = b.value.timestamp - a.value.timestamp
  if (timestampDelta) return timestampDelta

  // finally, compare the hashes lexicographically
  if (a.key > b.key) return -1
  if (a.key < b.key) return 1
  return 0
}
27 | |
28 | module.exports = function(trusted_keys) { |
29 | trusted_keys = trusted_keys || [] |
30 | |
31 | return function updates(opts) { |
32 | opts = opts || {} |
33 | const slots = {} |
34 | let doBuffer = opts.bufferUntilSync // in sync mode, we buffer until we see a sync |
35 | let drain |
36 | |
37 | const out = pushable(true, function (err) { |
38 | if (err) console.error('out stream closed by client!', err) |
39 | drain.abort(err) |
40 | }) |
41 | |
42 | function push(o) { |
43 | if (!doBuffer) out.push(o) |
44 | } |
45 | |
46 | function flush() { |
47 | if (!doBuffer) return |
48 | doBuffer = false |
49 | Object.keys(slots).forEach( k => { |
50 | processSlot(slots[k]) |
51 | }) |
52 | } |
53 | |
54 | function processSlot(slot) { |
55 | if (slot.revisions.length === 0) return |
56 | if (opts.allRevisions) { |
57 | const revisions = ssbsort(slot.revisions) |
58 | const fingerprint = revisions.map( kv => kv.key ).join('-') |
59 | if (slot.fingerprint !== fingerprint) { |
60 | slot.fingerprint = fingerprint |
61 | push({ |
62 | key: slot.key, |
63 | revisions: revisions.slice() |
64 | }) |
65 | } |
66 | } else { |
67 | const newHeads = heads(slot.revisions) |
68 | if (slot.currKv !== newHeads[0]) { |
69 | slot.currKv = newHeads[0] |
70 | push({ |
71 | key: slot.key, |
72 | value: slot.currKv && slot.currKv.value, |
73 | revision: slot.currKv && slot.currKv.key, |
74 | heads: newHeads, |
75 | |
76 | // for compatibility |
77 | unsaved: isDraft(slot.currKv && slot.currKv.key), |
78 | }) |
79 | } |
80 | } |
81 | } |
82 | |
83 | function heads(revisions) { |
84 | let heads = ssbsort.heads(revisions) |
85 | |
86 | const revs = {} |
87 | revisions.forEach( kv => revs[kv.key] = kv ) |
88 | |
89 | function trusted(kv) { |
90 | let newestTrusted |
91 | function recurse(key) { |
92 | let kv = revs[key] |
93 | if (!kv) return |
94 | // too old |
95 | if (newestTrusted && newestTrusted.value.timestamp > kv.value.timestamp) return |
96 | // not trusted |
97 | if (!isDraft(kv.key) && !opts.allowUntrusted && !trusted_keys.includes(kv.value.author)) return links(kv.value, recurse) |
98 | newestTrusted = kv |
99 | } |
100 | if (isDraft(kv.key) || opts.allowUntrusted || trusted_keys.includes(kv.value.author)) return kv |
101 | links(kv.value, recurse) |
102 | //console.log('trusted',kv,newestTrusted) |
103 | return newestTrusted |
104 | } |
105 | |
106 | // sort trusted heads, drafts first, then newest first |
107 | return heads.map(k => trusted(revs[k])).filter( x=>x ).sort(breakTie) |
108 | } |
109 | |
110 | function findKey(key) { |
111 | return Object.keys(slots).find( k => { |
112 | return slots[k].revisions.find( kv => kv.key === key) |
113 | }) |
114 | } |
115 | |
116 | return function(read) { |
117 | pull( |
118 | read, |
119 | pull.filter( kv =>{ |
120 | if (kv.sync) { |
121 | flush() |
122 | if (opts.sync) push(kv) |
123 | return false |
124 | } |
125 | return true |
126 | }), |
127 | |
128 | drain = pull.drain( kv => { |
129 | let {key, value} = kv |
130 | let revRoot |
131 | |
132 | if (!value && kv.type == 'del') { |
133 | revRoot = findKey(key) |
134 | if (!revRoot) console.warn('unable to find', key) |
135 | } else { |
136 | revRoot = (value && value.content && value.content.revisionRoot) || key |
137 | } |
138 | |
139 | let slot = slots[revRoot] |
140 | if (!slot) slot = slots[revRoot] = { |
141 | revisions: [], |
142 | key: revRoot |
143 | } |
144 | |
145 | if (kv.type === 'del') { |
146 | const r = slot.revisions.find( kv => kv.key === key ) |
147 | //console.warn('deletion in slot', slot, r) |
148 | slot.revisions = slot.revisions.filter( kv => kv !== r ) |
149 | if (slot.revisions.length === 0) { |
150 | return push({ |
151 | key: slot.key, |
152 | value: r.value, |
153 | type: 'del' |
154 | }) |
155 | } |
156 | } else { |
157 | if (isDraft(kv.key)) { |
158 | // if this is a draft, the same revision draft might already |
159 | // be in the array |
160 | slot.revisions = slot.revisions.filter( kv => kv.key !== key ) |
161 | } |
162 | slot.revisions.push(kv) |
163 | } |
164 | if (doBuffer) return |
165 | processSlot(slot) |
166 | }, err => { |
167 | // drain ends |
168 | flush() |
169 | out.end(err) |
170 | }) |
171 | ) |
172 | return out.source |
173 | } |
174 | } |
175 | } |
176 | |
// for testing: expose the breakTie comparator as a property of the exported factory
module.exports.breakTie = breakTie
179 |
Built with git-ssb-web