@yanowitz
Last active April 21, 2016 00:27
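Defer Sidekiq job enqueuing until after ActiveRecord transactions commit (config/initializers/sidekiq.rb). WARNING: this changes the semantics of Sidekiq: jobs pushed inside a transaction are enqueued only once it commits, and are discarded if it rolls back. Understand that before using :)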
### WARNING: on Sidekiq upgrade, make sure Sidekiq::Client still has this raw_push interface.
## Note: we also don't support Sidekiq.redis_pool'ing here, because deferred jobs are always
## pushed through a default Sidekiq::Client.new. That only matters if you are sharding Sidekiq
## jobs across several Redis instances, which is not recommended.
# Also note: Sidekiq test mode does an end-run around all this code.
module Sidekiq
  class Client
    # Push every job that was buffered on this thread while a transaction was open.
    def self.run_deferred_jobs
      jobs = Thread.current[:sidekiq_deferred_jobs]
      if jobs
        client = new
        # raw_push is private, so the original implementation is called via send.
        jobs.each { |j| client.send(:raw_push_without_deferred_in_transactions, j) }
        Thread.current[:sidekiq_deferred_jobs] = nil
      end
    end

    # Drop every job that was buffered on this thread (used on rollback).
    def self.clear_deferred_jobs
      jobs = Thread.current[:sidekiq_deferred_jobs]
      if jobs
        Rails.logger.info([name, "transaction rollback, ditching jobs enqueued during transaction", jobs: jobs])
      end
    ensure
      Thread.current[:sidekiq_deferred_jobs] = nil
    end

    private

    # Push immediately when no transaction is open; otherwise buffer the payloads
    # in a thread-local list until the transaction finishes.
    def raw_push_with_deferred_in_transactions(payloads)
      if ActiveRecord::Base.connection.open_transactions == 0
        raw_push_without_deferred_in_transactions(payloads)
      else
        Thread.current[:sidekiq_deferred_jobs] ||= []
        Thread.current[:sidekiq_deferred_jobs] << payloads
      end
    end

    # NOTE: alias_method_chain is deprecated in Rails 5.0 and removed in 5.1.
    alias_method_chain :raw_push, :deferred_in_transactions
  end
end
ActiveRecord::Base.after_commit { Sidekiq::Client.run_deferred_jobs }
ActiveRecord::Base.after_rollback { Sidekiq::Client.clear_deferred_jobs }
# If no database records change inside a transaction, the callbacks above don't run, so we
# need this fail-safe around ActiveRecord::Base.transaction.
# Mucking with this aliasing in test mode really, really messes things up, so skip it there.
unless Rails.env.test?
  module ActiveRecord
    class Base
      def self.transaction_with_sidekiq_job_drain(options = {}, &block)
        result = transaction_without_sidekiq_job_drain(options, &block)
        # Only the outermost transaction drains the buffered jobs.
        if ActiveRecord::Base.connection.open_transactions == 0
          Sidekiq::Client.run_deferred_jobs
        end
        # Return the transaction's value so callers still see it.
        result
      rescue Exception
        Sidekiq::Client.clear_deferred_jobs
        raise
      end

      class << self
        alias_method_chain :transaction, :sidekiq_job_drain
      end
    end
  end
end
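
To make the changed semantics concrete, here is a minimal sketch in plain Ruby with no Rails or Sidekiq involved. `DeferredQueue` and everything in it are hypothetical stand-ins, not part of either library: `push` plays the role of the patched `raw_push`, `transaction` plays the role of the wrapped `ActiveRecord::Base.transaction`, and `@depth` stands in for `connection.open_transactions`. It also uses plain instance variables where the initializer uses `Thread.current`, so treat it as an illustration of the buffering idea only.

```ruby
# Hypothetical stand-in for the patched Sidekiq::Client plus the wrapped transaction method.
class DeferredQueue
  attr_reader :pushed

  def initialize
    @pushed = []   # jobs that actually reached the queue (Redis, in the real setup)
    @buffer = []   # jobs held back while a transaction is open
    @depth  = 0    # stand-in for connection.open_transactions
  end

  # Enqueue immediately outside a transaction, buffer inside one.
  def push(job)
    if @depth.zero?
      @pushed << job
    else
      @buffer << job
    end
  end

  # Run the block as a "transaction": flush the buffer when the outermost
  # block finishes normally, drop the buffer when the block raises.
  def transaction
    @depth += 1
    begin
      result = yield
    rescue Exception
      @depth -= 1
      @buffer.clear
      raise
    end
    @depth -= 1
    flush if @depth.zero?
    result
  end

  private

  def flush
    @pushed.concat(@buffer)
    @buffer.clear
  end
end

queue = DeferredQueue.new

queue.transaction do
  queue.push("welcome_email")
  puts queue.pushed.inspect   # nothing has been enqueued yet
end
puts queue.pushed.inspect     # the job shows up only after the commit

begin
  queue.transaction do
    queue.push("charge_card")
    raise "simulated failure"
  end
rescue RuntimeError
  puts queue.pushed.inspect   # "charge_card" was dropped along with the rollback
end
```

The practical consequence is the one the warning at the top is about: code that pushes a job inside a transaction can no longer assume the job is already in Redis on the next line, and a rollback silently discards it.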