Last active
December 21, 2020 08:40
-
-
Save kimathie/1cdf724e7dd1e747eed69e4bb8e442ee to your computer and use it in GitHub Desktop.
A NIO TCP Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.ClosedChannelException; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.Selector; | |
import java.nio.channels.ServerSocketChannel; | |
import java.nio.channels.SocketChannel; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.Iterator; | |
import java.util.Set; | |
/** | |
* | |
* @author kimathie | |
*/ | |
public class TcpServer { | |
private final Acceptor acceptor; | |
public TcpServer(String host, int port) { | |
this.acceptor = new Acceptor(host, port); | |
} | |
public void start() { | |
this.acceptor.start(); | |
} | |
public void stop() { | |
this.acceptor.interrupt(); | |
} | |
class Acceptor extends Thread { | |
private final int port; | |
private final String host; | |
private Selector selector; | |
private final ExecutorService pool; | |
public Acceptor(String host, int port) { | |
this.host = host; | |
this.port = port; | |
this.pool = Executors.newFixedThreadPool(5); | |
} | |
@Override | |
public void run() { | |
try { | |
this.selector = Selector.open(); | |
ServerSocketChannel server = ServerSocketChannel.open(); | |
server.bind(new InetSocketAddress(host, port)); | |
server.configureBlocking(false); | |
server.register(selector, SelectionKey.OP_ACCEPT); | |
SocketAddress localAddress = server.getLocalAddress(); | |
System.out.println("server " + localAddress + " open."); | |
while (true) { | |
if (isInterrupted()) { | |
interrupt(); | |
pool.shutdownNow(); | |
System.out.println("server " + localAddress + " closed."); | |
break; | |
} | |
int ready = selector.select(); | |
if (ready != 0) { | |
Set<SelectionKey> selectedKeys = selector.selectedKeys(); | |
Iterator<SelectionKey> selectionKeys = selectedKeys.iterator(); | |
while (selectionKeys.hasNext()) { | |
SelectionKey key = selectionKeys.next(); | |
if (!key.isValid()) { | |
continue; | |
} | |
selectionKeys.remove(); | |
processKey(key); | |
} | |
} | |
} | |
} catch (IOException e) { | |
interrupt(); | |
} | |
} | |
private void processKey(SelectionKey key) throws IOException { | |
switch (key.interestOps()) { | |
case SelectionKey.OP_ACCEPT: { | |
try { | |
ServerSocketChannel server = (ServerSocketChannel) key.channel(); | |
SocketChannel channel = server.accept(); | |
channel.configureBlocking(false); | |
channel.register(key.selector(), SelectionKey.OP_READ); | |
System.out.println("client " + channel.getRemoteAddress() + " connected."); | |
} catch (ClosedChannelException e) { | |
e.printStackTrace(); | |
key.cancel(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
key.cancel(); | |
} | |
} | |
break; | |
case SelectionKey.OP_READ: { | |
SocketChannel channel = (SocketChannel) key.channel(); | |
try { | |
ByteBuffer buffer = ByteBuffer.allocate(1024 * 8); | |
int read = channel.read(buffer); | |
if (read <= -1) { | |
key.cancel(); | |
System.out.println("client " + channel.getRemoteAddress() + " disconnected."); | |
} else { | |
pool.execute(() -> { | |
byte[] bytes = new byte[read]; | |
int len = buffer.position(); | |
for (int i = 0; i < len; i++) { | |
bytes[i] = buffer.get(i); | |
} | |
System.out.println("data of length " + bytes.length + " read."); | |
buffer.flip(); | |
buffer.clear(); | |
channel.write(buffer.wrap(bytes)); | |
System.out.println("data of length " + bytes.length + " written."); | |
}); | |
} | |
} catch (IOException e) { | |
key.cancel(); | |
e.printStackTrace(); | |
} | |
} | |
break; | |
} | |
} | |
} | |
public static void main(String[] args) { | |
TcpServer server = new TcpServer("127.0.0.1", 8191); | |
server.start(); | |
Runtime.getRuntime().addShutdownHook(new Thread(() -> { | |
server.stop(); | |
}, "main")); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment