git ssb

0+

farewellutopia-dev / deno-ssb-experiments



Commit abe042a1d885ce5ec68425fb45227fd111689725

comm/agents splitting

Reto Gmür committed on 9/4/2021, 4:56:47 PM
Parent: 5cae72632c2f11c96732e27126b88ba284d8281b

Files changed

.devcontainer/Dockerfilechanged
Dockerfilechanged
README.mdchanged
util.tschanged
BoxConnection.tsdeleted
NetTransport.tsdeleted
Procedures.tsdeleted
ScuttlebuttHost.tsadded
RPCConnection.tsdeleted
agents/Agent.tsadded
agents/blobs/BlobsAgent.tsadded
agents/feeds/FeedsAgent.tsadded
agents/feeds/feedSubscriptions.tsadded
ScuttlebuttBoxPeer.tsdeleted
comm/CommInterface.tsadded
comm/README.mdadded
comm/box/BoxConnection.tsadded
comm/box/BoxInterface.tsadded
comm/rpc/RpcConnection.tsadded
comm/rpc/RpcInterface.tsadded
comm/rpc/RpcMethodsHandler.tsadded
comm/rpc/types.tsadded
comm/transport/NetTransport.tsadded
comm/transport/Transport.tsadded
ScuttlebuttRpcPeer.tsdeleted
Transport.tsdeleted
feedSubscriptions.tsdeleted
main.tsadded
play.tsdeleted
run.tsdeleted
.devcontainer/DockerfileView
@@ -1,5 +1,5 @@
1-FROM denoland/deno:1.12.2
1 +FROM denoland/deno:1.13.2
22 RUN apt update
33 RUN yes | apt install git
44 RUN mkdir /home/deno && chown deno /home/deno
55 USER deno
DockerfileView
@@ -1,8 +1,8 @@
1-FROM denoland/deno:1.12.2
1 +FROM denoland/deno:1.13.2
22 RUN mkdir /home/deno && chown deno /home/deno
33 USER deno
44 ENV DENO_DIR=/home/deno/.cache/deno
55 COPY ./ /home/deno/application
66 WORKDIR /home/deno/application
77 RUN deno cache --unstable run.ts
8-CMD ["run", "-A", "--unstable", "run.ts"]
8 +CMD ["run", "-A", "--unstable", "main.ts"]
README.mdView
@@ -10,19 +10,17 @@
1010 code is currently a subordinate goal.
1111
1212 ## Usage
1313
14-There are currently two executables.
14 +The main executables is `main.ts`, executing it with
1515
16-play.ts: This establishes a connection to a an address specified as the first
17-argument, requests the feed specified in the second argument, or the main feed
18-of the address if no second argument is given, and saves all received messages
19-in a folder in `data/feeds`. For example the following command will store all
20-messages of `@2NANnQVdsoqk0XPiJG2oMZqaEpTeoGrxOHJkLIqs7eY=.ed255` it gets in the
16 + deno run --unstable -A main.ts
17 +
18 +will start a host interacting on the Scuttlebut network according to the configuration
19 +files in `~/.ssb`. By default feeds are stored in `~/.ssb/data/feeds`, one folder per feed, e.g. all
20 +messages of `@2NANnQVdsoqk0XPiJG2oMZqaEpTeoGrxOHJkLIqs7eY=.ed255` are in the
2121 folder `data/feeds/2NANnQVdsoqk0XPiJG2oMZqaEpTeoGrxOHJkLIqs7eY=/`:
2222
23- deno run -A play.ts "net:gossip.noisebridge.info:8008~shs:2NANnQVdsoqk0XPiJG2oMZqaEpTeoGrxOHJkLIqs7eY="
24-
2523 FindPeers: listens for peers announcing themselves with UDP broadcast.
2624
2725 deno run --unstable -A FindPeers.ts
2826
util.tsView
@@ -247,5 +247,5 @@
247247 }
248248 // no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
249249 }
250250 return results;
251-}
251 +}
BoxConnection.tsView
@@ -1,172 +1,0 @@
1-import sodium from "https://deno.land/x/sodium@0.2.0/sumo.ts";
2-import { concat, FeedId, isZero, log, readBytes } from "./util.ts";
3-import config from "./config.ts";
4-
5-export default class BoxConnection extends EventTarget
6- implements Deno.Reader, Deno.Writer, Deno.Closer {
7- closed = false;
8- serverToClientKey: Uint8Array;
9- clientToServerKey: Uint8Array;
10- serverToClientNonce: Uint8Array;
11- clientToServerNonce: Uint8Array;
12- peer: FeedId;
13- constructor(
14- public conn: Deno.Reader & Deno.Writer & Deno.Closer,
15- combinedSharedSecret: Uint8Array,
16- ourLongTermPublicKey: Uint8Array,
17- theirLongTermPublicKey: Uint8Array,
18- ourEphemeralPublicKey: Uint8Array,
19- theirEphemeralTermPublicKey: Uint8Array,
20- ) {
21- super();
22- this.peer = new FeedId(theirLongTermPublicKey);
23- this.serverToClientKey = sodium.crypto_hash_sha256(
24- concat(
25- combinedSharedSecret,
26- ourLongTermPublicKey,
27- ),
28- );
29-
30- this.clientToServerKey = sodium.crypto_hash_sha256(
31- concat(
32- combinedSharedSecret,
33- theirLongTermPublicKey,
34- ),
35- );
36-
37- this.serverToClientNonce = sodium.crypto_auth(
38- ourEphemeralPublicKey,
39- config.networkIdentifier,
40- ).slice(0, 24);
41- this.clientToServerNonce = sodium.crypto_auth(
42- theirEphemeralTermPublicKey,
43- config.networkIdentifier,
44- ).slice(0, 24);
45- }
46-
47- pendingData: Uint8Array | null = null;
48-
49- async read(p: Uint8Array): Promise<number | null> {
50- if (!this.pendingData) {
51- const chunk = await this.readChunk();
52- if (chunk === null) {
53- return null;
54- }
55- if (!this.pendingData) {
56- this.pendingData = chunk;
57- } else {
58- //race condition
59- this.pendingData = concat(this.pendingData, chunk);
60- }
61- }
62- //TODO merge metods to avoid copying data
63- if (this.pendingData.length < p.length) {
64- p.set(this.pendingData);
65- const result = this.pendingData.length;
66- this.pendingData = null;
67- return result;
68- } else {
69- p.set(this.pendingData.subarray(0, p.length));
70- this.pendingData = this.pendingData.subarray(p.length);
71- return p.length;
72- }
73- }
74-
75- /** Gets the next chunk (box/body) of data*/
76- async readChunk() {
77- try {
78- const headerBox = await readBytes(this.conn, 34);
79- const header = sodium.crypto_box_open_easy_afternm(
80- headerBox,
81- this.serverToClientNonce,
82- this.serverToClientKey,
83- );
84- increment(this.serverToClientNonce);
85- if (isZero(header)) {
86- //they said goodbye
87- this.close();
88- return null;
89- }
90- const bodyLength = header[0] * 0x100 + header[1];
91- const authenticationBodyTag = header.slice(2);
92- const encryptedBody = await readBytes(this.conn, bodyLength);
93- const decodedBody = sodium.crypto_box_open_easy_afternm(
94- concat(authenticationBodyTag, encryptedBody),
95- this.serverToClientNonce,
96- this.serverToClientKey,
97- );
98- increment(this.serverToClientNonce);
99- //log.debug("Read " + decodedBody);
100- return decodedBody;
101- } catch (error) {
102- if (!this.closed) {
103- this.close();
104- }
105- if (error.message.startsWith("End of reader")) {
106- log.info("End of reader, closing.");
107- }
108- throw error;
109- }
110- }
111-
112- async write(message: Uint8Array) {
113- //log.debug("Writing " + message);
114- const headerNonce = new Uint8Array(this.clientToServerNonce);
115- increment(this.clientToServerNonce);
116- const bodyNonce = new Uint8Array(this.clientToServerNonce);
117- increment(this.clientToServerNonce);
118- const encryptedMessage = sodium.crypto_box_easy_afternm(
119- message,
120- bodyNonce,
121- this.clientToServerKey,
122- );
123- const messageLengh = message.length;
124- const messageLenghUiA = new Uint8Array([
125- messageLengh >> 8,
126- messageLengh & 0xFF,
127- ]);
128- const authenticationBodyTag = encryptedMessage.slice(0, 16);
129- const encryptedHeader = sodium.crypto_box_easy_afternm(
130- concat(messageLenghUiA, authenticationBodyTag),
131- headerNonce,
132- this.clientToServerKey,
133- );
134-
135- await this.conn.write(concat(encryptedHeader, encryptedMessage.slice(16)));
136- return messageLengh;
137- }
138- async close() {
139- if (this.closed) {
140- log.warning(`Connection closed already.`);
141- return;
142- }
143- this.closed = true;
144- this.dispatchEvent(new CustomEvent("close"));
145- const byeMessage = sodium.crypto_box_easy_afternm(
146- new Uint8Array(18),
147- this.clientToServerNonce,
148- this.clientToServerKey,
149- );
150- try {
151- await this.conn.write(byeMessage);
152- } catch (error) {
153- log.debug(`Failed at properly bidding goodbye: ${error}`);
154- }
155- this.conn.close();
156- }
157-}
158-
159-function increment(bytes: Uint8Array) {
160- let pos = bytes.length - 1;
161- while (true) {
162- bytes[pos]++;
163- if (bytes[pos] === 0) {
164- pos--;
165- if (pos < 0) {
166- return;
167- }
168- } else {
169- return;
170- }
171- }
172-}
NetTransport.tsView
@@ -1,23 +1,0 @@
1-import Transport from "./Transport.ts";
2-import { Address, combine } from "./util.ts";
3-export default class NetTransport implements Transport {
4- listeners: AsyncIterable<Deno.Reader & Deno.Writer & Deno.Closer>[] = [];
5- [Symbol.asyncIterator](): AsyncIterator<Deno.Reader & Deno.Writer & Deno.Closer> {
6- return combine(...this.listeners)[Symbol.asyncIterator]();
7- }
8- protocol = "net";
9- async connect(
10- addr: Address,
11- ): Promise<Deno.Reader & Deno.Writer & Deno.Closer> {
12- return await Deno.connect({
13- hostname: addr.host,
14- port: addr.port,
15- });
16- }
17- listen(
18- options: { port: number } & Record<string, unknown> = { port: 8008 },
19- ) {
20- this.listeners.push(Deno.listen(options));
21- return new Promise<void>(() => {})
22- }
23-}
Procedures.tsView
@@ -1,71 +1,0 @@
1-import { RequestHandler } from "./RPCConnection.ts";
2-import * as FSStorage from "./fsStorage.ts";
3-import { log, parseFeedId, path } from "./util.ts";
4-
5-type sourceProcedure = (
6- args: Record<string, string>[],
7-) => AsyncIterator<Record<string, unknown> | string | Uint8Array>;
8-type rpcContext = Record<string, sourceProcedure>;
9-
10-/** An RPC request handler providing default procedured based on FSStorage */
11-export default class Procedures implements RequestHandler {
12- rootContext: rpcContext = {
13- createHistoryStream: async function* (args: Record<string, string>[]) {
14- const opts = args[0];
15- const feedKey = parseFeedId(opts.id);
16- let seq = Number.parseInt(opts.seq);
17- //log.info(`got request for ${feedKey} with seq: ${seq}`);
18- //console.log(`"@${feedKey}.ed25519",`)
19- const lastMessage = await FSStorage.lastMessage(feedKey);
20- while (seq < lastMessage) {
21- const fileName = path.join(
22- FSStorage.getFeedDir(feedKey),
23- (seq++) + ".json",
24- );
25- try {
26- const parsedFile = JSON.parse(
27- await Deno.readTextFile(fileName),
28- );
29- if (opts.keys === undefined || opts.keys) {
30- yield parsedFile as string | Record<string, unknown> | Uint8Array;
31- } else {
32- yield parsedFile.value as
33- | string
34- | Record<string, unknown>
35- | Uint8Array;
36- }
37- } catch (error) {
38- if (error instanceof Deno.errors.NotFound) {
39- log.debug(`File ${fileName} not found`);
40- }
41- }
42- }
43- },
44- };
45-
46- namedContext: Record<string, rpcContext> = {
47- blobs: {
48- createWants: async function* (_args: Record<string, string>[]) {
49- yield {};
50- },
51- },
52- };
53-
54- handleSourceRequest(
55- names: string[],
56- args: Record<string, string>[],
57- ) {
58- const context = names.length > 1
59- ? this.namedContext[names.shift()!]
60- : this.rootContext;
61- if (context && context[names[0]]) {
62- return context[names[0]](args);
63- } else {
64- return (async function* () {})() as AsyncIterator<
65- string | Record<string, unknown> | Uint8Array,
66- unknown,
67- undefined
68- >;
69- }
70- }
71-}
ScuttlebuttHost.tsView
@@ -1,0 +1,69 @@
1 +import Transport from "./comm/transport/Transport.ts";
2 +import NetTransport from "./comm/transport/NetTransport.ts";
3 +import BoxInterface from "./comm/box/BoxInterface.ts";
4 +import RpcInterface from "./comm/rpc/RpcInterface.ts";
5 +import RpcMethodsHandler from "./comm/rpc/RpcMethodsHandler.ts";
6 +import { FeedId, log } from "./util.ts";
7 +import Agent from "./agents/Agent.ts";
8 +import FeedsAgent from "./agents/feeds/FeedsAgent.ts";
9 +
10 +/** A host communicating to peers using the Secure Scuttlebutt protocol */
11 +export default class ScuttlebuttHost {
12 + readonly transports = new Map<string, Transport>();
13 + private agents: Agent[] = [];
14 +
15 + constructor(readonly config: Record<string, unknown>) {
16 + const options = config.port
17 + ? {
18 + port: config.port as number,
19 + }
20 + : undefined;
21 + this.addTransport(
22 + new NetTransport(options),
23 + );
24 +
25 + if (config.autoConnectLocalPeers) {
26 + /* for await (const peer of udpPeerDiscoverer) {
27 + if (
28 + JSON.stringify(peerAddresses.get(peer.hostname)) !==
29 + JSON.stringify(peer.addresses)
30 + ) {
31 + peerAddresses.set(peer.hostname, peer.addresses);
32 + console.log(peer.addresses);
33 + //TODO check if bcm already has connection to peer, otherwise connect.
34 + }
35 + }
36 + */
37 + }
38 + }
39 +
40 + addTransport(transport: Transport) {
41 + this.transports.set(transport.protocol, transport);
42 + }
43 +
44 + async start() {
45 + log.info(`Starting SSB Host`);
46 + const boxInterface = new BoxInterface([...this.transports.values()]);
47 + //there are incoming connections, connections established explicitely by user, connections initiated by the feeds- or blobs-subsystem
48 + //incoming procedures call are handled by a RequestHandler provided by the subsystem for a specific peer
49 + //subsystem can send request over any connection and are notified on new connectins
50 + const rpcInterface = new RpcInterface(
51 + (feedId: FeedId) =>
52 + new RpcMethodsHandler(
53 + this.agents.map((agent) => agent.createRpcContext(feedId)),
54 + ),
55 + boxInterface,
56 + );
57 + this.agents = getAgents(rpcInterface);
58 + this.agents.forEach((agent) => agent.run());
59 + for await (const rpcConnection of rpcInterface.listen()) {
60 + Promise.all(
61 + this.agents.map((agent) => agent.incomingConnection(rpcConnection)),
62 + );
63 + }
64 + }
65 +}
66 +
67 +function getAgents(rpcInterface: RpcInterface) {
68 + return [new FeedsAgent(rpcInterface)];
69 +}
RPCConnection.tsView
@@ -1,348 +1,0 @@
1-import BoxConnection from "./BoxConnection.ts";
2-import {
3- bytes2NumberSigned,
4- bytes2NumberUnsigned,
5- concat,
6- delay,
7- isZero,
8- log,
9- readBytes,
10-} from "./util.ts";
11-
12-const textDecoder = new TextDecoder();
13-const textEncoder = new TextEncoder();
14-
15-export enum RpcBodyType {
16- binary = 0b00,
17- utf8 = 0b01,
18- json = 0b10,
19-}
20-
21-export interface ResponseStream {
22- read: () => Record<string, unknown>;
23-}
24-
25-export type Header = {
26- partOfStream: boolean;
27- endOrError: boolean;
28- bodyType: RpcBodyType;
29- bodyLength: number;
30- requestNumber: number;
31-};
32-
33-export class EndOfStream extends Error {
34- constructor() {
35- super("Stream ended");
36- }
37-}
38-
39-export interface RequestHandler {
40- handleSourceRequest: (
41- name: string[],
42- args: Record<string, string>[],
43- ) => AsyncIterator<Record<string, unknown> | string | Uint8Array>;
44-}
45-
46-function parseHeader(
47- header: Uint8Array,
48-): Header {
49- const flags = header[0];
50- const partOfStream = !!(flags & 0b1000);
51- const endOrError = !!(flags & 0b100);
52- const bodyType: RpcBodyType = flags & 0b11;
53- const bodyLength = bytes2NumberUnsigned(header.subarray(1, 5));
54- const requestNumber = bytes2NumberSigned(header.subarray(5));
55- return { partOfStream, endOrError, bodyType, bodyLength, requestNumber };
56-}
57-
58-/** parses a message according to bodyType */
59-const parse = (message: Uint8Array, bodyType: RpcBodyType) =>
60- (bodyType === RpcBodyType.json
61- ? JSON.parse(textDecoder.decode(message))
62- : bodyType === RpcBodyType.utf8
63- ? textDecoder.decode(message)
64- : message) as Record<string, unknown> | string | Uint8Array;
65-
66-let lastAnswer = Date.now();
67-let lastActivity = Date.now();
68-
69-export default class RPCConnection {
70- constructor(
71- public boxConnection: BoxConnection,
72- public requestHandler: RequestHandler,
73- {
74- answerTimeout = 300,
75- activityTimeout = 60,
76- }: {
77- answerTimeout?: number;
78- activityTimeout?: number;
79- } = {},
80- ) {
81- this.requestCounter = 0;
82- const monitorConnection = async () => {
83- try {
84- while (!this.boxConnection.closed) {
85- const headerBytes = await readBytes(boxConnection, 9);
86- lastActivity = Date.now();
87- if (isZero(headerBytes)) {
88- log.debug("They said godbye.");
89- break;
90- }
91- const header = parseHeader(headerBytes);
92- if (header.bodyLength === 0) {
93- throw new Error("Got RPC message with lentgh 0.");
94- }
95- const body = await readBytes(boxConnection, header.bodyLength);
96- lastActivity = Date.now();
97- if (header.requestNumber < 0) {
98- const listener = this.responseStreamListeners.get(
99- -header.requestNumber,
100- );
101- if (!listener) {
102- throw new Error(
103- `Got request with unexpected number ${header.requestNumber}`,
104- );
105- }
106- lastAnswer = Date.now();
107- listener(body, header);
108- } else {
109- const parse = () => {
110- const decoded = textDecoder.decode(body);
111- try {
112- return JSON.parse(decoded);
113- } catch (error) {
114- log.error(
115- `Parsing ${decoded} in request ${JSON.stringify(header)}`,
116- );
117- throw error;
118- }
119- };
120- const request = parse();
121- if (this.requestHandler) {
122- if (request.type === "source") {
123- const responseIterator = this.requestHandler
124- .handleSourceRequest(request.name, request.args);
125- (async () => {
126- for await (
127- const value of {
128- [Symbol.asyncIterator]: () => responseIterator,
129- }
130- ) {
131- log.debug(() => "sending back " + JSON.stringify(value));
132- try {
133- await this.sendRpcMessage(value, {
134- isStream: true,
135- inReplyTo: header.requestNumber,
136- });
137- } catch (error) {
138- log.error(
139- `Error sending back ${JSON.stringify(value)}: ${error}`,
140- );
141- }
142- }
143- })();
144- } else {
145- log.info(
146- `Request type ${request.type} not yet supported. Ignoring request number ${header.requestNumber}: ${
147- textDecoder.decode(body)
148- }`,
149- );
150- }
151- } else {
152- log.info(
153- `No handler to handle request number ${header.requestNumber}: ${
154- textDecoder.decode(body)
155- }`,
156- );
157- }
158- }
159- }
160- } catch (e) {
161- if (boxConnection.closed) {
162- log.info("Connection closed");
163- } else {
164- if ((e.name === "Interrupted") || (e.name === "ConnectionReset")) {
165- // ignore
166- log.info(`RPCConnection ${e.name}`);
167- } else {
168- throw e;
169- }
170- }
171- }
172- };
173- monitorConnection();
174- const checkTimeout = async () => {
175- while (!this.boxConnection.closed) {
176- await delay(5000);
177- const timeSinceRead = Date.now() - lastAnswer;
178- if (timeSinceRead > answerTimeout * 1000) {
179- log.info(
180- `RPCConnection readTimeout: ${timeSinceRead /
181- 1000} seconds since last response was received.`,
182- );
183- this.boxConnection.close();
184- break;
185- }
186- const timeSinceActivity = Date.now() - lastActivity;
187- if (timeSinceActivity > activityTimeout * 1000) {
188- log.info(
189- `RPCConnection activityTimeout: ${timeSinceActivity /
190- 1000} seconds since last data was read.`,
191- );
192- this.boxConnection.close();
193- break;
194- }
195- }
196- };
197- checkTimeout();
198- }
199- private responseStreamListeners: Map<
200- number,
201- ((message: Uint8Array, header: Header) => void)
202- > = new Map();
203- sendSourceRequest = async (request: {
204- name: string[];
205- args: unknown;
206- }) => {
207- const requestNumber = await this.sendRpcMessage({
208- name: request.name,
209- args: request.args,
210- "type": "source",
211- }, {
212- bodyType: RpcBodyType.json,
213- isStream: true,
214- });
215- const buffer: [Uint8Array, Header][] = [];
216- const bufferer = (message: Uint8Array, header: Header) => {
217- buffer.push([message, header]);
218- };
219- this.responseStreamListeners.set(requestNumber, bufferer);
220- return { //TODO return AsyncIterator instead
221- read: () => {
222- if (buffer.length > 0) {
223- const [message, header] = buffer.shift() as [Uint8Array, Header];
224- if (!header.endOrError) {
225- return Promise.resolve(parse(message, header.bodyType));
226- } else {
227- const endMessage = textDecoder.decode(message);
228- if (endMessage === "true") {
229- return Promise.reject(new EndOfStream());
230- } else {
231- return Promise.reject(new Error(endMessage));
232- }
233- }
234- } else {
235- return new Promise<Record<string, unknown> | string | Uint8Array>(
236- (resolve, reject) => {
237- this.responseStreamListeners.set(
238- requestNumber,
239- (message: Uint8Array, header: Header) => {
240- if (!header.endOrError) {
241- this.responseStreamListeners.set(requestNumber, bufferer);
242- resolve(parse(message, header.bodyType));
243- } else {
244- const endMessage = textDecoder.decode(message);
245- if (endMessage === "true") {
246- reject(new EndOfStream());
247- } else {
248- reject(
249- new Error(
250- `On connectiion with ${this.boxConnection}: ${endMessage}`,
251- ),
252- );
253- }
254- }
255- },
256- );
257- },
258- );
259- }
260- },
261- };
262- };
263- sendAsyncRequest = async (request: {
264- name: string[];
265- args: unknown;
266- }) => {
267- const requestNumber = await this.sendRpcMessage({
268- name: request.name,
269- args: request.args,
270- "type": "async",
271- }, {
272- bodyType: RpcBodyType.json,
273- isStream: false,
274- });
275- return new Promise((resolve, reject) => {
276- this.responseStreamListeners.set(
277- requestNumber,
278- (message: Uint8Array, header: Header) => {
279- this.responseStreamListeners.delete(requestNumber);
280- if (!header.endOrError) {
281- resolve(parse(message, header.bodyType));
282- } else {
283- reject(new Error(textDecoder.decode(message)));
284- }
285- },
286- );
287- });
288- };
289- private requestCounter;
290- private sendRpcMessage = async (
291- body: Record<string, unknown> | string | Uint8Array,
292- options: {
293- isStream?: boolean;
294- endOrError?: boolean;
295- bodyType?: RpcBodyType;
296- inReplyTo?: number;
297- } = {},
298- ) => {
299- function isUint8Array(
300- v: Record<string, unknown> | string | Uint8Array,
301- ): v is Uint8Array {
302- return v.constructor.prototype === Uint8Array.prototype;
303- }
304- function isString(
305- v: Record<string, unknown> | string | Uint8Array,
306- ): v is string {
307- return v.constructor.prototype === String.prototype;
308- }
309- const getPayload = () => {
310- if (isUint8Array(body)) {
311- if (!options.bodyType) options.bodyType = RpcBodyType.binary;
312- return body;
313- }
314- if (isString(body)) {
315- if (!options.bodyType) options.bodyType = RpcBodyType.utf8;
316- return textEncoder.encode(body);
317- }
318- if (!options.bodyType) options.bodyType = RpcBodyType.json;
319- return textEncoder.encode(JSON.stringify(body));
320- };
321- const payload: Uint8Array = getPayload();
322- const flags = (options.isStream ? 0b1000 : 0) | (options.endOrError
323- ? 0b100
324- : 0) |
325- options.bodyType!;
326- const requestNumber = options.inReplyTo
327- ? options.inReplyTo * -1
328- : ++this.requestCounter;
329- const header = new Uint8Array(9);
330- header[0] = flags;
331- header.set(
332- new Uint8Array(new Uint32Array([payload.length]).buffer).reverse(),
333- 1,
334- );
335- header.set(
336- new Uint8Array(new Uint32Array([requestNumber]).buffer).reverse(),
337- 5,
338- );
339- //writing in one go, to ensure correct order
340- const message = concat(header, payload);
341- try {
342- await this.boxConnection.write(message);
343- } catch (error) {
344- throw new Error(`Failed writing to boxConnection: ${error}.`);
345- }
346- return requestNumber;
347- };
348-}
agents/Agent.tsView
@@ -1,0 +1,19 @@
1 +import RpcConnection from "../comm/rpc/RpcConnection.ts";
2 +import { RpcContext } from "../comm/rpc/types.ts";
3 +import { Address, FeedId } from "../util.ts";
4 +
5 +/** An object handling a sub-protocol, such as Feeds or Blobs */
6 +export default abstract class Agent {
7 + constructor(
8 + public connector: {
9 + connect(address: Address): Promise<RpcConnection>;
10 + },
11 + ) {}
12 + abstract createRpcContext(feedId: FeedId): RpcContext;
13 +
14 + /** Act on incoming connection */
15 + abstract incomingConnection(rpcConnection: RpcConnection): Promise<void>;
16 +
17 + /** Performs the self-initiated actions of this Agent. Note that the Agent may handle requests and act on incoming connection even if this method has not been invoked */
18 + abstract run(): Promise<void>;
19 +}
agents/blobs/BlobsAgent.tsView
@@ -1,0 +1,40 @@
1 +import RpcConnection from "../../comm/rpc/RpcConnection.ts";
2 +import { RpcContext } from "../../comm/rpc/types.ts";
3 +import { BlobId, FeedId, log } from "../../util.ts";
4 +import Agent from "../Agent.ts";
5 +
6 +class BlobWant {
7 + constructor(public blobId: BlobId, public level = -1) {}
8 +}
9 +
10 +export default class BlobsAgent extends Agent {
11 + wantFeeds = new Set<(_: BlobWant) => void>();
12 +
13 + createRpcContext(feedId: FeedId): RpcContext {
14 + const wantFeeds = this.wantFeeds;
15 + const rpcMethods = {
16 + blobs: {
17 + /*has(args: Record<string, string>[]): Promise<boolean> {
18 + log.info(`${feedId} asked about ${args}`);
19 + return Promise.resolve(false);
20 + },*/
21 + async *get(args: Record<string, string>[]) {
22 + log.info(`${feedId} requested ${args}`);
23 + yield new Uint8Array(0);
24 + },
25 + async *createWants(args: Record<string, string>[]) {
26 + log.info(`${feedId} requested ${args}`);
27 + yield {};
28 + wantFeeds.add((_bw: BlobWant) => {/*yield bw */});
29 + },
30 + },
31 + };
32 + return rpcMethods;
33 + }
34 + incomingConnection(_rpcConnection: RpcConnection): Promise<void> {
35 + throw new Error("Method not implemented.");
36 + }
37 + run(): Promise<void> {
38 + throw new Error("Method not implemented.");
39 + }
40 +}
agents/feeds/FeedsAgent.tsView
@@ -1,0 +1,110 @@
1 +import RpcConnection from "../../comm/rpc/RpcConnection.ts";
2 +import { RpcContext } from "../../comm/rpc/types.ts";
3 +import {
4 + Address,
5 + delay,
6 + FeedId,
7 + log,
8 + parseAddress,
9 + parseFeedId,
10 + path,
11 +} from "../../util.ts";
12 +import Agent from "../Agent.ts";
13 +import * as FsStorage from "../../fsStorage.ts";
14 +import { updateFeeds } from "./feedSubscriptions.ts";
15 +import config from "../../config.ts";
16 +
17 +export default class FeedsAgent extends Agent {
18 + createRpcContext(_feedId: FeedId): RpcContext {
19 + const rpcMethods = {
20 + createHistoryStream: async function* (args: Record<string, string>[]) {
21 + const opts = args[0];
22 + const feedKey = parseFeedId(opts.id);
23 + let seq = Number.parseInt(opts.seq);
24 + //log.info(`got request for ${feedKey} with seq: ${seq}`);
25 + //console.log(`"@${feedKey}.ed25519",`)
26 + const lastMessage = await FsStorage.lastMessage(feedKey);
27 + while (seq < lastMessage) {
28 + const fileName = path.join(
29 + FsStorage.getFeedDir(feedKey),
30 + (seq++) + ".json",
31 + );
32 + try {
33 + const parsedFile = JSON.parse(
34 + await Deno.readTextFile(fileName),
35 + );
36 + if (opts.keys === undefined || opts.keys) {
37 + yield parsedFile as string | Record<string, unknown> | Uint8Array;
38 + } else {
39 + yield parsedFile.value as
40 + | string
41 + | Record<string, unknown>
42 + | Uint8Array;
43 + }
44 + } catch (error) {
45 + if (error instanceof Deno.errors.NotFound) {
46 + log.debug(`File ${fileName} not found`);
47 + }
48 + }
49 + }
50 + },
51 + };
52 + return rpcMethods;
53 + }
54 + async incomingConnection(rpcConnection: RpcConnection) {
55 + await updateFeeds(rpcConnection);
56 + }
57 +
58 + async run(): Promise<void> {
59 + const peersFile = path.join(config.baseDir, "peers.json");
60 +
61 + function getPeersFromFile() {
62 + try {
63 + return JSON.parse(Deno.readTextFileSync(peersFile));
64 + } catch (error) {
65 + if (error instanceof Deno.errors.NotFound) {
66 + return [];
67 + }
68 + throw error;
69 + }
70 + }
71 +
72 + function getPeers() {
73 + return getPeersFromFile().map(parseAddress);
74 + }
75 +
76 + const peers: Address[] = getPeers();
77 +
78 + let initialDelaySec = 0;
79 + let onGoingSyncs = 0;
80 + await Promise.all(peers.map((address) =>
81 + (async () => {
82 + initialDelaySec += 10;
83 + await delay(initialDelaySec * 1000);
84 + let minutesDelay = 1;
85 + while (true) {
86 + if (onGoingSyncs > 20) {
87 + log.info("More than 20 connections open, standing by.");
88 + } else {
89 + log.info(
90 + `${onGoingSyncs} connections open, connecting to ${address}`,
91 + );
92 + onGoingSyncs++;
93 + try {
94 + const rpcConnection = await this.connector.connect(address);
95 + await updateFeeds(rpcConnection);
96 + } catch (error) {
97 + log.error(
98 + `In connection with ${address}: ${error}, now having ${onGoingSyncs} connections left`,
99 + );
100 + log.info(`stack: ${error.stack}`);
101 + minutesDelay++;
102 + }
103 + onGoingSyncs--;
104 + }
105 + await delay(minutesDelay * 60 * 1000);
106 + }
107 + })()
108 + ));
109 + }
110 +}
agents/feeds/feedSubscriptions.tsView
@@ -1,0 +1,111 @@
1 +import * as FSStorage from "../../fsStorage.ts";
2 +import {
3 + computeMsgHash,
4 + FeedId,
5 + log,
6 + parseFeedId,
7 + path,
8 + toBase64,
9 + verifySignature,
10 +} from "../../util.ts";
11 +import RPCConnection, { EndOfStream } from "../../comm/rpc/RpcConnection.ts";
12 +import config from "../../config.ts";
13 +
14 +const textEncoder = new TextEncoder();
15 +const followeesFile = path.join(config.baseDir, "followees.json");
16 +
17 +function getFollowees() {
18 + try {
19 + return JSON.parse(Deno.readTextFileSync(followeesFile));
20 + } catch (error) {
21 + if (error instanceof Deno.errors.NotFound) {
22 + return [];
23 + }
24 + throw error;
25 + }
26 +}
27 +const subscriptions: string[] = getFollowees();
28 +
29 +export async function updateFeed(
30 + rpcConnection: RPCConnection,
31 + feedKey: FeedId,
32 +) {
33 + const messagesAlreadyHere = await FSStorage.lastMessage(feedKey);
34 + try {
35 + await updateFeedFrom(
36 + rpcConnection,
37 + feedKey,
38 + messagesAlreadyHere > 0 ? messagesAlreadyHere : 1,
39 + );
40 + } catch (error) {
41 + log.info(`error updating feed ${feedKey}: ${error}`);
42 + }
43 +}
44 +
45 +export async function updateFeedFrom(
46 + rpcConnection: RPCConnection,
47 + feedKey: FeedId,
48 + from: number,
49 +) {
50 + log.debug(`Updating Feed ${feedKey} from ${from}`);
51 + const historyStream = await rpcConnection.sendSourceRequest({
52 + "name": ["createHistoryStream"],
53 + "args": [{
54 + "id": feedKey.toString(),
55 + "seq": from,
56 + }],
57 + });
58 + return (async () => {
59 + const feedDir = FSStorage.getFeedDir(feedKey);
60 + await Deno.mkdir(feedDir, { recursive: true });
61 + try {
62 + while (true) {
63 + const msg = await historyStream.read() as {
64 + value: Record<string, string>;
65 + key: string;
66 + };
67 + const hash = computeMsgHash(msg.value);
68 + const key = `%${toBase64(hash)}.sha256`;
69 + if (key !== msg.key) {
70 + throw new Error(
71 + "Computed hash doesn't match key " +
72 + JSON.stringify(msg, undefined, 2),
73 + );
74 + }
75 + if (
76 + !verifySignature(msg.value as { author: string; signature: string })
77 + ) {
78 + throw Error(
79 + `failed to veriy signature of the message: ${
80 + JSON.stringify(msg.value, undefined, 2)
81 + }`,
82 + );
83 + }
84 + const msgFile = await Deno.create(
85 + feedDir + "/" +
86 + (msg as { value: Record<string, string> }).value!.sequence! +
87 + ".json",
88 + );
89 + await msgFile.write(
90 + textEncoder.encode(JSON.stringify(msg, undefined, 2)),
91 + );
92 + msgFile.close();
93 + /*log.info(
94 + JSON.stringify(msg, undefined, 2),
95 + );*/
96 + }
97 + } catch (err) {
98 + if (err instanceof EndOfStream) {
99 + log.debug(() => `Stream ended for feed ${feedKey}`);
100 + } else {
101 + log.error(err);
102 + }
103 + }
104 + })();
105 +}
106 +
107 +export function updateFeeds(rpcConnection: RPCConnection) {
108 + return Promise.all(
109 + subscriptions.map((feed) => updateFeed(rpcConnection, parseFeedId(feed))),
110 + );
111 +}
ScuttlebuttBoxPeer.tsView
@@ -1,356 +1,0 @@
1-// deno-lint-ignore-file camelcase
2-import sodium from "https://deno.land/x/sodium@0.2.0/sumo.ts";
3-import {
4- Address,
5- concat,
6- FeedId,
7- fromBase64,
8- log,
9- path,
10- readBytes,
11- toBase64,
12-} from "./util.ts";
13-import BoxConnection from "./BoxConnection.ts";
14-import config from "./config.ts";
15-import NetTransport from "./NetTransport.ts";
16-
17-/** A peer with an identity and the abity to connect to other peers using the Secure Scuttlebutt Handshake */
18-export default class ScuttlebuttBoxPeer extends EventTarget {
19- network_identifier = config.networkIdentifier;
20- keyPair = getClientKeyPair();
21- id = new FeedId(this.keyPair.publicKey);
22-
23- netTransport = new NetTransport();
24-
25- connections: BoxConnection[] = [];
26-
27- /** perform handshake as client */
28- async connect(
29- address: Address,
30- ) {
31- // deno-lint-ignore no-this-alias
32- const _host = this;
33- const clientEphemeralKeyPair = sodium.crypto_box_keypair("uint8array");
34- const conn = await this.netTransport.connect(address);
35-
36- const clientHello = () => {
37- const hmac = sodium.crypto_auth(
38- clientEphemeralKeyPair.publicKey,
39- this.network_identifier,
40- );
41- return concat(hmac, clientEphemeralKeyPair.publicKey);
42- };
43-
44- const authenticate = async (
45- server_longterm_pk: Uint8Array,
46- shared_secret_ab: Uint8Array,
47- shared_secret_aB: Uint8Array,
48- ) => {
49- // 3. Client authenticate
50- const shared_secret_ab_sha256 = sodium.crypto_hash_sha256(
51- shared_secret_ab,
52- );
53- const msg = concat(
54- this.network_identifier,
55- server_longterm_pk,
56- shared_secret_ab_sha256,
57- );
58- const detached_signature_A = sodium.crypto_sign_detached(
59- msg,
60- this.keyPair.privateKey,
61- );
62- const boxMsg = new Uint8Array(
63- detached_signature_A.length +
64- this.keyPair.publicKey.length,
65- );
66- boxMsg.set(detached_signature_A);
67- boxMsg.set(
68- this.keyPair.publicKey,
69- detached_signature_A.length,
70- );
71- const nonce = new Uint8Array(24);
72- const boxKey = sodium.crypto_hash_sha256(
73- concat(this.network_identifier, shared_secret_ab, shared_secret_aB),
74- );
75- await conn.write(sodium.crypto_secretbox_easy(boxMsg, nonce, boxKey));
76- return detached_signature_A;
77- };
78-
79- const hello = clientHello();
80- await conn.write(hello);
81- const serverResponse = await readBytes(conn, 64);
82- const server_hmac = serverResponse.subarray(0, 32);
83- const server_ephemeral_pk = serverResponse.subarray(32, 64);
84- if (
85- !sodium.crypto_auth_verify(
86- server_hmac,
87- server_ephemeral_pk,
88- this.network_identifier,
89- )
90- ) {
91- throw new Error("Verification of the server's first response failed");
92- }
93- const shared_secret_ab = sodium.crypto_scalarmult(
94- clientEphemeralKeyPair.privateKey,
95- server_ephemeral_pk,
96- );
97-
98- const shared_secret_aB = sodium.crypto_scalarmult(
99- clientEphemeralKeyPair.privateKey,
100- sodium.crypto_sign_ed25519_pk_to_curve25519(address.key),
101- );
102- const server_longterm_pk = address.key;
103- const detached_signature_A = await authenticate(
104- server_longterm_pk,
105- shared_secret_ab,
106- shared_secret_aB,
107- );
108-
109- const shared_secret_Ab = sodium.crypto_scalarmult(
110- sodium.crypto_sign_ed25519_sk_to_curve25519(
111- this.keyPair.privateKey,
112- ),
113- server_ephemeral_pk,
114- );
115-
116- const serverResponse2 = await readBytes(conn, 80); //msg4 in protocol guide
117-
118- const detached_signature_B = sodium.crypto_box_open_easy_afternm(
119- serverResponse2,
120- new Uint8Array(24),
121- sodium.crypto_hash_sha256(
122- concat(
123- this.network_identifier,
124- shared_secret_ab,
125- shared_secret_aB,
126- shared_secret_Ab,
127- ),
128- ),
129- );
130-
131- const verification2 = sodium.crypto_sign_verify_detached(
132- detached_signature_B,
133- concat(
134- this.network_identifier,
135- detached_signature_A,
136- this.keyPair.publicKey,
137- sodium.crypto_hash_sha256(shared_secret_ab),
138- ),
139- server_longterm_pk,
140- );
141-
142- if (!verification2) {
143- throw new Error("Verification of the server's second response failed");
144- }
145-
146- const combinedSharedSecret = sodium.crypto_hash_sha256(
147- sodium.crypto_hash_sha256(
148- concat(
149- this.network_identifier,
150- shared_secret_ab,
151- shared_secret_aB,
152- shared_secret_Ab,
153- ),
154- ),
155- );
156-
157- const connection = new BoxConnection(
158- conn,
159- combinedSharedSecret,
160- this.keyPair.publicKey,
161- server_longterm_pk,
162- clientEphemeralKeyPair.publicKey,
163- server_ephemeral_pk,
164- );
165- this.connections.push(connection);
166- connection.addEventListener("close", () => {
167- log.debug(
168- `closed outbound connection, one of ${this.connections.length}`,
169- );
170- this.connections = this.connections.filter((c) => c !== connection);
171- });
172- this.dispatchEvent(new CustomEvent("connected", { "detail": connection }));
173- return connection;
174- }
175-
176- /** perform handshake as server */
177- async acceptConnection(conn: Deno.Reader & Deno.Writer & Deno.Closer) {
178- const serverEphemeralKeyPair = sodium.crypto_box_keypair("uint8array");
179- const clientHello = await readBytes(conn, 64);
180- const client_hmac = clientHello.subarray(0, 32);
181- const client_ephemeral_pk = clientHello.subarray(32, 64);
182- if (
183- !sodium.crypto_auth_verify(
184- client_hmac,
185- client_ephemeral_pk,
186- this.network_identifier,
187- )
188- ) {
189- throw new Error("Verification of the client's hello failed");
190- }
191- const serverHello = concat(
192- sodium.crypto_auth(
193- serverEphemeralKeyPair.publicKey,
194- this.network_identifier,
195- ),
196- serverEphemeralKeyPair.publicKey,
197- );
198- await conn.write(serverHello);
199- const shared_secret_ab = sodium.crypto_scalarmult(
200- serverEphemeralKeyPair.privateKey,
201- client_ephemeral_pk,
202- );
203-
204- const shared_secret_aB = sodium.crypto_scalarmult(
205- sodium.crypto_sign_ed25519_sk_to_curve25519(
206- this.keyPair.privateKey,
207- ),
208- client_ephemeral_pk,
209- );
210-
211- const msg3 = await readBytes(conn, 112);
212-
213- const msg3_plaintext = sodium.crypto_secretbox_open_easy(
214- msg3,
215- new Uint8Array(24),
216- sodium.crypto_hash_sha256(
217- concat(
218- this.network_identifier,
219- shared_secret_ab,
220- shared_secret_aB,
221- ),
222- ),
223- );
224-
225- if (msg3_plaintext.length !== 96) {
226- throw Error("Invalid message length");
227- }
228-
229- const detached_signature_A = msg3_plaintext.subarray(0, 64);
230- const client_longterm_pk = msg3_plaintext.subarray(64, 96);
231-
232- const verification3 = sodium.crypto_sign_verify_detached(
233- detached_signature_A,
234- concat(
235- this.network_identifier,
236- this.keyPair.publicKey,
237- sodium.crypto_hash_sha256(shared_secret_ab),
238- ),
239- client_longterm_pk,
240- );
241- if (!verification3) {
242- throw new Error("Verification of the client's third message failed");
243- }
244-
245- const shared_secret_Ab = sodium.crypto_scalarmult(
246- serverEphemeralKeyPair.privateKey,
247- sodium.crypto_sign_ed25519_pk_to_curve25519(client_longterm_pk),
248- );
249- const detached_signature_B = sodium.crypto_sign_detached(
250- concat(
251- this.network_identifier,
252- detached_signature_A,
253- client_longterm_pk,
254- sodium.crypto_hash_sha256(shared_secret_ab),
255- ),
256- this.keyPair.privateKey,
257- );
258- const completionMsg = sodium.crypto_secretbox_easy(
259- detached_signature_B,
260- new Uint8Array(24),
261- sodium.crypto_hash_sha256(
262- concat(
263- this.network_identifier,
264- shared_secret_ab,
265- shared_secret_aB,
266- shared_secret_Ab,
267- ),
268- ),
269- );
270- await conn.write(completionMsg);
271-
272- const combinedSharedSecret = sodium.crypto_hash_sha256(
273- sodium.crypto_hash_sha256(
274- concat(
275- this.network_identifier,
276- shared_secret_ab,
277- shared_secret_aB,
278- shared_secret_Ab,
279- ),
280- ),
281- );
282-
283- const connection = new BoxConnection(
284- conn,
285- combinedSharedSecret,
286- this.keyPair.publicKey,
287- client_longterm_pk,
288- serverEphemeralKeyPair.publicKey,
289- client_ephemeral_pk,
290- );
291- this.connections.push(connection);
292- connection.addEventListener("close", () => {
293- log.debug(
294- `closed incoming connection, one of ${this.connections.length}`,
295- );
296- this.connections = this.connections.filter((c) => c !== connection);
297- });
298- this.dispatchEvent(new CustomEvent("connected", { "detail": connection }));
299- }
300-
301- async listen() {
302- this.netTransport.listen({
303- port: config.port,
304- });
305- log.info(`listening on port ${config.port}`);
306- for await (const conn of this.netTransport) {
307- log.info(`Received connection from ${conn}`);
308- try {
309- await this.acceptConnection(conn);
310- } catch (error) {
311- log.warning(
312- `Error with incoming connection ${JSON.stringify(conn)}: ${error}`,
313- );
314- }
315- }
316- }
317-}
318-
319-function getClientKeyPair() {
320- const secretFileDir = config.baseDir;
321- const secretFilePath = path.join(secretFileDir, "secret");
322- try {
323- const secretText = Deno.readTextFileSync(secretFilePath);
324- const secretTextNoComments = secretText.split("\n").filter((line) =>
325- line.charAt(0) !== "#"
326- ).join("\n");
327- const secret = JSON.parse(secretTextNoComments);
328- return {
329- keyType: secret.curve,
330- publicKey: fromBase64(
331- secret.public.substring(0, secret.public.length - ".ed25519".length),
332- ),
333- privateKey: fromBase64(
334- secret.private.substring(0, secret.private.length - ".ed25519".length),
335- ),
336- };
337- } catch (error) {
338- if (error instanceof Deno.errors.NotFound) {
339- const newKey = sodium.crypto_sign_keypair("uint8array");
340- const secret = {
341- public: toBase64(newKey.publicKey) + ".ed25519",
342- "private": toBase64(newKey.privateKey) + ".ed25519",
343- curve: newKey.keyType,
344- };
345- Deno.mkdirSync(secretFileDir, { recursive: true });
346- Deno.writeTextFileSync(
347- secretFilePath,
348- JSON.stringify(secret, undefined, 2),
349- );
350- return newKey;
351- } else {
352- // unexpected error, pass it along
353- throw error;
354- }
355- }
356-}
comm/CommInterface.tsView
@@ -1,0 +1,7 @@
1 +import { Address } from "../util.ts";
2 +
3 +/** An object that allows to receive and initiate connections of a certain type */
4 +export default interface CommInterface<T> {
5 + connect(addr: Address): Promise<T>;
6 + listen(): AsyncIterable<T>;
7 +}
comm/README.mdView
@@ -1,0 +1,4 @@
1 +# Scuttlebutt Protocol Communication Layers
2 +
3 +All layers provide an implementation of `commInterface` for their type of
4 +connections.
comm/box/BoxConnection.tsView
@@ -1,0 +1,172 @@
1 +import sodium from "https://deno.land/x/sodium@0.2.0/sumo.ts";
2 +import { concat, FeedId, isZero, log, readBytes } from "../../util.ts";
3 +import config from "../../config.ts";
4 +
5 +export default class BoxConnection extends EventTarget
6 + implements Deno.Reader, Deno.Writer, Deno.Closer {
7 + closed = false;
8 + serverToClientKey: Uint8Array;
9 + clientToServerKey: Uint8Array;
10 + serverToClientNonce: Uint8Array;
11 + clientToServerNonce: Uint8Array;
12 + peer: FeedId;
13 + constructor(
14 + public conn: Deno.Reader & Deno.Writer & Deno.Closer,
15 + combinedSharedSecret: Uint8Array,
16 + ourLongTermPublicKey: Uint8Array,
17 + theirLongTermPublicKey: Uint8Array,
18 + ourEphemeralPublicKey: Uint8Array,
19 + theirEphemeralTermPublicKey: Uint8Array,
20 + ) {
21 + super();
22 + this.peer = new FeedId(theirLongTermPublicKey);
23 + this.serverToClientKey = sodium.crypto_hash_sha256(
24 + concat(
25 + combinedSharedSecret,
26 + ourLongTermPublicKey,
27 + ),
28 + );
29 +
30 + this.clientToServerKey = sodium.crypto_hash_sha256(
31 + concat(
32 + combinedSharedSecret,
33 + theirLongTermPublicKey,
34 + ),
35 + );
36 +
37 + this.serverToClientNonce = sodium.crypto_auth(
38 + ourEphemeralPublicKey,
39 + config.networkIdentifier,
40 + ).slice(0, 24);
41 + this.clientToServerNonce = sodium.crypto_auth(
42 + theirEphemeralTermPublicKey,
43 + config.networkIdentifier,
44 + ).slice(0, 24);
45 + }
46 +
47 + pendingData: Uint8Array | null = null;
48 +
49 + async read(p: Uint8Array): Promise<number | null> {
50 + if (!this.pendingData) {
51 + const chunk = await this.readChunk();
52 + if (chunk === null) {
53 + return null;
54 + }
55 + if (!this.pendingData) {
56 + this.pendingData = chunk;
57 + } else {
58 + //race condition
59 + this.pendingData = concat(this.pendingData, chunk);
60 + }
61 + }
62 + //TODO merge metods to avoid copying data
63 + if (this.pendingData.length < p.length) {
64 + p.set(this.pendingData);
65 + const result = this.pendingData.length;
66 + this.pendingData = null;
67 + return result;
68 + } else {
69 + p.set(this.pendingData.subarray(0, p.length));
70 + this.pendingData = this.pendingData.subarray(p.length);
71 + return p.length;
72 + }
73 + }
74 +
75 + /** Gets the next chunk (box/body) of data*/
76 + async readChunk() {
77 + try {
78 + const headerBox = await readBytes(this.conn, 34);
79 + const header = sodium.crypto_box_open_easy_afternm(
80 + headerBox,
81 + this.serverToClientNonce,
82 + this.serverToClientKey,
83 + );
84 + increment(this.serverToClientNonce);
85 + if (isZero(header)) {
86 + //they said goodbye
87 + this.close();
88 + return null;
89 + }
90 + const bodyLength = header[0] * 0x100 + header[1];
91 + const authenticationBodyTag = header.slice(2);
92 + const encryptedBody = await readBytes(this.conn, bodyLength);
93 + const decodedBody = sodium.crypto_box_open_easy_afternm(
94 + concat(authenticationBodyTag, encryptedBody),
95 + this.serverToClientNonce,
96 + this.serverToClientKey,
97 + );
98 + increment(this.serverToClientNonce);
99 + //log.debug("Read " + decodedBody);
100 + return decodedBody;
101 + } catch (error) {
102 + if (!this.closed) {
103 + this.close();
104 + }
105 + if (error.message.startsWith("End of reader")) {
106 + log.info("End of reader, closing.");
107 + }
108 + throw error;
109 + }
110 + }
111 +
112 + async write(message: Uint8Array) {
113 + //log.debug("Writing " + message);
114 + const headerNonce = new Uint8Array(this.clientToServerNonce);
115 + increment(this.clientToServerNonce);
116 + const bodyNonce = new Uint8Array(this.clientToServerNonce);
117 + increment(this.clientToServerNonce);
118 + const encryptedMessage = sodium.crypto_box_easy_afternm(
119 + message,
120 + bodyNonce,
121 + this.clientToServerKey,
122 + );
123 + const messageLengh = message.length;
124 + const messageLenghUiA = new Uint8Array([
125 + messageLengh >> 8,
126 + messageLengh & 0xFF,
127 + ]);
128 + const authenticationBodyTag = encryptedMessage.slice(0, 16);
129 + const encryptedHeader = sodium.crypto_box_easy_afternm(
130 + concat(messageLenghUiA, authenticationBodyTag),
131 + headerNonce,
132 + this.clientToServerKey,
133 + );
134 +
135 + await this.conn.write(concat(encryptedHeader, encryptedMessage.slice(16)));
136 + return messageLengh;
137 + }
138 + async close() {
139 + if (this.closed) {
140 + log.warning(`Connection closed already.`);
141 + return;
142 + }
143 + this.closed = true;
144 + this.dispatchEvent(new CustomEvent("close"));
145 + const byeMessage = sodium.crypto_box_easy_afternm(
146 + new Uint8Array(18),
147 + this.clientToServerNonce,
148 + this.clientToServerKey,
149 + );
150 + try {
151 + await this.conn.write(byeMessage);
152 + } catch (error) {
153 + log.debug(`Failed at properly bidding goodbye: ${error}`);
154 + }
155 + this.conn.close();
156 + }
157 +}
158 +
159 +function increment(bytes: Uint8Array) {
160 + let pos = bytes.length - 1;
161 + while (true) {
162 + bytes[pos]++;
163 + if (bytes[pos] === 0) {
164 + pos--;
165 + if (pos < 0) {
166 + return;
167 + }
168 + } else {
169 + return;
170 + }
171 + }
172 +}
comm/box/BoxInterface.tsView
@@ -1,0 +1,367 @@
1 +// deno-lint-ignore-file camelcase
2 +import sodium from "https://deno.land/x/sodium@0.2.0/sumo.ts";
3 +import {
4 + Address,
5 + combine,
6 + concat,
7 + FeedId,
8 + fromBase64,
9 + log,
10 + path,
11 + readBytes,
12 + toBase64,
13 +} from "../../util.ts";
14 +import BoxConnection from "./BoxConnection.ts";
15 +import config from "../../config.ts";
16 +import NetTransport from "../transport/NetTransport.ts";
17 +import CommInterface from "../CommInterface.ts";
18 +
19 +/** A peer with an identity and the abity to connect to other peers using the Secure Scuttlebutt Handshake */
20 +export default class BoxInterface implements CommInterface<BoxConnection> {
21 + network_identifier = config.networkIdentifier;
22 + keyPair = getClientKeyPair();
23 + id = new FeedId(this.keyPair.publicKey);
24 +
25 + netTransport = new NetTransport();
26 +
27 + constructor(
28 + public readonly underlying: CommInterface<
29 + Deno.Reader & Deno.Writer & Deno.Closer
30 + >[],
31 + ) {}
32 +
33 + connections: BoxConnection[] = [];
34 +
35 + /** perform handshake as client */
36 + async connect(
37 + address: Address,
38 + ) {
39 + // deno-lint-ignore no-this-alias
40 + const _host = this;
41 + const clientEphemeralKeyPair = sodium.crypto_box_keypair("uint8array");
42 + const conn = await this.netTransport.connect(address);
43 +
44 + const clientHello = () => {
45 + const hmac = sodium.crypto_auth(
46 + clientEphemeralKeyPair.publicKey,
47 + this.network_identifier,
48 + );
49 + return concat(hmac, clientEphemeralKeyPair.publicKey);
50 + };
51 +
52 + const authenticate = async (
53 + server_longterm_pk: Uint8Array,
54 + shared_secret_ab: Uint8Array,
55 + shared_secret_aB: Uint8Array,
56 + ) => {
57 + // 3. Client authenticate
58 + const shared_secret_ab_sha256 = sodium.crypto_hash_sha256(
59 + shared_secret_ab,
60 + );
61 + const msg = concat(
62 + this.network_identifier,
63 + server_longterm_pk,
64 + shared_secret_ab_sha256,
65 + );
66 + const detached_signature_A = sodium.crypto_sign_detached(
67 + msg,
68 + this.keyPair.privateKey,
69 + );
70 + const boxMsg = new Uint8Array(
71 + detached_signature_A.length +
72 + this.keyPair.publicKey.length,
73 + );
74 + boxMsg.set(detached_signature_A);
75 + boxMsg.set(
76 + this.keyPair.publicKey,
77 + detached_signature_A.length,
78 + );
79 + const nonce = new Uint8Array(24);
80 + const boxKey = sodium.crypto_hash_sha256(
81 + concat(this.network_identifier, shared_secret_ab, shared_secret_aB),
82 + );
83 + await conn.write(sodium.crypto_secretbox_easy(boxMsg, nonce, boxKey));
84 + return detached_signature_A;
85 + };
86 +
87 + const hello = clientHello();
88 + await conn.write(hello);
89 + const serverResponse = await readBytes(conn, 64);
90 + const server_hmac = serverResponse.subarray(0, 32);
91 + const server_ephemeral_pk = serverResponse.subarray(32, 64);
92 + if (
93 + !sodium.crypto_auth_verify(
94 + server_hmac,
95 + server_ephemeral_pk,
96 + this.network_identifier,
97 + )
98 + ) {
99 + throw new Error("Verification of the server's first response failed");
100 + }
101 + const shared_secret_ab = sodium.crypto_scalarmult(
102 + clientEphemeralKeyPair.privateKey,
103 + server_ephemeral_pk,
104 + );
105 +
106 + const shared_secret_aB = sodium.crypto_scalarmult(
107 + clientEphemeralKeyPair.privateKey,
108 + sodium.crypto_sign_ed25519_pk_to_curve25519(address.key),
109 + );
110 + const server_longterm_pk = address.key;
111 + const detached_signature_A = await authenticate(
112 + server_longterm_pk,
113 + shared_secret_ab,
114 + shared_secret_aB,
115 + );
116 +
117 + const shared_secret_Ab = sodium.crypto_scalarmult(
118 + sodium.crypto_sign_ed25519_sk_to_curve25519(
119 + this.keyPair.privateKey,
120 + ),
121 + server_ephemeral_pk,
122 + );
123 +
124 + const serverResponse2 = await readBytes(conn, 80); //msg4 in protocol guide
125 +
126 + const detached_signature_B = sodium.crypto_box_open_easy_afternm(
127 + serverResponse2,
128 + new Uint8Array(24),
129 + sodium.crypto_hash_sha256(
130 + concat(
131 + this.network_identifier,
132 + shared_secret_ab,
133 + shared_secret_aB,
134 + shared_secret_Ab,
135 + ),
136 + ),
137 + );
138 +
139 + const verification2 = sodium.crypto_sign_verify_detached(
140 + detached_signature_B,
141 + concat(
142 + this.network_identifier,
143 + detached_signature_A,
144 + this.keyPair.publicKey,
145 + sodium.crypto_hash_sha256(shared_secret_ab),
146 + ),
147 + server_longterm_pk,
148 + );
149 +
150 + if (!verification2) {
151 + throw new Error("Verification of the server's second response failed");
152 + }
153 +
154 + const combinedSharedSecret = sodium.crypto_hash_sha256(
155 + sodium.crypto_hash_sha256(
156 + concat(
157 + this.network_identifier,
158 + shared_secret_ab,
159 + shared_secret_aB,
160 + shared_secret_Ab,
161 + ),
162 + ),
163 + );
164 +
165 + const connection = new BoxConnection(
166 + conn,
167 + combinedSharedSecret,
168 + this.keyPair.publicKey,
169 + server_longterm_pk,
170 + clientEphemeralKeyPair.publicKey,
171 + server_ephemeral_pk,
172 + );
173 + this.connections.push(connection);
174 + connection.addEventListener("close", () => {
175 + log.debug(
176 + `closed outbound connection, one of ${this.connections.length}`,
177 + );
178 + this.connections = this.connections.filter((c) => c !== connection);
179 + });
180 + return connection;
181 + }
182 +
183 + /** perform handshake as server */
184 + async acceptConnection(
185 + conn: Deno.Reader & Deno.Writer & Deno.Closer,
186 + ): Promise<BoxConnection> {
187 + const serverEphemeralKeyPair = sodium.crypto_box_keypair("uint8array");
188 + const clientHello = await readBytes(conn, 64);
189 + const client_hmac = clientHello.subarray(0, 32);
190 + const client_ephemeral_pk = clientHello.subarray(32, 64);
191 + if (
192 + !sodium.crypto_auth_verify(
193 + client_hmac,
194 + client_ephemeral_pk,
195 + this.network_identifier,
196 + )
197 + ) {
198 + throw new Error("Verification of the client's hello failed");
199 + }
200 + const serverHello = concat(
201 + sodium.crypto_auth(
202 + serverEphemeralKeyPair.publicKey,
203 + this.network_identifier,
204 + ),
205 + serverEphemeralKeyPair.publicKey,
206 + );
207 + await conn.write(serverHello);
208 + const shared_secret_ab = sodium.crypto_scalarmult(
209 + serverEphemeralKeyPair.privateKey,
210 + client_ephemeral_pk,
211 + );
212 +
213 + const shared_secret_aB = sodium.crypto_scalarmult(
214 + sodium.crypto_sign_ed25519_sk_to_curve25519(
215 + this.keyPair.privateKey,
216 + ),
217 + client_ephemeral_pk,
218 + );
219 +
220 + const msg3 = await readBytes(conn, 112);
221 +
222 + const msg3_plaintext = sodium.crypto_secretbox_open_easy(
223 + msg3,
224 + new Uint8Array(24),
225 + sodium.crypto_hash_sha256(
226 + concat(
227 + this.network_identifier,
228 + shared_secret_ab,
229 + shared_secret_aB,
230 + ),
231 + ),
232 + );
233 +
234 + if (msg3_plaintext.length !== 96) {
235 + throw Error("Invalid message length");
236 + }
237 +
238 + const detached_signature_A = msg3_plaintext.subarray(0, 64);
239 + const client_longterm_pk = msg3_plaintext.subarray(64, 96);
240 +
241 + const verification3 = sodium.crypto_sign_verify_detached(
242 + detached_signature_A,
243 + concat(
244 + this.network_identifier,
245 + this.keyPair.publicKey,
246 + sodium.crypto_hash_sha256(shared_secret_ab),
247 + ),
248 + client_longterm_pk,
249 + );
250 + if (!verification3) {
251 + throw new Error("Verification of the client's third message failed");
252 + }
253 +
254 + const shared_secret_Ab = sodium.crypto_scalarmult(
255 + serverEphemeralKeyPair.privateKey,
256 + sodium.crypto_sign_ed25519_pk_to_curve25519(client_longterm_pk),
257 + );
258 + const detached_signature_B = sodium.crypto_sign_detached(
259 + concat(
260 + this.network_identifier,
261 + detached_signature_A,
262 + client_longterm_pk,
263 + sodium.crypto_hash_sha256(shared_secret_ab),
264 + ),
265 + this.keyPair.privateKey,
266 + );
267 + const completionMsg = sodium.crypto_secretbox_easy(
268 + detached_signature_B,
269 + new Uint8Array(24),
270 + sodium.crypto_hash_sha256(
271 + concat(
272 + this.network_identifier,
273 + shared_secret_ab,
274 + shared_secret_aB,
275 + shared_secret_Ab,
276 + ),
277 + ),
278 + );
279 + await conn.write(completionMsg);
280 +
281 + const combinedSharedSecret = sodium.crypto_hash_sha256(
282 + sodium.crypto_hash_sha256(
283 + concat(
284 + this.network_identifier,
285 + shared_secret_ab,
286 + shared_secret_aB,
287 + shared_secret_Ab,
288 + ),
289 + ),
290 + );
291 +
292 + const connection = new BoxConnection(
293 + conn,
294 + combinedSharedSecret,
295 + this.keyPair.publicKey,
296 + client_longterm_pk,
297 + serverEphemeralKeyPair.publicKey,
298 + client_ephemeral_pk,
299 + );
300 + this.connections.push(connection);
301 + connection.addEventListener("close", () => {
302 + log.debug(
303 + `closed incoming connection, one of ${this.connections.length}`,
304 + );
305 + this.connections = this.connections.filter((c) => c !== connection);
306 + });
307 + return connection;
308 + }
309 +
310 + async *listen() {
311 + const iterator = combine(
312 + ...this.underlying.map((i) => i.listen()),
313 + );
314 + for await (
315 + const conn of {
316 + [Symbol.asyncIterator]: () => iterator,
317 + }
318 + ) {
319 + try {
320 + yield await this.acceptConnection(conn);
321 + } catch (error) {
322 + log.warning(
323 + `Error with incoming connection ${JSON.stringify(conn)}: ${error}`,
324 + );
325 + }
326 + }
327 + }
328 +}
329 +
330 +function getClientKeyPair() {
331 + const secretFileDir = config.baseDir;
332 + const secretFilePath = path.join(secretFileDir, "secret");
333 + try {
334 + const secretText = Deno.readTextFileSync(secretFilePath);
335 + const secretTextNoComments = secretText.split("\n").filter((line) =>
336 + line.charAt(0) !== "#"
337 + ).join("\n");
338 + const secret = JSON.parse(secretTextNoComments);
339 + return {
340 + keyType: secret.curve,
341 + publicKey: fromBase64(
342 + secret.public.substring(0, secret.public.length - ".ed25519".length),
343 + ),
344 + privateKey: fromBase64(
345 + secret.private.substring(0, secret.private.length - ".ed25519".length),
346 + ),
347 + };
348 + } catch (error) {
349 + if (error instanceof Deno.errors.NotFound) {
350 + const newKey = sodium.crypto_sign_keypair("uint8array");
351 + const secret = {
352 + public: toBase64(newKey.publicKey) + ".ed25519",
353 + "private": toBase64(newKey.privateKey) + ".ed25519",
354 + curve: newKey.keyType,
355 + };
356 + Deno.mkdirSync(secretFileDir, { recursive: true });
357 + Deno.writeTextFileSync(
358 + secretFilePath,
359 + JSON.stringify(secret, undefined, 2),
360 + );
361 + return newKey;
362 + } else {
363 + // unexpected error, pass it along
364 + throw error;
365 + }
366 + }
367 +}
comm/rpc/RpcConnection.tsView
@@ -1,0 +1,339 @@
1 +import BoxConnection from "../box/BoxConnection.ts";
2 +import {
3 + bytes2NumberSigned,
4 + bytes2NumberUnsigned,
5 + concat,
6 + delay,
7 + isZero,
8 + log,
9 + readBytes,
10 +} from "../../util.ts";
11 +
12 +import { RequestHandler } from "./types.ts";
13 +
14 +const textDecoder = new TextDecoder();
15 +const textEncoder = new TextEncoder();
16 +
17 +export enum RpcBodyType {
18 + binary = 0b00,
19 + utf8 = 0b01,
20 + json = 0b10,
21 +}
22 +
23 +export type Header = {
24 + partOfStream: boolean;
25 + endOrError: boolean;
26 + bodyType: RpcBodyType;
27 + bodyLength: number;
28 + requestNumber: number;
29 +};
30 +
31 +export class EndOfStream extends Error {
32 + constructor() {
33 + super("Stream ended");
34 + }
35 +}
36 +
37 +function parseHeader(
38 + header: Uint8Array,
39 +): Header {
40 + const flags = header[0];
41 + const partOfStream = !!(flags & 0b1000);
42 + const endOrError = !!(flags & 0b100);
43 + const bodyType: RpcBodyType = flags & 0b11;
44 + const bodyLength = bytes2NumberUnsigned(header.subarray(1, 5));
45 + const requestNumber = bytes2NumberSigned(header.subarray(5));
46 + return { partOfStream, endOrError, bodyType, bodyLength, requestNumber };
47 +}
48 +
49 +/** parses a message according to bodyType */
50 +const parse = (message: Uint8Array, bodyType: RpcBodyType) =>
51 + (bodyType === RpcBodyType.json
52 + ? JSON.parse(textDecoder.decode(message))
53 + : bodyType === RpcBodyType.utf8
54 + ? textDecoder.decode(message)
55 + : message) as Record<string, unknown> | string | Uint8Array;
56 +
57 +let lastAnswer = Date.now();
58 +let lastActivity = Date.now();
59 +
60 +export default class RpcConnection {
61 + constructor(
62 + public boxConnection: BoxConnection,
63 + public requestHandler: RequestHandler,
64 + {
65 + answerTimeout = 300,
66 + activityTimeout = 60,
67 + }: {
68 + answerTimeout?: number;
69 + activityTimeout?: number;
70 + } = {},
71 + ) {
72 + this.requestCounter = 0;
73 + const monitorConnection = async () => {
74 + try {
75 + while (!this.boxConnection.closed) {
76 + const headerBytes = await readBytes(boxConnection, 9);
77 + lastActivity = Date.now();
78 + if (isZero(headerBytes)) {
79 + log.debug("They said godbye.");
80 + break;
81 + }
82 + const header = parseHeader(headerBytes);
83 + if (header.bodyLength === 0) {
84 + throw new Error("Got RPC message with lentgh 0.");
85 + }
86 + const body = await readBytes(boxConnection, header.bodyLength);
87 + lastActivity = Date.now();
88 + if (header.requestNumber < 0) {
89 + const listener = this.responseStreamListeners.get(
90 + -header.requestNumber,
91 + );
92 + if (!listener) {
93 + throw new Error(
94 + `Got request with unexpected number ${header.requestNumber}`,
95 + );
96 + }
97 + lastAnswer = Date.now();
98 + listener(body, header);
99 + } else {
100 + const parse = () => {
101 + const decoded = textDecoder.decode(body);
102 + try {
103 + return JSON.parse(decoded);
104 + } catch (error) {
105 + log.error(
106 + `Parsing ${decoded} in request ${JSON.stringify(header)}`,
107 + );
108 + throw error;
109 + }
110 + };
111 + const request = parse();
112 + if (this.requestHandler) {
113 + if (request.type === "source") {
114 + const responseIterator = this.requestHandler
115 + .handleSourceRequest(request.name, request.args);
116 + (async () => {
117 + for await (
118 + const value of {
119 + [Symbol.asyncIterator]: () => responseIterator,
120 + }
121 + ) {
122 + log.debug(() => "sending back " + JSON.stringify(value));
123 + try {
124 + await this.sendRpcMessage(value, {
125 + isStream: true,
126 + inReplyTo: header.requestNumber,
127 + });
128 + } catch (error) {
129 + log.error(
130 + `Error sending back ${JSON.stringify(value)}: ${error}`,
131 + );
132 + }
133 + }
134 + })();
135 + } else {
136 + log.info(
137 + `Request type ${request.type} not yet supported. Ignoring request number ${header.requestNumber}: ${
138 + textDecoder.decode(body)
139 + }`,
140 + );
141 + }
142 + } else {
143 + log.info(
144 + `No handler to handle request number ${header.requestNumber}: ${
145 + textDecoder.decode(body)
146 + }`,
147 + );
148 + }
149 + }
150 + }
151 + } catch (e) {
152 + if (boxConnection.closed) {
153 + log.info("Connection closed");
154 + } else {
155 + if ((e.name === "Interrupted") || (e.name === "ConnectionReset")) {
156 + // ignore
157 + log.info(`RPCConnection ${e.name}`);
158 + } else {
159 + throw e;
160 + }
161 + }
162 + }
163 + };
164 + monitorConnection();
165 + const checkTimeout = async () => {
166 + while (!this.boxConnection.closed) {
167 + await delay(5000);
168 + const timeSinceRead = Date.now() - lastAnswer;
169 + if (timeSinceRead > answerTimeout * 1000) {
170 + log.info(
171 + `RPCConnection readTimeout: ${timeSinceRead /
172 + 1000} seconds since last response was received.`,
173 + );
174 + this.boxConnection.close();
175 + break;
176 + }
177 + const timeSinceActivity = Date.now() - lastActivity;
178 + if (timeSinceActivity > activityTimeout * 1000) {
179 + log.info(
180 + `RPCConnection activityTimeout: ${timeSinceActivity /
181 + 1000} seconds since last data was read.`,
182 + );
183 + this.boxConnection.close();
184 + break;
185 + }
186 + }
187 + };
188 + checkTimeout();
189 + }
190 + private responseStreamListeners: Map<
191 + number,
192 + ((message: Uint8Array, header: Header) => void)
193 + > = new Map();
194 + sendSourceRequest = async (request: {
195 + name: string[];
196 + args: unknown;
197 + }) => {
198 + const requestNumber = await this.sendRpcMessage({
199 + name: request.name,
200 + args: request.args,
201 + "type": "source",
202 + }, {
203 + bodyType: RpcBodyType.json,
204 + isStream: true,
205 + });
206 + const buffer: [Uint8Array, Header][] = [];
207 + const bufferer = (message: Uint8Array, header: Header) => {
208 + buffer.push([message, header]);
209 + };
210 + this.responseStreamListeners.set(requestNumber, bufferer);
211 + return { //TODO return AsyncIterator instead
212 + read: () => {
213 + if (buffer.length > 0) {
214 + const [message, header] = buffer.shift() as [Uint8Array, Header];
215 + if (!header.endOrError) {
216 + return Promise.resolve(parse(message, header.bodyType));
217 + } else {
218 + const endMessage = textDecoder.decode(message);
219 + if (endMessage === "true") {
220 + return Promise.reject(new EndOfStream());
221 + } else {
222 + return Promise.reject(new Error(endMessage));
223 + }
224 + }
225 + } else {
226 + return new Promise<Record<string, unknown> | string | Uint8Array>(
227 + (resolve, reject) => {
228 + this.responseStreamListeners.set(
229 + requestNumber,
230 + (message: Uint8Array, header: Header) => {
231 + if (!header.endOrError) {
232 + this.responseStreamListeners.set(requestNumber, bufferer);
233 + resolve(parse(message, header.bodyType));
234 + } else {
235 + const endMessage = textDecoder.decode(message);
236 + if (endMessage === "true") {
237 + reject(new EndOfStream());
238 + } else {
239 + reject(
240 + new Error(
241 + `On connectiion with ${this.boxConnection}: ${endMessage}`,
242 + ),
243 + );
244 + }
245 + }
246 + },
247 + );
248 + },
249 + );
250 + }
251 + },
252 + };
253 + };
254 + sendAsyncRequest = async (request: {
255 + name: string[];
256 + args: unknown;
257 + }) => {
258 + const requestNumber = await this.sendRpcMessage({
259 + name: request.name,
260 + args: request.args,
261 + "type": "async",
262 + }, {
263 + bodyType: RpcBodyType.json,
264 + isStream: false,
265 + });
266 + return new Promise((resolve, reject) => {
267 + this.responseStreamListeners.set(
268 + requestNumber,
269 + (message: Uint8Array, header: Header) => {
270 + this.responseStreamListeners.delete(requestNumber);
271 + if (!header.endOrError) {
272 + resolve(parse(message, header.bodyType));
273 + } else {
274 + reject(new Error(textDecoder.decode(message)));
275 + }
276 + },
277 + );
278 + });
279 + };
280 + private requestCounter;
281 + private sendRpcMessage = async (
282 + body: Record<string, unknown> | string | Uint8Array,
283 + options: {
284 + isStream?: boolean;
285 + endOrError?: boolean;
286 + bodyType?: RpcBodyType;
287 + inReplyTo?: number;
288 + } = {},
289 + ) => {
290 + function isUint8Array(
291 + v: Record<string, unknown> | string | Uint8Array,
292 + ): v is Uint8Array {
293 + return v.constructor.prototype === Uint8Array.prototype;
294 + }
295 + function isString(
296 + v: Record<string, unknown> | string | Uint8Array,
297 + ): v is string {
298 + return v.constructor.prototype === String.prototype;
299 + }
300 + const getPayload = () => {
301 + if (isUint8Array(body)) {
302 + if (!options.bodyType) options.bodyType = RpcBodyType.binary;
303 + return body;
304 + }
305 + if (isString(body)) {
306 + if (!options.bodyType) options.bodyType = RpcBodyType.utf8;
307 + return textEncoder.encode(body);
308 + }
309 + if (!options.bodyType) options.bodyType = RpcBodyType.json;
310 + return textEncoder.encode(JSON.stringify(body));
311 + };
312 + const payload: Uint8Array = getPayload();
313 + const flags = (options.isStream ? 0b1000 : 0) | (options.endOrError
314 + ? 0b100
315 + : 0) |
316 + options.bodyType!;
317 + const requestNumber = options.inReplyTo
318 + ? options.inReplyTo * -1
319 + : ++this.requestCounter;
320 + const header = new Uint8Array(9);
321 + header[0] = flags;
322 + header.set(
323 + new Uint8Array(new Uint32Array([payload.length]).buffer).reverse(),
324 + 1,
325 + );
326 + header.set(
327 + new Uint8Array(new Uint32Array([requestNumber]).buffer).reverse(),
328 + 5,
329 + );
330 + //writing in one go, to ensure correct order
331 + const message = concat(header, payload);
332 + try {
333 + await this.boxConnection.write(message);
334 + } catch (error) {
335 + throw new Error(`Failed writing to boxConnection: ${error}.`);
336 + }
337 + return requestNumber;
338 + };
339 +}
comm/rpc/RpcInterface.tsView
@@ -1,0 +1,31 @@
1 +import BoxInterface from "../box/BoxInterface.ts";
2 +import RpcConnection from "./RpcConnection.ts";
3 +import { RequestHandler } from "./types.ts";
4 +import { Address, FeedId } from "../../util.ts";
5 +import CommInterface from "../CommInterface.ts";
6 +
7 +export default class RPCInterface implements CommInterface<RpcConnection> {
8 + constructor(
9 + public requestHandlerBuilder: (_: FeedId) => RequestHandler,
10 + public boxPeer: BoxInterface,
11 + ) {}
12 +
13 + async connect(
14 + address: Address,
15 + ) {
16 + const boxConnection = await this.boxPeer.connect(address);
17 + return new RpcConnection(
18 + boxConnection,
19 + this.requestHandlerBuilder(boxConnection.peer),
20 + );
21 + }
22 +
23 + async *listen() {
24 + for await (const boxConnection of this.boxPeer.listen()) {
25 + yield new RpcConnection(
26 + boxConnection,
27 + this.requestHandlerBuilder(boxConnection.peer),
28 + );
29 + }
30 + }
31 +}
comm/rpc/RpcMethodsHandler.tsView
@@ -1,0 +1,66 @@
1 +import {
2 + RequestHandler,
3 + ResultValue,
4 + RpcContext,
5 + RpcFunction,
6 +} from "./types.ts";
7 +import { log } from "../../util.ts";
8 +
9 +/** An RPC request handler providing default procedured based on FSStorage */
10 +export default class RpcMethodsHandler implements RequestHandler {
11 + constructor(public rcpContexts: RpcContext[]) {
12 + log.debug(`creating request handler for ${rcpContexts}`);
13 + }
14 +
15 + protected getFunction(
16 + names: string[],
17 + ) {
18 + return this.rcpContexts.map((context) =>
19 + getFunctionInContext(names, context)
20 + ).find(
21 + (f) => typeof f !== "undefined",
22 + );
23 + }
24 +
25 + handleSourceRequest(
26 + names: string[],
27 + args: Record<string, string>[],
28 + ) {
29 + const method = this.getFunction(names);
30 + if (method) {
31 + return method(args) as AsyncIterator<ResultValue>;
32 + } else {
33 + return (async function* () {})() as AsyncIterator<
34 + string | Record<string, unknown> | Uint8Array,
35 + unknown,
36 + undefined
37 + >;
38 + }
39 + }
40 +
41 + handleAsyncRequest(
42 + names: string[],
43 + args: Record<string, string>[],
44 + ) {
45 + const method = this.getFunction(names);
46 + if (method) {
47 + return method(args) as Promise<ResultValue>;
48 + } else {
49 + return new Promise(() => {/*never*/}) as Promise<ResultValue>;
50 + }
51 + }
52 +}
53 +
54 +function getFunctionInContext(
55 + names: string[],
56 + methods: RpcContext,
57 +): RpcFunction {
58 + if (names.length > 1) {
59 + return getFunctionInContext(
60 + names.slice(1),
61 + methods[names[0]] as RpcContext,
62 + );
63 + } else {
64 + return methods?.[names[0]] as RpcFunction;
65 + }
66 +}
comm/rpc/types.tsView
@@ -1,0 +1,17 @@
1 +export interface RequestHandler {
2 + handleSourceRequest: (
3 + name: string[],
4 + args: Record<string, string>[],
5 + ) => AsyncIterator<ResultValue>;
6 +
7 + handleAsyncRequest: (
8 + name: string[],
9 + args: Record<string, string>[],
10 + ) => Promise<ResultValue>;
11 +}
12 +
13 +export type ResultValue = Record<string, unknown> | string | Uint8Array;
14 +export type RpcFunction = (
15 + args: Record<string, string>[],
16 +) => (Promise<ResultValue> | AsyncGenerator<ResultValue>);
17 +export type RpcContext = { [key: string]: (RpcFunction | RpcContext) };
comm/transport/NetTransport.tsView
@@ -1,0 +1,25 @@
1 +import Transport from "./Transport.ts";
2 +import { Address, combine } from "../../util.ts";
3 +export default class NetTransport implements Transport {
4 + constructor(
5 + public options: { port: number } & Record<string, unknown> = { port: 8008 },
6 + ) {}
7 + listeners: AsyncIterable<Deno.Reader & Deno.Writer & Deno.Closer>[] = [];
8 + [Symbol.asyncIterator](): AsyncIterator<
9 + Deno.Reader & Deno.Writer & Deno.Closer
10 + > {
11 + return combine(...this.listeners)[Symbol.asyncIterator]();
12 + }
13 + protocol = "net";
14 + async connect(
15 + addr: Address,
16 + ): Promise<Deno.Reader & Deno.Writer & Deno.Closer> {
17 + return await Deno.connect({
18 + hostname: addr.host,
19 + port: addr.port,
20 + });
21 + }
22 + listen() {
23 + return Deno.listen(this.options);
24 + }
25 +}
comm/transport/Transport.tsView
@@ -1,0 +1,6 @@
1 +import CommInterface from "../CommInterface.ts";
2 +
3 +export default interface Transport
4 + extends CommInterface<Deno.Reader & Deno.Writer & Deno.Closer> {
5 + protocol: string;
6 +}
ScuttlebuttRpcPeer.tsView
@@ -1,43 +1,0 @@
1-import ScuttlebuttBoxPeer from "./ScuttlebuttBoxPeer.ts";
2-import RPCConnection, { EndOfStream, RequestHandler } from "./RPCConnection.ts";
3-import {
4- Address,
5- concat,
6- FeedId,
7- fromBase64,
8- log,
9- path,
10- readBytes,
11- toBase64,
12-} from "./util.ts";
13-import BoxConnection from "./BoxConnection.ts";
14-
15-export default class ScuttlebuttRpcPeer extends EventTarget {
16- constructor(
17- public requestHandler: RequestHandler,
18- public boxPeer: ScuttlebuttBoxPeer = new ScuttlebuttBoxPeer(),
19- ) {
20- super();
21- boxPeer.addEventListener("connected", (options) => {
22- const boxConnection: BoxConnection = (options as CustomEvent).detail;
23- const rpcConnection = new RPCConnection(
24- boxConnection,
25- this.requestHandler,
26- );
27- this.dispatchEvent(
28- new CustomEvent("connected", { "detail": rpcConnection }),
29- );
30- });
31- }
32-
33- async connect(
34- address: Address,
35- ) {
36- const boxConnection = await this.boxPeer.connect(address);
37- return new RPCConnection(boxConnection, this.requestHandler);
38- }
39-
40- listen() {
41- return this.boxPeer.listen();
42- }
43-}
Transport.tsView
@@ -1,10 +1,0 @@
1-import { Address } from "./util.ts";
2-
3-export default interface Transport
4- extends AsyncIterable<Deno.Reader & Deno.Writer & Deno.Closer> {
5- protocol: string;
6- connect(addr: Address): Promise<Deno.Reader & Deno.Writer & Deno.Closer>;
7- listen(
8- options?: Record<string, unknown>,
9- ): Promise<void>;
10-}
feedSubscriptions.tsView
@@ -1,111 +1,0 @@
1-import * as FSStorage from "./fsStorage.ts";
2-import {
3- computeMsgHash,
4- FeedId,
5- log,
6- parseFeedId,
7- path,
8- toBase64,
9- verifySignature,
10-} from "./util.ts";
11-import RPCConnection, { EndOfStream } from "./RPCConnection.ts";
12-import config from "./config.ts";
13-
14-const textEncoder = new TextEncoder();
15-const followeesFile = path.join(config.baseDir, "followees.json");
16-
17-function getFollowees() {
18- try {
19- return JSON.parse(Deno.readTextFileSync(followeesFile));
20- } catch (error) {
21- if (error instanceof Deno.errors.NotFound) {
22- return [];
23- }
24- throw error;
25- }
26-}
27-const subscriptions: string[] = getFollowees();
28-
29-export async function updateFeed(
30- rpcConnection: RPCConnection,
31- feedKey: FeedId,
32-) {
33- const messagesAlreadyHere = await FSStorage.lastMessage(feedKey);
34- try {
35- await updateFeedFrom(
36- rpcConnection,
37- feedKey,
38- messagesAlreadyHere > 0 ? messagesAlreadyHere : 1,
39- );
40- } catch (error) {
41- log.info(`error updating feed ${feedKey}: ${error}`);
42- }
43-}
44-
45-export async function updateFeedFrom(
46- rpcConnection: RPCConnection,
47- feedKey: FeedId,
48- from: number,
49-) {
50- log.debug(`Updating Feed ${feedKey} from ${from}`);
51- const historyStream = await rpcConnection.sendSourceRequest({
52- "name": ["createHistoryStream"],
53- "args": [{
54- "id": feedKey.toString(),
55- "seq": from,
56- }],
57- });
58- return (async () => {
59- const feedDir = FSStorage.getFeedDir(feedKey);
60- await Deno.mkdir(feedDir, { recursive: true });
61- try {
62- while (true) {
63- const msg = await historyStream.read() as {
64- value: Record<string, string>;
65- key: string;
66- };
67- const hash = computeMsgHash(msg.value);
68- const key = `%${toBase64(hash)}.sha256`;
69- if (key !== msg.key) {
70- throw new Error(
71- "Computed hash doesn't match key " +
72- JSON.stringify(msg, undefined, 2),
73- );
74- }
75- if (
76- !verifySignature(msg.value as { author: string; signature: string })
77- ) {
78- throw Error(
79- `failed to veriy signature of the message: ${
80- JSON.stringify(msg.value, undefined, 2)
81- }`,
82- );
83- }
84- const msgFile = await Deno.create(
85- feedDir + "/" +
86- (msg as { value: Record<string, string> }).value!.sequence! +
87- ".json",
88- );
89- await msgFile.write(
90- textEncoder.encode(JSON.stringify(msg, undefined, 2)),
91- );
92- msgFile.close();
93- /*log.info(
94- JSON.stringify(msg, undefined, 2),
95- );*/
96- }
97- } catch (err) {
98- if (err instanceof EndOfStream) {
99- log.debug(() => `Stream ended for feed ${feedKey}`);
100- } else {
101- log.error(err);
102- }
103- }
104- })();
105-}
106-
107-export function updateFeeds(rpcConnection: RPCConnection) {
108- return Promise.all(
109- subscriptions.map((feed) => updateFeed(rpcConnection, parseFeedId(feed))),
110- );
111-}
main.tsView
@@ -1,0 +1,16 @@
1 +import ScuttlebuttHost from "./ScuttlebuttHost.ts";
2 +
3 +/* starts an SSB peer configured according to command line options */
4 +
5 +//parse params
6 +
7 +//read base config from file, use defaults if missing
8 +
9 +//adapt config according to params
10 +
11 +const config = {
12 + rootDir: "~/.ssb/",
13 +};
14 +
15 +const host = new ScuttlebuttHost(config);
16 +host.start();
play.tsView
@@ -1,75 +1,0 @@
1-import ScuttlebuttRpcPeer from "./ScuttlebuttRpcPeer.ts";
2-import Procedures from "./Procedures.ts";
3-import { updateFeedFrom } from "./feedSubscriptions.ts";
4-import { log, parseAddress, parseBlobId, parseFeedId, path } from "./util.ts";
5-import RPCConnection, { EndOfStream } from "./RPCConnection.ts";
6-import config from "./config.ts";
7-import { getBlobFile } from "./fsStorage.ts";
8-
9-const host = new ScuttlebuttRpcPeer(new Procedures());
10-
11-if (Deno.args.length < 1) {
12- throw new Error("expecting at least one argument");
13-}
14-
15-const addressString = Deno.args[0]; // "net:172.17.0.2:8008~shs:bEhA+VRRIf8mTO474KlSuYTObJACRYZqkwxCl4Id4fk="
16-const address = parseAddress(
17- addressString,
18-);
19-
20-const feedKey = Deno.args.length > 1 ? parseFeedId(Deno.args[1]) : address.key;
21-
22-const rpcConnection: RPCConnection = await host.connect(address);
23-
24-log.info("sending a message...");
25-
26-updateFeedFrom(rpcConnection, feedKey, 1);
27-
28-const blobId = "&cnuH8kTYmu2O685OruWm8TVNR7tKfItKCP+L+pDE8xs=.sha256";
29-const hasBlobP = rpcConnection.sendAsyncRequest({
30- "name": ["blobs", "has"],
31- "args": [blobId],
32-});
33-
34-//hasBlobP.then(log.info, log.error)
35-
36-const hasBlob = await hasBlobP;
37-
38-if (hasBlob) {
39- const blobFile = await getBlobFile(parseBlobId(blobId));
40- const blobStream = await rpcConnection.sendSourceRequest({
41- "name": ["blobs", "get"],
42- "args": [blobId],
43- });
44- (async () => {
45- while (true) {
46- try {
47- const msg = await blobStream.read() as Uint8Array;
48- let written = 0;
49- while (written < msg.length) {
50- written += await blobFile.write(msg.subarray(written));
51- }
52- log.info(`wrote ${written} bytes to file`);
53- //log.info("blob data", msg);
54- } catch (err) {
55- if (err instanceof EndOfStream) {
56- log.error("Stream ended");
57- } else {
58- log.error(err);
59- }
60- break;
61- }
62- }
63- blobFile.close();
64- })();
65-}
66-
67-const wantsStream = await rpcConnection.sendSourceRequest({
68- "name": ["blobs", "createWants"],
69- "args": [],
70-});
71-//(async () => {
72- while (true) {
73- log.info(`They want ${JSON.stringify(await wantsStream.read())}`);
74- }
75-//})();
run.tsView
@@ -1,63 +1,0 @@
1-import ScuttlebuttPeer from "./ScuttlebuttRpcPeer.ts";
2-import BoxConnection from "./BoxConnection.ts";
3-import Procedures from "./Procedures.ts";
4-import { updateFeeds } from "./feedSubscriptions.ts";
5-import { Address, delay, log, parseAddress, path } from "./util.ts";
6-import RPCConnection from "./RPCConnection.ts";
7-import config from "./config.ts";
8-
9-const peersFile = path.join(config.baseDir, "peers.json");
10-
11-function getPeersFromFile() {
12- try {
13- return JSON.parse(Deno.readTextFileSync(peersFile));
14- } catch (error) {
15- if (error instanceof Deno.errors.NotFound) {
16- return [];
17- }
18- throw error;
19- }
20-}
21-
22-function getPeers() {
23- return getPeersFromFile().map(parseAddress);
24-}
25-
26-const peers: Address[] = getPeers();
27-
28-const host = new ScuttlebuttPeer(new Procedures());
29-
30-host.listen();
31-
32-host.addEventListener("connected", async (options) => {
33- log.debug("new connection");
34- const rpcConnection: RPCConnection = (options as CustomEvent).detail;
35- await updateFeeds(rpcConnection);
36-});
37-let initialDelaySec = 0;
38-await Promise.all(peers.map((address) =>
39- (async () => {
40- initialDelaySec += 10;
41- await delay(initialDelaySec * 1000);
42- let minutesDelay = 1;
43- while (true) {
44- try {
45- if (host.boxPeer.connections.length > 20) {
46- log.info("More than 20 connections open, standing by.");
47- } else {
48- log.info(
49- `${host.boxPeer.connections.length} connections open, connecting to ${address}`,
50- );
51- await host.connect(address);
52- }
53- } catch (error) {
54- log.error(
55- `In connection with ${address}: ${error}, now having ${host.boxPeer.connections.length} connections left`,
56- );
57- log.info(`stack: ${error.stack}`);
58- minutesDelay++;
59- }
60- await delay(minutesDelay * 60 * 1000);
61- }
62- })()
63-));

Built with git-ssb-web