Created
May 19, 2017 21:15
-
-
Save alexander-alvarez/a944011f16e67ce9373436ca8dbbb983 to your computer and use it in GitHub Desktop.
Coalescing async behavior implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { module, test } from 'ember-qunit'; | |
import run from 'ember-runloop' | |
import EmberObject from 'ember-object' | |
import RSVP from 'rsvp'; | |
import { all } from 'ember-concurrency'; | |
import { BufferedChannel } from 'pool/classes'; | |
module('Unit | Utility | BufferedChannel');

/**
 * Verifies that two `go` invocations performed back to back are coalesced into
 * a single call of the wrapped async function, and that each caller receives
 * the slice of the shared result that belongs to its own arguments.
 *
 * Expected assertions: one inside `fn` (the coalesced call) plus one per result.
 */
test('BufferedChannel test', async function(assert) {
  assert.expect(3);

  // GIVEN
  // An async function that returns results for coalesced requests.
  const fn = (abc, ids) => {
    assert.deepEqual([abc, ids], ['abc', [1, 2]], 'the requests are coalesced');
    return new RSVP.Promise((resolve) => resolve({
      'abc1': 'something that is only for 1',
      'abc2': 'something that is only for 2'
    }));
  };

  // An object that uses the buffered channel.
  // The three callbacks below are installed as methods on the channel's request
  // state, so `this.queue`, `this.queueId` and `this.results` refer to that state.
  const Obj = EmberObject.extend({
    task: new BufferedChannel(fn, {
      // Records the request and hands back the id used to track it.
      enqueue(groupBy, num) {
        const id = this.queueId++;
        this.queue.push({ id, num, groupBy });
        return id;
      },

      // Returns this request's part of the shared result, or null while unresolved.
      dequeue({ args, id }) {
        if (this.results.hasOwnProperty(id)) {
          // Remove only the entry being dequeued. The callback parameter must not
          // shadow the outer `id`, otherwise the comparison is always false and
          // every queued entry is dropped.
          this.queue = this.queue.filter(({ id: queuedId }) => queuedId !== id);
          return this.results[id][args[0] + args[1]];
        }
        return null;
      },

      // Merges all unresolved requests sharing the first seen `groupBy` into one call.
      coalesceEnqueuedRequests() {
        const unresolved = this.queue.filter(({ id }) => !this.results.hasOwnProperty(id));
        const req = unresolved.reduce((accum, { id, num, groupBy }) => {
          // Requests belonging to a different group are left for a later call.
          if (accum.groupBy && accum.groupBy !== groupBy) {
            return accum;
          }
          accum.groupBy = groupBy;
          accum.ids.push(id);
          accum.nums.push(num);
          return accum;
        }, { groupBy: null, nums: [], ids: [] });

        return { args: [req.groupBy, req.nums], ids: req.ids };
      }
    })
  });

  let obj;
  let ref1;
  let ref2;

  run(() => {
    obj = Obj.create();

    // WHEN we perform requests within the allotted timeframe to be coalesced
    ref1 = obj.get('task.go').perform('abc', 1);
    ref2 = obj.get('task.go').perform('abc', 2);
  });

  const [res1, res2] = await all([ref1, ref2]);

  // THEN
  // the requests are proxied and pull from the same result.
  // QUnit's argument order is (actual, expected).
  assert.equal(res1, 'something that is only for 1');
  assert.equal(res2, 'something that is only for 2');
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import EmberObject from 'ember-object'; | |
import { task, timeout } from 'ember-concurrency'; | |
import { assert } from 'ember-metal/utils' | |
/**
 * Buffers calls to an async function so that requests enqueued close together
 * can be coalesced into a single call.
 *
 * Callers invoke the `go` task with the arguments of one logical request. The
 * arguments are handed to the caller-supplied `enqueue` strategy, the task waits
 * `timeout` milliseconds, and then either issues the coalesced call itself or
 * waits for a call that already covers its id. The resolved value of `go` is
 * whatever the caller-supplied `dequeue` strategy returns for that request.
 *
 * @class BufferedChannel
 */
export default class BufferedChannel extends EmberObject.extend({
  // RequestState instance holding the queue and results; set by the constructor.
  requestState: null,

  // Milliseconds to wait after enqueueing before dispatching; set by the constructor.
  timeout: null,

  /**
   * Entry point: enqueues one request, waits for the buffering window, then
   * resolves with that request's result.
   *
   * @method go
   * @param {...*} args arguments of the request, forwarded to `enqueue`
   */
  go: task(function*(...args) {
    const id = this.get('requestState').enqueue(...args);
    yield timeout(this.get('timeout'));
    return this.get('bulkRequestHelper').perform({ args, id });
  }),

  /**
   * Issues the coalesced call unless one covering `id` is already pending or
   * resolved, then waits until the result for `id` can be dequeued.
   *
   * NOTE(review): ids are registered as pending before the async call. If that
   * call rejects, no result is registered, so other requests sharing those ids
   * would presumably keep polling in `blockUntilDelegated` — confirm and handle.
   *
   * @method bulkRequestHelper
   * @param {Object} request
   * @param {Array} request.args original arguments of the request
   * @param {*} request.id id returned by `enqueue`
   */
  bulkRequestHelper: task(function*({ args, id }) {
    assert('id must be returned from enqueue', id !== undefined);

    // if request is pending block, otherwise make request
    const requestState = this.get('requestState');
    if (!requestState.isRequestPendingOrResolved({ args, id })) {
      const { args: coalescedArgs, ids } = requestState.coalesceEnqueuedRequests();
      requestState.registerPending(ids);
      const data = yield this.asyncFunction(...coalescedArgs);
      requestState.registerResult(ids, data);
    }

    return this.get('blockUntilDelegated').perform({ args, id });
  }),

  /**
   * Polls `dequeue` until it returns something other than null and resolves
   * with that value.
   *
   * @method blockUntilDelegated
   * @param {Object} request
   * @param {Array} request.args original arguments of the request
   * @param {*} request.id id returned by `enqueue`
   */
  blockUntilDelegated: task(function*({ args, id }) {
    let results = null;
    while (results === null) {
      yield timeout(100); // polling frequency to see if bulk request has updated
      results = this.get('requestState').dequeue({ args, id });
    }
    return results;
  })
}) {
  /**
   * @param {Function} asyncFunction function invoked with the coalesced arguments
   * @param {Object} strategies caller-supplied queue strategies
   * @param {Function} strategies.enqueue
   * @param {Function} strategies.dequeue
   * @param {Function} strategies.coalesceEnqueuedRequests
   * @param {Number} [timeout=500] buffering window in milliseconds; inside this
   *   constructor the parameter shadows the imported `timeout` helper
   */
  constructor(asyncFunction, { enqueue, dequeue, coalesceEnqueuedRequests }, timeout = 500) {
    super();
    assert('Enqueue, dequeue, and coalesceEnqueuedRequests must be defined', enqueue && dequeue && coalesceEnqueuedRequests);

    const requestState = new RequestState({ enqueue, dequeue, coalesceEnqueuedRequests });
    this.setProperties({
      asyncFunction,
      requestState,
      timeout
    });
  }
}
/**
 * Bookkeeping for buffered requests: the queue of enqueued argument sets, the
 * ids that have been sent in a request, and the results keyed by id.
 *
 * The `enqueue`, `dequeue` and `coalesceEnqueuedRequests` strategies are supplied
 * by the caller and assigned onto the instance, where they replace the
 * documentation-only stubs declared on the class. They are invoked as methods,
 * so inside them `this` is the RequestState instance.
 */
class RequestState {
  /**
   * @param {Object} strategies
   * @param {Function} strategies.enqueue
   * @param {Function} strategies.dequeue
   * @param {Function} strategies.coalesceEnqueuedRequests
   */
  constructor({ enqueue, dequeue, coalesceEnqueuedRequests }) {
    this.queueId = 0;
    this.queue = [];
    this.requested = {};
    this.results = {};

    this.enqueue = enqueue;
    this.dequeue = dequeue;
    this.coalesceEnqueuedRequests = coalesceEnqueuedRequests;
  }

  /**
   * Queues up request arguments (on this.queue) and returns an id that corresponds to those arguments.
   *
   * @method enqueue
   * @return id to trace this request through its existence
   */
  enqueue() {}

  /**
   * Reads the queue and coalesces similar requests. Returns an object in the format:
   * ```
   * {
   *   args: [],
   *   ids: []
   * }
   * ```
   * Where args are the potentially coalesced arguments to apply to the asynchronous function and
   * ids are the argument ids that were coalesced into one request.
   *
   * @method coalesceEnqueuedRequests
   * @return Object `{ args, ids }` as described above
   */
  coalesceEnqueuedRequests() {}

  /**
   * Returns a boolean value indicating whether or not a request has been made that will contain the outputs
   * necessary for the incoming arguments, i.e. the id is registered as pending or already has a result.
   *
   * @method isRequestPendingOrResolved
   * @property id Arguments id (returned from enqueue method)
   * @return Boolean
   */
  isRequestPendingOrResolved({ id }) {
    // Go through Object.prototype so an id such as 'hasOwnProperty' stored as a
    // key cannot replace the lookup function itself.
    const owns = Object.prototype.hasOwnProperty;
    return owns.call(this.requested, id) || owns.call(this.results, id);
  }

  /**
   * Returns either a value if resolved or null if still not resolved.
   * The returned value will be returned as resolved value for the taskInstance for the original request.
   *
   * @method dequeue
   * @param args
   * @param id
   */
  dequeue({ args, id }) {}

  /**
   * Marks every given id as having been sent in a request.
   *
   * @method registerPending
   * @property ids List[String] of ids that map to request arguments
   */
  registerPending(ids) {
    ids.forEach((id) => {
      this.requested[id] = true;
    });
  }

  /**
   * Stores the same result object under every given id.
   *
   * @method registerResult
   * @property ids List[String] of ids that were coalesced into the request
   * @property data result of the coalesced request
   */
  registerResult(ids, data) {
    ids.forEach((id) => {
      this.results[id] = data;
    });
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment