Files: 8b77d1c1cfb68a98aae007dcf1ca78189c1dd0be / db.js
3367 bytesRaw
1 | var pull = require('pull-stream') |
2 | var pl = require('pull-level') |
3 | var urls = require('./urls') |
4 | var config = require('./config') |
5 | var Sublevel = require('level-sublevel/bytewise') |
6 | var retrive = require('./') |
7 | var cont = require('cont') |
8 | var crypto = require('crypto') |
9 | |
// Root store: a level database at the path given by config.db, values
// encoded as JSON, wrapped by level-sublevel (bytewise variant).
var level = require('level')
var db = Sublevel(level(config.db, {valueEncoding: 'json'}))

// Sublevels used by this module.
var rawdb = db.sublevel('raw')     // fetched documents, keyed by content hash
var sample = db.sublevel('sample') // [name, timestamp] -> raw id
var meta = db.sublevel('meta')     // bookkeeping, e.g. the 'ts' of the last update

// Callbacks queued by db.summary until a summary has been computed.
var waiting = []
/**
 * Content id for a value: the hex SHA-256 digest of its JSON text
 * (pretty-printed with a two-space indent).
 *
 * @param {*} data - any JSON-serialisable value.
 * @returns {string} 64-character lowercase hex digest.
 */
function hash (data) {
  var serialized = JSON.stringify(data, null, 2)
  var sha = crypto.createHash('sha256')
  sha.update(serialized)
  return sha.digest('hex')
}
18 | |
/**
 * Fetch every source listed in `urls` and store the documents that are
 * not in the raw store yet.
 *
 * For each new document three writes are queued: the document itself
 * (raw, keyed by content hash), a [name, timestamp] -> hash entry in
 * `sample`, and a name -> hash pointer in the root store. A 'ts' entry in
 * `meta` is written on every run, even when nothing new was fetched. All
 * writes go through a single db.batch call.
 *
 * @param {Function} cb - called with (err) if a fetch or the batch write
 *   fails, otherwise with (null, ops) where ops is the list of
 *   {key, value} pairs that were written.
 */
db.update = function (cb) {
  var fetchers = Object.keys(urls).map(function (name) {
    return function (done) {
      retrive(name, function (err, data) {
        if (err) return done(err)
        // hash() serialises its argument again, so the id is the digest of
        // the doubly-encoded text. Kept as is so ids already stored stay valid.
        var text = JSON.stringify(data, null, 2)
        var id = hash(text)
        rawdb.get(id, function (err) {
          // Any error from get is taken to mean "not stored yet".
          done(null, err
            ? {key: id, value: data, type: 'put', prefix: rawdb}
            : null)
        })
      })
    }
  })

  cont.para(fetchers)(function (err, results) {
    if (err) return cb(err)

    // One timestamp for the whole run, so sample keys and meta.ts agree.
    var ts = Date.now()
    var fresh = results.filter(Boolean)
    var batch = fresh.slice()

    fresh.forEach(function (op) {
      batch.push({key: [op.value.name, ts], value: op.key, type: 'put', prefix: sample})
      batch.push({key: op.value.name, value: op.key, type: 'put', prefix: db})
    })
    batch.push({key: 'ts', value: ts, type: 'put', prefix: meta})

    db.batch(batch, function (err) {
      // A failed write must not be reported as a successful update.
      if (err) return cb(err)
      cb(null, batch.map(function (op) {
        return {key: op.key, value: op.value}
      }))
    })
  })
}
56 | |
/**
 * Look up the value stored under 'ts' in the meta sublevel, which
 * db.update sets to the time of its latest run.
 *
 * @param {Function} cb - handed straight to meta.get.
 */
db.getLast = function (cb) {
  var LAST_UPDATE_KEY = 'ts'
  meta.get(LAST_UPDATE_KEY, cb)
}
60 | |
/**
 * Condense one raw forecast record into the fields the statistics need.
 *
 * @param {Object} value - raw record; `name`, `issued` and `forecast` are read.
 * @returns {{name: *, month: string, winds: Array<{direction: string, strength: number}>}|undefined}
 *   undefined when the record is missing, has no `issued` string, has no
 *   "<digits> <word>" date after the weekday, or mentions no
 *   "<direction> <n> knots" wind.
 */
function summary (value) {
  // A failed lookup upstream hands us undefined; skip it instead of throwing.
  if (!value || typeof value.issued !== 'string' || !value.issued) return

  // The date is whatever follows the first "day" in the issued text
  // (skipping "day" plus one separator character).
  var day = value.issued.substring(value.issued.indexOf('day') + 4)
  var dateMatch = /\d+ (\w+)/i.exec(day)
  if (!dateMatch) return

  var rx = /((?:north|south|east|west)(?:east|west)?(?:erly)?)\s+(\d+)\s+knots/gi
  var winds = []
  var m
  // The global regex advances on each exec call until it returns null.
  while ((m = rx.exec(value.forecast)) !== null) {
    winds.push({direction: m[1].toLowerCase(), strength: +m[2]})
  }
  if (winds.length === 0) return

  return {
    name: value.name,
    month: dateMatch[1],
    winds: winds
  }
}
84 | |
// Module load time; reduce() starts dumping its state once 10s have passed.
var ts = Date.now()

/**
 * Fold one forecast summary into the running statistics.
 *
 * The state is shaped as state[name][month] = {winds: {...}, count: n},
 * where `winds` maps "<direction> <strength>" to the number of times that
 * wind was seen and `count` is the total number of winds tallied.
 *
 * @param {Object} state - accumulator; mutated in place and returned.
 * @param {{name: string, month: string, winds: Array<{direction: string, strength: number}>}} forecast
 * @returns {Object} the same `state` object.
 */
function reduce (state, forecast) {
  var fstate = state[forecast.name] = state[forecast.name] || {}
  var wstate = fstate[forecast.month] = fstate[forecast.month] || {winds: {}, count: 0}

  forecast.winds.forEach(function (w) {
    var wind = w.direction + ' ' + w.strength
    // Read the previous tally from wstate.winds; reading it from wstate
    // itself always found nothing and pinned every tally at 1.
    wstate.winds[wind] = (wstate.winds[wind] || 0) + 1
    wstate.count++
  })

  // Debug output: once 10s have elapsed since module load, every call
  // writes the whole state to stderr.
  if (Date.now() > ts + 10000)
    console.error(JSON.stringify(state, null, 2))
  return state
}
100 | |
// Result of the last successful scan; served directly once set.
var summary_data

/**
 * Compute wind statistics over every entry in the `sample` sublevel.
 *
 * Each sample entry's value is looked up in the raw store, condensed with
 * summary(), and folded into one object with reduce(). A successful result
 * is kept and served to later callers. While a scan is running, further
 * callers are queued and answered by that scan instead of starting their own.
 *
 * @param {Function} cb - called with (err, data).
 */
db.summary = function (cb) {
  if (summary_data) return cb(null, summary_data)

  waiting.push(cb)
  // Someone queued ahead of us means a scan is already in flight; its
  // completion answers the whole queue.
  if (waiting.length > 1) return

  pull(
    pl.read(sample),
    pull.asyncMap(function (index, next) {
      rawdb.get(index.value, function (err, value) {
        // Surface lookup failures rather than summarising undefined.
        if (err) return next(err)
        next(null, summary(value))
      })
    }),
    // Drop records summary() could not make sense of.
    pull.filter(),
    // The explicit {} seed keeps the first record from being used as the
    // accumulator.
    pull.reduce(reduce, {}, function (err, data) {
      if (!err) summary_data = data
      // Detach the queue first so a callback that calls db.summary again
      // queues for a fresh scan instead of being answered by this one.
      var pending = waiting.splice(0, waiting.length)
      pending.forEach(function (done) {
        done(err, data)
      })
    })
  )
}
122 | |
module.exports = db

// When this file is run directly (not required), print the summary.
// require.main === module replaces the deprecated module.parent check,
// which is also undefined when the module is loaded from an ES module.
if (require.main === module) {
  db.summary(function (err, data) {
    if (err) throw err
    console.log(data)
  })
}
129 | |
130 | |
131 |
Built with git-ssb-web