MXNet Based on ps-kafka

Author: gb_QA_log | Published: 2018-04-22 21:55

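Configure

  • Modify MXNet's Makefile.
  • Modify ps-kafka's Makefile and make/*.mk.
  • A small part of MXNet and dmlc-core was modified to parse --brokers.
  • ps-lite was modified into the Kafka-based ps-kafka.
  • Modify the Kafka configuration. In the experiments run so far, a single MXNet message can reach 1600000 bytes, while rdkafka's default message.max.bytes is 1000000.
    • On the broker, set message.max.bytes=104857600.
    • On the broker, socket.request.max.bytes must be greater than message.max.bytes.
    • On the broker, set fetch.message.max.bytes greater than message.max.bytes.
    • Configure message.max.bytes for the rdkafka Producer and Consumer to equal the broker's message.max.bytes.
    • Configure the rdkafka Consumer's fetch.message.max.bytes >= message.max.bytes.
    • The number of partitions of a Kafka topic must be <= the number of brokers.
    • Restart Kafka.
      See the rdkafka docs and the Kafka docs.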

broker.id=0
listeners=PLAINTEXT://YourThisHostIPorHostnameYouMustUseItAlways:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#########################################################
delete.topic.enable=true
fetch.message.max.bytes=104857600
message.max.bytes=104857600
#auto.create.topics.enable=true
num.partitions=3
default.replication.factor=1
# 2 weeks
connections.max.idle.ms=1209600000
socket.request.max.bytes=104857600
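
The size constraints listed above can be checked mechanically before restarting the broker. The following is a minimal sketch, not part of Kafka or ps-kafka: `parse_properties` and `check_size_limits` are hypothetical helper names, the input is assumed to be a plain `key=value` properties file, and the 1600000-byte figure is the message size reported above. The comparisons are non-strict because the sample configuration sets all three limits to the same value.

```python
REQUIRED = (
    "message.max.bytes",
    "socket.request.max.bytes",
    "fetch.message.max.bytes",
)


def parse_properties(text):
    """Parse key=value lines, skipping blanks and full-line '#' comments.

    A later occurrence of a key overrides an earlier one.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_size_limits(props, largest_message=1600000):
    """Return a list of violated constraints (empty when all hold)."""
    problems = [f"{key} is not set" for key in REQUIRED if key not in props]
    if problems:
        return problems
    msg_max = int(props["message.max.bytes"])
    sock_max = int(props["socket.request.max.bytes"])
    fetch_max = int(props["fetch.message.max.bytes"])
    if msg_max < largest_message:
        problems.append("message.max.bytes is smaller than the largest message")
    if sock_max < msg_max:
        problems.append("socket.request.max.bytes is smaller than message.max.bytes")
    if fetch_max < msg_max:
        problems.append("fetch.message.max.bytes is smaller than message.max.bytes")
    return problems
```

Reading the broker's properties file into `parse_properties` and passing the result to `check_size_limits` yields an empty list when the three limits are consistent.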

Build and install

Build

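# build-essential is a Debian/Ubuntu package; on yum-based systems use the "Development Tools" group
sudo yum groupinstall "Development Tools"
sudo yum install git
sudo yum install lapack-devel openblas-devel opencv-devel
# On Debian/Ubuntu: sudo apt-get install -y build-essential libatlas-base-dev libopencv-dev graphviz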

git clone --recursive https://github.com/gbxu/mxnet-kafka.git
cd mxnet-kafka
# cmake -D USE_CUDA=OFF .
make clean_all
make -j $(nproc) USE_OPENCV=1 USE_BLAS=openblas USE_DIST_KVSTORE=1 USE_PROFILER=1

Update mxnet and the submodule

git pull
# Either cd into ps-kafka and run git fetch followed by git merge origin/master, or:
git submodule update --remote 3rdparty/ps-kafka

Install

sudo pip uninstall mxnet
cd python
sudo pip install --upgrade pip
sudo pip install -e .

Run

Start the Kafka broker

See "Kafka setup and testing".

  • Create the three topics. The command format is:
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TOSCHEDULER
  • Delete a topic:
    ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic "TOWORKERS"
  • Add partitions later:
    ./kafka-topics.sh --zookeeper node14:2181 --alter --topic TOWORKERS --partitions 5
  • Topics and the number of partitions each needs (-s and -n are the server and worker counts passed to launch.py):
    topic        partitions
    TOSERVERS    -s + 1
    TOWORKERS    -n + 1
    TOSCHEDULER  1
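
The partition counts in the table can be derived from the `-s` and `-n` values. The sketch below assumes the table entries mean "server count plus one" and "worker count plus one"; `topic_partitions` and `create_commands` are hypothetical helper names, and the generated command line simply reuses the `kafka-topics.sh --create` format shown above.

```python
def topic_partitions(num_servers, num_workers):
    """Partition count per topic, following the table above."""
    return {
        "TOSERVERS": num_servers + 1,
        "TOWORKERS": num_workers + 1,
        "TOSCHEDULER": 1,
    }


def create_commands(num_servers, num_workers, zookeeper="localhost:2181"):
    """Build one kafka-topics.sh --create command per topic."""
    return [
        f"./kafka-topics.sh --create --zookeeper {zookeeper} "
        f"--replication-factor 1 --partitions {count} --topic {topic}"
        for topic, count in topic_partitions(num_servers, num_workers).items()
    ]


if __name__ == "__main__":
    # Commands for the -s 1 -n 1 setup used in this post.
    for command in create_commands(1, 1):
        print(command)
```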

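Run MXNet-kafka

Following "Running MXNet (distributed + shared library)", copy the library into the mxnet folder under example and set up the environment on the other nodes.

The final run command becomes: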

python ../../tools/launch.py  -s 1 -n 1 --launcher ssh -H hosts --brokers node14:9092,node15:9092,node16:9092 --sync-dst-dir /home/xugb/kafka_test/  python train_mnist.py --network lenet --kv-store dist_sync 

Alternatively, set the environment variable with one line in the Python file:
os.environ['BROKERS']='node14'
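
The launch command passes brokers as `host:port` pairs, while the line above sets a bare hostname. Whether ps-kafka accepts both forms is not stated here, so the sketch below normalizes a broker list to `host:port` before exporting it. `normalize_brokers` is a hypothetical helper, and the default port 9092 is an assumption taken from the listeners line in the broker configuration.

```python
import os


def normalize_brokers(brokers, default_port=9092):
    """Return a comma-separated broker list where every entry has a port."""
    entries = []
    for item in brokers.split(","):
        item = item.strip()
        if not item:
            continue  # ignore empty entries such as a trailing comma
        entries.append(item if ":" in item else f"{item}:{default_port}")
    return ",".join(entries)


# Export the normalized list under the BROKERS variable used in this post.
os.environ["BROKERS"] = normalize_brokers("node14")
```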

Saving the output:
Write the command above to out_1_1.sh, then run:

nohup ./out_1_1.sh >/home/xugb/out/out14/kfk_s1n1_1.txt  &

Check the current output:

tail -n 2 kfk_s1n1_1.txt 

==========
Manual test:

export DMLC_PS_ROOT_URI=10.0.0.14; export DMLC_ROLE=worker; export DMLC_PS_ROOT_PORT=9091; export BROKERS=node14:9092,node15:9092,node16:9092; export DMLC_NUM_WORKER=1; export DMLC_NUM_SERVER=1; cd /home/xugb/ic_kafka_test/; python train_mnist.py --network lenet --kv-store dist_sync

export DMLC_PS_ROOT_URI=10.0.0.14; export DMLC_ROLE=server; export DMLC_PS_ROOT_PORT=9091; export BROKERS=node14:9092,node15:9092,node16:9092; export DMLC_NUM_WORKER=1; export DMLC_NUM_SERVER=1; cd /home/xugb/ic_kafka_test/; python train_mnist.py --network lenet --kv-store dist_sync

export DMLC_PS_ROOT_URI=10.0.0.14; export DMLC_ROLE=scheduler; export DMLC_PS_ROOT_PORT=9091; export BROKERS=node14:9092,node15:9092,node16:9092; export DMLC_NUM_WORKER=1; export DMLC_NUM_SERVER=1;cd /home/xugb/ic_kafka_test/; python train_mnist.py --network lenet --kv-store dist_sync

==============
Local:

export DMLC_PS_ROOT_URI=127.0.0.1; export DMLC_PS_ROOT_PORT=8000; export DMLC_NUM_SERVER=1; export DMLC_NUM_WORKER=1; export BROKERS=localhost:9092; export DMLC_ROLE=scheduler;

export DMLC_PS_ROOT_URI=127.0.0.1;export DMLC_PS_ROOT_PORT=8000;export DMLC_NUM_SERVER=1;export DMLC_NUM_WORKER=1;export BROKERS=localhost:9092;export DMLC_NODE_HOST=127.0.0.1;export DMLC_ROLE=server;

export DMLC_PS_ROOT_URI=127.0.0.1;export DMLC_PS_ROOT_PORT=8000;export DMLC_NUM_SERVER=1;export DMLC_NUM_WORKER=1;export BROKERS=localhost:9092;export DMLC_NODE_HOST=127.0.0.1;export DMLC_ROLE=worker;
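
The three export lines differ only in `DMLC_ROLE` and in whether `DMLC_NODE_HOST` is set, so the same environments can be built programmatically. The sketch below is one possible way to do that: `role_env` and `launch_local` are hypothetical helpers, the defaults mirror the local values above, and the command passed to `launch_local` is an assumption, since the local section does not say which script follows the exports.

```python
import os
import subprocess
import sys

ROLES = ("scheduler", "server", "worker")


def role_env(role, root_uri="127.0.0.1", root_port=8000, num_server=1,
             num_worker=1, brokers="localhost:9092", node_host="127.0.0.1",
             base_env=None):
    """Build the environment for one role, mirroring the exports above."""
    if role not in ROLES:
        raise ValueError(f"unknown role: {role}")
    env = dict(os.environ if base_env is None else base_env)
    env.update({
        "DMLC_PS_ROOT_URI": root_uri,
        "DMLC_PS_ROOT_PORT": str(root_port),
        "DMLC_NUM_SERVER": str(num_server),
        "DMLC_NUM_WORKER": str(num_worker),
        "BROKERS": brokers,
        "DMLC_ROLE": role,
    })
    # The exports above set DMLC_NODE_HOST only for the server and worker.
    if role != "scheduler":
        env["DMLC_NODE_HOST"] = node_host
    return env


def launch_local(script_args):
    """Start one Python process per role; script_args is an assumed command."""
    return [
        subprocess.Popen([sys.executable, *script_args], env=role_env(role))
        for role in ROLES
    ]
```

Assuming the same training script as in the manual test, `launch_local(["train_mnist.py", "--network", "lenet", "--kv-store", "dist_sync"])` would start the scheduler, server, and worker on one machine.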
