git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 622b5589b3a43d425004534b7b44c376d3e740c7

Files: 622b5589b3a43d425004534b7b44c376d3e740c7 / inbox.js

3841 bytesRaw
1const binarySearchInsert = require('binary-search-insert')
2const Buffer = require('safe-buffer').Buffer
3
4module.exports = class Inbox {
5 /**
6 * The inbox manages and sorts incoming messages and provides functions
7 * to wait on messages
8 * @param {Object} opts
9 * @param {Object} opts.state
10 * @param {Object} opts.hypervisor
11 */
12 constructor (opts) {
13 this.actor = opts.actor
14 this.hypervisor = opts.hypervisor
15 this._queue = []
16 this._waitingTagsQueue = []
17 this._oldestMessagePromise = new Promise((resolve, reject) => {
18 this._oldestMessageResolve = resolve
19 })
20 }
21
22 /**
23 * queues a message
24 * @param {Message} message
25 */
26 queue (message) {
27 this._queueMessage(message)
28
29 const oldestMessage = this._getOldestMessage()
30 if (oldestMessage === message) {
31 this._oldestMessageResolve(message)
32 this._oldestMessagePromise = new Promise((resolve, reject) => {
33 this._oldestMessageResolve = resolve
34 })
35 }
36 }
37
38 /**
39 * Waits for a message sent with a capablitly that has one of the given tags
40 * @param {Array<*>} tags
41 * @param {Integer} timeout
42 * @returns {Promise}
43 */
44 async waitOnTag (tags, timeout) {
45 if (this._waitingTags) {
46 throw new Error('already getting next message')
47 }
48
49 this._waitingTags = new Set(tags)
50 this._queue = this._queue.filter(message => !this._queueTaggedMessage(message))
51
52 // todo: add saturation test
53 const message = await this.getNextMessage(timeout)
54 delete this._waitingTags
55 this._waitingTagsQueue.forEach(message => this._queueMessage(message))
56 this._waitingTagsQueue = []
57
58 return message
59 }
60
61 /**
62 * Waits for the the next message if any
63 * @param {Integer} timeout
64 * @returns {Promise}
65 */
66 async getNextMessage (timeout = 0) {
67 let message = this._getOldestMessage()
68 if (message === undefined && timeout === 0) {
69 return
70 }
71
72 timeout += this.actor.ticks
73 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
74
75 while (true) {
76 if (message && message._fromTicks < timeout) {
77 timeout = message._fromTicks
78 }
79
80 if (oldestTime >= timeout) {
81 break
82 }
83
84 await Promise.race([
85 this.hypervisor.scheduler.wait(timeout, this.actor.id).then(() => {
86 message = this._getOldestMessage()
87 }),
88 this._olderMessage(message).then(m => {
89 message = m
90 })
91 ])
92 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
93 }
94 return this._deQueueMessage()
95 }
96
97 // returns a promise that resolve when a message older then the given message
98 // is recived
99 _olderMessage (message) {
100 return this._oldestMessagePromise
101 }
102
103 _getOldestMessage () {
104 if (this._waitingTags) {
105 return this._waitingTagsQueue[0]
106 } else {
107 return this._queue[0]
108 }
109 }
110
111 _deQueueMessage () {
112 if (this._waitingTags) {
113 return this._waitingTagsQueue.shift()
114 } else {
115 return this._queue.shift()
116 }
117 }
118
119 _queueMessage (message) {
120 if (!(this._waitingTags && this._queueTaggedMessage(message))) {
121 binarySearchInsert(this._queue, messageArbiter, message)
122 }
123 }
124
125 _queueTaggedMessage (message) {
126 if (this._waitingTags.has(message.tag)) {
127 this._waitingTags.delete(message.tag)
128 binarySearchInsert(this._waitingTagsQueue, messageArbiter, message)
129 return true
130 } else {
131 return false
132 }
133 }
134}
135
136// decides which message to go first
137function messageArbiter (messageA, messageB) {
138 // order by number of ticks if messages have different number of ticks
139 if (messageA._fromTicks !== messageB._fromTicks) {
140 return messageA._fromTicks > messageB._fromTicks
141 } else {
142 // sender id
143 return Buffer.compare(messageA._fromId, messageB._fromId)
144 }
145}
146

Built with git-ssb-web