与 golang 一样,Rust 也实现了 channel 用于线程间的通信。如同 golang 的口号一样:
Do not communicate by sharing memory; instead, share memory by communicating
channel 代表了 communicate 的思想。而传统的 mutex 代表了 share memory 的思想。
其根本区别在于,communicate 是 single ownership 的,当发送一个值到 channel 后,我们不再关心这个值,相当于将所有权移交给了接收线程。而 share memory 则是 multiple ownership 的,多个线程都会访问共享的变量,因此维护起来比 communicate 方式复杂得多。
Rust 中的 channel 本质上是多个生产者、一个消费者 (multiple producer single consumer) 的队列实现的,mpsc
提供了两类 channel,分别是:
-
mpsc::channel
,一个异步的,具有“无限大”缓冲区的队列,因此发送方永远不会阻塞。 -
mpsc::sync_channel
,一个同步的,具有有限缓冲区的队列。发送方在缓冲区满之前不会阻塞,但是缓冲区满了以后就阻塞了。注意,缓冲区大小可以是“0”,类似于 golang 中的chan
,使得发送和接收成为一个原子操作。
channel 具有发送者和接受者,当任意一方被 drop,channel 就关闭了。
一个简单的例子如下:
// multiple producer single consumer
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let greating = String::from("hi");
tx.send(greating).unwrap();
});
let received = rx.recv().unwrap();
println!("received {}", received);
}
我们也可以发送多条消息:
// multiple producer single consumer
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("received {}", received);
}
}
可以看到,每隔 1s 打印出一个单词。注意,接收时我们不再调用 recv()
方法,而是将 rx
作为一个迭代器。当 channel 关闭时,迭代终止。
多个生产者
channel 支持多个生产者模式,而且,tx
是一个可以被 clone
的对象。我们可以如下实现:
// multiple producer single consumer
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("received {}", received);
}
}
注意两个任务使用的发送者不一样:
let tx1 = tx.clone();
网友评论