美文网首页
javascript/python 协程实现并发调度的示例

javascript/python 协程实现并发调度的示例

作者: golden_age | 来源:发表于2017-10-19 10:12 被阅读0次

    协程,也被称为“用户态线程”,是可以由用户去实现并发调度的一种语言设施。

    设想,用户并不知道协程,只提供一般的阻塞api,用户如下使用:

    import time
    
    
    time.sleep(1.0)
    print('after 1s')
    

    可以翻译成协程实现:

    import time
    
    def __thread():
     (yield time.sleep(1.0))
     print('after 1s')
     
    scheduler.spawn(__thread() )
    

    本方案示例了javascript里协程调度器,鉴于python generator和javascript generator模型的一致性,应很容易实现对应python版本。

    1.耗时操作提供异步版本API,如 settimeout 和 node.js API设计

    //异步等待
    settimeout(callback, ms)
     
    //异步读文件
    var fs = require('fs');
    fs.readFile(this.path, function(err, data) {
    //...
    });
    

    2.包装成 异步标记对象

    class coTime extends iYield {
        constructor(time) {
            super();
            this.time = time;
        }
    
        start(task) {
            task.yield.done = 'waiting';
            setTimeout(() => {
                task.yield = task.cofunc.next(this.time);
            }, this.time);
        }
    }
    
    //最终提供的API
    function wait(ms) {
        return new coTime(ms);
    }
    

    3.如多线程一样使用

    // 创建10个(用户)线程
    for (let i = 0; i < 10; ++i) {
        coManager.thread(
            (function*() {
                while (true) {
                    let time = yield wait(Math.random() * 2000);
                    console.log(`thread ${i} wait:` + time);
                }
            })()
        );
    }
    

    这里 coManager是协程调度器,原理是,实现一个异步任务队列,每次异步等待时,将协程中断(suspend),每异步返回时,将中断的协程继续,完整可执行的代码如下

    // 并发任务对象
    class coTask {
        constructor(cofunc) {
            this.cofunc = cofunc;
            this.yield = cofunc.next();
        }
    }
    
    class iYield {
        // 开始异步
        start(task) {}
    }
    
    // 读文件异步调用
    class coReadFile extends iYield {
        constructor(path) {
            super();
            this.path = path;
        }
    
        start(task) {
            item.yield.done = 'waiting';
            var fs = require('fs');
            fs.readFile(this.path, function(err, data) {
                if (err) {
                    task.yield = task.cofunc.throw(err);
                } else {
                    task.yield = task.cofunc.next(data); //done and next
                }
            });
        }
    }
    
    // helper
    function readFile(path) {
        return new coReadFile(path);
    }
    
    
    // 定时器异步对象
    class coTime extends iYield {
        constructor(time) {
            super();
            this.time = time;
        }
    
        start(task) {
            task.yield.done = 'waiting';
            setTimeout(() => {
                task.yield = task.cofunc.next(this.time);
            }, this.time);
        }
    }
    
    function wait(ms) {
        return new coTime(ms);
    }
    
    // 协程管理器,负责调度
    class CoroutineManager {
        constructor() {
            this.taskLs = [];
            this.update.bind(this);
            setInterval(() => {
                let taskLs = this.taskLs;
                this.taskLs = [];
                // 任务队列轮循
                for (let item of taskLs) {
                    if (item.yield.done != true) {
                        this.taskLs.push(item);
                    }
                }
                for (let item of taskLs) {
                    if (item.yield.done === 'waiting') {
                        continue;
                    } else if (item.yield.value instanceof iYield) {
                        item.yield.value.start(item);
                    } else {
                        item.yield = item.cofunc.next();
                    }
                }
            }, 1);
        }
    
        // 开(用户)线程
        thread(cofunc) {
            let yValue = cofunc.next();
            if (yValue.done) {
                return;
            }
            this.taskLs.push({ yield: yValue, cofunc });
        }
    }
    
    let coManager = new CoroutineManager();
    
    // 创建10个(用户)线程
    for (let i = 0; i < 10; ++i) {
        coManager.thread(
            (function*() {
                while (true) {
                    let time = yield wait(Math.random() * 2000);
                    console.log(`thread ${i} wait:` + time);
                }
            })()
        );
    }
    

    python协程:

    from browser import timer as systimer;
    import random
    import time
    
    class Co:
        def __init__(self):
            pass
        
        def getVal(self):
            return None
            
    class timer(Co):
        def __init__(self, last):
            self.last = last
            self.start = time.clock()
        
        def isDone(self):
            last = time.clock()-self.start
            if last>=self.last:
                return True
            else:
                return False
        
        def getVal(self):
            return int(random.random()*self.last)
        
    
    class Scheduler:
        def __init__(self):
            self.working = []
            self.starting = []
            self.timer = None
            
        def run(self):
            self.timer = systimer.set_interval(lambda: self.update(), 20);
            
        def update(self):
            starting = self.starting;
            self.starting = []
            for fun in starting:
                try:
                    print(1)
                    r = next(fun)
                    print(2)
                    while not isinstance(r, Co):
                        r = next(fun)
                    self.working.append((fun, r))
                except StopIteration:
                    pass
                except Exception as ex:
                    print(ex)
                    
            working = self.working;
            self.working = []
            for (fun, r) in working:
                try:
                    if r.isDone():
                        val = r.getVal()
                        r = fun.send(val)
                        while not isinstance(r, Co):
                            r = next(fun)
                        self.working.append((fun, r))
                    else:
                        self.working.append((fun, r))
                except StopIteration:
                    pass
                except Exception as ex:
                    print(ex)
                            
        def startCorotine(self, fun):
            self.starting.append(fun)
    
    
    scheduler = Scheduler()
    

    相关文章

      网友评论

          本文标题:javascript/python 协程实现并发调度的示例

          本文链接:https://www.haomeiwen.com/subject/msfuuxtx.html