git ssb

0+

mixmix / flume-intro



Tree: 1b92defc7a364a894997eabdeefb6b94f0c1230b

Files: 1b92defc7a364a894997eabdeefb6b94f0c1230b / 6_flume_view_reading_source.js

1171 bytesRaw
const Flume = require('flumedb')
const FlumeLog = require('flumelog-offset')
const codec = require('flumecodec')
const pull = require('pull-stream')

const FlumeView = require('flumeview-reduce')

// Log named 'demo_log'; records are encoded with the JSON codec.
const log = FlumeLog('demo_log', { codec: codec.json })

// Database over the log with a single reduce view registered as `stats`.
// The view maps each message to its `count` and folds those counts with
// `reducer` (a hoisted function declaration found later in this file).
const db = Flume(log)
  .use('stats', FlumeView(
    2, // version
    reducer,
    (msg) => msg.count, // map
    null, // codec
    {
      sum: 0,
      squareSum: 0
    } // initial
  ))
  // .use('someOtherView', otherView)
22
/**
 * Fold one mapped value into the running statistics.
 *
 * Prints '+' on every call so each invocation is visible in the console
 * output of the demo.
 *
 * @param {{sum: number, squareSum: number}} acc - statistics so far
 * @param {number} val - value to add
 * @returns {{sum: number, squareSum: number}} a new object; `acc` is not mutated
 */
function reducer (acc, val) { // reducer,
  console.log('+')
  return {
    sum: acc.sum + val,
    squareSum: acc.squareSum + val * val
  }
}
30
// Local copy of the `stats` view state, rebuilt from the view's live stream.
// The first truthy value received is stored as-is; every later value is
// folded into it with `reducer`.
// NOTE(review): this assumes the live stream emits the current state first
// and mapped values afterwards — confirm against the flumeview-reduce docs.
let viewState
pull(
  db.stats.stream({ live: true }),
  pull.drain((val) => {
    if (!viewState) viewState = val
    else viewState = reducer(viewState, val)
    console.log(viewState)
  })
)
40
// writing pull-stream
// Appends { count: n } for n = 1..7 to the database, releasing one value
// every 500 ms, and reports the result of each append.
pull(
  pull.values([1, 2, 3, 4, 5, 6, 7]),
  pull.asyncMap((val, cb) => setTimeout(
    () => cb(null, val),
    500
  )),
  pull.drain(
    (val) => db.append({ count: val }, (err, seq) => {
      // Report a failed append instead of claiming success.
      if (err) {
        console.error(`FlumeDB.append: failed to append '${val}'`, err)
        return
      }
      console.log(`FlumeDB.append: appended '${val}' at offset of ${seq} bytes`)
    }),
    (err) => {
      // The end callback receives an error when the stream aborted.
      if (err) {
        console.error('ERROR: append stream', err)
        return
      }
      console.log('DONE: append stream')
    }
  )
)
57
58

Built with git-ssb-web