# Mailinator - email processing server: listens for emails piped over a
# socket and enqueues each email to Resque to be processed.
# (GitHub gist kingcu/1036565, created June 20, 2011 20:58)
require 'fcntl' # Fcntl::F_SETFD / Fcntl::FD_CLOEXEC (self-pipe setup)
require 'logger'
require 'socket'
require 'tempfile'
require 'rubygems'
require 'bundler/setup' #so we can require gems in the Gemfile
require 'resque' #in Gemfile, allowed to require by bundler/setup
require 'app/workers/email_worker'
# Usage: <script> HOST PORT
# Fail fast with a readable message instead of a SocketError from
# TCPServer.new(nil, nil) when the listen address is missing.
abort("usage: #{File.basename($0)} HOST PORT") if ARGV.size < 2

@num_workers = 2 # worker processes the master keeps alive
@sleep_time = 10 # seconds the master blocks in master_sleep between passes
@logger = Logger.new("log/email_processor_server.log")

# Alternative transport: listen on a UNIX socket instead of TCP.
#file = ARGV[0]
#File.unlink(file) if File.exists?(file) && File.socket?(file)
#SERVERS = [UNIXServer.new(file)]

TIMEOUT = 30 #seconds; IO.select timeout used by worker_loop
WORKERS = {} # pid => { :tmp => heartbeat Tempfile, :num => worker number }
SERVERS = [TCPServer.new(ARGV[0], ARGV[1])]
SELF_PIPE = [] # [reader, writer]; filled in by init_self_pipe

# TERM/INT immediately and uncleanly shut down;
# QUIT shuts down gracefully and HUP will eventually reload config.
ACCEPTED_SIGS = [:QUIT, :HUP, :TERM, :INT].freeze

# signals received by the master's traps, waiting to be handled by master_loop
SIG_QUEUE = []
# Sets the process title (what ps/top display) by assigning the program
# name; returns +name+.
def procname name
  $PROGRAM_NAME = name
end
# (Re)creates the self-pipe the master sleeps on.
#
# Any previous pipe ends are closed (best effort: errors while closing are
# ignored), SELF_PIPE is refilled in place with a fresh [reader, writer]
# pair, and both ends get FD_CLOEXEC. Relies on the Fcntl constants
# (require 'fcntl').
def init_self_pipe
  SELF_PIPE.each do |old_end|
    begin
      old_end.close
    rescue
      nil
    end
  end
  reader, writer = IO.pipe
  SELF_PIPE.replace([reader, writer])
  SELF_PIPE.each do |pipe_end|
    pipe_end.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  end
end
# Blocks the master for up to +time+ seconds without spinning.
#
# Waits on the read end of the self-pipe; wake_master writes a byte there
# to cut the wait short. Returns nil on timeout, otherwise consumes and
# returns a single wake-up byte.
def master_sleep time
  reader = SELF_PIPE.first
  ready = IO.select([reader], nil, nil, time)
  return nil unless ready
  reader.readpartial(1)
end
# Wakes the master by writing one byte into the self-pipe, which makes the
# IO.select in master_sleep return. Called from signal traps.
#
# write_nonblock keeps the trap from ever blocking: if the pipe buffer is
# already full, unread wake-up bytes are pending and the master will wake
# anyway, so this byte can safely be dropped.
def wake_master
  begin
    SELF_PIPE[1].write_nonblock('.')
  rescue Errno::EAGAIN
    # pipe full: the master already has a wake-up waiting
  end
  # NOTE(review): this runs in trap context; newer Rubies may refuse to take
  # Logger's lock there (Logger then warns "log writing failed") - confirm.
  @logger.debug("WOKE MASTER, SIG_QUEUE.size: #{SIG_QUEUE.size}")
end
# Master process main loop. Each pass reaps exited children, then handles at
# most one queued signal:
#   nil        -> top up the worker pool, then sleep until a trap wakes us
#                 (or @sleep_time seconds pass)
#   :QUIT      -> leave the loop and stop the workers gracefully
#   :TERM/:INT -> leave the loop and stop the workers immediately
#   :HUP       -> ignored for now (reserved for a config reload)
# A StandardError raised during a pass is logged and every worker is sent
# QUIT; the loop then carries on. stop is called exactly once on exit.
def master_loop
  procname("mailinator master")
  graceful = true
  loop do
    begin
      # collect any children that have exited (zombies)
      reap_all_workers()
      case SIG_QUEUE.shift
      when nil
        spawn_workers()
        master_sleep(@sleep_time)
      when :QUIT
        break
      when :TERM, :INT
        # remember the mode rather than calling stop here, so that stop
        # is not run a second time after the loop
        graceful = false
        break
      when :HUP
        # reload configs when/if we go this route
      end
    rescue => e
      @logger.error("rescued exception in master loop: #{e}")
      # the master is in an unknown state, so ask every worker to exit
      kill_all_workers(:QUIT)
    end
  end
  stop(graceful)
end
# Shuts down every worker, waiting up to +timeout+ seconds for them to exit.
#
# graceful - true sends QUIT (workers finish the request in hand and exit);
#            false sends TERM.
# timeout  - seconds to wait before SIGKILLing stragglers (default TIMEOUT).
#
# Worker pids are snapshotted before signalling because kill_worker drops
# entries from WORKERS as soon as the signal is sent, so WORKERS.size says
# nothing about whether the processes have actually exited.
def stop(graceful=true, timeout=TIMEOUT)
  deadline = Time.now + timeout
  stopping = WORKERS.keys
  kill_all_workers(graceful ? :QUIT : :TERM)

  pending = stopping
  until pending.empty? || Time.now >= deadline
    sleep(0.1) # don't slam the CPU while we wait
    pending = pending.reject { |wpid| worker_exited?(wpid) }
  end

  # timeout exceeded and some workers are still alive: kill them with a quickness
  pending.each do |wpid|
    begin
      kill_worker(:KILL, wpid)
    rescue Errno::ESRCH
      # exited between the last poll and now
    end
  end

  # forget every worker we stopped, in case any entry was left behind
  stopping.each do |wpid|
    worker = WORKERS.delete(wpid)
    worker[:tmp].close if worker
  end
end

# True once child +wpid+ has exited (reaping it here), or if it was already
# reaped elsewhere (ECHILD).
def worker_exited?(wpid)
  !Process.waitpid(wpid, Process::WNOHANG).nil?
rescue Errno::ECHILD
  true
end
# Reaps every child that has already exited, without blocking.
#
# Each reaped pid is removed from WORKERS and its heartbeat tempfile closed.
# The exit status is logged (info for a clean exit, error otherwise) so a
# crashing worker no longer disappears silently. Returns nil.
def reap_all_workers
  loop do
    begin
      # -1: any child; WNOHANG: return nil instead of blocking if none exited
      wpid, status = Process.waitpid2(-1, Process::WNOHANG)
    rescue Errno::ECHILD
      break # no children at all
    end
    break unless wpid # children exist, but none has exited yet

    worker = WORKERS.delete(wpid)
    worker[:tmp].close if worker && !worker[:tmp].closed?
    if status.success?
      @logger.info("reaped worker #{wpid}: #{status.inspect}")
    else
      @logger.error("reaped worker #{wpid}: #{status.inspect}")
    end
  end
end
# Sends +sig+ to every known worker via kill_worker and returns the list of
# pids that were signalled. The pid list is copied first because
# kill_worker removes entries from WORKERS while we walk it.
def kill_all_workers(sig)
  pids = WORKERS.keys
  pids.each do |pid|
    kill_worker(sig, pid)
  end
end
# Sends +signal+ to worker +wpid+ and immediately forgets it. The WORKERS
# entry is removed and its heartbeat tempfile closed whether or not the
# process has exited yet.
#
# A pid that no longer exists (ESRCH) is logged rather than raised, so a
# single vanished worker cannot abort a shutdown or recovery loop.
def kill_worker(signal, wpid)
  @logger.info("Killing worker with pid: #{wpid}")
  begin
    Process.kill(signal, wpid)
  rescue Errno::ESRCH
    @logger.warn("worker #{wpid} was already gone")
  end
  worker = WORKERS.delete(wpid)
  worker[:tmp].close if worker && !worker[:tmp].closed?
end
# Handles one accepted connection: reads the whole payload (until the client
# closes its end), closes the socket, and hands the payload to Resque via
# EmailWorker. Heavy processing is left to Resque, which knows about the
# Rails app, so workers here are not tied up.
#
# res - the value returned by accept: either a socket, or a [socket, addrinfo]
#       pair as returned by Socket#accept.
def handle_data(res)
  sock, _sockaddr = res
  begin
    data = sock.read
  ensure
    # always release the descriptor; it used to leak once per email
    sock.close unless sock.closed?
  end
  @logger.debug("Accepted email: #{data}")
  Resque.enqueue(EmailWorker, data)
end
#borrowed heavily from unicorn's excellent philosophy. each worker | |
#keeps a tempfile and each loop changes permissions using chmod, this | |
#way we can check the ctime of each tempfile in the workers to see | |
#if a particular worker is hung. Each worker should be quick to handle | |
#a request. | |
def worker_loop worker | |
flipper = 0 #flips between 0/1 | |
alive = true | |
#ugly place to put these traps, but we need the TERM/INT to be in the loop | |
#so we can exit out of the loop by setting alive=false | |
trap(:QUIT) { alive = false; SERVERS.each { |s| s.close rescue nil } } | |
[:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } | |
trap(:CHLD, 'DEFAULT') | |
begin | |
worker[:tmp].chmod(flipper = 0 == flipper ? 1 : 0) | |
SERVERS.each do |server| | |
data = server.accept | |
handle_data(data) | |
end | |
#do it before and after, so we get a more resolute idea of actual | |
#time worker is doing something | |
worker[:tmp].chmod(flipper = 0 == flipper ? 1 : 0) | |
rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINTR | |
#timeout to detect parent death or activity | |
IO.select(SERVERS, nil, SELF_PIPE, TIMEOUT) | |
retry | |
end while alive | |
end | |
# Tops the pool up to @num_workers by forking one worker for each worker
# number (0...@num_workers) that has no live entry in WORKERS. This keeps
# numbers unique: if worker[1] dies, worker[1] is respawned rather than a
# second worker[0]. It also copes with @num_workers being raised at runtime.
#
# Each child is given its own heartbeat Tempfile, renames itself, ignores
# the master's signals, clears the inherited SIG_QUEUE and runs worker_loop.
def spawn_workers
  taken = WORKERS.values.map { |w| w[:num] }
  (0...@num_workers).each do |i|
    next if taken.include?(i)
    worker = { :tmp => Tempfile.new("worker[#{i}]"), :num => i }
    pid = fork do
      procname("mailinator worker[#{i}]")
      # a nil handler ignores the signal in the child; worker_loop
      # installs its own QUIT/TERM/INT handlers
      ACCEPTED_SIGS.each { |sig| trap(sig, nil) }
      SIG_QUEUE.clear()
      #init_self_pipe() #replaces the pipe
      worker_loop(worker)
    end
    WORKERS[pid] = worker
  end
end
# Create the self-pipe BEFORE installing any trap. Every handler below
# calls wake_master, which writes to SELF_PIPE[1]; a signal arriving before
# the pipe existed would crash inside the handler on nil.
init_self_pipe()

# queue control signals for master_loop and wake it up to handle them
ACCEPTED_SIGS.each { |sig| trap(sig) { SIG_QUEUE << sig; wake_master() } }

# A child exited: just wake the master so its loop reaps the zombie and
# respawns a worker if needed to maintain the worker count.
trap(:CHLD) { wake_master() }

spawn_workers()
master_loop()
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.