Skip to content

Instantly share code, notes, and snippets.

@timClicks
Last active November 28, 2023 23:32
Show Gist options
  • Save timClicks/a49a520a4b6970e964f9c8b038f6534a to your computer and use it in GitHub Desktop.
Save timClicks/a49a520a4b6970e964f9c8b038f6534a to your computer and use it in GitHub Desktop.
async chat server
From Comprehensive Rust
use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use http::Uri;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_websockets::{ClientBuilder, Message};
/// Chat client skeleton.
///
/// Opens a websocket connection to `ws://127.0.0.1:2000` and prepares a
/// line-by-line reader over standard input. The message loop itself is left
/// as an exercise (`todo!()`), so running this as-is panics right after the
/// connection has been established.
#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
    // The second tuple element returned by `connect()` is not needed here.
    let (mut ws_stream, _) = ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
        .connect()
        .await?;

    // Async reader that yields stdin one line at a time.
    let mut stdin = BufReader::new(tokio::io::stdin()).lines();

    todo!();
    Ok(())
}
use std::error::Error;
use std::net::SocketAddr;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{channel, Sender};
use tokio_websockets::{Message, ServerBuilder, WebsocketStream};
/// Chat server entry point.
///
/// Binds a TCP listener on `127.0.0.1:2000` and accepts connections forever.
/// Every accepted socket is upgraded to a websocket inside its own task and
/// then handed to `handle_connection` together with a clone of the broadcast
/// sender.
///
/// # Errors
///
/// Returns an error if binding the listener or accepting a connection fails.
/// Failures of an individual connection do not stop the server; they are
/// reported on stderr.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    // Broadcast channel with room for 16 messages. The initial receiver is
    // discarded; only the sender is kept and cloned into each connection task.
    let (bcast_tx, _) = channel(16);

    let listener = TcpListener::bind("127.0.0.1:2000").await?;
    println!("listening on port 2000");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("New connection from {addr:?}");
        let bcast_tx = bcast_tx.clone();
        tokio::spawn(async move {
            // Wrap the raw TCP stream into a websocket.
            let outcome: Result<(), Box<dyn Error + Send + Sync>> =
                match ServerBuilder::new().accept(socket).await {
                    Ok(ws_stream) => handle_connection(addr, ws_stream, bcast_tx).await,
                    Err(err) => Err(err.into()),
                };
            // The JoinHandle is dropped, so nobody would ever observe a
            // returned error; report it here instead of losing it.
            if let Err(err) = outcome {
                eprintln!("connection from {addr:?} ended with error: {err}");
            }
        });
    }
}
/// Handles a single client connection (exercise stub).
///
/// Called from the task spawned in `main` once the websocket handshake has
/// succeeded. The body is not implemented yet: calling it panics via
/// `todo!()`.
///
/// * `addr` - peer address returned by `listener.accept()`.
/// * `ws_stream` - websocket stream wrapping the accepted TCP socket.
/// * `bcast_tx` - clone of the broadcast sender created in `main`.
async fn handle_connection(
addr: SocketAddr,
mut ws_stream: WebsocketStream<TcpStream>,
bcast_tx: Sender<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
// Exercise: for every incoming message, broadcast it to everyone.
todo!()
}
# Manifest for the chat exercise: one package that builds two binaries,
# the client (`chat`) and the server (`chatserv`).
[package]
name = "chat"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Provides the `StreamExt` / `SinkExt` traits imported by both binaries.
futures-util = { version = "0.3.28", features = ["sink"] }
# Provides `Uri`, used by the client to name the server endpoint.
http = "0.2.9"
tokio = { version = "1.28.1", features = ["full"] }
# Both the "client" and "server" features are enabled; this package contains
# a binary for each side.
tokio-websockets = { version = "0.4.0", features = ["client", "fastrand", "server", "sha1_smol"] }
# Client binary.
[[bin]]
name = "chat"
path = "src/client.rs"
# Server binary.
[[bin]]
name = "chatserv"
path = "src/server.rs"
use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use http::Uri;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_websockets::{ClientBuilder, Message};
/// Chat client.
///
/// Connects to the websocket server at `ws://127.0.0.1:2000`, prints the
/// first text message it receives without a prefix, and then concurrently:
///
/// * prints every text message from the server as `< text`;
/// * sends every line read from standard input to the server and echoes it
///   locally as `> line`.
///
/// Returns `Ok(())` when the server closes the connection or stdin reaches
/// end of input.
///
/// # Errors
///
/// Returns an error if connecting fails, if the websocket yields an error, if
/// sending a message fails, or if reading from stdin fails.
#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
    // The second tuple element returned by `connect()` is not needed here.
    let (mut ws_stream, _) = ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
        .connect()
        .await?;

    // Async reader that yields stdin one line at a time.
    let mut stdin = BufReader::new(tokio::io::stdin()).lines();

    // The first message is shown as-is, without the "< " prefix. Anything
    // other than a successfully received text message is skipped here.
    if let Some(Ok(first)) = ws_stream.next().await {
        if let Some(welcome_message) = first.as_text() {
            println!("{welcome_message}");
        }
    }

    loop {
        tokio::select! {
            incoming = ws_stream.next() => {
                match incoming {
                    // End of stream: the server is gone, exit cleanly.
                    None => {
                        eprintln!("connection with server terminated");
                        return Ok(());
                    }
                    Some(received) => {
                        let payload = received?;
                        // Non-text messages are ignored.
                        if let Some(text) = payload.as_text() {
                            println!("< {text}");
                        }
                    }
                }
            }
            input = stdin.next_line() => {
                match input? {
                    // End of input on stdin: nothing more to send.
                    None => return Ok(()),
                    Some(line) => {
                        ws_stream.send(Message::text(line.clone())).await?;
                        // Echo only after the send has succeeded.
                        println!("> {line}");
                    }
                }
            }
        }
    }
}
use std::error::Error;
use std::net::SocketAddr;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{channel, Sender};
use tokio_websockets::{Message, ServerBuilder, WebsocketStream};
/// Chat server entry point.
///
/// Binds a TCP listener on `127.0.0.1:2000` and accepts connections forever.
/// Every accepted socket is upgraded to a websocket inside its own task and
/// then handed to `handle_connection` together with a clone of the broadcast
/// sender.
///
/// # Errors
///
/// Returns an error if binding the listener or accepting a connection fails.
/// Failures of an individual connection do not stop the server; they are
/// reported on stderr.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    // Broadcast channel with room for 16 messages. The initial receiver is
    // discarded; only the sender is kept and cloned into each connection task.
    let (bcast_tx, _) = channel(16);

    let listener = TcpListener::bind("127.0.0.1:2000").await?;
    println!("listening on port 2000");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("New connection from {addr:?}");
        let bcast_tx = bcast_tx.clone();
        tokio::spawn(async move {
            // Wrap the raw TCP stream into a websocket.
            let outcome: Result<(), Box<dyn Error + Send + Sync>> =
                match ServerBuilder::new().accept(socket).await {
                    Ok(ws_stream) => handle_connection(addr, ws_stream, bcast_tx).await,
                    Err(err) => Err(err.into()),
                };
            // The JoinHandle is dropped, so nobody would ever observe a
            // returned error; report it here instead of losing it.
            if let Err(err) = outcome {
                eprintln!("connection from {addr:?} ended with error: {err}");
            }
        });
    }
}
/// Serves one chat client over an established websocket.
///
/// Sends a welcome message, subscribes to the broadcast channel, and then
/// concurrently:
///
/// * publishes every text message received from the client on `bcast_tx`;
/// * forwards every message received from the broadcast channel to the
///   client.
///
/// Because this task both publishes to and subscribes to the same channel,
/// a client also receives its own messages back.
///
/// * `addr` - peer address of the client, used in diagnostics.
/// * `ws_stream` - websocket stream for this client.
/// * `bcast_tx` - sender side of the broadcast channel shared by all
///   connections.
///
/// Returns `Ok(())` when the client's stream ends.
///
/// # Errors
///
/// Returns an error if sending to or receiving from the websocket fails, if
/// publishing to the broadcast channel fails, or if the broadcast channel is
/// closed. Falling behind on the broadcast channel is not an error: the
/// missed messages are skipped and a note is written to stderr.
async fn handle_connection(
    addr: SocketAddr,
    mut ws_stream: WebsocketStream<TcpStream>,
    bcast_tx: Sender<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    let welcome_message = "system: Welcome!".to_string();
    ws_stream.send(Message::text(welcome_message)).await?;

    let mut bcast_rx = bcast_tx.subscribe();

    loop {
        tokio::select! {
            msg = bcast_rx.recv() => {
                match msg {
                    Ok(text) => ws_stream.send(Message::text(text)).await?,
                    // A slow receiver only loses the overwritten messages;
                    // the next `recv` resumes with the oldest retained one,
                    // so there is no reason to drop the client.
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                        eprintln!("client {addr:?} lagged behind; skipped {skipped} message(s)");
                    }
                    Err(err @ tokio::sync::broadcast::error::RecvError::Closed) => {
                        return Err(err.into());
                    }
                }
            }
            incoming = ws_stream.next() => {
                match incoming {
                    Some(Ok(msg)) => {
                        // Non-text messages are ignored.
                        if let Some(text) = msg.as_text() {
                            bcast_tx.send(text.into())?;
                        }
                    }
                    Some(Err(err)) => return Err(err.into()),
                    // End of stream: the client disconnected.
                    None => return Ok(()),
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment