Scheduler
ThreadPool构造函数中创建了一个Scheduler对象。
Scheduler构造参数kNumThreadsPerScheduler,值为2。
Scheduler使用boost::thread_group创建了kNumThreadsPerScheduler个线程,也就是2个线程。
线程方法为Scheduler::serviceQueue。
在Scheduler::serviceQueue方法中,循环检测任务队列判断超时来执行到期任务。
使用了条件变量std::condition_variable和互斥锁std::mutex来并发任务。
这个Scheduler实例同时又被传入下面的Worker对象中用来实现定时计划任务。
Worker
ThreadPool构造时传入numWorkers,默认配置文件设置此值为24,根据numWorkers创建对应的worker,保存在workers队列。
Worker构造函数中创建了service_和service_worker_:
service_{},
service_worker_{new asio_worker::element_type(service_)}
service_和service_worker_的定义:
typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
boost::asio::io_service service_;
asio_worker service_worker_;
创建ThreadPool之后,要调用ThreadPool::start()
:
void ThreadPool::start() {
std::vector<std::shared_ptr<std::promise<void>>> promises(workers_.size());
int index = 0;
for (auto worker : workers_) {
promises[index] = std::make_shared<std::promise<void>>();
worker->start(promises[index++]);
}
for (auto promise : promises) {
promise->get_future().wait();
}
}
逐个调用 worker->start(promises[index++]);
。
void Worker::start(std::shared_ptr<std::promise<void>> start_promise) {
auto this_ptr = shared_from_this();
auto worker = [this_ptr, start_promise] {
start_promise->set_value();
if (!this_ptr->closed_) {
return this_ptr->service_.run();
}
return size_t(0);
};
group_.add_thread(new boost::thread(worker));
}
每个Woker内部都创建了一个线程,在线程中把boost::asio::io_service service_
启动run起来.
因为io_service关联了一个io_service::work任务对象,所以io_service会一直运行。
Worker的使用者调用Worker::task将Task任务放入队列中等待执行。
void Worker::task(Task f) {
service_.post(f);
}
用户可以使用ThreadPool::getLessUsedWorker()
从workers中获取一个当前引用计数最少的Worker:
std::shared_ptr<Worker> ThreadPool::getLessUsedWorker() {
std::shared_ptr<Worker> chosen_worker = workers_.front();
for (auto worker : workers_) {
if (chosen_worker.use_count() > worker.use_count()) {
chosen_worker = worker;
}
}
return chosen_worker;
}
线程安全
如果想要一组Task同步执行那么需要在同一个Worker实例中加入Task。
如果想并发执行Task,那么可以每次从ThreadPool::getLessUsedWorker()取出一个Worker来加入Task。
因为Task执行在Worker的线程中,跟调用者不在同一线程,如果Task中的代码跟调用者有资源竞争那么需要自己实现加锁。
因为licode使用node.js做为sdk库的调用者,所以创建的WebRtcConnection和MediaStream都执行在Node.js中的单线程中。
每个WebRtcConnection和MediaStream又独立使用Worker对象来执行异步线程任务。
所以WebRtcConnection和MediaStream的内部代码执行在两个线程下,一个Node.js主线程,一个Worker线程。
当释放WebRtcConnection和MediaStream对象时,在Node.js中调用对象的close方法同时释放对象引用,close方法会在Worker中执行syncClose来释放内部资源。
我看了一下加入到Worker中的Task基本都没有加锁,那么开发时需要严格保证两个线程中的资源不能冲突。
关于IOThreadPool和IOWorker
默认配置在整个系统中只创建了一个IOWorker。
实际上IOWorker只在使用NicerConnection时才有用,而如果使用LibNiceConnection,那么此IOWorker不需要start,也就没有任何作用。
因为libnice有自己的内部线程,所以不需要外部线程来处理IO。
libnice收到数据后回调LibNiceConnection::onData,加锁boost::mutex::scoped_lock lock(close_mutex_);
检测当前IceState状态。
在LibNiceConnection::close也会加锁设置IceState并且释放libnice。
网友评论