git ssb

0+

farewellutopia-dev / deno-ssb-experiments



Commit 6d313b00bb8274f45e13adc58899abcff3d6912c

better logging, more blob handling

Reto Gmür committed on 9/26/2021, 1:24:18 PM
Parent: 75c5953f5d4db6b841133e22c7a1b4bb4832dfc5

Files changed

agents/Agent.tschanged
agents/blobs/BlobsAgent.tschanged
comm/rpc/RpcConnection.tschanged
agents/Agent.tsView
@@ -31,9 +31,9 @@
3131 try {
3232 await agent.outgoingConnection(con);
3333 } catch (error) {
3434 log.error(
35- `Error processing outgoing connection to ${address} by ${agent}: ${error}`,
35 + `Error processing outgoing connection to ${address} by ${agent.constructor.name}: ${error}\n${error.stack}`,
3636 );
3737 }
3838 },
3939 );
agents/blobs/BlobsAgent.tsView
@@ -30,14 +30,19 @@
3030 }
3131
3232 class PendingWant {
3333 readonly created: number;
34 + readonly interestedPeers: Set<string> = new Set<string>();
3435
3536 constructor(
3637 public want: BlobWant,
37- public alreadyAsked: Set<FeedId> = new Set<FeedId>(),
38 + onBehalfOf?: FeedId,
39 + public alreadyAsked: Set<string> = new Set<string>(), //using base 64 strings as mutable Uint8Aray can't be compared
3840 ) {
3941 this.created = Date.now();
42 + if (onBehalfOf) {
43 + this.interestedPeers.add(onBehalfOf.base64Key);
44 + }
4045 }
4146
4247 [x: string]: unknown
4348 }
@@ -55,16 +60,33 @@
5560 want(blobId: BlobId) {
5661 this.processWant(new BlobWant(blobId));
5762 }
5863
59- private processWant(want: BlobWant) {
60- //write to feeds
61- this.wantFeeds.forEach((f) => f(want));
62- //and store to ask on new conections
63- this.pendingWants.set(want.blobId.base64Key, new PendingWant(want));
64 + private processWant(want: BlobWant, onBehalfOf?: FeedId) {
65 + const alreadyPendingWant = this.pendingWants.get(
66 + want.blobId.base64FilenameSafe,
67 + );
68 + if (alreadyPendingWant) {
69 + if (onBehalfOf) {
70 + alreadyPendingWant.interestedPeers.add(onBehalfOf.base64Key);
71 + }
72 + } else {
73 + //write to feeds
74 + for (const f of this.wantFeeds.entries()) {
75 + if (f[0] !== onBehalfOf?.base64Key) {
76 + f[1](want);
77 + }
78 + }
79 + //and store to ask on new conections
80 + this.pendingWants.set(
81 + want.blobId.base64Key,
82 + new PendingWant(want, onBehalfOf),
83 + );
84 + }
6485 }
6586
66- private wantFeeds = new Set<(_: BlobWant) => void>();
87 + //maps a FeedKey to the feed on which to sent wants/has
88 + private wantFeeds = new Map<string, (_: BlobWant) => void>();
6789
6890 /** key is base64 of BlobId */
6991 private pendingWants = new Map<string, PendingWant>();
7092
@@ -89,23 +111,26 @@
89111 args: Record<string, string>[],
90112 ): AsyncIterable<Record<string, unknown>> {
91113 log.info(`${feedId} invoked blobs.createWants with ${args}`);
92114 for (const p of pendingWants.values()) {
93- if (!p.alreadyAsked.has(feedId)) {
115 + if (
116 + !p.alreadyAsked.has(feedId.base64Key) &&
117 + !p.interestedPeers.has(feedId.base64Key)
118 + ) {
94119 yield p.want.shortWant;
95- p.alreadyAsked.add(feedId);
120 + p.alreadyAsked.add(feedId.base64Key);
96121 }
97122 }
98123 while (true) {
99124 yield await new Promise((resolve) => {
100125 /*resolve({
101126 value: new BlobWant(new BlobId(new Uint8Array())),
102127 });*/
103128 const wanter = ((want: BlobWant) => {
104- wantFeeds.delete(wanter);
129 + wantFeeds.delete(feedId.base64Key);
105130 resolve({ value: want.shortWant });
106131 });
107- wantFeeds.add(wanter);
132 + wantFeeds.set(feedId.base64Key, wanter);
108133 });
109134 }
110135 },
111136 },
@@ -132,19 +157,54 @@
132157 `Got has/want from ${rpcConnection.boxConnection.peer}: ${
133158 JSON.stringify(hasOrWant)
134159 }`,
135160 );
136- if (hasOrWant.level > 0) {
161 + if (hasOrWant.level >= 0) {
137162 //a has
138163 if (this.pendingWants.has(hasOrWant.blobId.base64Key)) {
139164 const pendingWant = this.pendingWants.get(
140165 hasOrWant.blobId.base64Key,
141- );
166 + )!;
142167 await this.retrieveBlobFromPeer(
143- pendingWant!.want.blobId,
168 + pendingWant.want.blobId,
144169 rpcConnection.boxConnection.peer,
145170 );
171 + //tell interested peers that we have it
172 + await Promise.all(
173 + [...pendingWant.interestedPeers].map(async (feedKey) => {
174 + const wantFeed = this.wantFeeds.get(feedKey);
175 + if (wantFeed) {
176 + wantFeed(
177 + new BlobWant(
178 + hasOrWant.blobId,
179 + (await FsStorage.getBlob(hasOrWant.blobId)).length,
180 + ),
181 + );
182 + }
183 + }),
184 + );
146185 }
186 + } else {
187 + //a want
188 + if (await FsStorage.hasBlob(hasOrWant.blobId)) {
189 + const wantFeed = this.wantFeeds.get(
190 + rpcConnection.boxConnection.peer.base64Key,
191 + );
192 + if (wantFeed) {
193 + const blob = await FsStorage.getBlob(hasOrWant.blobId);
194 + wantFeed(new BlobWant(hasOrWant.blobId, blob.length));
195 + } else {
196 + //TODO tell them if and when they invoke createWants, add to a broadened pendingWants set
197 + log.warning(
198 + `${rpcConnection.boxConnection.peer} asked for a blob we have, but we can't tell them`,
199 + );
200 + }
201 + } else {
202 + this.processWant(
203 + new BlobWant(hasOrWant.blobId, hasOrWant.level - 1),
204 + rpcConnection.boxConnection.peer,
205 + );
206 + }
147207 }
148208 }
149209 }
150210 }
comm/rpc/RpcConnection.tsView
@@ -242,8 +242,11 @@
242242 resolve(parse(message, header.bodyType));
243243 } else {
244244 const endMessage = textDecoder.decode(message);
245245 if (endMessage === "true") {
246 + log.debug(
247 + `Got end-message on response on ${request.name} by ${this.boxConnection.peer}`,
248 + );
246249 reject(new EndOfStream());
247250 } else {
248251 reject(
249252 new Error(

Built with git-ssb-web