Files: 55fc93a9190c25f467ead205ab8d676b5191dbd4 / lib / plugins / thread.js
2131 bytesRaw
1 | const pull = require('pull-stream') |
2 | const pullCat = require('pull-cat') |
3 | const pullDefer = require('pull-defer') |
4 | const FilterBlocked = require('../filter-blocked') |
5 | |
6 | const getRoot = require('../get-root') |
7 | const sort = require('ssb-sort') |
8 | |
// Call type of each method returned by `init`; both are declared as 'source'.
exports.manifest = { read: 'source', sorted: 'source' }
13 | |
14 | exports.init = function (ssb) { |
15 | return { read, sorted } |
16 | |
/**
 * Source stream of the messages that `read` yields for `dest`: the existing
 * messages first (sorted as one batch), followed by live messages.
 *
 * The old messages are collected completely and passed through `sort` before
 * any of them is emitted. When both phases are requested, a `{ sync: true }`
 * marker is emitted between them.
 *
 * @param {Object} [opts]
 * @param {string[]} [opts.types] - when given, only messages whose
 *   `value.content.type` is in this list pass; otherwise every message passes.
 * @param {boolean} [opts.live] - include the live phase; defaults to `!old`.
 * @param {boolean} [opts.old] - include the old phase; defaults to `!live`.
 *   When neither `live` nor `old` is given, both phases are included.
 * @param {*} [opts.dest] - forwarded to `read` as `dest`.
 * @param {string|string[]} [opts.useBlocksFrom] - extra id(s) appended after
 *   `ssb.id` in the list handed to `FilterBlocked`; presumably feeds whose
 *   blocks should also apply (confirm against ../filter-blocked).
 * @returns {Function} the composed pull-stream source.
 */
function sorted ({ types, live, old, dest, useBlocksFrom } = {}) {
  const includeOld = old == null ? !live : old
  const includeLive = live == null ? !old : live

  // `concat(undefined)` would append a literal `undefined` entry, so an
  // omitted `useBlocksFrom` must contribute nothing to the id list.
  const blockSources = [ssb.id].concat(useBlocksFrom == null ? [] : useBlocksFrom)

  const streams = []

  if (includeOld) {
    // Placeholder that pull-cat can read from; it is resolved (or aborted)
    // once the whole backlog has been collected.
    const sortedOldMessages = pullDefer.source()
    streams.push(sortedOldMessages)

    // collect all old messages, sort, then emit all
    pull(
      read({ old: true, live: false, dest }),
      pull.collect((err, msgs) => {
        if (err) return sortedOldMessages.abort(err)
        sortedOldMessages.resolve(pull.values(sort(msgs)))
      })
    )
  }

  if (includeOld && includeLive) {
    // marks the boundary between the sorted backlog and the live messages
    streams.push(pull.values([{ sync: true }]))
  }

  if (includeLive) {
    streams.push(read({ live: true, old: false, dest }))
  }

  return pull(
    pullCat(streams),
    pull.filter(msg => {
      // sync markers always pass, they carry no `value` to inspect
      if (msg.sync) return true
      const type = msg.value.content.type
      return !types || types.includes(type)
    }),
    FilterBlocked(blockSources, {
      isBlocking: ssb.patchwork.contacts.isBlocking
    })
  )
}
60 | |
/**
 * Source stream of messages from the `ssb.backlinks` index that link to
 * `dest` and whose root (as computed by `getRoot`) is `dest` itself.
 *
 * @param {Object} [opts]
 * @param {boolean} [opts.reverse=false] - forwarded to `ssb.backlinks.read`.
 * @param {?number} [opts.limit=null] - when truthy, the stream ends after this
 *   many items have passed the filter (sync markers included).
 * @param {?string[]} [opts.types=null] - when given, only messages whose
 *   `value.content.type` is in this list pass.
 * @param {?boolean} [opts.live=null] - forwarded to `ssb.backlinks.read`.
 * @param {?boolean} [opts.old=null] - forwarded to `ssb.backlinks.read`.
 * @param {*} [opts.dest=null] - value matched by the backlinks query and
 *   compared against each message's root.
 * @returns {Function} the composed pull-stream source.
 */
function read ({ reverse = false, limit = null, types = null, live = null, old = null, dest = null } = {}) {
  // TODO: properly handle truncation
  const backlinks = ssb.backlinks.read({
    private: true,
    awaitReady: false,
    reverse,
    live,
    old,
    index: 'DTA',
    query: [{ $filter: { dest } }]
  })

  const belongsToThread = msg => {
    // sync markers have no `value`; let them through untouched
    if (msg.sync) return true
    const type = msg.value.content.type
    // the query matches anything linking to `dest`; keep only messages
    // whose root is `dest`
    return getRoot(msg) === dest && (!types || types.includes(type))
  }

  return pull(
    backlinks,
    pull.filter(belongsToThread),
    limit ? pull.take(limit) : pull.through()
  )
}
82 | } |
83 |
Built with git-ssb-web