美文网首页实时数据相关
flink 流表join维表

flink 流表join维表

作者: 岳过山丘 | 来源:发表于2019-01-03 15:34 被阅读0次

1.流表

 // Stream side of the join: each text line read from the socket at localhost:9999
 // is parsed into one Integer record.
 DataStream<Integer> dataStream = streamExecutionEnvironment
                .socketTextStream("localhost", 9999)
                .map(line -> Integer.parseInt(line));

2.MySQL 维表

-- Dimension table read by the Flink job; `id` is the primary key.
CREATE TABLE `source5` (
  `id`        INT(11)     NOT NULL,
  `user_name` VARCHAR(19) DEFAULT NULL,
  `age`       INT(11)     DEFAULT NULL,
  PRIMARY KEY (`id`)
)

 // Field types of one result row, listed in the same order as the columns in the query below.
 TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{
                BasicTypeInfo.INT_TYPE_INFO,     // id
                BasicTypeInfo.STRING_TYPE_INFO,  // user_name
                BasicTypeInfo.INT_TYPE_INFO      // age
        };
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        // The row type is positional, so the columns are named explicitly rather than using
        // `select *`: adding or reordering columns in `source5` must not silently shift the
        // mapping between result-set columns and fieldTypes.
        // NOTE(review): connection settings and credentials are hard-coded for the demo only.
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost/test1")
                .setUsername("root")
                .setPassword("123456")
                .setQuery("select id, user_name, age from source5")
                .setRowTypeInfo(rowTypeInfo)
                .finish();
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment stableEnv = new StreamTableEnvironment(streamExecutionEnvironment, TableConfig.DEFAULT());

        // Expose the JDBC query result as a DataStream of rows.
        // NOTE(review): presumably the query runs once when the source starts, so later
        // changes in MySQL would not be seen by the job - confirm against the Flink docs.
        DataStreamSource<Row> dataSource1 = streamExecutionEnvironment.createInput(jdbcInputFormat);

3.join

  // Register both streams as tables. The field list renames the MySQL columns to
  // id1/name/age and the socket value to id2, so the two join keys have distinct names.
  stableEnv.registerDataStream("mysql", dataSource1, "id1,name,age");
        stableEnv.registerDataStream("stream1", dataStream, "id2");
        Table mysql = stableEnv.sqlQuery("select * from mysql");
        Table stream1 = stableEnv.scan("stream1");
        // Join on equal ids: a row is produced for a stream value whose id2 matches an id1.
        Table joined = stream1.join(mysql, "id1=id2");

        // Convert the joined table back to a stream of rows and log every result row.
        DataStream<Row> res = stableEnv.toAppendStream(joined, Row.class);
        res.addSink(new SinkFunction<Row>() {
            @Override
            public void invoke(Row value) throws Exception {
                logger.info(value.toString());
            }
        });
        // Job name describes this job (was a leftover "Window WordCount" from another example).
        streamExecutionEnvironment.execute("Stream join MySQL dimension table");

4.先在终端执行 nc -lk 9999 启动监听(需在启动 Flink 作业之前执行,否则 socketTextStream 无法连接到 9999 端口),然后启动作业,并在该终端中每行输入一个整数,即可实现流数据与 MySQL 数据的 join
5.依赖

<properties>
        <!-- Build settings: source encoding and Java language level. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>

        <!-- Flink release and the Scala binary suffix used in the Flink artifact ids. -->
        <flink.version>1.7.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

<!-- Flink core APIs. All Flink artifacts share ${flink.version} so they cannot drift apart. -->
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Provides JDBCInputFormat, used to read the MySQL dimension table. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>

        </dependency>
        <!-- NOTE(review): "provided" scope keeps this out of the application JAR; when running -->
        <!-- inside an IDE, make sure provided dependencies are on the run classpath. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

       <!-- <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>

        </dependency>-->

        <!-- Table API / SQL. Uses ${flink.version} like every other Flink artifact -->
        <!-- (previously hard-coded to 1.7.0). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

相关文章

网友评论

    本文标题:flink 流表join维表

    本文链接:https://www.haomeiwen.com/subject/fiwfrqtx.html