美文网首页
Giraph源码分析(七)—— 添加消息统计功能

Giraph源码分析(七)—— 添加消息统计功能

作者: 数澜科技 | 来源:发表于2019-08-19 16:49 被阅读0次

    作者|白松

    1、 添加类,把每个超步发送的消息量大小写入Hadoop的Counter中。在org.apache.giraph.counters包下新建GiraphMessages类,来统计消息量。

    源代码如下:

    package org.apache.giraph.counters;
    
    import java.util.Iterator;
    import java.util.Map;
    
    import org.apache.hadoop.mapreduce.Mapper.Context;
    import com.google.common.collect.Maps;
    
    /**
     * Hadoop Counters in group "Giraph Messages" for counting every superstep
     * message count.
     */
    public class GiraphMessages extends HadoopCountersBase {
        /** Counter group name for the giraph Messages */
        public static final String GROUP_NAME = "Giraph Messages";
    
        /** Singleton instance for everyone to use */
        private static GiraphMessages INSTANCE;
    
        /** superstep time in msec */
        private final Map superstepMessages;
    
        private GiraphMessages(Context context) {
            super(context, GROUP_NAME);
            superstepMessages = Maps.newHashMap();
        }
    
        /**
         * Instantiate with Hadoop Context.
         * 
         * @param context
         *            Hadoop Context to use.
         */
        public static void init(Context context) {
            INSTANCE = new GiraphMessages(context);
        }
    
        /**
         * Get singleton instance.
         * 
         * @return singleton GiraphTimers instance.
         */
        public static GiraphMessages getInstance() {
            return INSTANCE;
        }
    
        /**
         * Get counter for superstep messages
         * 
         * @param superstep
         * @return
         */
        public GiraphHadoopCounter getSuperstepMessages(long superstep) {
            GiraphHadoopCounter counter = superstepMessages.get(superstep);
            if (counter == null) {
                String counterPrefix = "Superstep- " + superstep+" ";
                counter = getCounter(counterPrefix);
                superstepMessages.put(superstep, counter);
            }
            return counter;
        }
    
        @Override
        public Iterator iterator() {
            return superstepMessages.values().iterator();
        }
    }
    

    2、在BspServiceMaster类中添加统计功能。Master在每次同步时候,会聚集每个Worker发送的消息量大小(求和),存储于GlobalStats中。因此只需要在每次同步后,从GlobalStats对象中取出总的通信量大小,然后写入GiraphMessages中。格式为<SuperStep-Number,TotalMessagesCount>,实际存储于上步GiraphMessages类中定义的Map<Long, GiraphHadoopCounter> superstepMessages 对象中。 在BspServiceMaster的构造方法中,最后面追加一行代码,对GiraphMessages进行初始化。

    GiraphMessages.init(context);
    

    在BspServiceMaster类的SuperstepState coordinateSuperstep()方法中,添加记录功能。片段代码如下:

    ……
    // If the master is halted or all the vertices voted to halt and there
    // are no more messages in the system, stop the computation
    GlobalStats globalStats = aggregateWorkerStats(getSuperstep());  
    
    LOG.info("D-globalStats: "+globalStats+"\n\n");
    //添加下面语句。从第0个超步起开始记录。
    if(getSuperstep() != INPUT_SUPERSTEP) {
        GiraphMessages.getInstance().getSuperstepMessages(getSuperstep()).increment(globalStats.getMessageCount());
    }
    ……
    

    3、实验结果如下:

    1.png

    完!

    相关文章

      网友评论

          本文标题:Giraph源码分析(七)—— 添加消息统计功能

          本文链接:https://www.haomeiwen.com/subject/rxkesctx.html