-
-
Save matthiasgoergens/6ec7f88825cb766b36c4cd7715687e0b to your computer and use it in GitHub Desktop.
STM example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"net" | |
"strconv" | |
"time" | |
) | |
// port is the TCP port the server listens on. It is a string because it is
// concatenated directly into the listen address.
const port = "8000"
// startPublisher sends the decimal form of a fresh pseudo-random integer on
// messages once per second, forever. It never returns, so callers run it in
// its own goroutine. Each send blocks until a receiver takes the value, so the
// effective period is one second plus however long the receiver takes.
func startPublisher(messages chan<- string) {
	// Nanosecond resolution keeps two processes started within the same
	// second from publishing identical sequences.
	rand.Seed(time.Now().UnixNano())
	for {
		time.Sleep(time.Second)
		messages <- strconv.Itoa(rand.Int())
	}
}
func startAcceptor(ln net.Listener, incoming chan<- net.Conn, closing chan<- net.Conn) { | |
for { | |
conn, err := ln.Accept() | |
if err != nil { | |
fmt.Println(err) | |
break | |
} | |
incoming <- conn | |
go func() { | |
waitForDisconnect(conn) | |
closing <- conn | |
}() | |
} | |
} | |
// waitForDisconnect blocks until conn can no longer be read from, discarding
// whatever the peer sends in the meantime. It returns on the first Read that
// reports an error or yields zero bytes. The connection is not closed here.
func waitForDisconnect(conn net.Conn) {
	discard := make([]byte, 1024)
	for {
		n, err := conn.Read(discard)
		if err != nil || n == 0 {
			return
		}
	}
}
// indexOf returns the position of conn in clients, or -1 when it is absent.
// Connections are compared with ==, i.e. by interface identity.
func indexOf(conn net.Conn, clients []net.Conn) int {
	for i, c := range clients {
		if c == conn {
			return i
		}
	}
	return -1
}

// loop owns the client list and serialises every change to it. It never
// returns and reacts to three kinds of event:
//
//   - incoming: the connection is appended to clients.
//   - closing:  the connection is removed from clients (order is not
//     preserved) and closed so its descriptor is released.
//   - messages: the message plus a trailing newline is written to every
//     client; a failed write is logged and the remaining clients are still
//     served.
//
// clients is the initial client list and may be nil.
func loop(
	clients []net.Conn,
	messages <-chan string,
	incoming <-chan net.Conn,
	closing <-chan net.Conn) {
	for {
		select {
		case conn := <-incoming:
			fmt.Println("Incoming:", conn)
			clients = append(clients, conn)
		case conn := <-closing:
			fmt.Println("Closing:", conn)
			if ix := indexOf(conn, clients); ix > -1 {
				// Swap-remove: overwrite the slot with the last element,
				// then shrink. Clearing the vacated slot keeps the backing
				// array from pinning a dead connection.
				last := len(clients) - 1
				clients[ix] = clients[last]
				clients[last] = nil
				clients = clients[:last]
			}
			// Nothing else closes the connection, so without this every
			// disconnected client would leak a file descriptor.
			if err := conn.Close(); err != nil {
				fmt.Println("Close:", err)
			}
		case n := <-messages:
			fmt.Println("Publishing", n, "to", len(clients), "clients")
			buf := []byte(n + "\n") // identical for every client; build once
			for _, conn := range clients {
				if _, err := conn.Write(buf); err != nil {
					// The client is removed when it is reported on closing;
					// here we only make the failure visible.
					fmt.Println("Write:", err)
				}
			}
		}
	}
}
func main() { | |
ln, err := net.Listen("tcp", ":"+port) | |
if err != nil { | |
panic(err) | |
} | |
defer ln.Close() | |
messages := make(chan string) | |
incoming := make(chan net.Conn) | |
closing := make(chan net.Conn) | |
go startPublisher(messages) | |
go startAcceptor(ln, incoming, closing) | |
clients := make([]net.Conn, 0) | |
loop(clients, messages, incoming, closing) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE ScopedTypeVariables #-} | |
module Main where | |
import Control.Applicative ((<$), (<*))
import Control.Concurrent ( forkIO, threadDelay, ThreadId )
import Control.Concurrent.STM ( atomically
                              , orElse
                              , newTChan
                              , readTChan
                              , writeTChan
                              , TChan
                              )
import Control.Exception ( catch, onException, SomeException )
import Control.Monad ( forM_, forever, join, unless, (<=<) )
import Data.List ( delete )
import Network ( withSocketsDo, listenOn, PortID(..) )
import Network.Socket ( accept, close, recv, send, Socket )
import System.Random ( randomIO )
import Text.Printf ( printf )
-- | TCP port the server listens on.
port :: Int
port = 8000
-- | Fork a thread that, forever, writes a random 'Int' (rendered with 'show')
-- to @messages@ and then sleeps for one second. The first message is
-- published immediately. Returns the id of the forked thread.
startPublisher :: TChan String -> IO ThreadId
startPublisher messages =
  forkIO $ forever $ do
    n <- randomIO :: IO Int
    atomically (writeTChan messages (show n))
    threadDelay 1000000 -- microseconds, i.e. one second
-- | Fork a thread that accepts connections on @sock@ forever. Each accepted
-- connection is written to @incoming@; a further thread is then forked that
-- waits until the connection stops yielding data and writes it to @closing@.
-- Returns the id of the accepting thread.
startAcceptor :: Socket -> TChan Socket -> TChan Socket -> IO ThreadId
startAcceptor sock incoming closing =
  forkIO $ forever $ do
    (conn, _) <- accept sock
    atomically (writeTChan incoming conn)
    forkIO $ do
      waitForClose conn
      atomically (writeTChan closing conn)
  where
    -- Keep reading (and discarding) until 'recv' returns nothing. Any
    -- exception raised by 'recv' is treated the same as an empty read.
    waitForClose conn = do
      buf <- recv conn 4096 `catch` \(_ :: SomeException) -> return ""
      unless (null buf) $ waitForClose conn
-- | Listen on 'port', start the publisher and acceptor threads, and run the
-- event loop forever.
--
-- @handle clients@ is a single STM transaction that takes the first available
-- event, trying @incoming@, then @closing@, then @messages@. It does not
-- perform I/O itself; it returns the IO action to run afterwards, and that
-- action yields the client list for the next iteration.
main :: IO ()
main = withSocketsDo $ do
  sock <- listenOn (PortNumber (fromIntegral port))
  incoming <- atomically newTChan
  closing <- atomically newTChan
  messages <- atomically newTChan
  _ <- startPublisher messages
  _ <- startAcceptor sock incoming closing
  let handle clients =
        -- New client: log it and add it to the list.
        (do conn <- readTChan incoming
            return ((conn : clients) <$ printf "Incoming: %s\n" (show conn)))
        `orElse`
        -- Departed client: log it, close the socket, drop it from the list.
        (do conn <- readTChan closing
            return (delete conn clients
                      <$ printf "Closing: %s\n" (show conn)
                      <* close conn))
        `orElse`
        -- New message: send it, newline-terminated, to every client.
        (do message <- readTChan messages
            return $ clients <$ do
              printf "Publishing %s to %d clients\n" message (length clients)
              forM_ clients $ \conn ->
                -- A failed send must not take the whole loop down, so the
                -- exception is caught and the client is queued for closing.
                (() <$ send conn (message ++ "\n"))
                  `catch` \(_ :: SomeException) ->
                    atomically (writeTChan closing conn))
      -- Run one event, then recurse with the updated client list.
      loop = loop <=< (join . atomically . handle)
  loop []
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name:                stm-example
version:             0.1.0.0
synopsis:            Fanout TCP to demonstrate STM
description:         Fanout TCP to demonstrate STM
homepage:            http://lcfrs.org
license:             BSD3
license-file:        LICENSE
author:              Neuman Vong
maintainer:          neuman.vong@gmail.com
copyright:           2016 Neuman Vong
category:            Web
build-type:          Simple
-- extra-source-files:
cabal-version:       >=1.10

executable stm-example-exe
  hs-source-dirs:      .
  main-is:             Main.hs
  ghc-options:         -threaded -rtsopts -with-rtsopts=-N
  build-depends:       base
                     , network
                     , stm
                     , random
  default-language:    Haskell2010
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment