Created
January 11, 2021 19:32
-
-
Save hackwaly/a79aba088e3ab3180dd4dee4ccc6126f to your computer and use it in GitHub Desktop.
OCaml saga powered by lwt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type 'a t = { | |
f : unit -> 'a Lwt.t; | |
} | |
type 'a task = < | |
is_running : bool; | |
is_cancelled : bool; | |
result : 'a option; | |
error : (exn * Printexc.raw_backtrace) option; | |
done_ : unit Lwt.t; | |
cancel : unit -> unit; | |
> | |
type 'a saga_task = < | |
'a task; | |
_run : (unit -> 'a Lwt.t) -> unit; | |
_attach_child : generalized_saga_task -> unit; | |
> | |
and generalized_saga_task = Generalized : _ saga_task -> generalized_saga_task | |
exception Saga_error of exn * Printexc.raw_backtrace | |
let create f = { | |
f; | |
} | |
let current_saga_task_key = Lwt.new_key () | |
let join t = | |
t#done_ | |
let new_saga_task ?parent ?(detached=false) () : 'a saga_task = | |
let task = (object (self) | |
val parent : generalized_saga_task option = parent | |
val mutable child_tasks : generalized_saga_task list = [] | |
val mutable p : 'a Lwt.t = Obj.magic () | |
method is_running = | |
match Lwt.state p with | |
| Sleep -> true | |
| _ -> false | |
method is_cancelled = | |
match Lwt.state p with | |
| Fail Lwt.Canceled -> true | |
| _ -> false | |
method result = | |
match Lwt.state p with | |
| Return r -> Some r | |
| _ -> None | |
method error = | |
match Lwt.state p with | |
| Fail (Saga_error (exn, bt)) -> Some (exn, bt) | |
| _ -> None | |
method done_ = | |
match%lwt p with | |
| exception Saga_error (exn, bt) -> Printexc.raise_with_backtrace exn bt | |
| _ -> Lwt.return () | |
method cancel () = | |
Lwt.cancel p | |
method private _cancel_child_tasks () = | |
let cancel1 (Generalized t) = t#cancel () in | |
List.iter cancel1 child_tasks | |
method private _join_child_tasks () = | |
let join1 (Generalized t) = join t in | |
Lwt_list.iter_p join1 child_tasks | |
method _run (f : unit -> 'a Lwt.t) = | |
let p' = | |
Lwt.pause ();%lwt | |
try%lwt | |
let%lwt r = Lwt.with_value current_saga_task_key (Some (Generalized self)) f in | |
self#_join_child_tasks ();%lwt | |
Lwt.return r | |
with exn -> | |
raise_notrace (Saga_error (exn, Printexc.get_raw_backtrace ())) in | |
Lwt.on_failure p' (fun _ -> self#_cancel_child_tasks ()); | |
p <- p' | |
method _attach_child (c : generalized_saga_task) = | |
child_tasks <- c :: child_tasks | |
end) in | |
if not detached then ( | |
match parent with | |
| Some (Generalized parent) -> parent#_attach_child (Generalized task); | |
| None -> () | |
); | |
task | |
let run (saga : 'a t) : 'a task = | |
let task = (new_saga_task () : 'a saga_task) in | |
task#_run saga.f; | |
(task :> 'a task) | |
let fork (f : unit -> 'a Lwt.t) : 'a task = | |
match Lwt.get current_saga_task_key with | |
| Some parent -> | |
let task = new_saga_task ~parent () in | |
task#_run f; | |
(task :> 'a task) | |
| None -> assert false | |
let spawn (f : unit -> 'a Lwt.t) : 'a task = | |
match Lwt.get current_saga_task_key with | |
| Some parent -> | |
let task = new_saga_task ~parent ~detached:true () in | |
task#_run f; | |
(task :> 'a task) | |
| None -> assert false |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type 'a t | |
type 'a task = < | |
is_running : bool; | |
is_cancelled : bool; | |
result : 'a option; | |
error : (exn * Printexc.raw_backtrace) option; | |
done_ : unit Lwt.t; | |
cancel : unit -> unit; | |
> | |
val create : (unit -> 'a Lwt.t) -> 'a t | |
val run : 'a t -> 'a task | |
val fork : (unit -> 'a Lwt.t) -> 'a task | |
val join : 'a task -> unit Lwt.t | |
val spawn : (unit -> 'a Lwt.t) -> 'a task |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment