Files: 77a97926a01c38ec889ed82e584cdf833cad6a31 / inbox.js
4763 bytesRaw
1 | const Buffer = require('safe-buffer').Buffer |
2 | const binarySearchInsert = require('binary-search-insert') |
3 | |
/**
 * Comparator that decides which of two messages goes first.
 *
 * Messages are ordered by `_fromTicks`, lowest first. When both messages carry
 * the same number of ticks the tie is broken by comparing the sender ids with
 * `Buffer.compare`.
 *
 * @param {Object} messageA
 * @param {Object} messageB
 * @returns {Number} a negative number if `messageA` goes first, a positive
 * number if `messageB` goes first and 0 if neither takes precedence
 */
function messageArbiter (messageA, messageB) {
  if (messageA._fromTicks !== messageB._fromTicks) {
    // return a signed number instead of a boolean so that both branches obey
    // the same comparator contract (a boolean can never signal "A goes first")
    return messageA._fromTicks < messageB._fromTicks ? -1 : 1
  }
  // same number of ticks: order by sender id
  return Buffer.compare(messageA._fromId, messageB._fromId)
}
14 | |
15 | module.exports = class Inbox { |
16 | /** |
17 | * The inbox manages and sorts incoming messages and provides functions |
18 | * to wait on messages |
19 | * @param {Object} opts |
20 | * @param {Object} opts.state |
21 | * @param {Object} opts.hypervisor |
22 | */ |
23 | constructor (opts) { |
24 | this.actor = opts.actor |
25 | this.hypervisor = opts.hypervisor |
26 | this._queue = [] |
27 | this._waitingTagsQueue = [] |
28 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
29 | this._oldestMessageResolve = resolve |
30 | }) |
31 | } |
32 | |
33 | /** |
34 | * queues a message |
35 | * @param {Message} message |
36 | */ |
37 | queue (message) { |
38 | this._queueMessage(message) |
39 | |
40 | const oldestMessage = this._getOldestMessage() |
41 | if (oldestMessage === message) { |
42 | this._oldestMessageResolve(message) |
43 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
44 | this._oldestMessageResolve = resolve |
45 | }) |
46 | } |
47 | } |
48 | |
49 | /** |
50 | * Waits for a message sent with a capablitly that has one of the given tags |
51 | * @param {Array<*>} tags |
52 | * @param {Integer} timeout |
53 | * @returns {Promise} |
54 | */ |
55 | async nextTaggedMessage (tags, timeout) { |
56 | this._waitingTags = new Set(tags) |
57 | this._queue = this._queue.filter(message => !this._queueTaggedMessage(message)) |
58 | |
59 | // todo: add saturation test |
60 | const message = await this.nextMessage(timeout) |
61 | delete this._waitingTags |
62 | this._waitingTagsQueue.forEach(message => this._queueMessage(message)) |
63 | this._waitingTagsQueue = [] |
64 | |
65 | return message |
66 | } |
67 | |
68 | /** |
69 | * Waits for the the next message if any |
70 | * @param {Integer} timeout |
71 | * @returns {Promise} |
72 | */ |
73 | nextMessage (timeout, getCurrent = false) { |
74 | if (!this._gettingNextMessage) { |
75 | this._gettingNextMessage = this._nextMessage(timeout) |
76 | this._gettingNextMessage.then(() => { |
77 | this._gettingNextMessage = false |
78 | }) |
79 | } else if (!getCurrent) { |
80 | throw new Error('already waiting for next message') |
81 | } |
82 | return this._gettingNextMessage |
83 | } |
84 | |
85 | async _nextMessage (timeout) { |
86 | let message = this._getOldestMessage() |
87 | if (message === undefined && timeout === 0) { |
88 | return |
89 | } |
90 | |
91 | timeout += this.actor.ticks |
92 | let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id) |
93 | |
94 | while (true) { |
95 | if (message) { |
96 | // if the message we recived had more ticks then we currently have then |
97 | // update our ticks to it, since we jumped forward in time |
98 | if (message._fromTicks > this.actor.ticks) { |
99 | this.actor.ticks = message._fromTicks |
100 | this.hypervisor.scheduler.update(this.actor) |
101 | } |
102 | |
103 | // if there is a message that is "older" then the timeout, the lower |
104 | // the timeout to the oldest message |
105 | if (message._fromTicks < timeout) { |
106 | timeout = message._fromTicks |
107 | } |
108 | } |
109 | |
110 | // if all actors are "older" then the time out then stop waiting for messages |
111 | // since we konw that we can not receive one |
112 | if (oldestTime >= timeout) { |
113 | break |
114 | } |
115 | |
116 | await Promise.race([ |
117 | this.hypervisor.scheduler.wait(timeout, this.actor.id).then(() => { |
118 | message = this._getOldestMessage() |
119 | }), |
120 | this._olderMessage(message).then(m => { |
121 | message = m |
122 | }) |
123 | ]) |
124 | oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id) |
125 | } |
126 | this.currentMessage = this._deQueueMessage() |
127 | return this.currentMessage |
128 | } |
129 | |
130 | // returns a promise that resolve when a message older then the given message |
131 | // is recived |
132 | _olderMessage (message) { |
133 | return this._oldestMessagePromise |
134 | } |
135 | |
136 | _getOldestMessage () { |
137 | if (this._waitingTags) { |
138 | return this._waitingTagsQueue[0] |
139 | } else { |
140 | return this._queue[0] |
141 | } |
142 | } |
143 | |
144 | _deQueueMessage () { |
145 | if (this._waitingTags) { |
146 | return this._waitingTagsQueue.shift() |
147 | } else { |
148 | return this._queue.shift() |
149 | } |
150 | } |
151 | |
152 | _queueMessage (message) { |
153 | if (!(this._waitingTags && this._queueTaggedMessage(message))) { |
154 | binarySearchInsert(this._queue, messageArbiter, message) |
155 | } |
156 | } |
157 | |
158 | _queueTaggedMessage (message) { |
159 | if (this._waitingTags.has(message.tag)) { |
160 | this._waitingTags.delete(message.tag) |
161 | binarySearchInsert(this._waitingTagsQueue, messageArbiter, message) |
162 | return true |
163 | } else { |
164 | return false |
165 | } |
166 | } |
167 | } |
168 |
Built with git-ssb-web