美文网首页
Flink demo 环境搭建

Flink demo 环境搭建

作者: 百岁叶 | 来源:发表于2020-04-20 11:39 被阅读0次

    # Flink demo 开发环境搭建
    Flink 入门:在 IDEA 中运行的 demo。Flink 提供了比较方便的创建 Flink 工程的方法。
    1、Windows 环境,打开 Git Bash,执行以下命令:

    curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0
    
    图片.png

    2、batch demo

    /**
     * Minimal Flink DataSet (batch) word-count demo.
     *
     * <p>Builds a small in-memory data set of text lines, splits every line into
     * lower-cased words, groups the resulting (word, 1) pairs by word, sums the
     * counts and prints the result.
     */
    public class BatchJob {

        /**
         * Runs the word-count job on three hard-coded lines.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if building or executing the Flink job fails
         */
        public static void main(String[] args) throws Exception {
            // set up the batch execution environment
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSource<String> text = env.fromElements(
                    "Flink flink flink ",
                    "spark spark spark",
                    "Spark Spark Spark");

            // tuple field 0 is the word (grouping key), field 1 is the count to sum
            AggregateOperator<Tuple2<String, Integer>> sum = text.flatMap(new LineSplitter())
                    .groupBy(0)
                    .sum(1);

            // DataSet#print() triggers execution of the job by itself, so no
            // additional env.execute("Flink Batch Java API Skeleton") call is made.
            sum.print();

            /*
             * Here, you can start creating your execution plan for Flink.
             *
             * Start with getting some data from the environment, like
             *  env.readTextFile(textPath);
             *
             * then, transform the resulting DataSet<String> using operations
             * like
             *  .filter()
             *  .flatMap()
             *  .join()
             *  .coGroup()
             *
             * and many more.
             * Have a look at the programming guide for the Java API:
             *
             * https://flink.apache.org/docs/latest/apis/batch/index.html
             *
             * and the examples
             *
             * https://flink.apache.org/docs/latest/apis/batch/examples.html
             *
             */
        }


        /**
         * Splits a line into lower-cased word tokens and emits one
         * {@code (word, 1)} pair per non-empty token.
         */
        public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

            /**
             * @param s         the input line
             * @param collector receives a {@code (token, 1)} tuple for every word in the line
             */
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // Locale.ROOT keeps the lower-casing independent of the JVM default locale
                String[] tokens = s.toLowerCase(java.util.Locale.ROOT).split("\\W+");

                for (String token : tokens) {
                    // split() yields an empty first token when the line is empty or
                    // starts with a non-word character; that is not a word to count
                    if (!token.isEmpty()) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            }

        }
    }
    
    

    3、table sql demo

    /**
     * Word count expressed as a SQL query on top of the Flink batch Table API.
     *
     * <p>A fixed sentence is split into words, each word is wrapped in a {@link WC}
     * with a count of 1, the collection is registered as the view {@code test},
     * and a {@code GROUP BY} query sums the counts per word.
     */
    public class SqlDemo {

        /**
         * Builds the table, runs the aggregation query and prints the rows
         * through {@code printToErr()}.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the Flink job cannot be built or executed
         */
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tableEnv = BatchTableEnvironment.create(batchEnv);

            // one WC(word, 1) per token of the sample sentence
            ArrayList<WC> wordCounts = new ArrayList<>();
            for (String token : "hello to flink flink".split("\\W+")) {
                wordCounts.add(new WC(token, 1L));
            }

            // expose the WC fields as table columns "word" and "num"
            DataSource<WC> input = batchEnv.fromCollection(wordCounts);
            Table words = tableEnv.fromDataSet(input, "word,num");
            words.printSchema();

            tableEnv.createTemporaryView("test", words);
            Table totals = tableEnv.sqlQuery("select word,sum(num) as num from test group by word");

            // map the result rows back onto WC instances and print them
            DataSet<WC> result = tableEnv.toDataSet(totals, WC.class);
            result.printToErr();
        }


        /**
         * Word/count pair with public fields matching the column names used
         * in {@code main}.
         */
        public static class WC {
            /** The word. */
            public String word;
            /** Number of occurrences of {@link #word}. */
            public Long num;

            /** No-argument constructor. */
            public WC() {
            }

            /**
             * @param word the word
             * @param num  its count
             */
            public WC(String word, Long num) {
                this.word = word;
                this.num = num;
            }

            @Override
            public String toString() {
                return "WC{word='" + word + "', num=" + num + "}";
            }
        }

    }
    
    

    pom文件依赖

        <dependencies>
            <!-- Apache Flink dependencies -->
            <!-- These are normally declared with scope "provided" so they are not packaged into the JAR file. -->
            <!-- The scope is commented out below (presumably so the demos can be run directly from the IDE); -->
            <!-- restore it before building a JAR for a cluster. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>

            <!-- Table API / SQL dependencies. -->
            <!-- The Scala suffix uses ${scala.binary.version}, like the other Flink artifacts in this file. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            
            <!-- Add connector dependencies here. They must be in the default scope (compile). -->
            <!-- Example:
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            -->
    
            <!-- Add logging framework, to produce console output when running in the IDE. -->
            <!-- These dependencies are excluded from the application JAR by default. -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.7</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
                <scope>runtime</scope>
            </dependency>
        </dependencies>
    

    相关文章

      网友评论

          本文标题:Flink demo 环境搭建

          本文链接:https://www.haomeiwen.com/subject/vqqrihtx.html