Parallel Computation with Hyperopt and MongoDB


Author: rick_z | Published 2018-07-27 16:18

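    Hyperopt is a third-party Python library for hyperparameter optimization. I recently found out that it can use MongoDB to run evaluations in parallel, so I looked into it a little and am writing up what I found.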

    I won't cover installing MongoDB here; just follow the linked guide:

    MongoDB installation steps on Ubuntu

    Once MongoDB is installed, start it and try the official demo:

    import math
    from hyperopt import fmin, tpe, hp
    from hyperopt.mongoexp import MongoTrials
    
    trials = MongoTrials('mongo://localhost:1234/foo_db/jobs', exp_key='exp1')
    best = fmin(math.sin, hp.uniform('x', -2, 2), trials=trials, algo=tpe.suggest, max_evals=10)
    

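    The code above creates a MongoTrials instance and assigns it to trials. Its first argument identifies the mongod process to connect to (localhost:1234), the database ('foo_db') and the collection ('jobs'). exp_key is the identifier of the experiment. If you change it, hyperopt treats the run as a new experiment and searches again instead of reusing the results already stored in the database.
    When you actually run the demo, fmin blocks. This is because MongoTrials makes fmin asynchronous: when a new search point (a parameter combination) is suggested, fmin does not evaluate the objective function itself but waits for another process to do that work for it.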
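    To make the parts of the connection string concrete, here is a minimal sketch that splits it with the standard library. split_mongo_url is a hypothetical helper written for this illustration; hyperopt does its own parsing internally, and this is not that code.

```python
from urllib.parse import urlparse


def split_mongo_url(url):
    """Split 'mongo://host:port/db/collection' into its four parts.

    Hypothetical helper for illustration only, not part of hyperopt.
    """
    parsed = urlparse(url)
    db, collection = parsed.path.strip("/").split("/")
    return {
        "host": parsed.hostname,
        "port": parsed.port,
        "db": db,
        "collection": collection,
    }


parts = split_mongo_url("mongo://localhost:1234/foo_db/jobs")
print(parts)
```

    The worker command used later in this post is given only the host, port and database (localhost:1234/foo_db), without the collection name.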

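    That is exactly the job of the hyperopt-mongo-worker script. Open a new shell and run:
    hyperopt-mongo-worker --mongo=localhost:1234/foo_db --poll-interval=0.1
    The first argument is the MongoDB address and the second is the polling interval. Because the demo is so simple, we get an optimal x almost immediately.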
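    The division of labour described above can be pictured with a toy, in-process analogy. This is not hyperopt code and involves no MongoDB: two queue.Queue objects stand in for the jobs collection, a thread stands in for the worker, and all names are made up for the sketch.

```python
import math
import queue
import threading

jobs = queue.Queue()      # stands in for the MongoDB 'jobs' collection
results = queue.Queue()   # stands in for the stored evaluation results


def worker(objective):
    """Evaluate queued points until a None sentinel arrives."""
    while True:
        x = jobs.get()
        if x is None:
            break
        results.put((x, objective(x)))


def driver(points):
    """Queue the points, then block until every result has come back."""
    for x in points:
        jobs.put(x)
    # results.get() blocks, just as fmin blocks while no worker is running.
    return [results.get() for _ in points]


thread = threading.Thread(target=worker, args=(math.sin,))
thread.start()
evaluated = driver([-2.0, -1.0, 0.0, 1.0, 2.0])
jobs.put(None)            # tell the worker to stop
thread.join()

best_x, best_loss = min(evaluated, key=lambda pair: pair[1])
print(best_x, best_loss)
```

    If the worker thread were never started, driver would wait forever on results.get(), which mirrors fmin blocking until hyperopt-mongo-worker is launched.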

    The demo above is too simple, though. We want to replace math.sin with a model of our own. Take a random forest as an example:

    import pandas as pd

    from hyperopt import fmin, hp, rand
    from hyperopt.mongoexp import MongoTrials
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import f1_score
    from sklearn.model_selection import cross_val_predict

    # Training data ('xxxxx.csv' is a placeholder path); first column is the label.
    df = pd.read_csv('xxxxx.csv', header=0)
    train_y, train_x = df[df.columns[0]], df[df.columns[1:]]

    def randomforest(args):
        """Objective: negative 3-fold cross-validated F1 score (fmin minimizes)."""
        class_weight = args['class_weight']
        criterion = args['criterion']
        min_impurity_split = args['min_impurity_split']
        n_estimators = args['n_estimators']
        min_samples_leaf = args['min_samples_leaf']
        min_samples_split = args['min_samples_split']

        estim = RandomForestClassifier(
            n_estimators=n_estimators,
            class_weight=class_weight,
            criterion=criterion,
            min_impurity_decrease=min_impurity_split,
            min_samples_leaf=min_samples_leaf,
            min_samples_split=min_samples_split,
        )

        y_pred = cross_val_predict(estim, train_x, train_y, cv=3)
        metric = f1_score(train_y, y_pred)
        return -metric

    space = {
        'class_weight': hp.choice('class_weight', [None, 'balanced']),
        'criterion': hp.choice('criterion', ['gini', 'entropy']),
        'min_impurity_split': hp.lognormal('min_impurity_split', 1e-10, 1e-4) * 1e-7,
        'min_samples_leaf': hp.randint('min_samples_leaf', 10) + 1,
        # scikit-learn requires an integer min_samples_split to be at least 2
        'min_samples_split': hp.randint('min_samples_split', 10) + 2,
        'n_estimators': hp.randint('n_estimators', 950) + 50,
    }

    if __name__ == '__main__':
        trials = MongoTrials('mongo://localhost:1234/foo_db/jobs', exp_key='exp2')
        best = fmin(fn=randomforest, space=space, algo=rand.suggest, max_evals=100, trials=trials)
        print(best)
    

    Unfortunately this fails with an attribute error: the worker cannot find randomforest in its own __main__ module.
    AttributeError: Can't get attribute 'randomforest' on <module '__main__' from ...hyperopt-mongo-worker
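    The error message is consistent with how Python's standard pickle handles functions: it stores only the module name and the function name, and the receiving process has to look the function up again. Assuming that is how the objective reached the worker here (the post does not show hyperopt's serialization code), the failure can be reproduced without hyperopt. "driver_main" is a made-up module name used only for this demonstration.

```python
import pickle
import sys
import types


def randomforest(args):
    """Stand-in objective; the real one trains a model."""
    return 0.0


# Pretend the function was defined in the driver's top-level module.
driver = types.ModuleType("driver_main")
randomforest.__module__ = "driver_main"
randomforest.__qualname__ = "randomforest"
driver.randomforest = randomforest
sys.modules["driver_main"] = driver

payload = pickle.dumps(randomforest)
# The payload carries the two names, not the function body.
names_only = b"driver_main" in payload and b"randomforest" in payload

# Simulate the worker: a module with that name exists, but lacks the function.
del driver.randomforest
try:
    pickle.loads(payload)
    load_error = None
except AttributeError as exc:
    load_error = exc

# Simulate the fix: the module the worker imports does define the function.
driver.randomforest = randomforest
restored = pickle.loads(payload)

print(names_only, type(load_error).__name__, restored is randomforest)
```

    In the real setup both the driver script and hyperopt-mongo-worker run as __main__, so a function defined in the driver's __main__ is looked up in the worker script instead, where it does not exist.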
    A quick search turned up a few suggested fixes. The one used here is to move the objective function into a separate script, for example:

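    # hyperopt_model.py
    # -*- coding: utf-8 -*-
    import pandas as pd

    from hyperopt import hp
    from sklearn.metrics import accuracy_score
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_predict

    # Loaded once at import time ('xxxxx.csv' is a placeholder path);
    # first column is the label, the remaining columns are features.
    df = pd.read_csv('xxxxx.csv', header=0)
    y, X = df[df.columns[0]], df[df.columns[1:]]

    def randomforest(args):
        """Objective: negative 3-fold cross-validated accuracy (fmin minimizes)."""
        n_estimators = args['n_estimators']
        criterion = args['criterion']
        max_features = args['max_features']
        min_impurity_split = args['min_impurity_split']
        min_samples_leaf = args['min_samples_leaf']
        min_samples_split = args['min_samples_split']
        class_weight = args['class_weight']

        clf = RandomForestClassifier(
            class_weight=class_weight,
            criterion=criterion,
            max_features=max_features,
            min_samples_leaf=min_samples_leaf,
            # min_impurity_decrease replaces the deprecated min_impurity_split
            min_impurity_decrease=min_impurity_split,
            min_samples_split=min_samples_split,
            n_estimators=n_estimators,
            random_state=1,
        )
        y_pred = cross_val_predict(clf, X, y, cv=3)
        metric = accuracy_score(y, y_pred)
        return -metric

    # Search space, moved here so the driver can use hyperopt_model.space.
    space = {
        'class_weight': hp.choice('class_weight', [None, 'balanced']),
        'criterion': hp.choice('criterion', ['gini', 'entropy']),
        # Assumed choices: the original post does not show a max_features entry.
        'max_features': hp.choice('max_features', ['sqrt', 'log2', None]),
        'min_impurity_split': hp.lognormal('min_impurity_split', 1e-10, 1e-4) * 1e-7,
        'min_samples_leaf': hp.randint('min_samples_leaf', 10) + 1,
        # scikit-learn requires an integer min_samples_split to be at least 2
        'min_samples_split': hp.randint('min_samples_split', 10) + 2,
        'n_estimators': hp.randint('n_estimators', 950) + 50,
    }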
    

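    Save this script as hyperopt_model.py and add the directory that contains it to PYTHONPATH, so that the worker can import it:
    export PYTHONPATH="${PYTHONPATH}:<directory containing hyperopt_model.py>"
    Then update the first script so that it takes the objective and the search space from hyperopt_model:
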
    import hyperopt_model

    from hyperopt import fmin, rand
    from hyperopt.mongoexp import MongoTrials

    if __name__ == '__main__':
        trials = MongoTrials('mongo://localhost:1234/foo_db/jobs', exp_key='exp2')
        best = fmin(fn=hyperopt_model.randomforest, space=hyperopt_model.space,
                    algo=rand.suggest, max_evals=100, trials=trials)
        print(best)
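    Before starting the worker, a quick check along these lines can confirm that hyperopt_model is visible on the import path. is_importable is a hypothetical helper, not part of hyperopt; run it with the same Python and in the same shell (with the same PYTHONPATH) that will launch the worker.

```python
import importlib.util


def is_importable(module_name):
    """Return True if a top-level module can be found on the import path.

    Hypothetical helper for illustration; it locates the module without
    importing it, so hyperopt_model's CSV is not loaded by this check.
    """
    return importlib.util.find_spec(module_name) is not None


print("hyperopt_model importable:", is_importable("hyperopt_model"))
```

    If this prints False, the worker would most likely fail to import the module as well, so the PYTHONPATH entry is the first thing to check.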
    

    After that, run hyperopt-mongo-worker again and everything works. Overall run time dropped by roughly 50%.

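    I also tried managing the two processes (the driver and the worker) from a single script with multiprocessing (code below), but there are still some errors I have not resolved. If anyone knows a better way, please let me know. Thanks!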

    # coding: utf-8
    import sys
    import logging
    import hyperopt.mongoexp
    import hyperopt_model

    from multiprocessing import Pool, Process
    from hyperopt import fmin, rand
    from hyperopt.mongoexp import MongoTrials


    def task1():
        # Run a hyperopt worker inside this process.
        logging.basicConfig(stream=sys.stderr, level=logging.INFO)
        print('task1 running')
        sys.exit(hyperopt.mongoexp.main_worker())


    def task2(msg):
        # Run the driver: fmin queues trials and waits for the worker's results.
        trials = MongoTrials('mongo://localhost:1234/foo_db/jobs', exp_key='exp3')
        best = fmin(fn=hyperopt_model.randomforest, space=hyperopt_model.space,
                    algo=rand.suggest, max_evals=100, trials=trials)
        print(msg)
        print('task2 is running')
        return best

    if __name__ == '__main__':
        pool = Pool(processes=4)
        p = Process(target=task1)

        p.start()
        ret = pool.apply_async(task2, args=(1,))

        pool.close()
        pool.join()
        p.join()

        print('processes done, result:')
        print(ret.get())
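    One possible alternative, offered only as an untested sketch and not as a fix for the errors mentioned above, is to leave the worker as the command-line program it already is and start it with subprocess. The helper names (worker_command, start_workers, stop_workers, run_search) are made up for this example; the command and its two flags are the ones used earlier in this post.

```python
import subprocess


def worker_command(mongo="localhost:1234/foo_db", poll_interval=0.1):
    """Build the same command line that was typed into the second shell."""
    return [
        "hyperopt-mongo-worker",
        "--mongo=%s" % mongo,
        "--poll-interval=%s" % poll_interval,
    ]


def start_workers(n, **kwargs):
    """Start n worker processes; they inherit this process's PYTHONPATH."""
    return [subprocess.Popen(worker_command(**kwargs)) for _ in range(n)]


def stop_workers(procs):
    """Terminate the workers and wait for them to exit."""
    for proc in procs:
        proc.terminate()
    for proc in procs:
        proc.wait()


def run_search(n_workers=2):
    """Start workers, run the driver, and always clean the workers up."""
    # Imported here so the helpers above work without hyperopt installed.
    from hyperopt import fmin, rand
    from hyperopt.mongoexp import MongoTrials
    import hyperopt_model

    procs = start_workers(n_workers)
    try:
        trials = MongoTrials('mongo://localhost:1234/foo_db/jobs', exp_key='exp3')
        return fmin(fn=hyperopt_model.randomforest, space=hyperopt_model.space,
                    algo=rand.suggest, max_evals=100, trials=trials)
    finally:
        stop_workers(procs)
```

    Calling run_search() from a script whose PYTHONPATH includes the directory of hyperopt_model.py would start the workers, block in fmin until the trials are done, and then terminate the workers. The worker count of 2 is arbitrary.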
    