git ssb

10+

Matt McKegg / patchwork



Tree: 5751bfc7a66e62b9d47d5cc86f6a753fae3f41a0

Files: 5751bfc7a66e62b9d47d5cc86f6a753fae3f41a0 / lib / query-with-progress.js

3026 bytesRaw
1
2var pull = require('pull-stream')
3var path = require('path')
4var Links = require('streamview-links')
5var explain = require('explain-error')
6var Notify = require('pull-notify')
7var Value = require('mutant/value')
8var watchThrottle = require('mutant/watch-throttle')
9
10exports.name = 'query'
11exports.version = '0.1.2'
12exports.manifest = {
13 read: 'source', dump: 'source', progress: 'source'
14}
15
16var indexes = [
17 {key: 'clk', value: [['value', 'author'], ['value', 'sequence'], 'timestamp'] },
18 {key: 'typ', value: [['value', 'content', 'type'], 'timestamp'] },
19 {key: 'hsh', value: ['key', 'timestamp']},
20 {key: 'cha', value: [['value', 'content', 'channel'], 'timestamp'] },
21// {key: 'aty', value: [['value', 'author'], ['value', 'content', 'type'], 'ts']}
22]
23
24//createHistoryStream( id, seq )
25//[{$filter: {author: <id>, sequence: {$gt: <seq>}}}, {$map: true}]
26
27//messagesByType (type)
28
29//[{$filter: {content: {type: <type>}}}, {$map: true}]
30
31exports.init = function (ssb, config) {
32
33 var dir = path.join(config.path, 'query')
34
35 var version = 13
36 //it's really nice to tweak a few things
37 //and then change the version number,
38 //restart the server and have it regenerate the indexes,
39 //all consistent again.
40 function id (e, emit) {
41 return emit(e)
42 }
43
44 var links = Links(dir, indexes, id, version)
45 var notify = Notify()
46 var pending = Value(0)
47
48 watchThrottle(pending, 200, (value) => {
49 notify({pending: Math.max(0, value)})
50 })
51
52 links.init(function (err, since) {
53 countChanges(since, function (err, changes) {
54 if (err) throw err
55 pending.set(changes)
56 onChange(() => {
57 pending.set(pending() + 1)
58 })
59 pull(
60 ssb.createLogStream({gt: since || 0, live: true, sync: false}),
61 pull.through(function () {
62 pending.set(pending() - 1)
63 }),
64 links.write(function (err) {
65 if(err) throw err
66 })
67 )
68 })
69 })
70
71 return {
72 dump: function () {
73 return links.dump()
74 },
75
76 read: function (opts) {
77 if(opts && 'string' == typeof opts)
78 try { opts = {query: JSON.parse(opts) } } catch (err) {
79 return pull.error(err)
80 }
81 return links.read(opts, function (ts, cb) {
82 ssb.sublevel('log').get(ts, function (err, key) {
83 if(err) return cb(explain(err, 'missing timestamp:'+ts))
84 ssb.get(key, function (err, value) {
85 if(err) return cb(explain(err, 'missing key:'+key))
86 cb(null, {key: key, value: value, timestamp: ts})
87 })
88 })
89 })
90 },
91
92 progress: notify.listen
93 }
94
95 function countChanges (since, cb) {
96 var result = 0
97 pull(
98 ssb.createLogStream({gt: since || 0, keys: false, values: false}),
99 pull.drain(function () {
100 result += 1
101 }, function (err) {
102 cb(err, result)
103 })
104 )
105 }
106
107 function onChange (cb) {
108 pull(
109 ssb.createLogStream({keys: false, values: false, old: false}),
110 pull.drain(function () {
111 cb()
112 })
113 )
114 }
115}
116

Built with git-ssb-web