git ssb

7+

dinoworm ๐Ÿ› / patchcore



Tree: e9bb4e285f6ed74c209f836552ac3cc01c7822c6

Files: e9bb4e285f6ed74c209f836552ac3cc01c7822c6 / sbot.js

6786 bytesRaw
1var pull = require('pull-stream')
2var defer = require('pull-defer')
3var { Value, onceTrue, watch, Set: MutantSet } = require('mutant')
4var ref = require('ssb-ref')
5var Reconnect = require('pull-reconnect')
6var createClient = require('ssb-client')
7var createFeed = require('ssb-feed')
8var nest = require('depnest')
9var ssbKeys = require('ssb-keys')
10var flat = require('flat')
11
12exports.needs = nest({
13 'config.sync.load': 'first',
14 'keys.sync.load': 'first',
15 'sbot.obs.connectionStatus': 'first',
16 'sbot.hook.publish': 'map'
17})
18
19exports.gives = {
20 sbot: {
21 sync: {
22 cache: true
23 },
24 async: {
25 get: true,
26 publish: true,
27 addBlob: true,
28 gossipConnect: true,
29 friendsGet: true
30 },
31 pull: {
32 log: true,
33 userFeed: true,
34 messagesByType: true,
35 feed: true,
36 links: true,
37 backlinks: true,
38 stream: true
39 },
40 obs: {
41 connectionStatus: true,
42 connection: true,
43 connectedPeers: true,
44 localPeers: true
45 }
46 }
47}
48
49exports.create = function (api) {
50 const config = api.config.sync.load()
51 const keys = api.keys.sync.load()
52 var cache = {}
53
54 var sbot = null
55 var connection = Value()
56 var connectionStatus = Value()
57 var connectedPeers = MutantSet()
58 var localPeers = MutantSet()
59
60 var rec = Reconnect(function (isConn) {
61 function notify (value) {
62 isConn(value); connectionStatus.set(value)
63 }
64
65 var opts = {
66 path: config.path,
67 remote: config.remote,
68 host: config.host,
69 port: config.port,
70 key: config.key,
71 appKey: config.caps.shs,
72 timers: config.timers,
73 caps: config.caps,
74 friends: config.friends
75 }
76
77 createClient(keys, opts, function (err, _sbot) {
78 if (err) {
79 return notify(err)
80 }
81
82 sbot = _sbot
83 sbot.on('closed', function () {
84 sbot = null
85 connection.set(null)
86 notify(new Error('closed'))
87 })
88
89 connection.set(sbot)
90 notify()
91 })
92 })
93
94 var internal = {
95 getLatest: rec.async(function (id, cb) {
96 sbot.getLatest(id, cb)
97 }),
98 add: rec.async(function (msg, cb) {
99 sbot.add(msg, cb)
100 })
101 }
102
103 setInterval(function () {
104 if (sbot) {
105 sbot.gossip.peers((err, peers) => {
106 if (err) return console.error(err)
107 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
108 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
109 })
110 }
111 }, 1000 * 60)
112
113 watch(connection, (sbot) => {
114 if (sbot) {
115 sbot.gossip.peers((err, peers) => {
116 if (err) return console.error(err)
117 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
118 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
119 })
120 pull(
121 sbot.gossip.changes(),
122 pull.drain(data => {
123 if (data.peer) {
124 if (data.type === 'remove' || data.type === 'disconnect') {
125 connectedPeers.delete(data.peer.key)
126 } else {
127 if (data.peer.source === 'local') {
128 localPeers.add(data.peer.key)
129 }
130 if (data.peer.state === 'connected') {
131 connectedPeers.add(data.peer.key)
132 } else {
133 connectedPeers.delete(data.peer.key)
134 }
135 }
136 }
137 })
138 )
139 }
140 })
141
142 var feed = createFeed(internal, keys, {remote: true})
143
144 return {
145 sbot: {
146 sync: {
147 cache: () => cache
148 },
149 async: {
150 get: rec.async(function (key, cb) {
151 if (typeof cb !== 'function') {
152 throw new Error('cb must be function')
153 }
154 if (cache[key]) cb(null, cache[key])
155 else {
156 sbot.get(key, function (err, value) {
157 if (err) return cb(err)
158 runHooks({key, value})
159 cb(null, value)
160 })
161 }
162 }),
163 publish: rec.async((content, cb) => {
164 if (content.recps) {
165 content = ssbKeys.box(content, content.recps.map(e => {
166 return ref.isFeed(e) ? e : e.link
167 }))
168 } else {
169 var flatContent = flat(content)
170 Object.keys(flatContent).forEach(key => {
171 var val = flatContent[key]
172 if (ref.isBlob(val)) {
173 sbot.blobs.push(val, err => {
174 if (err) console.error(err)
175 })
176 }
177 })
178 }
179
180 if (sbot) {
181 // instant updating of interface (just incase sbot is busy)
182 runHooks({
183 publishing: true,
184 timestamp: Date.now(),
185 value: {
186 timestamp: Date.now(),
187 author: keys.id,
188 content
189 }
190 })
191 }
192
193 feed.add(content, (err, msg) => {
194 if (err) console.error(err)
195 else if (!cb) console.log(msg)
196 cb && cb(err, msg)
197 })
198 }),
199 addBlob: rec.async((stream, cb) => {
200 return pull(
201 stream,
202 sbot.blobs.add(cb)
203 )
204 }),
205 gossipConnect: rec.async(function (opts, cb) {
206 sbot.gossip.connect(opts, cb)
207 }),
208 friendsGet: rec.async(function (opts, cb) {
209 sbot.friends.get(opts, cb)
210 })
211 },
212 pull: {
213 backlinks: rec.source(query => {
214 return sbot.backlinks.read(query)
215 }),
216 userFeed: rec.source(opts => {
217 return sbot.createUserStream(opts)
218 }),
219 messagesByType: rec.source(opts => {
220 return sbot.messagesByType(opts)
221 }),
222 feed: rec.source(function (opts) {
223 return pull(
224 sbot.createFeedStream(opts),
225 pull.through(runHooks)
226 )
227 }),
228 log: rec.source(opts => {
229 return pull(
230 sbot.createLogStream(opts),
231 pull.through(runHooks)
232 )
233 }),
234 links: rec.source(function (query) {
235 return sbot.links(query)
236 }),
237 stream: function (fn) {
238 var stream = defer.source()
239 onceTrue(connection, function (connection) {
240 stream.resolve(fn(connection))
241 })
242 return stream
243 }
244 },
245 obs: {
246 connectionStatus: (listener) => connectionStatus(listener),
247 connection,
248 connectedPeers: () => connectedPeers,
249 localPeers: () => localPeers
250 }
251 }
252 }
253
254 // scoped
255
256 function runHooks (msg) {
257 if (msg.publishing) {
258 api.sbot.hook.publish(msg)
259 } else if (!cache[msg.key]) {
260 // cache[msg.key] = msg.value
261 // api.sbot.hook.feed(msg)
262 }
263 }
264}
265

Built with git-ssb-web