Last active
April 13, 2018 07:26
-
-
Save moteus/645497f1133ed683a85f6c6c083f1279 to your computer and use it in GitHub Desktop.
Run multiple curl requests from coroutines simultaneously
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local cURL = require "cURL.safe" | |
local json = require "cjson.safe" | |
------------------------------------------------------------------- | |
local MultiRequests = {} do | |
MultiRequests.__index = MultiRequests | |
--- Construct a MultiRequests object.
-- Allocates an empty table with `MultiRequests` as its metatable and hands
-- every argument over to `__init`.
-- @param ... forwarded unchanged to `__init`
-- @return whatever `__init` returns
function MultiRequests.new(...)
  return setmetatable({}, MultiRequests):__init(...)
end
--- Initialise the internal state of a MultiRequests object.
--
-- Fields set here:
--   _multi          the libcurl multi handle driving all transfers
--   _workers        list of worker coroutines that are still alive
--   _responses      easy handle -> response record of an in-flight request
--   _handels        pool of idle easy handles kept for reuse (spelling is
--                   shared with `send_request`, so it is kept as is)
--   _remain         number of requests currently attached to `_multi`
--   _error_handlers worker coroutine -> optional error callback
--
-- @return self on success
-- @return nil, err when the multi handle cannot be created
function MultiRequests:__init()
  -- The `cURL.safe` flavour reports failures as `nil, err` instead of
  -- raising, so the result has to be checked before it is stored.
  local multi, err = cURL.multi()
  if not multi then
    return nil, err
  end

  self._workers = {}
  self._multi = multi
  self._responses = {}
  self._handels = {}
  self._remain = 0
  self._error_handlers = {}
  return self
