Skip to content

Instantly share code, notes, and snippets.

@acazsouza
Last active June 14, 2016 18:10
Show Gist options
  • Save acazsouza/d039482f51973d4792f753771e83a8e3 to your computer and use it in GitHub Desktop.
Save acazsouza/d039482f51973d4792f753771e83a8e3 to your computer and use it in GitHub Desktop.
// Polling loop: each pass waits up to 5 seconds for queued operations, tries to load one
// runnable operation under a freshly created handle, and hands it to a background task.
// The loop repeats until IsCancellationRequestedAsync reports cancellation.
do
{
    try
    {
        if (await queueStore.WaitForOperationsAsync(waitForOperationsHandle, TimeSpan.FromSeconds(5)))
        {
            // NOTE(review): if CreateHandleAsync throws, this increment is never undone
            // (the outer catch does not decrement) — confirm whether that is acceptable.
            interLockSyncRoot.Increment();
            var oprtHandle = await queueStore.CreateHandleAsync(DateTime.UtcNow.AddMinutes(10));

            // Tracks whether the counter was already decremented, so the failure path
            // below never decrements a second time for the same pass.
            bool counterReleased = false;
            bool dispatchFailed = false;
            try
            {
                var oprt = await queueStore.TryLoadRunnableOperationAsync(oprtHandle, TimeSpan.FromSeconds(10));
                if (oprt == null)
                {
                    // Nothing runnable was loaded: undo the increment and drop the handle.
                    interLockSyncRoot.Decrement();
                    counterReleased = true;
                    await queueStore.DeleteHandleAsync(oprtHandle);
                }
                else
                {
                    var callbackHandle = new OperationCallbackExecutionContext(oprt, ExtensionLocator.Current)
                    {
                        OperationInstanceHandle = oprtHandle,
                        InterLockSyncRoot = interLockSyncRoot,
                        QueueStore = queueStore
                    };
                    if (oprt.Attempts == 1)
                    {
                        oprt.IsRecurrent = true;
                    }
                    oprt.Attempts++;
                    await callbackHandle.QueueStore.UpdateOperationAsync(oprt);

                    // Fire-and-forget: the operation runs in the background and the continuation
                    // receives callbackHandle (handle, counter and store) as its state.
                    Task.Run(() => OperationManager.Current.RunOperationAsync(oprt)
                        .ContinueWith(OperationManagerNotifyAsyncListenersCallbackAsync, callbackHandle));
                }
            }
            catch (Exception)
            {
                // Cleanup is performed after the catch block so that the handle deletion can be
                // awaited without relying on await-inside-catch language support.
                dispatchFailed = true;
            }

            if (dispatchFailed)
            {
                if (!counterReleased)
                {
                    interLockSyncRoot.Decrement();
                }
                try
                {
                    // Await the deletion so it completes (or fails visibly) before the handle is
                    // disposed; a failure here propagates to the outer catch below.
                    await queueStore.DeleteHandleAsync(oprtHandle);
                }
                finally
                {
                    oprtHandle.Dispose();
                }
            }
        }
        else
        {
            // Back off without blocking a thread while no operations are available.
            await Task.Delay(TimeSpan.FromSeconds(5));
        }
    }
    catch (Exception)
    {
        //TODO: trace some exception info here
        // Discard the wait handle; a new one is created just below.
        waitForOperationsHandle.Dispose();
        waitForOperationsHandle = null;
    }
    if (waitForOperationsHandle == null)
    {
        waitForOperationsHandle = await queueStore.CreateHandleAsync();
    }
} while (!(await IsCancellationRequestedAsync(queueStore, interLockSyncRoot.CancellationToken)));
/// <summary>
/// Queue instance handle that carries an "in use" flag and a renewable expiration date.
/// The flag is backed by a <see cref="ManualResetEventSlim"/>: signaled means free,
/// non-signaled means locked.
/// </summary>
private class OperationQueueInstanceHandleImpl : OperationQueueInstanceHandle
{
    // Signaled (set) while the handle is free; reset while a wait is in progress.
    private readonly ManualResetEventSlim _isInUse = new ManualResetEventSlim(true);
    private bool _disposed = false;
    // Lifetime captured at construction; reused each time the expiration date is renewed.
    private readonly TimeSpan _timeout;

    /// <summary>
    /// Creates a handle for the given owner that expires at <paramref name="expirationDate"/>.
    /// </summary>
    /// <param name="ownerId">Identifier stored in <c>OwnerID</c>.</param>
    /// <param name="expirationDate">
    /// Initial expiration date; compared against <see cref="DateTime.UtcNow"/> to derive the renewal interval.
    /// </param>
    public OperationQueueInstanceHandleImpl(Guid ownerId, DateTime expirationDate)
    {
        this.OwnerID = ownerId;
        this.ExpirationDate = expirationDate;
        _timeout = expirationDate - DateTime.UtcNow;
    }

    ~OperationQueueInstanceHandleImpl()
    {
        this.Dispose(false);
    }

    /// <summary>
    /// Moves the expiration date to "now (UTC) plus the interval computed at construction".
    /// </summary>
    public void RenewHandleExpirationDate()
    {
        this.ExpirationDate = DateTime.UtcNow.Add(_timeout);
    }

    /// <summary>
    /// Marks the handle as locked if it is currently free; does nothing when already locked.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the check and the reset are two separate steps, so two concurrent callers
    /// could both observe the handle as free — confirm callers are serialized.
    /// </remarks>
    public void AcquireLock()
    {
        if (_isInUse.IsSet)
        {
            _isInUse.Reset();
        }
    }

    /// <summary>
    /// Marks the handle as free.
    /// </summary>
    public void ReleaseLock()
    {
        _isInUse.Set();
    }

    /// <summary>
    /// Gets a value indicating whether the handle is currently locked.
    /// </summary>
    public bool IsLocked
    {
        get
        {
            return !_isInUse.IsSet;
        }
    }

    /// <summary>
    /// Repeatedly invokes <paramref name="action"/> until it returns <c>true</c> or
    /// <paramref name="timeout"/> has elapsed, holding the handle's lock for the duration.
    /// </summary>
    /// <param name="action">Asynchronous probe; a <c>true</c> result ends the wait.</param>
    /// <param name="timeout">Maximum time to keep probing. The probe always runs at least once.</param>
    /// <returns>
    /// <c>true</c> if <paramref name="action"/> returned <c>true</c>; <c>false</c> if the timeout
    /// elapsed first or the handle was already locked when the call started.
    /// </returns>
    public async Task<bool> StartWaitingAsync(Func<Task<bool>> action, TimeSpan timeout)
    {
        bool result = false;
        var wait = new SpinWait();
        if (!this.IsLocked)
        {
            this.AcquireLock();
            try
            {
                DateTime startDate = DateTime.UtcNow;
                do
                {
                    result = await action();
                    if (result)
                    {
                        break;
                    }
                    else if (!wait.NextSpinWillYield)
                    {
                        // Cheap busy-spin for the first few attempts.
                        wait.SpinOnce();
                    }
                    else
                    {
                        // Once spinning would yield, back off without blocking a thread.
                        await Task.Delay(500);
                    }
                } while (timeout > (DateTime.UtcNow - startDate));
            }
            finally
            {
                // Always release, even when action() throws; otherwise the handle would stay
                // locked and every later call would return false immediately.
                this.ReleaseLock();
            }
        }
        return result;
    }

    /// <summary>
    /// Releases the underlying event and suppresses finalization.
    /// </summary>
    public override void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Disposes the event when called from <see cref="Dispose()"/>; the finalizer path
    /// (<paramref name="disposing"/> = <c>false</c>) only records the disposed state.
    /// </summary>
    private void Dispose(bool disposing)
    {
        if (!_disposed && disposing)
        {
            _isInUse.Set();
            _isInUse.Dispose();
        }
        _disposed = true;
    }
}
// Repeatedly calls PegarEProcessarDaFila until cancellation is requested on the token.
// The call's return value is not used.
do
{
    PegarEProcessarDaFila();
    if (!wait.NextSpinWillYield)
    {
        wait.SpinOnce();
    }
    else
    {
        Thread.Sleep(500); // TODO: synchronize this interval
    }
} while (!cancellationToken.IsCancellationRequested); // keep looping until cancellation is requested
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment