git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 0b60f6ffa1bef52d1491a87e1d7aa604092d0af2

Files: 0b60f6ffa1bef52d1491a87e1d7aa604092d0af2 / inbox.js

3224 bytes | Raw
1const binarySearchInsert = require('binary-search-insert')
2
3module.exports = class Inbox {
4 /**
5 * The port manager manages the the ports. This inculdes creation, deletion
6 * fetching and waiting on ports
7 * @param {Object} opts
8 * @param {Object} opts.state
9 * @param {Object} opts.hypervisor
10 * @param {Object} opts.exoInterface
11 */
12 constructor (opts) {
13 this.actor = opts.actor
14 this.hypervisor = opts.hypervisor
15 this._queue = []
16 this._awaitedTags = new Set()
17 this._oldestMessagePromise = new Promise((resolve, reject) => {
18 this._oldestMessageResolve = resolve
19 })
20 }
21
22 /**
23 * queues a message on a port
24 * @param {Message} message
25 */
26 queue (message) {
27 binarySearchInsert(this._queue, messageArbiter, message)
28 this._queueWaitingTags(message)
29
30 const oldestMessage = this._getOldestMessage()
31
32 if (oldestMessage === message) {
33 this._oldestMessageResolve(message)
34 this._oldestMessagePromise = new Promise((resolve, reject) => {
35 this._oldestMessageResolve = resolve
36 })
37 }
38 }
39
40 /**
41 * Waits for the the next message if any
42 * @returns {Promise}
43 */
44 async getNextMessage (tags, timeout = Infinity) {
45 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
46
47 if (this._waitingTags) {
48 throw new Error('already getting next message')
49 }
50
51 if (tags) {
52 this._waitingTags = new Set(tags)
53 this._queue.forEach(message => {
54 this._queueWaitingTags(message)
55 })
56 }
57
58 let message = this._getOldestMessage()
59 let timeouted = false
60
61 while (message && oldestTime <= message._fromTicks && !timeouted) {
62 await Promise.race([
63 this.hypervisor.scheduler.wait(message._fromTicks, this.actor.id).then(() => {
64 timeouted = true
65 message = this._getOldestMessage()
66 }),
67 this._olderMessage(message).then(m => {
68 message = m
69 })
70 ])
71 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
72 }
73
74 if (this._waitingTags) {
75 message = this._waitingTagsQueue.shift()
76 } else {
77 message = this._queue.shift()
78 }
79
80 this._waitingTagsQueue = []
81 delete this._waitingTags
82
83 return message
84 }
85
86 // returns a promise that resolve when a message older then the given message
87 // is recived
88 _olderMessage (message) {
89 return this._oldestMessagePromise
90 }
91
92 _getOldestMessage () {
93 if (this._waitingTags) {
94 return this._waitingTagsQueue[0]
95 } else {
96 return this._queue[0]
97 }
98 }
99
100 _queueWaitingTags (message) {
101 if (this._waitingTags && this._waitingTags.has(message.tag)) {
102 this._waitingAddresses.delete(message.tag)
103 binarySearchInsert(this._waitingAddressesQueue, messageArbiter, message)
104 }
105 }
106}
107
/**
 * Comparator deciding which of two messages goes first. It is handed to
 * `binarySearchInsert`, which needs a number rather than one of the messages.
 * @param {Message} messageA
 * @param {Message} messageB
 * @returns {Number} negative when `messageA` goes first, positive when
 * `messageB` goes first, `0` when neither has precedence
 */
function messageArbiter (messageA, messageB) {
  // a missing message never goes before an existing one
  if (!messageA) {
    return messageB ? 1 : 0
  } else if (!messageB) {
    return -1
  }

  // order by number of ticks if messages have different number of ticks
  if (messageA._fromTicks !== messageB._fromTicks) {
    return messageA._fromTicks < messageB._fromTicks ? -1 : 1
  }

  // same number of ticks: report a tie and leave placement to the caller
  return 0
}
124

Built with git-ssb-web