Created
March 5, 2020 12:23
-
-
Save jasper-lyons/7b9ee6a682f3efb8460e0ddc760e6f3e to your computer and use it in GitHub Desktop.
Light weight Channel, Fibers and Task scheduler in Lua
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- This is just me working through this blog post: | |
-- https://www.wingolog.org/archives/2018/05/16/lightweight-concurrency-in-lua | |
-- | |
-- My goal was just to explore how one can make concurrency in lua more expressive without | |
-- compiling your own c extensions. Eventually I want to explore using such a system to build | |
-- a concurrent socket server but I suspect that will need c. Can't escape the blocking. | |
local Tasks = { | |
queue = {} | |
} | |
function Tasks:schedule(task) | |
table.insert(self.queue, task) | |
end | |
function Tasks:run() | |
local queue = self.queue | |
self.queue = {} | |
for _, task in ipairs(queue) do | |
task() | |
end | |
end | |
Tasks:schedule(function () | |
print('hello') | |
end) | |
Tasks:schedule(function () | |
print('task') | |
end) | |
Tasks:schedule(function () | |
print('world') | |
end) | |
Tasks:run() | |
local Fibers = { | |
current = nil | |
} | |
function Fibers:spawn(func) | |
local fiber = coroutine.create(func) | |
Tasks:schedule(function () | |
Fibers:resume(fiber) | |
end) | |
end | |
function Fibers:resume(fiber, ...) | |
Fibers.current = fiber | |
local ok, err = coroutine.resume(fiber, ...) | |
Fibers.current = nil | |
if not ok then | |
print('Error running fiber: ' .. tostring(err)) | |
end | |
end | |
function Fibers:suspend(reschedule) | |
reschedule(Fibers.current) | |
return coroutine.yield() | |
end | |
function Fibers:yield() | |
return Fibers:suspend(function (fiber) | |
Tasks:schedule(function () | |
Fibers:resume(fiber) | |
end) | |
end) | |
end | |
Fibers:spawn(function () | |
Fibers:yield() | |
print('hello') | |
end) | |
Fibers:spawn(function () | |
print('fiber') | |
end) | |
Fibers:spawn(function () | |
print('world') | |
end) | |
Tasks:run() | |
Tasks:run() | |
local Suspension = {} | |
function Suspension:new(fiber) | |
local base = { fiber=fiber, waiting=true } | |
setmetatable(base, self) | |
self.__index = self | |
return base | |
end | |
function Suspension:isWaiting() | |
return self.waiting | |
end | |
function Suspension:complete(wrap, value) | |
assert(self.waiting) | |
self.waiting = false | |
Tasks:schedule(function () | |
Fibers:resume(self.fiber, wrap, value) | |
end) | |
end | |
local Implementation = {} | |
function Implementation:new(try, block, wrap) | |
local base = { try=try, block=block, wrap=wrap } | |
setmetatable(base, self) | |
self.__index = self | |
return base | |
end | |
local Operation = {} | |
function Operation:new(implementations) | |
local base = { implementations=implementations } | |
setmetatable(base, self) | |
self.__index = self | |
return base | |
end | |
function Operation:perform() | |
for _, implementation in ipairs(self.implementations) do | |
local success, value = implementation.try() | |
if success then | |
return implementation.wrap(value) | |
end | |
end | |
local wrap, value = Fibers:suspend(function (fiber) | |
local suspension = Suspension:new(fiber) | |
for _, implementation in ipairs(self.implementations) do | |
implementation.block(suspension, implementation.wrap) | |
end | |
end) | |
return wrap(value) | |
end | |
function Operation:wrap(wrapper) | |
local wrapped = {} | |
for _, implementation in ipairs(self.implementations) do | |
table.insert(wrapped, Implementation:new( | |
implementation.try, | |
implementation.block, | |
function (value) | |
return wrapper(implementation.wrap(value)) | |
end | |
)) | |
end | |
return Operation:new(wrapped) | |
end | |
local Queue = {} | |
function Queue:new() | |
local base = {} | |
setmetatable(base, self) | |
self.__index = self | |
return base | |
end | |
function Queue:isEmpty() | |
return #self == 0 | |
end | |
function Queue:peek() | |
return self[1] | |
end | |
function Queue:pop() | |
return table.remove(self, 1) | |
end | |
function Queue:push(value) | |
self[#self + 1] = value | |
end | |
function Queue:removeStale(predicate) | |
local index = 1 | |
while index <= #self do | |
if predicate(self[index]) then | |
table.remove(self, index) | |
else | |
index = index + 1 | |
end | |
end | |
end | |
local Channel = {} | |
function Channel:new() | |
local base = { getQueue = Queue:new(), putQueue = Queue:new() } | |
setmetatable(base, self) | |
self.__index = self | |
return base | |
end | |
function Channel:putFactory(value) | |
return Operation:new({ | |
Implementation:new( | |
function () | |
self.getQueue:removeStale(function (entry) | |
return not entry.suspension:isWaiting() | |
end) | |
if self.getQueue:isEmpty() then | |
return false, nil | |
else | |
local remote = self.getQueue:pop() | |
remote.suspension:complete(remote.wrap, value) | |
return true, nil | |
end | |
end, | |
function (suspension, wrap) | |
self.putQueue:removeStale(function (entry) | |
return not entry.suspension:waiting() | |
end) | |
self.putQueue:push({ | |
suspension = suspension, | |
wrap = wrap, | |
value = value | |
}) | |
end, | |
function (value) | |
return value | |
end | |
) | |
}) | |
end | |
function Channel:getFactory() | |
return Operation:new({ | |
Implementation:new( | |
function () | |
self.putQueue:removeStale(function (entry) | |
return not entry.suspension:isWaiting() | |
end) | |
if self.putQueue:isEmpty() then | |
return false, nil | |
else | |
local remote = self.putQueue:pop() | |
remote.suspension:complete(remote.wrap) | |
return true, remote.value | |
end | |
end, | |
function (suspension, wrap) | |
self.getQueue:removeStale(function (entry) | |
return not entry.suspension:waiting() | |
end) | |
self.getQueue:push({ | |
suspension = suspension, | |
wrap = wrap | |
}) | |
end, | |
function (value) | |
return value | |
end | |
) | |
}) | |
end | |
function Channel:put(value) | |
return self:putFactory(value):perform() | |
end | |
function Channel:get() | |
return self:getFactory():perform() | |
end | |
function numbers(from) | |
local sink = Channel:new() | |
Fibers:spawn(function () | |
while true do | |
sink:put(from) | |
from = from + 1 | |
end | |
end) | |
return sink | |
end | |
local done = false | |
Fibers:spawn(function () | |
local source = numbers(0) | |
for i=0,10 do print(source:get()) end | |
done = true | |
end) | |
while not done do Tasks:run() end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment