@jplaza
Last active November 17, 2015 18:59
Minimal implementation of an SQS long polling daemon written in Clojure
(ns co.datil.sqsd
  "Polls SQS periodically for messages."
  (:require [amazonica.aws.sqs :as sqs]
            [clojure.tools.logging :as log]
            [environ.core :refer [env]]))

(defn process-msg
  "Processes one task. `msg` is the body of a message taken from the queue."
  [queue-url msg]
  (log/info "Processing message:" msg)
  ;; Process the incoming message in a new thread for increased throughput.
  (future
    ;; (some-heavy-processing-fn)
    ))

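(defn get-queue
  "Creates the worker queue (or returns the existing one when the name and
  attributes match) and returns its URL."
  []
  (-> (sqs/create-queue
        {:endpoint "us-west-2"}
        :queue-name (env :worker-queue-name)
        :attributes
        {:VisibilityTimeout 30                ; seconds
         :MaximumMessageSize 65536            ; bytes
         :MessageRetentionPeriod 1209600      ; seconds (14 days)
         :ReceiveMessageWaitTimeSeconds 10})  ; seconds, default long-poll wait
      :queue-url))
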
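(defn get-queued-tasks
  "Gets tasks queued for the worker to complete and hands each one to
  `process-msg`."
  [queue-url]
  ;; :wait-time-seconds enables long polling for this call.
  ;; :delete true removes each message as soon as it is received, so a
  ;; message whose processing later fails is not redelivered.
  (let [resp (sqs/receive-message :queue-url queue-url
                                  :wait-time-seconds 8
                                  :max-number-of-messages 10
                                  :delete true
                                  :attribute-names ["All"])]
    (log/info (format "Received %d messages" (count (:messages resp))))
    (doseq [msg (:messages resp)]
      (process-msg queue-url (:body msg)))))
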
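(defn -main
  "Starts the long polling SQS daemon to look for new tasks to start."
  [& args]
  (log/info "Polling for queued tasks")
  ;; Resolve the queue URL once instead of calling create-queue on every poll.
  (let [queue-url (get-queue)]
    (while true
      (get-queued-tasks queue-url)
      ;; Pause 20 s between polls, on top of the long-poll wait itself.
      (Thread/sleep 20000))))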
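
The daemon above receives with `:delete true`, so a message is gone before its processing has finished. One possible alternative is to delete a message only after its handler returns without throwing. The sketch below is not part of the original gist: `handle-batch`, `handler`, and `delete!` are hypothetical names, and the functions are passed in so the example runs without AWS access.

```clojure
(defn handle-batch
  "Runs `handler` on the body of each message and calls `delete!` on the
  message only when the handler returned without throwing. Returns the
  number of messages deleted. `handler` and `delete!` are hypothetical
  caller-supplied functions, not part of amazonica."
  [messages handler delete!]
  (reduce (fn [deleted msg]
            (try
              (handler (:body msg))
              (delete! msg)
              (inc deleted)
              (catch Exception _
                ;; Leave the message in the queue; SQS redelivers it once
                ;; the visibility timeout expires.
                deleted)))
          0
          messages))
```

In the daemon, `delete!` could plausibly wrap amazonica's `sqs/delete-message`, with the `:delete true` option dropped from `receive-message`. That wiring is an assumption and is not exercised here.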