Created
December 11, 2020 16:56
-
-
Save bartelink/14b7a1474e5376b59e62ddf8caacdb44 to your computer and use it in GitHub Desktop.
AwaitWithStopOnCancellation re https://github.com/jet/FsKafka/pull/83
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(*
let a = async {
    let! ct = Async.CancellationToken
    do! System.Threading.Tasks.Task.Delay(5000,ct) |> Async.AwaitTask
}
let b = async {
    failwith "B throw"
}
Async.Parallel[a;b] |> Async.RunSynchronously
*)
module Demo = | |
open System | |
open System.Threading.Tasks | |
/// Extends `Async` with task-awaiting helpers that report every task outcome
/// through either the success continuation or the exception continuation.
type Async with

    /// Awaits a `Task<'T>` and yields its result.
    /// A faulted task carrying exactly one inner exception surfaces that inner
    /// exception; otherwise the `AggregateException` itself is surfaced.
    /// A cancelled task surfaces a `TaskCanceledException` through the exception
    /// continuation; the cancellation continuation is never invoked.
    static member AwaitTaskCorrect (task : Task<'T>) : Async<'T> =
        Async.FromContinuations (fun (onSuccess, onError, _onCancelled) ->
            task.ContinueWith (fun (finished : Task<'T>) ->
                if finished.IsFaulted then
                    let aggregate = finished.Exception
                    match aggregate.InnerExceptions.Count with
                    | 1 -> onError aggregate.InnerExceptions.[0]
                    | _ -> onError aggregate
                elif finished.IsCanceled then
                    onError (TaskCanceledException("Task wrapped with Async has been cancelled."))
                elif finished.IsCompleted then
                    onSuccess finished.Result
                else
                    onError (Exception "invalid Task state!"))
            |> ignore)

    /// Awaits a non-generic `Task`, yielding `unit` on success.
    /// Faults and cancellation are reported exactly as in the `Task<'T>` overload.
    static member AwaitTaskCorrect (task : Task) : Async<unit> =
        Async.FromContinuations (fun (onSuccess, onError, _onCancelled) ->
            task.ContinueWith (fun (finished : Task) ->
                if finished.IsFaulted then
                    let aggregate = finished.Exception
                    match aggregate.InnerExceptions.Count with
                    | 1 -> onError aggregate.InnerExceptions.[0]
                    | _ -> onError aggregate
                elif finished.IsCanceled then
                    onError (TaskCanceledException("Task wrapped with Async has been cancelled."))
                elif finished.IsCompleted then
                    onSuccess ()
                else
                    onError (Exception "invalid Task state!"))
            |> ignore)
/// Demo consumer pairing a background task with the function that stops it.
/// Disposing the consumer invokes that stop function.
type Consumer(task, stop, name) =

    interface System.IDisposable with
        member this.Dispose() = stop ()

    /// Starts a consumer whose background loop sleeps 100ms at a time until
    /// cancellation has been requested, then prints "Stopped <name>".
    static member Start(name) =
        let cancellation = new System.Threading.CancellationTokenSource()
        let pollUntilCancelled = async {
            while not cancellation.IsCancellationRequested do
                do! Async.Sleep 100
            printfn "Stopped %s" name
        }
        let requestStop () = cancellation.Cancel()
        new Consumer(Async.StartAsTask pollUntilCancelled, requestStop, name)

    /// Prints "Stopping <name>" and then invokes the stop function.
    member this.Stop() =
        printfn "Stopping %s" name
        stop ()

    /// Awaits completion of the background task.
    member this.AwaitShutdown() =
        // NOTE NOT Async.AwaitTask task, or we hang in the case of termination via `Stop()`
        Async.AwaitTaskCorrect task
/// Demo with the stop-on-cancellation wiring done by hand: both consumers are
/// stopped from a callback registered on the ambient cancellation token, while
/// a deliberately failing computation runs in parallel with their shutdown waits.
let example = async {
    use first = Consumer.Start("c1")
    use second = Consumer.Start("c2")
    let! token = Async.CancellationToken
    // Registration is disposed when the workflow leaves this scope.
    use _registration = token.Register(fun () -> first.Stop(); second.Stop())
    printfn "Waiting..."
    let work =
        [ async { failwith "Stop the world!" }
          first.AwaitShutdown()
          second.AwaitShutdown() ]
    do! work |> Async.Parallel |> Async.Ignore<unit[]>
    printfn "Finished"
}
/// Auto-opened extension members for `Consumer`.
[<AutoOpen>]
module ConsumerExt =

    type Consumer with

        /// Awaits the consumer's shutdown. While waiting, a callback registered on
        /// the ambient cancellation token calls `Stop()` on this consumer.
        member self.AwaitWithStopOnCancellation() = async {
            let! token = Async.CancellationToken
            // Registration is disposed when the workflow leaves this scope.
            use _registration = token.Register(fun () -> self.Stop())
            return! self.AwaitShutdown()
        }
/// Same demo as the hand-wired variant, but each consumer is awaited through
/// `AwaitWithStopOnCancellation`, so no explicit token registration is needed here.
let example2 = async {
    use first = Consumer.Start("c1")
    use second = Consumer.Start("c2")
    printfn "Waiting..."
    let work =
        [ async { failwith "Stop the world; kick the people off!" }
          first.AwaitWithStopOnCancellation()
          second.AwaitWithStopOnCancellation() ]
    do! work |> Async.Parallel |> Async.Ignore<unit[]>
    printfn "Finished"
}
// Run the second demo and wait for it to finish.
example2 |> Async.RunSynchronously
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment