美文网首页
flink1.9.1 之 sql 解析流中的数组

flink1.9.1 之 sql 解析流中的数组

作者: 犹豫的猫抓板 | 来源:发表于2019-12-15 22:19 被阅读0次

    输入

    {
      "a": 1,
      "b": [
        {
          "name": "name1",
          "age": 11
        },
        {
          "name": "name2",
          "age": 12
        },
        {
          "name": "name3",
          "age": 13
        }
      ]
    }
    

    期望输出

    1 name1 11
    1 name2 12
    1 name3 13
    

    知识点

    unnest：把数组（ARRAY）列展开成多行，数组中的每个元素对应一行；通常通过 CROSS JOIN UNNEST(数组列) AS t (列名...) 与原表关联使用。

    -- Example of expanding a collection column with UNNEST: each Orders row is
    -- cross-joined with the elements of its own `tags` column (presumably an
    -- array column), and every element is exposed as column `tag` of alias `t`.
    -- Unnesting WITH ORDINALITY is not supported yet.
    
    SELECT users, tag
    FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
    

    demo

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Minimal Maven build for the Flink SQL UNNEST demo (Java 8, Scala 2.11 Flink artifacts). -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.xp</groupId>
        <artifactId>test-flink</artifactId>
        <version>1.9.1</version>
    
        <properties>
            <scala.binary.version>2.11</scala.binary.version>
            <java.version>1.8</java.version>
            <!-- Flink release used by all Flink dependencies below. Kept separate from the
                 project's own version so that bumping the project version does not change
                 (or break) which Flink artifacts are resolved. -->
            <flink.version>1.9.1</flink.version>
        </properties>
    
        <dependencies>
            <!-- Table API bridge to the Java DataStream API (StreamTableEnvironment). -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- Table planner, required to actually plan and run Table API / SQL programs. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <!-- Compile sources and emit bytecode at the Java level set in <properties>. -->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    
    TestUnnested.java
    
    /**
     * Demo of flattening an array-of-rows column with SQL {@code UNNEST}.
     *
     * <p>Builds a small in-memory stream with an INT column {@code a} and an array column
     * {@code b} whose elements are (INT, STRING) rows, registers it as table {@code source},
     * runs a query that joins every record with the elements of its own array, and prints
     * the resulting rows.
     */
    public class TestUnnested {
        /**
         * Assembles and executes the demo job.
         *
         * @param args command-line arguments (not used)
         * @throws Exception propagated from building or executing the job
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Schema of the source rows: a is INT, b is an object array of (INT, STRING) rows.
            // The author also noted ObjectArrayTypeInfo.getInfoFor(new RowTypeInfo(Types.INT, Types.STRING))
            // as an alternative way to declare the type of b.
            TypeInformation<?>[] fieldTypes = new TypeInformation[]{
                    Types.INT,
                    Types.OBJECT_ARRAY(Types.ROW(Types.INT, Types.STRING))
            };
            String[] fieldNames = new String[]{"a", "b"};
            RowTypeInfo schema = new RowTypeInfo(fieldTypes, fieldNames);

            // Three sample records, each carrying two nested rows in column b.
            List<Row> sampleRows = Arrays.asList(
                    Row.of(1, new Row[]{Row.of(12, "sd"), Row.of(15, "sd")}),
                    Row.of(2, new Row[]{Row.of(13, "sd"), Row.of(16, "sd")}),
                    Row.of(3, new Row[]{Row.of(14, "sd"), Row.of(17, "sd")})
            );

            // Attach the explicit row type (with field names) to the collection-backed stream.
            DataStream<Row> source = env.fromCollection(sampleRows).returns(schema);
            tableEnv.registerDataStream("source", source);

            // Expand b: the nested row's two fields are exposed as t.c and t.d.
            Table flattened = tableEnv.sqlQuery("select a,t.c,t.d from source,unnest(b) as t (c,d)");

            DataStream<Row> output = tableEnv.toAppendStream(flattened, Row.class);
            output.print();

            env.execute();
        }
    }
    

    相关文章

      网友评论

          本文标题:flink1.9.1 之 sql 解析流中的数组

          本文链接:https://www.haomeiwen.com/subject/bpmlnctx.html