Languages
Rust
Fearless Parallelism

Concurrency in Rust: Fearless Parallelism

use std::thread;
use std::sync::{Arc, Mutex};
 
fn main() {
    // Shared immutable data
    let data = Arc::new(vec![1, 2, 3]);
 
    // Shared mutable data
    let counter = Arc::new(Mutex::new(0));
 
    let mut handles = vec![];
 
    for _ in 0..3 {
        let data = Arc::clone(&data);
        let counter = Arc::clone(&counter);
 
        handles.push(thread::spawn(move || {
            // Read access to immutable data
            println!("Data: {:?}", data);
 
            // Write access to mutable data
            let mut num = counter.lock().unwrap();
            *num += 1;
        }));
    }
 
    for handle in handles {
        handle.join().unwrap();
    }
 
    println!("Final count: {}", *counter.lock().unwrap());
}

Why Rust's Concurrency is Special

Rust enforces thread safety at compile time through:

  • Ownership system - Prevents data races
  • Type system - Guarantees proper synchronization
  • Zero-cost abstractions - No runtime overhead

Threading Models

1. Native OS Threads

use std::thread;
 
let handle = thread::spawn(|| {
    println!("Hello from a thread!");
});
 
handle.join().unwrap();

2. Fork-Join Parallelism

use rayon::prelude::*;
 
let sum = (0..1000).into_par_iter()
                   .map(|x| x * x)
                   .sum::<i32>();

3. Actor Model (using actix)

use actix::prelude::*;
 
struct MyActor;
impl Actor for MyActor {
    type Context = Context<Self>;
}
 
struct Message;
impl Message for Message {
    type Result = String;
}
 
impl Handler<Message> for MyActor {
    fn handle(&mut self, _: Message, _: &mut Context<Self>) -> String {
        "Hello from actor!".to_string()
    }
}

Shared-State Concurrency

1. Mutex (Mutual Exclusion)

use std::sync::Mutex;
 
let m = Mutex::new(5);
{
    let mut num = m.lock().unwrap();
    *num = 6;
}

2. RwLock (Multiple Readers)

use std::sync::RwLock;
 
let lock = RwLock::new(5);
{
    let r1 = lock.read().unwrap();
    let r2 = lock.read().unwrap(); // Multiple readers OK
}
{
    let mut w = lock.write().unwrap(); // Single writer
    *w += 1;
}

3. Atomic Types

use std::sync::atomic::{AtomicUsize, Ordering};
 
let counter = AtomicUsize::new(0);
counter.fetch_add(1, Ordering::SeqCst);

Message Passing

1. Channels (MPSC)

use std::sync::mpsc;
 
let (tx, rx) = mpsc::channel();
 
thread::spawn(move || {
    tx.send("Hello from thread").unwrap();
});
 
println!("{}", rx.recv().unwrap());

2. Multiple Producers

let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
 
thread::spawn(move || {
    tx.send(1).unwrap();
});
 
thread::spawn(move || {
    tx1.send(2).unwrap();
});
 
for received in rx {
    println!("Got: {}", received);
}

Async/Await

Basic Example

use tokio::time::{sleep, Duration};
 
#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        sleep(Duration::from_secs(1)).await;
        println!("Hello from async task");
    });
 
    handle.await.unwrap();
}

Select Operation

use tokio::sync::oneshot;
 
async fn race_tasks() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();
 
    tokio::spawn(async {
        let _ = tx1.send("one");
    });
 
    tokio::spawn(async {
        let _ = tx2.send("two");
    });
 
    tokio::select! {
        val = rx1 => println!("{:?}", val),
        val = rx2 => println!("{:?}", val),
    }
}

Common Patterns

1. Worker Pool

use std::sync::mpsc;
use std::thread;
 
let (tx, rx) = mpsc::channel();
let mut workers = Vec::with_capacity(4);
 
for id in 0..4 {
    let rx = rx.clone();
    workers.push(thread::spawn(move || {
        while let Ok(job) = rx.recv() {
            println!("Worker {} got job {}", id, job);
        }
    }));
}
 
for job in 0..10 {
    tx.send(job).unwrap();
}
 
drop(tx); // Close channel
 
for worker in workers {
    worker.join().unwrap();
}

2. Parallel Map

use rayon::prelude::*;
 
let results: Vec<_> = (0..1000)
    .into_par_iter()
    .map(|x| x * x)
    .collect();

Performance Considerations

1. False Sharing

use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
 
#[repr(align(64))] // Cache line alignment
struct AlignedCounter(AtomicU64);
 
let counters = vec![AlignedCounter(AtomicU64::new(0)); 4];
 
thread::scope(|s| {
    for (i, counter) in counters.iter().enumerate() {
        s.spawn(move || {
            for _ in 0..1_000_000 {
                counter.0.fetch_add(1, Ordering::Relaxed);
            }
        });
    }
});

2. Lock Granularity

// Prefer:
{
    let lock1 = mutex1.lock().unwrap();
    let lock2 = mutex2.lock().unwrap();
    // Critical section
} // Locks released
 
// Over:
let lock1 = mutex1.lock().unwrap();
let lock2 = mutex2.lock().unwrap();
// Long operations while holding locks

Testing Concurrent Code

1. Loom (for testing concurrency)

#[test]
fn test_concurrent_access() {
    loom::model(|| {
        let arc = Arc::new(Mutex::new(0));
        let arc2 = arc.clone();
 
        let t1 = loom::thread::spawn(move || {
            let mut lock = arc.lock().unwrap();
            *lock += 1;
        });
 
        let t2 = loom::thread::spawn(move || {
            let mut lock = arc2.lock().unwrap();
            *lock += 1;
        });
 
        t1.join().unwrap();
        t2.join().unwrap();
    });
}

Further Reading

Last updated on April 10, 2025