Files: 1b789e63aa59f5d3e5539a4432c8b3c466977a0e / lib / pull-resume.js
1484 bytesRaw
1 | const pull = require('pull-stream') |
2 | const pullCat = require('pull-cat') |
3 | const Next = require('pull-next') |
4 | const extend = require('xtend') |
5 | |
6 | module.exports = { |
7 | source: function (stream, { getResume, limit, filterMap }) { |
8 | if (limit) { |
9 | const marker = { marker: true, resume: null } |
10 | let count = 0 |
11 | |
12 | return pullCat([ |
13 | pull( |
14 | stream, |
15 | pull.take(limit), |
16 | pull.through(msg => { |
17 | if (!msg.sync) { |
18 | marker.resume = getResume(msg) |
19 | } |
20 | |
21 | count += 1 |
22 | }), |
23 | filterMap |
24 | ), |
25 | |
26 | pull( |
27 | // send truncated marker for resuming search |
28 | pull.values([marker]), |
29 | |
30 | // don't emit the resume if we're at the end of the stream |
31 | pull.filter(() => count === limit) |
32 | ) |
33 | ]) |
34 | } else { |
35 | return pull( |
36 | stream, |
37 | filterMap |
38 | ) |
39 | } |
40 | }, |
41 | remote: function (getStream, opts) { |
42 | let started = false |
43 | let lastMessage = null |
44 | |
45 | return Next(function () { |
46 | if (started && (!lastMessage || lastMessage.resume == null)) return |
47 | started = true |
48 | |
49 | const subOpts = extend(opts, { |
50 | resume: (lastMessage && lastMessage.resume) || undefined |
51 | }) |
52 | |
53 | lastMessage = null |
54 | |
55 | return pull( |
56 | getStream(subOpts), |
57 | pull.through(function (msg) { |
58 | lastMessage = msg |
59 | }), |
60 | pull.filter(msg => { |
61 | return !(msg && msg.resume) |
62 | }) |
63 | ) |
64 | }) |
65 | } |
66 | } |
67 |
Built with git-ssb-web