提要:
使用表达式求值(Aviator)和 Groovy 脚本来生成 Flink CEP 的 Pattern,以减少代码量:同一份代码只需传入不同的 script 脚本,即可生成不同的 Pattern。
1.source
// Run every record of the source stream through ParseMetricEventFunction and
// declare MetricEvent as the operator's output type via returns(...).
SingleOutputStreamOperator<MetricEvent> metricEvent =
        dataStreamSource
                .flatMap(new ParseMetricEventFunction())
                .returns(MetricEvent.class);
2.pattern
// Groovy source handed to ScriptEngine.getPattern. It binds an AviatorCondition
// to the script variable `where1` and defines a `get()` function that returns a
// Pattern starting at "start" with a no-skip after-match strategy.
String patternScript =
        " import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy\n"
        + "import org.apache.flink.cep.pattern.Pattern\n"
        + "import test.cep.function.AviatorCondition \n"
        + "where1 = new AviatorCondition("
        + " \"getT(tags,\\\"cluster_name\\\")==\\\"terminus-x\\\"&&getF(fields,\\\"load5\\\")>15 \""
        + " )\n"
        + "def get(){ "
        + " return Pattern.begin(\"start\", AfterMatchSkipStrategy.noSkip())\n"
        + " .where(where1)"
        + "}";
// "get" is the name of the script function invoked to obtain the Pattern.
Pattern p1 = ScriptEngine.getPattern(patternScript, "get");
3.result
// Key the stream by metric name plus the comma-joined tag values so that the
// pattern is matched independently for each distinct series.
// The pattern built in step 2 is named `p1`; the original snippet passed an
// undeclared `p` here.
PatternStream<MetricEvent> pStream2 = CEP.pattern(
        metricEvent.keyBy(metricEvent1 ->
                metricEvent1.getName() + Joiner.on(",").join(metricEvent1.getTags().values())),
        p1);
// Render every match as a string: a dashed prefix followed by the map of
// pattern-state name -> matched events.
SingleOutputStreamOperator<String> filter2 = pStream2.select(new PatternSelectFunction<MetricEvent, String>() {
    @Override
    public String select(Map<String, List<MetricEvent>> pattern) throws Exception {
        return "-----------------------------" + pattern.toString();
    }
});
filter2.print();
// 10 * 5 * 1024 * 1024 = 50 MiB passed as the first constructor argument
// (NOTE(review): presumably the max state size, with `true` enabling
// asynchronous snapshots — confirm against the Flink version in use).
StateBackend memory = new MemoryStateBackend(10 * 5 * 1024 * 1024, true);
env.setStateBackend(memory);
env.execute("----flink cep alert ----");
4.model
/**
 * A single metric record flowing through the CEP job.
 *
 * <p>Accessors such as {@code getName()}, {@code getTags()} and {@code getFields()}
 * are used elsewhere in this file but are not declared here; presumably they are
 * generated by the {@code @Data} annotation (Lombok) — confirm.
 */
@Data
public class MetricEvent {
// Metric name; used together with the tag values to build the keyBy key.
private String name;
// Event timestamp. NOTE(review): unit (ms vs. s) is not visible here — confirm.
private long timestamp;
// Measured values by field name; exposed to Aviator scripts as "fields" and read by getF(...).
private Map<String, Object> fields = new HashMap<>();
// String tags by tag name; exposed to Aviator scripts as "tags" and read by getT(...).
private Map<String, String> tags = new HashMap<>();
}
5.ScriptEngine
import org.apache.flink.cep.pattern.Pattern;
import javax.script.Bindings;
import javax.script.Invocable;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.util.Date;
/**
 * Builds Flink CEP {@link Pattern} objects from Groovy source text through the
 * JSR-223 scripting API.
 *
 * <p>This class shares its simple name with {@code javax.script.ScriptEngine},
 * which is why the JSR-223 type is referenced by its fully qualified name below.
 */
public class ScriptEngine {
    /**
     * Evaluates a Groovy script and invokes one of its top-level functions to
     * obtain a pattern.
     *
     * @param text Groovy source; it must define a no-argument function called {@code name}
     * @param name name of the script function to invoke
     * @return the {@link Pattern} returned by the script function
     * @throws ScriptException if no "groovy" engine is registered, the script fails,
     *         or the function returns something other than a {@link Pattern}
     * @throws NoSuchMethodException if the script defines no function called {@code name}
     */
    public static Pattern getPattern(String text, String name) throws ScriptException, NoSuchMethodException {
        ScriptEngineManager factory = new ScriptEngineManager();
        javax.script.ScriptEngine engine = factory.getEngineByName("groovy");
        // getEngineByName returns null when no matching engine is on the classpath.
        // Check explicitly and before first use: `assert` is disabled by default and
        // the original dereferenced the engine before asserting on it.
        if (engine == null) {
            throw new ScriptException(
                    "No JSR-223 engine named 'groovy' found; is groovy-jsr223 on the classpath?");
        }
        System.out.println(engine);
        engine.eval(text);
        Object result = ((Invocable) engine).invokeFunction(name);
        if (!(result instanceof Pattern)) {
            throw new ScriptException("Script function '" + name + "' returned "
                    + (result == null ? "null" : result.getClass().getName())
                    + " instead of a Pattern");
        }
        return (Pattern) result;
    }
}
6.表达式相关
GetTagMapFunction
/**
 * Aviator function registered as {@code getF(map, key)}: looks up {@code key} in
 * the given map and returns the value as an Aviator double.
 *
 * <p>NOTE(review): despite the class name, the registered name is {@code getF} and
 * the sample expression in this file calls it with the {@code fields} map.
 */
