git ssb

7+

dinoworm ๐Ÿ› / patchcore



Tree: c2515fc07d5df937b0ab574f42b22ed788831215

Files: c2515fc07d5df937b0ab574f42b22ed788831215 / sbot.js

6343 bytesRaw
1var pull = require('pull-stream')
2var defer = require('pull-defer')
3var { Value, onceTrue, watch, Set: MutantSet } = require('mutant')
4var ref = require('ssb-ref')
5var Reconnect = require('pull-reconnect')
6var createClient = require('ssb-client')
7var createFeed = require('ssb-feed')
8var nest = require('depnest')
9var ssbKeys = require('ssb-keys')
10
11exports.needs = nest({
12 'config.sync.load': 'first',
13 'keys.sync.load': 'first',
14 'sbot.obs.connectionStatus': 'first',
15 'sbot.hook.publish': 'map'
16})
17
18exports.gives = {
19 sbot: {
20 sync: {
21 cache: true
22 },
23 async: {
24 get: true,
25 publish: true,
26 addBlob: true,
27 gossipConnect: true,
28 friendsGet: true
29 },
30 pull: {
31 log: true,
32 userFeed: true,
33 messagesByType: true,
34 feed: true,
35 links: true,
36 backlinks: true,
37 stream: true
38 },
39 obs: {
40 connectionStatus: true,
41 connection: true,
42 connectedPeers: true,
43 localPeers: true
44 }
45 }
46}
47
48exports.create = function (api) {
49 const config = api.config.sync.load()
50 const keys = api.keys.sync.load()
51 var cache = {}
52
53 var sbot = null
54 var connection = Value()
55 var connectionStatus = Value()
56 var connectedPeers = MutantSet()
57 var localPeers = MutantSet()
58
59 var rec = Reconnect(function (isConn) {
60 function notify (value) {
61 isConn(value); connectionStatus.set(value)
62 }
63
64 var opts = {
65 path: config.path,
66 remote: config.remote,
67 host: config.host,
68 port: config.port,
69 key: config.key,
70 appKey: config.caps.shs,
71 timers: config.timers
72 }
73
74 createClient(keys, opts, function (err, _sbot) {
75 if (err) {
76 return notify(err)
77 }
78
79 sbot = _sbot
80 sbot.on('closed', function () {
81 sbot = null
82 connection.set(null)
83 notify(new Error('closed'))
84 })
85
86 connection.set(sbot)
87 notify()
88 })
89 })
90
91 var internal = {
92 getLatest: rec.async(function (id, cb) {
93 sbot.getLatest(id, cb)
94 }),
95 add: rec.async(function (msg, cb) {
96 sbot.add(msg, cb)
97 })
98 }
99
100 watch(connection, (sbot) => {
101 if (sbot) {
102 sbot.gossip.peers((err, peers) => {
103 if (err) return console.error(err)
104 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
105 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
106 })
107 pull(
108 sbot.gossip.changes(),
109 pull.drain(data => {
110 if (data.peer) {
111 if (data.type === 'remove') {
112 connectedPeers.delete(data.peer.key)
113 localPeers.delete(data.peer.key)
114 } else {
115 if (data.peer.source === 'local') {
116 localPeers.add(data.peer.key)
117 }
118 if (data.peer.state === 'connected') {
119 connectedPeers.add(data.peer.key)
120 } else {
121 connectedPeers.delete(data.peer.key)
122 }
123 }
124 }
125 })
126 )
127 }
128 })
129
130 var feed = createFeed(internal, keys, {remote: true})
131
132 return {
133 sbot: {
134 sync: {
135 cache: () => cache
136 },
137 async: {
138 get: rec.async(function (key, cb) {
139 if (typeof cb !== 'function') {
140 throw new Error('cb must be function')
141 }
142 if (cache[key]) cb(null, cache[key])
143 else {
144 sbot.get(key, function (err, value) {
145 if (err) return cb(err)
146 runHooks({key, value})
147 cb(null, value)
148 })
149 }
150 }),
151 publish: rec.async((content, cb) => {
152 if (content.recps) {
153 content = ssbKeys.box(content, content.recps.map(e => {
154 return ref.isFeed(e) ? e : e.link
155 }))
156 } else if (content.mentions) {
157 content.mentions.forEach(mention => {
158 if (ref.isBlob(mention.link)) {
159 sbot.blobs.push(mention.link, err => {
160 if (err) console.error(err)
161 })
162 }
163 })
164 }
165
166 if (sbot) {
167 // instant updating of interface (just incase sbot is busy)
168 runHooks({
169 publishing: true,
170 timestamp: Date.now(),
171 value: {
172 timestamp: Date.now(),
173 author: keys.id,
174 content
175 }
176 })
177 }
178
179 feed.add(content, (err, msg) => {
180 if (err) console.error(err)
181 else if (!cb) console.log(msg)
182 cb && cb(err, msg)
183 })
184 }),
185 addBlob: rec.async((stream, cb) => {
186 return pull(
187 stream,
188 sbot.blobs.add(cb)
189 )
190 }),
191 gossipConnect: rec.async(function (opts, cb) {
192 sbot.gossip.connect(opts, cb)
193 }),
194 friendsGet: rec.async(function (opts, cb) {
195 sbot.friends.get(opts, cb)
196 })
197 },
198 pull: {
199 backlinks: rec.source(query => {
200 return sbot.backlinks.read(query)
201 }),
202 userFeed: rec.source(opts => {
203 return sbot.createUserStream(opts)
204 }),
205 messagesByType: rec.source(opts => {
206 return sbot.messagesByType(opts)
207 }),
208 feed: rec.source(function (opts) {
209 return pull(
210 sbot.createFeedStream(opts),
211 pull.through(runHooks)
212 )
213 }),
214 log: rec.source(opts => {
215 return pull(
216 sbot.createLogStream(opts),
217 pull.through(runHooks)
218 )
219 }),
220 links: rec.source(function (query) {
221 return sbot.links(query)
222 }),
223 stream: function (fn) {
224 var stream = defer.source()
225 onceTrue(connection, function (connection) {
226 stream.resolve(fn(connection))
227 })
228 return stream
229 }
230 },
231 obs: {
232 connectionStatus: (listener) => connectionStatus(listener),
233 connection,
234 connectedPeers: () => connectedPeers,
235 localPeers: () => localPeers
236 }
237 }
238 }
239
240 // scoped
241
242 function runHooks (msg) {
243 if (msg.publishing) {
244 api.sbot.hook.publish(msg)
245 } else if (!cache[msg.key]) {
246 // cache[msg.key] = msg.value
247 // api.sbot.hook.feed(msg)
248 }
249 }
250}
251

Built with git-ssb-web