Last active
March 21, 2016 12:00
-
-
Save djellemah/3f068006775d3dd9a735 to your computer and use it in GitHub Desktop.
One-shot thread-safe value store with blocking semantics.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# One-shot thread-safe value store with blocking explicit semantics, aka | |
# Future/Promise/IVar https://en.wikipedia.org/wiki/Futures_and_promises. | |
# | |
# In a sense, it's a state machine pattern for 3 states (fresh/unset, | |
# valued/set and exception). Apparently this is the same set of states as the | |
# PromiseA+ specification. Who knew? | |
# | |
# It can be initialised with a value, or the value can be assigned later on. | |
# | |
# You use it like this: | |
# | |
# inst = OneTime.new | |
# | |
# These threads will wait | |
# | |
# Thread.new{inst.value} | |
# Thread.new{inst.value} | |
# | |
# now set a value, and the threads will wake up | |
# | |
# inst.value = :we_have_a_value! | |
# | |
# But if you do this instead | |
# | |
# inst.raise 'the critical server died and went to the cloud' | |
# | |
# then the threads will wake up to an exception. | |
# | |
class OneTime | |
# ASSUMPTION initialize will never have more than one thread per instance | |
# executing concurrently. | |
def initialize( *args ) | |
# @mutex and @flag set in Fresh.extended hook, below. | |
case args.size | |
when 0 | |
extend Fresh | |
when 1 | |
@value = args.first | |
extend Valued | |
else | |
::Kernel.raise ArgumentError, "can only pass 0 or 1 arguments" | |
end | |
end | |
# only for extended. Will cause havoc if called at the wrong time. | |
def destruct_machinery | |
remove_instance_variable :@mutex if instance_variable_defined? :@mutex | |
remove_instance_variable :@flag if instance_variable_defined? :@flag | |
end | |
# only for extended. Will do unnecessary work if called at the wrong time. | |
def construct_machinery | |
@mutex = Mutex.new | |
@flag = ConditionVariable.new | |
end | |
# define access methods in the modules so we don't have to synchronize them | |
# unnecessarily. The module-switching is synchronized anyway, so that | |
# approach is still thread-safe. | |
# ASSUMPTION extending an instance with a module is threadsafe. | |
# has a value, so no need for synchronisation, and it can't be set again. | |
module Valued | |
attr_reader :value | |
def assigned?; true end | |
# would be nice to alias these, but can't because raise_assigned | |
# is defined in OneTime not in this module. | |
def value=(_) raise_assigned end | |
def raise(*_) raise_assigned end | |
def self.extended( base ); base.destruct_machinery end | |
end | |
# has an exception, so no need for synchronisation, and it can't be set again. | |
module Exceptional | |
def value; Kernel::raise @value; end | |
def assigned?; true end | |
def value=(_) raise_assigned end | |
def raise(*_) raise_assigned end | |
def self.extended( base ); base.destruct_machinery end | |
end | |
# doesn't have a value yet, so we need the synchronisation machinery. | |
module Fresh | |
def assigned?; false end | |
# This method is left lying around after the module switch. Which is a bit eww. | |
private def set_value( the_value, mod ) | |
@mutex.synchronize do | |
# this happens when two threads contend to set the the_value. | |
raise_assigned if assigned? | |
@value = the_value | |
# Since the value won't change again, replace the synchronisation | |
# because it's no longer necessary. Safe to do this here, because | |
# other waiting threads will be waiting on @mutex.synchronize. So | |
# they'll just finish in their current method, and any subsequent | |
# calls will just use the un-synchronised accessors | |
flag = @flag | |
extend mod | |
freeze | |
# all waiters can now have access to the value they've been waiting for. | |
flag.broadcast | |
end | |
end | |
# All threads waiting on .value will wake up to the value | |
def value=( the_value ) | |
set_value the_value, Valued | |
end | |
# All threads waiting on .value will wake up to an exception. Same idea | |
# as Thread#value. | |
# arguments same as Kernel.raise(string|Exception, [string, [backtrace]]) | |
# 'raise' clashes with Kernel.raise but its worthwhile because it fits in with | |
# Thread, so classes using this can treat this as a promise and Thread as a future. | |
def raise( *args ) | |
# parser chokes without ( ... ) | |
set_value( (::Kernel.raise(*args) rescue $!), Exceptional ) | |
end | |
# threads calling this before the value is assigned will be put to sleep | |
# here until there is a value. Semantics are same as Thread#value, ie will | |
# wait for the value, or raise an exception | |
# | |
# In the worst case a thread will wait on @flag because assigned? returns | |
# false, but in the meantime @value has just been set. But that's OK because | |
# the thread will be woken by the broadcast, which happens after the module | |
# switch (which also changes assigned to true). | |
def value | |
@mutex.synchronize do | |
@flag.wait @mutex unless assigned? | |
# method definition will be different by now because of module switch. | |
# So lean on those to return / raise as appropriate. | |
value | |
end | |
end | |
def self.extended( base ) | |
# Do this here rather than in OneTime constructor because they're not | |
# used by the Valued and Exceptional states of OneTime. | |
# It's safe because extend is called from initialize. Which is thread- | |
# safe except for (possible?) interference from ObjectSpace. | |
base.construct_machinery | |
end | |
end | |
def raise_assigned(*args) | |
Kernel::raise ArgumentError, "value already set to #{@value.inspect}" | |
end | |
def empty?; !assigned?; end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'one_time.rb' | |
describe OneTime do | |
before :all do | |
@mutex = Mutex.new | |
end | |
def subject | |
@mutex.synchronize do | |
@subject ||= OneTime.new | |
end | |
end | |
describe '.new' do | |
it 'has value after' do | |
inst = OneTime.new :hi_there | |
inst.value.should == :hi_there | |
end | |
it 'fails on > 1 argument' do | |
->{OneTime.new :one, :two}.should raise_error(ArgumentError) | |
end | |
it 'has no value' do | |
subject.should_not be_assigned | |
subject.should be_empty | |
end | |
end | |
describe '#assigned?' do | |
it 'false when fresh' do | |
subject.should_not be_assigned | |
end | |
it 'true after raise' do | |
subject.raise 'oops' | |
subject.should be_assigned | |
end | |
it 'true after value=' do | |
subject.value = 'not oops' | |
subject.should be_assigned | |
end | |
end | |
describe '#value=' do | |
it 'has value after assignment' do | |
subject.value = :some_value | |
subject.value.should == :some_value | |
end | |
it 'can only assign once' do | |
subject.value = :some_value | |
rand(1..15).times do | |
->{subject.value = :other_value}.should raise_error(ArgumentError) | |
end | |
end | |
it 'assignment of exception does not raise' do | |
subject.value = RuntimeError.new 'ya did wrang' | |
->{subject.value}.should_not raise_error | |
end | |
end | |
describe 'threads' do | |
it 'threads wait for value' do | |
# subject is not thread-safe, so just use a local | |
one_time = OneTime.new | |
random_thread_count = rand 2..105 | |
threads = random_thread_count.times.map{ Thread.new{one_time.value} } | |
# need a little wait for threads to start waiting on the value | |
sleep 0.00001 while threads.count{|t| t.status == 'sleep'} < 2 | |
# set the value | |
one_time.value = :long_awaited | |
# and check that other threads woke up and got it. | |
threads.each do |thr| | |
thr.value.should == :long_awaited | |
thr.status.should == false | |
end | |
end | |
it 'all threads but one fail assignment' do | |
# subject is not thread-safe, so just use a local | |
one_time = OneTime.new | |
random_thread_count = rand 100..200 | |
threads = random_thread_count.times.map do |i| | |
Thread.new{ one_time.value = i; :assigned_ok } | |
end | |
results = threads.map{|thr| thr.value rescue $!} | |
results.count{|v| v == :assigned_ok}.should == 1 | |
results.count{|v| v.is_a? Exception}.should == random_thread_count - 1 | |
end | |
it 'threads get exceptions' do | |
one_time = OneTime.new | |
threads = rand(2..105).times.map{ Thread.new{one_time.value} } | |
# set the value | |
one_time.raise RuntimeError, 'oops' | |
# and check that other threads woke up and got it. | |
threads.each do |thr| | |
->{thr.value}.should raise_error(RuntimeError, 'oops') | |
end | |
end | |
end | |
describe '#raise' do | |
it '()' do | |
subject.raise | |
->{subject.value}.should raise_error(RuntimeError,'') | |
end | |
it '(string)' do | |
subject.raise 'oops' | |
->{subject.value}.should raise_error(RuntimeError,'oops') | |
end | |
it '(exception_instance)' do | |
subject.raise RuntimeError.new('hereyago') | |
->{subject.value}.should raise_error(RuntimeError, 'hereyago') | |
end | |
it '(Exception,message)' do | |
subject.raise RuntimeError, 'grumpy crunchies' | |
->{subject.value}.should raise_error(RuntimeError, 'grumpy crunchies') | |
end | |
end | |
describe 'modules' do | |
it 'has synchronisation before assign' do | |
subject.method(:value).owner.should == OneTime::Fresh | |
subject.method(:value=).owner.should == OneTime::Fresh | |
end | |
it 'has no synchronisation after assign' do | |
subject.value = :some_value | |
subject.method(:value).owner.should == OneTime::Valued | |
subject.method(:value=).owner.should == OneTime::Valued | |
end | |
it 'Fresh has same methods as Valued' do | |
OneTime::Fresh.instance_methods.sort.should == OneTime::Valued.instance_methods.sort | |
OneTime::Fresh.instance_methods.each do |method_name| | |
fresh_method = OneTime::Fresh.instance_method method_name | |
valued_method = OneTime::Valued.instance_method method_name | |
fresh_method.arity.should == valued_method.arity | |
end | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'one_time.rb' | |
require 'delegate.rb' | |
# Waiter-on, geddit? Hurhurhur. | |
# | |
# Use it like this: | |
# | |
# w = Waitron.new | |
# Thread.new{puts w + 10} | |
# Thread.new{puts w + 20} | |
# | |
# and then you will see nothing on stdout. | |
# | |
# w._ = 9 | |
# | |
# and then you will see 1929 on stdout. | |
# | |
# This object's job is to be a very transparent delegator for a value store, | |
# defaulting to OneTime which is single-assigment with blocking semantics. The | |
# idea being that if you already have some complex algorithm in terms of ordinary | |
# objects, just instantiate these instead and pass them in. | |
# | |
# When they're used to wrap Thread instances, the calculation becomes automatically multi-threaded. | |
class Waitron < Delegator | |
# 0 or 1 arguments. | |
def initialize( *args, storage: ::OneTime) | |
@storage = storage | |
@wrapped = | |
case args.size | |
when 0 | |
storage.new | |
when 1 | |
case arg1 = args.first | |
when storage, ::Thread | |
# can wait on one of these, so just reference it. | |
arg1 | |
else | |
# the value, which we will end up not waiting on. | |
storage.new arg1 | |
end | |
else | |
raise ArgumentError, 'Waitron.new takes 0 or 1 arguments' | |
end | |
end | |
# I'm not sure this is a good idea. | |
def self.new( *args ) | |
if args.size == 1 && (obj=args.first).is_a?(Waitron) | |
obj | |
else | |
super | |
end | |
end | |
# make several | |
def self.[]( *objs ) | |
objs.map do |obj| | |
new obj | |
end | |
end | |
# make several Waitrons | |
def self.instances( count ) | |
count.times.map{new} | |
end | |
# Implementation for Delegator | |
def __setobj__( value ) | |
@wrapped.value = value | |
end | |
def __getobj__ | |
@wrapped.value | |
end | |
# handle a bit of self-schizophrenia here | |
def nil?; __getobj__.nil? end | |
# In Prolog, Scala and some other languages, _ effectively means 'unbound | |
# variable' which kinda fits here as a nice way than __setobj__ to set the value. | |
module UnderscoreAccessor | |
def _=(*args); __assign__ *args; end | |
def _; __getobj__; end | |
end | |
include UnderscoreAccessor | |
# true if the object has a value, false otherwise. Non-blocking. | |
# | |
# I'm sure *somebody* will find a way to misuse this to avoid blocking... | |
def __assigned? | |
# this is quite horrible, but short of reopening Thread, I can't see another way to do it. | |
# Lots of things respond_to? :value, so can't rely on that. | |
case @wrapped | |
when ::Thread | |
# ie normal termination or exception | |
@wrapped.status == false || @wrapped.status == nil | |
when @storage | |
@wrapped.assigned? | |
else | |
raise "Don't know how to figure out assigned value for #{@wrapped.inspect}" | |
end | |
end | |
# be nice to pry and other REPLs | |
def inspect | |
if __assigned? | |
"#<Waitron:0x#{'%x' % __id__} #{__getobj__.inspect}>" rescue "#<Waitron:0x#{'%x' % __id__} #{$!.inspect}>" | |
else | |
"#<Waitron:0x#{'%x' % __id__} unset>" | |
end | |
end | |
# have to handle this otherwise pry (and maybe irb) will block | |
def pretty_print(pp) | |
if __assigned? | |
super | |
else | |
pp.text inspect | |
end | |
end | |
# for awesome_print | |
def ai; inspect end | |
protected | |
# This allows assigning to an Exception instance, which will cause accesses | |
# to raise. Like Thread#value. | |
# Will need to be overridden unless storage respond_to? :raise | |
def __assign__( *args ) | |
if (ex = args.first).is_a? Exception | |
@wrapped.raise ex | |
else | |
__setobj__ *args | |
end | |
self | |
end | |
end | |
class Object | |
def to_waitron | |
Waitron.new self | |
end | |
end | |
class Thread | |
def self.waitron( *args, &block ) | |
Waitron.new new( *args, &block ) | |
end | |
end | |
module Kernel | |
def waitron( *args ) | |
case args.size | |
when 0; Waitron.new | |
when 1; Waitron.new args.first | |
else; Waitron[args] | |
end | |
end | |
def thread(*args, &block) | |
Waitron.new Thread.new(*args, &block) | |
end | |
end | |
class Waitron | |
def to_waitron | |
self | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'waitron.rb' | |
# turn off the "old syntax" warnings | |
RSpec.configure do |config| | |
config.mock_with :rspec do |c| | |
c.syntax = [:should, :expect] | |
end | |
config.expect_with :rspec do |c| | |
c.syntax = [:should, :expect] | |
end | |
end | |
describe Waitron do | |
it 'network' do | |
x,y,z = 3.times.map{Waitron.new} | |
t = Thread.new{(x + y + z) * (x - y - z)} | |
t.status.should == 'run' | |
x._ = 8 | |
y._ = 2 | |
z._ = 1 | |
# this must block | |
t.value.should == 55 | |
t.value.should == 55 | |
t.status.should == false | |
t.value.should == 55 | |
end | |
it 'concurrent updates' do | |
x,y,z = Waitron.instances 3 | |
t = Thread.new{(x + y + z) * (x - y - z)} | |
sleep 0.001 | |
t.status.should == 'sleep' | |
Thread.new{sleep rand; x._ = 8} | |
Thread.new{sleep rand; y._ = 2} | |
Thread.new{sleep rand; z._ = 1} | |
(t.alive? && t.stop?).should == true | |
# this should block | |
t.value.should == 55 | |
end | |
it 'associative' do | |
x,y,z = Waitron.instances 3 | |
x._ = 6 | |
y._ = x | |
z._ = y | |
x.should == y | |
x.should == z | |
y.should == x | |
y.should == z | |
z.should == x | |
z.should == y | |
z.should == 6 | |
6.should == z | |
end | |
it 'wraps a Thread' do | |
w = Waitron.new Thread.new{:through_value} | |
w.__getobj__.should == :through_value | |
w._.should == :through_value | |
end | |
it 'new does not wrap other waitron' do | |
w = Waitron.new :the_relevant_value | |
x = Waitron.new w | |
x.object_id.should == w.object_id | |
x.should == w | |
end | |
it 'calculation' do | |
future_value = Waitron.new Thread.new{sleep rand; 120034.0} | |
present_value = Waitron.new | |
projected_value = Waitron.new | |
interest = Waitron.new | |
# get percentage and multiply by projected value and interest | |
calc = Thread.new{(( future_value / present_value ) - 1) * projected_value * (1+interest)} | |
# wait a moment for thread to sleep | |
sleep 0.001 | |
calc.status.should == 'sleep' | |
# raise exception if something goes wrong | |
Thread.new do | |
present_value._ = 100000 | |
projected_value._ = 150000 | |
interest._ = 0.08 | |
end.join | |
calc.value.should == 32455.079999999994 | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment