Last active
August 29, 2015 14:03
-
-
Save eulerfx/3e7ced2423d3297c4e8d to your computer and use it in GitHub Desktop.
F# EventStore API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Identifies an event stream (presumably the stream name — confirm against the implementation).
type Stream = string

/// Event type tag supplied to `EventStore.append`.
type EventType = string

/// Version argument of `EventStore.append` / `EventStore.appendAll`.
/// NOTE(review): presumably the stream version used for optimistic concurrency — confirm.
type ExpectedVersion = int

/// Raw event payload bytes.
type EventData = byte[]

/// Raw event metadata bytes.
type EventMetadata = byte[]

/// Flag passed to the read/subscribe functions.
/// NOTE(review): presumably controls whether link events are resolved — confirm.
type ResolveLinks = bool

/// Starting position argument of the read/subscribe functions
/// (presumably an event number within the stream — confirm).
type From = int

/// Batch size argument of `EventStore.readAll` and the subscribe functions.
type BatchSize = int

/// Buffer size argument of the subscribe functions.
type BufferSize = int

/// A pair of asynchronous functions: the first produces an optional `int`,
/// the second consumes an `int`.
/// NOTE(review): presumably (load last checkpoint, save checkpoint) — confirm.
type CheckpointStore = (unit -> Async<int option>) * (int -> Async<unit>)
/// Function signatures for working with an `IEventStoreConnection`.
/// The comments below describe only what the signatures show; behavioural
/// details live in the implementation, which is not part of this file.
module EventStore =

  /// Produces a connection from an IP endpoint.
  val conn : IPEndPoint -> IEventStoreConnection

  /// Produces a connection from a string (presumably a host name — confirm).
  val connHost : string -> IEventStoreConnection

  /// Appends a single event (type, data, metadata) to a stream at the given expected version.
  val append : IEventStoreConnection -> Stream -> EventType -> ExpectedVersion -> EventData -> EventMetadata -> Async<unit>

  /// Appends a sequence of event payloads to a stream at the given expected version.
  /// NOTE(review): unlike `append`, no event type or metadata is supplied here — confirm how they are derived.
  val appendAll : IEventStoreConnection -> Stream -> ExpectedVersion -> seq<EventData> -> Async<unit>

  /// Reads at most one event from a stream at the given position; `None` when no event is returned.
  val readOne : IEventStoreConnection -> Stream -> From -> ResolveLinks -> Async<ResolvedEvent option>

  /// Reads a stream as an asynchronous sequence of slices, starting at `From`.
  /// Note the argument order differs from `readOne`: `ResolveLinks` precedes `From` here.
  val readAll : IEventStoreConnection -> Stream -> ResolveLinks -> From -> BatchSize -> AsyncSeq<StreamEventsSlice>

  /// Exposes the events of a stream as an `IObservable`.
  val observe : IEventStoreConnection -> Stream -> ResolveLinks -> IObservable<ResolvedEvent>

  /// Subscribes to a stream from the given position, yielding events one at a time.
  val subscribeFrom : IEventStoreConnection -> Stream -> ResolveLinks -> BatchSize -> BufferSize -> From -> AsyncSeq<ResolvedEvent>

  /// Subscribes to a stream from the given position, yielding events in arrays.
  /// NOTE(review): the `TimeSpan` is presumably the buffering window — confirm.
  val subscribeFromBuffered : IEventStoreConnection -> Stream -> ResolveLinks -> BatchSize -> BufferSize -> TimeSpan -> From -> AsyncSeq<ResolvedEvent[]>

  /// Like `subscribeFromBuffered`, but takes a `CheckpointStore` in place of an explicit `From`.
  /// NOTE(review): presumably the start position comes from the checkpoint store — confirm.
  val subscribeWithCheckpointsBuffered : IEventStoreConnection -> Stream -> ResolveLinks -> BatchSize -> BufferSize -> TimeSpan -> CheckpointStore -> AsyncSeq<ResolvedEvent[]>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module AsyncSeq =

  /// Iterates the async sequence and starts an action for each element,
  /// keeping at most `parallelism` actions running at the same time.
  ///
  /// parallelism: maximum number of concurrently running actions; must be >= 1.
  /// f: action started for each element.
  /// s: source sequence; it is consumed through `toBlockingSeq`, so the calling
  ///    thread blocks while waiting for elements and for a free slot.
  ///
  /// The returned workflow completes once every started action has finished.
  /// If an action fails, no further actions are started, the in-flight ones
  /// are allowed to finish, and the first recorded exception is re-raised.
  /// Cancellation of an individual action is not treated as a failure.
  /// Raises ArgumentException when `parallelism` is less than 1.
  let iterAsyncThrottled (parallelism:int) (f:'a -> Async<unit>) (s:AsyncSeq<'a>) : Async<unit> = async {
    // A semaphore with zero slots would block forever on the first element.
    if parallelism < 1 then
      invalidArg "parallelism" "parallelism must be at least 1"

    // The count starts at 1 so the event cannot reach zero while the loop is
    // still handing out work; that initial count is signalled after the loop.
    let cde = new CountdownEvent(1)
    let sem = new SemaphoreSlim(parallelism)

    // First exception raised by any action; the ref cell doubles as the lock object.
    let failure : exn option ref = ref None

    let release () =
      sem.Release() |> ignore
      cde.Signal() |> ignore

    let cont () = release ()

    let exCont (ex:exn) =
      lock failure (fun () ->
        if Option.isNone failure.Value then failure.Value <- Some ex)
      release ()

    let cnCont _ = release ()

    let noFailureYet () = lock failure (fun () -> Option.isNone failure.Value)

    try
      try
        s
        |> toBlockingSeq
        // Stop pulling elements as soon as an action has failed.
        |> Seq.takeWhile (fun _ -> noFailureYet ())
        |> Seq.iter (fun item ->
          sem.Wait()
          cde.AddCount(1)
          // Wrapping `f item` defers its evaluation into the workflow, so a
          // synchronous throw from `f` reaches exCont (and releases the slot)
          // instead of escaping with the count already incremented.
          Async.StartWithContinuations(async { return! f item }, cont, exCont, cnCont))
      finally
        // Drain in-flight actions even when iteration itself failed; otherwise
        // their continuations would touch the primitives after disposal.
        cde.Signal() |> ignore
        cde.Wait()
    finally
      cde.Dispose()
      sem.Dispose()

    match failure.Value with
    | Some ex ->
      // Re-raise while preserving the original stack trace.
      System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex).Throw()
    | None -> () }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment