Files: ce9d7d51405b9e45388197443ff818f21fafee7a / inbox.js
3224 bytesRaw
1 | const binarySearchInsert = require('binary-search-insert') |
2 | |
3 | module.exports = class Inbox { |
4 | /** |
5 | * The port manager manages the the ports. This inculdes creation, deletion |
6 | * fetching and waiting on ports |
7 | * @param {Object} opts |
8 | * @param {Object} opts.state |
9 | * @param {Object} opts.hypervisor |
10 | * @param {Object} opts.exoInterface |
11 | */ |
12 | constructor (opts) { |
13 | this.actor = opts.actor |
14 | this.hypervisor = opts.hypervisor |
15 | this._queue = [] |
16 | this._awaitedTags = new Set() |
17 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
18 | this._oldestMessageResolve = resolve |
19 | }) |
20 | } |
21 | |
22 | /** |
23 | * queues a message on a port |
24 | * @param {Message} message |
25 | */ |
26 | queue (message) { |
27 | binarySearchInsert(this._queue, messageArbiter, message) |
28 | this._queueWaitingTags(message) |
29 | |
30 | const oldestMessage = this._getOldestMessage() |
31 | |
32 | if (oldestMessage === message) { |
33 | this._oldestMessageResolve(message) |
34 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
35 | this._oldestMessageResolve = resolve |
36 | }) |
37 | } |
38 | } |
39 | |
40 | /** |
41 | * Waits for the the next message if any |
42 | * @returns {Promise} |
43 | */ |
44 | async getNextMessage (tags, timeout = Infinity) { |
45 | let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() |
46 | |
47 | if (this._waitingTags) { |
48 | throw new Error('already getting next message') |
49 | } |
50 | |
51 | if (tags) { |
52 | this._waitingTags = new Set(tags) |
53 | this._queue.forEach(message => { |
54 | this._queueWaitingTags(message) |
55 | }) |
56 | } |
57 | |
58 | let message = this._getOldestMessage() |
59 | let timeouted = false |
60 | |
61 | while (message && oldestTime <= message._fromTicks && !timeouted) { |
62 | await Promise.race([ |
63 | this.hypervisor.scheduler.wait(message._fromTicks, this.actor.id).then(() => { |
64 | timeouted = true |
65 | message = this._getOldestMessage() |
66 | }), |
67 | this._olderMessage(message).then(m => { |
68 | message = m |
69 | }) |
70 | ]) |
71 | oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() |
72 | } |
73 | |
74 | if (this._waitingTags) { |
75 | message = this._waitingTagsQueue.shift() |
76 | } else { |
77 | message = this._queue.shift() |
78 | } |
79 | |
80 | this._waitingTagsQueue = [] |
81 | delete this._waitingTags |
82 | |
83 | return message |
84 | } |
85 | |
86 | // returns a promise that resolve when a message older then the given message |
87 | // is recived |
88 | _olderMessage (message) { |
89 | return this._oldestMessagePromise |
90 | } |
91 | |
92 | _getOldestMessage () { |
93 | if (this._waitingTags) { |
94 | return this._waitingTagsQueue[0] |
95 | } else { |
96 | return this._queue[0] |
97 | } |
98 | } |
99 | |
100 | _queueWaitingTags (message) { |
101 | if (this._waitingTags && this._waitingTags.has(message.tag)) { |
102 | this._waitingAddresses.delete(message.tag) |
103 | binarySearchInsert(this._waitingAddressesQueue, messageArbiter, message) |
104 | } |
105 | } |
106 | } |
107 | |
/**
 * Comparator that decides which message goes first. It returns a number, as
 * a sort-style comparator has to, instead of one of the two messages.
 * @param {Message} messageA
 * @param {Message} messageB
 * @returns {Number} negative when `messageA` goes first, positive when
 * `messageB` goes first, `0` when neither takes precedence
 */
function messageArbiter (messageA, messageB) {
  // a message that is present always goes before a missing one
  if (!messageA || !messageB) {
    if (!messageA && !messageB) {
      return 0
    }
    return messageA ? -1 : 1
  }

  // order by number of ticks if messages have different number of ticks
  if (messageA._fromTicks !== messageB._fromTicks) {
    return messageA._fromTicks < messageB._fromTicks ? -1 : 1
  }

  // same number of ticks: report a tie so the relative position is left to
  // the caller (insertion order)
  return 0
}
124 |
Built with git-ssb-web