Files: cffa78292325759c294a642e710938fdb67ebbb4 / lib / query-with-progress.js
3026 bytesRaw
1 | |
2 | var pull = require('pull-stream') |
3 | var path = require('path') |
4 | var Links = require('streamview-links') |
5 | var explain = require('explain-error') |
6 | var Notify = require('pull-notify') |
7 | var Value = require('mutant/value') |
8 | var watchThrottle = require('mutant/watch-throttle') |
9 | |
10 | exports.name = 'query' |
11 | exports.version = '0.1.2' |
12 | exports.manifest = { |
13 | read: 'source', dump: 'source', progress: 'source' |
14 | } |
15 | |
// Index definitions passed to `Links` in `exports.init`.
// Each entry pairs a short `key` with a `value` list whose items are either
// a property name or an array of nested property names.
// NOTE(review): presumably these are the message paths each index is
// ordered by -- confirm against streamview-links.
var indexes = [
  {
    key: 'clk',
    value: [['value', 'author'], ['value', 'sequence'], 'timestamp']
  },
  {
    key: 'typ',
    value: [['value', 'content', 'type'], 'timestamp']
  },
  {
    key: 'hsh',
    value: ['key', 'timestamp']
  },
  {
    key: 'cha',
    value: [['value', 'content', 'channel'], 'timestamp']
  }
  // Disabled index, kept for reference:
  // {key: 'aty', value: [['value', 'author'], ['value', 'content', 'type'], 'ts']}
]
23 | |
24 | //createHistoryStream( id, seq ) |
25 | //[{$filter: {author: <id>, sequence: {$gt: <seq>}}}, {$map: true}] |
26 | |
27 | //messagesByType (type) |
28 | |
29 | //[{$filter: {content: {type: <type>}}}, {$map: true}] |
30 | |
31 | exports.init = function (ssb, config) { |
32 | |
33 | var dir = path.join(config.path, 'query') |
34 | |
35 | var version = 13 |
36 | //it's really nice to tweak a few things |
37 | //and then change the version number, |
38 | //restart the server and have it regenerate the indexes, |
39 | //all consistent again. |
40 | function id (e, emit) { |
41 | return emit(e) |
42 | } |
43 | |
44 | var links = Links(dir, indexes, id, version) |
45 | var notify = Notify() |
46 | var pending = Value(0) |
47 | |
48 | watchThrottle(pending, 200, (value) => { |
49 | notify({pending: Math.max(0, value)}) |
50 | }) |
51 | |
52 | links.init(function (err, since) { |
53 | countChanges(since, function (err, changes) { |
54 | if (err) throw err |
55 | pending.set(changes) |
56 | onChange(() => { |
57 | pending.set(pending() + 1) |
58 | }) |
59 | pull( |
60 | ssb.createLogStream({gt: since || 0, live: true, sync: false}), |
61 | pull.through(function () { |
62 | pending.set(pending() - 1) |
63 | }), |
64 | links.write(function (err) { |
65 | if(err) throw err |
66 | }) |
67 | ) |
68 | }) |
69 | }) |
70 | |
71 | return { |
72 | dump: function () { |
73 | return links.dump() |
74 | }, |
75 | |
76 | read: function (opts) { |
77 | if(opts && 'string' == typeof opts) |
78 | try { opts = {query: JSON.parse(opts) } } catch (err) { |
79 | return pull.error(err) |
80 | } |
81 | return links.read(opts, function (ts, cb) { |
82 | ssb.sublevel('log').get(ts, function (err, key) { |
83 | if(err) return cb(explain(err, 'missing timestamp:'+ts)) |
84 | ssb.get(key, function (err, value) { |
85 | if(err) return cb(explain(err, 'missing key:'+key)) |
86 | cb(null, {key: key, value: value, timestamp: ts}) |
87 | }) |
88 | }) |
89 | }) |
90 | }, |
91 | |
92 | progress: notify.listen |
93 | } |
94 | |
95 | function countChanges (since, cb) { |
96 | var result = 0 |
97 | pull( |
98 | ssb.createLogStream({gt: since || 0, keys: false, values: false}), |
99 | pull.drain(function () { |
100 | result += 1 |
101 | }, function (err) { |
102 | cb(err, result) |
103 | }) |
104 | ) |
105 | } |
106 | |
107 | function onChange (cb) { |
108 | pull( |
109 | ssb.createLogStream({keys: false, values: false, old: false}), |
110 | pull.drain(function () { |
111 | cb() |
112 | }) |
113 | ) |
114 | } |
115 | } |
116 |
Built with git-ssb-web