美文网首页
使用简单的消息队列实现文件列表

使用简单的消息队列实现文件列表

作者: FredricZhu | 来源:发表于2023-09-20 09:43 被阅读0次

    本例是Bartosz Milewski C++11 Concurrency课程中第9课的代码,使用std::condition_variable 和 std::mutex实现简单的消息队列,来进行文件列表功能的实现。
    程序代码如下,
    conanfile.txt

    [requires]
    boost/1.72.0
    
    [generators]
    cmake
    

    CMakeLists.txt

    # Build script: every *.cpp file in this directory becomes its own executable.
    # CMAKE_CXX_STANDARD only accepts the value 17 from CMake 3.8 onwards.
    cmake_minimum_required(VERSION 3.8)
    
    project(1_list_dir_server)
    
    set(ENV{PKG_CONFIG_PATH} "$ENV{PKG_CONFIG_PATH}:/usr/local/lib/pkgconfig/")
    
    # Append -pthread instead of overwriting flags supplied by the user or toolchain.
    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
    set(CMAKE_CXX_STANDARD 17)
    # The sources use C++17 inline static members; fail at configure time
    # instead of silently falling back to an older standard.
    set(CMAKE_CXX_STANDARD_REQUIRED ON)
    # -g is a compiler option, not a preprocessor definition.
    add_compile_options(-g)
    
    include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
    conan_basic_setup()
    
    include_directories(${INCLUDE_DIRS})
    link_directories(${LINK_DIRS})
    
    file(GLOB main_file_list ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
    
    foreach(main_file ${main_file_list})
        file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${main_file})
        # Strip the extension to obtain the target name.
        string(REPLACE ".cpp" "" file ${filename})
        add_executable(${file} ${main_file})
        target_link_libraries(${file} ${CONAN_LIBS} pthread)
    endforeach()
    

    async_out.hpp(注意:下文代码以 #include "asyc_out.hpp" 引用本文件,保存时文件名需与 #include 中的拼写保持一致)

    #ifndef _FREDRIC_ASYNC_OUT_HPP_
    #define _FREDRIC_ASYNC_OUT_HPP_
    
    #include <sstream>
    #include <mutex>
    #include <iostream>
    
    // Buffered printer for multi-threaded output: text is accumulated in this
    // stringstream and written to std::cout in one piece when the object is
    // destroyed, with all writers serialized through cout_mutex.
    struct AsyncOut: public std::stringstream {
        // Shared by every AsyncOut instance; guards the write to std::cout.
        static inline std::mutex cout_mutex;
    
        // Writes the accumulated text to std::cout and flushes it.
        ~AsyncOut() {
            // Copy the text out before locking to keep the critical section short.
            const auto text = str();
            // Inserting an empty streambuf (`std::cout << rdbuf()`) sets failbit
            // on std::cout, which would silence all later output, so an empty
            // message is simply skipped.
            if (text.empty()) {
                return;
            }
            std::lock_guard<std::mutex> lock(cout_mutex);
            std::cout << text;
            std::cout.flush();
        }
    };
    
    #define aout AsyncOut{}
    
    #endif
    

    clocker.hpp

    #ifndef _FREDRIC_CLOCKER_HPP_
    #define _FREDRIC_CLOCKER_HPP_
    #include <chrono>
    #include "asyc_out.hpp"
    
    // Scope timer: remembers when it was constructed and, on destruction,
    // reports the elapsed time in milliseconds through aout.
    struct Clocker {
        // Time point captured in the constructor.
        std::chrono::time_point<std::chrono::high_resolution_clock> start;
    
        Clocker() : start(std::chrono::high_resolution_clock::now()) {}
    
        ~Clocker() {
            const auto elapsed = std::chrono::high_resolution_clock::now() - start;
            const auto elapsed_ms =
                std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
            aout << "Takes " << elapsed_ms << " milliseconds\n";
        }
    };
    #endif
    

    main.cpp

    #include "boost/filesystem.hpp"
    #include <iostream>
    #include <thread>
    #include <algorithm>
    #include <boost/foreach.hpp>
    #include <vector>
    #include <string>
    #include <cassert>
    #include <future>
    #include <mutex>
    #include <chrono>
    #include <condition_variable>
    #include <deque>
    #include "asyc_out.hpp"
    #include "clocker.hpp"
    
    using path_type = boost::filesystem::path;
    int const NUM_THREADS = 8;
    
    // Minimal blocking message queue protected by a mutex and a condition
    // variable. send() inserts at the front and receive() removes from the
    // back, so messages are delivered in the order they were sent.
    template <typename T>
    class MsgQueue {
        std::deque<T> queue_;
        std::condition_variable cond_;
        std::mutex mutex_;
    
    public:
        // Moves `msg` into the queue and wakes one waiting receiver.
        void send(T&& msg) {
            {
                std::lock_guard<std::mutex> guard(mutex_);
                queue_.emplace_front(std::move(msg));
            }
            // The lock has already been released when the receiver is notified.
            cond_.notify_one();
        }
    
        // Blocks until the queue is non-empty, then removes and returns the
        // oldest message.
        T receive() {
            // std::unique_lock is required here because condition_variable::wait
            // unlocks and re-locks the lock it is given.
            std::unique_lock<std::mutex> guard(mutex_);
            while (queue_.empty()) {
                cond_.wait(guard);
            }
            T oldest = std::move(queue_.back());
            queue_.pop_back();
            return oldest;
        }
    };
    
    
    // Worker loop: repeatedly takes a directory from dir_queue and enumerates
    // its entries. Sub-directories are pushed back onto dir_queue; for every
    // other entry the last path component is sent to file_queue.
    // The loop has no exit condition, so the function only ends by exception.
    void list_dir_server(MsgQueue<path_type>& dir_queue, MsgQueue<std::string>& file_queue) {
        while(true) {
            path_type current_dir = dir_queue.receive();
            for(auto& entry: boost::filesystem::directory_iterator(current_dir)) {
                if(!boost::filesystem::is_directory(entry)) {
                    // Not a directory: report only its leaf name.
                    file_queue.send(std::string(entry.path().filename().string()));
                    continue;
                }
                // Directory: queue a copy of its path so it gets listed later.
                dir_queue.send(path_type(entry.path()));
            }
        }
    }
    
    // Printing loop: takes names from name_queue one at a time and writes each
    // one, followed by a newline, through aout.
    // The loop has no exit condition, so the function never returns normally.
    void print_server(MsgQueue<std::string>& name_queue) {
        while(true) {
            const std::string file_name = name_queue.receive();
            aout << file_name << "\n";
        }
    }
    
    // Lists the directory tree rooted at root_dir.
    //
    // Seeds the directory queue with root_dir, starts NUM_THREADS
    // list_dir_server tasks plus one print_server task, then waits on the
    // tasks' futures (most recently started first) and reports any exception a
    // task ended with.
    //
    // NOTE: both server functions loop without an exit condition, so get()
    // only returns for a task that terminated with an exception; this function
    // therefore does not return while any task is still running.
    void list_tree(path_type&& root_dir) {
        MsgQueue<path_type> dir_queue;
        MsgQueue<std::string> file_queue;
        dir_queue.send(std::move(root_dir));
    
        std::vector<std::future<void>> futures;
        futures.reserve(NUM_THREADS + 1);
        for(int i=0; i<NUM_THREADS; ++i) {
            futures.push_back(std::async(std::launch::async, &list_dir_server, 
                std::ref(dir_queue),
                std::ref(file_queue)));
        }
    
        futures.push_back(std::async(std::launch::async, &print_server, std::ref(file_queue)));
    
        while(!futures.empty()) {
            auto ftr = std::move(futures.back());
            futures.pop_back();
            // Handle errors per future: one failed task must not stop the
            // remaining futures from being waited on and having their own
            // failures reported.
            try {
                ftr.get();
            } catch(const boost::filesystem::filesystem_error& err) {
                aout << "File system error: " << err.code().message() << "\n";
            } catch(const std::exception& ex) {
                aout << "Exception: " << ex.what() << "\n";
            } catch(...) {
                aout << "Unknown exception\n";
            }
        }
    }
    
    // Entry point. Lists the tree rooted at the directory given as the first
    // command-line argument; without an argument the previous hard-coded
    // default directory is used.
    int main(int argc, char* argv[]) {
        path_type root_dir(argc > 1 ? argv[1] : "/home/fredric/code");
        list_tree(std::move(root_dir));
    
        return EXIT_SUCCESS;
    }
    

    程序的输出如下,


    image.png

    相关文章

      网友评论

          本文标题:使用简单的消息队列实现文件列表

          本文链接:https://www.haomeiwen.com/subject/zqwovdtx.html