Word Count (WC) with Flink's DataSet API

Author: 喵星人ZC | Published 2020-05-31 00:24

    [Figure: overall project structure in IDEA]

    pom.xml

    <properties>
      <flink.version>1.10.0</flink.version>
      <scala.binary.version>2.11</scala.binary.version>
      <scala.version>2.11.8</scala.version>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>

      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>

      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>

      <!-- Logging dependencies -->
      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
      </dependency>
      <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
      </dependency>
    </dependencies>
    

    data/wc.txt (input file read by the job)

    hadoop  hadoop  spark
    spark   flink   MR
    MR  MR  storm   flink
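The input is small, so you can work out the expected result without Flink. The following is a minimal plain-Scala sketch with no Flink dependency. It assumes the same tokenisation as BatchJob: lowercase each line, split on runs of non-word characters, and drop empty tokens. The object name `ExpectedCounts` is hypothetical and used only for illustration.

```scala
// Plain-Scala cross-check of the expected word counts for the sample input.
// Assumption: mirrors BatchJob's tokenisation (lowercase, split on \W+, drop
// empty tokens); it does not use Flink at all. ExpectedCounts is a hypothetical name.
object ExpectedCounts {
  // The three lines of the sample input file.
  val lines: Seq[String] = Seq(
    "hadoop  hadoop  spark",
    "spark   flink   MR",
    "MR  MR  storm   flink"
  )

  // Same per-line tokenisation as the flatMap in BatchJob.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  // Local equivalent of flatMap -> map(WC(_, 1)) -> groupBy("word") -> sum("cnt").
  def countWords(input: Seq[String]): Map[String, Int] =
    input
      .flatMap(tokenize)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  def main(args: Array[String]): Unit =
    // Sort by word only to make the printed listing stable.
    countWords(lines).toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w\t$c") }
}
```

When you run `ExpectedCounts.main`, it lists flink 2, hadoop 2, mr 3, spark 2 and storm 1, sorted by word. The Flink job should report the same pairs as `WC(...)` records, though not necessarily in the same order.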
    

    log4j.properties: to hide irrelevant log output, print only ERROR-level messages

    log4j.rootLogger=error,console

    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=[%p]%d{yyyy-MM-dd HH:mm:ss} %F - %m%n
    

    BatchJob

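    package com.zc.bigdata

    // The wildcard import brings in DataSet, ExecutionEnvironment and the implicit
    // TypeInformation derivation that flatMap/map need.
    import org.apache.flink.api.scala._

    object BatchJob {
      def main(args: Array[String]): Unit = {
        // Batch (DataSet) environment; runs locally when launched from the IDE
        val env = ExecutionEnvironment.getExecutionEnvironment
        // Read the input file line by line (path is relative to the working directory)
        val lines: DataSet[String] = env.readTextFile("data/wc.txt")

        val counts = lines
          // Lowercase each line, split on runs of non-word characters, drop empty tokens
          .flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
          // Wrap every word in a WC record with an initial count of 1
          .map(WC(_, 1))
          // Group by the case-class field "word" and add up the "cnt" field
          .groupBy("word")
          .sum("cnt")

        // In the DataSet API, print() triggers execution itself; no env.execute() is needed
        counts.print()
      }
    }

    // Record type: a word and its count; groupBy/sum refer to these field names
    case class WC(word: String, cnt: Int)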
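The `filter(_.nonEmpty)` in the flatMap matters. Java's `String.split` keeps a leading empty string when a line starts with a delimiter. An empty line splits into a single empty string. Without the filter, both cases would be counted as a "word". This small plain-Scala sketch illustrates both cases without Flink. The object name `TokenizerDemo` and the sample line are hypothetical.

```scala
// Sketch of why BatchJob filters out empty tokens after splitting.
// TokenizerDemo and the sample line are hypothetical, for illustration only.
object TokenizerDemo {
  // Raw split, as used inside the flatMap before the filter.
  def rawSplit(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").toSeq

  // Split plus the nonEmpty filter, matching the job's flatMap body.
  def tokens(line: String): Seq[String] =
    rawSplit(line).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    val indented = "  MR  MR  storm   flink" // leading whitespace yields a leading ""
    println(rawSplit(indented))
    println(tokens(indented))
    println(rawSplit("")) // an empty line yields one empty string
    println(tokens(""))
  }
}
```

Trailing empty strings are already dropped by `split` itself, so only leading delimiters and blank lines need the extra filter.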
    
    


          Article link: https://www.haomeiwen.com/subject/egwkzhtx.html