Skip to content

Instantly share code, notes, and snippets.

@eocron
Created February 5, 2018 20:46
Show Gist options
  • Save eocron/afd09f5746f199050f98ce8bc00ba283 to your computer and use it in GitHub Desktop.
Save eocron/afd09f5746f199050f98ce8bc00ba283 to your computer and use it in GitHub Desktop.
Used for synchronization of two storages
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
namespace ConsoleApplication1
{
/// <summary>
/// Performs synchronization between a continuous (pageable) collection and an unknown collection
/// that is reachable only by mapping keys of the first one.
/// For example, between continuous MSSQL table ids and the ids they are mapped to in another system,
/// like ElasticSearch. It is useful to synchronize collections in different systems.
/// </summary>
/// <remarks>
/// <see cref="Sync"/> reads the paged side in batches of <see cref="BatchSize"/> items, maps every
/// batch through <see cref="OnMap"/> and passes both lists to <see cref="OnSync"/>.
/// </remarks>
/// <typeparam name="TAKey">Key type of the paged (continuous) collection.</typeparam>
/// <typeparam name="TBKey">Key type of the mapped (unknown) collection.</typeparam>
public abstract class PagedMapSync<TAKey, TBKey>
{
    /// <summary>
    /// Gets or sets the batch size. It may be changed while <see cref="Sync"/> is running; the new
    /// value is picked up at the start of the next batch. Must be greater than zero when a batch starts.
    /// </summary>
    public volatile int BatchSize;

    /// <summary>
    /// Offset of the next batch to be requested by the current (or most recent) <see cref="Sync"/> run.
    /// </summary>
    public int Offset => _offset;

    /// <summary>Raised once at the beginning of every <see cref="Sync"/> run, before the first batch.</summary>
    public event EventHandler OnStart;

    /// <summary>Raised after every successfully processed batch, once <see cref="Offset"/> has been advanced.</summary>
    public event EventHandler OnProgress;

    /// <summary>
    /// Raised when the batch loop ends, either because no more pages are available or because
    /// cancellation was requested. It is not raised when the run ends with an exception.
    /// </summary>
    public event EventHandler OnFinish;

    private int _offset;
    private readonly object _lock = new object();
    private CancellationToken? _ct;

    /// <summary>
    /// Initializes the synchronizer.
    /// </summary>
    /// <param name="batchSize">Initial value of <see cref="BatchSize"/>.</param>
    protected PagedMapSync(int batchSize = 100)
    {
        BatchSize = batchSize;
    }

    /// <summary>
    /// True when the token passed to the currently running <see cref="Sync"/> call has been cancelled.
    /// False when no token was supplied or no run is in progress.
    /// </summary>
    protected bool IsCancellationRequested => _ct?.IsCancellationRequested ?? false;

    /// <summary>
    /// Runs a full synchronization pass starting from offset zero. Calls are serialized: a second
    /// caller blocks until the running pass has finished.
    /// </summary>
    /// <param name="ct">Optional token; it is checked before every batch.</param>
    /// <exception cref="InvalidOperationException">
    /// <see cref="BatchSize"/> is not positive when a batch is about to start, or the overridden
    /// members violate their contract (null result, mapped list of a different length).
    /// </exception>
    public void Sync(CancellationToken? ct = null)
    {
        lock (_lock)
        {
            _ct = ct;
            try
            {
                OnStart?.Invoke(this, EventArgs.Empty);
                _offset = 0;
                while (!IsCancellationRequested)
                {
                    // Snapshot the field once so the same value is used both for the request
                    // and for advancing the offset, even if it is changed concurrently.
                    var batchSize = BatchSize;
                    if (batchSize <= 0)
                    {
                        // A zero batch would never advance the offset (endless loop) and a
                        // negative one would walk backwards into negative offsets.
                        throw new InvalidOperationException(
                            $"{nameof(BatchSize)} must be greater than zero, but was {batchSize}.");
                    }

                    if (!ProcessBatch(_offset, batchSize))
                    {
                        break;
                    }

                    _offset += batchSize;
                    OnProgress?.Invoke(this, EventArgs.Empty);
                }

                OnFinish?.Invoke(this, EventArgs.Empty);
            }
            finally
            {
                // Never keep a reference to the caller's token between runs.
                _ct = null;
            }
        }
    }

    /// <summary>
    /// Fetches one page, maps it and passes both lists to <see cref="OnSync"/>.
    /// </summary>
    /// <param name="offset">Offset of the first item of the page.</param>
    /// <param name="count">Requested number of items.</param>
    /// <returns>False when <see cref="TryGetPagedValues"/> reported that there is nothing to process; otherwise true.</returns>
    private bool ProcessBatch(int offset, int count)
    {
        IReadOnlyList<TAKey> pagedIds;
        if (!TryGetPagedValues(offset, count, out pagedIds))
        {
            return false;
        }

        // These checks are real exceptions rather than Debug.Assert so that a broken
        // override is also detected in Release builds instead of reaching OnSync.
        if (pagedIds == null)
        {
            throw new InvalidOperationException(
                $"{nameof(TryGetPagedValues)} returned true but produced no list.");
        }

        var unpagedIds = OnMap(pagedIds);
        if (unpagedIds == null)
        {
            throw new InvalidOperationException($"{nameof(OnMap)} returned null.");
        }

        if (pagedIds.Count != unpagedIds.Count)
        {
            throw new InvalidOperationException(
                $"{nameof(OnMap)} returned {unpagedIds.Count} items for {pagedIds.Count} paged ids; the counts must match.");
        }

        OnSync(pagedIds, unpagedIds);
        return true;
    }

    /// <summary>
    /// Reads one page of keys from the paged collection.
    /// </summary>
    /// <param name="offset">Offset of the first item to read.</param>
    /// <param name="count">Maximum number of items to read.</param>
    /// <param name="pagedIds">The page that was read; must not be null when true is returned.</param>
    /// <returns>True when a page was produced; false to end the synchronization pass.</returns>
    protected abstract bool TryGetPagedValues(int offset, int count, out IReadOnlyList<TAKey> pagedIds);

    /// <summary>
    /// Performs mapping of one element to another in the corresponding position.
    /// </summary>
    /// <param name="pagedIds">Keys of the current page.</param>
    /// <returns>
    /// A non-null list with exactly as many items as <paramref name="pagedIds"/>; the item at
    /// index i is the mapping of <paramref name="pagedIds"/>[i].
    /// </returns>
    protected abstract IReadOnlyList<TBKey> OnMap(IReadOnlyList<TAKey> pagedIds);

    /// <summary>
    /// Performs sync of two subcollections. Paged ids are LEFT JOIN'ed with unpaged ids:
    /// corresponding items will be filled or default.
    /// </summary>
    /// <param name="pagedIds">Keys of the current page.</param>
    /// <param name="unpagedIds">Mapped keys, position by position, as returned by <see cref="OnMap"/>.</param>
    protected abstract void OnSync(IReadOnlyList<TAKey> pagedIds, IReadOnlyList<TBKey> unpagedIds);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment