Commit 26c361018ce2740027699d601b339a3001fea89b
updated lockMap
Signed-off-by: wanderer <mjbecze@gmail.com>wanderer committed on 11/17/2017, 11:17:53 PM
Parent: 6da610e32ec129cd6b73621fbc7218782741896b
Files changed
inbox.js | changed |
package-lock.json | changed |
package.json | changed |
scheduler.js | changed |
tests/index.js | changed |
inbox.js | ||
---|---|---|
@@ -13,9 +13,9 @@ | ||
13 | 13 | constructor (opts) { |
14 | 14 | this.actor = opts.actor |
15 | 15 | this.hypervisor = opts.hypervisor |
16 | 16 | this._queue = [] |
17 | - this._awaitedTags = new Set() | |
17 | + this._waitingTagsQueue = [] | |
18 | 18 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
19 | 19 | this._oldestMessageResolve = resolve |
20 | 20 | }) |
21 | 21 | } |
@@ -24,10 +24,9 @@ | ||
24 | 24 | * queues a message on a port |
25 | 25 | * @param {Message} message |
26 | 26 | */ |
27 | 27 | queue (message) { |
28 | - binarySearchInsert(this._queue, messageArbiter, message) | |
29 | - this._queueWaitingTags(message) | |
28 | + this._queueMessage(message) | |
30 | 29 | |
31 | 30 | const oldestMessage = this._getOldestMessage() |
32 | 31 | if (oldestMessage === message) { |
33 | 32 | this._oldestMessageResolve(message) |
@@ -36,52 +35,57 @@ | ||
36 | 35 | }) |
37 | 36 | } |
38 | 37 | } |
39 | 38 | |
39 | + async waitOnTag (tags, timeout) { | |
40 | + if (this._waitingTags) { | |
41 | + throw new Error('already getting next message') | |
42 | + } | |
43 | + | |
44 | + this._waitingTags = new Set(tags) | |
45 | + this._queue.forEach(message => this._queueWaitingTags(message)) | |
46 | + | |
47 | + const message = await this.getNextMessage(timeout) | |
48 | + // console.log('***', message) | |
49 | + | |
50 | + delete this._waitingTags | |
51 | + return message | |
52 | + } | |
53 | + | |
40 | 54 | /** |
41 | 55 | * Waits for the the next message if any |
42 | 56 | * @returns {Promise} |
43 | 57 | */ |
44 | - async getNextMessage (tags, timeout = Infinity) { | |
58 | + async getNextMessage (timeout = 0) { | |
59 | + let message = this._getOldestMessage() | |
60 | + if (message === undefined && timeout === 0) { | |
61 | + return | |
62 | + } | |
63 | + | |
45 | 64 | let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id) |
65 | + timeout += this.actor.ticks | |
46 | 66 | |
47 | - if (this._waitingTags) { | |
48 | - throw new Error('already getting next message') | |
49 | - } | |
67 | + while (true) { | |
68 | + if (message && message._fromTicks < timeout) { | |
69 | + timeout = message._fromTicks | |
70 | + } | |
71 | + // console.log(timeout, oldestTime) | |
50 | 72 | |
51 | - if (tags) { | |
52 | - this._waitingTags = new Set(tags) | |
53 | - this._queue.forEach(message => { | |
54 | - this._queueWaitingTags(message) | |
55 | - }) | |
56 | - } | |
73 | + if (oldestTime >= timeout) { | |
74 | + break | |
75 | + } | |
57 | 76 | |
58 | - let message = this._getOldestMessage() | |
59 | - let timeouted = false | |
60 | - | |
61 | - while (message && oldestTime <= message._fromTicks && !timeouted) { | |
62 | 77 | await Promise.race([ |
63 | - this.hypervisor.scheduler.wait(message._fromTicks, this.actor.id).then(() => { | |
64 | - timeouted = true | |
78 | + this.hypervisor.scheduler.wait(timeout, this.actor.id).then(() => { | |
65 | 79 | message = this._getOldestMessage() |
66 | 80 | }), |
67 | 81 | this._olderMessage(message).then(m => { |
68 | 82 | message = m |
69 | 83 | }) |
70 | 84 | ]) |
71 | 85 | oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id) |
72 | 86 | } |
73 | - | |
74 | - if (this._waitingTags) { | |
75 | - message = this._waitingTagsQueue.shift() | |
76 | - } else { | |
77 | - message = this._queue.shift() | |
78 | - } | |
79 | - | |
80 | - this._waitingTagsQueue = [] | |
81 | - delete this._waitingTags | |
82 | - | |
83 | - return message | |
87 | + return this._deQueueMessage() | |
84 | 88 | } |
85 | 89 | |
86 | 90 | // returns a promise that resolve when a message older then the given message |
87 | 91 | // is recived |
@@ -96,12 +100,29 @@ | ||
96 | 100 | return this._queue[0] |
97 | 101 | } |
98 | 102 | } |
99 | 103 | |
104 | + _deQueueMessage () { | |
105 | + if (this._waitingTags) { | |
106 | + return this._waitingTagsQueue.shift() | |
107 | + } else { | |
108 | + return this._queue.shift() | |
109 | + } | |
110 | + } | |
111 | + | |
112 | + _queueMessage (message) { | |
113 | + if (this._waitingTags) { | |
114 | + this._queueWaitingTags(message) | |
115 | + } | |
116 | + binarySearchInsert(this._queue, messageArbiter, message) | |
117 | + } | |
118 | + | |
100 | 119 | _queueWaitingTags (message) { |
101 | - if (this._waitingTags && this._waitingTags.has(message.tag)) { | |
120 | + if (this._waitingTags.has(message.tag)) { | |
102 | 121 | this._waitingAddresses.delete(message.tag) |
103 | - binarySearchInsert(this._waitingAddressesQueue, messageArbiter, message) | |
122 | + binarySearchInsert(this._waitingTagsQueue, messageArbiter, message) | |
123 | + // keep the taged waiting quueue pruned | |
124 | + this._waitingTagsQueue = [this._waitingTagsQueue[0]] | |
104 | 125 | } |
105 | 126 | } |
106 | 127 | } |
107 | 128 |
package-lock.json | ||
---|---|---|
The diff is too large to show. Use a local git client to view these changes. Old file size: 397779 bytes New file size: 397811 bytes |
package.json | ||
---|---|---|
@@ -30,9 +30,9 @@ | ||
30 | 30 | "contributors": "Alex Beregszaszi <alex@rtfs.hu>", |
31 | 31 | "license": "MPL-2.0", |
32 | 32 | "dependencies": { |
33 | 33 | "binary-search-insert": "^1.0.3", |
34 | - "lockmap": "0.0.0", | |
34 | + "lockmap": "^0.1.0", | |
35 | 35 | "primea-message": "0.0.3", |
36 | 36 | "safe-buffer": "^5.1.1", |
37 | 37 | "sortedmap": "0.0.1", |
38 | 38 | "typedarray-addition": "0.0.1" |
scheduler.js | ||
---|---|---|
@@ -49,9 +49,9 @@ | ||
49 | 49 | * @param {String} id |
50 | 50 | * @return {Object} |
51 | 51 | */ |
52 | 52 | getInstance (id) { |
53 | - return this.instances.get(id) || this._loadingInstances.getLock(id) || this.systemServices.get(id) | |
53 | + return this.instances.get(id) || this._loadingInstances.get(id) || this.systemServices.get(id) | |
54 | 54 | } |
55 | 55 | |
56 | 56 | /** |
57 | 57 | * deletes an instance from the scheduler |
tests/index.js | ||
---|---|---|
@@ -477,4 +477,54 @@ | ||
477 | 477 | |
478 | 478 | t.deepEquals(stateRoot, expectedState, 'expected root!') |
479 | 479 | t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0) |
480 | 480 | }) |
481 | + | |
482 | +tape.skip('basic tagged caps', async t => { | |
483 | + t.plan(4) | |
484 | + let message | |
485 | + const expectedState = { | |
486 | + '/': Buffer.from('fc935489953ed357f06171dd23439d83190b3a1b', 'hex') | |
487 | + } | |
488 | + | |
489 | + const tree = new RadixTree({ | |
490 | + db: db | |
491 | + }) | |
492 | + | |
493 | + class testVMContainerA extends BaseContainer { | |
494 | + async onMessage (m) { | |
495 | + t.true(m, 'should recive first message') | |
496 | + const rCap = this.kernel.mintCap(1) | |
497 | + const message = new Message() | |
498 | + message.responseCap = rCap | |
499 | + console.log('here') | |
500 | + this.kernel.send(m.caps[0], message) | |
501 | + const rMessage = await this.kernel.inbox.waitOnTag([1], 44) | |
502 | + t.true(rMessage, 'should recive a response message') | |
503 | + } | |
504 | + } | |
505 | + | |
506 | + class testVMContainerB extends BaseContainer { | |
507 | + onMessage (m) { | |
508 | + console.log('&&&&&&&&&&&&&&&&&&&&&&&&&&') | |
509 | + t.true(m, 'should recive a message') | |
510 | + } | |
511 | + | |
512 | + static get typeId () { | |
513 | + return 8 | |
514 | + } | |
515 | + } | |
516 | + | |
517 | + const hypervisor = new Hypervisor(tree) | |
518 | + hypervisor.registerContainer(testVMContainerA) | |
519 | + hypervisor.registerContainer(testVMContainerB) | |
520 | + | |
521 | + let capA = await hypervisor.createInstance(testVMContainerA.typeId, new Message()) | |
522 | + let capB = await hypervisor.createInstance(testVMContainerB.typeId, new Message()) | |
523 | + | |
524 | + await hypervisor.send(capA, new Message({caps: [capB]})) | |
525 | + | |
526 | + const stateRoot = await hypervisor.createStateRoot(Infinity) | |
527 | + | |
528 | + t.deepEquals(stateRoot, expectedState, 'expected root!') | |
529 | + t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0) | |
530 | +}) |
Built with git-ssb-web