Skip to content

Instantly share code, notes, and snippets.

@straight-shoota
Last active November 18, 2023 06:10
Show Gist options
  • Save straight-shoota/4437971943bae7000f03fabf3d814a2f to your computer and use it in GitHub Desktop.
Save straight-shoota/4437971943bae7000f03fabf3d814a2f to your computer and use it in GitHub Desktop.
Spindle - Structured Concurrency for Crystal (PoC)
# Demo worker: prints "B" ten times, sleeping 0.006 after every line.
def bob
  1.upto(10) do
    puts "B"
    sleep 0.006
  end
end
# Demo worker: prints "A" three times, sleeping 0.01 after every line,
# and then always fails with "Alice aborted".
def alice
  1.upto(3) do
    puts "A"
    sleep 0.01
  end

  raise "Alice aborted"
end
# Demo: run `alice` and `bob` in the same spindle. `alice` is passed as a
# block, `bob` as a proc pointer; both forms end up in `Spindle#spawn(name, proc)`.
concurrent do |spindle|
  spindle.spawn("Alice") { alice }
  spindle.spawn("Bob", ->bob)
end
# Creates a fresh `Spindle`, yields it to the block and afterwards calls
# `Spindle#spin` on it. The `ensure` makes `spin` run even when the block
# raises.
def concurrent
  group = Spindle.new

  begin
    yield group
  ensure
    group.spin
  end
end
# Runs every given expression in its own fiber inside one spindle:
#
# ```
# concurrent(alice, bob)
# ```
#
# Each expression is spawned through the spindle yielded by the block form of
# `concurrent`, so the fibers are registered with it and `Spindle#spin` sees
# them. Spawning with the global `spawn` instead would leave the spindle
# empty, and `spin` would return immediately.
#
# `%spindle` is a macro-fresh variable so it cannot shadow a `spindle`
# variable used inside the given expressions.
macro concurrent(first, *args)
  concurrent do |%spindle|
    %spindle.spawn do
      {{ first }}
    end
    {% for arg in args %}
      %spindle.spawn do
        {{ arg }}
      end
    {% end %}
  end
end
# A `Spindle` keeps a list of fibers and a channel on which fibers started
# through `#spawn` report how they ended. `#spin` drains that channel until
# the list is empty.
class Spindle
  # Fibers currently tracked by this spindle.
  @fibers = [] of Fiber

  # Completion messages: the reporting fiber plus the exception it ended with
  # (`nil` when it returned normally).
  @finished = Channel(Tuple(Fiber, Exception?)).new

  # Block form: forwards the captured block to `spawn(name, proc)`.
  def spawn(name = nil, &block : -> _)
    spawn(name, block)
  end

  # Wraps *proc* in a new fiber called *name*, tracks it and enqueues it in
  # the scheduler. When *proc* returns or raises, the fiber reports the
  # outcome on the completion channel.
  def spawn(name, proc)
    puts "spindle spawning #{name}"

    # Capture the channel in a local so the fiber body does not go through `self`.
    done = @finished

    worker = Fiber.new(name) do
      failure = begin
        proc.call
        nil
      rescue exc
        exc
      end

      done.send({Fiber.current, failure})
    end

    @fibers.push worker
    Scheduler.enqueue worker
  end

  # Adds an existing fiber to the tracked list.
  #
  # NOTE(review): nothing in this class makes such a fiber report on the
  # completion channel, and `#spin` only drops a fiber after receiving its
  # report - confirm how externally created fibers are expected to finish.
  def <<(fiber : Fiber)
    @fibers.push fiber
  end

  # Receives completion reports until no tracked fiber is left.
  #
  # A report carrying an exception other than `Fiber::CancelledException`
  # cancels all fibers that are still tracked. The first exception received
  # (of any kind) is re-raised once the list is empty.
  def spin
    first_error = nil

    while @fibers.size > 0
      fiber, exc = @finished.receive
      @fibers.delete fiber

      if exc.is_a?(Fiber::CancelledException)
        puts "#{fiber.name} cancelled"
      elsif exc
        @fibers.each &.cancel
        puts "#{fiber.name} raised"
      else
        puts "#{fiber.name} finished"
      end

      first_error ||= exc
    end

    raise first_error if first_error
  end
end
class Fiber
  # A `CancelRequestException` is created where `Fiber#cancel` is called. It
  # is never raised on its own; it is attached as `cause` to the
  # `CancelledException` raised in the cancelled fiber, so the callstack of
  # the `cancel` call stays available.
  class CancelRequestException < Exception
    # The fiber that was running when this request was created.
    getter fiber : Fiber

    def initialize(@callstack : CallStack, message : String = "Fiber cancel request")
      super(message)
      @fiber = Fiber.current
    end
  end

  # A `CancelledException` is raised when a fiber is resumed after it was cancelled.
  # See `Fiber#cancel` for details.
  #
  # If `cause` is `nil`, the fiber was cancelled while executing.
  class CancelledException < Exception
    # The fiber that was cancelled.
    getter fiber : Fiber

    def initialize(@fiber : Fiber, cause : CancelRequestException? = nil)
      super("Fiber cancelled: #{fiber}", cause)
    end

    # Narrows the inherited `cause` to the request that triggered the cancellation.
    def cause : CancelRequestException?
      @cause.as(CancelRequestException?)
    end
  end

  # Pending cancel request, set by `#cancel` and consumed on the next resume.
  @cancel_request : CancelRequestException? = nil

  # Stops this fiber from executing again and unwinds its stack.
  #
  # This method requests the fiber to raise a `CancelledException` the next time
  # it is resumed and enqueues it in the scheduler. Therefore the unwinding will
  # only take place the next time the scheduler reschedules.
  #
  # Calling `cancel` again while a request is still pending keeps the first
  # request (and its callstack).
  #
  # Raises `CancelledException` if this is the current fiber.
  def cancel
    if Fiber.current == self
      # In case the current fiber is to be canceled, just raise the exception directly.
      raise CancelledException.new self
    else
      # Otherwise register a cancel request, it will be evaluated on next resume.
      @cancel_request ||= CancelRequestException.new(CallStack.new)
    end

    # Trigger scheduling
    @resume_event.try &.free
    Scheduler.enqueue(self)
  end

  # Switches to this fiber. Execution continues after `previous_def` only once
  # the *calling* fiber has been switched back in, so the cancellation check
  # is made on `Fiber.current`, not on `self`.
  def resume
    previous_def
    Fiber.current.deliver_cancel_request
  end

  # Raises `CancelledException` if a cancel request is pending for this fiber.
  #
  # The request is cleared before raising so that it is delivered exactly
  # once. Otherwise every later context switch of the cancelled fiber (for
  # example a channel `send` inside a `rescue` or `ensure` while it unwinds)
  # would raise `CancelledException` again.
  protected def deliver_cancel_request
    if request = @cancel_request
      @cancel_request = nil
      raise CancelledException.new self, request
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment