Last active
January 15, 2016 23:40
-
-
Save isaacabraham/0e2ac661a5f31daf71a9 to your computer and use it in GitHub Desktop.
Mailbox Processor in Cloud Agent with automatic dead lettering and at-least-once processing.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Creates and starts the mailbox-processor actor for the given actor key.
///
/// The actor's state (a record with `Count` and `Messages`) is read from the
/// message store obtained for `name` when the actor starts, falling back to an
/// empty state when the store returns nothing. Every incoming message arrives
/// paired with a reply function, which is called with `Completed` or
/// `Abandoned` to report the outcome of processing that message.
let CreateActor (ActorKey name) =
    MailboxProcessor.Start(fun mailbox ->
        let messageStore = GetMessageStore name

        let rec loop data =
            async {
                // Wait asynchronously until we get a message + reply channel
                let! message, replyWith = mailbox.Receive()

                match message with
                | Clear ->
                    // Drop the persisted state and restart from an empty one.
                    messageStore.DeleteIfExists()
                    replyWith Completed // confirm processing!
                    return! loop { Count = 0; Messages = [] }
                | Record message when data.Count < 5 ->
                    let updatedData =
                        { data with
                            Count = data.Count + 1
                            Messages = data.Messages @ [ message ] }
                    // Persist first, then confirm, then continue with the new state.
                    messageStore.SetData(updatedData)
                    replyWith Completed // confirm processing!
                    return! loop updatedData
                | Record _ ->
                    // bad data - max limit of messages is 5
                    replyWith Abandoned
                    // Keep listening with the state unchanged. Without this
                    // recursion the async workflow would end here and the
                    // actor would never receive another message (not even a
                    // Clear that resets the count).
                    return! loop data
            }

        let data = defaultArg (messageStore.GetData()) { Count = 0; Messages = [] }
        loop data)
// Start listening to the pool, wrapping each created actor as a
// ResilientCloudAgent rather than a BasicCloudAgent.
let disposable =
    let createResilientAgent = CreateActor >> ResilientCloudAgent
    ConnectionFactory.StartListening(cloudConnection, createResilientAgent)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment