美文网首页
flume自定义Inteceptor

flume自定义Inteceptor

作者: 王金松 | 来源:发表于2019-10-22 09:55 被阅读0次
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.google.common.base.Charsets;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Sets;
    import com.google.gson.Gson;
    import com.jd.common.Location;
    import com.jd.constant.DataBaseConstant;
    import com.jd.constant.Globals;
    import com.jd.datasource.MysqlManager;
    import com.jd.datasource.MysqlPoolManager;
    import com.jd.entity.HoneypotAttack;
    import com.jd.util.DateUtils;
    import com.jd.util.GeoHelper;
    import com.jd.util.IDGeneratorUtils;
    import com.jd.util.ListUtils;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    import org.apache.flume.source.kafka.KafkaSourceConstants;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.StringUtils;
    
    import java.nio.charset.Charset;
    import java.text.SimpleDateFormat;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    public class InternalAssetInterceptor implements Interceptor {
        private static final Logger logger = LoggerFactory
                .getLogger(InternalAssetInterceptor.class);
    
        private Charset charset = Charsets.UTF_8;
        private Context context;
        private MysqlPoolManager mysqlPool = null;
        int expiredTime = 3600;
        private static Map<String, String> internalAssetMap = new HashMap<>();
    
    
    
        public InternalAssetInterceptor(Context context){
            Integer expired = context.getInteger("dataexpired");
            if (expired != null) {
                expiredTime = expired;
            }
            this.context = context;
        }
    
        @Override
        public void initialize() {
            try {
                mysqlPool = new MysqlPoolManager(context);
                mysqlPool.init();
                initAttackDic();
                Thread t = new Thread(() -> {
                    try {
                        Thread.sleep(expiredTime * 1000);
                    } catch (Exception e) {}
                    logger.info("start sync attackInfo");
                    initAttackDic();
                });
                t.setDaemon(true);
                t.start();
            } catch (Exception e) {
                logger.error("flume agent init failed", e);
                throw new RuntimeException(e);
            }
        }
    
        private void initAttackDic() {
            List<Map<String, Object>> mapList = new MysqlManager(mysqlPool).queryForList("select * from op_np_pin");
            for (Map<String, Object> map: mapList) {
                try {
                    String pin = map.get("pin").toString();
                    String ip = map.get("ip").toString();
                    internalAssetMap.put(ip, pin);
                } catch (Exception e) {
                    logger.error("sync interval asset error {}", map);
                }
            }
            mapList = null;
            logger.info("sync internal asset success");
        }
    
        @Override
        public Event intercept(Event event){
            try {
                String oldBody = new String(event.getBody(), charset);
                JSONObject body = JSON.parseObject(oldBody);
                JSONObject jdcloud_alert = body.getJSONObject("jdcloud_alert");
                String floattingIp = jdcloud_alert.getString("floating_ip");
                String pin = jdcloud_alert.getString("pin");
                if (StringUtils.isEmpty(pin)) {
                    String depart = internalAssetMap.getOrDefault(floattingIp, "");
                    jdcloud_alert.put("pin", depart);
                }
                event.setBody(body.toJSONString().getBytes(charset));
                return event;
            }catch (Exception e) {
                logger.error("Failed to process event " + event, e);
                event = null;
            }
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> out = Lists.newArrayList();
            for (Event event : events) {
                Event outEvent = intercept(event);
                if (outEvent != null) {
                    out.add(outEvent);
                }
            }
            return out;
        }
    
        @Override
        public void close() {
    
        }
    
        public static class Builder implements Interceptor.Builder {
            private Context context;
    
            @Override
            public Interceptor build() {
                return new InternalAssetInterceptor(context);
            }
    
            @Override
            public void configure(Context context) {
                this.context = context;
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:flume自定义Inteceptor

          本文链接:https://www.haomeiwen.com/subject/qvgfvctx.html