Skip to content

Instantly share code, notes, and snippets.

@PetrKaleta
Last active December 10, 2015 15:08
Show Gist options
  • Save PetrKaleta/4452196 to your computer and use it in GitHub Desktop.
Save PetrKaleta/4452196 to your computer and use it in GitHub Desktop.
This code demonstrates how to create a queue_classic worker that makes concurrent requests to third-party APIs.
# This code demonstrates the default queue_classic worker.
# Its biggest issue is that the requests made by its jobs are blocking.
require 'rubygems'
require 'queue_classic'
require 'typhoeus'
# Declare `worker` before installing the handlers. A block only closes over
# locals that already exist at the point where the block is written; without
# this line Ruby parses `worker` inside the TERM handler as a method call and
# the handler raises NameError when the signal arrives.
worker = nil
# INT (Ctrl-C): leave immediately.
trap('INT') { exit }
# TERM: ask the worker to stop; if no worker has been built yet, just exit.
trap('TERM') { worker ? worker.stop : exit }
# Job class for the stock QC::Worker: performs one blocking HTTP GET per job.
class Consumer
  # Count of jobs finished so far (metrics only). This is a class-level
  # instance variable rather than a `@@` class variable, so the counter is not
  # shared with (or clobbered by) subclasses.
  @finished_jobs_count = 0

  # Issues the GET request described by opts['request'].
  #
  # opts - Hash with a 'request' entry holding the string keys 'url',
  #        'headers', 'params' and 'body' (missing keys are passed as nil).
  #
  # Returns nil. Once JOBS_COUNT jobs have finished, prints the elapsed time
  # since START_TIME (both constants are defined by the surrounding script).
  def self.perform(opts={})
    req_opts = opts['request']
    # `response` is only read by the commented-out debug line below.
    response = Typhoeus::Request.get(req_opts['url'],
                                     :headers => req_opts['headers'],
                                     :params  => req_opts['params'],
                                     :body    => req_opts['body'])
    #puts "HTTP#{response.code}"

    # just for metrics purposes
    # remove this code once you don't need it
    @finished_jobs_count += 1
    if @finished_jobs_count == JOBS_COUNT
      total_time = ((Time.now.to_f - START_TIME) * 1000.0).to_i
      puts "#{JOBS_COUNT} jobs finished in #{total_time}ms"
    end
    # end of metrics code
  end
end
# Test case: fill the queue with JOBS_COUNT search requests, one per query
# number, all handled by Consumer.perform.
JOBS_COUNT = 100
(0...JOBS_COUNT).each do |query_number|
  search_params = { 'q' => query_number, 'rpp' => 15 }
  request_spec = {
    'url'    => 'http://search.twitter.com/search.json',
    'method' => 'get',
    'params' => search_params
  }
  QC.enqueue('Consumer.perform', { 'request' => request_spec })
end
# Build the stock queue_classic worker.
worker = QC::Worker.new(
  :listening_worker => true,
  :max_attempts     => 10
)
# Remember when processing began (read by the metrics code in Consumer),
# then hand control to the worker loop.
START_TIME = Time.now.to_f
worker.start
# Results of three test runs:
# 100 jobs finished in 56044ms
# 100 jobs finished in 53604ms
# 100 jobs finished in 52626ms
# This code is my custom worker, which can make concurrent requests.
require 'rubygems'
require 'queue_classic'
require 'typhoeus'
# Declare `worker` before installing the handlers. A block only closes over
# locals that already exist at the point where the block is written; without
# this line Ruby parses `worker` inside the TERM handler as a method call and
# the handler raises NameError when the signal arrives.
worker = nil
# INT (Ctrl-C): leave immediately.
trap('INT') { exit }
# TERM: ask the worker to stop; if no worker has been built yet, just exit.
trap('TERM') { worker ? worker.stop : exit }
# Base class for jobs run by TyphoeusConcurrentWorker.
# Subclass it and override .perform to process the HTTP response.
#
# The worker builds the request from the first job argument, reading
# args['request'] with the string keys 'url', 'method', 'headers', 'params'
# and 'body'. Enqueue a subclass like this:
#
#   QC.enqueue('MySubclass.perform', { 'request' => { 'url' => 'http://foo.bar', 'method' => 'get' } })
#
#   # or leave the method name out; the worker then calls perform
#   QC.enqueue('MySubclass', { 'request' => { 'url' => 'http://foo.bar', 'method' => 'get' } })
#
class TyphoeusConcurrentJob
  class << self
    # Hook invoked with the completed response and the job's first argument.
    # The base implementation does nothing and returns nil.
    def perform(response, opts = {})
      # override in a subclass
    end
  end
end
# Example job: receives each completed response. The body is empty apart from
# a commented-out debug line, so the response is currently ignored.
class Consumer < TyphoeusConcurrentJob
  class << self
    def perform(response, opts = {})
      #puts "HTTP#{response.code}"
    end
  end
end
# Custom QC worker
#
# Every locked job is turned into a Typhoeus request and queued on a shared
# Typhoeus::Hydra. Up to :max_concurrency requests are collected per batch and
# then run together; the job class is invoked from each request's on_complete
# callback and the job is deleted afterwards.
class TyphoeusConcurrentWorker < QC::Worker
  # Batch size used when :max_concurrency is not given.
  # NOTE(review): 200 is meant to match the built-in limit described in the
  # Typhoeus README - confirm against the Typhoeus version in use.
  DEFAULT_MAX_CONCURRENCY = 200

  # args - the options Hash forwarded to QC::Worker, plus
  #        :max_concurrency - maximum number of requests per hydra batch.
  def initialize(args={})
    super args
    @max_concurrency = args[:max_concurrency] || DEFAULT_MAX_CONCURRENCY
    @hydra = Typhoeus::Hydra.new(:max_concurrency => @max_concurrency)
    # metrics state; set here as well so `work` can be used without `start`
    @start_time = Time.now.to_f
    @finished_jobs_count = 0
  end

  # Main loop: fill the hydra queue, run the batch, repeat while @running.
  def start
    # just for metrics
    # remove this code once you don't need it
    @start_time = Time.now.to_f
    @finished_jobs_count = 0
    # end of metrics code
    while @running
      @force_execution = false
      # queueing a request is non-blocking, so fill the request queue
      @max_concurrency.times do
        work
        # stop filling and execute the hydra queue; also stop once the worker
        # is no longer running, since no further job will be locked
        break if @force_execution || !@running
      end
      # this is a blocking call that returns once all requests are complete
      @hydra.run
    end
  end

  # Locks the next job, if any, and queues its request.
  def work
    job = lock_job
    return unless job
    begin
      call(job)
    rescue => e
      handle_failure(job, e)
      # The request never reached the hydra queue, so on_complete will not
      # fire for this job; delete it here, otherwise it would stay locked.
      @queue.delete(job[:id])
    end
  end

  # Builds the request for +job+ and queues it on the hydra.
  #
  # job - Hash with :id, :method ("Klass" or "Klass.message") and :args; the
  #       first element of :args must hold the 'request' options.
  #
  # Raises RuntimeError when the job class is not a TyphoeusConcurrentJob
  # subclass or when the 'request' options are missing.
  def call(job)
    spec = job[:method].split('.')
    klass = resolve_job_class(spec.first.to_s)
    # use specified message or use default one if no specified
    message = spec.size > 1 ? spec.last : 'perform'
    args = job[:args][0]
    req_opts = args && args['request']
    raise('Invalid job! Missing "request" options!') unless req_opts
    request = build_request(req_opts)
    # async callback, which is why failures are rescued once again here;
    # the job is deleted once everything is done
    request.on_complete do |response|
      begin
        # public_send: the message name comes from queue data, so it must not
        # be able to reach private methods
        klass.public_send(message, response, args)
      rescue => e
        handle_failure(job, e)
      ensure
        @queue.delete(job[:id])
        record_finished_job
      end
    end
    # enqueue new request
    @hydra.queue(request)
  end

  # Tries to lock a job from the QC queue.
  #
  # Waiting with exponential backoff only happens while the hydra queue is
  # empty. If requests are already queued and no further job is available,
  # @force_execution is set so that `start` runs the hydra queue immediately.
  #
  # Returns the locked job or nil.
  def lock_job
    attempts = 0
    job = nil
    should_wait = @hydra.queued_requests.empty?
    while @running
      job = @queue.lock(@top_bound)
      break if job || !should_wait || attempts >= @max_attempts
      wait(2**attempts)
      attempts += 1
    end
    # there are some requests in hydra queue but no other job
    # is waiting in QC queue
    @force_execution = !should_wait && job.nil?
    job
  end

  # Reports a failed job by printing the exception.
  def handle_failure(job,e)
    puts e
  end

  private

  # Resolves +name+ (optionally namespaced with "::") to a job class by
  # constant lookup instead of eval, so queue data is never executed as code.
  def resolve_job_class(name)
    klass = name.split('::').reject(&:empty?).inject(Object) do |scope, const_name|
      scope.const_get(const_name)
    end
    unless klass.is_a?(Class) && klass < TyphoeusConcurrentJob
      raise('Invalid job! TyphoeusConcurrentJob subclass expected!')
    end
    klass
  end

  # Builds the Typhoeus request from the job's 'request' options; the HTTP
  # method defaults to GET when none is given.
  def build_request(req_opts)
    http_method = (req_opts['method'] || 'get').to_s.downcase.to_sym
    Typhoeus::Request.new(req_opts['url'],
                          :method  => http_method,
                          :headers => req_opts['headers'],
                          :params  => req_opts['params'],
                          :body    => req_opts['body'])
  end

  # just for metrics purposes
  # remove this code once you don't need it
  #
  # Counts a finished job and prints the total time once JOBS_COUNT jobs are
  # done. Skipped when the script does not define JOBS_COUNT, so the metrics
  # cannot raise inside the completion callback.
  def record_finished_job
    @finished_jobs_count += 1
    return unless defined?(JOBS_COUNT) && @finished_jobs_count == JOBS_COUNT
    total_time = ((Time.now.to_f - @start_time) * 1000.0).to_i
    puts "#{JOBS_COUNT} jobs finished in #{total_time}ms with concurrency set to #{@max_concurrency}"
  end
end
# Test case: fill the queue with JOBS_COUNT search requests, one per query
# number, all handled by Consumer.perform.
MAX_CONCURRENCY = 100
JOBS_COUNT = 100
(0...JOBS_COUNT).each do |query_number|
  search_params = { 'q' => query_number, 'rpp' => 15 }
  request_spec = {
    'url'    => 'http://search.twitter.com/search.json',
    'method' => 'get',
    'params' => search_params
  }
  QC.enqueue('Consumer.perform', { 'request' => request_spec })
end
# Build the concurrent worker with the batch size chosen above.
worker = TyphoeusConcurrentWorker.new(
  :listening_worker => true,
  :max_concurrency  => MAX_CONCURRENCY,
  :max_attempts     => 10
)
# Hand control to the worker loop.
worker.start
# Results of three test runs each with concurrency set to 10, 50 and 100:
# 100 jobs finished in 10304ms with concurrency set to 10
# 100 jobs finished in 13701ms with concurrency set to 10
# 100 jobs finished in 11708ms with concurrency set to 10
# 100 jobs finished in 5408ms with concurrency set to 50
# 100 jobs finished in 5370ms with concurrency set to 50
# 100 jobs finished in 5155ms with concurrency set to 50
# 100 jobs finished in 3767ms with concurrency set to 100
# 100 jobs finished in 3940ms with concurrency set to 100
# 100 jobs finished in 3708ms with concurrency set to 100
# as you can see, the difference is HUGE! Enjoy!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment