以前工作多偏重于业务逻辑,较少关注到底层的逻辑实现,自己写内部工具中时也比较随意,遇到用多线程的地方都是临时起一个线程处理耗时的复杂任务,任务结束后,自动被主线程回收。线程的频繁创建销毁其实是存在很大开销的。下面的这种设计使用方式可以更高效的利用multi-thread
MessageLoopThread
循环队列任务处理机制简单说明:
- 一个持续运行的Thread(while (1) { do tasks})
- 一个 task queue (如果用的不是线程安全的标准数据结构,则需要另外加锁来保证)
- task queue中存放的是 类似于 C++ Functor的对象,或者简单理解就是task function的一段代码
- 其他线程会将耗时任务封装成task,加入队列中
- Thread 运行时,按queue中顺序依次执行其中的task function
上面有一个很重要解决的问题是,需要把不同的逻辑处理函数 封装成统一的task function,答案就是Android里面使用了chromium的base::Bind来实现统一的封装
可以参考关于std::bind 来理解这个base::Bind作用,实际作用有些像python的functools 里面的辅助函数,也有些类似python里的 wrapper装饰器,简单理解就是把一个函数固化入参后,生成一个新的函数。
-
下面举例说明:
static bt_status_t btif_gatts_add_service(int server_if,
const btgatt_db_element_t* service,
size_t service_count) {
CHECK_BTGATT_INIT();
return do_in_jni_thread(FROM_HERE,
Bind(&add_service_impl, server_if,
std::vector(service, service + service_count)));
}
static void add_service_impl(int server_if,
vector<btgatt_db_element_t> service) {
if (service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GATT_SERVER) ||
service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GAP_SERVER)) {
LOG_ERROR("%s: Attept to register restricted service", __func__);
HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, BT_STATUS_FAIL,
server_if, service.data(), service.size());
return;
}
BTA_GATTS_AddService(
server_if, service,
jni_thread_wrapper(FROM_HERE, base::Bind(&on_service_added_cb)));
}
bt_status_t do_in_jni_thread(const base::Location& from_here,
base::OnceClosure task) {
if (!jni_thread.DoInThread(from_here, std::move(task))) {
LOG(ERROR) << __func__ << ": Post task to task runner failed!";
return BT_STATUS_FAIL;
}
return BT_STATUS_SUCCESS;
}
static MessageLoopThread jni_thread("bt_jni_thread");
bool MessageLoopThread::DoInThread(const base::Location& from_here,
base::OnceClosure task) {
return DoInThreadDelayed(from_here, std::move(task), base::TimeDelta());
}
bool MessageLoopThread::DoInThreadDelayed(const base::Location& from_here,
base::OnceClosure task,
const base::TimeDelta& delay) {
std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
if (is_main_ && init_flags::gd_rust_is_enabled()) {
if (rust_thread_ == nullptr) {
LOG(ERROR) << __func__ << ": rust thread is null for thread " << *this
<< ", from " << from_here.ToString();
return false;
}
shim::rust::main_message_loop_thread_do_delayed(
**rust_thread_,
std::make_unique<shim::rust::OnceClosure>(std::move(task)),
delay.InMilliseconds());
return true;
}
上面这段代码btif_gatts_add_service 就是将函数add_service_impl通过Bind入参固化后转换成统一的OnceClosure task,放到bt_jni_thread中
这里需要注意的一点shim::rust::main_message_loop_thread_do_delayed
最终调用的是Rust实现,这里似乎是Android在新版本里最终底层多线程的处理都改用Rust实现了,目前尚不熟悉Rust语言和C++的调用Rust处理,对应rust文件message_loop_thread.rs
,据说是Rust相比C++在多线程处理上使用起来更方便高效;DoInThreadDelayed函数也有相关mock实现,可以从mock实现来理解上面所说的细节,这里就不列举出来了
-
下面这段代码的设计也很巧妙,可以在实际应用中借鉴:
static void add_service_impl(int server_if,
vector<btgatt_db_element_t> service) {
if (service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GATT_SERVER) ||
service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GAP_SERVER)) {
LOG_ERROR("%s: Attept to register restricted service", __func__);
HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, BT_STATUS_FAIL,
server_if, service.data(), service.size());
return;
}
BTA_GATTS_AddService(
server_if, service,
jni_thread_wrapper(FROM_HERE, base::Bind(&on_service_added_cb)));
}
extern void BTA_GATTS_AddService(tGATT_IF server_if,
std::vector<btgatt_db_element_t> service,
BTA_GATTS_AddServiceCb cb) {
do_in_main_thread(FROM_HERE,
base::Bind(&bta_gatts_add_service_impl, server_if,
std::move(service), std::move(cb)));
}
template <typename R, typename... Args>
base::Callback<R(Args...)> jni_thread_wrapper(const base::Location& from_here,
base::Callback<R(Args...)> cb) {
return base::Bind(
[](const base::Location& from_here, base::Callback<R(Args...)> cb,
Args... args) {
do_in_jni_thread(from_here,
base::Bind(cb, std::forward<Args>(args)...));
},
from_here, std::move(cb));
}
static void on_service_added_cb(tGATT_STATUS status, int server_if,
vector<btgatt_db_element_t> service) {
HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, status, server_if,
service.data(), service.size());
}
有两点:
- 用了两个线程来完成一个task,准确的说是一个线程完成主体任务处理,另一个线程完成任务结果callback处理,这样主体任务线程的处理不会因为单一任务callback而阻塞 (这个比较经典的场景就是在UI设计里,以前最开始使用Qt做tool时有遇到:后台处理,前台刷新逻辑做线程分离,后台处理不会导致UI界面卡住,UI界面重绘不会导致后台任务阻塞,甚至复杂处理逻辑可以使用更多的线程来保证)
- jni_thread_wrapper中使用了Bind和C++ 匿名函数对callback函数做统一封装,实际处理时调用
cb.Run(GATT_ERROR, server_if, std::move(service));
void bta_gatts_add_service_impl(tGATT_IF server_if,
std::vector<btgatt_db_element_t> service,
BTA_GATTS_AddServiceCb cb) {
uint8_t rcb_idx =
bta_gatts_find_app_rcb_idx_by_app_if(&bta_gatts_cb, server_if);
LOG(INFO) << __func__ << ": rcb_idx=" << +rcb_idx;
if (rcb_idx == BTA_GATTS_INVALID_APP) {
cb.Run(GATT_ERROR, server_if, std::move(service));
return;
}
Notes:
- 当然上面任务的处理是属于异步的,对于时效性要求特别高的场景,可能不适合使用;如果有很多产生任务的线程,那可能需要合理规划任务处理线程的个数以及实际分配协同等
- 下面rust中多线程处理代码,rust在多线程处理上比C++更高效,这个需要更多研究。
pub fn main_message_loop_thread_do_delayed(
thread: &mut MessageLoopThread,
closure: cxx::UniquePtr<ffi::OnceClosure>,
delay_ms: i64,
) {
assert!(init_flags::gd_rust_is_enabled());
if delay_ms == 0 {
if thread.tx.send(closure).is_err() {
log::error!("could not post task - shutting down?");
}
} else {
thread.rt.spawn(async move {
// NOTE: tokio's sleep can't wake up the system...
// but hey, neither could the message loop from libchrome.
//
// ...and this way we don't use timerfds arbitrarily.
//
// #yolo
tokio::time::sleep(Duration::from_millis(delay_ms.try_into().unwrap_or(0))).await;
closure.Run();
});
}
}
网友评论