Message Passing

Message passing lets threads communicate by sending each other data instead of directly accessing shared memory. It is a safe and popular way to structure concurrent programs.

Rust's standard library implements this pattern with channels, in the std::sync::mpsc module.


What is a Channel?

A channel has two halves:

  • Transmitter (tx) – where you send data (like throwing rubber ducks into a river).
  • Receiver (rx) – where you receive the data (like collecting ducks downstream).

A channel is closed when the receiver is dropped, or when every transmitter (the original and all of its clones) has been dropped.
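A minimal sketch of how a closed channel shows up in code. The helper name closed_channel_errors is hypothetical and exists only for this illustration; the calls it uses (channel, clone, send, recv, drop) are the standard std::sync::mpsc ones. It shows that messages already sent are still delivered after the last transmitter is dropped, and that recv reports an error only after that.

```rust
use std::sync::mpsc;

// Hypothetical helper for illustration. Returns
// (recv failed after all senders were dropped, send failed after the receiver was dropped).
fn closed_channel_errors() -> (bool, bool) {
    // Case 1: drop every transmitter, then try to receive.
    let (tx, rx) = mpsc::channel::<i32>();
    let tx2 = tx.clone();
    drop(tx);
    // One transmitter (tx2) is still alive, so sending still works.
    tx2.send(1).unwrap();
    drop(tx2);
    // The message that was already sent is still delivered...
    let first = rx.recv();
    // ...and only the next recv reports that the channel is closed.
    let recv_failed = first.is_ok() && rx.recv().is_err();

    // Case 2: drop the receiver, then try to send.
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx);
    let send_failed = tx.send(1).is_err();

    (recv_failed, send_failed)
}

fn main() {
    let (recv_failed, send_failed) = closed_channel_errors();
    println!("recv failed: {recv_failed}, send failed: {send_failed}");
}
```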

Use case example: One thread reads data from Redis, and another processes it concurrently.
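The use case above could look roughly like the sketch below. It does not talk to Redis: fetch_records is a hypothetical stand-in that returns canned strings, and process_all is likewise an illustrative name. The "processing" step is assumed to be nothing more than measuring each record's length.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for reading from Redis: returns canned records.
fn fetch_records() -> Vec<String> {
    vec!["a".to_string(), "bb".to_string(), "ccc".to_string()]
}

// A reader thread sends records; the calling thread processes them as they arrive.
fn process_all() -> Vec<usize> {
    let (tx, rx) = mpsc::channel();

    let reader = thread::spawn(move || {
        for record in fetch_records() {
            // Stop early if the receiver has gone away.
            if tx.send(record).is_err() {
                break;
            }
        }
        // tx is dropped when this closure returns, which closes the channel.
    });

    // rx.iter() yields records until the channel is closed and empty.
    let lengths: Vec<usize> = rx.iter().map(|record| record.len()).collect();
    reader.join().expect("reader thread panicked");
    lengths
}

fn main() {
    println!("{:?}", process_all());
}
```

Because there is a single transmitter, records arrive in the order they were sent.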


Basic Example

use std::sync::mpsc; // mpsc = multiple producers, single consumer
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap(); // sends the message to main thread
    });

    let received = rx.recv().unwrap(); // blocks until a message is received
    println!("Got: {received}");
}

Avoid Using .unwrap()

Calling .unwrap() panics the current thread if the operation returns an error, for example when the other half of the channel has already been dropped. Use pattern matching to handle errors gracefully instead:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

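    thread::spawn(move || {
        // send fails only if the receiver has already been dropped
        if let Err(err) = tx.send(String::from("Hello World")) {
            eprintln!("Error while sending data: {err}");
        }
    });

    match rx.recv() {
        Ok(value) => println!("{value}"),
        // recv fails once every sender is dropped and the channel is empty
        Err(err) => eprintln!("Error while receiving data: {err}"),
    }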
}

Q: Sum the Integers from 0 to 10⁸ − 1 Using Threads

❌ This version almost works, but the program never terminates:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    for i in 0..10 {
        let producer = tx.clone();
        thread::spawn(move || {
            let mut ans: u64 = 0;
            for j in 0..10_000_000 {
                ans += i * 10_000_000 + j;
            }
            producer.send(ans).unwrap();
        });
    }

    let mut ans: u64 = 0;
    for val in rx {
        ans += val;
        println!("found value");
    }

    println!("Ans is: {}", ans);
}

❓ Why doesn't it fully work?

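The for val in rx loop ends only when every sender has been dropped and no messages are left. Each spawned thread drops its clone when it finishes, but the original tx is still in scope in main, so the channel never closes. After the tenth value the loop blocks forever and the final println! is never reached.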
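The difference can be observed without hanging the program by using recv_timeout, which reports Timeout while the channel is still open and Disconnected once it is closed. This is a small sketch, not part of the exercise; the helper name state_after_workers and its drop_original flag are hypothetical.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Hypothetical helper: run one worker to completion, drain its message,
// then report what the receiver sees on the next receive.
fn state_after_workers(drop_original: bool) -> RecvTimeoutError {
    let (tx, rx) = mpsc::channel::<u64>();

    let producer = tx.clone();
    let worker = thread::spawn(move || {
        producer.send(42).unwrap();
        // producer (the clone) is dropped when the thread ends
    });
    worker.join().unwrap();

    if drop_original {
        drop(tx);
    }

    // Drain the worker's message.
    assert_eq!(rx.recv().unwrap(), 42);

    // Timeout: a sender is still alive. Disconnected: all senders are gone.
    let state = rx.recv_timeout(Duration::from_millis(50)).unwrap_err();
    state
    // If tx was not dropped above, it is dropped only here, at the end of the function.
}

fn main() {
    println!("original tx kept:    {:?}", state_after_workers(false));
    println!("original tx dropped: {:?}", state_after_workers(true));
}
```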


✅ Fix: Explicitly Drop the Main tx

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    for i in 0..10 {
        let producer = tx.clone();
        thread::spawn(move || {
            let mut ans: u64 = 0;
            for j in 0..10_000_000 {
                ans += i * 10_000_000 + j;
            }
            producer.send(ans).unwrap();
        });
    }

    drop(tx); // Drop the original transmitter; the channel closes once the workers drop their clones

    let mut ans: u64 = 0;
    for val in rx {
        ans += val;
        println!("found value");
    }

    println!("Ans is: {}", ans);
}

💡 With the original tx dropped, the channel closes as soon as the last worker finishes and drops its clone, so the loop exits and the total is printed.
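One way to sanity-check the result is to compare it with the closed form for 0 + 1 + ... + (n − 1), which is n(n − 1)/2. The sketch below wraps the same approach in a function so it can be checked at different sizes; parallel_sum and closed_form are hypothetical names introduced here, and closed_form assumes n ≥ 1.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: sums 0..workers*chunk, one contiguous chunk per thread.
fn parallel_sum(workers: u64, chunk: u64) -> u64 {
    let (tx, rx) = mpsc::channel();

    for i in 0..workers {
        let producer = tx.clone();
        thread::spawn(move || {
            let start = i * chunk;
            let partial: u64 = (start..start + chunk).sum();
            // A send error only means the receiver is gone, so it is ignored here.
            let _ = producer.send(partial);
        });
    }

    // Drop the original transmitter so the iterator below can finish.
    drop(tx);

    rx.iter().sum()
}

// Closed form for 0 + 1 + ... + (n - 1); assumes n >= 1.
fn closed_form(n: u64) -> u64 {
    n * (n - 1) / 2
}

fn main() {
    let n = 100_000_000;
    let total = parallel_sum(10, n / 10);
    assert_eq!(total, closed_form(n));
    println!("Sum is: {total}");
}
```

For n = 10⁸ the closed form gives 4,999,999,950,000,000, which fits comfortably in a u64.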