美文网首页
Android 源码学习 -- 底层多线程task设计

Android 源码学习 -- 底层多线程task设计

作者: 打出了枫采 | 来源:发表于2022-05-04 14:19 被阅读0次

以前工作多偏重于业务逻辑,较少关注到底层的逻辑实现,自己写内部工具中时也比较随意,遇到用多线程的地方都是临时起一个线程处理耗时的复杂任务,任务结束后,自动被主线程回收。线程的频繁创建销毁其实是存在很大开销的。下面的这种设计使用方式可以更高效的利用multi-thread

MessageLoopThread

循环队列任务处理机制简单说明:

  • 一个持续运行的Thread(while (1) { do tasks})
  • 一个 task queue (如果用的不是线程安全的标准数据结构,则需要另外加锁来保证)
  • task queue中存放的是 类似于 C++ Functor的对象,或者简单理解就是task function的一段代码
  • 其他线程会将耗时任务封装成task,加入队列中
  • Thread 运行时,按queue中顺序依次执行其中的task function
    上面有一个很重要解决的问题是,需要把不同的逻辑处理函数 封装成统一的task function,答案就是Android里面使用了chromium的base::Bind来实现统一的封装

可以参考关于std::bind 来理解这个base::Bind作用,实际作用有些像python的functools 里面的辅助函数,也有些类似python里的 wrapper装饰器,简单理解就是把一个函数固化入参后,生成一个新的函数。

  • 下面举例说明:
static bt_status_t btif_gatts_add_service(int server_if,
                                          const btgatt_db_element_t* service,
                                          size_t service_count) {
  CHECK_BTGATT_INIT();
  return do_in_jni_thread(FROM_HERE,
                          Bind(&add_service_impl, server_if,
                               std::vector(service, service + service_count)));
}

static void add_service_impl(int server_if,
                             vector<btgatt_db_element_t> service) {
  if (service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GATT_SERVER) ||
      service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GAP_SERVER)) {
    LOG_ERROR("%s: Attept to register restricted service", __func__);
    HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, BT_STATUS_FAIL,
              server_if, service.data(), service.size());
    return;
  }

  BTA_GATTS_AddService(
      server_if, service,
      jni_thread_wrapper(FROM_HERE, base::Bind(&on_service_added_cb)));
}

bt_status_t do_in_jni_thread(const base::Location& from_here,
                             base::OnceClosure task) {
  if (!jni_thread.DoInThread(from_here, std::move(task))) {
    LOG(ERROR) << __func__ << ": Post task to task runner failed!";
    return BT_STATUS_FAIL;
  }
  return BT_STATUS_SUCCESS;
}

static MessageLoopThread jni_thread("bt_jni_thread");

bool MessageLoopThread::DoInThread(const base::Location& from_here,
                                   base::OnceClosure task) {
  return DoInThreadDelayed(from_here, std::move(task), base::TimeDelta());
}

bool MessageLoopThread::DoInThreadDelayed(const base::Location& from_here,
                                          base::OnceClosure task,
                                          const base::TimeDelta& delay) {
  std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
  if (is_main_ && init_flags::gd_rust_is_enabled()) {
    if (rust_thread_ == nullptr) {
      LOG(ERROR) << __func__ << ": rust thread is null for thread " << *this
                 << ", from " << from_here.ToString();
      return false;
    }

    shim::rust::main_message_loop_thread_do_delayed(
        **rust_thread_,
        std::make_unique<shim::rust::OnceClosure>(std::move(task)),
        delay.InMilliseconds());
    return true;
  }

上面这段代码btif_gatts_add_service 就是将函数add_service_impl通过Bind入参固化后转换成统一的OnceClosure task,放到bt_jni_thread中

这里需要注意的一点shim::rust::main_message_loop_thread_do_delayed最终调用的是Rust实现,这里似乎是Android在新版本里最终底层多线程的处理都改用Rust实现了,目前尚不熟悉Rust语言和C++的调用Rust处理,对应rust文件message_loop_thread.rs,据说是Rust相比C++在多线程处理上使用起来更方便高效;DoInThreadDelayed函数也有相关mock实现,可以从mock实现来理解上面所说的细节,这里就不列举出来了

  • 下面这段代码的设计也很巧妙,可以在实际应用中借鉴:
static void add_service_impl(int server_if,
                             vector<btgatt_db_element_t> service) {
  if (service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GATT_SERVER) ||
      service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GAP_SERVER)) {
    LOG_ERROR("%s: Attept to register restricted service", __func__);
    HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, BT_STATUS_FAIL,
              server_if, service.data(), service.size());
    return;
  }

  BTA_GATTS_AddService(
      server_if, service,
      jni_thread_wrapper(FROM_HERE, base::Bind(&on_service_added_cb)));
}

extern void BTA_GATTS_AddService(tGATT_IF server_if,
                                 std::vector<btgatt_db_element_t> service,
                                 BTA_GATTS_AddServiceCb cb) {
  do_in_main_thread(FROM_HERE,
                    base::Bind(&bta_gatts_add_service_impl, server_if,
                               std::move(service), std::move(cb)));
}

template <typename R, typename... Args>
base::Callback<R(Args...)> jni_thread_wrapper(const base::Location& from_here,
                                              base::Callback<R(Args...)> cb) {
  return base::Bind(
      [](const base::Location& from_here, base::Callback<R(Args...)> cb,
         Args... args) {
        do_in_jni_thread(from_here,
                         base::Bind(cb, std::forward<Args>(args)...));
      },
      from_here, std::move(cb));
}

static void on_service_added_cb(tGATT_STATUS status, int server_if,
                                vector<btgatt_db_element_t> service) {
  HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, status, server_if,
            service.data(), service.size());
}

有两点:

  • 用了两个线程来完成一个task,准确的说是一个线程完成主体任务处理,另一个线程完成任务结果callback处理,这样主体任务线程的处理不会因为单一任务callback而阻塞 (这个比较经典的场景就是在UI设计里,以前最开始使用Qt做tool时有遇到:后台处理,前台刷新逻辑做线程分离,后台处理不会导致UI界面卡住,UI界面重绘不会导致后台任务阻塞,甚至复杂处理逻辑可以使用更多的线程来保证)
  • jni_thread_wrapper中使用了Bind和C++ 匿名函数对callback函数做统一封装,实际处理时调用cb.Run(GATT_ERROR, server_if, std::move(service));
void bta_gatts_add_service_impl(tGATT_IF server_if,
                                std::vector<btgatt_db_element_t> service,
                                BTA_GATTS_AddServiceCb cb) {
  uint8_t rcb_idx =
      bta_gatts_find_app_rcb_idx_by_app_if(&bta_gatts_cb, server_if);

  LOG(INFO) << __func__ << ": rcb_idx=" << +rcb_idx;

  if (rcb_idx == BTA_GATTS_INVALID_APP) {
    cb.Run(GATT_ERROR, server_if, std::move(service));
    return;
  }

Notes:

  • 当然上面任务的处理是属于异步的,对于时效性要求特别高的场景,可能不适合使用;如果有很多产生任务的线程,那可能需要合理规划任务处理线程的个数以及实际分配协同等
  • 下面rust中多线程处理代码,rust在多线程处理上比C++更高效,这个需要更多研究。
pub fn main_message_loop_thread_do_delayed(
    thread: &mut MessageLoopThread,
    closure: cxx::UniquePtr<ffi::OnceClosure>,
    delay_ms: i64,
) {
    assert!(init_flags::gd_rust_is_enabled());
    if delay_ms == 0 {
        if thread.tx.send(closure).is_err() {
            log::error!("could not post task - shutting down?");
        }
    } else {
        thread.rt.spawn(async move {
            // NOTE: tokio's sleep can't wake up the system...
            // but hey, neither could the message loop from libchrome.
            //
            // ...and this way we don't use timerfds arbitrarily.
            //
            // #yolo
            tokio::time::sleep(Duration::from_millis(delay_ms.try_into().unwrap_or(0))).await;
            closure.Run();
        });
    }
}

相关文章

网友评论

      本文标题:Android 源码学习 -- 底层多线程task设计

      本文链接:https://www.haomeiwen.com/subject/cnxyyrtx.html