美文网首页
flink example:热门商品统计

flink example:热门商品统计

作者: 阿猫阿狗Hakuna | 来源:发表于2020-06-02 14:15 被阅读0次

一.数据样例(每行字段依次为 userId、itemId、categoryId、behavior、timestamp,其中 timestamp 为秒级 Unix 时间戳)

userId, itemId, categoryId, behavior, timestamp
543462,1715,1464116,pv,1511658000
470572,3760250,1299190,pv,1511658001

二.Maven 配置(pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Parent (aggregator) project; the job itself lives in the HotItemsAnalysis module. -->
    <groupId>org.example</groupId>
    <artifactId>learn-flink</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>HotItemsAnalysis</module>
    </modules>

    <properties>
        <flink.version>1.7.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds an additional jar-with-dependencies (fat jar) in the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Compiles Scala sources into class files.
                 "compile" handles src/main/scala and "testCompile" handles src/test/scala;
                 with testCompile alone, the job's main Scala code is never compiled by Maven. -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

三.代码实现

/**
 * One parsed input record, i.e. one line of the behaviour CSV file.
 *
 * @param userId     user id (column 0)
 * @param itemId     item id (column 1)
 * @param categoryId category id (column 2)
 * @param behavior   behaviour type such as "pv" (column 3)
 * @param timestamp  event time exactly as read from the file; the sample
 *                   data appears to use Unix seconds (column 4)
 */
case class UserBehavior(
  userId: Long,
  itemId: Long,
  categoryId: Int,
  behavior: String,
  timestamp: Long
)

/**
 * Window aggregation result: how many times one item was viewed in the
 * window that ends at `windowEnd`.
 *
 * @param itemId    item id (the window's key)
 * @param windowEnd end of the window, as reported by the window's `getEnd`
 * @param count     number of views of the item in that window
 */
case class ItemViewCount(
  itemId: Long,
  windowEnd: Long,
  count: Long
)

/**
 * Hot-items job: counts "pv" events per item over 1-hour event-time windows
 * sliding every 5 minutes, then prints the top 3 items of every window.
 *
 * The input CSV path may be given as the first program argument; it defaults
 * to "user_behavior.csv" in the working directory.
 */
object HotItems {
  def main(args: Array[String]): Unit = {

    // 1. Execution environment: single parallelism, event-time semantics.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 2. Source: parse each CSV line into a UserBehavior.
    val inputPath = args.headOption.getOrElse("user_behavior.csv")
    val dataStream = env.readTextFile(inputPath)
      .map( data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
      })
      // Flink event time is in milliseconds, while the sample data holds Unix
      // seconds; without the conversion the whole file falls into one window.
      // assignAscendingTimestamps requires non-decreasing timestamps in the input.
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // 3. Transform: per-item view counts per sliding window, then a top-3
    //    ranking per window (keyed by window end).
    val processedStream = dataStream
        .filter(_.behavior == "pv")
        .keyBy(_.itemId)
        .timeWindow(Time.hours(1), Time.minutes(5))
        .aggregate(new CountAgg(), new WindowResult())
        .keyBy(_.windowEnd)
        .process(new TopNHostItems(3))

    // 4. Sink: print the ranking produced above (not the raw parsed input).
    processedStream.print()

    env.execute("hot items job")

  }
}

/**
 * Incremental pre-aggregation that counts the elements of a window.
 *
 * The accumulator is the running count, and the result is that count
 * unchanged. In this job it is the input of WindowResult.
 */
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L

  // Every element contributes exactly one to the count.
  override def add(in: UserBehavior, acc: Long): Long = 1L + acc

  override def merge(acc: Long, acc1: Long): Long = acc1 + acc

  override def getResult(acc: Long): Long = acc
}

/**
 * Incremental pre-aggregation computing the mean of `UserBehavior.timestamp`
 * over the elements of a window.
 *
 * Accumulator: (sum of timestamps, number of elements).
 * Result: the arithmetic mean as a Double, or `Double.NaN` when the
 * accumulator is empty (no elements), instead of throwing.
 */
class AvgAgg() extends AggregateFunction[UserBehavior, (Long, Int), Double] {
  override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = {
    (acc._1 + in.timestamp, acc._2 + 1)
  }

  override def createAccumulator(): (Long, Int) = {
    (0L, 0)
  }

  override def getResult(acc: (Long, Int)): Double = {
    val (sum, count) = acc
    // Convert to Double before dividing: Long / Int is integer division and
    // would silently drop the fractional part. Guard count == 0, which would
    // otherwise raise ArithmeticException.
    if (count == 0) Double.NaN else sum.toDouble / count
  }

  override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = {
    (acc._1 + acc1._1, acc._2 + acc1._2)
  }
}

/**
 * Window function that attaches window metadata to a pre-aggregated count.
 *
 * It emits ItemViewCount(key, window end, first value of `input`). When used
 * with `aggregate(new CountAgg(), ...)`, `input` holds the count produced by
 * CountAgg for this key and window.
 */
class WindowResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val end = window.getEnd
    val viewCount = input.iterator.next()
    out.collect(ItemViewCount(itemId = key, windowEnd = end, count = viewCount))
  }
}

/**
 * Keyed by window end: buffers every ItemViewCount of one window in list
 * state, and when that window's event-time timer fires, emits a formatted
 * ranking of the `topSize` items with the highest view count.
 *
 * @param topSize        how many items each ranking contains
 * @param throttleMillis milliseconds to block before emitting each ranking,
 *                       to slow console output down in demos (default keeps
 *                       the previous 1 s pause); 0 or less disables it.
 *                       Sleeping blocks the operator's task thread, so
 *                       pass 0 for anything other than a demo.
 */
class TopNHostItems(topSize: Int, throttleMillis: Long = 1000L) extends KeyedProcessFunction[Long, ItemViewCount, String] {
  private var itemState: ListState[ItemViewCount] = _

  // The ranking timer fires this many milliseconds after the window end.
  private val timerDelayMillis: Long = 100L

  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))
  }

  override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Long, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
    // Buffer the element until the whole window's results have arrived.
    itemState.add(i)
    // Every element of a window registers the same timestamp; Flink keeps a
    // single timer per key and timestamp, so the ranking fires once.
    context.timerService().registerEventTimeTimer(i.windowEnd + timerDelayMillis)
  }

  // When the timer fires: sort all buffered items and emit the ranking.
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    import scala.collection.JavaConverters._

    // ListState.get() may return null for empty state depending on the backend.
    val allItems: List[ItemViewCount] =
      Option(itemState.get()).map(_.asScala.toList).getOrElse(Nil)

    // Highest count first, keep the top `topSize`.
    val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)

    // This window is done; drop its buffered items.
    itemState.clear()

    val result: StringBuilder = new StringBuilder
    // The timer was registered at windowEnd + timerDelayMillis, so subtracting
    // the same delay prints the window end itself.
    result.append("时间: ").append(new Timestamp(timestamp - timerDelayMillis)).append("\n")
    for ((currentItem, idx) <- sortedItems.zipWithIndex) {
      result.append("No").append(idx + 1).append(": ").append(" 商品ID=").append(currentItem.itemId).append(" 浏览量=").append(currentItem.count)
        .append("\n")
    }
    result.append("=======================")
    if (throttleMillis > 0) Thread.sleep(throttleMillis)
    out.collect(result.toString())
  }
}

相关文章

网友评论

      本文标题:flink example:热门商品统计

      本文链接:https://www.haomeiwen.com/subject/bveuzhtx.html