Skip to content

Instantly share code, notes, and snippets.

@mbbx6spp
Created February 7, 2012 17:58
Show Gist options
  • Save mbbx6spp/1761004 to your computer and use it in GitHub Desktop.
Save mbbx6spp/1761004 to your computer and use it in GitHub Desktop.
Websockets with Netty (Scala translation) - WORK IN PROGRESS! Currently it expects an integer and will respond with its square. It isn't very interesting but it serves as a boilerplate/skeleton.

Experimentation with Netty and Scala's Types

Status

  • Only first pass complete: raw translation from Java example to Scala.
  • Second pass [incomplete]: abstracting to make server and client specific code composable.

TODO:

  • Create declarative and composable Pipeline definition
  • Offer simplified handler base that can also be composed with other handlers
  • Split reusable code out and show server and client examples using new abstractions
package net.susanpotter.wsnettyexperiment
// WIP - Incomplete! Just doodling at the moment...nothing here to see (unless you want to help out;))
import scalaz._
import Scalaz._
import scalaz.effects._
// TODO Think of a better name, or implement this like Haskell's pipes[1]
// [1] http://hackage.haskell.org/package/pipes-1.0
// Eventually want to do something like: `def getPipeline = pipe1 >> pipe2 >> pipe3`
// and also something like:
trait Pipe[A, B, M, R] {
// TODO MonadTrans (Pipe a b)
// TODO Monad m => Monad (Pipe a b m)
// TODO Monad m => Functor (Pipe a b m)
// TODO Monad m => Applicative (Pipe a b m)
}
// A pipe that can only produce values
type Producer[B, M, R] = Pipe[null, B, M, R]
// A pipe that can only consume values
type Consumer[A, M, R] = Pipe[A, null, M, R]
// A self-contained pipeline that is ready to be run
type Pipeline[M, R] = Pipe[null, null, M, R]
object PipeHandler {
implicit val ShowPipeHandler: Show[PipeHandler] = showA
implicit val EqualPipeHandler: Equal[PipeHandler] = equalA
implicit val OrderPipeHandler: Order[PipeHandler] = orderBy {
case PipeHandler(order, _, _) => order
}
// TODO add more behaviors relevant to this concept of "pipe"
}
case class PipeHandler(order: Int, name: String, handler: ChannelHandler)
// Simple/raw translation of the [bad] Java Netty WS example to Scala. Nothing interesting...just first phase.
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.HttpHeaders._
import org.jboss.netty.handler.codec.http.HttpHeaders.Names._
import org.jboss.netty.handler.codec.http.HttpHeaders.Values._
import org.jboss.netty.handler.codec.http.HttpMethod._
import org.jboss.netty.handler.codec.http.HttpResponseStatus._
import org.jboss.netty.handler.codec.http.HttpVersion._
import org.jboss.netty.buffer.{ChannelBuffer,ChannelBuffers}
import org.jboss.netty.channel.{ChannelFuture,ChannelFutureListener,ChannelHandlerContext,ChannelPipeline,ExceptionEvent,MessageEvent,SimpleChannelUpstreamHandler}
import org.jboss.netty.channel.Channels._
import org.jboss.netty.channel.{ChannelPipeline,ChannelPipelineFactory}
import org.jboss.netty.handler.codec.http.{HttpChunkAggregator,HttpRequestDecoder,HttpResponseEncoder}
import org.jboss.netty.handler.codec.http.{DefaultHttpResponse,HttpHeaders,HttpRequest,HttpResponse,HttpResponseStatus}
import org.jboss.netty.handler.codec.http.HttpHeaders.{Names,Values}
import org.jboss.netty.handler.codec.http.websocket.{DefaultWebSocketFrame,WebSocketFrame,WebSocketFrameDecoder,WebSocketFrameEncoder}
import org.jboss.netty.util.CharsetUtil;
case class UnknownRequestType(msg: String) extends Exception(msg)
object WebSocketServerHandler {
val WS_PATH = "/ws"
}
class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
@throws(classOf[Exception])
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
val msg = e.getMessage()
msg match {
case _: HttpRequest => handleHttpRequest(ctx, msg.asInstanceOf[HttpRequest])
case _: WebSocketFrame => handleWebSocketFrame(ctx, msg.asInstanceOf[WebSocketFrame])
case _ => throw new UnknownRequestType("Unknown request message type: " + msg.getClass.getName)
}
}
private[this] def handleHttpRequest(ctx: ChannelHandlerContext, req: HttpRequest) {
req.getUri match {
case WebSocketServerHandler.WS_PATH => sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN))
case _ => sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, NOT_FOUND))
}
}
/*
* This is the meat of the server event processing
* Here we square the number and return it
*/
private[this] def handleWebSocketFrame(ctx: ChannelHandlerContext, frame: WebSocketFrame) {
System.out.println(frame.getTextData)
try {
val number = Integer.parseInt(frame.getTextData)
ctx.getChannel().write(new DefaultWebSocketFrame((number*number).toString))
} catch {
case e: NumberFormatException =>
ctx.getChannel().write(new DefaultWebSocketFrame("ERROR"))
}
}
private[this] def sendHttpResponse(ctx: ChannelHandlerContext, req: HttpRequest, res: HttpResponse) {
// Generate an error page if response status code is not OK (200).
res.getStatus.getCode match {
case 200 =>
res.setContent(ChannelBuffers.copiedBuffer(res.getStatus.toString, CharsetUtil.UTF_8))
setContentLength(res, res.getContent.readableBytes)
case _ =>
if (!isKeepAlive(req)) {
val f = ctx.getChannel.write(res)
f.addListener(ChannelFutureListener.CLOSE)
}
}
}
@throws(classOf[Exception])
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
e.getCause.printStackTrace
e.getChannel.close
}
private[this] def getWebSocketLocation(req: HttpRequest) = "ws://" + req.getHeader(HttpHeaders.Names.HOST) + WebSocketServerHandler.WS_PATH
}
class WebSocketServerPipelineFactory extends ChannelPipelineFactory {
@throws(classOf[Exception])
def getPipeline(): ChannelPipeline = {
val pl = pipeline()
pl.addLast("decoder", new HttpRequestDecoder)
pl.addLast("aggregator", new HttpChunkAggregator(65536))
pl.addLast("encoder", new HttpResponseEncoder)
pl.addLast("handler", new WebSocketServerHandler)
pl
}
}
/**
* Representing an HTTP server that will serve WebSocket requests on path /ws
*/
object WebSocketRunner {
val PORT: Int = 8080
def main(args: Array[String]) {
// Configure the server.
val bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()))
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new WebSocketServerPipelineFactory())
// Bind and start to accept incoming connections
bootstrap.bind(new InetSocketAddress(PORT))
println("WebSocket Server Started")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment