与前一例相比,本例重构出了一个通用的 task_manager.hpp。
以后凡是需要把任务拆分成子任务、并放到线程池中并行运行的场景,都可以直接复用这个类,不必再手写分批和线程池代码。
程序目录结构如下,

CMakeLists.txt
# Build script: every globbed source whose relative path contains "test.cpp"
# becomes an executable that is built from ALL globbed sources.

# 3.1 is the first release providing the ZLIB::ZLIB imported target and
# CMAKE_CXX_STANDARD; 3.5 is the oldest version current CMake still accepts.
cmake_minimum_required(VERSION 3.5)
project(hello_world)

# Ask for strict C++14 (-std=c++14) through CMake instead of smuggling the
# flag through add_definitions(), which is meant for preprocessor defines.
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
add_compile_options(-g)

# ZLIB::ZLIB is linked unconditionally below, so a missing zlib must fail here.
find_package(ZLIB REQUIRED)
find_package(OpenCV REQUIRED)
find_package(Boost REQUIRED COMPONENTS
    system
    filesystem
    serialization
    program_options
    thread
)

include_directories(
    ${Boost_INCLUDE_DIRS}
    /usr/local/include
    /usr/local/iODBC/include
    /opt/snowflake/snowflakeodbc/include/
    ${CMAKE_CURRENT_SOURCE_DIR}/../../
)
link_directories(
    /usr/local/lib
    /usr/local/iODBC/lib
    /opt/snowflake/snowflakeodbc/lib/universal
)

# Sources of this directory plus the shared implementation directories.
file(GLOB APP_SOURCES
    ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/../impl/*.cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/*.h
    ${CMAKE_CURRENT_SOURCE_DIR}/../../http/impl/*.cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/../../img_util/impl/*.cpp
)

foreach(sourcefile ${APP_SOURCES})
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
    string(FIND "${filename}" "test.cpp" test_suffix_pos)
    if(NOT test_suffix_pos EQUAL -1)
        # Target name is the relative file name without the ".cpp" extension.
        string(REPLACE ".cpp" "" target_name ${filename})
        add_executable(${target_name} ${APP_SOURCES})
        target_link_libraries(${target_name} ${Boost_LIBRARIES} ZLIB::ZLIB ${OpenCV_LIBS})
        target_link_libraries(${target_name} ssl crypto libgtest.a libgtest_main.a pystring libgmock.a iodbc iodbcinst libnanodbc.a pthread)
    endif()
endforeach()
task_manager.hpp
#ifndef _FREDRIC_TASK_MANAGER_HPP_
#define _FREDRIC_TASK_MANAGER_HPP_
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <vector>

#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
// Signature of the callable that task_manager runs once per batch.
// The first argument is the batch of elements (read-only); the second is an
// int counter passed by reference that the callable may increment, and which
// task_manager::collect_failed_count() later sums over all batches.
template <typename T>
using task_function_type = std::function<void(const std::vector<T>&, int&)>;
template <typename T>
struct task_manager {
task_manager(const std::vector<T>& all_eles, const int batch_size)
: all_eles_{all_eles}, batch_size_{batch_size} {
batches_ = all_eles_.size() / batch_size_ + 1;
for (int i = 0; i < batches_; ++i) {
compared_failed_nums.push_back(0);
}
}
void divide_elements_to_batches() {
sub_eles_.clear();
for (int i = 0; i < batches_; ++i) {
std::vector<T> sub_ele{};
if (i + 1 < batches_) {
for (int j = 0; j < batch_size_; ++j) {
sub_ele.emplace_back(
std::move(all_eles_[i * batch_size_ + j]));
}
} else {
for (int j = 0; j < all_eles_.size() % batch_size_; ++j) {
sub_ele.emplace_back(
std::move(all_eles_[i * batch_size_ + j]));
}
}
sub_eles_.emplace_back(std::move(sub_ele));
}
}
void start_thread_pool_and_run_tasks(const task_function_type<T>& func) {
// 使用asio thread_pool启动线程池,运行子任务
boost::asio::thread_pool pool{batches_};
for (int i = 0; i < sub_eles_.size(); ++i) {
boost::asio::post(pool,
std::bind(func, std::ref(sub_eles_[i]),
std::ref(compared_failed_nums[i])));
}
pool.join();
}
int collect_failed_count() {
int total_failed_num{0};
for (auto failed_num : compared_failed_nums) {
total_failed_num += failed_num;
}
return total_failed_num;
}
private:
std::vector<T> all_eles_{};
int batch_size_{};
std::size_t batches_{};
std::vector<std::vector<T>> sub_eles_{};
std::vector<int> compared_failed_nums{};
};
#endif
screenshots.h
#ifndef _FREDRIC_SCREENSHOTS_H_
#define _FREDRIC_SCREENSHOTS_H_
#include <iostream>
#include <map>
#include <string>
#include <vector>
// Screenshot listing: maps a string key to a list of image URL paths.
// NOTE(review): callers name the key a "device"; the only example in this
// code base is the key "default" - confirm the full key set.
using screenshots_value_type = std::map<std::string, std::vector<std::string>>;
// Metadata of a change event: string key -> string value.
using meta_type = std::map<std::string, std::string>;
// One screenshot-change record. It is filled by aggregate initialisation, so
// the order of the fields below must not change.
struct screenshots {
// Product identifier (column product_key of the source query).
std::string product_key;
// Screenshot listing before the change.
screenshots_value_type old_value;
// Screenshot listing after the change.
screenshots_value_type new_value;
// Metadata describing the change.
meta_type meta;
// Change timestamp, kept as the string returned by the database.
std::string change_time;
// Streams a human-readable dump of the record (defined in screenshots.cpp).
friend std::ostream& operator<<(std::ostream& os,
const screenshots& screenshots_);
};
#endif
screenshots.cpp
#include "images_timeline/screenshots.h"
// Writes one line per map entry to `os`: the key, then every URL of that
// entry, each followed by a single space, terminated by std::endl.
// Returns `os` so calls can be chained.
std::ostream& print_screenshot_value(
    std::ostream& os, const screenshots_value_type& screenshots_value) {
    for (const auto& entry : screenshots_value) {
        os << entry.first << " ";
        for (const auto& url : entry.second) {
            os << url << " ";
        }
        os << std::endl;
    }
    return os;
}
// Streams a diagnostic dump of a screenshots record: the product key, the old
// and new screenshot listings, and every meta entry (one per line).
// The change_time field is not part of the dump.
std::ostream& operator<<(std::ostream& os, const screenshots& screenshot_) {
    // The leading space in " Old value: " keeps the label from running
    // straight into the product key.
    os << "Compare screenshots failed: product_key: " << screenshot_.product_key
       << " Old value: ";
    print_screenshot_value(os, screenshot_.old_value);
    os << "New value: ";
    print_screenshot_value(os, screenshot_.new_value);
    os << "Meta: ";
    for (const auto& meta_entry : screenshot_.meta) {
        os << meta_entry.first << " " << meta_entry.second << " ";
        os << std::endl;
    }
    return os;
}
images_.h
#ifndef _FREDRIC_IMAGES_H_
#define _FREDRIC_IMAGES_H_
#include "images_timeline/screenshots.h"
// Runs the screenshot-change query against the database and returns one
// `screenshots` record per result row.
std::vector<screenshots> get_screenshots_from_db();
// Tries to download every old and new image of every record; returns true
// only if no download failed.
bool test_all_images_can_be_accessed();
// Compares the downloaded old/new images of every record against the record's
// meta values; returns true only if no record failed the comparison.
bool test_meta_is_correct();
#endif
images_.cpp
#include "images_timeline/images_.h"
#include "http/http_util.h"
#include "images_timeline/decorator.hpp"
#include "images_timeline/task_manager.hpp"
#include "img_util/img_util.h"
#include "json/json.hpp"
#include "pystring/pystring.h"
#include "sf_db2/sf_db2.h"
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <algorithm>
#include <functional>
// Short alias for the nlohmann JSON type used by the parsing helpers.
using json = nlohmann::json;
// Maximum number of screenshots records handed to one batch task.
const int BatchSize = 2;
// ODBC connection string passed to sf_connection.
// NOTE(review): this embeds a password in source control. Load it from the
// environment or a secrets store instead, and rotate the exposed credential.
const std::string ConnStr = "dsn=product_odbc;pwd=Lily870104";
// Directory that downloaded images are written to and later read back from.
const std::string ImagePath = "../images";
// Host that images are downloaded from.
const std::string CdnHost = "static-s.aa-cdn.net";
// Parses a JSON object whose values are arrays of image path strings, e.g.
//   {"default":
//   ["gp/20600013289355/OpozImyAlqDxklfG2v3MSHpUfWxeCUIhz2nqJf_g9knQU2cd9o4vY7OSSUnM7ElzBDyI"]}
// into a map from each key to its list of paths.
// Exceptions raised by the JSON library (invalid JSON, unexpected value
// types) are not caught here and propagate to the caller.
screenshots_value_type parse_screenshot_val(const std::string& value) {
    const auto parsed = json::parse(value);
    screenshots_value_type result{};
    for (auto entry_it = parsed.begin(); entry_it != parsed.end(); ++entry_it) {
        std::vector<std::string> paths{};
        for (const auto& path_json : entry_it.value()) {
            paths.push_back(path_json.get<std::string>());
        }
        result[entry_it.key()] = std::move(paths);
    }
    return result;
}
// Parses a JSON object with string values into a key -> value map.
// Exceptions raised by the JSON library (invalid JSON, non-string values) are
// not caught here and propagate to the caller.
meta_type parse_meta_val(const std::string& value) {
    const auto parsed = json::parse(value);
    meta_type result;
    for (auto entry_it = parsed.begin(); entry_it != parsed.end(); ++entry_it) {
        result[entry_it.key()] = entry_it.value().get<std::string>();
    }
    return result;
}
std::vector<screenshots> get_screenshots_from_db() {
auto conn_str = ConnStr;
auto raw_query =
R"(select product_key, old_value, new_value, meta, change_time
from AA_INTELLIGENCE_PRODUCTION.ADL_MASTER.dim_localized_event_service_v1_cluster_by_product_key
where market_code='apple-store'
and event_type_name='screenshot_change'
and meta is not null order by change_time desc limit 10;)";
sf_connection sf{conn_str};
auto res = sf.exec_raw_query(raw_query);
int ele_size = res.affected_rows();
const auto columns = res.columns();
std::vector<screenshots> res_eles{};
const std::string null_value = "null";
while (res.next()) {
auto const product_id_ = res.get<std::string>(0, null_value);
auto const old_json_str = res.get<std::string>(1, null_value);
auto const new_json_str = res.get<std::string>(2, null_value);
auto const meta_str = res.get<std::string>(3, null_value);
auto const change_time = res.get<std::string>(4, null_value);
auto old_value = parse_screenshot_val(old_json_str);
auto new_value = parse_screenshot_val(new_json_str);
auto meta_value = parse_meta_val(meta_str);
screenshots screenshots_{product_id_, old_value, new_value, meta_value,
change_time};
res_eles.emplace_back(std::move(screenshots_));
}
return std::move(res_eles);
}
// Downloads "/img/<path>" from `host` into the image directory and returns
// the result reported by HttpUtil::get_file.
// The local file name is the URL path with every "/" replaced by "_" plus a
// ".png" suffix, so no separate unique name has to be generated.
bool test_a_image(const std::string& host, const std::string& path) {
    std::string request_path = "/img/" + path;
    std::string local_file =
        ImagePath + "/" + pystring::replace(path, "/", "_") + ".png";
    return HttpUtil::get_file(host, request_path, local_file);
}
// Calls `image_test_func(CdnHost, image)` for every image of every entry in
// `screenshot_values`. For each call that reports failure, a message is
// written to std::cerr and `cant_be_accessed_count` is incremented.
template <typename T>
void test_one_screenshots_value(decorator<T> image_test_func,
                                const screenshots_value_type& screenshot_values,
                                int& cant_be_accessed_count) {
    for (const auto& entry : screenshot_values) {
        for (const auto& image_path : entry.second) {
            if (image_test_func(CdnHost, image_path)) {
                continue;
            }
            std::cerr << "Download [ https://" << CdnHost << "/img/"
                      << image_path << "] failed" << std::endl;
            ++cant_be_accessed_count;
        }
    }
}
// Batch task: downloads every old and new image of every record in the batch
// through the retrying wrapper around test_a_image, adding the number of
// failed downloads to `cant_be_accessed_count`.
void download_a_batch(const std::vector<screenshots>& screenshots_,
                      int& cant_be_accessed_count) {
    const auto retrying_download = make_decorator(test_a_image);
    for (const auto& record : screenshots_) {
        test_one_screenshots_value(retrying_download, record.old_value,
                                   cant_be_accessed_count);
        test_one_screenshots_value(retrying_download, record.new_value,
                                   cant_be_accessed_count);
    }
}
bool test_all_images_can_be_accessed() {
auto screenshots_ = get_screenshots_from_db();
std::cout << "Total screenshots element count: " << screenshots_.size()
<< std::endl;
// 按BatchSize大小进行分批,放进subVector中
task_manager<screenshots> manager_{screenshots_, BatchSize};
manager_.divide_elements_to_batches();
manager_.start_thread_pool_and_run_tasks(download_a_batch);
int total_failed_num = manager_.collect_failed_count();
std::cout << "Total failed nums: [" << total_failed_num << "]" << std::endl;
return total_failed_num == 0;
}
// Checks one record's meta values against the downloaded image files.
//
// For every meta entry (key prefix -> expected flags) and every old_value
// entry whose key starts with that prefix, the old and new image lists of
// that key are compared pairwise (up to the shorter list's length). Each pair
// contributes "0" when ImageUtil::compare_equal reports the files equal and
// "1" otherwise. The meta value must start with the resulting flag string;
// the first mismatch makes the function return false.
// A key that is missing from new_value is treated as an empty image list.
bool compare_a_screenshot_list(const screenshots& screenshots_) {
    using image_list = std::vector<std::string>;
    const auto& new_values = screenshots_.new_value;

    for (const auto& meta_entry : screenshots_.meta) {
        const std::string& key_prefix = meta_entry.first;
        const std::string& expected_flags = meta_entry.second;

        for (const auto& old_entry : screenshots_.old_value) {
            const std::string& real_key = old_entry.first;
            if (!pystring::startswith(real_key, key_prefix)) {
                continue;
            }

            const image_list& old_images = old_entry.second;
            const auto new_it = new_values.find(real_key);
            const image_list::size_type new_count =
                new_it == new_values.end() ? 0 : new_it->second.size();
            const image_list::size_type pair_count =
                std::min(old_images.size(), new_count);

            // Flags actually observed for this key, one character per pair.
            std::string actual_flags{};
            for (image_list::size_type i = 0; i < pair_count; ++i) {
                auto old_img_path =
                    ImagePath + "/" +
                    pystring::replace(old_images[i], "/", "_") + ".png";
                auto new_img_path =
                    ImagePath + "/" +
                    pystring::replace(new_it->second[i], "/", "_") + ".png";
                // Equal images mean "no change" and are recorded as "0".
                actual_flags +=
                    ImageUtil::compare_equal(old_img_path, new_img_path) ? "0"
                                                                         : "1";
            }

            if (!pystring::startswith(expected_flags, actual_flags)) {
                return false;
            }
        }
    }
    return true;
}
// Batch task: runs compare_a_screenshot_list on every record of the batch.
// Records that fail are dumped to std::cerr and counted in
// `compare_failed_count`.
void compare_a_batch(const std::vector<screenshots>& screenshots_,
                     int& compare_failed_count) {
    for (const auto& record : screenshots_) {
        if (compare_a_screenshot_list(record)) {
            continue;
        }
        std::cerr << record;
        ++compare_failed_count;
    }
}
// Loads the records from the database, splits them into batches of BatchSize
// and checks each record's meta values against the downloaded images on a
// thread pool.
// Returns true only when no record failed the comparison.
bool test_meta_is_correct() {
    auto screenshots_ = get_screenshots_from_db();
    std::cout << "Total screenshots element count: " << screenshots_.size()
              << std::endl;
    // Split into batches of BatchSize; task_manager owns the batches, so no
    // local batch container is needed here.
    task_manager<screenshots> manager_{screenshots_, BatchSize};
    manager_.divide_elements_to_batches();
    manager_.start_thread_pool_and_run_tasks(compare_a_batch);
    int total_failed_num = manager_.collect_failed_count();
    std::cout << "Total failed nums: [" << total_failed_num << "]" << std::endl;
    return total_failed_num == 0;
}
decorator.hpp
#ifndef _FREDRIC_DECORATOR_HPP_
#define _FREDRIC_DECORATOR_HPP_
#include <iostream>
#include <functional>
//-------------------------------
// BEGIN decorator implementation
//-------------------------------
// Primary template; only the function-signature specialisation is defined.
template <class> struct decorator;

// Maximum number of times a decorated function is called per invocation.
const int RetryCount = 3;

// Retry wrapper around a callable with signature R(Args...).
//
// operator() calls the wrapped function up to RetryCount times with the same
// arguments and returns the first result that converts to true. If every
// attempt fails, the result of the last attempt is returned.
// R must be value-initialisable, assignable and convertible to bool.
template <class R, class... Args>
struct decorator<R(Args...)> {
    decorator(std::function<R(Args...)> f) : f_(f) {}

    R operator()(Args... args) {
        R res{};
        for (int attempt = 1; attempt <= RetryCount; ++attempt) {
            res = f_(args...);
            if (res) {
                return res;
            }
            // Announce a retry only when another attempt really follows.
            if (attempt < RetryCount) {
                std::cout << "Failed, retry..." << std::endl;
            }
        }
        return res;
    }

    // The wrapped callable.
    std::function<R(Args...)> f_;
};
// Builds a retrying decorator from a plain function pointer, deducing the
// signature R(Args...) from the pointer type.
template <class R, class... Args>
decorator<R(Args...)> make_decorator(R (*f)(Args...)) {
    using signature = R(Args...);
    std::function<signature> wrapped(f);
    return decorator<signature>(wrapped);
}
#endif
程序输出如下,

网友评论