美文网首页
2022-09-16

2022-09-16

作者: Tomasmule | 来源:发表于2022-09-16 17:07 被阅读0次

    #include <limits.h>
    #include <pthread.h>

    #include <chrono>
    #include <condition_variable>
    #include <cstddef>
    #include <cstdint>
    #include <cstdlib>
    #include <fstream>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <string>
    #include <thread>
    #include <vector>

    #include "json/json.h"

    using namespace std;
    using namespace chrono;

    // Courier information: an identifier plus the courier's arrival timestamp.
    // Couriers are ordered purely by arrival time; the id takes no part in
    // comparisons.
    struct Courier {
        int64_t id;          // courier identifier
        int64_t arriveTime;  // arrival timestamp used for ordering

        // True when this courier arrives strictly before `co`.
        bool operator < (const Courier &co) const {
            return co.arriveTime > arriveTime;
        }

        // True when this courier arrives strictly after `co`.
        bool operator > (const Courier &co) const {
            return co.arriveTime < arriveTime;
        }
    };

    // Order information.
    //
    // Orders are ordered purely by finishTime (the moment the food is ready).
    struct Order{
        std::string id;
        std::string name;
        int prepareTime = 0;       // preparation duration (Server::ReadData stores milliseconds)
        int64_t dispatchTime = 0;  // timestamp at which the order was received
        int64_t finishTime = 0;    // timestamp at which preparation completes

        // Creates an empty order with every numeric field zeroed.
        Order() = default;

        // Copies every field from *order. A null pointer yields an empty order
        // instead of dereferencing it.
        // Kept non-explicit on purpose: existing callers may rely on the
        // implicit Order* -> Order conversion.
        Order(Order *order) {
            if (order == nullptr) {
                return;
            }
            this->id = order->id;
            this->name = order->name;
            this->prepareTime = order->prepareTime;
            this->dispatchTime = order->dispatchTime;
            this->finishTime = order->finishTime;
        }

        // True when this order finishes strictly before `od`.
        bool operator < (const Order &od) const
        {
            return finishTime < od.finishTime;
        }

        // True when this order finishes strictly after `od`.
        bool operator > (const Order &od) const
        {
            return finishTime > od.finishTime;
        }
    };

    // Abstract bounded-queue interface shared by the concrete queue types.
    //
    // Init        - set the capacity; returns an int status.
    // Deinit      - release the queue's contents.
    // Push        - enqueue one element; returns an int status.
    // GetFrontAndPop - remove and return the next element.
    // IsFull / IsEmpty - capacity and emptiness queries.
    template<typename NODE>
    class MyQueue {
    public:
        // Virtual so that implementations can be destroyed through a MyQueue*.
        virtual ~MyQueue() = default;

        virtual int Init(int cap) = 0;
        virtual void Deinit() = 0;
        virtual int Push(NODE order) = 0;
        virtual NODE GetFrontAndPop(void) = 0;
        virtual bool IsFull(void) = 0;
        virtual bool IsEmpty(void) = 0;
    };
    // FIFO implementation of MyQueue on top of std::queue.
    //
    // Only Deinit() takes the internal mutex; Push, IsFull, IsEmpty and
    // GetFrontAndPop do not lock, so concurrent callers must serialize access
    // themselves.
    template<typename NODE>
    class Queue : public MyQueue<NODE> {
    public:
        // Starts effectively unbounded (INT_MAX) until Init() is called.
        Queue() : capacity(INT_MAX) {}

        // Sets the maximum number of queued elements.
        // Returns 0 on success, or -1 (capacity unchanged) when cap is negative.
        int Init(int cap) override
        {
            if (cap < 0) {
                return -1;
            }
            capacity = cap;
            return 0;
        }

        // Drops every queued element while holding the internal mutex.
        // Safe to call whether or not Init() was ever called.
        void Deinit() override
        {
            std::lock_guard<std::mutex> guard(guardMutex);
            std::queue<NODE> drained;
            que.swap(drained);
        }

        // Appends `order`. Returns 0 on success, -1 when the queue is at capacity.
        int Push(NODE order) override
        {
            if (IsFull()) {
                return -1;
            }
            que.push(order);
            return 0;
        }

        // True when no further element can be pushed.
        bool IsFull() override {
            // capacity is never negative, so the conversion is lossless.
            return que.size() >= static_cast<std::size_t>(capacity);
        }

        bool IsEmpty() override {
            return que.empty();
        }

        // Removes and returns the oldest element. While the queue is empty this
        // polls once per second, logging "que is empty" after each sleep.
        NODE GetFrontAndPop() override {
            while (que.empty()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
                std::cout << "que is empty\n";
            }
            NODE front = que.front();
            que.pop();
            return front;
        }

    private:
        int capacity;           // maximum element count, always >= 0
        std::queue<NODE> que;
        std::mutex guardMutex;  // held by Deinit() only
    };
    // Priority implementation of MyQueue: GetFrontAndPop() always returns the
    // smallest element according to NODE's operator> (std::greater ordering).
    //
    // Only Deinit() takes the internal mutex; Push, IsFull, IsEmpty and
    // GetFrontAndPop do not lock, so concurrent callers must serialize access
    // themselves.
    template<typename NODE>
    class ProQueue : public MyQueue<NODE> {
    public:
        // Starts effectively unbounded (INT_MAX) until Init() is called.
        ProQueue() : capacity(INT_MAX) {}

        // Sets the maximum number of queued elements.
        // Returns 0 on success, or -1 (capacity unchanged) when cap is negative.
        int Init(int cap) override
        {
            if (cap < 0) {
                return -1;
            }
            capacity = cap;
            return 0;
        }

        // Drops every queued element while holding the internal mutex.
        // Safe to call whether or not Init() was ever called.
        void Deinit() override
        {
            std::lock_guard<std::mutex> guard(guardMutex);
            while (!que.empty()) {
                que.pop();
            }
        }

        // Inserts `order`. Returns 0 on success, -1 when the queue is at capacity.
        int Push(NODE order) override
        {
            if (IsFull()) {
                return -1;
            }
            que.push(order);
            return 0;
        }

        // True when no further element can be pushed.
        bool IsFull() override {
            // capacity is never negative, so the conversion is lossless.
            return que.size() >= static_cast<std::size_t>(capacity);
        }

        bool IsEmpty() override {
            return que.empty();
        }

        // Removes and returns the smallest element. While the queue is empty this
        // polls once per second, logging "que is empty" after each sleep.
        NODE GetFrontAndPop() override {
            while (que.empty()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
                std::cout << "que is empty\n";
            }
            NODE smallest = que.top();
            que.pop();
            return smallest;
        }

    private:
        int capacity;           // maximum element count, always >= 0
        std::priority_queue<NODE, std::vector<NODE>, std::greater<NODE>> que;
        std::mutex guardMutex;  // held by Deinit() only
    };

    class Input {
    public:
    int Product(MyQueue<Order> *myque, void *args)
    {
    Order order = (Order)args;
    Order od(order);
    int ret = myque->Push(order);
    return ret;
    }
    };

    //typedef (int*)Rand(int min, int max);

    // Signature of a pluggable random-range generator.
    using RANDRANGE = int (*)(int min, int max);

    // Returns a pseudo-random integer in the closed range [min, max] using rand().
    // When min == max the single possible value is returned.
    // Returns -1 when max < min (note that -1 can also be a legitimate result if
    // the requested range contains it).
    static int Randrange(int min, int max)
    {
        if (max < min) {
            return -1;
        }
        // Computed in 64 bits so that max - min + 1 cannot overflow int.
        const long long span = static_cast<long long>(max) - min + 1;
        return static_cast<int>(min + std::rand() % span);
    }

    class Dispatch {
    public:
    Dispatch() {
    range = nullptr;
    }
    virtual int DispatchOrder(MyQueue<Order> *que, MyQueue<Courier> *couriers) = 0;
    int RegistRand(RANDRANGE rg) {
    range = rg;
    }
    int CariesArrivedTime(int min, int max)
    {
    if (range != nullptr) {
    return range(min, max);
    }
    return Randrange(min, max);
    }
    void AddFoodWait(int prepareTime)
    {
    this->foodwait += prepareTime;
    }
    int64_t GetFoodWait()
    {
    return foodwait;
    }
    void AddCariesWait(int wait)
    {
    this->carieswait += wait;
    }
    int64_t GetCariesWait()
    {
    return carieswait;
    }
    void AddOrders() {
    orders++;
    }
    int64_t GetOrders() {
    return orders;
    }
    int64_t GetAvgFoodWaitTime() {
    if (orders == 0) {
    return 0;
    }
    return foodwait / orders;
    }
    int64_t GetAvgCariesWaitTime() {
    if (orders == 0) {
    return 0;
    }
    return carieswait / orders;
    }
    private:
    RANDRANGE range;
    int64_t foodwait;
    int64_t carieswait;
    int64_t orders;
    };

    class Matched : public Dispatch
    {
    public:
    int DispatchOrder(MyQueue<Order> *que, MyQueue<Courier> *couriers)
    {
    Queue<Order> *q = (Queue<Order> *)que;
    Queue<Courier> *cou = (Queue<Courier> *)couriers;
    if (cou->IsEmpty()) {
    cout<<"couries is empty!\n";
    return -3;
    }
    Order order = q->GetFrontAndPop();
    Courier courier = cou->GetFrontAndPop();
    if (order.finishTime >= courier.arriveTime) {
    AddCariesWait(order.finishTime - courier.arriveTime);
    } else {
    AddFoodWait(courier.arriveTime - order.finishTime);
    }
    AddOrders();
    // match : 不需要等待,可以派送,算出来

        cout<<"Match Dispatch "<<order.name<<" "<<order.finishTime<<" couries arrived time: "<<courier.arriveTime<<endl;
    
    }
    

    };

    class FIFO : public Dispatch
    {
    public:
    int DispatchOrder(MyQueue<Order> *que, MyQueue<Courier> *couriers)
    {
    ProQueue<Order> q = (ProQueue<Order>)que;
    Order order = q->GetFrontAndPop();
    ProQueue<Courier> cou = (ProQueue<Courier>)couriers;
    Courier cour = cou->GetFrontAndPop();
    auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    if (now >= order.finishTime && now >= cour.arriveTime) {
    if (order.finishTime >= cour.arriveTime) {
    AddCariesWait(order.finishTime - cour.arriveTime);
    } else {
    AddFoodWait(cour.arriveTime - order.finishTime);
    }
    AddOrders();
    cout<<"FIFO Dispatch "<<order.name<<" "<<order.prepareTime<<" "<<order.finishTime<<" "<<now<<endl;
    return 0;
    }

        q->Push(order);
        cou->Push(cour);
        return -2;
    }
    

    };

    // Output sink for server notifications. Only Print produces output; the
    // remaining members are placeholders with no effect.
    class Notify {
    public:
        // Placeholder: the path is currently ignored. Always returns 0.
        int FilePath(const char *path) {
            (void)path;
            return 0;
        }

        // Writes `s` followed by a newline to stdout and flushes.
        void Print(const std::string &s) {
            std::cout<<s<<std::endl;
        }

        // Placeholder: does nothing.
        void Event(const std::string &s) {
            (void)s;
        }

        // Placeholder: does nothing.
        void Log(const std::string &) {
        }
    };

    // Dispatch strategies selectable through Config::st.
    enum STRATEGIE {
        STRATEGIE_MATCH = 1,  // match strategy
        STRATEGIE_FIFO = 2    // FIFO strategy
    };

    // Server configuration read by Server::InitServer.
    struct Config {
        STRATEGIE st;  // dispatch strategy to use
        int32_t cap;   // capacity passed to the order queue's Init()
    };

    // Ties together the order queue, the courier queue and the dispatch
    // strategy. ReadData produces orders and couriers, DispatchOrder consumes
    // them; both synchronize on `mtx` and signal through `emp`.
    //
    // Every member has an initial value, so a Server that was never
    // initialized holds null pointers rather than indeterminate ones.
    class Server{
    private:
        std::condition_variable full;   // not used by the member functions shown in this file
        std::condition_variable emp;    // signalled whenever the queues change
        std::mutex mtx;                 // protects the queues
        int64_t capicity = 0;           // order queue capacity
        int64_t nums = 0;               // number of queued items (currently unused)
        int64_t emptys = 0;             // number of free slots (currently unused)
        int stragegie = 0;              // selected STRATEGIE value
        Dispatch *dispatch = nullptr;   // active strategy
        MyQueue<Order> *que = nullptr;  // pending orders
        MyQueue<Courier> *couriers = nullptr;  // dispatched couriers
        Notify notify;
        bool dispatching = false;       // true while the dispatch loop should run

    public:
        int InitServer(Config *conf);
        void DeInitServer();

        int ReadData(const char* file);

        int DispatchOrder();
        int Statistics();
    };

    // Selects the strategy and creates the queues.
    //
    // conf: configuration to apply; nullptr selects the defaults (FIFO strategy,
    //       capacity 10).
    // Returns 0 on success. Returns -1, leaving the server without queues and
    // with dispatching disabled, when the capacity is not in [1, INT_MAX] or the
    // strategy is unknown.
    //
    // NOTE: objects created by an earlier call are not released here.
    int Server::InitServer(Config *conf)
    {
        // Start from a defined state so a failed init never leaves stale pointers.
        dispatch = nullptr;
        que = nullptr;
        couriers = nullptr;
        dispatching = false;

        if (conf == nullptr) {
            // default conf
            stragegie = STRATEGIE_FIFO;
            capicity = 10;
        } else {
            stragegie = conf->st;
            capicity = conf->cap;
        }

        // A capacity of zero would block the producer forever, and Init takes int.
        if (capicity <= 0 || capicity > INT_MAX) {
            return -1;
        }
        const int orderCap = static_cast<int>(capicity);

        if (stragegie == STRATEGIE_MATCH) {
            dispatch = new Matched();
            que = new Queue<Order>();
            couriers = new Queue<Courier>();
        } else if (stragegie == STRATEGIE_FIFO) {
            dispatch = new FIFO();
            que = new ProQueue<Order>();
            couriers = new ProQueue<Courier>();
        } else {
            return -1;
        }

        que->Init(orderCap);
        // The courier queue stays effectively unbounded, as before, but is now
        // initialized like the order queue.
        couriers->Init(INT_MAX);

        dispatching = true;
        return 0;
    }

    int Server::ReadData(const char* filename)
    {
    Json::Reader reader;
    Json::Value root;

    std::ifstream is;
    
    is.open(filename, std::ios::binary);
    int64_t cid = 100;
    if (reader.parse(is, root, false)) {
        int size = root.size();
        cout<<"size "<<size<<endl;
        for (int i = 0; i < size;) {
            if (i % 2 == 1) {
                std::this_thread::sleep_for(2000ms);
            }
            std::unique_lock<std::mutex> lock(mtx);
            this->emp.wait(lock, [this]{return !que->IsFull();});
            // 不足两个空间怎么办?
            int j = 0;
            for (; j < 2 && i + j < size;) {
                Order order;
                Courier courier;
                courier.id = cid++;
                Json::Value tmp = root[i + j];
                order.id = tmp["id"].asString();
                order.name = tmp["name"].asString();
                order.prepareTime = tmp["prepTime"].asInt() * 1000;
                order.dispatchTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
                order.finishTime = order.dispatchTime + order.prepareTime;
                courier.arriveTime = order.dispatchTime + dispatch->CariesArrivedTime(3, 15) * 1000;
                que->Push(order);
                notify.Print("order recived: " + order.id +"time "+ to_string(order.dispatchTime));
                couriers->Push(courier);
                notify.Print("courier dispatched: " + to_string(courier.id) + "time " + to_string(order.dispatchTime));
                //notify.Print("push " + order.name + " " + to_string(order.finishTime));
                j++;
            }
            i += j;
            emp.notify_all();
        }
        while (true) {
            std::this_thread::sleep_for(2000ms);
            std::unique_lock<std::mutex> lock(mtx);
            this->emp.wait(lock, [this]{return que->IsEmpty();});
            this->Statistics();
            cout<<" que is empty()\n";
            this->dispatching = false;
            break;
        }
    
    } else {
        cout<<"open file "<<filename<<" failed\n";
    }
    cout<<"close file \n";
    is.close();
    
    return 0;
    

    }
    /*
    int Server::ReadData(const char* file)
    {
    auto start = system_clock::now();
    auto end = system_clock::now();

    // fen duan duqu?
    static int times = 0;
    while (times < 20) {
        std::this_thread::sleep_for(2000ms);
        std::unique_lock<std::mutex> lock(mtx);
    
        this->emp.wait(lock, [this]{return !que->IsFull();});
    
        static int64_t id = 0;
        for (int i = 0; i < 2; i++) {
            Order order;
            order.name = "rndNO" + to_string(id) + "*";
            order.id = to_string(id);
            id++;
            order.prepareTime = rand() % 15000; // 0-15s
            order.dispatchTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
            order.finishTime = order.dispatchTime + order.prepareTime;
            que->Push(order);
            Courier courier;
            courier.arriveTime = order.dispatchTime + dispatch->CariesArrivedTime(3, 15) * 1000;
            if(couriers->Push(courier) != 0) {
                cout<<"couriers push error\n";
            }
            notify.Print("push " + order.name + " " + to_string(order.finishTime));
        }
        times++;
        if (times >= 20) {
            int64_t fw = dispatch->GetAvgFoodWaitTime();
            int64_t cw = dispatch->GetAvgCariesWaitTime();
            cout.flush();
            notify.Print("food wait time: " + to_string(fw) + "ms couries wait time: " + to_string(cw));
            //std::quick_exit();
            exit(0);
        }
        emp.notify_all();
    }
    

    }*/

    // Prints the average food wait and average courier wait reported by the
    // active dispatch strategy. Always returns 0.
    int Server::Statistics()
    {
        const int64_t avgFoodWait = dispatch->GetAvgFoodWaitTime();
        const int64_t avgCourierWait = dispatch->GetAvgCariesWaitTime();

        // Flush pending output before emitting the summary line.
        cout.flush();

        string report = "food wait time: ";
        report += to_string(avgFoodWait);
        report += "ms couries wait time: ";
        report += to_string(avgCourierWait);
        notify.Print(report);

        return 0;
    }

    int Server::DispatchOrder()
    {
    while (this->dispatching) {
    std::this_thread::sleep_for(100ms);
    std::unique_lock<std::mutex> lock(mtx);
    this->emp.wait(lock, [this]{return !que->IsEmpty();});
    if (dispatch->DispatchOrder(que, couriers) == -2) { // order not prepared, couries not arrived

        }
        emp.notify_all();
    }
    

    }

    // Thread entry point for the producer: names the calling thread "product"
    // and forwards to Server::ReadData, returning its result.
    int ProductOrder(Server* server, const char* file)
    {
        const pthread_t self = pthread_self();
        pthread_setname_np(self, "product");

        const int rc = server->ReadData(file);
        return rc;
    }

    // Thread entry point for the consumer: names the calling thread "dispatch"
    // and forwards to Server::DispatchOrder, returning its result.
    int DispatchOrder(Server* server)
    {
        const pthread_t self = pthread_self();
        pthread_setname_np(self, "dispatch");

        const int rc = server->DispatchOrder();
        return rc;
    }

    // Program entry point: initializes the server with its default
    // configuration, runs one producer thread (reading "dispatch_orders.json")
    // and one dispatcher thread, then prints the final statistics.
    //
    // Returns 0 on success, 1 when the server cannot be initialized.
    int main()
    {
        Server server;
        if (server.InitServer(nullptr) != 0) {
            // Do not start worker threads against a server without queues.
            std::cerr<<"init server failed\n";
            return 1;
        }

        std::thread read(ProductOrder, &server, "dispatch_orders.json");
        std::thread disp(DispatchOrder, &server);

        read.join();
        disp.join();

        server.Statistics();
        return 0;
    }

    cmake_minimum_required(VERSION 3.16)
    project(SERVER)

    # The sources use std::chrono literals (e.g. 1000ms), which need C++14 or newer.
    set(CMAKE_CXX_STANDARD 14)
    set(CMAKE_CXX_STANDARD_REQUIRED ON)

    # Resolve the platform's thread library instead of hard-coding -lpthread.
    find_package(Threads REQUIRED)

    # Headers (json/json.h) and prebuilt libraries shipped alongside the sources.
    include_directories(
        ${CMAKE_SOURCE_DIR}/include
    )

    link_directories(
        ${CMAKE_SOURCE_DIR}/lib
    )

    # Build every source file found in this directory into the executable.
    aux_source_directory(. DIR_SRCS)

    add_executable(Server ${DIR_SRCS})

    target_link_libraries(Server PUBLIC Threads::Threads jsoncpp)
    

    相关文章

      网友评论

          本文标题:2022-09-16

          本文链接:https://www.haomeiwen.com/subject/glwjortx.html