git ssb

7+

dinoworm ๐Ÿ› / patchcore



Tree: 909a17c6e4bb0835b5ae1e22c83f5ea3e933319c

Files: 909a17c6e4bb0835b5ae1e22c83f5ea3e933319c / sbot.js

5828 bytesRaw
1var pull = require('pull-stream')
2var ref = require('ssb-ref')
3var Reconnect = require('pull-reconnect')
4var createClient = require('ssb-client')
5var createFeed = require('ssb-feed')
6var nest = require('depnest')
7var Value = require('mutant/value')
8var ssbKeys = require('ssb-keys')
9
10exports.needs = nest({
11 'config.sync.load': 'first',
12 'keys.sync.load': 'first',
13 'sbot.obs.connectionStatus': 'first',
14 'sbot.hook.feed': 'map'
15})
16
17exports.gives = {
18 sbot: {
19 sync: {
20 cache: true
21 },
22 async: {
23 get: true,
24 publish: true,
25 addBlob: true,
26 gossipConnect: true
27 },
28 pull: {
29 log: true,
30 userFeed: true,
31 query: true,
32 feed: true,
33 links: true,
34 search: true,
35 replicateProgress: true,
36 queryProgress: true
37 },
38 obs: {
39 connectionStatus: true,
40 connectedPeers: true,
41 localPeers: true
42 }
43 }
44}
45
46exports.create = function (api) {
47 const config = api.config.sync.load()
48 const keys = api.keys.sync.load()
49 var cache = {}
50
51 var sbot = null
52 var connectionStatus = Value()
53 var connectedPeers = Value([])
54 var localPeers = Value([])
55
56 setInterval(refreshPeers, 1e3)
57
58 var rec = Reconnect(function (isConn) {
59 function notify (value) {
60 isConn(value); connectionStatus.set(value)
61 }
62
63 createClient(keys, config, function (err, _sbot) {
64 if (err) {
65 return notify(err)
66 }
67
68 sbot = _sbot
69 sbot.on('closed', function () {
70 sbot = null
71 notify(new Error('closed'))
72 })
73
74 notify()
75 refreshPeers()
76 })
77 })
78
79 var internal = {
80 getLatest: rec.async(function (id, cb) {
81 sbot.getLatest(id, cb)
82 }),
83 add: rec.async(function (msg, cb) {
84 sbot.add(msg, cb)
85 })
86 }
87
88 var feed = createFeed(internal, keys, {remote: true})
89
90 return {
91 sbot: {
92 sync: {
93 cache: () => cache
94 },
95 async: {
96 get: rec.async(function (key, cb) {
97 if (typeof cb !== 'function') {
98 throw new Error('cb must be function')
99 }
100 if (cache[key]) cb(null, cache[key])
101 else {
102 sbot.get(key, function (err, value) {
103 if (err) return cb(err)
104 runHooks({key, value})
105 cb(null, value)
106 })
107 }
108 }),
109 publish: rec.async((content, cb) => {
110 if (content.recps) {
111 content = ssbKeys.box(content, content.recps.map(e => {
112 return ref.isFeed(e) ? e : e.link
113 }))
114 } else if (content.mentions) {
115 content.mentions.forEach(mention => {
116 if (ref.isBlob(mention.link)) {
117 sbot.blobs.push(mention.link, err => {
118 if (err) console.error(err)
119 })
120 }
121 })
122 }
123
124 feed.add(content, (err, msg) => {
125 if (err) console.error(err)
126 else if (!cb) console.log(msg)
127 cb && cb(err, msg)
128 })
129 }),
130 addBlob: rec.async((stream, cb) => {
131 return pull(
132 stream,
133 Hash(function (err, id) {
134 if (err) return cb(err)
135 // completely UGLY hack to tell when the blob has been sucessfully written...
136 var start = Date.now()
137 var n = 5
138 next()
139
140 function next () {
141 setTimeout(function () {
142 sbot.blobs.has(id, function (_, has) {
143 if (has) return cb(null, id)
144 if (n--) next()
145 else cb(new Error('write failed'))
146 })
147 }, Date.now() - start)
148 }
149 }),
150 sbot.blobs.add()
151 )
152 }),
153 gossipConnect: rec.async(function (opts, cb) {
154 sbot.gossip.connect(opts, cb)
155 })
156 },
157 pull: {
158 query: rec.source(query => {
159 return sbot.query.read(query)
160 }),
161 userFeed: rec.source(opts => {
162 return sbot.createUserStream(opts)
163 }),
164 feed: rec.source(function (opts) {
165 return pull(
166 sbot.createFeedStream(opts),
167 pull.through(runHooks)
168 )
169 }),
170 log: rec.source(opts => {
171 return pull(
172 sbot.createLogStream(opts),
173 pull.through(runHooks)
174 )
175 }),
176 links: rec.source(function (query) {
177 return sbot.links(query)
178 }),
179 search: rec.source(function (opts) {
180 return sbot.fulltext.search(opts)
181 }),
182 replicateProgress: rec.source(function (opts) {
183 return sbot.replicate.changes()
184 }),
185 queryProgress: rec.source(function (opts) {
186 return sbot.query.progress()
187 })
188 },
189 obs: {
190 connectionStatus: (listener) => connectionStatus(listener),
191 connectedPeers: () => connectedPeers,
192 localPeers: () => localPeers
193 }
194 }
195 }
196
197 // scoped
198
199 function runHooks (msg) {
200 if (!cache[msg.key]) {
201 cache[msg.key] = msg.value
202 api.sbot.hook.feed(msg)
203 }
204 }
205
206 function refreshPeers () {
207 if (sbot) {
208 sbot.gossip.peers((err, peers) => {
209 if (err) throw console.log(err)
210 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
211 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
212 })
213 }
214 }
215}
216
217function Hash (onHash) {
218 var buffers = []
219 return pull.through(function (data) {
220 buffers.push(typeof data === 'string'
221 ? new Buffer(data, 'utf8')
222 : data
223 )
224 }, function (err) {
225 if (err && !onHash) throw err
226 var b = buffers.length > 1 ? Buffer.concat(buffers) : buffers[0]
227 var h = '&' + ssbKeys.hash(b)
228 onHash && onHash(err, h)
229 })
230}
231

Built with git-ssb-web