线程池
线程池是对一组线程的管理,可以复用已有线程,并不会执行每次任务的时候新开线程,这在效率上和系统开支上都有极大的好处。以往线程池往往用在服务端开发上,目前随着移动端开发的兴起成熟,端上的复杂逻辑实现往往也需要用到线程池。拿Android开发举例,请求网络的基础框架往往会有一个线程池来管理请求和重用线程,加载图片的基础框架也往往需要用到线程池来管理忙于加载图片的工作线程。Android开发是基于Java语言的,Java语言提供了完备的线程池支持,看一段代码。
class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize)
throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}
Java中的线程池提供了有限的几个对外接口,使用上十分方便。如今C++11标准赋予了C++语言同样的能力,下面结合之前几篇文章中介绍的C++11的新特性,提供一个基于C++11的线程池实现,使用起来同样如Java一样简单方便。直接看代码
C++11版本的线程池实现
#include <thread>
#include <mutex>
#include <vector>
#include <deque>
#include <condition_variable>
#include <atomic>
namespace stdx
{
class ThreadPool
{
private:
std::size_t m_pool_size;
std::size_t m_pool_size_max_growth;
enum
{
DEFAULT_THREAD_POOL_COUNT = 5,
DEFAULT_THREAD_MAX_COUNT = 24,
DEFAULT_THREAD_POOL_COUNT_THRESHOLD = 128
};
std::vector<std::thread> m_threads;
std::deque<std::function<void()> > m_threadfuncs;
std::condition_variable m_cv;
std::mutex m_mutex;
std::atomic_int m_atmoic_working_couter;
std::atomic_bool m_atmoic_quit_sign;
std::atomic_bool m_atmoic_fifo_lifo;
using LOCK = std::unique_lock<std::mutex>;
void init()
{
for (int i = 0; i < m_pool_size; ++i)
{
pushback_thread();
}
}
//should invoked in lockguard
void pushback_thread()
{
m_threads.emplace_back([this]() -> void
{
while (!m_atmoic_quit_sign.load())
{
std::function<void()> fn;
try
{
LOCK lock(m_mutex);
m_cv.wait(lock,
[this]() -> bool
{
return !m_threadfuncs.empty() ||
m_atmoic_quit_sign.load();
}
);
if (m_atmoic_quit_sign.load())
{
break;
}
if (!m_threadfuncs.empty())
{
if (m_atmoic_fifo_lifo.load())
{
fn = std::move(m_threadfuncs.front());
m_threadfuncs.pop_front();
}
else
{
fn = std::move(m_threadfuncs.back());
m_threadfuncs.pop_back();
}
}
lock.unlock();
++m_atmoic_working_couter;
if (fn)
{
fn();
}
--m_atmoic_working_couter;
}
catch (...)
{
std::cout << "catch exp:" << __LINE__ << std::endl;
}
}
});
}
void uninit()
{
for (auto &thread : m_threads)
{
if (thread.joinable())
{
thread.join();
}
}
m_threads.clear();
}
//should invoked in lockguard
bool has_idle()
{
return m_atmoic_working_couter.load() < m_threads.size();
}
public:
ThreadPool() : ThreadPool(DEFAULT_THREAD_POOL_COUNT, DEFAULT_THREAD_MAX_COUNT)
{
}
ThreadPool(std::size_t count_begin, std::size_t count_max_growth) : m_pool_size(count_begin),
m_pool_size_max_growth(count_max_growth),
m_atmoic_working_couter(0),
m_atmoic_quit_sign(false),
m_atmoic_fifo_lifo(true)
{
if (m_pool_size > DEFAULT_THREAD_POOL_COUNT_THRESHOLD)
{
m_pool_size = DEFAULT_THREAD_POOL_COUNT_THRESHOLD;
}
if (m_pool_size_max_growth > DEFAULT_THREAD_POOL_COUNT_THRESHOLD)
{
m_pool_size_max_growth = DEFAULT_THREAD_POOL_COUNT_THRESHOLD;
}
init();
}
~ThreadPool()
{
quit();
uninit();
}
ThreadPool(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
void operator=(const ThreadPool &) = delete;
void operator=(ThreadPool &&) = delete;
///////////////////////////////////////////////////////////////
template<typename Fn, typename ...Params>
void commit(Fn &&fn, Params &&... params)
{
try
{
LOCK lock(m_mutex);
if (!has_idle() && m_threads.size() < m_pool_size_max_growth)
{
pushback_thread();
}
m_threadfuncs.emplace_back(
std::bind(std::forward<Fn>(fn), std::forward<Params>(params)...)
);
m_cv.notify_one();
lock.unlock();
}
catch (...)
{
std::cout << "catch exp:" << __LINE__ << std::endl;
}
}
void quit()
{
m_atmoic_quit_sign.store(true);
try
{
LOCK lock(m_mutex);
m_cv.notify_all();
}
catch (...)
{
std::cout << "catch exp:" << __LINE__ << std::endl;
}
}
std::size_t get_pending_count()
{
std::size_t count = 0;
try
{
LOCK lock(m_mutex);
count = m_threadfuncs.size();
}
catch (...)
{
std::cout << "catch exp:" << __LINE__ << std::endl;
}
return count;
}
std::size_t get_working_thread_count()
{
std::size_t count = 0;
try
{
LOCK lock(m_mutex);
count = m_threads.size();
}
catch (...)
{
std::cout << "catch exp:" << __LINE__ << std::endl;
}
return count;
}
int get_working_count()
{
return m_atmoic_working_couter.load();
}
void set_execute_fifo_lifo(bool b)
{
m_atmoic_fifo_lifo.store(b);
}
};
}
测试代码如下
class ThreadPoolTest
{
#define SLEEP(x) std::this_thread::sleep_for(x)
using TIMESECONDS = std::chrono::seconds;
using TIMEMILLI = std::chrono::milliseconds;
public:
static void callback(std::string str, float f, const char *sztmp)
{
std::cout << "commit2:" << str << " " << f << sztmp << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
}
struct functor
{
void operator()()
{
std::cout << "commit3:" << "nothing" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
}
};
template<typename CALLBACK>
static void thread_callback(int a, CALLBACK fn)
{
std::cout << "commit 5:" << a << std::endl;
fn();
}
static void execute()
{
stdx::ThreadPool thread_pool(1, 5);
// thread_pool.set_execute_fifo_lifo(false);
thread_pool.commit(
[](int a, char b) -> int
{
std::cout << "commit1:" << a << " " << b << std::endl;
SLEEP(TIMESECONDS(3));
return 12;
},
123,
'c'
);
SLEEP(TIMEMILLI (10));
thread_pool.commit(
callback,
std::string("456"),
1.1f,
"789"
);
thread_pool.commit(
functor()
);
auto lambdacallback = []() -> void
{
std::cout << "thread rans end" << std::endl;
};
thread_pool.commit(thread_callback<decltype(lambdacallback)>,
999,
lambdacallback
);
for (int i = 0; i < 10; ++i)
{
thread_pool.commit(
[=]
{
std::cout << "commitn:" << i << std::endl;
if (i == 9)
{
SLEEP(TIMESECONDS(0));
}
}
);
}
SLEEP(TIMESECONDS(1));
std::cout << "idle?:" << thread_pool.has_idle() << std::endl;
std::cout << "pending count:" << thread_pool.get_pending_count() << std::endl;
SLEEP(TIMESECONDS(3));
std::cout << "working count:" << thread_pool.get_working_count() << std::endl;
SLEEP(TIMESECONDS(5));
}
};
上面的代码在windows平台,mac平台,android平台均通过了测试,iOS平台没有进行测试但问题应该不大。ThreadPool类管理了若干线程和线程要执行的函数,线程执行函数是一个队列可以指定为先进先出和后进先出两种模式。测试代码中用了普通函数、functor、lambda作为传入参数对commit进行测试,依照个人使用习惯各种线程函数方式均支持,并且后续线程函数需要的参数可以任意。如果需要线程函数执行完后回调通知,则可以将回调函数作为参数传递给线程函数,上面的测试代码也包含了这种情况。
网友评论