Created
June 8, 2011 19:55
-
-
Save kosta/1015232 to your computer and use it in GitHub Desktop.
Small tcp server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"bufio"
	"fmt"
	"net"
	"os"
	"os/signal"
	"sync"
)
//slide 58

// readLinesAndSendToChan reads newline-terminated lines from conn and forwards
// each one, trailing '\n' included, on messages.
//
// It returns on the first read error (end of stream included) after printing
// the error and the remote address. Any bytes read before that error that were
// not terminated by a newline are discarded. conn is closed when the function
// returns.
func readLinesAndSendToChan(conn net.Conn, messages chan string) {
	defer conn.Close()

	lines := bufio.NewReader(conn)
	for {
		text, readErr := lines.ReadString('\n')
		if readErr == nil {
			messages <- text
			continue
		}
		fmt.Printf("Read error: %s, closing %s\n", readErr, conn.RemoteAddr())
		return // the deferred Close releases conn
	}
}
//slide 60

// writeLinesFromChan creates a channel buffered for 100 strings, starts a
// goroutine that writes every string received on it to conn, and returns the
// send-only side of that channel.
//
// The goroutine stops when the channel is closed or when a write to conn
// fails (in which case it prints a message naming the remote address). On the
// way out it calls waits.Done() and then sends the channel on brokenChannels;
// that send blocks until somebody receives it. waits is incremented before
// the goroutine is started. conn is not closed here.
func writeLinesFromChan(conn net.Conn,
	brokenChannels chan chan<- string,
	waits *sync.WaitGroup) chan<- string {
	outgoing := make(chan string, 100) //buffer of 100 strings
	waits.Add(1)

	go func() {
		defer func() {
			// Same order as two stacked defers would give: release the
			// WaitGroup first, then report this channel as finished.
			waits.Done()
			brokenChannels <- outgoing
		}()

		peer := conn.RemoteAddr()
		for line := range outgoing {
			if _, err := fmt.Fprint(conn, line); err != nil {
				fmt.Printf("Write error: closing %s\n", peer)
				return
			}
		}
	}()

	return outgoing
}
func main() { | |
//open TCP port - slide 52 | |
tcp, tcperr := net.Listen("tcp", ":0") | |
if tcperr != nil { | |
fmt.Println("Error opening TCP socket:", tcperr) | |
return | |
} | |
fmt.Println("Listening on", tcp.Addr().Network(), tcp.Addr()) | |
//tcp.Accept gofunc - slide 55 | |
newConnections := make(chan net.Conn) | |
go func() { | |
for { | |
conn, connerr := tcp.Accept() | |
if connerr != nil { | |
return //skip error handling for this example | |
} else { | |
newConnections <- conn | |
} | |
} | |
}() | |
//slide 57 - overwritten by slide 59 | |
messages := make(chan string) | |
/*for { | |
select { | |
case conn := <-newConnections: | |
fmt.Fprint(conn, "Hello.\n") | |
go readLinesAndSendToChan(conn, messages) | |
case msg := <-messages: | |
fmt.Println("Got message:", msg) | |
} | |
}*/ | |
waits := new(sync.WaitGroup) | |
//slide 59 | |
connections := make(map[chan<- string]bool) //used as set | |
brokenChannels := make(chan chan<- string) //channel of channels | |
for { | |
select { | |
case conn := <-newConnections: | |
go readLinesAndSendToChan(conn, messages) | |
connections[writeLinesFromChan(conn, brokenChannels, waits)] = true | |
case msg := <-messages: | |
fmt.Print("got message: ", msg) | |
for conn, _ := range connections { | |
conn <- msg | |
} | |
case broken := <-brokenChannels: | |
close(broken) | |
connections[broken] = false, false //remove from map | |
case signal := <-signal.Incoming: | |
fmt.Printf("got signal: '%s'. Quitting.\n", signal) | |
tcp.Close() | |
for conn, _ := range connections { | |
close(conn) | |
} | |
fmt.Println("waiting to flush existing messages...") | |
waits.Wait() | |
close(brokenChannels) | |
return | |
} | |
} | |
} |
- oh yes, a defer went missing there :) I'll move the conn.close() up there.
- The goroutine does not need to run. It was there when the waits.Done() got called after "brokenChannels <- ch". Without it, waits.Done() was never reached and waits.Wait() waited forever. In the current scheme, I can remove that line :)
As you can see, I incorporated your changes. Again. :)
- ok, will change the listener name.
- I see the problem but do not know how to fix it, except by increasing the buffer size. How do I send to a channel without blocking?
- Thanks for catching that. I want to do as little cleanup as possible, so I'll just remove close(brokenChannels). The alternative would be to send on brokenChannels before calling waits.Done(). Then, I could put my go func() { for _ = range brokenChannels }() in there again and call waits.Wait() after the goroutine. That would mean that all send operations on brokenChannels are done when waits.Wait() returns. Then I could safely close the channel.
The most important question to me is 2.: How do I do a non-blocking channel send?
The only idea that I had is
if len(conn) == 100 {
//sending would block. That channel is too slow for us. Get rid of it.
close(conn)
} else {
conn <- msg
}
But that feels extremely un-idiomatic and just plain wrong.
Don't you need the default statement in order to make it non-blocking? From the spec under "select statements": "if there is no default case, the statement blocks until one of the communications can complete"
In any case, thanks a lot for your feedback, it was very valuable :)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for your suggestions. As you can see, I incorporated all of them :) I really like the