git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: e8b4f99fd6e0dfe281b7a34b3472f07323a34c0d

Files: e8b4f99fd6e0dfe281b7a34b3472f07323a34c0d / inbox.js

3709 bytesRaw
1const binarySearchInsert = require('binary-search-insert')
2const Buffer = require('safe-buffer').Buffer
3
4module.exports = class Inbox {
5 /**
6 * The port manager manages the the ports. This inculdes creation, deletion
7 * fetching and waiting on ports
8 * @param {Object} opts
9 * @param {Object} opts.state
10 * @param {Object} opts.hypervisor
11 * @param {Object} opts.exoInterface
12 */
13 constructor (opts) {
14 this.actor = opts.actor
15 this.hypervisor = opts.hypervisor
16 this._queue = []
17 this._waitingTagsQueue = []
18 this._oldestMessagePromise = new Promise((resolve, reject) => {
19 this._oldestMessageResolve = resolve
20 })
21 }
22
23 /**
24 * queues a message on a port
25 * @param {Message} message
26 */
27 queue (message) {
28 this._queueMessage(message)
29
30 const oldestMessage = this._getOldestMessage()
31 if (oldestMessage === message) {
32 this._oldestMessageResolve(message)
33 this._oldestMessagePromise = new Promise((resolve, reject) => {
34 this._oldestMessageResolve = resolve
35 })
36 }
37 }
38
39 async waitOnTag (tags, timeout) {
40 if (this._waitingTags) {
41 throw new Error('already getting next message')
42 }
43
44 this._waitingTags = new Set(tags)
45 this._queue = this._queue.filter(message => !this._queueTaggedMessage(message))
46
47 // todo: add saturation test
48 const message = await this.getNextMessage(timeout)
49 delete this._waitingTags
50 this._waitingTagsQueue.forEach(message => this._queueMessage(message))
51 this._waitingTagsQueue = []
52
53 return message
54 }
55
56 /**
57 * Waits for the the next message if any
58 * @returns {Promise}
59 */
60 async getNextMessage (timeout = 0) {
61 let message = this._getOldestMessage()
62 if (message === undefined && timeout === 0) {
63 return
64 }
65
66 timeout += this.actor.ticks
67 let oldestTime = this.hypervisor.scheduler.syncLeastNumberOfTicks(this.actor.id)
68
69 while (true) {
70 if (message && message._fromTicks < timeout) {
71 timeout = message._fromTicks
72 }
73
74 if (oldestTime >= timeout) {
75 break
76 }
77
78 await Promise.race([
79 this.hypervisor.scheduler.wait(timeout, this.actor.id).then(() => {
80 message = this._getOldestMessage()
81 }),
82 this._olderMessage(message).then(m => {
83 message = m
84 })
85 ])
86 oldestTime = this.hypervisor.scheduler.syncLeastNumberOfTicks(this.actor.id)
87 }
88 return this._deQueueMessage()
89 }
90
91 // returns a promise that resolve when a message older then the given message
92 // is recived
93 _olderMessage (message) {
94 return this._oldestMessagePromise
95 }
96
97 _getOldestMessage () {
98 if (this._waitingTags) {
99 return this._waitingTagsQueue[0]
100 } else {
101 return this._queue[0]
102 }
103 }
104
105 _deQueueMessage () {
106 if (this._waitingTags) {
107 return this._waitingTagsQueue.shift()
108 } else {
109 return this._queue.shift()
110 }
111 }
112
113 _queueMessage (message) {
114 if (!(this._waitingTags && this._queueTaggedMessage(message))) {
115 binarySearchInsert(this._queue, messageArbiter, message)
116 }
117 }
118
119 _queueTaggedMessage (message) {
120 if (this._waitingTags.has(message.tag)) {
121 this._waitingTags.delete(message.tag)
122 binarySearchInsert(this._waitingTagsQueue, messageArbiter, message)
123 return true
124 } else {
125 return false
126 }
127 }
128}
129
/**
 * Decides which of two messages goes first.
 *
 * Messages are ordered by `_fromTicks`; when those are equal the sender ids
 * (`_fromId`) are compared with `Buffer.compare`.
 * @param {Message} messageA
 * @param {Message} messageB
 * @returns {Number} a positive number if `messageA` goes after `messageB`, a
 * negative number if it goes before, and 0 if neither goes first
 */
function messageArbiter (messageA, messageB) {
  // order by number of ticks if messages have different number of ticks
  if (messageA._fromTicks !== messageB._fromTicks) {
    // return a number, not a boolean, so both branches follow the same
    // comparator contract
    return messageA._fromTicks > messageB._fromTicks ? 1 : -1
  } else {
    // sender id
    return Buffer.compare(messageA._fromId, messageB._fromId)
  }
}
140

Built with git-ssb-web