Last active
August 29, 2015 14:14
-
-
Save ane/e8768bdab8fd98da64ab to your computer and use it in GitHub Desktop.
A simple producer/consumer program using Redis lists and Pub/Sub.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module WorkerProducer.Producer | |
open Nessos.FsPickler | |
open StackExchange.Redis | |
open System | |
open System.Text | |
open System.Threading | |
open WorkerProducer.Worker | |
/// Binary FsPickler serializer used to pickle jobs and unpickle results in the producer.
let pickler = FsPickler.CreateBinary()

/// Shared random number generator used to pick tasks and their arguments.
let random = Random()
/// Picks one of the three task kinds at random.
/// Double and Half carry a random argument drawn from random.Next(64); Snore carries none.
let genRandomTask() =
    // random.Next(0, 3) yields 0, 1 or 2, so the wildcard arm only ever sees 2.
    match random.Next(0, 3) with
    | 0 -> Double(random.Next(64))
    | 1 -> Half(random.Next(64))
    | _ -> Snore
/// Subscription callback for the result channel: unpickles the message payload
/// as a Result and prints a timestamped line describing it.
/// The channel argument is not used.
let processResult (chan : RedisChannel) (msg : RedisValue) =
    match pickler.UnPickle(ResultPickler, ~~msg) with
    | Value((task, jobId), outcome) ->
        printfn "%A Job #%d with command %A finished with result %A." DateTime.Now jobId task outcome
    | Zzz jobId ->
        printfn "%A Worker fell asleep doing job #%d." DateTime.Now jobId
/// Producer entry point.
///
/// Connects to Redis on localhost, subscribes to ResultChannel so that results
/// are printed by processResult, then pushes one pickled random job onto
/// WorkQueue about every 500 ms, numbering jobs from 0.
/// On Ctrl+C the producer stops enqueuing, publishes "stop" on CommandChannel,
/// waits for a key press and returns exit code 0.
[<EntryPoint>]
let main argv =
    let redis = ConnectionMultiplexer.Connect("localhost")
    let subscriber = redis.GetSubscriber()
    let db = redis.GetDatabase()
    let cts = new CancellationTokenSource()

    // Ctrl+C handler: only request cancellation. Setting evt.Cancel keeps the
    // runtime from terminating the process, so the main thread can finish the
    // shutdown sequence itself instead of racing with the production loop.
    let handler (evt : ConsoleCancelEventArgs) =
        evt.Cancel <- true
        cts.Cancel()

    // Pickles job number i with a random task, appends it to the work list,
    // then pauses before the next job.
    let produce i =
        let job = pickler.Pickle(JobPickler, (genRandomTask(), i))
        db.ListRightPush(~~WorkQueue, ~~job) |> ignore
        // Waiting on the token's handle (rather than Thread.Sleep) ends the
        // pause as soon as cancellation is requested.
        cts.Token.WaitHandle.WaitOne(500) |> ignore

    Console.CancelKeyPress.Add handler
    subscriber.Subscribe(~~ResultChannel, Action<RedisChannel, RedisValue>(processResult))

    // Produce until Ctrl+C is pressed; no job is enqueued after cancellation.
    let mutable i = 0
    while not cts.IsCancellationRequested do
        produce i
        i <- i + 1

    printf "Shutting down... "
    subscriber.Publish(~~CommandChannel, ~~"stop") |> ignore
    printfn "done."
    Console.ReadKey() |> ignore
    0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module WorkerProducer.Worker | |
open Nessos.FsPickler | |
open Nessos.FsPickler.Combinators | |
open StackExchange.Redis | |
open System | |
open System.Diagnostics | |
open System.Text | |
open System.Threading | |
/// Prefix operator that applies the static op_Implicit conversion defined on
/// either the source or the target type, resolved statically at each call site.
/// Used in this file to convert strings and byte arrays to and from Redis types.
let inline (~~) (x : ^Source) : ^Target =
    ((^Source or ^Target) : (static member op_Implicit : ^Source -> ^Target) x)
/// Key of the Redis list that jobs are pushed to and popped from.
let WorkQueue : string = "work_queue"

/// Name of the Pub/Sub channel on which control commands are published.
let CommandChannel : string = "cmd_chan"

/// Name of the Pub/Sub channel on which job results are published.
let ResultChannel : string = "work_result"

/// Binary FsPickler serializer used by the worker for jobs and results.
let pickler = FsPickler.CreateBinary()
/// The kinds of work a job can ask a worker to perform.
type Task =
    /// Multiply x by two.
    | Double of x : int
    /// Divide x by two (integer division).
    | Half of x : int
    /// No computation; the worker reports that it fell asleep.
    | Snore
/// A task paired with the integer job number assigned by the producer.
type Job = Task * int
/// What a worker publishes after handling a job.
type Result =
    /// The job that was executed together with its computed value.
    | Value of Job * result : int
    /// The number of a Snore job, for which no value is produced.
    | Zzz of int
/// Pickler for Task, built with the FsPickler sum combinators.
/// The continuations passed to the lambda correspond, in order, to the cases
/// chained after it: Double, Half, then Snore.
let TaskPickler =
    Pickler.sum (fun task onDouble onHalf onSnore ->
        match task with
        | Double x -> onDouble x
        | Half x -> onHalf x
        | Snore -> onSnore ())
    ^+ Pickler.case Double Pickler.int ^+ Pickler.case Half Pickler.int ^. Pickler.variant Snore
/// Pickler for Job: a Task (via TaskPickler) followed by the integer job number.
let JobPickler = (TaskPickler, Pickler.int) ||> Pickler.pair
/// Pickler for Result, built with the FsPickler sum combinators.
/// The continuations passed to the lambda correspond, in order, to the cases
/// chained after it: Value, then Zzz.
let ResultPickler =
    Pickler.sum (fun outcome onValue onZzz ->
        match outcome with
        | Value(job, result) -> onValue (job, result)
        | Zzz jobId -> onZzz jobId)
    ^+ Pickler.case Value (Pickler.pair JobPickler Pickler.int) ^. Pickler.case Zzz Pickler.int
/// Two-case tag type. NOTE(review): nothing in the visible code references it;
/// presumably intended for worker control commands — confirm before relying on it.
type TaskType =
    | Delay
    | Quit
/// Runs the worker loop until a message arrives on CommandChannel.
///
/// Each iteration pops one entry from the head of WorkQueue. If an entry was
/// present it is unpickled as a Job, executed, and its Result is published on
/// ResultChannel. The loop then sleeps for `interval` milliseconds whether or
/// not a job was found. Any message on CommandChannel (its content is ignored)
/// cancels the loop, after which "Shut down." is printed.
let work (db : IDatabase, subscriber : ISubscriber, interval) =
    let cts = new CancellationTokenSource()

    // Pickles a result and broadcasts it on the result channel.
    let publish (result : Result) =
        let pickled = pickler.Pickle(ResultPickler, result)
        subscriber.Publish(~~ResultChannel, ~~pickled) |> ignore

    let rec loop() =
        async {
            let popped = db.ListLeftPop ~~WorkQueue
            if popped.HasValue then
                let job = pickler.UnPickle(JobPickler, ~~popped)
                printfn "%A Got job %A." DateTime.Now job
                let task, jobId = job
                let outcome =
                    match task with
                    | Double x -> Value(job, x * 2)
                    | Half x -> Value(job, x / 2)
                    | Snore -> Zzz jobId
                publish outcome
            // Pause between polls, both after a job and when the queue was empty.
            do! Async.Sleep interval
            return! loop()
        }

    subscriber.Subscribe(~~CommandChannel, fun _ _ -> cts.Cancel())
    try
        Async.RunSynchronously(loop(), cancellationToken = cts.Token)
    with :? OperationCanceledException -> printfn "Shut down."
/// Worker entry point: connects to Redis on localhost, runs the worker loop
/// with a 1000 ms interval until it is stopped, then prints a completion
/// message and returns exit code 0.
[<EntryPoint>]
let main argv =
    let connection = ConnectionMultiplexer.Connect("localhost")
    let database = connection.GetDatabase()
    let subscriber = connection.GetSubscriber()
    work (database, subscriber, 1000)
    Console.WriteLine("Work completed.")
    0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment