美文网首页
flink1.9.1 之 sql 解析流中的数组

flink1.9.1 之 sql 解析流中的数组

作者: 犹豫的猫抓板 | 来源:发表于2019-12-15 22:19 被阅读0次

输入

{
  "a": 1,
  "b": [
    {
      "name": "name1",
      "age": 11
    },
    {
      "name": "name2",
      "age": 12
    },
    {
      "name": "name3",
      "age": 13
    }
  ]
}

期望输出

1 name1 11
1 name2 12
1 name3 13

知识点

unnest

--Unnesting WITH ORDINALITY is not supported yet.
-- Cross-joins each Orders row with the elements of its own tags array:
-- one output row per array element, exposed as column "tag" of alias t.

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

demo

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Minimal Maven build for the UNNEST demo: Flink Table API (Java bridge) plus the table planner. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xp</groupId>
    <artifactId>test-flink</artifactId>
    <version>1.9.1</version>

    <properties>
        <!-- Flink release used by every Flink dependency below. Kept separate from this
             project's own version so that bumping one does not silently change the other. -->
        <flink.version>1.9.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Table API bridge to the Java DataStream API (StreamTableEnvironment). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Planner that translates Table API / SQL queries into a runnable job. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile for the Java level declared in the properties above. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
/**
 * Demo: flattens an array-of-rows column of a stream with SQL {@code UNNEST}.
 *
 * <p>The stream has two fields: {@code a} (INT) and {@code b}, an array whose elements are
 * rows of (INT, STRING). The query joins every input row with the elements of its own
 * {@code b} array and prints the result, so each input row contributes one output row per
 * array element.
 */
public class TestUnnested {

    /**
     * Builds the sample stream, registers it as table {@code source}, runs the UNNEST query
     * and prints the resulting append stream.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        // Sample data: (a, b) where b is an array of (INT, STRING) rows.
        List<Row> rows = Arrays.asList(
                Row.of(1, new Row[]{Row.of(12, "sd"), Row.of(15, "sd")}),
                Row.of(2, new Row[]{Row.of(13, "sd"), Row.of(16, "sd")}),
                Row.of(3, new Row[]{Row.of(14, "sd"), Row.of(17, "sd")})
        );

        // Explicit schema: field "a" is INT, field "b" is an object array of ROW(INT, STRING).
        TypeInformation<?>[] types = new TypeInformation[]{
                Types.INT,
                Types.OBJECT_ARRAY(Types.ROW(Types.INT, Types.STRING))
        };
        String[] typeNames = new String[]{"a", "b"};
        RowTypeInfo rowType = new RowTypeInfo(types, typeNames);

        // Hand the declared row type to the source directly instead of letting
        // fromCollection(rows) derive one from the first element and overriding it
        // afterwards with returns(...).
        DataStream<Row> source = environment.fromCollection(rows, rowType);

        tableEnvironment.registerDataStream("source", source);

        // unnest(b) expands the array; its element fields are exposed as t.c and t.d.
        Table a = tableEnvironment.sqlQuery("select a,t.c,t.d from source,unnest(b) as t (c,d)");

        tableEnvironment.toAppendStream(a, Row.class).print();

        environment.execute();
    }
}

相关文章

网友评论

      本文标题:flink1.9.1 之 sql 解析流中的数组

      本文链接:https://www.haomeiwen.com/subject/bpmlnctx.html