var FlumeViewLevel = require('flumeview-level') var pull = require('pull-stream') var cat = require('pull-cat') function map(msg) { var author = msg.value.author var ts = msg.value.timestamp if (!ts) return [] var day = ts / 86400000 var week = day / 7 var month = day / 30.43666666666666666666 // 365.24 / 12 // record that the feed author was active this day, week and month return [ ['day', Math.floor(day), author], ['week', Math.floor(week), author], ['month', Math.floor(month), author] ] } var validSeries = { day: true, week: true, month: true } function groupByIds(opts) { var prevTime, ids var listIds = Boolean(opts.ids) return pull.map(function (indexed) { var key = indexed.key var id = key[2] var time = key[1] if (time === prevTime) { // count unique feed ids in the time interval if (listIds) ids.push(id) else ids++ } else { var ret = prevTime && {time: prevTime, ids: ids} prevTime = time ids = listIds ? [id] : 1 return ret } }) } exports.name = 'activity' exports.version = '0.0.0' exports.manifest = { read: 'source' } exports.init = function (sbot, config) { var version = 2 var deleteTheDb = false var view = FlumeViewLevel(deleteTheDb, map) var activity = sbot._flumeUse('activity' + version, view) return { read: function (opts) { if (!opts) return pull.error(new TypeError('Missing opts')) var series = opts.series if (!series) return pull.error(new TypeError('Missing opts.series')) if (!validSeries[series]) return pull.error(new TypeError('Invalid series')) var readOpts = {} var groupOpts = { ids: opts.ids } for (var k in opts) { if (k === 'series' || k === 'ids') continue var value = opts[k] // transform key ranges from query opts into internal key ranges if (k === 'gt' || k === 'gte') value = [series, value, null] else if (k === 'lt' || k === 'lte') value = [series, value, undefined] readOpts[k] = value } if (!readOpts.gt && !readOpts.gte) readOpts.gt = [series, null, null] if (!readOpts.lt && !readOpts.lte) readOpts.lt = [series, undefined, undefined] return pull( // add object at end of stream to make the last group-by interval work cat([activity.read(readOpts), pull.once({key: []})]), groupByIds(groupOpts), pull.filter(Boolean) ) } } }