public class GetTagMapFunction extends AbstractFunction {
    /** @return the function name used inside Aviator expressions, {@code "getF"} */
    @Override
    public String getName() {
        return "getF";
    }

    /**
     * @param env   Aviator evaluation environment
     * @param args1 expression argument resolving to a {@code Map<String, Object>}
     * @param args2 expression argument resolving to the key to look up
     * @return the looked-up value as an {@link AviatorDouble}; wraps {@code null}
     *         when the key is absent, as the original code did
     * @throws ClassCastException if the stored value is not a {@link Number}
     */
    @Override
    public AviatorDouble call(Map<String, Object> env, AviatorObject args1, AviatorObject args2) {
        @SuppressWarnings("unchecked") // the script author is trusted to pass a String-keyed map
        Map<String, Object> map = (Map<String, Object>) FunctionUtils.getJavaObject(args1, env);
        String field = FunctionUtils.getStringValue(args2, env);
        // Accept any numeric type: the map holds Object values, so an Integer, Long
        // or Float would make a direct (Double) cast fail with ClassCastException.
        Number value = (Number) map.get(field);
        return new AviatorDouble(value == null ? null : Double.valueOf(value.doubleValue()));
    }
}
GetFieldMapFunction
/**
 * Aviator function registered as {@code getT(map, key)}: returns the string stored
 * under {@code key} in the given map, wrapped as an Aviator string.
 */
public class GetFieldMapFunction extends AbstractFunction {

    /** @return the function name used inside Aviator expressions, {@code "getT"} */
    @Override
    public String getName() {
        return "getT";
    }

    /**
     * @param env   Aviator evaluation environment
     * @param args1 expression argument resolving to a {@code Map<String, String>}
     * @param args2 expression argument resolving to the key to look up
     * @return the mapped string (possibly {@code null}) wrapped in an {@link AviatorString}
     */
    @Override
    public AviatorString call(Map<String, Object> env, AviatorObject args1, AviatorObject args2) {
        @SuppressWarnings("unchecked")
        final Map<String, String> lookup = (Map<String, String>) FunctionUtils.getJavaObject(args1, env);
        final String key = FunctionUtils.getStringValue(args2, env);
        final String found = lookup.get(key);
        return new AviatorString(found);
    }
}
7.表达式 Condition(同时在静态代码块中注册自定义表达式函数)
/**
 * CEP condition that accepts a {@link MetricEvent} when an Aviator expression
 * evaluates to {@code true} for it.
 *
 * <p>The expression can reference three variables: {@code event} (the event
 * itself), {@code fields} ({@code event.getFields()}) and {@code tags}
 * ({@code event.getTags()}). The custom functions {@code getT} and {@code getF}
 * are registered with {@link AviatorEvaluator} when this class is initialized.
 */
public class AviatorCondition extends SimpleCondition<MetricEvent> implements Serializable {
    private static final Logger logger = LoggerFactory.getLogger(AviatorCondition.class);

    /** Aviator expression evaluated against every event. */
    private final String script;

    static {
        // A static initializer runs once per class load, so the functions are
        // registered in every JVM that loads this condition.
        AviatorEvaluator.addFunction(new GetFieldMapFunction());
        AviatorEvaluator.addFunction(new GetTagMapFunction());
    }

    /**
     * @param script Aviator expression expected to evaluate to a boolean
     */
    public AviatorCondition(String script) {
        this.script = script;
    }

    /**
     * Evaluates the expression for one event.
     *
     * @param event the event to test
     * @return {@code true} only if the expression evaluates to {@code Boolean.TRUE};
     *         {@code false} if it evaluates to false, to a non-boolean or null
     *         value, or if evaluation throws (the failure is logged, not rethrown)
     */
    @Override
    public boolean filter(MetricEvent event) throws Exception {
        Map<String, Object> env = new HashMap<String, Object>();
        env.put("event", event);
        env.put("fields", event.getFields());
        env.put("tags", event.getTags());
        try {
            Object result = AviatorEvaluator.execute(script, env);
            if (result instanceof Boolean) {
                return (Boolean) result;
            }
            // A null result used to be auto-unboxed at the return statement, outside
            // the try block, so the NullPointerException escaped to the operator.
            logger.error("execute script with event returned non-boolean,script:{},event:{},result:{}",
                    script, event, result);
            return false;
        } catch (Exception e) {
            // Two placeholders plus a trailing Throwable: SLF4J then logs the stack
            // trace instead of leaving a third "{}" unfilled or printing only toString().
            logger.error("execute script with event error,script:{},event:{}", script, event, e);
            return false;
        }
    }
}
8.依赖
<!-- Aviator expression engine used by AviatorCondition -->
<dependency>
<groupId>com.googlecode.aviator</groupId>
<artifactId>aviator</artifactId>
<version>2.3.3</version>
</dependency>
<!-- Groovy runtime plus its JSR-223 binding used by ScriptEngine.
     The original list declared org.codehaus.groovy:groovy:2.4.7 twice;
     it is listed once here. -->
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>2.4.7</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-jsr223</artifactId>
<version>2.4.7</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.34</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>26.0-jre</version>
</dependency>
网友评论