本例是Bartosz Milewski C++11 Concurrency课程的第7课中的代码,分别使用 move机制传递数据。和使用一个大的result对象共享数据。然后使用chrono库度量两种方法的效率。
最终得出的结论是两种方式的效率差不多。在我的机器上使用move机制传递数据因为没有上锁,效率更高一点,少用了0.1s的时间。
最终得出结论,在并行算法的数据项能抽象成无依赖的move形式时,最好使用move形式。不要使用加锁的共享形式,不仅拖慢速度,而且代码也更复杂。
程序代码如下,
conanfile.txt
[requires]
boost/1.72.0
[generators]
cmake
CMakeLists.txt
cmake_minimum_required(VERSION 3.3)
project(1_shared_data)
set(ENV{PKG_CONFIG_PATH} "$ENV{PKG_CONFIG_PATH}:/usr/local/lib/pkgconfig/")
set ( CMAKE_CXX_FLAGS "-pthread")
set(CMAKE_CXX_STANDARD 17)
add_definitions(-g)
include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup()
include_directories(${INCLUDE_DIRS})
LINK_DIRECTORIES(${LINK_DIRS})
file( GLOB main_file_list ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
foreach( main_file ${main_file_list} )
file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${main_file})
string(REPLACE ".cpp" "" file ${filename})
add_executable(${file} ${main_file})
target_link_libraries(${file} ${CONAN_LIBS} pthread)
endforeach( main_file ${main_file_list})
async_out.hpp
#ifndef _FREDRIC_ASYNC_OUT_HPP_
#define _FREDRIC_ASYNC_OUT_HPP_
#include <sstream>
#include <mutex>
#include <iostream>
struct AsyncOut: public std::stringstream {
static inline std::mutex cout_mutex;
~AsyncOut() {
std::lock_guard<std::mutex> lock(cout_mutex);
std::cout << rdbuf();
std::cout.flush();
}
};
#define aout AsyncOut{}
#endif
clocker.hpp
#ifndef _FREDRIC_CLOCKER_HPP_
#define _FREDRIC_CLOCKER_HPP_
#include <chrono>
#include "asyc_out.hpp"
struct Clocker {
std::chrono::time_point<std::chrono::high_resolution_clock> start;
Clocker() {
start = std::chrono::high_resolution_clock::now();
}
~Clocker() {
auto end = std::chrono::high_resolution_clock::now();
auto mill_dur = std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count();
aout << "Takes " << mill_dur << " milliseconds\n";
}
};
#endif
main.cpp
#include "boost/filesystem.hpp"
#include <iostream>
#include <thread>
#include <algorithm>
#include <boost/foreach.hpp>
#include <vector>
#include <string>
#include <cassert>
#include <future>
#include <mutex>
#include <chrono>
#include "asyc_out.hpp"
#include "clocker.hpp"
using path_type = boost::filesystem::path;
// 每个线程遍历完成之后返回的std::future数据
struct Result {
std::vector<std::string> files;
std::vector<path_type> dirs;
Result() {}
Result(Result&& r): files(std::move(r.files)), dirs(std::move(r.dirs)) {}
};
Result list_dir(path_type&& path_) {
Result result;
boost::filesystem::directory_iterator it(path_);
for(auto& sub_path: it) {
if(boost::filesystem::is_directory(sub_path)) {
result.dirs.push_back(sub_path);
} else {
result.files.push_back(sub_path.path().filename().string());
}
}
return result;
}
void test_move_data() {
std::string root = "/home/fredric/code";
std::vector<path_type> dirs_to_do;
dirs_to_do.push_back(root);
std::vector<std::string> files;
aout << "Max threads count: " << std::thread::hardware_concurrency() << "\n";
auto max_thread_size = std::thread::hardware_concurrency();
{
Clocker clocker;
while(!dirs_to_do.empty()) {
std::vector<std::future<Result>> futures;
for(int i=0; i<max_thread_size && !dirs_to_do.empty(); ++i) {
auto fut = std::async(&list_dir, std::move(dirs_to_do.back()));
dirs_to_do.pop_back();
futures.push_back(std::move(fut));
}
try {
while(!futures.empty()) {
auto ftr = std::move(futures.back());
futures.pop_back();
Result result = ftr.get();
// 改用std::move 避免拷贝
std::move(result.files.begin(), result.files.end(), std::back_inserter(files));
std::move(result.dirs.begin(), result.dirs.end(), std::back_inserter(dirs_to_do));
}
} catch(boost::filesystem::filesystem_error& err) {
aout << "File system error: " << err.code().message() << "\n";
} catch(std::exception& ex) {
aout << "Exception: " << ex.what() << "\n";
} catch(...) {
aout << "Unknown exception\n";
}
}
}
aout << "Total files count: " << files.size() << "\n";
}
class MonitorResult {
Result result;
std::mutex mutex_;
public:
void put_file(std::string const& file) {
std::lock_guard<std::mutex> lock(mutex_);
result.files.emplace_back(std::move(file));
}
std::vector<std::string> get_files() {
std::lock_guard<std::mutex> lock(mutex_);
return result.files;
}
void put_dir(path_type const& path) {
std::lock_guard<std::mutex> lock(mutex_);
result.dirs.emplace_back(std::move(path));
}
std::vector<path_type> get_dirs(int n) {
std::vector<path_type> dirs;
std::lock_guard<std::mutex> lock(mutex_);
for(int i=0; i<n && !result.dirs.empty() ; ++i) {
dirs.push_back(result.dirs.back());
result.dirs.pop_back();
}
return dirs;
}
bool is_dirs_empty() {
std::lock_guard<std::mutex> lock(mutex_);
return result.dirs.empty();
}
};
void list_dir_shared(path_type&& dir, MonitorResult& result) {
boost::filesystem::directory_iterator it(dir);
for(auto& sub_dir: it) {
if(boost::filesystem::is_directory(sub_dir)) {
result.put_dir(sub_dir.path());
} else {
result.put_file(sub_dir.path().filename().string());
}
}
}
void test_share_data() {
std::string root = "/home/fredric/code";
MonitorResult result;
result.put_dir(path_type(root));
int max_thread_size = std::thread::hardware_concurrency();
{
Clocker clocker;
while(!result.is_dirs_empty()) {
auto dirs_to_do = result.get_dirs(max_thread_size);
std::vector<std::future<void>> futures;
while(!dirs_to_do.empty()) {
auto ftr = std::async(&list_dir_shared, std::move(dirs_to_do.back()), std::ref(result));
dirs_to_do.pop_back();
futures.push_back(std::move(ftr));
try{
while(!futures.empty()) {
auto ftr = std::move(futures.back());
futures.pop_back();
ftr.wait();
}
} catch(boost::filesystem::filesystem_error& err) {
aout << "File system error: " << err.code().message() << "\n";
} catch(std::exception& ex) {
aout << "Exception: " << ex.what() << "\n";
} catch(...) {
aout << "Unknown exception\n";
}
}
}
}
aout << "Total file size: " << result.get_files().size() << "\n";
}
int main(int argc, char* argv[]) {
for(int i=0; i<20; ++i) {
aout << "Move data: ";
test_move_data();
aout << "\n";
aout << "Shared data: ";
test_share_data();
aout << "\n";
}
aout << "Press enter\n";
char c;
std::cin.get(c);
return EXIT_SUCCESS;
}
程序输出如下,
![](https://img.haomeiwen.com/i8982195/e06bf9b11e327432.png)
网友评论