Flink 1.9: Using LookupableTableSource

    Author: todd5167 | Published: 2019-11-24 16:22

    LookupableTableSource

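    LookupableTableSource is a newly added Flink interface that lets a table be looked up by one or more of its columns. It is very useful when stream records need to be enriched with fields from a dimension table. Currently only the Blink planner supports this interface, and it may change in future versions.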

    Once LookupableTableSource is implemented, the dimension table JOIN has to be written with the temporal table join syntax, i.e. JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r. Calcite already supports parsing this syntax.

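    LookupableTableSource extends TableSource. getLookupFunction returns a TableFunction; at runtime the values of the lookupKeys columns are passed to that TableFunction, which loads the matching data synchronously. getAsyncLookupFunction loads data asynchronously instead. To use asynchronous loading, isAsyncEnabled() must return true; Flink then executes the custom AsyncTableFunction through its asynchronous lookup mechanism.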

    public interface LookupableTableSource<T> extends TableSource<T> {
        TableFunction<T> getLookupFunction(String[] lookupKeys);
        AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
        boolean isAsyncEnabled();
    }
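To make the difference between the two lookup styles concrete without a Flink dependency, here is a minimal plain-Java sketch. The class InMemoryLookup and its methods are hypothetical and are not Flink API: rows are indexed by the values of their key columns, lookup() returns matches directly as a synchronous TableFunction would, and asyncLookup() returns immediately and completes a CompletableFuture<Collection<...>> later, mirroring the first parameter of an AsyncTableFunction's eval method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical in-memory stand-in for a dimension table; not a Flink API.
class InMemoryLookup {
    // Rows grouped by the values of their key columns.
    private final Map<List<Object>, List<Object[]>> index = new HashMap<>();
    // Daemon thread so an unclosed lookup never keeps the JVM alive.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "lookup");
        t.setDaemon(true);
        return t;
    });

    // keyPositions: column positions of the lookup keys, e.g. {0} for column "a".
    InMemoryLookup(List<Object[]> rows, int... keyPositions) {
        for (Object[] row : rows) {
            List<Object> key = new ArrayList<>();
            for (int p : keyPositions) {
                key.add(row[p]);
            }
            index.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
    }

    // Synchronous: the caller gets the matching rows directly (empty list if none).
    List<Object[]> lookup(Object... keys) {
        return index.getOrDefault(Arrays.asList(keys), Collections.emptyList());
    }

    // Asynchronous: returns at once; the future is completed on another thread.
    void asyncLookup(CompletableFuture<Collection<Object[]>> result, Object... keys) {
        executor.execute(() -> {
            try {
                result.complete(lookup(keys));
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        });
    }

    void close() {
        executor.shutdown();
    }
}
```

The caller of asyncLookup() is free to move on to the next record as soon as the call returns, which is the throughput benefit of the asynchronous variant.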
    

    Example: asynchronously reading a MySQL dimension table

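    Next, by implementing LookupableTableSource and extending AsyncTableFunction, we read data from a server socket, asynchronously look up the MySQL dimension table, perform an asynchronous dimension table JOIN, and finally print the query results.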

    • Build MysqlAsyncLookupFunction, which uses Vert.x to load data from MySQL asynchronously
    import io.vertx.core.Vertx;
    import io.vertx.core.VertxOptions;
    import io.vertx.core.json.JsonArray;
    import io.vertx.core.json.JsonObject;
    import io.vertx.ext.jdbc.JDBCClient;
    import io.vertx.ext.sql.SQLConnection;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.table.functions.AsyncTableFunction;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.FunctionRequirement;
    import org.apache.flink.types.Row;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;
    
    
    public class MysqlAsyncLookupFunction extends AsyncTableFunction<Row> {
    
        private final static String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
        private final static String URL = "jdbc:mysql://127.0.0.1:3306/mqtest?useUnicode=true&characterEncoding=utf8";
    
        // Created in open() on each parallel instance; not serializable, hence transient
        private transient Vertx vertx;
        private transient JDBCClient jdbcClient;
        private final String[] fieldNames;
        private final String[] connectionField;
        private final TypeInformation[] fieldTypes;
    
        public MysqlAsyncLookupFunction(String[] fieldNames, String[] connectionField, TypeInformation[] fieldTypes) {
            this.fieldNames = fieldNames;
            this.fieldTypes = fieldTypes;
            this.connectionField = connectionField;
        }
    
        /**
         * Asynchronously queries the dimension table with the given keys.
         * @param resultFuture completed with the matching rows (empty if none), or exceptionally on failure
         * @param keys         values of the join-key columns from the source row, bound in order to the WHERE placeholders
         */
        public void eval(CompletableFuture<Collection<Row>> resultFuture, Object... keys) {
            JsonArray inputParams = new JsonArray();
            Arrays.asList(keys).forEach(inputParams::add);
    
            jdbcClient.getConnection(conn -> {
                if (conn.failed()) {
                    resultFuture.completeExceptionally(conn.cause());
                    return;
                }
                final SQLConnection connection = conn.result();
                String sqlCondition = getSelectFromStatement("sidetest", fieldNames, connectionField);
                // Vert.x asynchronous query
                connection.queryWithParams(sqlCondition, inputParams, rs -> {
                    // Always return the connection to the pool, whether or not the query succeeded
                    connection.close();
                    if (rs.failed()) {
                        resultFuture.completeExceptionally(rs.cause());
                        return;
                    }
                    List<Row> rowList = new ArrayList<>();
                    for (JsonArray line : rs.result().getResults()) {
                        rowList.add(buildRow(line));
                    }
                    // An empty list means "no match" for this key
                    resultFuture.complete(rowList);
                });
            });
        }
    
        private Row buildRow(JsonArray line) {
            Row row = new Row(fieldNames.length);
            for (int i = 0; i < fieldNames.length; i++) {
                row.setField(i, line.getValue(i));
            }
            return row;
        }
    
        // Type of the rows this function produces
        @Override
        public TypeInformation<Row> getResultType() {
            return new RowTypeInfo(fieldTypes, fieldNames);
        }
        
        @Override
        public void open(FunctionContext context) throws Exception {
            // Use Vert.x to run JDBC queries asynchronously
            JsonObject mysqlClientConfig = new JsonObject();
            mysqlClientConfig.put("url", URL)
                    .put("driver_class", MYSQL_DRIVER)
                    .put("user", "xxx")
                    .put("password", "xxx");
            System.setProperty("vertx.disableFileCPResolving", "true");
    
            VertxOptions vo = new VertxOptions();
            vo.setFileResolverCachingEnabled(false);
            vo.setWarningExceptionTime(60000);
            vo.setMaxEventLoopExecuteTime(60000);
            vertx = Vertx.vertx(vo);
            jdbcClient = JDBCClient.createNonShared(vertx, mysqlClientConfig);
        }
    
        public static String quoteIdentifier(String identifier) {
            return "`" + identifier + "`";
        }
    
        // Builds the SQL used to query the dimension table; multiple conditions are combined with AND
        public static String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
            String selectClause = Arrays.stream(selectFields).map(MysqlAsyncLookupFunction::quoteIdentifier).collect(Collectors.joining(", "));
            String whereClause = Arrays.stream(conditionFields).map(f -> quoteIdentifier(f) + " = ?").collect(Collectors.joining(" AND "));
            return "SELECT " + selectClause + " FROM " + quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + whereClause : "");
        }
    
        @Override
        public void close() throws Exception {
            if (jdbcClient != null) {
                jdbcClient.close();
            }
            // Also release the Vert.x event loops created in open()
            if (vertx != null) {
                vertx.close();
            }
        }
    
        @Override
        public Set<FunctionRequirement> getRequirements() {
            // No special requirements; return an empty set rather than null
            return Collections.emptySet();
        }
    
        @Override
        public boolean isDeterministic() {
            // Results depend on the current contents of the MySQL table
            return false;
        }
    
        // Builder
        public static final class Builder {
            // Columns to select from the dimension table
            private String[] fieldNames;
            // Columns used in the WHERE condition
            private String[] connectionField;
            // Types of the returned dimension columns
            private TypeInformation[] fieldTypes;
    
            private Builder() {
            }
    
            public static Builder getBuilder() {
                return new Builder();
            }
    
            public Builder withFieldNames(String[] fieldNames) {
                this.fieldNames = fieldNames;
                return this;
            }
    
            public Builder withConnectionField(String[] connectionField) {
                this.connectionField = connectionField;
                return this;
            }
    
            public Builder withFieldTypes(TypeInformation[] fieldTypes) {
                this.fieldTypes = fieldTypes;
                return this;
            }
    
            public MysqlAsyncLookupFunction build() {
                return new MysqlAsyncLookupFunction(fieldNames, connectionField, fieldTypes);
            }
        }
    
    }
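To see what the statement builder produces when the lookup key has more than one column, here is a standalone copy of it that can run without Vert.x or Flink. The class name SelectStatementBuilder is hypothetical; conditions are joined with AND so that each `?` placeholder lines up with exactly one key value passed to eval().

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical standalone copy of the dimension-table SQL builder.
class SelectStatementBuilder {
    static String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }

    static String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        String selectClause = Arrays.stream(selectFields)
                .map(SelectStatementBuilder::quoteIdentifier)
                .collect(Collectors.joining(", "));
        // One "col = ?" per key column, combined with AND (a comma here would be invalid SQL)
        String whereClause = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + " = ?")
                .collect(Collectors.joining(" AND "));
        return "SELECT " + selectClause + " FROM " + quoteIdentifier(tableName)
                + (conditionFields.length > 0 ? " WHERE " + whereClause : "");
    }
}
```

For example, selecting a, b, c, d from sidetest with key columns a and c yields SELECT `a`, `b`, `c`, `d` FROM `sidetest` WHERE `a` = ? AND `c` = ?.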
    
    
    • Create MysqlAsyncLookupTableSource, which supplies the AsyncTableFunction
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.functions.AsyncTableFunction;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.table.sources.LookupableTableSource;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.utils.TypeConversions;
    import org.apache.flink.types.Row;
    
    
    public class MysqlAsyncLookupTableSource implements LookupableTableSource<Row> {
        private final String[] fieldNames;
        private final String[] connectionField;
        private final TypeInformation[] fieldTypes;
    
        public MysqlAsyncLookupTableSource(String[] fieldNames, String[] connectionField, TypeInformation[] fieldTypes) {
            this.fieldNames = fieldNames;
            this.fieldTypes = fieldTypes;
            this.connectionField = connectionField;
        }
        
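        // No synchronous lookup: isAsyncEnabled() returns true, so the planner calls getAsyncLookupFunction instead
        @Override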
        public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
            return null;
        }
    
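        // Load dimension table data through the AsyncTableFunction.
        // lookupKeys are the dimension-table columns used in the JOIN ... ON condition; using them keeps
        // the WHERE placeholders in the same order as the key values the planner passes to eval().
        @Override
        public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
            return MysqlAsyncLookupFunction.Builder.getBuilder()
                    .withFieldNames(fieldNames)
                    .withFieldTypes(fieldTypes)
                    .withConnectionField(lookupKeys)
                    .build();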
        }
    
        @Override
        public boolean isAsyncEnabled() {
            return true;
        }
        // Type of the data this source produces
        @Override
        public DataType getProducedDataType() {
            // Convert the legacy TypeInformation into the new DataType
            return TypeConversions.fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames));
        }
    
        @Override
        public TableSchema getTableSchema() {
            return TableSchema.builder()
                    .fields(fieldNames, TypeConversions.fromLegacyInfoToDataType(fieldTypes))
                    .build();
        }
    
        public static final class Builder {
            private String[] fieldNames;
            private String[] connectionField;
            private TypeInformation[] fieldTypes;
    
            private Builder() {
            }
    
            public static Builder newBuilder() {
                return new Builder();
            }
    
            public Builder withFieldNames(String[] fieldNames) {
                this.fieldNames = fieldNames;
                return this;
            }
    
            public Builder withFieldTypes(TypeInformation[] fieldTypes) {
                this.fieldTypes = fieldTypes;
                return this;
            }
    
            public Builder withConnectionField(String[] connectionField) {
                this.connectionField = connectionField;
                return this;
            }
    
            public MysqlAsyncLookupTableSource build() {
                return new MysqlAsyncLookupTableSource(fieldNames,connectionField, fieldTypes);
            }
        }
    }
    
    • Create CustomerSocketTextStreamFunction, which reads text from a server socket and parses each JSON record into a Row
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.IOUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.util.Iterator;
    
    
    public class CustomerSocketTextStreamFunction implements SourceFunction<Row> {
        private static final Logger LOG = LoggerFactory.getLogger(CustomerSocketTextStreamFunction.class);
    
        /**
         * Default delay between successive connection attempts.
         */
        private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 2000;
    
        /**
         * Default connection timeout when connecting to the server socket (infinite).
         */
        private static final int CONNECTION_TIMEOUT_TIME = 0;
    
        private final ObjectMapper objectMapper = new ObjectMapper();
    
        /**
         * Type information describing the result type.
         */
        private final TypeInformation<Row> typeInfo;
    
        /**
         * Field names to parse. Indices match fieldTypes indices.
         */
        private final String[] fieldNames;
    
        /**
         * Types to parse fields as. Indices match fieldNames indices.
         */
        private final TypeInformation<?>[] fieldTypes;
    
        private volatile boolean isRunning = true;
    
        private transient Socket currentSocket;
    
        ServersocketSourceTableInfo tableInfo;
    
        public CustomerSocketTextStreamFunction(ServersocketSourceTableInfo tableInfo, TypeInformation<Row> typeInfo) {
            this.typeInfo = typeInfo;
    
            this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
    
            this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
    
            this.tableInfo = tableInfo;
        }
    
        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            final StringBuilder buffer = new StringBuilder();
            long attempt = 0;
    
            while (isRunning) {
                try (Socket socket = new Socket()) {
                    currentSocket = socket;
                    socket.connect(new InetSocketAddress(tableInfo.getHostname(), tableInfo.getPort()), CONNECTION_TIMEOUT_TIME);
    
                    // Closing the socket at the end of the try block also closes this reader
                    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    char[] cbuf = new char[8192];
                    int bytesRead;
                    while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
                        buffer.append(cbuf, 0, bytesRead);
                        int delimPos;
                        String delimiter = tableInfo.getDelimiter();
                        while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
                            String record = buffer.substring(0, delimPos);
                            // truncate trailing carriage return
                            if (delimiter.equals("\n") && record.endsWith("\r")) {
                                record = record.substring(0, record.length() - 1);
                            }
                            ctx.collect(convertToRow(record));
                            buffer.delete(0, delimPos + delimiter.length());
                        }
                    }
                } catch (Exception e) {
                    LOG.warn("Failed to read from server socket {}:{}, please check the configuration",
                            tableInfo.getHostname(), tableInfo.getPort(), e);
                }
    
    
                // if we dropped out of this loop due to an EOF, sleep and retry
                if (isRunning) {
                    attempt++;
                    if (tableInfo.getMaxNumRetries() == -1 || attempt < tableInfo.getMaxNumRetries()) {
                        Thread.sleep(DEFAULT_CONNECTION_RETRY_SLEEP);
                    } else {
                        // this should probably be here, but some examples expect simple exists of the stream source
                        // throw new EOFException("Reached end of stream and reconnects are not enabled.");
                        break;
                    }
                }
            }
    
            // collect trailing data
            if (buffer.length() > 0) {
                ctx.collect(convertToRow(buffer.toString()));
            }
        }
    
        public Row convertToRow(String record) throws IOException {
            JsonNode root = objectMapper.readTree(record);
            Row row = new Row(fieldNames.length);
            for (int i = 0; i < fieldNames.length; i++) {
                JsonNode node = getIgnoreCase(root, fieldNames[i]);
                if (node == null) {
                    row.setField(i, null);
                } else {
                    // Read the value as specified type
                    Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
                    row.setField(i, value);
                }
            }
            return row;
        }
    
    
        @Override
        public void cancel() {
            isRunning = false;
    
            // we need to close the socket as well, because the Thread.interrupt() function will
            // not wake the thread in the socketStream.read() method when blocked.
            Socket theSocket = this.currentSocket;
            if (theSocket != null) {
                IOUtils.closeSocket(theSocket);
            }
        }
    
        public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
            Iterator<String> iter = jsonNode.fieldNames();
            while (iter.hasNext()) {
                String key1 = iter.next();
                if (key1.equalsIgnoreCase(key)) {
                    return jsonNode.get(key1);
                }
            }
            return null;
        }
    }
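The record-splitting loop inside run() can be isolated and checked on its own. The sketch below (the class DelimitedRecordSplitter and method drainRecords are hypothetical) applies the same rules: cut at each delimiter, strip a trailing \r when the delimiter is \n, and keep an incomplete record in the buffer until the next read. It also rejects an empty delimiter, which would otherwise match at position 0 forever.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the splitting loop in CustomerSocketTextStreamFunction.run().
class DelimitedRecordSplitter {
    // Removes every complete record from the buffer and returns them in order;
    // an unterminated tail stays in the buffer until more data arrives.
    static List<String> drainRecords(StringBuilder buffer, String delimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        List<String> records = new ArrayList<>();
        int delimPos;
        while ((delimPos = buffer.indexOf(delimiter)) != -1) {
            String record = buffer.substring(0, delimPos);
            // Drop the '\r' of Windows-style line endings when splitting on "\n"
            if (delimiter.equals("\n") && record.endsWith("\r")) {
                record = record.substring(0, record.length() - 1);
            }
            records.add(record);
            buffer.delete(0, delimPos + delimiter.length());
        }
        return records;
    }
}
```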
    
    
    • ServersocketSourceTableInfo: entity class holding the server socket settings
    import java.io.Serializable;
    
    public class ServersocketSourceTableInfo implements Serializable {
    
        public ServersocketSourceTableInfo() { }
    
        public ServersocketSourceTableInfo(String hostname, int port, String delimiter, long maxNumRetries) {
            this.hostname = hostname;
            this.port = port;
            this.delimiter = delimiter;
            this.maxNumRetries = maxNumRetries;
        }
    
        private String hostname;
    
        private int port;
    
        private String delimiter;
    
        private long maxNumRetries;
    
        public String getHostname() {
            return hostname;
        }
    
        public void setHostname(String hostname) {
            this.hostname = hostname;
        }
    
        public int getPort() {
            return port;
        }
    
        public void setPort(int port) {
            this.port = port;
        }
    
        public String getDelimiter() {
            return delimiter;
        }
    
        public void setDelimiter(String delimiter) {
            this.delimiter = delimiter;
        }
    
        public long getMaxNumRetries() {
            return maxNumRetries;
        }
    
        public void setMaxNumRetries(long maxNumRetries) {
            this.maxNumRetries = maxNumRetries;
        }
    
        @Override
        public String toString() {
            return "ServersocketSourceTableInfo{" +
                    "hostname='" + hostname + '\'' +
                    ", port=" + port +
                    ", delimiter='" + delimiter + '\'' +
                    ", maxNumRetries=" + maxNumRetries +
                    '}';
        }
    }
    
    
    • Entry point for the dimension table join
    import cn.todd.flink.join.side.MysqlAsyncLookupTableSource;
    import cn.todd.flink.join.source.CustomerSocketTextStreamFunction;
    import cn.todd.flink.join.source.ServersocketSourceTableInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    
    public class SideTabelJoin {
    
        public static void main(String[] args) throws Exception {
            // Use the Blink planner in streaming mode
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
    
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
    
            RowTypeInfo typeInformation = buildRowTypeInfo();
            CustomerSocketTextStreamFunction sourceFunction = buildCustomerSocketTextStreamFunction(typeInformation);
            String tableName = "user_visit";
    
            DataStreamSource<Row> serversocketSource = streamEnv.addSource(sourceFunction, tableName, typeInformation);
            // Source table
            //serversocketSource.print();
            // Append a processing-time attribute "proctime" used by FOR SYSTEM_TIME AS OF
            tableEnv.registerDataStream(tableName, serversocketSource, String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime");
    
            // Dimension table
            MysqlAsyncLookupTableSource tableSource = MysqlAsyncLookupTableSource.Builder.newBuilder()
                    .withFieldNames(new String[]{"a", "b", "c", "d"})
                    .withFieldTypes(new TypeInformation[]{Types.STRING, Types.LONG, Types.STRING, Types.STRING})
                    .withConnectionField(new String[]{"a"})
                    .build();
    
            tableEnv.registerTableSource("sideTable", tableSource);
    
            Table table = tableEnv.sqlQuery("select t1.visitCount, t1.name, t1.proctime, s1.b, s1.c from user_visit as t1 join sideTable " +
                    " FOR SYSTEM_TIME AS OF t1.proctime AS s1 on t1.name = s1.a ");
            // Print the query result directly
            DataStream<Row> rowDataStream = tableEnv.toAppendStream(table, Row.class);
            rowDataStream.print();
            
            streamEnv.execute();
    
        }
    
        private static RowTypeInfo buildRowTypeInfo() {
            TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG};
            String[] fields = new String[]{"id", "name", "visitCount"};
            return new RowTypeInfo(types, fields);
        }
    
        private static CustomerSocketTextStreamFunction buildCustomerSocketTextStreamFunction(RowTypeInfo typeInformation) {
            ServersocketSourceTableInfo tableInfo = new ServersocketSourceTableInfo("127.0.0.1", 9900, "\n", 3);
            return new CustomerSocketTextStreamFunction(tableInfo, typeInformation);
        }
    }
    
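    • Test
    1. Source table data: start a server with nc and type a JSON record
    nc -l 9900
    {"name": "xc","visitCount": 88,"id": "1001"}
    
    2. The dimension table has only the four fields a, b, c, d, holding one row:
    a   b   c   d
    xc  10  1   xc
    
    3. Printed output (visitCount, name, proctime, b, c):
    4> 88,xc,2019-11-24T06:57:57.455,10,1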
    

    Dimension table caching

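    Using an asynchronous lookup function raises throughput, because a record no longer has to wait for the previous record's lookup to return before it is processed. In production, a cache is usually added so the dimension table is not read too frequently, and a keyBy is applied before the dimension table join to raise the cache hit rate: records with the same key then always reach the same parallel instance and therefore the same cache.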
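A minimal sketch of such a cache, assuming an LRU eviction policy plus a fixed time-to-live per entry; the class LookupCache and its parameters are hypothetical and not taken from Flink or the example above. In eval() one would consult the cache first and only query MySQL on a miss. The clock is injected so that expiry can be tested deterministically.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical LRU cache with a fixed time-to-live per entry. Not thread-safe:
// synchronize access if Vert.x callback threads touch it concurrently.
class LookupCache<K, V> {
    private static final class CacheEntry<V> {
        final V value;
        final long expiresAtMillis;

        CacheEntry(V value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final LongSupplier clock;
    private final LinkedHashMap<K, CacheEntry<V>> map;

    LookupCache(int maxSize, long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        // accessOrder = true: iteration runs from least to most recently accessed,
        // so the eldest entry is the LRU one and is evicted once maxSize is exceeded.
        this.map = new LinkedHashMap<K, CacheEntry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CacheEntry<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    // Returns the cached value, or null if it is missing or has expired.
    V get(K key) {
        CacheEntry<V> entry = map.get(key);
        if (entry == null) {
            return null;
        }
        if (clock.getAsLong() >= entry.expiresAtMillis) {
            map.remove(key);
            return null;
        }
        return entry.value;
    }

    void put(K key, V value) {
        map.put(key, new CacheEntry<>(value, clock.getAsLong() + ttlMillis));
    }
}
```

The TTL bounds how stale an enriched field can be; a shorter TTL trades more MySQL queries for fresher dimension data.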

    Two ways to implement dimension table joins in StreamSQL

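    • UDTF
      The LookupFunction mechanism uses the UDTF approach: under the hood the query joins against the UDTF, similar to the LATERAL JOIN syntax.
      Without the LookupFunction mechanism, you can still express dimension table JOINs in standard SQL by rewriting the query one layer up with a SQL parsing tool such as Calcite. For example: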
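    -- The connection details and fields of Side are known
    SELECT
      so.*, s1.a,s1.b
    FROM
      Source so 
    JOIN 
      Side s1
    ON 
      so.id = s1.c
    -- Parse the JOIN condition, load and register a UDTF for Side, rebuild the SQL, and hand it to Flink SQL for execution
    SELECT
      so.*, s1.a,s1.b
    FROM
      Source so,
    LATERAL TABLE (MysqlAsyncFun(id)) AS s1;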
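The semantics of joining against a UDTF can be sketched in plain Java. LateralJoinSketch and lateralJoin are hypothetical names used only for illustration: the table function is called once per source row, each row it returns is combined with that source row, and a source row for which it returns nothing is dropped, as in an inner join.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical illustration of "join a UDTF" semantics; not Flink code.
class LateralJoinSketch {
    // S = source row, D = row returned by the table function, R = combined output row
    static <S, D, R> List<R> lateralJoin(List<S> source,
                                         Function<S, List<D>> tableFunction,
                                         BiFunction<S, D, R> combine) {
        List<R> out = new ArrayList<>();
        for (S row : source) {
            // Zero results -> no output for this row (inner-join behavior)
            for (D sideRow : tableFunction.apply(row)) {
                out.add(combine.apply(row, sideRow));
            }
        }
        return out;
    }
}
```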
    
    
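    • flatMap or asyncInvoke operator
      In this approach, source table records are sent to an asyncInvoke operator, which loads dimension data asynchronously, combines the source and dimension records into one wide table, and outputs it; the wide table is registered as a new table and Flink executes the SQL against it. For details, see the open-source project flinkStreamSQL. An example of how its SQL is transformed internally before execution: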
    -- User SQL
    SELECT
      so.id,so.name,s1.city,s1.address
    FROM 
        Source so
    JOIN
        Side s1
    ON
        so.sid=s1.id
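    -- 1. Register the source table with Flink
    -- 2. Feed the source table records to asyncInvoke, which outputs the wide table
    -- 3. Register the wide table, replace the table names, and execute the final SQL statement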
    SELECT
      so_s1.id,so_s1.name,so_s1.city,so_s1.address
    FROM 
        Source_Side so_s1
    

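    This approach has three drawbacks: 1. Switching back and forth between DataStream and Table is inconvenient. 2. When two tables are merged into one wide table, fields with the same name in both tables must be disambiguated. 3. Table names in the SELECT statement have to be replaced, which involves many cases. Still, the approach is conceptually clear and highly extensible.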
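Drawback 2 can be illustrated with a small plain-Java sketch. It assumes records are represented as name-to-value maps and that a conflicting field is prefixed with its table alias plus an underscore; the class WideRowMerger and this naming scheme are hypothetical, not how flinkStreamSQL necessarily does it. A production version would also have to guard against collisions that the prefixes themselves create.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical merge of a source record and a dimension record into one wide record.
class WideRowMerger {
    // Fields present in both records are prefixed with their table alias
    // (e.g. "id" -> "so_id" and "s1_id") so neither value is silently overwritten.
    static Map<String, Object> merge(String sourceAlias, Map<String, Object> source,
                                     String sideAlias, Map<String, Object> side) {
        Map<String, Object> wide = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : source.entrySet()) {
            String name = side.containsKey(e.getKey()) ? sourceAlias + "_" + e.getKey() : e.getKey();
            wide.put(name, e.getValue());
        }
        for (Map.Entry<String, Object> e : side.entrySet()) {
            String name = source.containsKey(e.getKey()) ? sideAlias + "_" + e.getKey() : e.getKey();
            wide.put(name, e.getValue());
        }
        return wide;
    }
}
```

Drawback 3 follows directly: once "id" has become "so_id" and "s1_id", every reference in the user's SELECT must be rewritten to match.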


        Article link: https://www.haomeiwen.com/subject/lkelwctx.html