Skip to content

Instantly share code, notes, and snippets.

@gmoothart
Created October 5, 2011 21:09
Show Gist options
  • Save gmoothart/1265744 to your computer and use it in GitHub Desktop.
Save gmoothart/1265744 to your computer and use it in GitHub Desktop.
MassTransit WorkerSelectionStrategy
class FairnessWorkerSelectionStrategy: IWorkerSelectionStrategy<ProcessOrderMessage>
{
const int availabilityThreshold = 3;
const int ordersPerCompanyAtOnce = 5;
/// <summary>
/// Map company id to number of recent orders placed for it
/// </summary>
Dictionary<long, int> companyRecentOrdersCount = new Dictionary<long,int>();
public bool HasAvailableWorker(IEnumerable<WorkerDetails> candidates, ProcessOrderMessage message)
{
return candidates.Any(w => isAvailable(w) || canProcessCompany(message.CompanyId));
}
public WorkerDetails SelectWorker(IEnumerable<WorkerDetails> candidates, ProcessOrderMessage message)
{
var worker = candidates
.Where(w => isAvailable(w) || canProcessCompany(message.CompanyId))
.OrderBy(w => w.InProgress + w.Pending)
.ThenByDescending(w => w.LastUpdate)
.FirstOrDefault();
if (worker != null) {
if (isAvailable(worker)) {
// worker utilization is low, reset order counts for all
// companies.
resetCompanyUsage();
}
else {
// increment the order count for this company
updateCompanyUsage(message.CompanyId);
}
}
return worker;
}
/// <summary>
/// True if this worker is almost out of work to do, false otherwise
/// </summary>
private bool isAvailable(WorkerDetails w)
{
return (w.Pending + w.InProgress) < availabilityThreshold;
}
/// <summary>
/// Orders from this Company can be processed if has no more than
/// `ordersPerCompanyAtOnce` since the last reset.
/// </summary>
private bool canProcessCompany(long companyId)
{
int count;
companyRecentOrdersCount.TryGetValue(companyId, out count);
return count <= ordersPerCompanyAtOnce;
}
private void updateCompanyUsage(long companyId) {
int count;
companyRecentOrdersCount.TryGetValue(companyId, out count);
companyRecentOrdersCount[companyId] = count++;
}
private void resetCompanyUsage()
{
companyRecentOrdersCount.Clear();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment