From a2c3e66404f567b1efa954e67529ba552e4c5d72 Mon Sep 17 00:00:00 2001 From: gbaranski Date: Sun, 5 Mar 2023 23:31:36 +0100 Subject: [PATCH 1/3] feat: prepare for actix-web support, missing core stream functionality --- Cargo.toml | 5 +- examples/chat-server-actix-web/Cargo.toml | 15 +++ examples/chat-server-actix-web/src/main.rs | 145 +++++++++++++++++++++ src/actix_web.rs | 133 +++++++++++++++++++ src/lib.rs | 3 + 5 files changed, 300 insertions(+), 1 deletion(-) create mode 100644 examples/chat-server-actix-web/Cargo.toml create mode 100644 examples/chat-server-actix-web/src/main.rs create mode 100644 src/actix_web.rs diff --git a/Cargo.toml b/Cargo.toml index ade0e62..c17ede9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,8 @@ cfg-if = "1.0.0" axum_crate = { package = "axum", version = "0.6.1", features = ["ws"], optional = true } tokio-tungstenite = { version = "0.18.0", optional = true } +actix-web_crate = { package = "actix-web", version = "4.3.1", optional = true } +actix-http = { version = "3.3.1", optional = true } [features] default = ["client", "server"] @@ -33,6 +35,7 @@ client = ["tokio-tungstenite"] server = [] tungstenite = ["server", "tokio-tungstenite"] axum = ["server", "axum_crate"] +actix-web = ["actix-web_crate", "actix-http"] [dev-dependencies] tokio = { version = "1.17.0", features = ["full"] } @@ -46,7 +49,7 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [workspace] -members = ["examples/chat-client", "examples/chat-server", "examples/chat-server-axum", "examples/echo-server", "examples/simple-client", "examples/counter-server"] +members = ["examples/chat-client", "examples/chat-server", "examples/chat-server-axum", "examples/chat-server-actix-web", "examples/echo-server", "examples/simple-client", "examples/counter-server"] [[test]] name = "axum" diff --git a/examples/chat-server-actix-web/Cargo.toml b/examples/chat-server-actix-web/Cargo.toml new file mode 100644 index 0000000..505e3be --- /dev/null +++ b/examples/chat-server-actix-web/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "ezsockets-chat-actix-web" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +actix-web = "4.3.1" +async-trait = "0.1.52" +axum = "0.6.1" +ezsockets = { path = "../../", features = ["actix-web"] } +tokio = { version = "1.17.0", features = ["full"] } +tracing = "0.1.32" +tracing-subscriber = "0.3.9" diff --git a/examples/chat-server-actix-web/src/main.rs b/examples/chat-server-actix-web/src/main.rs new file mode 100644 index 0000000..531fb7f --- /dev/null +++ b/examples/chat-server-actix-web/src/main.rs @@ -0,0 +1,145 @@ +use async_trait::async_trait; +use axum::extract::Extension; +use axum::response::IntoResponse; +use axum::routing::get; +use axum::Router; +use ezsockets::axum::Upgrade; +use ezsockets::Error; +use ezsockets::Server; +use ezsockets::Socket; +use std::collections::HashMap; +use std::io::BufRead; +use std::net::SocketAddr; + +type SessionID = u16; +type Session = ezsockets::Session; + +#[derive(Debug)] +enum ChatMessage { + Send { from: SessionID, text: String }, +} + +struct ChatServer { + sessions: HashMap, + handle: Server, +} + +#[async_trait] +impl ezsockets::ServerExt for ChatServer { + type Session = ChatSession; + type Params = ChatMessage; + + async fn accept( + &mut self, + socket: Socket, + _address: SocketAddr, + _args: ::Args, + ) -> Result { + let id = (0..).find(|i| !self.sessions.contains_key(i)).unwrap_or(0); + let session = Session::create( + |_| ChatSession { + id, + server: self.handle.clone(), + }, + id, + socket, + ); + self.sessions.insert(id, session.clone()); + Ok(session) + } + + async fn disconnected( + &mut self, + id: ::ID, + ) -> Result<(), Error> { + assert!(self.sessions.remove(&id).is_some()); + Ok(()) + } + + async fn call(&mut self, params: Self::Params) -> Result<(), Error> { + match params { + ChatMessage::Send { text, from } => { + let sessions = self.sessions.iter().filter(|(id, _)| from != **id); + let text = format!("from {from}: {text}"); + for (id, handle) in sessions { + tracing::info!("sending {text} to {id}"); + handle.text(text.clone()); + } + } + }; + Ok(()) + } +} + +struct ChatSession { + id: SessionID, + server: Server, +} + +#[async_trait] +impl ezsockets::SessionExt for ChatSession { + type ID = SessionID; + type Args = (); + type Params = (); + + fn id(&self) -> &Self::ID { + &self.id + } + async fn text(&mut self, text: String) -> Result<(), Error> { + tracing::info!("received: {text}"); + self.server.call(ChatMessage::Send { + from: self.id, + text, + }); + Ok(()) + } + + async fn binary(&mut self, _bytes: Vec) -> Result<(), Error> { + unimplemented!() + } + + async fn call(&mut self, params: Self::Params) -> Result<(), Error> { + let () = params; + Ok(()) + } +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let (server, _) = Server::create(|handle| ChatServer { + sessions: HashMap::new(), + handle, + }); + + let app = Router::new() + .route("/websocket", get(websocket_handler)) + .layer(Extension(server.clone())); + + let address = SocketAddr::from(([127, 0, 0, 1], 8080)); + + tokio::spawn(async move { + tracing::debug!("listening on {}", address); + axum::Server::bind(&address) + .serve(app.into_make_service_with_connect_info::()) + .await + .unwrap(); + }); + + let stdin = std::io::stdin(); + let lines = stdin.lock().lines(); + for line in lines { + let line = line.unwrap(); + server.call(ChatMessage::Send { + text: line, + from: SessionID::MAX, // reserve some ID for the server + }); + } +} + +async fn websocket_handler( + Extension(server): Extension>, + ezsocket: Upgrade, +) -> impl IntoResponse { + ezsocket.on_upgrade(server, ()) +} diff --git a/src/actix_web.rs b/src/actix_web.rs new file mode 100644 index 0000000..9e5edca --- /dev/null +++ b/src/actix_web.rs @@ -0,0 +1,133 @@ +// This code comes mostly from https://github.com/actix/actix-web and actix-web-actors crate + +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + +use actix_http::ws::hash_key; +pub use actix_http::ws::{CloseCode, CloseReason, Frame, HandshakeError, Message, ProtocolError}; +use actix_web::{ + error::Error, + http::{ + header::{self, HeaderValue}, + Method, StatusCode, + }, + web, HttpRequest, HttpResponse, +}; +use actix_web_crate as actix_web; +use tokio::net::TcpStream; +use tokio_tungstenite::tungstenite; + +use crate::{socket::Config, Server, ServerExt, SessionExt, Socket}; + +pub async fn accept( + req: HttpRequest, + payload: web::Payload, + server: Server, + args: ::Args, +) -> Result<(HttpResponse, SX::ID), Error> +where + SE: ServerExt, + SX: SessionExt, +{ + // WebSocket accepts only GET + if *req.method() != Method::GET { + Err(HandshakeError::GetMethodRequired)?; + } + + // check for "UPGRADE" to WebSocket header + let has_hdr = if let Some(hdr) = req.headers().get(&header::UPGRADE) { + if let Ok(s) = hdr.to_str() { + s.to_ascii_lowercase().contains("websocket") + } else { + false + } + } else { + false + }; + if !has_hdr { + Err(HandshakeError::NoWebsocketUpgrade)? + } + + // Upgrade connection + if !req.head().upgrade() { + Err(HandshakeError::NoConnectionUpgrade)? + } + + // check supported version + if !req.headers().contains_key(&header::SEC_WEBSOCKET_VERSION) { + Err(HandshakeError::NoVersionHeader)? + } + let supported_ver = { + if let Some(hdr) = req.headers().get(&header::SEC_WEBSOCKET_VERSION) { + hdr == "13" || hdr == "8" || hdr == "7" + } else { + false + } + }; + if !supported_ver { + Err(HandshakeError::UnsupportedVersion)? + } + + // check client handshake for validity + if !req.headers().contains_key(&header::SEC_WEBSOCKET_KEY) { + Err(HandshakeError::BadWebsocketKey)? + } + let key = { + let key = req.headers().get(&header::SEC_WEBSOCKET_KEY).unwrap(); + hash_key(key.as_ref()) + }; + + // TODO: Remove this + let protocols: &[&'static str] = &[]; + // check requested protocols + let protocol = req + .headers() + .get(&header::SEC_WEBSOCKET_PROTOCOL) + .and_then(|req_protocols| { + let req_protocols = req_protocols.to_str().ok()?; + req_protocols + .split(',') + .map(|req_p| req_p.trim()) + .find(|req_p| protocols.iter().any(|p| p == req_p)) + }); + + let mut response = HttpResponse::build(StatusCode::SWITCHING_PROTOCOLS) + .upgrade("websocket") + .insert_header(( + header::SEC_WEBSOCKET_ACCEPT, + // key is known to be header value safe ascii + HeaderValue::from_bytes(&key).unwrap(), + )) + .take(); + + if let Some(protocol) = protocol { + response.insert_header((header::SEC_WEBSOCKET_PROTOCOL, protocol)); + } + + // TODO: Somehow construct a stream that satisfies AsyncRead + AsyncWrite + Unpin + let stream = (|| todo!())(); + // The TcpStream is just for now, to satisfy the trait bounds + let websocket_stream = tokio_tungstenite::WebSocketStream::::from_raw_socket( + stream, + tungstenite::protocol::Role::Server, + None, + ) + .await; + + let socket = Socket::new(websocket_stream, Config::default()); + + let address = req + .peer_addr() + .or_else(|| { + // Using this random address, because the `peer_addr()` is going to return `None` only during the unit test anyways + Some(SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(123, 123, 123, 123), + 1234, + ))) + }) + .unwrap(); + + let session_id = server.accept(socket, address, args).await; + + let response = response.await?; + Ok((response, session_id)) +} diff --git a/src/lib.rs b/src/lib.rs index 9f37645..6f82ad3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,9 @@ pub use socket::Stream; #[cfg(feature = "axum")] pub mod axum; +#[cfg(feature = "actix-web")] +pub mod actix_web; + #[cfg(feature = "tokio-tungstenite")] pub mod tungstenite; From 95c75145007ade0cfd9db0f4047c845a0cb19125 Mon Sep 17 00:00:00 2001 From: gbaranski Date: Fri, 17 Mar 2023 13:32:11 +0100 Subject: [PATCH 2/3] fix(examples/chat-server-actix-web): update for new API --- examples/chat-server-actix-web/src/main.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/examples/chat-server-actix-web/src/main.rs b/examples/chat-server-actix-web/src/main.rs index 531fb7f..f206652 100644 --- a/examples/chat-server-actix-web/src/main.rs +++ b/examples/chat-server-actix-web/src/main.rs @@ -27,9 +27,9 @@ struct ChatServer { #[async_trait] impl ezsockets::ServerExt for ChatServer { type Session = ChatSession; - type Params = ChatMessage; + type Call = ChatMessage; - async fn accept( + async fn on_connect( &mut self, socket: Socket, _address: SocketAddr, @@ -48,7 +48,7 @@ impl ezsockets::ServerExt for ChatServer { Ok(session) } - async fn disconnected( + async fn on_disconnect( &mut self, id: ::ID, ) -> Result<(), Error> { @@ -56,8 +56,8 @@ impl ezsockets::ServerExt for ChatServer { Ok(()) } - async fn call(&mut self, params: Self::Params) -> Result<(), Error> { - match params { + async fn on_call(&mut self, call: Self::Call) -> Result<(), Error> { + match call { ChatMessage::Send { text, from } => { let sessions = self.sessions.iter().filter(|(id, _)| from != **id); let text = format!("from {from}: {text}"); @@ -80,12 +80,12 @@ struct ChatSession { impl ezsockets::SessionExt for ChatSession { type ID = SessionID; type Args = (); - type Params = (); + type Call = (); fn id(&self) -> &Self::ID { &self.id } - async fn text(&mut self, text: String) -> Result<(), Error> { + async fn on_text(&mut self, text: String) -> Result<(), Error> { tracing::info!("received: {text}"); self.server.call(ChatMessage::Send { from: self.id, @@ -94,12 +94,12 @@ impl ezsockets::SessionExt for ChatSession { Ok(()) } - async fn binary(&mut self, _bytes: Vec) -> Result<(), Error> { + async fn on_binary(&mut self, _bytes: Vec) -> Result<(), Error> { unimplemented!() } - async fn call(&mut self, params: Self::Params) -> Result<(), Error> { - let () = params; + async fn on_call(&mut self, call: Self::Call) -> Result<(), Error> { + let () = call; Ok(()) } } From e09378fbf190dbe18133598cce08afdf1eca0bdc Mon Sep 17 00:00:00 2001 From: gbaranski Date: Fri, 17 Mar 2023 13:50:00 +0100 Subject: [PATCH 3/3] feat(actix-web): prepare the example API --- examples/chat-server-actix-web/Cargo.toml | 1 - examples/chat-server-actix-web/src/main.rs | 62 +++++++++------------- src/actix_web.rs | 2 +- 3 files changed, 26 insertions(+), 39 deletions(-) diff --git a/examples/chat-server-actix-web/Cargo.toml b/examples/chat-server-actix-web/Cargo.toml index 505e3be..2d1db5e 100644 --- a/examples/chat-server-actix-web/Cargo.toml +++ b/examples/chat-server-actix-web/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] actix-web = "4.3.1" async-trait = "0.1.52" -axum = "0.6.1" ezsockets = { path = "../../", features = ["actix-web"] } tokio = { version = "1.17.0", features = ["full"] } tracing = "0.1.32" diff --git a/examples/chat-server-actix-web/src/main.rs b/examples/chat-server-actix-web/src/main.rs index f206652..1f59928 100644 --- a/examples/chat-server-actix-web/src/main.rs +++ b/examples/chat-server-actix-web/src/main.rs @@ -1,14 +1,13 @@ +use actix_web::App; +use actix_web::HttpRequest; +use actix_web::HttpResponse; +use actix_web::HttpServer; +use actix_web::web; use async_trait::async_trait; -use axum::extract::Extension; -use axum::response::IntoResponse; -use axum::routing::get; -use axum::Router; -use ezsockets::axum::Upgrade; use ezsockets::Error; use ezsockets::Server; use ezsockets::Socket; use std::collections::HashMap; -use std::io::BufRead; use std::net::SocketAddr; type SessionID = u16; @@ -104,42 +103,31 @@ impl ezsockets::SessionExt for ChatSession { } } -#[tokio::main] -async fn main() { +struct AppState { + server: Server, +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { tracing_subscriber::fmt::init(); let (server, _) = Server::create(|handle| ChatServer { sessions: HashMap::new(), handle, }); + HttpServer::new(move || { + App::new() + .route("/ws", web::get().to(index)) + .app_data(web::Data::new(AppState { server: server.clone() })) + }) + .bind(("127.0.0.1", 8080))? + .run() + .await +} - let app = Router::new() - .route("/websocket", get(websocket_handler)) - .layer(Extension(server.clone())); - - let address = SocketAddr::from(([127, 0, 0, 1], 8080)); - tokio::spawn(async move { - tracing::debug!("listening on {}", address); - axum::Server::bind(&address) - .serve(app.into_make_service_with_connect_info::()) - .await - .unwrap(); - }); - let stdin = std::io::stdin(); - let lines = stdin.lock().lines(); - for line in lines { - let line = line.unwrap(); - server.call(ChatMessage::Send { - text: line, - from: SessionID::MAX, // reserve some ID for the server - }); - } -} - -async fn websocket_handler( - Extension(server): Extension>, - ezsocket: Upgrade, -) -> impl IntoResponse { - ezsocket.on_upgrade(server, ()) -} +async fn index(req: HttpRequest, stream: web::Payload, data: web::Data) -> Result { + let (resp, id) = ezsockets::actix_web::accept(req, stream, &data.server, ()).await?; + tracing::info!(%id, ?resp, "new connection"); + Ok(resp) +} \ No newline at end of file diff --git a/src/actix_web.rs b/src/actix_web.rs index 9e5edca..2227ebe 100644 --- a/src/actix_web.rs +++ b/src/actix_web.rs @@ -21,7 +21,7 @@ use crate::{socket::Config, Server, ServerExt, SessionExt, Socket}; pub async fn accept( req: HttpRequest, payload: web::Payload, - server: Server, + server: &Server, args: ::Args, ) -> Result<(HttpResponse, SX::ID), Error> where