git ssb

1+

Daan Patchwork / patchwork



Tree: 000777db8b0f37f6c1d99fc608e4395c87512348

Files: 000777db8b0f37f6c1d99fc608e4395c87512348 / lib / plugins / thread.js

2131 bytes · Raw
1const pull = require('pull-stream')
2const pullCat = require('pull-cat')
3const pullDefer = require('pull-defer')
4const FilterBlocked = require('../filter-blocked')
5
6const getRoot = require('../get-root')
7const sort = require('ssb-sort')
8
9exports.manifest = {
10 read: 'source',
11 sorted: 'source'
12}
13
14exports.init = function (ssb) {
15 return { read, sorted }
16
17 function sorted ({ types, live, old, dest, useBlocksFrom }) {
18 const includeOld = old == null ? !live : old
19 const includeLive = live == null ? !old : live
20 const streams = []
21
22 if (includeOld) {
23 const sortedOldMessages = pullDefer.source()
24 streams.push(sortedOldMessages)
25
26 // collect all old messages, sort, then emit all
27 pull(
28 read({ old: true, live: false, dest }),
29 pull.collect((err, msgs) => {
30 if (err) return sortedOldMessages.abort(err)
31 sortedOldMessages.resolve(pull.values(sort(msgs)))
32 })
33 )
34 }
35
36 if (includeLive && includeOld) {
37 streams.push(
38 pull.values([{ sync: true }])
39 )
40 }
41
42 if (includeLive) {
43 streams.push(read({ live: true, old: false, dest }))
44 }
45
46 return pull(
47 pullCat(streams),
48 pull.filter(msg => {
49 if (msg.sync) {
50 return true
51 }
52 const type = msg.value.content.type
53 return !types || types.includes(type)
54 }),
55 FilterBlocked([ssb.id].concat(useBlocksFrom), {
56 isBlocking: ssb.patchwork.contacts.isBlocking
57 })
58 )
59 }
60
61 function read ({ reverse = false, limit = null, types = null, live = null, old = null, dest = null }) {
62 // TODO: properly handle truncation
63 return pull(
64 ssb.backlinks.read({
65 private: true,
66 awaitReady: false,
67 reverse,
68 live,
69 old,
70 index: 'DTA',
71 query: [{ $filter: { dest } }]
72 }),
73 pull.filter(msg => {
74 if (msg.sync) return msg
75 const type = msg.value.content.type
76 const root = getRoot(msg)
77 return root === dest && (!types || types.includes(type))
78 }),
79 limit ? pull.take(limit) : pull.through()
80 )
81 }
82}
83

Built with git-ssb-web