end
--- Register a new worker.
-- The function is wrapped in a coroutine and queued; nothing is executed
-- here.
-- @param fn   worker body, turned into a coroutine
-- @param errf optional callback stored as the error handler for this worker
-- @return self, so calls can be chained
function MultiRequests:add_worker(fn, errf)
  local worker = coroutine.create(fn)
  self._workers[#self._workers + 1] = worker
  self._error_handlers[worker] = errf
  return self
end
--- Attach an easy handle to the multi handle on behalf of a worker.
-- On success a response record is created for the handle, the in-flight
-- counter is bumped and body chunks are collected into the record's
-- `content` list.
-- @param self the MultiRequests object
-- @param easy easy handle to attach
-- @param co   coroutine that issued the request
-- @return self on success
-- @return nil, err when the handle cannot be attached
local function append_request(self, easy, co)
  local added, add_err = self._multi:add_handle(easy)
  if added then
    local pending = {_co = co, content = {}}
    self._remain = self._remain + 1
    self._responses[easy] = pending
    -- every received chunk is appended via table.insert(pending.content, chunk)
    easy:setopt_writefunction(table.insert, pending.content)
    return self
  end
  return nil, add_err
end
--- Drop a worker coroutine from the object.
-- Removes the first matching entry from the worker list and clears the
-- error handler registered for it. Does nothing when the coroutine is not
-- in the list.
-- @param self the MultiRequests object
-- @param co   coroutine to remove
local function remove_worker(self, co)
  local workers = self._workers
  for index = 1, #workers do
    if workers[index] == co then
      table.remove(workers, index)
      self._error_handlers[co] = nil
      return
    end
  end
end
--- Resume a worker and schedule whatever request it yields next.
--
-- The worker is resumed with `response`. If it yields an easy handle, the
-- handle is attached to the multi handle. If attaching fails, the worker
-- is resumed again right away with a response of the form `{error = err}`
-- so the failure reaches the code that issued the request. Previously the
-- worker stayed suspended and registered, and `run` later resumed it with
-- the MultiRequests object in place of a response.
--
-- If the worker finishes, raises, or yields a falsy value, it is removed
-- from the worker list; when it raised, its error handler (if any) is
-- called with the error value.
--
-- @param self     the MultiRequests object
-- @param co       worker coroutine to resume
-- @param response value passed into the coroutine
-- @return self once a request has been queued
-- @return ok, value from `coroutine.resume` when the worker was removed
local function proceed_next(self, co, response)
  while true do
    local ok, easy = coroutine.resume(co, response)

    if not (ok and easy) then
      -- fetch the handler first: remove_worker clears the registration
      local errf = self._error_handlers[co]
      remove_worker(self, co)
      if easy and not ok then
        if errf then errf(easy) end
      end
      return ok, easy
    end

    local queued, err = append_request(self, easy, co)
    if queued then
      return queued
    end

    -- Could not attach the handle: report it to the worker and let it
    -- carry on (it may yield another request or finish).
    response = {error = err}
  end
end
--- Finalise a completed transfer and hand the result to its worker.
-- Unregisters the easy handle, decrements the in-flight counter, fills in
-- `url`, `code` and the concatenated `content` on the response record, and
-- resumes the owning coroutine with that record.
-- @param self     the MultiRequests object
-- @param easy     easy handle whose transfer ended
-- @param response response record created when the request was queued
local function proceed_response(self, easy, response)
  local owner = response._co
  self._responses[easy] = nil
  self._remain = self._remain - 1

  -- the coroutine reference is internal bookkeeping, not part of the result
  response._co = nil

  local url = easy:getinfo_effective_url()
  response.url = url
  local code = easy:getinfo_response_code()
  response.code = code

  -- collapse the list of received chunks into a single string
  local chunks = response.content
  response.content = table.concat(chunks)

  proceed_next(self, owner, response)
end
--- Perform one HTTP request from inside a worker coroutine.
--
-- Must be called from a worker: the function yields the configured easy
-- handle and continues once it is resumed with the response record.
--
-- The request body is encoded before an easy handle is taken, and a handle
-- whose configuration fails is put back into the pool. Previously both
-- early exits dropped the handle.
--
-- @param request table with the fields read below:
--   url, timeout, followlocation, post  passed to `easy:setopt` as is
--   headers  list of header lines; an empty list is treated as absent
--   body     string sent as is, any other truthy value is JSON-encoded
-- @return response record (`url`, `code`, `content`) on success
-- @return nil, err on encoding, handle creation, option or transfer errors
function MultiRequests:send_request(request)
  -- Prepare everything that can fail without touching a handle first.
  local body
  if request.body then
    if type(request.body) == 'string' then
      body = request.body
    else
      local err
      body, err = json.encode(request.body)
      if not body then
        return nil, err
      end
    end
  end

  local headers = request.headers
  if headers and #headers == 0 then
    headers = nil
  end

  -- Reuse an idle handle when possible. `cURL.easy()` comes from the safe
  -- flavour of the library, so a failure shows up as `nil, err`.
  local easy = table.remove(self._handels)
  if not easy then
    local err
    easy, err = cURL.easy()
    if not easy then
      return nil, err
    end
  end
  easy:reset()

  local ok, err = easy:setopt{
    url            = request.url,
    timeout        = request.timeout,
    followlocation = request.followlocation,
    post           = request.post,
    httpheader     = headers,
    postfields     = body,
  }
  if not ok then
    -- keep the handle for later use; it is reset before the next request
    table.insert(self._handels, easy)
    return nil, err
  end

  -- Hand the handle to the scheduler and wait for the result.
  local response = coroutine.yield(easy)

  table.insert(self._handels, easy)

  if response.error then
    return nil, response.error
  end

  return response
end
--- Drive all registered workers until none is left.
--
-- Each pass resumes every registered worker with the MultiRequests object
-- as argument (newest first), then pumps the multi handle until no request
-- is in flight. Completed transfers are turned into responses and handed
-- back to their workers, which may queue further requests.
--
-- Failures reported by `perform` / `info_read` are returned as `nil, err`
-- (they used to surface as "attempt to compare/index nil" errors).
--
-- @return self when all workers are done
-- @return nil, err when the multi handle reports a failure
function MultiRequests:run()
  while #self._workers > 0 do
    -- iterate backwards: proceed_next may remove the worker from the list
    for i = #self._workers, 1, -1 do
      local co = self._workers[i]
      proceed_next(self, co, self)
    end

    while self._remain > 0 do
      local last, perform_err = self._multi:perform() -- do some work
      if not last then
        return nil, perform_err
      end

      if last < self._remain then -- some transfers have finished
        while true do -- process results/errors
          -- get the result and detach the handle from the multi handle
          local easy, ok, err = self._multi:info_read(true)
          if not easy then
            -- NOTE(review): assumes the safe flavour reports a failed read
            -- as `nil, err` — confirm against the Lua-cURL documentation
            return nil, ok
          end
          if easy == 0 then break end -- no more data available for now

          local response = self._responses[easy]
          if response then -- ignore handles that were not queued by us
            response.error = err
            proceed_response(self, easy, response)
          end
        end
      end

      -- only block when there is still something to wait for
      if self._remain > 0 then
        self._multi:wait() -- wait while libcurl does its io select
      end
    end
  end

  return self
end
end | |
------------------------------------------------------------------- | |
-- Demo: two workers, each fetching the same list of URLs one after another.
local mrequest = MultiRequests.new()

local urls = {
  "http://httpbin.org/get?key=1",
  "http://httpbin.org/get?key=2",
  "http://httpbin.org/get?key=3",
  "http://httpbin.org/get?key=4",
}

-- Error callback shared by all workers.
local function report_error(err)
  print('[ERROR]', err)
end

-- Build the body of worker number `worker_id`.
local function make_worker(worker_id)
  return function(requester)
    for url_index = 1, #urls do
      local response, err = requester:send_request{url = urls[url_index]}
      -- tag each line with "[worker/request]"
      local prefix = string.format("[%d/%d]", worker_id, url_index)
      if not response then
        print(prefix, err)
      else
        print(prefix, response.code, response.content)
      end
    end
  end
end

for worker_id = 1, 2 do
  mrequest:add_worker(make_worker(worker_id), report_error)
end

mrequest:run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment