git ssb

1+

Daan Patchwork / patchwork



Tree: 401c797bb92a373ffa4499d47ecea37f75e2a16e

Files: 401c797bb92a373ffa4499d47ecea37f75e2a16e / lib / pull-resume.js

1484 bytesRaw
1const pull = require('pull-stream')
2const pullCat = require('pull-cat')
3const Next = require('pull-next')
4const extend = require('xtend')
5
6module.exports = {
7 source: function (stream, { getResume, limit, filterMap }) {
8 if (limit) {
9 const marker = { marker: true, resume: null }
10 let count = 0
11
12 return pullCat([
13 pull(
14 stream,
15 pull.take(limit),
16 pull.through(msg => {
17 if (!msg.sync) {
18 marker.resume = getResume(msg)
19 }
20
21 count += 1
22 }),
23 filterMap
24 ),
25
26 pull(
27 // send truncated marker for resuming search
28 pull.values([marker]),
29
30 // don't emit the resume if we're at the end of the stream
31 pull.filter(() => count === limit)
32 )
33 ])
34 } else {
35 return pull(
36 stream,
37 filterMap
38 )
39 }
40 },
41 remote: function (getStream, opts) {
42 let started = false
43 let lastMessage = null
44
45 return Next(function () {
46 if (started && (!lastMessage || lastMessage.resume == null)) return
47 started = true
48
49 const subOpts = extend(opts, {
50 resume: (lastMessage && lastMessage.resume) || undefined
51 })
52
53 lastMessage = null
54
55 return pull(
56 getStream(subOpts),
57 pull.through(function (msg) {
58 lastMessage = msg
59 }),
60 pull.filter(msg => {
61 return !(msg && msg.resume)
62 })
63 )
64 })
65 }
66}
67

Built with git-ssb-web