Created
January 28, 2016 10:43
-
-
Save peace2048/0f397b3069f9fecd32ee to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
void Main() | |
{ | |
var connection = ConnectionMultiplexer.Connect("192.168.5.40"); | |
var sx1 = connection.ObservableSubscription("pa"); | |
using (sx1.Subscribe(_ => Console.WriteLine(_))) Console.ReadLine(); | |
var sx2 = connection.ObservableList("qa", TimeSpan.FromSeconds(5)); | |
using (sx2.Subscribe(_ => Console.WriteLine(_))) Console.ReadLine(); | |
var sx3 = connection.ObservableList("qa", "pa"); | |
using (sx3.Subscribe(_ => Console.WriteLine(_))) Console.ReadLine(); | |
var sx4 = connection.ObservableList("qa", "pa", TimeSpan.FromSeconds(5)); | |
using (sx4.Subscribe(_ => Console.WriteLine(_))) Console.ReadLine(); | |
} | |
static class RedisObservableExtensions | |
{ | |
public static IObservable<RedisValue> ObservableSubscription(this ConnectionMultiplexer connection, RedisChannel topic) | |
{ | |
return Observable.Create<RedisValue>(observer => | |
{ | |
var subscriber = connection.GetSubscriber(); | |
subscriber.Subscribe(topic, (channel, value) => | |
{ | |
observer.OnNext(value); | |
}); | |
return () => | |
{ | |
subscriber.Unsubscribe(topic); | |
}; | |
}); | |
} | |
public static IObservable<RedisValue> ObservableList(this ConnectionMultiplexer connection, RedisKey key, RedisChannel topic) | |
{ | |
return ObservableList(connection, key, topic, TimeSpan.FromSeconds(5)); | |
} | |
public static IObservable<RedisValue> ObservableList(this ConnectionMultiplexer connection, RedisKey key, RedisChannel topic, TimeSpan timeout) | |
{ | |
return Observable.Create<RedisValue>(observer => | |
{ | |
var cts = new CancellationTokenSource(); | |
var ctoken = cts.Token; | |
var subscriber = connection.GetSubscriber(); | |
var ev = new AutoResetEvent(false); | |
subscriber.Subscribe(topic, (channel, value) => | |
{ | |
ev.Set(); | |
}); | |
var task = Task.Factory.StartNew(()=> | |
{ | |
try | |
{ | |
var db = connection.GetDatabase(); | |
var waitHandles = new[] {ev, ctoken.WaitHandle}; | |
while (!ctoken.IsCancellationRequested) | |
{ | |
var r = db.ListLeftPop(key); | |
if (r.HasValue) | |
{ | |
observer.OnNext(r); | |
} | |
else | |
{ | |
WaitHandle.WaitAny(waitHandles, timeout); | |
} | |
} | |
} | |
catch (Exception e) | |
{ | |
if (!ctoken.IsCancellationRequested) | |
{ | |
observer.OnError(e); | |
} | |
} | |
}); | |
return () => | |
{ | |
subscriber.Unsubscribe(topic); | |
cts.Cancel(); | |
try { task.Wait(); } | |
catch { } | |
cts.Dispose(); | |
ev.Dispose(); | |
}; | |
}); | |
} | |
public static IObservable<RedisValue> ObservableList(this ConnectionMultiplexer connection, RedisKey key, TimeSpan wait) | |
{ | |
return Observable.Create<RedisValue>(observer => | |
{ | |
var cts = new CancellationTokenSource(); | |
var ctoken = cts.Token; | |
var task = Task.Factory.StartNew(()=> | |
{ | |
try | |
{ | |
var db = connection.GetDatabase(); | |
while (true) | |
{ | |
var r = db.ListLeftPop(key); | |
if (r.HasValue) | |
{ | |
observer.OnNext(r); | |
} | |
else | |
{ | |
Task.Delay(wait).Wait(ctoken); | |
} | |
} | |
} | |
catch (Exception e) | |
{ | |
if (!ctoken.IsCancellationRequested) | |
{ | |
observer.OnError(e); | |
} | |
} | |
}); | |
return () => | |
{ | |
cts.Cancel(); | |
try { task.Wait(); } | |
catch { } | |
cts.Dispose(); | |
}; | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment