美文网首页
Rust for cpp devs - channel

Rust for cpp devs - channel

作者: 找不到工作 | 来源:发表于2021-05-22 15:02 被阅读0次

    与 golang 一样,Rust 也实现了 channel 用于线程间的通信。如同 golang 的口号一样:

    Do not communicate by sharing memory; instead, share memory by communicating

    channel 代表了 communicate 的思想。而传统的 mutex 代表了 share memory 的思想。

    其根本区别在于,communicate 是 single ownership 的,当发送一个值到 channel 后,我们不再关心这个值,相当于将所有权移交给了接收线程。而 share memory 则是 multiple ownership 的,多个线程都会访问共享的变量,因此维护起来比 communicate 方式复杂得多。

    Rust 中的 channel 本质上是多个生产者、一个消费者 (multiple producer single consumer) 的队列实现的,mpsc 提供了两类 channel,分别是:

    • mpsc::channel,一个异步的,具有“无限大”缓冲区的队列,因此发送方永远不会阻塞。

    • mpsc::sync_channel,一个同步的,具有有限缓冲区的队列。发送方在缓冲区满之前不会阻塞,但是缓冲区满了以后就阻塞了。注意,缓冲区大小可以是“0”,类似于 golang 中的 chan,使得发送和接收成为一个原子操作。

    channel 具有发送者和接受者,当任意一方被 drop,channel 就关闭了。

    一个简单的例子如下:

    // multiple producer single consumer
    use std::sync::mpsc;
    use std::thread;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            let greating = String::from("hi");
            tx.send(greating).unwrap();
        });
    
        let received = rx.recv().unwrap();
        println!("received {}", received);
    }
    

    我们也可以发送多条消息:

    // multiple producer single consumer
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("thread"),
            ];
            for val in vals {
                tx.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
        for received in rx {
            println!("received {}", received);
        }
    }
    

    可以看到,每隔 1s 打印出一个单词。注意,接收时我们不再调用 recv() 方法,而是将 rx 作为一个迭代器。当 channel 关闭时,迭代终止。

    多个生产者

    channel 支持多个生产者模式,而且,tx 是一个可以被 clone 的对象。我们可以如下实现:

    // multiple producer single consumer
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        let tx1 = tx.clone();
    
        thread::spawn(move || {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("thread"),
            ];
            for val in vals {
                tx.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
    
    
        thread::spawn(move || {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];
            for val in vals {
                tx1.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
    
        for received in rx {
            println!("received {}", received);
        }
    }
    

    注意两个任务使用的发送者不一样:

        let tx1 = tx.clone();
    

    相关文章

      网友评论

          本文标题:Rust for cpp devs - channel

          本文链接:https://www.haomeiwen.com/subject/qaenjltx.html