diff --git a/.githooks/pre-commit b/.githooks/pre-commit new file mode 100755 index 0000000..d44245c --- /dev/null +++ b/.githooks/pre-commit @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# pre-commit hook: runs fmt check, build, and clippy before every commit. +set -e + +echo "==> [pre-commit] Checking formatting..." +cargo fmt --all -- --check + +echo "==> [pre-commit] Building..." +cargo build --all --quiet + +echo "==> [pre-commit] Running clippy..." +cargo clippy --all-targets --all-features -- -D warnings + +echo "==> [pre-commit] All checks passed!" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..c9394cf --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,76 @@ +name: CI + +on: + push: + branches: ["**"] + pull_request: + branches: ["**"] + +jobs: + check: + name: Format / Clippy / Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install stable Rust + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt, clippy + + - name: Cache cargo registry + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + + - name: Check formatting + run: cargo fmt --all -- --check + + - name: Clippy (no warnings) + run: cargo clippy --all-targets --all-features -- -D warnings + + - name: Run unit & integration tests + run: cargo test --all + + e2e: + name: End-to-End smoke test + runs-on: ubuntu-latest + needs: check + steps: + - uses: actions/checkout@v4 + + - name: Install stable Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo registry + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + + - name: Build release binaries + run: cargo build --release --all + + - name: Start server in background + run: | + ./target/release/server & + echo "SERVER_PID=$!" >> "$GITHUB_ENV" + # Wait for server to be ready + sleep 1 + + - name: Send a message via client (non-interactive) + run: | + # Use a here-doc to pipe commands into the client's stdin + # The client reads stdin line by line; we send a message then leave. + printf 'send Hello from CI!\nleave\n' | \ + HOST=127.0.0.1 PORT=8080 USERNAME=ci_bot ./target/release/client + EXIT=$? + kill "$SERVER_PID" || true + exit $EXIT diff --git a/.gitignore b/.gitignore index 6985cf1..9f97022 100644 --- a/.gitignore +++ b/.gitignore @@ -1,14 +1 @@ -# Generated by Cargo -# will have compiled files and executables -debug/ -target/ - -# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries -# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html -Cargo.lock - -# These are backup files generated by rustfmt -**/*.rs.bk - -# MSVC Windows builds of rustc generate these, which store debugging information -*.pdb +target/ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..eb6be16 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,290 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "bitflags" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" + +[[package]] +name = "bytes" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "client" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", + "tokio", +] + +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + +[[package]] +name = "libc" +version = "0.2.184" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" + +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "mio" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "server" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", + "tokio", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +dependencies = [ + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..9335fe9 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,3 @@ +[workspace] +members = ["server", "client"] +resolver = "2" diff --git a/README.md b/README.md index 8c4d4e1..fc222b0 100644 --- a/README.md +++ b/README.md @@ -1,71 +1,115 @@ # Simple Chat -## Summary - -You have been tasked with writing a simple asynchronous chat server and CLI -client. - -Since this is a simple chat server there is only a single room. Users may -freely join or leave this room. They may also send messages to the room, which -will be sent to all connected users minus the sender. - -Even though the server is simple, it has high throughput. Because of this, all -code should be non-blocking for maximum concurrency. - -The following is a rough specification of the server and client. - -## Server - -* The servers job is to manage users. -* It should be able to receive a message from a user and process it. -* The user may wish to join, leave or send a message through the chat server. -* Any other user who is currently connected should get the message sent to -them. -* The user who sent the message should not get the message. -* When a user sends a leave message, or disconnects their client, the server -should no longer send messages to them, and do any internal bookkeeping to -clean up. -* Username's should be unique. -* The server should be able to support many users without a large delay -* The server should be able to support many users with a small memory footprint - - -## Client - -* The client is an async CLI program. -* It is responsible for sending messages to the server and displaying any -messages from the server. -* The client should accept environment variables or command line arguments -indicating the host and port where the server is listening. It should also -accept a username that will be used as an identifier on the server. -* The client should automatically connect to the chat server upon -initialization using the specified host and port. -* The client should display an interactive command prompt. The prompt should -be able to handle the following inputs: - * `send ` where `` is the message that should be sent to the - server - * `leave` this will disconnect the client from the server and exit the CLI. - - -## Additional Requirements - -* Your source should contain both unit and integration tests where necessary. -* All code must be formatted using the standard formatting tool. -* Code must compile without clippy errors. - -## Submission - -Please fork this repository to your own GitHub account and submit a pull -request to your own repository. Your pull request should include a -video of a working demo at the top along with any other key information -that should be highlighted. A link to the pull request can be submitted when -it is ready for review. - -## Bonus - -* Include a pre-commit hook that will ensure that all code is formatted, compiles -without error, and is free of clippy errors. -* Create a GitHub Action that will launch your chat server and attempt to -send a message to the server from the client. Make sure that niether the server -or client exit with a failure. This action should be run anytime new code -is pushed to a branch or landed on the main branch. +A minimal async TCP chat server and CLI client written in Rust with Tokio. + +## Demo + +> https://github.com/user-attachments/assets/269ce9b6-45f4-4f91-a5ab-45433cd9feeb + +## Architecture + +``` +┌──────────┐ JSON/TCP ┌────────────────────────────────────────┐ +│ client │ ──────────► │ server │ +│ │ ◄────────── │ • broadcast channel (256 capacity) │ +└──────────┘ │ • one task per connection (non-block) │ + │ • HashMap for users │ + └────────────────────────────────────────┘ +``` + +### Protocol + +All messages are newline-delimited JSON. + +**Client → Server** + +| Message | Payload | Description | +|---------|---------|-------------| +| `Join` | `{ username }` | Must be first message; rejected if name taken | +| `Send` | `{ text }` | Broadcast text to all other users | +| `Leave` | — | Graceful disconnect | + +**Server → Client** + +| Message | Payload | Description | +|---------|---------|-------------| +| `Chat` | `{ from, text }` | A message from another user | +| `Info` | `{ text }` | Join/leave announcements | +| `Error` | `{ text }` | e.g. duplicate username | + +## Running + +### Prerequisites + +```bash +rustup toolchain install stable +``` + +### Install the pre-commit hook (optional but recommended) + +```bash +git config core.hooksPath .githooks +chmod +x .githooks/pre-commit +``` + +### Start the server + +```bash +# defaults: HOST=127.0.0.1 PORT=8080 +cargo run -p server + +# or with custom host/port +HOST=0.0.0.0 PORT=9000 cargo run -p server +``` + +### Start a client + +```bash +# Pass host, port, username as positional args … +cargo run -p client -- 127.0.0.1 8080 alice + +# … or via env vars +HOST=127.0.0.1 PORT=8080 USERNAME=alice cargo run -p client +``` + +### Client commands + +``` +> send Hello everyone! # broadcast a message +> leave # disconnect and exit +``` + +### Multi-user example (three terminals) + +``` +# Terminal 1 +cargo run -p server + +# Terminal 2 +cargo run -p client -- 127.0.0.1 8080 alice + +# Terminal 3 +cargo run -p client -- 127.0.0.1 8080 bob +``` + +## Testing + +```bash +cargo test --all +``` + +## CI / CD + +The GitHub Actions workflow (`.github/workflows/ci.yml`) runs on every push and PR: + +1. **Format check** – `cargo fmt --all -- --check` +2. **Clippy** – zero warnings allowed +3. **Unit + integration tests** – `cargo test --all` +4. **E2E smoke test** – starts the server, connects with the client, sends a message, then leaves + +## Design decisions + +- **`broadcast::channel`** – single channel for all users; each connection task subscribes. Old messages are dropped for lagging readers rather than blocking the sender. +- **`OwnedWriteHalf` behind `Arc>`** – lets the forward task and the main connection task share the TCP writer safely without an extra channel. +- **Non-blocking everywhere** – every I/O call is async; no `std::thread::sleep` or blocking reads. +- **Small memory footprint** – one `broadcast::Sender`, one `HashMap` entry per user, one task pair (read + forward) per connection. diff --git a/client/Cargo.toml b/client/Cargo.toml new file mode 100644 index 0000000..d19ff56 --- /dev/null +++ b/client/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "client" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "client" +path = "src/main.rs" + +[dependencies] +tokio = { version = "1", features = ["full"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +[dev-dependencies] +tokio = { version = "1", features = ["full"] } diff --git a/client/src/main.rs b/client/src/main.rs new file mode 100644 index 0000000..d1ad168 --- /dev/null +++ b/client/src/main.rs @@ -0,0 +1,217 @@ +use serde::{Deserialize, Serialize}; +use std::io::{self, Write}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tokio::sync::mpsc; + +/// Must mirror server's ClientMessage +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(tag = "type", content = "payload")] +pub enum ClientMessage { + Join { username: String }, + Send { text: String }, + Leave, +} + +/// Must mirror server's ServerMessage +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(tag = "type", content = "payload")] +pub enum ServerMessage { + Chat { from: String, text: String }, + Info { text: String }, + Error { text: String }, +} + +#[tokio::main] +async fn main() { + // --- Configuration via env vars or CLI args --- + // Priority: CLI args > env vars > defaults + // Usage: client [host] [port] [username] + let args: Vec = std::env::args().collect(); + + let host = args + .get(1) + .cloned() + .or_else(|| std::env::var("HOST").ok()) + .unwrap_or_else(|| "127.0.0.1".to_string()); + + let port = args + .get(2) + .cloned() + .or_else(|| std::env::var("PORT").ok()) + .unwrap_or_else(|| "8080".to_string()); + + let username = args + .get(3) + .cloned() + .or_else(|| std::env::var("USERNAME").ok()) + .expect("Username required: pass as 3rd argument or set USERNAME env var"); + + let addr = format!("{host}:{port}"); + println!("Connecting to {addr} as '{username}'..."); + + let stream = TcpStream::connect(&addr) + .await + .expect("Failed to connect to server"); + + println!("Connected! Commands: `send ` | `leave`"); + + let (reader, writer) = stream.into_split(); + let mut server_lines = BufReader::new(reader).lines(); + + // Channel to send outgoing messages from stdin task -> writer task + let (out_tx, mut out_rx) = mpsc::channel::(32); + + // --- Task 1: Write outgoing messages to server --- + let write_task = tokio::spawn(async move { + let mut writer = writer; + // Send join + let join = ClientMessage::Join { + username: username.clone(), + }; + write_msg(&mut writer, &join).await; + + while let Some(msg) = out_rx.recv().await { + write_msg(&mut writer, &msg).await; + if matches!(msg, ClientMessage::Leave) { + break; + } + } + }); + + // --- Task 2: Read messages from server and print them --- + let print_task = tokio::spawn(async move { + while let Ok(Some(line)) = server_lines.next_line().await { + match serde_json::from_str::(&line) { + Ok(ServerMessage::Chat { from, text }) => { + println!("\r[{from}]: {text}"); + } + Ok(ServerMessage::Info { text }) => { + println!("\r[INFO]: {text}"); + } + Ok(ServerMessage::Error { text }) => { + eprintln!("\r[ERROR]: {text}"); + } + Err(e) => eprintln!("Parse error: {e}"), + } + // Re-print the prompt after incoming message + print!("> "); + let _ = io::stdout().flush(); + } + }); + + // --- Task 3 (main): Read stdin and dispatch commands --- + let stdin = tokio::io::stdin(); + let mut stdin_lines = BufReader::new(stdin).lines(); + + loop { + print!("> "); + let _ = io::stdout().flush(); + + match stdin_lines.next_line().await { + Ok(Some(line)) => { + let line = line.trim().to_string(); + if line.is_empty() { + continue; + } + if line == "leave" { + let _ = out_tx.send(ClientMessage::Leave).await; + break; + } else if let Some(msg) = line.strip_prefix("send ") { + let _ = out_tx + .send(ClientMessage::Send { + text: msg.to_string(), + }) + .await; + } else { + eprintln!("Unknown command. Use: `send ` or `leave`"); + } + } + _ => break, // EOF / error + } + } + + // Graceful shutdown + write_task.await.ok(); + print_task.abort(); + println!("Disconnected. Goodbye!"); +} + +async fn write_msg(writer: &mut tokio::net::tcp::OwnedWriteHalf, msg: &ClientMessage) { + let mut s = serde_json::to_string(msg).unwrap(); + s.push('\n'); + if let Err(e) = writer.write_all(s.as_bytes()).await { + eprintln!("Write error: {e}"); + } +} + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn serialize_join() { + let msg = ClientMessage::Join { + username: "alice".into(), + }; + let s = serde_json::to_string(&msg).unwrap(); + assert!(s.contains("Join")); + assert!(s.contains("alice")); + } + + #[test] + fn serialize_send() { + let msg = ClientMessage::Send { + text: "hello world".into(), + }; + let s = serde_json::to_string(&msg).unwrap(); + assert!(s.contains("Send")); + assert!(s.contains("hello world")); + } + + #[test] + fn serialize_leave() { + let msg = ClientMessage::Leave; + let s = serde_json::to_string(&msg).unwrap(); + assert!(s.contains("Leave")); + } + + #[test] + fn deserialize_chat() { + let s = r#"{"type":"Chat","payload":{"from":"bob","text":"hi"}}"#; + let msg: ServerMessage = serde_json::from_str(s).unwrap(); + assert!( + matches!(msg, ServerMessage::Chat { ref from, ref text } if from == "bob" && text == "hi") + ); + } + + #[test] + fn deserialize_info() { + let s = r#"{"type":"Info","payload":{"text":"alice joined"}}"#; + let msg: ServerMessage = serde_json::from_str(s).unwrap(); + assert!(matches!(msg, ServerMessage::Info { ref text } if text == "alice joined")); + } + + #[test] + fn deserialize_error() { + let s = r#"{"type":"Error","payload":{"text":"Username already taken"}}"#; + let msg: ServerMessage = serde_json::from_str(s).unwrap(); + assert!(matches!(msg, ServerMessage::Error { ref text } if text.contains("taken"))); + } + + #[test] + fn test_command_parse_send() { + let line = "send hello world"; + assert!(line.starts_with("send ")); + let text = line.strip_prefix("send ").unwrap(); + assert_eq!(text, "hello world"); + } + + #[test] + fn test_command_parse_leave() { + let line = "leave"; + assert_eq!(line, "leave"); + } +} diff --git a/demo.mp4 b/demo.mp4 new file mode 100644 index 0000000..3f3d3c6 Binary files /dev/null and b/demo.mp4 differ diff --git a/server/Cargo.toml b/server/Cargo.toml new file mode 100644 index 0000000..670be7c --- /dev/null +++ b/server/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "server" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "server" +path = "src/main.rs" + +[dependencies] +tokio = { version = "1", features = ["full"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +[dev-dependencies] +tokio = { version = "1", features = ["full"] } diff --git a/server/src/main.rs b/server/src/main.rs new file mode 100644 index 0000000..44b6cf1 --- /dev/null +++ b/server/src/main.rs @@ -0,0 +1,384 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::{broadcast, Mutex}; + +/// Messages sent from client to server +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(tag = "type", content = "payload")] +pub enum ClientMessage { + Join { username: String }, + Send { text: String }, + Leave, +} + +/// Messages sent from server -> client +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(tag = "type", content = "payload")] +pub enum ServerMessage { + /// A chat message broadcast to other users + Chat { from: String, text: String }, + /// Acknowledgement / status messages + Info { text: String }, + /// Error (e.g. username taken) + Error { text: String }, +} + +/// Shared state: maps username -> sender so we can detect duplicates +type Users = Arc>>; + +#[tokio::main] +async fn main() { + let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); + let port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string()); + let addr = format!("{host}:{port}"); + + let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); + println!("Server listening on {addr}"); + + // broadcast channel – capacity 256, old messages dropped for slow readers + let (tx, _rx) = broadcast::channel::(256); + let users: Users = Arc::new(Mutex::new(HashMap::new())); + + loop { + match listener.accept().await { + Ok((stream, addr)) => { + let tx = tx.clone(); + let users = Arc::clone(&users); + tokio::spawn(handle_connection(stream, addr, tx, users)); + } + Err(e) => eprintln!("Accept error: {e}"), + } + } +} + +async fn handle_connection( + stream: TcpStream, + addr: SocketAddr, + tx: broadcast::Sender, + users: Users, +) { + let (reader, writer) = stream.into_split(); + let mut lines = BufReader::new(reader).lines(); + let writer = Arc::new(Mutex::new(writer)); + + // Subscribe to broadcast BEFORE we read the join message so we don't miss anything + let mut rx = tx.subscribe(); + + // First message must be Join + let username = loop { + let line = match lines.next_line().await { + Ok(Some(l)) => l, + _ => return, // client disconnected before joining + }; + match serde_json::from_str::(&line) { + Ok(ClientMessage::Join { username }) => { + let mut locked = users.lock().await; + if locked.contains_key(&username) { + let msg = ServerMessage::Error { + text: "Username already taken".to_string(), + }; + let _ = send_msg(&writer, &msg).await; + return; + } + locked.insert(username.clone(), addr); + drop(locked); + + let info = ServerMessage::Info { + text: format!("{username} joined the chat"), + }; + let _ = tx.send(info); + break username; + } + _ => { + let _ = send_msg( + &writer, + &ServerMessage::Error { + text: "Send Join message first".to_string(), + }, + ) + .await; + } + } + }; + + // Task: forward broadcast messages to this client + let writer_clone = Arc::clone(&writer); + let username_clone = username.clone(); + let forward_task = tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(msg) => { + // Don't echo back messages sent by this user + if let ServerMessage::Chat { ref from, .. } = msg { + if from == &username_clone { + continue; + } + } + if send_msg(&writer_clone, &msg).await.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + eprintln!("Client {username_clone} lagged by {n} messages"); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } + }); + + // Main loop: read messages from this client + loop { + let line = match lines.next_line().await { + Ok(Some(l)) => l, + _ => break, // disconnected + }; + + match serde_json::from_str::(&line) { + Ok(ClientMessage::Send { text }) => { + let msg = ServerMessage::Chat { + from: username.clone(), + text, + }; + let _ = tx.send(msg); + } + Ok(ClientMessage::Leave) | Ok(ClientMessage::Join { .. }) => break, + Err(e) => eprintln!("Parse error from {username}: {e}"), + } + } + + // Cleanup + forward_task.abort(); + users.lock().await.remove(&username); + let _ = tx.send(ServerMessage::Info { + text: format!("{username} left the chat"), + }); + println!("{username} disconnected"); +} + +/// Serialize and write a server message to the writer +async fn send_msg( + writer: &Arc>, + msg: &ServerMessage, +) -> std::io::Result<()> { + let mut json = serde_json::to_string(msg).unwrap(); + json.push('\n'); + writer.lock().await.write_all(json.as_bytes()).await +} + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; + use tokio::net::TcpListener; + + /// Spin up a real server on a random port and return the address. + async fn start_test_server() -> SocketAddr { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let (tx, _) = broadcast::channel::(256); + let users: Users = Arc::new(Mutex::new(HashMap::new())); + tokio::spawn(async move { + loop { + let (stream, peer) = listener.accept().await.unwrap(); + let tx = tx.clone(); + let users = Arc::clone(&users); + tokio::spawn(handle_connection(stream, peer, tx, users)); + } + }); + addr + } + + async fn connect( + addr: SocketAddr, + ) -> ( + tokio::net::tcp::OwnedWriteHalf, + tokio::io::Lines>, + ) { + let stream = TcpStream::connect(addr).await.unwrap(); + let (r, w) = stream.into_split(); + (w, BufReader::new(r).lines()) + } + + async fn write_msg(w: &mut tokio::net::tcp::OwnedWriteHalf, msg: &ClientMessage) { + let mut s = serde_json::to_string(msg).unwrap(); + s.push('\n'); + w.write_all(s.as_bytes()).await.unwrap(); + } + + async fn read_msg( + lines: &mut tokio::io::Lines>, + ) -> ServerMessage { + let line = lines.next_line().await.unwrap().unwrap(); + serde_json::from_str(&line).unwrap() + } + + // ── Unit tests ────────────────────────────────────────────────────────── + + #[test] + fn client_message_serialization() { + let msg = ClientMessage::Join { + username: "alice".to_string(), + }; + let json = serde_json::to_string(&msg).unwrap(); + let decoded: ClientMessage = serde_json::from_str(&json).unwrap(); + assert!(matches!(decoded, ClientMessage::Join { ref username } if username == "alice")); + } + + #[test] + fn server_message_serialization() { + let msg = ServerMessage::Chat { + from: "bob".to_string(), + text: "hello".to_string(), + }; + let json = serde_json::to_string(&msg).unwrap(); + let decoded: ServerMessage = serde_json::from_str(&json).unwrap(); + assert!( + matches!(decoded, ServerMessage::Chat { ref from, ref text } if from == "bob" && text == "hello") + ); + } + + // ── Integration tests ─────────────────────────────────────────────────── + + #[tokio::test] + async fn test_join_and_receive_info() { + let addr = start_test_server().await; + let (mut w, mut lines) = connect(addr).await; + + write_msg( + &mut w, + &ClientMessage::Join { + username: "alice".to_string(), + }, + ) + .await; + + // Server broadcasts "alice joined the chat" – alice herself also gets it via broadcast + // (she's a subscriber). The Info message is not filtered, only Chat is filtered. + let msg = read_msg(&mut lines).await; + assert!(matches!(msg, ServerMessage::Info { ref text } if text.contains("alice"))); + } + + #[tokio::test] + async fn test_duplicate_username_rejected() { + let addr = start_test_server().await; + + let (mut w1, _lines1) = connect(addr).await; + write_msg( + &mut w1, + &ClientMessage::Join { + username: "bob".to_string(), + }, + ) + .await; + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + let (mut w2, mut lines2) = connect(addr).await; + write_msg( + &mut w2, + &ClientMessage::Join { + username: "bob".to_string(), + }, + ) + .await; + + let msg = read_msg(&mut lines2).await; + assert!(matches!(msg, ServerMessage::Error { ref text } if text.contains("taken"))); + } + + #[tokio::test] + async fn test_message_broadcast_to_others() { + let addr = start_test_server().await; + + // alice joins + let (mut wa, mut la) = connect(addr).await; + write_msg( + &mut wa, + &ClientMessage::Join { + username: "alice".to_string(), + }, + ) + .await; + // drain alice's join info + read_msg(&mut la).await; + + // bob joins – alice gets bob's join info + let (mut wb, mut lb) = connect(addr).await; + write_msg( + &mut wb, + &ClientMessage::Join { + username: "bob".to_string(), + }, + ) + .await; + // drain bob's join info (alice sees it, bob sees it) + read_msg(&mut la).await; // alice sees "bob joined" + read_msg(&mut lb).await; // bob sees "bob joined" + + // bob sends a message + write_msg( + &mut wb, + &ClientMessage::Send { + text: "hi alice!".to_string(), + }, + ) + .await; + + // alice should receive it + let msg = read_msg(&mut la).await; + assert!( + matches!(msg, ServerMessage::Chat { ref from, ref text } if from == "bob" && text == "hi alice!") + ); + + // Give bob a moment – he should NOT receive his own message + // We verify by sending another message from alice and checking bob gets that + write_msg( + &mut wa, + &ClientMessage::Send { + text: "hey bob".to_string(), + }, + ) + .await; + let msg = read_msg(&mut lb).await; + assert!(matches!(msg, ServerMessage::Chat { ref from, .. } if from == "alice")); + } + + #[tokio::test] + async fn test_leave_removes_user() { + let addr = start_test_server().await; + + let (mut wa, mut la) = connect(addr).await; + write_msg( + &mut wa, + &ClientMessage::Join { + username: "charlie".to_string(), + }, + ) + .await; + read_msg(&mut la).await; // join info + + let (mut wb, mut lb) = connect(addr).await; + write_msg( + &mut wb, + &ClientMessage::Join { + username: "dave".to_string(), + }, + ) + .await; + read_msg(&mut la).await; // charlie sees dave join + read_msg(&mut lb).await; // dave sees own join + + // dave leaves + write_msg(&mut wb, &ClientMessage::Leave).await; + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // charlie should see "dave left" + let msg = read_msg(&mut la).await; + assert!(matches!(msg, ServerMessage::Info { ref text } if text.contains("dave"))); + } +}