Skip to content

Instantly share code, notes, and snippets.

@DarinM223
Last active June 16, 2022 03:53
Show Gist options
  • Save DarinM223/0d3263544a919fba714df2ae28ed21cb to your computer and use it in GitHub Desktop.
Save DarinM223/0d3263544a919fba714df2ae28ed21cb to your computer and use it in GitHub Desktop.
Select for TBQueue similar to Golang's select
{-# LANGUAGE ExistentialQuantification #-}
import Control.Concurrent.STM (STM, TBQueue, atomically, isEmptyTBQueue, isFullTBQueue, newTBQueueIO, readTBQueue, retry, writeTBQueue)
import Control.Monad (filterM, replicateM_)
import System.Random (StdGen, newStdGen, uniformR)
-- | One alternative offered to 'select'. Each alternative pairs a queue
-- operation with the 'IO' action to run if that alternative is chosen.
-- The element type of the queue is hidden, so handlers over queues of
-- different element types can share one list.
data Handler a
  = -- Take a value from the queue and hand it to the continuation.
    forall msg. Recv (TBQueue msg) (msg -> IO a)
  | -- Put the given value on the queue, then run the action.
    forall msg. Send (TBQueue msg) msg (IO a)
-- | Report whether a handler cannot proceed right now: a 'Recv' is blocked
-- while its queue is empty, a 'Send' while its queue is full.
handlerBlocked :: Handler a -> STM Bool
handlerBlocked handler = case handler of
  Recv source _ -> isEmptyTBQueue source
  Send sink _ _ -> isFullTBQueue sink
-- | Choose uniformly at random among the handlers that are not blocked,
-- perform that handler's queue operation inside the transaction, and
-- return its follow-up 'IO' action (not yet run) together with the
-- advanced generator. If every handler is blocked the transaction retries.
select :: StdGen -> [Handler a] -> STM (IO a, StdGen)
select gen hs = do
  runnable <- filterM (\h -> not <$> handlerBlocked h) hs
  case runnable of
    [] -> retry
    _ -> do
      -- The index is always in range because 'runnable' is non-empty here.
      let (pick, nextGen) = uniformR (0, length runnable - 1) gen
      action <- commit (runnable !! pick)
      pure (action, nextGen)
  where
    -- Carry out the queue operation and produce the action to run afterwards.
    commit (Recv queue fn) = fn <$> readTBQueue queue
    commit (Send queue v fn) = fn <$ writeTBQueue queue v
-- | Run 'select' atomically with a freshly split generator, then execute
-- the action of the handler that was chosen. The advanced generator
-- returned by 'select' is discarded.
selectIO :: [Handler a] -> IO a
selectIO hs =
  newStdGen >>= \seed -> atomically (select seed hs) >>= fst
-- | One round of the demo: two queues of capacity 1, the first pre-filled
-- with a single value. Of the four handlers, only receiving from the first
-- queue and sending to the second can proceed; the other two are expected
-- to be blocked (first queue already full, second queue empty).
test :: IO ()
test = do
  filled <- newTBQueueIO 1
  vacant <- newTBQueueIO 1
  atomically (writeTBQueue filled (1 :: Int))
  let msg = "hello"
      handlers =
        [ Recv filled (\i -> putStrLn ("Received: " ++ show i)),
          Send vacant msg (putStrLn ("Sent: " ++ show msg)),
          Send filled 2 (putStrLn "This shouldn't happen"),
          Recv vacant (\_ -> putStrLn "This shouldn't happen")
        ]
  selectIO handlers
-- | Run the demo round repeatedly so that both ready handlers get picked
-- over the course of the run.
main :: IO ()
main = replicateM_ rounds test
  where
    rounds = 1000
{-# LANGUAGE ExistentialQuantification #-}
import Control.Concurrent (yield)
import Control.Concurrent.Chan.Unagi.Bounded (Element (tryRead), InChan, OutChan, newChan, tryReadChan, tryWriteChan, writeChan)
import Control.Monad (replicateM_)
import System.Random.Shuffle (shuffleM)
-- | One alternative offered to 'select'. Each alternative pairs a channel
-- operation with the 'IO' action to run if that alternative succeeds.
-- The element type of the channel is hidden, so handlers over channels of
-- different element types can share one list.
data Handler a
  = -- Read a value from the channel's out-end and hand it to the continuation.
    forall msg. Recv (OutChan msg) (msg -> IO a)
  | -- Write the given value to the channel's in-end, then run the action.
    forall msg. Send (InChan msg) msg (IO a)
-- | Try the handlers in list order and run the action of the first one
-- whose channel operation succeeds. If none succeeds, the next polling
-- pass is handed to @retry@, which decides how to run it (for example
-- after a 'yield').
--
-- Unlike the STM version, which only wakes up when one of the queues
-- changes, this version busy-waits: it keeps re-polling until a handler
-- succeeds.
--
-- unagi-chan documents 'tryReadChan' as a destructive operation: it claims
-- the next element of the channel even when that element has not been
-- written yet. Each 'Recv' handler therefore claims its 'Element' future at
-- most once per call, and later passes re-poll that same future instead of
-- claiming (and abandoning) a new one on every spin.
--
-- NOTE(review): futures claimed for 'Recv' handlers that do not win are
-- still dropped when 'select' returns, so a value later written to those
-- channels may be skipped. Avoiding that needs state that outlives a single
-- call, which the current 'Handler' type cannot carry -- confirm against
-- the unagi-chan 'Element' documentation.
select :: (IO a -> IO a) -> [Handler a] -> IO a
select retry hs0 = claim hs0 []
  where
    -- First pass: visit each handler once, in order, stopping at the first
    -- success. Every failed attempt is remembered (most recent first) as an
    -- action that can be re-run without claiming anything new.
    claim (Recv chan f : hs) pending = do
      (element, _) <- tryReadChan chan
      let attempt = fmap f <$> tryRead element
      attempt >>= maybe (claim hs (attempt : pending)) id
    claim (Send chan v m : hs) pending = do
      let attempt = (\ok -> if ok then Just m else Nothing) <$> tryWriteChan chan v
      attempt >>= maybe (claim hs (attempt : pending)) id
    claim [] pending = retry (poll (reverse pending))

    -- Later passes: re-run the remembered attempts in the original handler
    -- order until one of them yields an action to run.
    poll attempts = go attempts
      where
        go (attempt : rest) = attempt >>= maybe (go rest) id
        go [] = retry (poll attempts)
-- | Demo: repeat a round in which two channels are created with
-- @newChan 1@ and the first is pre-filled with one value. The four
-- handlers are shuffled and passed to 'select', which yields to other
-- threads between polling passes. The handlers that print
-- "This shouldn't happen" are expected never to succeed.
main :: IO ()
main = replicateM_ 100 oneRound
  where
    oneRound = do
      (sendA, recvA) <- newChan 1
      (sendB, recvB) <- newChan 1
      writeChan sendA (1 :: Int)
      let msg = "hello"
      handlers <-
        shuffleM
          [ Recv recvA (\i -> putStrLn ("Received: " ++ show i)),
            Send sendB msg (putStrLn ("Sent: " ++ show msg)),
            Send sendA 2 (putStrLn "This shouldn't happen"),
            Recv recvB (\_ -> putStrLn "This shouldn't happen")
          ]
      select (\again -> yield >> again) handlers
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment