Redis is my favourite key/value store; it’s flexible, easy to set up and insanely fast. EventMachine is a popular Ruby library for doing asynchronous I/O using an event loop. Bindings already exist for accessing a Redis server using EM’s async-I/O (courtesy of Jonathan Broad), but unfortunately the resulting code has to use Continuation-Passing Style via Ruby blocks. A very basic example of what that looks like follows:
require 'em-redis'
EventMachine.run do
redis = EventMachine::Protocols::Redis.connect
redis.set("foo", "bar") do |_|
redis.get("foo") do |response|
puts response
end
end
end
Essentially, every single Redis operation incurs another level of indentation. Aside from being aesthetically very unpleasing, there are certain limitations this brings with it, due to the fact that most Ruby code is written in a non-CPS style, and written to work with non-CPS programs.
I set about trying to sidestep this problem, using a relatively old programming
technique. call-with-current-continuation
, commonly abbreviated as
call/cc
, is a construct first invented in Scheme which allows you to use a CPS
function seamlessly from within non-CPS code. In RSpec, call/cc
would pass the
following test:
def myfunc
yield 3
end
callcc(method :myfunc).should == 3
So in a Ruby-specific sense, it transforms a yield
statement into return
.
Here, callcc()
takes a proc (the object representation of a block), and any
number of arguments, and returns whatever calling the block with those arguments
would itself yield. It certainly is a mouthful, but the code explains it better
than English can.
Applied to Redis + EventMachine, we should be able to translate the earlier fubarity into something a little cleaner:
require 'em-redis'
EventMachine.run do
redis = EventMachine::Protocols::Redis.connect
callcc(redis.method(:set), "foo", "bar")
puts callcc(redis.method(:get), "foo")
end
So it’s a little cleaner, but not perfect. We don’t want to have to wrap every
Redis operation with a callcc()
invocation. So a better idea is to
monkey-patch the Redis
EventMachine protocol to use callcc
at a lower level:
module EventMachine::Protocols::Redis
alias :cps_inline_command :inline_command
def inline_command(*args)
callcc(method(:cps_inline_command), *args)
end
alias :cps_multiline_command :multiline_command
def multiline_command(command, *args)
callcc(method(:cps_multiline_command), command, *args)
end
end
The EventMachine::Protocols::Redis
protocol carries out most operations using
these methods, so by overriding them everything else should implicitly start
using callcc()
.
Now the example code looks like this:
require 'em-redis'
EventMachine.run do
redis = EventMachine::Protocols::Redis.connect
redis.set("foo", "bar")
puts redis.get("foo")
end
That’s much more like it.
Now for the hard part. callcc()
is not trivial to implement in a language
without first-class built-in continuations. Ruby 1.9 includes Fibers, a
coroutine implementation which does make it possible to write callcc()
. Anyone
familiar with Lua’s coroutines will feel perfectly at home with Fibers, but for
the Rubyists who haven’t encountered them, I’ll begin with a quick intro.
We all know and love Ruby’s yield
and blocks. But unfortunately, a block
passed to a method will not be available implicitly further down the stack. The
problem is demonstrated here:
def a(&block)
b(&block)
end
def b(&block)
c(&block)
end
def c
puts "ready to receive"
val = yield
puts "received: #{ val.inspect }"
end
a { "some_value" }
In order for c
to be able to call the block, it must be explicitly passed down
by each function in the stack. With Fibers, the code becomes:
def a
b
end
def b
c
end
def c
puts "ready to receive"
val = Fiber.yield
puts "received: #{ val.inspect }"
end
f = Fiber.new &(method :a)
f.resume # get the fiber to the first `Fiber.yield` call.
f.resume "some_value"
A few things to note here:
-
c()
callsFiber.yield
instead ofyield
. This is a non-localyield
which will propagate up to whatever called the current fiber’sresume()
method. -
The code which does the calling becomes a little more complex, but none of the application code in
a()
orb()
need be aware thatc()
will yield. Nevertheless, the pattern shown here can be abstracted away, so that eventually we may only need:fiblock(method :a) { "some_value" }
-
We must explicitly run
a()
in a Fiber; this is a trade-off against having to explicitly acknowledge the presence of a block throughout the stack (as in the previous code).
call/cc
can be implemented relatively simply with Fibers, but there are two
distinct forms:
def callcc_inner(proc, *args)
Fiber.new do
proc.call(*args) { |*yargs| Fiber.yield(*yargs) }
end.resume
end
def callcc_outer(proc, *args)
curr_fiber = Fiber.current
proc.call(*args) { |*rargs| curr_fiber.resume(*rargs) }
Fiber.yield
end
The difference between the two is subtle, but very important:
-
callcc_inner()
creates a new Fiber in which the proc is run. The proc is passed a block which callsFiber.yield
, causing the wrapping Fiber to immediately pass control back to whatever started it. Since this fiber is resumed immediately after creation, we get whatever was yielded by the proc. -
callcc_outer()
operates almost exactly opposite. The proc is called with a block that will immediately resume the current Fiber. ThenFiber.yield()
is called from the top-level. This suspends the current Fiber until proc eventually calls the block with some values; these values are output as the result of the call toFiber.yield()
, and therefore the result of the call tocallcc_outer()
.
Because of their different behaviours, callcc_inner()
will only work when
the proc it is called with yields during its execution (i.e. yields before it
returns). Conversely, callcc_outer()
will only work when the proc it is
called with returns before it yields.
To revisit the CPS employed in the Redis bindings:
redis.set("foo", "bar") do |_|
redis.get("foo") do |value|
puts value
end
end
The call to redis.set()
returns before it executes the given block (since it
adds the block to a list of event loop callbacks); as such, we should use
callcc_outer()
throughout the modified EventMachine::Protocols::Redis
. The
entirety of the code needed to patch em-redis
is:
# File: redis-fiber.rb
require 'fiber'
require 'em-redis'
def callcc(proc, *args)
curr_fiber = Fiber.current
proc.call(*args, &curr_fiber.method(:resume))
return Fiber.yield
end
module EventMachine
def self.run_fiber(*args, &blk)
f = Fiber.new(&blk)
run(*args) { f.resume }
end
module Protocols::RedisFiber
include Protocols::Redis
# Classmethods don't get mixed in.
def self.connect(host = 'localhost', port = 6379)
EventMachine.connect(host, port, self, host, port)
end
alias :cps_inline_command :inline_command
def inline_command(*args)
callcc((method :cps_inline_command), *args)
end
alias :cps_multiline_command :multiline_command
def multiline_command(command, *args)
callcc((method :cps_multiline_command), command, *args)
end
end
end
And the code I use to test it in a very quick-n-dirty way:
# file: redis-fiber-test.rb
require 'redis-fiber'
EventMachine::run_fiber do
redis = EventMachine::Protocols::RedisFiber::connect
puts "set foo = \"bar\""
redis.set("foo", "bar")
print "foo => "
puts redis.get("foo").inspect
EventMachine::stop_event_loop
end
The additional EventMachine::run_fiber()
method is required for
callcc_outer()
to work; since it uses Fiber.yield()
from the top-level, and
the root Fiber cannot yield (since there’s nothing for it to yield to), we need
to run the event loop within its own Fiber. I’ve also created a new RedisFiber
protocol, instead of patching the old one, in case existing code uses the old
one.
The thing to take away from this blog post is:
- Fibers can be used to implement
call/cc
. call/cc
allows you to use EventMachine async-I/O, without the CPS.- It’s pretty easy to monkey-patch existing libraries to use
call/cc
.
I’m very interested in building on what I’ve done here, but due to time constraints I might not be able to put much work in right now. If anyone out there chooses to do so, please let me know.
Thank you for your extremely useful explanation -- it helped me understand why my code wasn't working the way I expected it to.
I'm trying to use your sample code, but am running into compile time problems -- "undefined method
inline_command' for module
EventMachine::Protocols::RedisFiber'"I'm trying to find the reference for the inline_command function inside EventMachine, EM:Protocol or EM:P:Redis that this seems to be overloading, but I can't find it anywhere.
Perhaps it's my lack of knowledge of Ruby showing through, but what is missing? I'm using Ruby 1.9 if it makes a difference.
Thanks much, again.