美文网首页
flume:一个例子的分析(二)

flume:一个例子的分析(二)

作者: 博弈史密斯 | 来源:发表于2018-07-19 17:16 被阅读0次

    在上一篇中,flume 使用了自定义拦截器 LogAnalysisInterceptor,下面来看它的代码:

    package com.glbg.flume.interceptors;
    
    
    /**
     * Log-analysis interceptor: tags every event with a log type and a routing decision.
     *
     * <p>For each event the body is read as UTF-8 and URL-decoded, then an event date and
     * an app id are extracted from it with {@code PatternUtil}. When both are present, the
     * date splits into at least three {@code -}-separated parts and the app id has an entry
     * in {@code Constant.APP_SITE_CODE_MAP}, the year/month/day and site-code headers are
     * set and the event is routed as {@code Constant.RIGHT}. In every other case the only
     * routing header written is {@code Constant.DATAROUTE = Constant.WRONG}.
     *
     * <p>The event body itself is never modified; only headers are added.
     */
    public class LogAnalysisInterceptor implements Interceptor {

        /** Separator between the year, month and day parts of the extracted event date. */
        private static final String DATE_SEPARATOR = "-";

        /** Number of parts (year, month, day) a usable event date must split into. */
        private static final int DATE_PART_COUNT = 3;

        @Override
        public void initialize(){
            // Nothing to set up: the interceptor keeps no state.
        }

        @Override
        public void close(){
            // Nothing to release.
        }

        /**
         * Adds the log-type header and the routing headers to a single event.
         *
         * @param event the event to tag; its header map is mutated in place
         * @return the same event instance, never dropped
         */
        @Override
        public Event intercept(Event event){
            Map<String, String> headers = event.getHeaders();
            headers.put(Constant.LOGTYPE, Constant.APP_LOG);

            // Decode first, then analyse. Anything that cannot be analysed is routed as
            // WRONG rather than thrown, so one bad line cannot fail the whole batch.
            String body = decodeBody(event.getBody());
            if (!addRoutingHeaders(headers, body)){
                headers.put(Constant.DATAROUTE, Constant.WRONG);
            }

            return event;
        }

        /**
         * Applies {@link #intercept(Event)} to every event of a batch.
         *
         * @param events the batch to process
         * @return a new list holding the non-null results, in the original order
         */
        @Override
        public List<Event> intercept(List<Event> events){
            List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
            for (Event event : events){
                Event interceptedEvent = intercept(event);
                if (interceptedEvent != null){
                    intercepted.add(interceptedEvent);
                }
            }
            return intercepted;
        }

        /**
         * Reads the raw body as UTF-8 and URL-decodes it.
         *
         * @param rawBody the event body, may be {@code null}
         * @return the decoded text, or {@code null} when the body is missing or contains a
         *         malformed percent-escape
         */
        private static String decodeBody(byte[] rawBody){
            if (rawBody == null){
                return null;
            }
            // The Charset overload cannot throw the checked UnsupportedEncodingException.
            String text = new String(rawBody, java.nio.charset.StandardCharsets.UTF_8);
            try {
                return URLDecoder.decode(text, "UTF-8");
            } catch (java.io.UnsupportedEncodingException e){
                // Every JVM is required to support UTF-8, so this is a broken runtime.
                throw new IllegalStateException("UTF-8 charset is not available", e);
            } catch (IllegalArgumentException e){
                // Malformed percent-escape in the log line: treat the body as unparseable.
                return null;
            }
        }

        /**
         * Extracts date and app id from the decoded body and, when everything needed is
         * available, writes the year/month/day, site-code and RIGHT routing headers.
         * Headers are only written once all values are known, so a rejected event never
         * carries a partial set of them.
         *
         * @param headers the header map to populate
         * @param body    the decoded body, or {@code null} when decoding failed
         * @return {@code true} when the headers were written, {@code false} when the event
         *         must be routed as WRONG
         */
        private static boolean addRoutingHeaders(Map<String, String> headers, String body){
            if (body == null){
                return false;
            }

            // The original author notes that the event date is expressed in UTC.
            String eventDate = PatternUtil.getValueByPattern(Constant.APPFLAYER_EVENT_DATE_PATTERN, body);
            String appId = PatternUtil.getValueByPattern(Constant.APPFLAYER_APP_ID_PATTERN, body);
            if (StringUtils.isEmpty(eventDate) || StringUtils.isEmpty(appId)){
                return false;
            }

            String[] dateParts = eventDate.split(DATE_SEPARATOR);
            if (dateParts.length < DATE_PART_COUNT){
                return false;
            }

            // An app id without a site code would otherwise store a null header value.
            // NOTE(review): unknown app ids are routed as WRONG here - confirm that this is
            // the desired destination for them.
            String siteCode = Constant.APP_SITE_CODE_MAP.get(appId);
            if (siteCode == null){
                return false;
            }

            headers.put(Constant.YEAR, dateParts[0]);
            headers.put(Constant.MONTH, dateParts[1]);
            headers.put(Constant.DAY, dateParts[2]);
            headers.put(Constant.UBCD, siteCode);
            headers.put(Constant.DATAROUTE, Constant.RIGHT);
            return true;
        }
    }
    
    public class Constant {
        
        public static final String LOGTYPE = "logtype";
        public static final String APP_LOG = "app-log";
        public static final String APPFLAYER_EVENT_DATE_PATTERN = "event_date_pattern";
        public static final String APPFLAYER_APP_ID_PATTERN = "app_id_pattern";
        public static final String ubcd="ubcd";
        
        public static final String TIME_ZONE_EST = "EST";   //西五区
        public static final String YEAR = "YEAR";
        public static final String MONTH = "MONTH";
        public static final String DAY = "DAY";
        
        public static final String wrong = "wrong";
    
        
        public static Map<String,String> APP_SITE_CODE_MAP = new HashMap<String,String>();
        
        static {
            APP_SITE_CODE_MAP.put("id1078789949", "10013");
            APP_SITE_CODE_MAP.put("com.zaful", "10013");
            APP_SITE_CODE_MAP.put("id1131090631", "10002");
            APP_SITE_CODE_MAP.put("com.globalegrow.app.gearbest", "10002");
        }
    }
    

    相关文章

      网友评论

          本文标题:flume:一个例子的分析(二)

          本文链接:https://www.haomeiwen.com/subject/vxsxyftx.html