美文网首页
02 managing threads

02 managing threads

作者: rasishou | 来源:发表于2019-12-26 11:45 被阅读0次

C++ Concurrency in Action 2nd Edition note
主线程由C++运行时启动。
2.1 基本线程管理
启动一个线程(参数是函数):

void do_some_work();
std::thread my_thread(do_some_work);

   std::thread可以接收任何的可调用类型.(函数,函数指针,函数对象,lambda表达式,functional包装,bind)。函数对象的例子:

class background_task{
public:
    void operator()()const{
        do_something();
        do_something_else();
}
};
background_task f;
std::thread my_thread(f);

  此时,函数对象被拷贝到新创建线程的存储中,并且被调用。
  如果传递一个临时对象而不是一个命名变量,那么在语法上和函数声明的语法一样,此时编译器解释为函数,而不是一个对象的定义。
std::thread my_thread(background_task());
  编译器解释为声明一个接受单个参数(参数类型为指向一个没有参数并且返回background_task对象的函数指针)并且返回std::thread对象的函数my_thread,而不是启动一个新线程。可以通过传入命名函数对象或者使用括号,或者使用统一初始化语法来避免:

std::thread my_thread((background_task()));
std::thread my_thread{background_task()};

  额外的括号防止将函数对象解释为函数声明。my_thread声明为std::thread类型的对象。
  lambda表达式允许你写一个局部函数,在函数中捕获局部变量,避免额外参数的传递。

std::thread my_thread([]{
    do_something();
    do_something_else();
});

  必须在主线程结束之前显式地join或者detach新线程。如果std::thread对象销毁前没有detachjoin,那么进程将被终止(std::thread析构函数调用std::terminate()).所以确保线程被join或者被detach(包括异常)很重要。
  如果detach线程,你需要确保线程要访问的数据是一直有效的。
  一种情况是线程函数中包含指向局部变量的指针或者引用,并且主线程(函数)退出的时候线程还没有退出(线程继续访问主线程函数中的局部变量)。

struct func{
    int&I;
    func(int&i_):i(i_){}
    void operator()(){
    for(unsigned j=0;j<1000000;++j){
        do_something(i);
}
}
};
void oops(){
    int some_local_state=0;
    func my_func(some_local_state);
    std::thread my_thread(my_func);
    my_thread.detach();
}

  当oops退出的时候,my_thread线程很可能还在运行。如果线程还在运行,下一次调用do_something(i)的时候将会访问一个已经销毁的变量。
  一种解决方法是线程函数自包含并且将数据拷贝到线程而不是共享数据。如果使用可调用对象作为线程函数,该对象将复制到线程,原始对象可以立即销毁。但还是要注意包含指针或引用的对象。除非能够保证线程在主线程函数退出之前完成。
  或者,在主线程函数退出前调用join,保证线程的执行完成。

  join()清理任何线程相关的存储,所以std::thread对象不再与现在完成的线程相关,它(std::thread对象)不与任何线程相关。只能对一个线程调用一次join(),如果已经调用过join(),那么std::thread对象不再可连接,joinable()会返回false

  如果在线程开始之后join()调用之前抛出异常,join()的调用可能会被跳过。
  如果准备在无异常的情况下调用join(),那么还需要考虑在异常的情况下调用join()

struct func;
void f(){
    int some_local_state=0;
    func my_func(some_local_state);
    std::thread t(my_func);
    try{
        do_something_in_current_thread();
}
catch(…){
    t.join();
    throw;
}
t.join();
}

  使用try/catch块不是理想的办法。需要保证所有可能的出口路径,都要调用join(),无论正常或异常。
  一种方法是使用RAII并且把join()加到析构函数。

class thread_guard{
    std::thread&t;
public:
    explicit thread_guard(std::thread&t_):t(t_){}
    ~thread_guard(){
        if(t.joinable()){
    t.join();
}
}
thread_guard(thread_guard const&)=delete;
thread_guard&operator=(thread_guard const&)=delete;
};
struct func;
void f(){
    int some_local_state=0;
    func my_func(some_local_state);
    std::thread t(my_func);
    thread_guard g(t);
    do_something_in_current_thread();
}

  thread_guard对象g先销毁,并且线程在析构函数中连接,即使do_something_in_current_thread抛出异常导致函数退出。
  线程只能join()一次。
  detach()打断线程和std::thread对象的关联,并且保证当std::thread销毁时不调用std::terminate(),即使线程仍然在后台中运行。

  对std::thread对象调用detach()使线程在后台运行,没有任何直接的方式能与线程沟通。如果线程已经分离,就不能再连接,因为无法获得std::thread对象。分离线程完全运行在后台,所有权和控制移交到C++运行时库。线程退出时,C++运行时库保证线程相关的资源正确回收。
  分离的线程称为守护线程,守护线程运行在后台并且没有任何显式的用户接口。守护线程通常是长时间运行的。
  detach()调用完成后,std::thread对象不再与实际的线程关联,所以不再可连接。

std::thread t(do_background_work);
t.detach();
assert(!t.joinable());

  从std::thread对象分离线程,首先要有一个关联线程。只有当t.joinable()返回true时,才能对std::thread对象调用t.detach()

2.2 线程函数传入实参
  实参默认拷贝到新创建线程的内部存储,然后像临时对象那样以右值传递到可调用对象或者函数。即使相应的参数在函数中是引用的形式。

void f(int I,std::string const&s);
std::thread t(f,3,”hello”);

  即使f的第二个形参是std::string,实参传进来的是char const*并且在新线程的上下文中转换为std::string。当实参是一个指向局部变量的指针时:

void f(int i, std::string const&s);
void oops(int some_param){
    char buffer[1024];
    sprint(buffer,”%i”,some_param);
    std::thread t(f,3,buffer);
    t.detach();
}

  我们依赖隐式转换将指向buffer的指针转换为std::string对象,但是std::thread的构造函数只拷贝了指针的值,没有将它转换为我们需要的类型。
  oops函数很可能在新线程将buffer转换为std::string之前,就已经退出了,从而导致未定义行为。解决的办法是将buffer传递给std::thread的构造函数之前先将buffer转换为std::string

void f(int i, std::string const&s);
void oops(int some_param){
    char* buffer[1024];
    sprint(buffer,”%i”,some_param);
    std::thread t(f,3,std::string(buffer));
    t.detach();
}

  非const引用:

void update_date_for_widget(widget_id w, widget_data& data);
void oops_again(widget_id w){
    widget_data data;
    std::thread t(update_data_for_widget,w,data);
    display_status();
    t.join();
    process_widget_data(data);
}

  std::thread构造函数拷贝了提供的数据。但是为了处理只可移动类型,内部代码将该拷贝的实参作为右值传递,导致用一个右值调用update_data_for_widget。这会编译失败,因为不能传递一个右值给一个非const引用。你需要用std::ref包装一下这个实参:
std::thread t(update_data_for_widget, w, std::ref(data));
  如果提供一个合适的对象指针作为第一个实参,那么可以传递成员函数指针作为线程函数。

class X{
public:
    void do_lengthy_work();
};
X my_x;
std::thread t(&X::do_lengthy_work, &my_x);

  也可以为成员函数调用提供实参:std::thread构造函数的第三个实参作为成员函数的第一个实参,以此类推。
  当实参只可移动不可复制:源对象是临时对象时,move是自动调用的;源对象是命名值时,需要调用std::move()

void process_big_object(std::unique_ptr<big_object>);
std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object, std::move(p));

  big_object的所有权先转移到新线程的内部存储,然后转移到process_big_object
  每一个std::thread实例负责管理一个线程。线程的所有权可以在实例之间转移,因为std::thread实例是可移动的,但是不可复制。这确保任何时刻只有一个对象和线程关联,同时允许程序员在对象之间转移所有权。

2.3 线程所有权转移

void some_function();
void some_other_function();
std::thread t1(some_function);
std::thread t2=std::move(t1);
t1=std::thread(some_other_function);
std::thread t3;
t3=std::move(t2);
t1=std::move(t3);

  最后一个move将运行some_function的线程的所有权转移回t1。但是t1已经有一个关联的线程(运行some_other_function),所以std::terminate()被调用来终止程序。这由std::thread的析构函数完成。必须在线程析构前显式地等待线程完成或分离,赋值同理:你不能通过给std::thread对象赋一个新值来丢弃一个线程。
  线程所有权可以转移到函数外:

std::thread f(){
    void some_function();
    return std::thread(some_function);
}
std::thread g(){
    void some_other_function(int);
    std::thread t(some_other_function,42);
    return t;
}

  如果需要转移所有权到一个函数中,可以传入std::thread实例的值作为函数参数。

void f(std::thread t);
void g(){
    void some_function();
    f(std::thread(some_function));
    std::thread t(some_function);
    f(std;:move(t));
}

  可以使用thread_guard类并取得线程的所有权。使用thread_guard可以避免thread_guard对象的生命周期比它引用的线程长,并且线程的所有权一旦转移到对象,那么其他对象就不能连接或分离线程。

class scoped_thread{
    std::thread t;
public:
    explicit scoped_thread(std::thread t_):t(std::move(t_)){
    if(!t.joinable())
        throw std::logic_error(“no thread”);
}
~scoped_thread(){
    t.join();
}
    scoped_thread(scoped_thread const&)=delete;
    scoped_thread&operator=(scoped_thread const&)=delete;
};
struct func;
void f(){
    int some_local_state;
    scoped_thread t{std::thread(func(some_local_state))};
    do_something_in_current_thread();
}

  thread_guard类必须检查线程是否可连接,scoped_thread类是在构造函数中检查是否可连接。如果不可连接,则抛出异常。(thread_guard是在析构函数中检查,scoped_thread是在构造函数中检查)。
std::jthread提供join()的方式类似于scoped_thread

class joining_thread{
    std::thread t;
public:
    joining_thread()noexcept=default;
    template<typename Callable, typename … Args>
    explicit joining_thread(Callable&& func, Args&& …args):t(std::forward<Callable>(func), std::forward<Args>(args)…){}
explicit joining_thread(std::thread t_)noexcept:t(std::move(t_)){}
joining_thread(joining_thread&&other)noexcept:t(std::move(other.t)){}
    joining_thread&operator=(joining_thread&&other)noexcept{
    if(joinable()){
    join();
       }
t=std::move(other.t);
return *this;
}
joining_thread&operator=(std::thread other)noexcept{
    if(joinable()){
            join();
        }
        t=std::move(other);
         return *this;
}
~joining_thread()noexcept{
    if(joinable())
        join();
}
void swap(joining_thread&other)noexcept{
    t.swap(other.t);
}
std::thread::id get_id()const noexcept{
    return t.get_id();
}
bool joinable() const noexcept{
    return t.joinable();
}
void join(){
    t.join();
}
void detach(){
    t.detach();
}
std::thread& as_thread() noexcept{
    return t;
}
const std::thread& as_thread() const noexcept{
    return t;
}
};

  容器可以容纳std::thread对象:

void do_work(unsigned id);
void f(){
    std::vector<std::thread> threads;
    for(unsigned i=0;i<20;++i){
    threads.emplace_back(do_work,i);
}
for(auto& entry:threads){
    entry.join();
}
}

2.4 在运行时选择线程的数量
  std::thread::hardware_concurrency() 返回线程的数量(48线程返回8)。
  std::accumulate一个简单的并行实现。它将工作分配给线程。每个线程有最低元素数目,防止分配过多的线程。

template<typename Iterator, typename T>
struct accumulate_block{
    void operator()(Iterator first, Iterator last, T& result){
    result=std::accumulate(first, last, result);
}
};
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init){
    unsigned long const length=std::distance(first, last);
    if(!length)//empty
        return init;
    unsigned long const min_per_thread=25;
    unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;
    unsigned long const hardware_concurrency=std::thread::hardware_concurrency();
    unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2, max_threads);
    unsigned long const block_size=length/num_threads;
    
    std::vector<T> results(num_threads);
    std::vector<std::thread> threads(num_threads-1);
    Iterator block_start=first;
    for(unsigned long i=0;i<(num_threads-1);++i){
    Iterator block_end=block_start;
    std::advance(block_end,block_size);
    threads[i]=std::thread(accumulate_block<Iterator,T>(),block_start,block_end,std::ref(results[i]));
    block_start=block_end;
}
accumulate_block<Iterator, T>()(block_start,last,results[num_threads-1]);
for(auto& entry : threads){
    entry.join();
}
return std::accumulate(results.begin(), results.end(), init);
}

2.5 标识线程
  线程标识的类型是std::thread::id,可以通过std::thread::get_id()或者std::this_thread::get_id()取得。
  std::thread::id类型的对象可以拷贝和比较。
  标准库提供std::hash<std::thread::id>,使得std::thread::id类型的值可以用于无序关联容器中作为key

std::thread::id master_thread;
void some_core_part_of_algorithm(){
    if(std::this_thread::get_id()==master_thread){
    do_master_thread_work();
}
do_common_work();
}

  std::thread::id实例可用于输出流如std::cout
std::cout<<std::this_thread::get_id();
  具体的输出由实现决定。

相关文章

网友评论

      本文标题:02 managing threads

      本文链接:https://www.haomeiwen.com/subject/lodkoctx.html