Files: db06b3c2442cea31e0706a4c186e8cc21b06d368 / inbox.js
3709 bytesRaw
1 | const binarySearchInsert = require('binary-search-insert') |
2 | const Buffer = require('safe-buffer').Buffer |
3 | |
4 | module.exports = class Inbox { |
5 | /** |
6 | * The port manager manages the the ports. This inculdes creation, deletion |
7 | * fetching and waiting on ports |
8 | * @param {Object} opts |
9 | * @param {Object} opts.state |
10 | * @param {Object} opts.hypervisor |
11 | * @param {Object} opts.exoInterface |
12 | */ |
13 | constructor (opts) { |
14 | this.actor = opts.actor |
15 | this.hypervisor = opts.hypervisor |
16 | this._queue = [] |
17 | this._waitingTagsQueue = [] |
18 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
19 | this._oldestMessageResolve = resolve |
20 | }) |
21 | } |
22 | |
23 | /** |
24 | * queues a message on a port |
25 | * @param {Message} message |
26 | */ |
27 | queue (message) { |
28 | this._queueMessage(message) |
29 | |
30 | const oldestMessage = this._getOldestMessage() |
31 | if (oldestMessage === message) { |
32 | this._oldestMessageResolve(message) |
33 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
34 | this._oldestMessageResolve = resolve |
35 | }) |
36 | } |
37 | } |
38 | |
39 | async waitOnTag (tags, timeout) { |
40 | if (this._waitingTags) { |
41 | throw new Error('already getting next message') |
42 | } |
43 | |
44 | this._waitingTags = new Set(tags) |
45 | this._queue = this._queue.filter(message => !this._queueTaggedMessage(message)) |
46 | |
47 | // todo: add saturation test |
48 | const message = await this.getNextMessage(timeout) |
49 | delete this._waitingTags |
50 | this._waitingTagsQueue.forEach(message => this._queueMessage(message)) |
51 | this._waitingTagsQueue = [] |
52 | |
53 | return message |
54 | } |
55 | |
56 | /** |
57 | * Waits for the the next message if any |
58 | * @returns {Promise} |
59 | */ |
60 | async getNextMessage (timeout = 0) { |
61 | let message = this._getOldestMessage() |
62 | if (message === undefined && timeout === 0) { |
63 | return |
64 | } |
65 | |
66 | timeout += this.actor.ticks |
67 | let oldestTime = this.hypervisor.scheduler.syncLeastNumberOfTicks(this.actor.id) |
68 | |
69 | while (true) { |
70 | if (message && message._fromTicks < timeout) { |
71 | timeout = message._fromTicks |
72 | } |
73 | |
74 | if (oldestTime >= timeout) { |
75 | break |
76 | } |
77 | |
78 | await Promise.race([ |
79 | this.hypervisor.scheduler.wait(timeout, this.actor.id).then(() => { |
80 | message = this._getOldestMessage() |
81 | }), |
82 | this._olderMessage(message).then(m => { |
83 | message = m |
84 | }) |
85 | ]) |
86 | oldestTime = this.hypervisor.scheduler.syncLeastNumberOfTicks(this.actor.id) |
87 | } |
88 | return this._deQueueMessage() |
89 | } |
90 | |
91 | // returns a promise that resolve when a message older then the given message |
92 | // is recived |
93 | _olderMessage (message) { |
94 | return this._oldestMessagePromise |
95 | } |
96 | |
97 | _getOldestMessage () { |
98 | if (this._waitingTags) { |
99 | return this._waitingTagsQueue[0] |
100 | } else { |
101 | return this._queue[0] |
102 | } |
103 | } |
104 | |
105 | _deQueueMessage () { |
106 | if (this._waitingTags) { |
107 | return this._waitingTagsQueue.shift() |
108 | } else { |
109 | return this._queue.shift() |
110 | } |
111 | } |
112 | |
113 | _queueMessage (message) { |
114 | if (!(this._waitingTags && this._queueTaggedMessage(message))) { |
115 | binarySearchInsert(this._queue, messageArbiter, message) |
116 | } |
117 | } |
118 | |
119 | _queueTaggedMessage (message) { |
120 | if (this._waitingTags.has(message.tag)) { |
121 | this._waitingTags.delete(message.tag) |
122 | binarySearchInsert(this._waitingTagsQueue, messageArbiter, message) |
123 | return true |
124 | } else { |
125 | return false |
126 | } |
127 | } |
128 | } |
129 | |
// decides which message to go first. Comparator: returns a negative number if
// messageA goes before messageB, a positive number if it goes after, and 0 if
// they rank equally
function messageArbiter (messageA, messageB) {
  // order by number of ticks if messages have different number of ticks
  if (messageA._fromTicks !== messageB._fromTicks) {
    // must be a signed number, not a boolean: `false` coerces to 0, which a
    // comparator consumer reads as "equal" rather than "A goes first"
    return messageA._fromTicks < messageB._fromTicks ? -1 : 1
  }
  // same number of ticks: tie-break on sender id
  return Buffer.compare(messageA._fromId, messageB._fromId)
}
140 |
Built with git-ssb-web