Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save surfingtomchen/e7e9db916270134453928069e29b80b8 to your computer and use it in GitHub Desktop.
Save surfingtomchen/e7e9db916270134453928069e29b80b8 to your computer and use it in GitHub Desktop.
Async in actix websocket actor
/// Websocket actor.
///
/// A field-less unit struct: it keeps no per-connection state of its own and
/// runs inside a `ws::WebsocketContext` (see its `Actor` implementation).
pub struct WsActor;
/// Stream item wrapping a `websocket::Response` that is ready to be sent.
///
/// Produced by the future built in `Handler<Router>` and consumed by
/// `StreamHandler<BinResponse>`, which serializes the wrapped response and
/// writes it to the client as a binary frame.
#[derive(Debug)]
struct BinResponse(websocket::Response);
/// Actor message carrying the raw payload of one binary websocket frame.
///
/// The actor sends this to itself (via `ctx.notify`) for every
/// `ws::Message::Binary` it receives; the handler's result type is
/// `Result<(), ()>`.
#[derive(Message)]
#[rtype(result = "Result<(), ()>")]
struct Router(Bytes);
/// Runs `WsActor` inside a websocket context.
impl Actor for WsActor {
    type Context = ws::WebsocketContext<Self>;

    /// Called once when the actor starts: prints a start-up notice and sends
    /// a ping frame with an empty payload to the client.
    fn started(&mut self, ctx: &mut Self::Context) {
        println!("Websocket is started!!!!!");
        ctx.ping(b"");
    }
}
impl StreamHandler<BinResponse> for WsActor {
fn handle(&mut self, msg: BinResponse, ctx: &mut Self::Context) {
let resp = msg.0;
let b = resp.write_to_bytes().unwrap();
ctx.binary(b);
}
}
/// Handles frames arriving from the websocket client.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsActor {
    /// Prints the incoming item, then reacts according to the frame type:
    ///
    /// * `Ping`   - answered with a `Pong` carrying the same payload.
    /// * `Text`   - answered with a fixed notice that text is not allowed.
    /// * `Binary` - wrapped in a `Router` message and sent to this actor.
    /// * `Close`  - the close frame is echoed with its reason and the actor stops.
    /// * anything else, protocol errors included - ignored.
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        println!("WS: {:?}", msg);

        let frame = match msg {
            Ok(frame) => frame,
            // Protocol errors are ignored, exactly like unmatched frame types.
            Err(_) => return,
        };

        match frame {
            ws::Message::Ping(payload) => ctx.pong(&payload),
            ws::Message::Text(_) => ctx.text("No text message allowed.".to_string()),
            ws::Message::Binary(payload) => ctx.notify(Router(payload)),
            ws::Message::Close(reason) => {
                ctx.close(reason);
                ctx.stop();
            }
            _ => (),
        }
    }
}
impl Handler<Router> for WsActor {
type Result = Result<(), ()>;
fn handle(&mut self, msg: Router, ctx: &mut Self::Context) -> Self::Result {
let fut = async move {
let bytes = msg.0;
let mut wrapper = websocket::Request::new();
let mut stream = CodedInputStream::from_bytes(bytes.as_ref());
let mut error_reply = websocket::Response::new();
error_reply.field_type = ResponseType::ResponseTypeNone;
error_reply.code = AppError::ProtobufError { cause: "".to_owned() }.error_code();
error_reply.set_none_response(NoneResp::new());
if let Err(e) = wrapper.merge_from(&mut stream) {
common::error!("failed to parse protobuf; err = {:?}", e);
error_reply.error_message = format!("failed to parse protobuf: {:?}", e);
return error_reply;
}
match wrapper.field_type {
RequestType::TypeHelloReq => handlers::hello(wrapper.get_hello_request()).await,
_ => handlers::hello(wrapper.get_hello_request()).await,
}
.unwrap_or_else(|err| {
error_reply.error_message = format!("failed to handle websocket request: {:?}", err);
error_reply
})
};
ctx.add_stream(stream::once(async { BinResponse(fut.await) }));
Ok(())
}
}
impl WsActor {
pub fn new() -> Self {
WsActor
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment