Last active
August 29, 2015 14:00
-
-
Save aclemmensen/11272381 to your computer and use it in GitHub Desktop.
Clojure: Fetch a list of URLs in parallel with core.async. Is there a better way?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns plinks.core | |
(:require [clojure.core.async :as async] | |
[clj-http.client :as client]) | |
(:gen-class)) | |
; First attempt using only the most primitive async operators | |
(defn
  ptry-links
  "Fetch every URL in `links` in parallel and return a vector of status maps.

  Arguments:
    links   - collection of URL strings.
    threads - number of worker threads; values below 1 are treated as 1 so
              the call can never block forever waiting for a worker.

  Returns a vector with one map per link, in completion order (not input
  order): {:link url :status :ok} when `client/get` returns normally, or
  {:link url :status :bad :message msg} when it throws an Exception.

  Blocks the calling thread until every link has been processed."
  [links threads]
  (let [; to-chan feeds the links and closes the channel once they are consumed.
        input   (async/to-chan links)
        output  (async/chan)
        check   (fn [link]
                  (try
                    (client/get link)
                    (println link "OK")
                    {:link link :status :ok}
                    (catch Exception e
                      (println link "BAD")
                      {:link link
                       :status :bad
                       :message (.getMessage e)})))
        ; client/get is blocking I/O, so each worker runs on a real thread
        ; (async/thread) instead of tying up the go-block dispatch pool.
        ; Each worker's channel closes when its loop ends.
        workers (vec (repeatedly
                       (max 1 threads)
                       (fn []
                         (async/thread
                           (loop []
                             (let [link (async/<!! input)]
                               (when-not (nil? link)
                                 (async/>!! output (check link))
                                 (recur))))))))]
    ; Close output only after ALL workers are done; closing it from the first
    ; worker that runs out of input would drop results still in flight.
    (async/go
      (doseq [worker workers]
        (async/<! worker))
      (async/close! output))
    ; Collect everything pushed to output into a vector and block for it.
    (async/<!! (async/into [] output))))
; Second attempt using some higher-order async ops | |
(defn
  pprocess-links
  "Apply `f` to every element of `links` in parallel and return a vector of
  the results (in completion order, not input order).

  Arguments:
    f       - one-argument function applied to each link. It may block and
              may return nil. It should handle its own exceptions: if `f`
              throws, the worker that called it stops and the results it had
              gathered so far are lost.
    links   - collection of items to process.

  Keyword options:
    :threads - number of worker threads (default 8; values below 1 are
               treated as 1).
    :timeout - maximum time to wait for the full result, in msec
               (default 60000).

  Returns the vector of results, or prints a message and returns nil when the
  timeout elapses first. Workers are not cancelled on timeout; they keep
  draining the input in the background."
  ([f links & {:keys [threads timeout]
               :or {threads 8
                    timeout 60000}}]
   (let [input   (async/to-chan links)
         ; Each call starts an independent worker thread that pulls links
         ; until the input closes and yields its own vector of results.
         ; (Using `repeat` on a single channel would only duplicate one
         ; reference and give no parallelism.)
         worker  (fn []
                   (async/thread
                     (loop [acc []]
                       (let [link (async/<!! input)]
                         (if (nil? link)
                           acc
                           (recur (conj acc (f link))))))))
         workers (vec (repeatedly (max 1 threads) worker))
         ; Concatenate the per-worker vectors into one result vector.
         result  (async/reduce into [] (async/merge workers))
         fail    (async/timeout timeout)]
     (async/alt!!
       result ([v] v) ; We got a result
       fail   (println "Timed out waiting for" timeout "msec")))))
; Create a partially applied function that checks whether links are OK | |
; Link checker: pprocess-links with the per-link function pre-bound. The
; function requests the URL with client/get, prints the outcome, and returns
; a status map; any Exception is reported as :bad with its message.
(def trylinks
  (partial
    pprocess-links
    (fn [url]
      (try
        (client/get url)
        (println url "OK")
        {:link url :status :ok}
        (catch Exception err
          (println url "BAD")
          {:link url :status :bad :message (.getMessage err)})))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment