Concurrency in Rust: Fearless Parallelism
use std::thread;
use std::sync::{Arc, Mutex};
/// Spawns three threads that each print a shared read-only vector and bump a
/// shared counter, then waits for all of them and prints the final count.
fn main() {
    // Read-only payload: `Arc` alone is enough because nobody mutates it.
    let shared_values = Arc::new(vec![1, 2, 3]);
    // Mutable tally: the `Mutex` serialises the increments.
    let tally = Arc::new(Mutex::new(0));

    // `collect` drives the iterator, so every thread is started before any join.
    let workers: Vec<_> = (0..3)
        .map(|_| {
            let values = Arc::clone(&shared_values);
            let tally = Arc::clone(&tally);
            thread::spawn(move || {
                println!("Data: {:?}", values);
                *tally.lock().unwrap() += 1;
            })
        })
        .collect();

    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());

    let total = tally.lock().unwrap();
    println!("Final count: {}", *total);
}
Why Rust's Concurrency is Special
Rust enforces thread safety at compile time through:
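- Ownership system - Prevents data races: a value cannot be mutated while it is shared
- Type system - The `Send` and `Sync` marker traits control which values may cross thread boundaries
- Zero-cost abstractions - These checks happen at compile time and add no runtime overhead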
Threading Models
1. Native OS Threads
use std::thread;
// Start a new thread that runs the closure; `spawn` hands back a join handle.
let handle = thread::spawn(|| {
println!("Hello from a thread!");
});
// Wait for the thread to finish; `unwrap` panics here if the thread panicked.
handle.join().unwrap();
2. Fork-Join Parallelism
use rayon::prelude::*;
// Square every integer in 0..1000 on a parallel iterator and add up the squares.
let sum = (0..1000).into_par_iter()
.map(|x| x * x)
.sum::<i32>();
3. Actor Model (using actix)
use actix::prelude::*;
struct MyActor;
impl Actor for MyActor {
type Context = Context<Self>;
}
struct Message;
impl Message for Message {
type Result = String;
}
impl Handler<Message> for MyActor {
fn handle(&mut self, _: Message, _: &mut Context<Self>) -> String {
"Hello from actor!".to_string()
}
}
Shared-State Concurrency
1. Mutex (Mutual Exclusion)
use std::sync::Mutex;
// Wrap the value 5 in a mutex.
let m = Mutex::new(5);
{
// `lock` waits for the mutex and returns a guard that dereferences to the data.
let mut num = m.lock().unwrap();
*num = 6;
// The guard goes out of scope at the closing brace, which unlocks the mutex.
}
2. RwLock (Multiple Readers)
use std::sync::RwLock;
// Wrap the value 5 in a reader-writer lock.
let lock = RwLock::new(5);
{
// Two read guards are held at the same time within this scope.
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap(); // Multiple readers OK
}
{
// Both read guards were dropped with the previous scope, so a write guard
// can now be taken.
let mut w = lock.write().unwrap(); // Single writer
*w += 1;
}
3. Atomic Types
use std::sync::atomic::{AtomicUsize, Ordering};
// Atomic counter starting at zero; no `mut` binding is needed to update it.
let counter = AtomicUsize::new(0);
// Atomically add 1 using sequentially consistent ordering; the returned
// previous value is ignored here.
counter.fetch_add(1, Ordering::SeqCst);
Message Passing
1. Channels (MPSC)
use std::sync::mpsc;
// `tx` is the sending half of the channel, `rx` the receiving half.
let (tx, rx) = mpsc::channel();
// `move` transfers ownership of the sender into the spawned thread.
thread::spawn(move || {
tx.send("Hello from thread").unwrap();
});
// `recv` blocks until a message arrives, then the message is printed.
println!("{}", rx.recv().unwrap());
2. Multiple Producers
let (tx, rx) = mpsc::channel();
// Cloning the sender gives a second handle feeding the same receiver.
let tx1 = tx.clone();
// Each thread takes ownership of one sender and drops it when it finishes.
thread::spawn(move || {
tx.send(1).unwrap();
});
thread::spawn(move || {
tx1.send(2).unwrap();
});
// Iterating over `rx` yields messages until every sender has been dropped.
// The two threads run independently, so either value may arrive first.
for received in rx {
println!("Got: {}", received);
}
Async/Await
Basic Example
use tokio::time::{sleep, Duration};
/// Entry point: runs a one-second delayed greeting as a separate tokio task
/// and waits for that task to finish.
#[tokio::main]
async fn main() {
    let delayed_greeting = async {
        sleep(Duration::from_secs(1)).await;
        println!("Hello from async task");
    };

    // Awaiting the join handle surfaces a panic from the task via `unwrap`.
    tokio::spawn(delayed_greeting).await.unwrap();
}
Select Operation
use tokio::sync::oneshot;
/// Starts two tasks that each send one value over their own oneshot channel,
/// then prints whichever receiver completes first.
async fn race_tasks() {
    let (first_tx, first_rx) = oneshot::channel();
    let (second_tx, second_rx) = oneshot::channel();

    // A send error only means the receiver is already gone, so it is ignored.
    tokio::spawn(async move {
        let _ = first_tx.send("one");
    });
    tokio::spawn(async move {
        let _ = second_tx.send("two");
    });

    // Only the branch that completes first runs; the other receiver is dropped.
    tokio::select! {
        outcome = first_rx => println!("{:?}", outcome),
        outcome = second_rx => println!("{:?}", outcome),
    }
}
Common Patterns
1. Worker Pool
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// `mpsc` is multi-producer, *single*-consumer: `Receiver` does not implement
// `Clone`. To let several workers pull from one queue, share the receiver
// behind `Arc<Mutex<_>>` instead of cloning it.
let (tx, rx) = mpsc::channel();
let rx = Arc::new(Mutex::new(rx));

let mut workers = Vec::with_capacity(4);
for id in 0..4 {
    let rx = Arc::clone(&rx);
    workers.push(thread::spawn(move || loop {
        // The guard is a temporary dropped at the end of this statement, so the
        // lock is held only while waiting for a job, not while handling it.
        let next = rx.lock().unwrap().recv();
        match next {
            Ok(job) => println!("Worker {} got job {}", id, job),
            // `recv` fails once every sender is gone and the queue is drained.
            Err(_) => break,
        }
    }));
}

for job in 0..10 {
    tx.send(job).unwrap();
}
drop(tx); // Close the channel so idle workers stop waiting and exit.

for worker in workers {
    worker.join().unwrap();
}
2. Parallel Map
use rayon::prelude::*;
// Square each integer in 0..1000 on a parallel iterator and gather the
// results into a vector.
let results: Vec<_> = (0..1000)
.into_par_iter()
.map(|x| x * x)
.collect();
Performance Considerations
1. False Sharing
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
/// Counter forced onto a 64-byte boundary so that neighbouring counters in a
/// `Vec` do not sit next to each other within the same 64 bytes.
// NOTE(review): 64 bytes is a common cache-line size; confirm for the target CPU.
#[repr(align(64))] // Cache line alignment
struct AlignedCounter(AtomicU64);

// `vec![value; n]` requires `Clone`, which atomics do not implement, so each
// counter is constructed individually.
let counters: Vec<AlignedCounter> = (0..4)
    .map(|_| AlignedCounter(AtomicU64::new(0)))
    .collect();

// Scoped threads may borrow `counters` directly; every thread spawned in the
// scope is joined before `scope` returns.
thread::scope(|s| {
    for counter in &counters {
        s.spawn(move || {
            for _ in 0..1_000_000 {
                counter.0.fetch_add(1, Ordering::Relaxed);
            }
        });
    }
});
2. Lock Granularity
// Prefer: take the locks inside a dedicated scope that contains only the
// critical section.
{
let lock1 = mutex1.lock().unwrap();
let lock2 = mutex2.lock().unwrap();
// Critical section
} // Both guards are dropped here, so the locks are released
// Over: guards bound in the surrounding scope stay alive for everything that
// follows.
let lock1 = mutex1.lock().unwrap();
let lock2 = mutex2.lock().unwrap();
// Long operations while holding locks
Testing Concurrent Code
1. Loom (for testing concurrency)
/// Model-checks two threads incrementing one mutex-protected counter and
/// verifies that no increment is lost.
#[test]
fn test_concurrent_access() {
    // loom can only explore interleavings of operations performed through its
    // own primitives, so use them here instead of the `std::sync` ones.
    use loom::sync::{Arc, Mutex};

    // The closure is re-run by loom, so all state is created inside it.
    loom::model(|| {
        let shared = Arc::new(Mutex::new(0));
        let arc = Arc::clone(&shared);
        let arc2 = Arc::clone(&shared);

        let t1 = loom::thread::spawn(move || {
            let mut lock = arc.lock().unwrap();
            *lock += 1;
        });
        let t2 = loom::thread::spawn(move || {
            let mut lock = arc2.lock().unwrap();
            *lock += 1;
        });

        t1.join().unwrap();
        t2.join().unwrap();

        // Both increments must be visible once both threads have been joined.
        assert_eq!(*shared.lock().unwrap(), 2);
    });
}
Further Reading
- The Rustonomicon: Concurrency
- Tokio Documentation
- Rayon Documentation
- Fearless Concurrency Blog Post
- Crossbeam Crate
Last updated on April 10, 2025