Commit bc308a00381563261f0f7ecf1d231d19e2dfbc52
added nextMessage check
wanderer committed on 12/8/2017, 7:52:53 PMParent: 0e2eb764d9e711043dde259c1e74c27d2b613c09
Files changed
actor.js | changed |
inbox.js | changed |
index.js | changed |
tests/index.js | changed |
actor.js | ||
---|---|---|
@@ -34,10 +34,10 @@ | ||
34 | 34 | * Mints a new capabilitly with a given tag |
35 | 35 | * @param {*} tag - a tag which can be used to identify caps |
36 | 36 | * @return {Object} |
37 | 37 | */ |
38 | - mintCap (tag = 0) { | |
39 | - return new Cap(this.id, tag) | |
38 | + mintCap (tag = 0, funcIndex = 0) { | |
39 | + return new Cap(this.id, tag, funcIndex) | |
40 | 40 | } |
41 | 41 | |
42 | 42 | /** |
43 | 43 | * adds a message to this actor's message queue |
@@ -94,19 +94,18 @@ | ||
94 | 94 | |
95 | 95 | static serializeMetaData (type, transparent = 0, nonce = 0) { |
96 | 96 | const p = new Pipe() |
97 | 97 | leb128.write(type, p) |
98 | - p.write(Buffer.from([transparent])) | |
98 | + p.write(Buffer.from([0])) | |
99 | 99 | leb128.write(nonce, p) |
100 | 100 | return p.buffer |
101 | 101 | } |
102 | 102 | |
103 | 103 | static deserializeMetaData (buffer) { |
104 | 104 | const pipe = new Pipe(buffer) |
105 | 105 | return { |
106 | 106 | type: leb128.read(pipe), |
107 | - nonce: leb128.read(pipe), | |
108 | - transparent: pipe.read(1)[0] | |
107 | + nonce: leb128.read(pipe) | |
109 | 108 | } |
110 | 109 | } |
111 | 110 | |
112 | 111 | /** |
inbox.js | ||
---|---|---|
@@ -52,8 +52,11 @@ | ||
52 | 52 | * @param {Integer} timeout |
53 | 53 | * @returns {Promise} |
54 | 54 | */ |
55 | 55 | async nextTaggedMessage (tags, timeout) { |
56 | + if (this._waitingTags) { | |
57 | + throw new Error('already waiting on tags') | |
58 | + } | |
56 | 59 | this._waitingTags = new Set(tags) |
57 | 60 | this._queue = this._queue.filter(message => !this._queueTaggedMessage(message)) |
58 | 61 | |
59 | 62 | // todo: add saturation test |
@@ -69,15 +72,16 @@ | ||
69 | 72 | * Waits for the the next message if any |
70 | 73 | * @param {Integer} timeout |
71 | 74 | * @returns {Promise} |
72 | 75 | */ |
73 | - async nextMessage (timeout = 0) { | |
74 | - if (this._gettingNextMessage) { | |
75 | - throw new Error('already getting next message') | |
76 | - } else { | |
77 | - this._gettingNextMessage = true | |
76 | + nextMessage (timeout) { | |
77 | + if (!this._gettingNextMessage) { | |
78 | + this._gettingNextMessage = this._nextMessage(timeout) | |
78 | 79 | } |
80 | + return this._gettingNextMessage | |
81 | + } | |
79 | 82 | |
83 | + async _nextMessage (timeout = 0) { | |
80 | 84 | await Promise.all([...this.actor._sending.values()]) |
81 | 85 | let message = this._getOldestMessage() |
82 | 86 | if (message === undefined && timeout === 0) { |
83 | 87 | return |
index.js | ||
---|---|---|
@@ -28,9 +28,9 @@ | ||
28 | 28 | |
29 | 29 | // loads an instance of a container from the state |
30 | 30 | async _loadActor (id) { |
31 | 31 | const state = await this.tree.getSubTree(id) |
32 | - const {type, nonce, transparent} = Actor.deserializeMetaData(state.root['/'][3]) | |
32 | + const {type, nonce} = Actor.deserializeMetaData(state.root['/'][3]) | |
33 | 33 | const container = this._containerTypes[type] |
34 | 34 | |
35 | 35 | // create a new actor instance |
36 | 36 | const actor = new Actor({ |
@@ -38,9 +38,8 @@ | ||
38 | 38 | state, |
39 | 39 | container, |
40 | 40 | id, |
41 | 41 | nonce, |
42 | - transparent, | |
43 | 42 | type |
44 | 43 | }) |
45 | 44 | |
46 | 45 | // save the newly created instance |
tests/index.js | ||
---|---|---|
@@ -485,8 +485,56 @@ | ||
485 | 485 | const stateRoot = await hypervisor.createStateRoot() |
486 | 486 | t.deepEquals(stateRoot, expectedState, 'expected root!') |
487 | 487 | }) |
488 | 488 | |
489 | +tape('return while waiting for tag', async t => { | |
490 | + t.plan(4) | |
491 | + const expectedState = { | |
492 | + '/': Buffer.from('d4291da4536544bf90aa473a1148cb29f913d078', 'hex') | |
493 | + } | |
494 | + | |
495 | + const tree = new RadixTree({ | |
496 | + db: db | |
497 | + }) | |
498 | + | |
499 | + class testVMContainerA extends BaseContainer { | |
500 | + async onMessage (m) { | |
501 | + if (m.tag === 1) { | |
502 | + t.true(m, 'should recive second message') | |
503 | + } else { | |
504 | + t.true(m, 'should recive first message') | |
505 | + const rCap = this.actor.mintCap(1) | |
506 | + const message = new Message() | |
507 | + message.responseCap = rCap | |
508 | + this.actor.send(m.caps[0], message) | |
509 | + this.actor.inbox.nextTaggedMessage([1], 44) | |
510 | + } | |
511 | + } | |
512 | + } | |
513 | + | |
514 | + class testVMContainerB extends BaseContainer { | |
515 | + onMessage (m) { | |
516 | + t.true(m, 'should recive a message') | |
517 | + } | |
518 | + | |
519 | + static get typeId () { | |
520 | + return 8 | |
521 | + } | |
522 | + } | |
523 | + | |
524 | + const hypervisor = new Hypervisor(tree) | |
525 | + hypervisor.registerContainer(testVMContainerA) | |
526 | + hypervisor.registerContainer(testVMContainerB) | |
527 | + | |
528 | + let capA = await hypervisor.createActor(testVMContainerA.typeId, new Message()) | |
529 | + let capB = await hypervisor.createActor(testVMContainerB.typeId, new Message()) | |
530 | + | |
531 | + await hypervisor.send(capA, new Message({caps: [capB]})) | |
532 | + | |
533 | + const stateRoot = await hypervisor.createStateRoot() | |
534 | + t.deepEquals(stateRoot, expectedState, 'expected root!') | |
535 | +}) | |
536 | + | |
489 | 537 | tape('trying to listen for caps more then once', async t => { |
490 | 538 | t.plan(4) |
491 | 539 | const expectedState = { |
492 | 540 | '/': Buffer.from('d4291da4536544bf90aa473a1148cb29f913d078', 'hex') |
Built with git-ssb-web