Created
February 9, 2014 18:47
-
-
Save YoEight/8904145 to your computer and use it in GitHub Desktop.
Possible Process Haskell impl.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE ExistentialQuantification #-} | |
{-# LANGUAGE RankNTypes #-} | |
{-# LANGUAGE GADTs #-} | |
module Data.Process where | |
import Prelude hiding (zipWith)

import Control.Applicative
import Control.Exception (onException)
import Control.Monad
import Data.Foldable
import System.IO
import System.IO.Error
infixr 9 <~ | |
infixl 9 ~> | |
-- | A single step of a stream.
--
-- @m@ is the type of requests that can be awaited, @o@ is the type of
-- emitted values and @r@ is the type standing for "the rest of the stream".
data Step m o r
  = Yield o r
    -- ^ Emit one value, then continue with the rest.
  | forall a. Await (m a) (a -> r) r
    -- ^ A request, the continuation fed with its answer, and a fallback
    -- continuation.
  | Stop
    -- ^ The stream has nothing more to do.
-- | A stream emitting values of type @o@ while awaiting requests of type
-- @m@. It is a 'Step' whose "rest of the stream" is again a 'Process'.
newtype Process m o = Process
  { unProcess :: Step m o (Process m o)
  }
-- | Request type with a single request whose answer type equals the
-- input type: matching on 'Is' proves both type arguments are the same.
data Is i x where
  Is :: Is i i

-- | A pure transducer: awaits values of type @a@, emits values of type @b@.
type Process1 a b = Process (Is a) b
-- | Requests available to a two-input stream: the answer type is the
-- element type of the side being asked.
data TeeOp l r x where
  -- | Ask the left input; answered with an @l@.
  TL :: TeeOp l r l
  -- | Ask the right input; answered with an @r@.
  TR :: TeeOp l r r

-- | A stream reading from an @a@ input and a @b@ input and emitting @c@.
type Tee a b c = Process (TeeOp a b) c
-- | Requests for a two-input stream that may also ask for either side.
data WyeOp l r x where
  -- | Ask the left input; answered with an @l@.
  WL :: WyeOp l r l
  -- | Ask the right input; answered with an @r@.
  WR :: WyeOp l r r
  -- | Ask both inputs; answered with a value from one side or the other.
  WBoth :: WyeOp l r (Either l r)

-- | A stream driven by 'WyeOp' requests over @a@ and @b@, emitting @c@.
type Wye a b c = Process (WyeOp a b) c
-- | A stream of effectful functions from @a@ to @b@ in @m@.
type Channel a m b = Process m (a -> m b)

-- | A 'Channel' whose functions produce no result: a stream of consumers.
type Sink m a = Process m (a -> m ())
-- | A continuation-passing description of a stream fragment that awaits
-- requests of type @f@, emits values of type @o@ and finishes with a @u@.
--
-- A plan is run by supplying one handler per possible outcome.
newtype Plan f o u = Plan
  { unPlan
      :: forall r
       . (u -> r)                               -- done: the plan returned a result
      -> (o -> r -> r)                          -- emit: a value and what follows it
      -> (forall a. f a -> (a -> r) -> r -> r)  -- await: request, continuation, fallback
      -> r                                      -- halt: the plan stopped without a result
      -> r
  }
-- | Mapping over a plan post-processes its final result; emitted values
-- and requests are left untouched.
instance Functor (Plan f o) where
  fmap f plan = Plan (\done emit wait stop ->
    unPlan plan (done . f) emit wait stop)
-- | 'pure' finishes immediately with the given result, without emitting
-- or awaiting anything.
--
-- 'pure' holds the real implementation and 'return' merely forwards to it
-- (the canonical arrangement), so the instances stay valid whether or not
-- 'return' remains a class method of 'Monad'.
instance Applicative (Plan f o) where
  pure a = Plan $ \kp _ _ _ -> kp a
  (<*>) = ap

-- | Sequencing runs the first plan and feeds its result to the function;
-- the emit, await and halt handlers are shared by both halves.
instance Monad (Plan f o) where
  return = pure
  Plan k >>= f = Plan $ \kp ke ka kr ->
    k (\a -> unPlan (f a) kp ke ka kr) ke ka kr
-- | Turn a plan into a process that restarts the plan every time it
-- finishes with a result, and stops once the plan halts.
repeatedly :: Plan m o u -> Process m o
repeatedly plan = loop
  where
    loop = Process (unPlan plan restart emit wait Stop)
    -- The plan's result is discarded; start over from the top.
    restart _ = unProcess loop
    emit o next = Yield o (Process next)
    wait req cont fallback = Await req (Process . cont) (Process fallback)
-- | Turn a plan into a process that runs the plan once: the process stops
-- when the plan either finishes with a result or halts.
process :: Plan m o u -> Process m o
process plan = Process (unPlan plan finish emit wait Stop)
  where
    -- The plan's result is discarded.
    finish _ = Stop
    emit o next = Yield o (Process next)
    wait req cont fallback = Await req (Process . cont) (Process fallback)
-- | Build a plan that streams values out of an acquired resource.
--
-- The first action acquires the resource, the second releases it, and the
-- third produces the next value, or 'Nothing' when the resource is
-- exhausted.
--
-- The plan awaits the acquisition, then repeatedly awaits the step action,
-- yielding every 'Just' value. On 'Nothing' it awaits the release action.
-- The release plan is also installed as the fallback for the steps.
--
-- If the step action throws any exception (not only an 'IOError'), the
-- resource is released before the exception is rethrown, so the resource
-- does not leak on failure.
resource :: IO r
         -> (r -> IO ())
         -> (r -> IO (Maybe o))
         -> Plan IO o ()
resource ack release step = onAwait go ack
  where
    -- Await the next step, with the release plan as fallback.
    go r = onAwaitFb (go1 r) (guarded r) (cleanup r)

    -- Run one step; release the resource if it fails, then rethrow.
    guarded r = step r `onException` release r

    go1 r (Just o) = yield o >> go r
    go1 r Nothing  = cleanup r

    cleanup r = await (release r)
-- | Await a request and continue with the given function, installing the
-- fallback plan as the halt handler of that continuation.
--
-- The await for the request itself still uses the enclosing halt handler
-- as its fallback, so the fallback plan only comes into play once the
-- request has been answered.
onAwaitFb :: (a -> Plan f o u)
          -> f a
          -> Plan f o u
          -> Plan f o u
onAwaitFb k rq fb = Plan (\done emit wait stop ->
  let resume a = unPlan (k a) done emit wait (unPlan fb done emit wait stop)
  in wait rq resume stop)
-- | Await a request and continue with the given function; the enclosing
-- halt handler is used both as the await's fallback and by the
-- continuation.
onAwait :: (a -> Plan f o u) -> f a -> Plan f o u
onAwait k rq = Plan (\done emit wait stop ->
  wait rq (\a -> unPlan (k a) done emit wait stop) stop)
-- | Issue a request and return its answer as the plan's result.
await :: f a -> Plan f o a
await req = Plan (\done _emit wait stop -> wait req done stop)
-- | Await the next input of a 'Process1'.
await1 :: Plan (Is a) o a
await1 = Plan (\done _emit wait stop -> wait Is done stop)

-- | Await the next value from the left input of a 'Tee'.
awaitL :: Plan (TeeOp a b) o a
awaitL = Plan (\done _emit wait stop -> wait TL done stop)

-- | Await the next value from the right input of a 'Tee'.
awaitR :: Plan (TeeOp a b) o b
awaitR = Plan (\done _emit wait stop -> wait TR done stop)
-- | Emit one value, then finish with @()@.
yield :: o -> Plan f o ()
yield out = Plan (\done emit _wait _stop -> emit out (done ()))
-- | Stop the plan without producing a result: only the halt handler runs.
halt :: Plan f o a
halt = Plan (\_done _emit _wait stop -> stop)
-- | The process that is already finished.
stopped :: Process m o
stopped = Process { unProcess = Stop }
-- | Lift a pure function into a transducer that applies it to every
-- input it receives.
liftF :: (a -> b) -> Process1 a b
liftF f = repeatedly (await1 >>= yield . f)
-- | Feed the output of the right-hand process into the transducer on the
-- left.
--
-- The transducer drives: its yields are passed through, and each of its
-- awaits is answered by the next value of the producer. Requests made by
-- the producer are re-emitted with the transducer re-attached. When the
-- producer has stopped, the transducer continues with its fallback.
(<~) :: Process1 a b -> Process m a -> Process m b
consumer <~ producer =
  Process $ case unProcess consumer of
    Stop -> Stop
    Yield out rest -> Yield out (rest <~ producer)
    Await Is resume fallback -> case unProcess producer of
      Stop -> unProcess (fallback <~ stopped)
      Yield x producer' -> unProcess (resume x <~ producer')
      Await req cont producerFb ->
        Await req (\ans -> consumer <~ cont ans) (consumer <~ producerFb)
-- | Left-to-right variant of '<~': pipe a process into a transducer.
(~>) :: Process m a -> Process1 a b -> Process m b
(~>) = flip (<~)
-- | Combine two processes under the control of a 'Tee'.
--
-- The tee drives: its yields are passed through, a 'TL' request is
-- answered from the first process and a 'TR' request from the second.
-- Requests made by either input are re-emitted with the tee re-attached.
-- When the requested input has stopped, the tee continues with its
-- fallback.
tee :: Tee a b c -> Process m a -> Process m b -> Process m c
tee driver left right =
  Process $ case unProcess driver of
    Stop -> Stop
    Yield out driver' -> Yield out (tee driver' left right)
    Await TL resume fallback -> case unProcess left of
      Stop -> unProcess (tee fallback stopped right)
      Yield x left' -> unProcess (tee (resume x) left' right)
      Await req cont leftFb ->
        Await req
          (\ans -> tee driver (cont ans) right)
          (tee driver leftFb right)
    Await TR resume fallback -> case unProcess right of
      Stop -> unProcess (tee fallback left stopped)
      Yield y right' -> unProcess (tee (resume y) left right')
      Await req cont rightFb ->
        Await req
          (\ans -> tee driver left (cont ans))
          (tee driver left rightFb)
-- | A 'Tee' that repeatedly takes one value from the left, then one from
-- the right, and emits the function applied to both.
zipWith :: (a -> b -> c) -> Tee a b c
zipWith f = repeatedly (f <$> awaitL <*> awaitR >>= yield)
-- | Run the effects emitted by a process: every yielded action becomes an
-- await on that action followed by a yield of its result.
eval :: Process m (m a) -> Process m a
eval effects =
  Process $ case unProcess effects of
    Stop -> Stop
    Yield action rest ->
      Await action (\result -> Process (Yield result (eval rest))) stopped
    Await req cont fallback -> Await req (eval . cont) (eval fallback)
-- | Send each value of a process through the matching function of a
-- channel and emit the results of running those effects.
through :: Process m a -> Channel a m b -> Process m b
through values channel = eval (tee (zipWith (flip ($))) values channel)
-- | A process that emits every element of a container, in fold order,
-- and then stops.
source :: Foldable f => f a -> Process m a
source xs = process (for_ xs yield)
-- | Run a process to completion in its own monad, performing every
-- awaited request and discarding every emitted value. Fallbacks are
-- never used.
run :: Monad m => Process m a -> m ()
run p =
  case unProcess p of
    Stop -> return ()
    Yield _ rest -> run rest
    Await req cont _ -> req >>= run . cont
-- | An endless sink whose every element is 'print'.
printSink :: Show a => Sink IO a
printSink = repeatedly (yield print)
-- | Demo: print the numbers 1 to 10, one per line.
test :: IO ()
test = run (source [1..10] `through` printSink)
-- Output
-- 1
-- 2
-- 3
-- 4
-- 5
-- 6
-- 7
-- 8
-- 9
-- 10
-- | Demo: print every line of a file, logging when the handle is opened
-- and closed.
testFile :: FilePath -> IO ()
testFile path = run (src `through` printSink)
  where
    src = process (resource open close nextLine)

    open = print "Open" >> openFile path ReadMode

    close h = hClose h >> print "Close"

    -- 'Nothing' at end of file, otherwise the next line.
    nextLine h = do
      atEnd <- hIsEOF h
      if atEnd
        then return Nothing
        else Just <$> hGetLine h
-- *Data.Process> testFile "stream.cabal"
-- "Open"
-- "-- Initial stream.cabal generated by cabal init. For further"
-- "-- documentation, see http://haskell.org/cabal/users-guide/"
-- ""
-- "name: stream"
-- "version: 0.1.0.0"
-- "-- synopsis:"
-- "-- description:"
-- "-- license:"
-- "license-file: LICENSE"
-- "author: Yorick Laupa"
-- "-- maintainer:"
-- "-- copyright:"
-- "category: Data"
-- "build-type: Simple"
-- "cabal-version: >=1.8"
-- ""
-- "library"
-- " exposed-modules: Data.Process"
-- " -- other-modules:"
-- " build-depends: base ==4.6.*"
-- " , semigroups >=0.9.2"
-- "Close"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment