@PetrKaleta
Last active December 10, 2015 12:28
Custom Typhoeus-based QC worker. Note: I've never tested this code; it's just an example.
require 'queue_classic'
require 'typhoeus'
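# Declared up front so the TERM handler below can close over it;
# the worker itself is assigned near the end of the file.
worker = nil
trap('INT') { exit }
trap('TERM') { worker.stop if worker }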
# Generic consumer: wraps one queued job as a Typhoeus request
class TyphoeusConcurrentJob
  def initialize(opts = {})
    @opts = opts
  end

  # Override in a subclass to handle the completed response
  def perform(response)
  end

  # Build the request; the consumer is called once the request has completed
  def request
    req = Typhoeus::Request.new(@opts['req_url'], @opts['req_opts'])
    req.on_complete do |response|
      perform(response)
    end
    req
  end
end
# Simple consumer class that does the actual work with the response
class Consumer < TyphoeusConcurrentJob
  def perform(response)
    puts response.body
    puts @opts.inspect
  end
end
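# Enqueueing a new job
# NOTE: QC.enqueue targets QC's default queue, so that queue has to be
# 'typhoeus' for the worker below to pick this job up.
job_args = {
  'req_url'  => 'http://foo.bar', # request URL
  'req_opts' => {},               # additional request options
  'foo'      => 'bar'             # any other options...
}
QC.enqueue('Consumer', job_args)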
# Custom QC worker
class TyphoeusConcurrentWorker < QC::Worker
  def initialize(args = {})
    super(args)
    @max_concurrency = args[:max_concurrency]
    @hydra = Typhoeus::Hydra.new(:max_concurrency => @max_concurrency)
  end
  def start
    while @running
      @force_execution = false
      # non-blocking operation, so fill the request queue
      @max_concurrency.times do
        work
        # break the loop and execute hydra queue
        break if @force_execution
      end
      # this is a blocking call that returns once all requests are complete
      @hydra.run
    end
  end
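  def call(job)
    # job[:args] is the array of arguments given to QC.enqueue, so splat it;
    # const_get resolves the class name without evaluating arbitrary code
    obj = Object.const_get(job[:method]).new(*job[:args])
    return unless obj.kind_of?(TyphoeusConcurrentJob)
    # enqueue new request
    @hydra.queue(obj.request)
  end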
  def lock_job
    attempts = 0
    job = nil
    # Do not wait for the next job if some requests are already
    # waiting in the hydra queue; in that case the hydra queue
    # must be executed immediately.
    should_wait = @hydra.queued_requests.size == 0
    until !@running || job
      job = @queue.lock(@top_bound)
      if job.nil? && should_wait && attempts < @max_attempts
        wait(2**attempts)
        attempts += 1
        next
      else
        break
      end
    end
    # Requests are waiting in the hydra queue but no other job
    # is waiting in the QC queue, so force the hydra run.
    @force_execution = !should_wait && job.nil?
    job
  end
end
# Init worker
worker = TyphoeusConcurrentWorker.new(:max_attempts => 10,
                                      :max_concurrency => 10,
                                      :q_name => 'typhoeus',
                                      :listening_worker => true)
# Start worker
worker.start
@PetrKaleta
Author

The only open question is whether a job should be removed from the QC queue as soon as its request has been queued in the hydra queue, or only once the Consumer has been performed.
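
To make the second option concrete, here is a minimal sketch of "delete only after the consumer has run". It uses no real libraries: `FakeQueue` and `FakeHydra` are hypothetical stand-ins for the QC queue and the Typhoeus hydra, invented here so the snippet runs without a database or network. It only illustrates the ordering, not how QC or Typhoeus behave internally.

```ruby
# Hypothetical stand-in for the QC queue: just a list of job ids.
class FakeQueue
  attr_reader :jobs

  def initialize(ids)
    @jobs = ids.dup
  end

  def delete(id)
    @jobs.delete(id)
  end
end

# Hypothetical stand-in for the hydra: stores completion callbacks
# and fires them all when run is called.
class FakeHydra
  def initialize
    @pending = []
  end

  def queue(&on_complete)
    @pending << on_complete
  end

  def run
    @pending.shift.call until @pending.empty?
  end
end

queue = FakeQueue.new([1, 2, 3])
hydra = FakeHydra.new
performed = []

queue.jobs.dup.each do |id|
  hydra.queue do
    begin
      performed << id   # the consumer's perform(response) would run here
    ensure
      queue.delete(id)  # remove the job only after perform has run
    end
  end
end

remaining_before_run = queue.jobs.dup # requests queued, nothing deleted yet
hydra.run
remaining_after_run = queue.jobs.dup  # every job performed, then deleted
```

With this ordering a job whose request never completes stays in the queue, so it is not lost if the worker dies mid-run; the price is that a job may be performed more than once. Deleting at hydra-queue time is the reverse trade: no duplicates, but an interrupted run drops whatever was still pending.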