Files: bb0b78e35453aa2f936f01fe4b2cf30c15f55b53 / db.js
3444 bytesRaw
1 | var pull = require('pull-stream') |
2 | var pl = require('pull-level') |
3 | var urls = require('./urls') |
4 | var config = require('./config') |
5 | var Sublevel = require('level-sublevel/bytewise') |
6 | var retrive = require('./') |
7 | var cont = require('cont') |
8 | var crypto = require('crypto') |
9 | |
// Root store: the `level` database opened at config.db with JSON-encoded
// values, wrapped by level-sublevel so it can be split into sections.
var db = Sublevel(
  require('level')(config.db, {valueEncoding: 'json'})
)

// Sections of the root store, as used further down in this file:
//   rawdb  - fetched documents, keyed by their content hash
//   sample - [name, timestamp] -> content hash, one entry per new document
//   meta   - bookkeeping; the 'ts' key holds the time of the last update
var rawdb = db.sublevel('raw'),
    sample = db.sublevel('sample'),
    meta = db.sublevel('meta')

// Callbacks handed to db.summary() that are still waiting for a result.
var waiting = []
/**
 * Compute the content id of a value: the hex-encoded SHA-256 digest of the
 * value serialised as JSON with two-space indentation.
 *
 * @param {*} data - any JSON-serialisable value.
 * @returns {string} the digest as a hex string.
 */
function hash (data) {
  var serialised = JSON.stringify(data, null, 2)
  var digest = crypto.createHash('sha256')
  digest.update(serialised)
  return digest.digest('hex')
}
18 | |
/**
 * Fetch every source named in `urls`, store the documents that are not yet in
 * the raw store, and index them by name and fetch time.
 *
 * All writes go out as one batch, in this order:
 *   1. rawdb:  content hash      -> document        (one per new document)
 *   2. sample: [name, timestamp] -> content hash    (one per new document)
 *      db:     name              -> content hash    (one per new document)
 *   3. meta:   'ts'              -> timestamp       (always)
 *
 * @param {function} cb - called as cb(err) when a fetch or the batch write
 *   fails, otherwise as cb(null, written) where `written` lists the
 *   {key, value} pairs of the batch in the order above.
 */
db.update = function (cb) {
  cont.para(
    Object.keys(urls).map(function (name) {
      return function (done) {
        retrive(name, function (err, data) {
          if(err) return done(err)
          // NOTE: hash() serialises its argument again, so the id is the
          // digest of the doubly JSON-encoded document. Left unchanged on
          // purpose: ids already stored in rawdb were computed this way.
          var id = hash(JSON.stringify(data, null, 2))
          rawdb.get(id, function (err) {
            // A failed lookup is taken to mean "not stored yet"; documents
            // that are already present yield null and are skipped below.
            done(
              null,
              err ? { key: id, value: data, type: 'put', prefix: rawdb } : null
            )
          })
        })
      }
    })
  ) (function (err, results) {
    if(err) return cb(err)

    // One timestamp for the whole update, shared by the sample keys and the
    // 'ts' marker, so both describe the same moment.
    var ts = Date.now()
    var fresh = results.filter(Boolean)
    var batch = fresh.slice()

    fresh.forEach(function (e) {
      batch.push({key: [e.value.name, ts], value: e.key, type: 'put', prefix: sample})
      batch.push({key: e.value.name, value: e.key, type: 'put', prefix: db})
    })
    // Always recorded, so the batch is never empty.
    batch.push({key: 'ts', value: ts, type: 'put', prefix: meta})

    db.batch(batch, function (err) {
      // Report a failed write instead of claiming success.
      if(err) return cb(err)
      cb(null, batch.map(function (e) {
        return {key: e.key, value: e.value}
      }))
    })
  })
}
56 | |
/**
 * Read the value stored under 'ts' in the meta store (db.update writes a
 * timestamp there on every run).
 *
 * @param {function} cb - passed straight through to meta.get.
 */
db.getLast = function (cb) {
  var key = 'ts'
  meta.get(key, cb)
}
60 | |
/**
 * Extract the wind predictions from one stored forecast document.
 *
 * @param {Object} value - a document with `name`, `issued` and `forecast`
 *   string fields.
 * @returns {{name: *, month: string, winds: Array<{direction: string,
 *   strength: number}>}|undefined} the summary, or undefined when the
 *   document is missing, has no usable `issued` date, or mentions no winds.
 */
function summary (value) {
  if(!value || typeof value.issued !== 'string' || !value.issued) return

  // Skip past the first "day" plus the following character.
  // NOTE(review): assumes `issued` looks like "... Monday 12 March 2015", so
  // that this leaves "12 March 2015" - confirm against the real feed.
  var day = value.issued.substring(value.issued.indexOf('day') + 4)

  // First "<digits> <word>" pair; the word is taken as the month name.
  var dated = /\d+ (\w+)/i.exec(day)
  if(!dated) return
  var month = dated[1]

  // Matches e.g. "northeast 15 knots", "southerly 10 knots".
  var rx = /((?:north|south|east|west)(?:east|west)?(?:erly)?)\s+(\d+)\s+knots/gi
  var winds = []
  var m
  while((m = rx.exec(value.forecast)) !== null) {
    winds.push({direction: m[1].toLowerCase(), strength: +m[2]})
  }
  if(winds.length === 0) return

  return {
    name: value.name,
    month: month,
    winds: winds
  }
}
84 | |
// Time of the last progress dump written by reduce().
var ts = Date.now()

/**
 * Fold one forecast summary into the running tally.
 *
 * The tally is nested as state[name][month] = {winds, count}, where `winds`
 * maps "<direction> <strength>" to the number of times that wind was
 * predicted and `count` is the total number of predictions.
 *
 * @param {Object} [state] - tally so far; a new object is started when falsy.
 *   Updated in place.
 * @param {{name: string, month: string, winds: Array<{direction: string,
 *   strength: number}>}} forecast - one result of summary().
 * @returns {Object} the updated tally (the same object as `state` when one
 *   was passed in).
 */
function reduce (state, forecast) {
  var PROGRESS_INTERVAL = 10000 // ms between progress dumps

  state = state || {}
  var fstate = state[forecast.name] = state[forecast.name] || {}
  var wstate = fstate[forecast.month] = fstate[forecast.month] || {winds: {}, count: 0}

  forecast.winds.forEach(function (w) {
    var wind = w.direction + ' ' + w.strength
    // The previous tally lives in wstate.winds; reading it from wstate itself
    // always gave undefined and pinned every count at 1.
    wstate.winds[wind] = (wstate.winds[wind] || 0) + 1
    wstate.count++
  })

  // While a long scan is running, dump the tally to stderr at most once per
  // interval.
  var now = Date.now()
  if(now > ts + PROGRESS_INTERVAL) {
    ts = now
    console.error(JSON.stringify(state, null, 2))
  }
  return state
}
103 | |
// Result of the first successful scan; once set it is returned as is.
var summary_data
// True while a scan of the sample store is in progress.
var summary_running = false

/**
 * Build (once) and return the wind statistics over every stored sample.
 *
 * Each entry of the sample store is resolved to its raw document, turned into
 * a summary(), and folded into a tally with reduce(). The tally is cached, so
 * later calls answer immediately. Calls that arrive while a scan is running
 * are queued in `waiting` and answered by that scan rather than starting
 * another one.
 *
 * @param {function} cb - called as cb(null, tally), or cb(err) when the scan
 *   or a document lookup fails.
 */
db.summary = function (cb) {
  if(summary_data) return cb(null, summary_data)
  waiting.push(cb)
  if(summary_running) return
  summary_running = true

  pull(
    pl.read(sample),
    pull.asyncMap(function (index, next) {
      rawdb.get(index.value, function (err, value) {
        // Fail the scan rather than passing an undefined document on.
        if(err) return next(err)
        next(null, summary(value))
      })
    }),
    // Drop falsy results, i.e. documents summary() could not use.
    pull.filter(),
    pull.reduce(reduce, {}, function (err, data) {
      summary_running = false
      if(!err) summary_data = data
      // Take the queue before answering, so a callback that calls
      // db.summary() again is treated as a new request.
      var pending = waiting.splice(0, waiting.length)
      pending.forEach(function (fn) {
        fn(err, data)
      })
    })
  )
}
125 | |
module.exports = db

// When this file is run directly (rather than required by another module),
// print the summary as JSON. `require.main === module` is the documented
// check for this; `module.parent` is deprecated.
if(require.main === module)
  db.summary(function (err, data) {
    // Surface the failure instead of printing "undefined".
    if(err) throw err
    console.log(JSON.stringify(data, null, 2))
  })
132 | |
133 | |
134 |
Built with git-ssb-web