git ssb

0+

regular / ssb-cms



Tree: 43af34efb145e2705ff168ae3e6f3cc6832314ac

Files: 43af34efb145e2705ff168ae3e6f3cc6832314ac / update-stream.js

5141 bytesRaw
1const pull = require('pull-stream')
2const pushable = require('pull-pushable')
3const {isDraft} = require('./util')
4const ssbsort = require('./ssb-sort')(links, breakTie)
5
6function links(value, each) {
7 if (!value) return
8 let revBranches = (value.content && value.content.revisionBranch) || []
9 let links = Array.isArray(revBranches) ? revBranches : [revBranches]
10
11 let fromDraft = value.content && value.content['from-draft']
12 if (fromDraft) links.push(fromDraft)
13
14 links.forEach(each)
15}
16
17function breakTie(a, b) {
18 return (
19 // drafts after non-drafts
20 (isDraft(b.key) - isDraft(a.key)) ||
21 //declared timestamp, may by incorrect or a lie
22 (b.value.timestamp - a.value.timestamp) ||
23 //finially, sort hashes lexiegraphically.
24 (a.key > b.key ? -1 : a.key < b.key ? 1 : 0)
25 )
26}
27
28module.exports = function(trusted_keys) {
29 trusted_keys = trusted_keys || []
30
31 return function updates(opts) {
32 opts = opts || {}
33 const slots = {}
34 let doBuffer = opts.bufferUntilSync // in sync mode, we buffer until we see a sync
35 let drain
36
37 const out = pushable(true, function (err) {
38 if (err) console.error('out stream closed by client!', err)
39 drain.abort(err)
40 })
41
42 function push(o) {
43 if (!doBuffer) out.push(o)
44 }
45
46 function flush() {
47 if (!doBuffer) return
48 doBuffer = false
49 Object.keys(slots).forEach( k => {
50 processSlot(slots[k])
51 })
52 }
53
54 function processSlot(slot) {
55 if (slot.revisions.length === 0) return
56 if (opts.allRevisions) {
57 const revisions = ssbsort(slot.revisions)
58 const fingerprint = revisions.map( kv => kv.key ).join('-')
59 if (slot.fingerprint !== fingerprint) {
60 slot.fingerprint = fingerprint
61 push({
62 key: slot.key,
63 revisions: revisions.slice()
64 })
65 }
66 } else {
67 const newHeads = heads(slot.revisions)
68 if (slot.currKv !== newHeads[0]) {
69 slot.currKv = newHeads[0]
70 push({
71 key: slot.key,
72 value: slot.currKv && slot.currKv.value,
73 revision: slot.currKv && slot.currKv.key,
74 heads: newHeads,
75
76 // for compatibility
77 unsaved: isDraft(slot.currKv && slot.currKv.key),
78 })
79 }
80 }
81 }
82
83 function heads(revisions) {
84 let heads = ssbsort.heads(revisions)
85
86 const revs = {}
87 revisions.forEach( kv => revs[kv.key] = kv )
88
89 function trusted(kv) {
90 let newestTrusted
91 function recurse(key) {
92 let kv = revs[key]
93 if (!kv) return
94 // too old
95 if (newestTrusted && newestTrusted.value.timestamp > kv.value.timestamp) return
96 // not trusted
97 if (!isDraft(kv.key) && !opts.allowUntrusted && !trusted_keys.includes(kv.value.author)) return links(kv.value, recurse)
98 newestTrusted = kv
99 }
100 if (isDraft(kv.key) || opts.allowUntrusted || trusted_keys.includes(kv.value.author)) return kv
101 links(kv.value, recurse)
102 //console.log('trusted',kv,newestTrusted)
103 return newestTrusted
104 }
105
106 // sort trusted heads, drafts first, then newest first
107 return heads.map(k => trusted(revs[k])).filter( x=>x ).sort(breakTie)
108 }
109
110 function findKey(key) {
111 return Object.keys(slots).find( k => {
112 return slots[k].revisions.find( kv => kv.key === key)
113 })
114 }
115
116 return function(read) {
117 pull(
118 read,
119 pull.filter( kv =>{
120 if (kv.sync) {
121 flush()
122 if (opts.sync) push(kv)
123 return false
124 }
125 return true
126 }),
127
128 drain = pull.drain( kv => {
129 let {key, value} = kv
130 let revRoot
131
132 if (!value && kv.type == 'del') {
133 revRoot = findKey(key)
134 if (!revRoot) console.warn('unable to find', key)
135 } else {
136 revRoot = (value && value.content && value.content.revisionRoot) || key
137 }
138
139 let slot = slots[revRoot]
140 if (!slot) slot = slots[revRoot] = {
141 revisions: [],
142 key: revRoot
143 }
144
145 if (kv.type === 'del') {
146 const r = slot.revisions.find( kv => kv.key === key )
147 //console.warn('deletion in slot', slot, r)
148 slot.revisions = slot.revisions.filter( kv => kv !== r )
149 if (slot.revisions.length === 0) {
150 return push({
151 key: slot.key,
152 value: r.value,
153 type: 'del'
154 })
155 }
156 } else {
157 if (isDraft(kv.key)) {
158 // if this is a draft, the same revision draft might already
159 // be in the array
160 slot.revisions = slot.revisions.filter( kv => kv.key !== key )
161 }
162 slot.revisions.push(kv)
163 }
164 if (doBuffer) return
165 processSlot(slot)
166 }, err => {
167 // drain ends
168 flush()
169 out.end(err)
170 })
171 )
172 return out.source
173 }
174 }
175}
176
177// for testing
178module.exports.breakTie = breakTie
179

Built with git-ssb-web