美文网首页
flink1.12.1扩展flink-sql 支持写入到sqls

flink1.12.1扩展flink-sql 支持写入到sqls

作者: 百岁叶 | 来源:发表于2021-04-21 20:17 被阅读0次

目前业务上有同步数据到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,
科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波

1、参考MySQLDialect及flinkStreamSql 实现SqlServerDialect

主要实现getUpsertStatement的方法,本来以为能直接copy一波flinkStreamSql 的实现,结果发现
报错 SQL statement must not contain ? character.

查看源码发现, flink在构建mysql的Statement,是先把需要替换的字段前面拼接了 : fieldNames,然后在org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl类的parseNamedStatement 替换成 ?号, 既然如此,就针对了buildDualQueryStatement进行修改
完整的SqlServerDialect文件

package org.apache.flink.connector.jdbc.dialect;

/**
 * SqlServerDialect
 *
 * @author zhanjian@pcuuu.com
 * @date 2021/4/20 10:13
 */
public class SqlServerDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    // Define MAX/MIN precision of TIMESTAMP type according to Mysql docs:
    // https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
    private static final int MAX_TIMESTAMP_PRECISION = 6;
    private static final int MIN_TIMESTAMP_PRECISION = 1;

    // Define MAX/MIN precision of DECIMAL type according to Mysql docs:
    // https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
    private static final int MAX_DECIMAL_PRECISION = 65;
    private static final int MIN_DECIMAL_PRECISION = 1;

    @Override
    public String dialectName() {
        return "SqlServer";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:jtds:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new SqlServerConverter(rowType);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("net.sourceforge.jtds.jdbc.Driver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return identifier;
    }

    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {

        StringBuilder sb = new StringBuilder();
        sb.append(
                "MERGE INTO "
                        + tableName
                        + " T1 USING "
                        + "("
                        + buildDualQueryStatement(fieldNames)
                        + ") T2 ON ("
                        + buildConnectionConditions(uniqueKeyFields)
                        + ") ");

        String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, true);

        if (StringUtils.isNotEmpty(updateSql)) {
            sb.append(" WHEN MATCHED THEN UPDATE SET ");
            sb.append(updateSql);
        }

        sb.append(
                " WHEN NOT MATCHED THEN "
                        + "INSERT ("
                        + Arrays.stream(fieldNames)
                                .map(this::quoteIdentifier)
                                .collect(Collectors.joining(","))
                        + ") VALUES ("
                        + Arrays.stream(fieldNames)
                                .map(col -> "T2." + quoteIdentifier(col))
                                .collect(Collectors.joining(","))
                        + ")");
        sb.append(";");
        return Optional.of(sb.toString());
    }

    /**
     * build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A")
     *
     * @param fieldNames
     * @param uniqueKeyFields
     * @param allReplace
     * @return
     */
    private String buildUpdateConnection(
            String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
        List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
        return Arrays.stream(fieldNames)
                .filter(col -> !uniqueKeyList.contains(col))
                .map(
                        col -> {
                            return allReplace
                                    ? quoteIdentifier("T1")
                                            + "."
                                            + quoteIdentifier(col)
                                            + " ="
                                            + quoteIdentifier("T2")
                                            + "."
                                            + quoteIdentifier(col)
                                    : quoteIdentifier("T1")
                                            + "."
                                            + quoteIdentifier(col)
                                            + " =ISNULL("
                                            + quoteIdentifier("T2")
                                            + "."
                                            + quoteIdentifier(col)
                                            + ","
                                            + quoteIdentifier("T1")
                                            + "."
                                            + quoteIdentifier(col)
                                            + ")";
                        })
                .collect(Collectors.joining(","));
    }

    private String buildConnectionConditions(String[] uniqueKeyFields) {
        return Arrays.stream(uniqueKeyFields)
                .map(col -> "T1." + quoteIdentifier(col) + "=T2." + quoteIdentifier(col))
                .collect(Collectors.joining(","));
    }

    /**
     * build select sql , such as (SELECT ? "A",? "B" FROM DUAL)
     *
     * @param column destination column
     * @return
     */
    public String buildDualQueryStatement(String[] column) {
        StringBuilder sb = new StringBuilder("SELECT ");
        String collect =
                Arrays.stream(column)
                        .map(col -> ":" + quoteIdentifier(col) + " " + quoteIdentifier(col))
                        .collect(Collectors.joining(", "));
        sb.append(collect);
        return sb.toString();
    }

    @Override
    public int maxDecimalPrecision() {
        return MAX_DECIMAL_PRECISION;
    }

    @Override
    public int minDecimalPrecision() {
        return MIN_DECIMAL_PRECISION;
    }

    @Override
    public int maxTimestampPrecision() {
        return MAX_TIMESTAMP_PRECISION;
    }

    @Override
    public int minTimestampPrecision() {
        return MIN_TIMESTAMP_PRECISION;
    }

    @Override
    public List<LogicalTypeRoot> unsupportedTypes() {
        // The data types used in Mysql are list at:
        // https://dev.mysql.com/doc/refman/8.0/en/data-types.html

        // TODO: We can't convert BINARY data type to
        //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
        // LegacyTypeInfoDataTypeConverter.
        return Arrays.asList(
                LogicalTypeRoot.BINARY,
                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
                LogicalTypeRoot.INTERVAL_YEAR_MONTH,
                LogicalTypeRoot.INTERVAL_DAY_TIME,
                LogicalTypeRoot.ARRAY,
                LogicalTypeRoot.MULTISET,
                LogicalTypeRoot.MAP,
                LogicalTypeRoot.ROW,
                LogicalTypeRoot.DISTINCT_TYPE,
                LogicalTypeRoot.STRUCTURED_TYPE,
                LogicalTypeRoot.NULL,
                LogicalTypeRoot.RAW,
                LogicalTypeRoot.SYMBOL,
                LogicalTypeRoot.UNRESOLVED);
    }

2、另外参考MySQLRowConverter另外实现一个SqlServerConverter

最后替换原有的flink-jar包后,就可以用类似flink定义mysql的ddl进行定义表了
注意url写法为:jdbc:jtds:sqlserver://xxx:1433;databaseName=master;

[flinkStreamSQL链接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java

相关文章

网友评论

      本文标题:flink1.12.1扩展flink-sql 支持写入到sqls

      本文链接:https://www.haomeiwen.com/subject/buozlltx.html