MXNet Based on ps-kafka

Author: gb_QA_log | Published: 2018-04-22 21:55

    Configuration

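    • Modify MXNet's Makefile.
    • Modify ps-kafka's Makefile and make/*.mk.
    • Patch small parts of MXNet and dmlc-core so that they parse --brokers.
    • Patch ps-lite so that it becomes the Kafka-based ps-kafka.
    • Adjust the Kafka configuration. In existing experiments a single MXNet message reached 1600000 bytes, while rdkafka's default message.max.bytes is 1000000.
      • On the broker, set message.max.bytes=104857600.
      • On the broker, set socket.request.max.bytes at least as large as message.max.bytes.
      • On the broker, set fetch.message.max.bytes at least as large as message.max.bytes.
      • Configure the rdkafka Producer and Consumer with message.max.bytes equal to the broker's message.max.bytes.
      • Configure the rdkafka Consumer with fetch.message.max.bytes >= message.max.bytes.
      • Keep the number of partitions of each Kafka topic <= the number of brokers.
      • Restart Kafka.
        See the rdkafka docs and the Kafka docs.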
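The sizing rules above can be checked mechanically before restarting Kafka. The following is a minimal sketch, not part of ps-kafka: `check_kafka_sizes` and the three plain dictionaries are hypothetical stand-ins for the broker, rdkafka producer, and rdkafka consumer settings. The comparisons accept equal values, since the broker configuration in this post sets the limits to the same number.

```python
def check_kafka_sizes(broker, producer, consumer):
    """Return a list of violated sizing rules; an empty list means all hold.

    The three arguments are plain dicts of integer settings. They are
    hypothetical stand-ins for the real configs, not an rdkafka API.
    """
    limit = broker["message.max.bytes"]
    problems = []
    if broker["socket.request.max.bytes"] < limit:
        problems.append("broker socket.request.max.bytes < message.max.bytes")
    if broker["fetch.message.max.bytes"] < limit:
        problems.append("broker fetch.message.max.bytes < message.max.bytes")
    for name, client in (("producer", producer), ("consumer", consumer)):
        if client["message.max.bytes"] != limit:
            problems.append(f"{name} message.max.bytes != broker message.max.bytes")
    if consumer["fetch.message.max.bytes"] < consumer["message.max.bytes"]:
        problems.append("consumer fetch.message.max.bytes < message.max.bytes")
    return problems


LIMIT = 104857600  # 100 MiB, the value used in this post
broker = {
    "message.max.bytes": LIMIT,
    "socket.request.max.bytes": LIMIT,
    "fetch.message.max.bytes": LIMIT,
}
producer = {"message.max.bytes": LIMIT}
consumer = {"message.max.bytes": LIMIT, "fetch.message.max.bytes": LIMIT}
print(check_kafka_sizes(broker, producer, consumer))  # prints []
```

Any string in the returned list names the rule that the given settings break.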

    broker.id=0
    listeners=PLAINTEXT://YourThisHostIPorHostnameYouMustUseItAlways:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    log.dirs=/tmp/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    #########################################################
    delete.topic.enable=true
    fetch.message.max.bytes=104857600
    message.max.bytes=104857600
    #auto.create.topics.enable=true
    num.partitions=3
    default.replication.factor=1
    # 2 weeks
    connections.max.idle.ms=1209600000
    socket.request.max.bytes=104857600
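The listing above assigns num.partitions twice (1, then 3). Kafka loads this file as a Java properties file, where the last assignment of a key wins, so the effective value is 3. The sketch below spots such repeats. `find_repeated_keys` is a hypothetical helper and handles only simple key=value lines, not the full properties syntax.

```python
def find_repeated_keys(text):
    """Map each key assigned more than once to its values, in file order.

    Handles only simple `key=value` lines and `#` comment lines; this is
    not a full Java properties parser.
    """
    seen = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        seen.setdefault(key.strip(), []).append(value.strip())
    return {key: values for key, values in seen.items() if len(values) > 1}


sample = """
num.partitions=1
#auto.create.topics.enable=true
num.partitions=3
default.replication.factor=1
"""
repeats = find_repeated_keys(sample)
print(repeats)  # {'num.partitions': ['1', '3']}
```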
    

    Build and install

    Build

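    # CentOS/RHEL: build-essential is a Debian package name; with yum, use the "Development Tools" group
    sudo yum groupinstall -y "Development Tools"
    sudo yum install -y git lapack-devel openblas-devel opencv-devel
    # Debian/Ubuntu: sudo apt-get install -y build-essential git libatlas-base-dev libopencv-dev graphviz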
    
    git clone --recursive https://github.com/gbxu/mxnet-kafka.git
    cd mxnet-kafka
    # cmake -D USE_CUDA=OFF .
    make clean_all
    make -j $(nproc) USE_OPENCV=1 USE_BLAS=openblas USE_DIST_KVSTORE=1   USE_PROFILER=1 
    

    Update mxnet and the submodule

    git pull
    # Either cd ps-kafka and run git fetch followed by git merge origin/master, or:
    git submodule update --remote 3rdparty/ps-kafka
    

    Install

    sudo pip uninstall mxnet
    cd python
    sudo pip install --upgrade pip
    sudo pip install -e .
    

    Run

    Start the Kafka broker

    See "Kafka setup and testing".

    • Create three topics. The command format is:
      ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TOSCHEDULER
    • Delete a topic:
      ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic "TOWORKERS"
    • Add partitions later:
      ./kafka-topics.sh --zookeeper node14:2181 --alter --topic TOWORKERS --partitions 5
    • Topics and the number of partitions each one needs:
    topic         partitions
    TOSERVERS     -s + 1 (number of servers plus one)
    TOWORKERS     -n + 1 (number of workers plus one)
    TOSCHEDULER   1
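The partition counts in the table follow from the -s (servers) and -n (workers) values passed to launch.py. The sketch below derives them and prints create commands in the format shown above. The helper names and the default ZooKeeper address are illustrative assumptions.

```python
def topic_partitions(num_servers, num_workers):
    """Partition count per topic, following the table in this post."""
    return {
        "TOSERVERS": num_servers + 1,
        "TOWORKERS": num_workers + 1,
        "TOSCHEDULER": 1,
    }


def create_commands(num_servers, num_workers, zookeeper="localhost:2181"):
    """Build one kafka-topics.sh --create command line per topic."""
    template = (
        "./kafka-topics.sh --create --zookeeper {zk} "
        "--replication-factor 1 --partitions {parts} --topic {topic}"
    )
    counts = topic_partitions(num_servers, num_workers)
    return [
        template.format(zk=zookeeper, parts=parts, topic=topic)
        for topic, parts in counts.items()
    ]


# Matches the "-s 1 -n 1" run used later in this post.
for command in create_commands(num_servers=1, num_workers=1):
    print(command)
```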

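    Run MXNet-kafka

    Following "Running MXNet (distributed + shared library)", copy the library into the mxnet folder under example and set up the environment on the other nodes.

    Then change the final run command to: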

    python ../../tools/launch.py  -s 1 -n 1 --launcher ssh -H hosts --brokers node14:9092,node15:9092,node16:9092 --sync-dst-dir /home/xugb/kafka_test/  python train_mnist.py --network lenet --kv-store dist_sync 
    

    Alternatively, set the environment variable with one line in the Python file:
    os.environ['BROKERS']='node14'
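In the commands in this post, the BROKERS value has the same comma-separated host:port form as the --brokers argument. As a sanity check before exporting it, the sketch below splits such a string into (host, port) pairs. `parse_brokers` is a hypothetical helper. Falling back to port 9092 for an entry without a port (such as 'node14') is an assumption based on the standard Kafka port, and IPv6 literals are not handled.

```python
DEFAULT_KAFKA_PORT = 9092  # assumed fallback when an entry omits the port


def parse_brokers(brokers):
    """Split 'host1:port1,host2' into a list of (host, port) tuples."""
    pairs = []
    for entry in brokers.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma
        host, sep, port = entry.partition(":")
        if not host:
            raise ValueError(f"missing host in broker entry: {entry!r}")
        # int() raises ValueError for an empty or non-numeric port
        pairs.append((host, int(port) if sep else DEFAULT_KAFKA_PORT))
    return pairs


print(parse_brokers("node14:9092,node15:9092,node16:9092"))
print(parse_brokers("node14"))
```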

    Saving the output:
    Write the command above into out_1_1.sh, then run:

    nohup ./out_1_1.sh >/home/xugb/out/out14/kfk_s1n1_1.txt  &
    

    Check the current output:

    tail -n 2 kfk_s1n1_1.txt 
    

    ==========
    Manual test:

    export DMLC_PS_ROOT_URI=10.0.0.14; export DMLC_ROLE=worker; export DMLC_PS_ROOT_PORT=9091; export BROKERS=node14:9092,node15:9092,node16:9092; export DMLC_NUM_WORKER=1; export DMLC_NUM_SERVER=1; cd /home/xugb/ic_kafka_test/; python train_mnist.py --network lenet --kv-store dist_sync

    export DMLC_PS_ROOT_URI=10.0.0.14; export DMLC_ROLE=server; export DMLC_PS_ROOT_PORT=9091; export BROKERS=node14:9092,node15:9092,node16:9092; export DMLC_NUM_WORKER=1; export DMLC_NUM_SERVER=1; cd /home/xugb/ic_kafka_test/; python train_mnist.py --network lenet --kv-store dist_sync

    export DMLC_PS_ROOT_URI=10.0.0.14; export DMLC_ROLE=scheduler; export DMLC_PS_ROOT_PORT=9091; export BROKERS=node14:9092,node15:9092,node16:9092; export DMLC_NUM_WORKER=1; export DMLC_NUM_SERVER=1;cd /home/xugb/ic_kafka_test/; python train_mnist.py --network lenet --kv-store dist_sync
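The three commands above differ only in DMLC_ROLE. The sketch below builds the same environment for a given role and starts train_mnist.py with it. `role_env` and `launch` are hypothetical helpers, and the defaults repeat the addresses and paths used in this post.

```python
import os
import subprocess

ROLES = ("scheduler", "server", "worker")


def role_env(role, root_uri="10.0.0.14", root_port=9091,
             brokers="node14:9092,node15:9092,node16:9092",
             num_workers=1, num_servers=1):
    """Return a copy of the current environment plus the variables for `role`."""
    if role not in ROLES:
        raise ValueError(f"unknown role: {role!r}")
    env = dict(os.environ)
    env.update({
        "DMLC_PS_ROOT_URI": root_uri,
        "DMLC_PS_ROOT_PORT": str(root_port),
        "DMLC_ROLE": role,
        "BROKERS": brokers,
        "DMLC_NUM_WORKER": str(num_workers),
        "DMLC_NUM_SERVER": str(num_servers),
    })
    return env


def launch(role, workdir="/home/xugb/ic_kafka_test/"):
    """Start train_mnist.py for `role` in `workdir`; returns the Popen handle."""
    command = ["python", "train_mnist.py",
               "--network", "lenet", "--kv-store", "dist_sync"]
    return subprocess.Popen(command, cwd=workdir, env=role_env(role))
```

Calling `launch("scheduler")`, `launch("server")`, and `launch("worker")`, each in its own terminal or on its own node, mirrors the three shell lines.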

    ==============
    Local:

    export DMLC_PS_ROOT_URI=127.0.0.1; export DMLC_PS_ROOT_PORT=8000; export DMLC_NUM_SERVER=1; export DMLC_NUM_WORKER=1; export BROKERS=localhost:9092; export DMLC_ROLE=scheduler;

    export DMLC_PS_ROOT_URI=127.0.0.1;export DMLC_PS_ROOT_PORT=8000;export DMLC_NUM_SERVER=1;export DMLC_NUM_WORKER=1;export BROKERS=localhost:9092;export DMLC_NODE_HOST=127.0.0.1;export DMLC_ROLE=server;

    export DMLC_PS_ROOT_URI=127.0.0.1;export DMLC_PS_ROOT_PORT=8000;export DMLC_NUM_SERVER=1;export DMLC_NUM_WORKER=1;export BROKERS=localhost:9092;export DMLC_NODE_HOST=127.0.0.1;export DMLC_ROLE=worker;
