美文网首页实时数据相关
flink cep,使用groovy脚本及表达式求值

flink cep,使用groovy脚本及表达式求值

作者: 岳过山丘 | 来源:发表于2019-01-02 19:50 被阅读0次

    提要:
    使用 Aviator 表达式求值和 Groovy 脚本来动态生成 Flink CEP 的 Pattern,从而减少代码量:同一份代码只需传入不同的 script 脚本,即可生成不同的 Pattern。
    1.source

     // Run every element of dataStreamSource through ParseMetricEventFunction (flatMap)
     // and declare MetricEvent as the element type of the resulting stream.
     ParseMetricEventFunction parseFunction = new ParseMetricEventFunction();
     SingleOutputStreamOperator<MetricEvent> metricEvent =
             dataStreamSource.flatMap(parseFunction).returns(MetricEvent.class);
    

    2.pattern

    // Groovy source handed to ScriptEngine.getPattern. It imports the Flink CEP classes,
    // assigns an AviatorCondition (built from an Aviator expression over the event's
    // tags and fields) to the script variable where1, and defines get(), which returns
    // a Pattern starting at "start" with the noSkip strategy and where1 as its condition.
    String patternScript =
            "  import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy\n"
            + "import org.apache.flink.cep.pattern.Pattern\n"
            + "import test.cep.function.AviatorCondition \n"
            + "where1 = new AviatorCondition("
            + "   \"getT(tags,\\\"cluster_name\\\")==\\\"terminus-x\\\"&&getF(fields,\\\"load5\\\")>15 \""
            + "        )\n"
            + "def get(){ "
            + "      return Pattern.begin(\"start\", AfterMatchSkipStrategy.noSkip())\n"
            + "        .where(where1)"
            + "}";

    // "get" is the name of the script function that is invoked to obtain the Pattern.
    Pattern p1 = ScriptEngine.getPattern(patternScript, "get");
    

    3.result

      // Key the stream by metric name concatenated with the comma-joined tag values and
      // apply the script-generated pattern p1 (the original snippet referenced an
      // undefined variable "p").
      // p1 is a raw Pattern (that is what ScriptEngine.getPattern returns), so the
      // conversion to PatternStream<MetricEvent> is unchecked.
      @SuppressWarnings("unchecked")
      PatternStream<MetricEvent> pStream2 = CEP.pattern(
              metricEvent.keyBy(metricEvent1 -> metricEvent1.getName() + Joiner.on(",").join(metricEvent1.getTags().values())),
              p1);

      // Turn every match into a printable line: a dashed prefix followed by the
      // toString() of the matched-events map.
      SingleOutputStreamOperator<String> filter2 = pStream2.select(new PatternSelectFunction<MetricEvent, String>() {
          @Override
          public String select(Map<String, List<MetricEvent>> pattern) throws Exception {
              return "-----------------------------" + pattern.toString();
          }
      });

      filter2.print();

      // 10 * 5 * 1024 * 1024 = 52,428,800 is passed as the first MemoryStateBackend argument.
      StateBackend memory = new MemoryStateBackend(10 * 5 * 1024 * 1024, true);
      env.setStateBackend(memory);
      env.execute("----flink cep alert ----");
    

    4.model

    /**
     * Event model carried through the stream: a metric name, a timestamp, a map of
     * field values and a map of string tags.
     *
     * <p>Accessors such as {@code getName()}, {@code getFields()} and {@code getTags()}
     * are used elsewhere in this article but not declared here; presumably they are
     * generated by Lombok's {@code @Data} (the import is not shown, confirm).
     */
    @Data
    public class MetricEvent {
    
        /** Metric name. */
        private String name;
    
        /** Event timestamp; the unit is not specified in this snippet (TODO confirm). */
        private long timestamp;
    
        /** Field values keyed by field name; starts out as an empty map. */
        private Map<String, Object> fields = new HashMap<>();
    
        /** Tag values keyed by tag name; starts out as an empty map. */
        private Map<String, String> tags = new HashMap<>();
    }
    

    5.ScriptEngine

    import org.apache.flink.cep.pattern.Pattern;
    
    import javax.script.Bindings;
    import javax.script.Invocable;
    import javax.script.ScriptEngineManager;
    import javax.script.ScriptException;
    import java.util.Date;
    
    /**
     * Builds Flink CEP {@link Pattern} instances from Groovy source through the
     * {@code javax.script} (JSR-223) API.
     */
    public class ScriptEngine {
    
       /**
        * Evaluates a Groovy script and invokes one of its functions to obtain a pattern.
        *
        * <p>A fresh script engine is created on every call.
        *
        * @param text Groovy source; it must define a no-argument function called {@code name}
        * @param name name of the script function to invoke after evaluation
        * @return the object returned by the script function, cast to {@link Pattern}
        * @throws ScriptException if no engine named "groovy" is available, or if
        *         evaluating the script or invoking the function fails
        * @throws NoSuchMethodException if the script defines no function called {@code name}
        * @throws ClassCastException if the function returns something other than a Pattern
        */
       public static Pattern getPattern(String text,String name) throws ScriptException, NoSuchMethodException {
           ScriptEngineManager factory = new ScriptEngineManager();
           javax.script.ScriptEngine engine = factory.getEngineByName("groovy");
           // getEngineByName returns null when no matching engine is registered. Check
           // explicitly: an assert is disabled unless the JVM runs with -ea, and the
           // previous code dereferenced the engine before asserting on it.
           if (engine == null) {
               throw new ScriptException(
                       "No script engine named \"groovy\" is available; is groovy-jsr223 on the classpath?");
           }
           engine.eval(text);
           return (Pattern) ((Invocable) engine).invokeFunction(name);
       }
    }
    

    6.表达式相关
    GetTagMapFunction(注意:该类注册的函数名为 getF,示例表达式中用它从 fields 中读取数值)

    /**
     * Aviator custom function registered under the name {@code getF}: looks a key up in
     * a map and returns the value as a double.
     *
     * <p>Despite the class name, the example expression in this article calls it as
     * {@code getF(fields, "load5")}, i.e. on the event's fields map.
     */
    public class GetTagMapFunction extends AbstractFunction {
        @Override
        public String getName() {
            return "getF";
        }

        /**
         * @param env   variables of the expression being evaluated
         * @param args1 argument resolving to the map to read from
         * @param args2 argument resolving to the key to look up
         * @return the value stored under the key, converted to a double
         * @throws IllegalArgumentException if the map is null, or the key is absent or
         *         maps to a non-numeric value
         */
        @Override
        public AviatorDouble call(Map<String, Object> env, AviatorObject args1, AviatorObject args2) {
            @SuppressWarnings("unchecked") // expression arguments are untyped at this point
            Map<String, Object> map = (Map<String, Object>) FunctionUtils.getJavaObject(args1, env);
            String field = FunctionUtils.getStringValue(args2, env);
            Object value = (map == null) ? null : map.get(field);
            // Values may be stored as Integer, Long, Float, ...; a hard (Double) cast
            // would throw ClassCastException for those, so go through Number instead.
            if (!(value instanceof Number)) {
                throw new IllegalArgumentException(
                        "getF: field '" + field + "' is missing or not numeric, value=" + value);
            }
            return new AviatorDouble(((Number) value).doubleValue());
        }
    }
    

    GetFieldMapFunction(注意:该类注册的函数名为 getT,示例表达式中用它从 tags 中读取字符串)

    /**
     * Aviator custom function registered as {@code getT}: returns the string stored
     * under a key of a string-to-string map.
     */
    public class GetFieldMapFunction extends AbstractFunction {

        /**
         * @param env   variables of the expression being evaluated
         * @param args1 argument resolving to the map to read from
         * @param args2 argument resolving to the key to look up
         * @return the mapped string wrapped in an {@link AviatorString}
         */
        @Override
        public AviatorString call(Map<String, Object> env, AviatorObject args1, AviatorObject args2) {
            // Resolve the map first and the key second, then wrap the looked-up value.
            final Map<String, String> source = (Map<String, String>) FunctionUtils.getJavaObject(args1, env);
            final String key = FunctionUtils.getStringValue(args2, env);
            final String found = source.get(key);
            return new AviatorString(found);
        }

        /** Name under which the function is visible inside expressions. */
        @Override
        public String getName() {
            return "getT";
        }
    }
    
    

    7.表达式Condition,注册表达式

    /**
     * CEP condition that accepts a {@link MetricEvent} when an Aviator expression
     * evaluates to {@code true} for it.
     *
     * <p>The expression sees three variables: {@code event} (the event itself),
     * {@code fields} and {@code tags} (the event's two maps). The custom functions
     * provided by GetFieldMapFunction and GetTagMapFunction are registered when this
     * class is initialized.
     */
    public class AviatorCondition extends SimpleCondition<MetricEvent> implements Serializable {
        private static final Logger logger = LoggerFactory.getLogger(AviatorCondition.class);

        /** Aviator expression evaluated for every event. */
        private final String script;

        static {
            AviatorEvaluator.addFunction(new GetFieldMapFunction());
            AviatorEvaluator.addFunction(new GetTagMapFunction());
        }

        /**
         * @param script Aviator expression expected to yield a boolean
         */
        public AviatorCondition(String script) {
            this.script = script;
        }

        /**
         * Evaluates the expression against one event.
         *
         * @param event event under test
         * @return {@code true} only if the expression evaluates to {@code Boolean.TRUE};
         *         {@code false} if it yields {@code false} or {@code null}, or if
         *         evaluation fails (the failure is logged, not propagated)
         */
        @Override
        public boolean filter(MetricEvent event) throws Exception {
            Map<String, Object> env = new HashMap<String, Object>();
            env.put("event", event);
            env.put("fields", event.getFields());
            env.put("tags", event.getTags());
            try {
                // cached = true: reuse the compiled form of the expression instead of
                // compiling the same script again for every event.
                Boolean result = (Boolean) AviatorEvaluator.execute(script, env, true);
                // A null result must not be auto-unboxed: that would throw a
                // NullPointerException out of the operator.
                return Boolean.TRUE.equals(result);
            } catch (Exception e) {
                // Keep the exception as the trailing argument with no placeholder of its
                // own so that SLF4J logs it with its stack trace.
                logger.error("execute script with event error,script:{},event:{}", script, event, e);
                return false;
            }
        }
    }
    

    8.依赖

     <dependency>
                <groupId>com.googlecode.aviator</groupId>
                <artifactId>aviator</artifactId>
                <version>2.3.3</version>
            </dependency>
            <!-- org.codehaus.groovy:groovy was declared twice; one declaration is enough. -->
            <dependency>
                <groupId>org.codehaus.groovy</groupId>
                <artifactId>groovy</artifactId>
                <version>2.4.7</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.groovy</groupId>
                <artifactId>groovy-jsr223</artifactId>
                <version>2.4.7</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.51</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep-scala_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.34</version>
            </dependency>
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>26.0-jre</version>
            </dependency>
    

    相关文章

      网友评论

        本文标题:flink cep,使用groovy脚本及表达式求值

        本文链接:https://www.haomeiwen.com/subject/odjtrqtx.html