美文网首页
Flink-ValueState实例

Flink-ValueState实例

作者: 卡门001 | 来源:发表于2021-12-19 22:51 被阅读0次

功能描述

当计数到达3时求平均数,并清空已计算过的数值

知识
ValueStateDescriptor
ValueState

package com.flink.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class StateApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        test01(env);
        env.execute("StateApp");
    }

    /**
     * 使用ValueState
     * 统计窗口平均数
     *
     * 需求:入元素为Tuple2
     * 如果输入的元素>2,则求平均数,并半结果
     * @param env
     */
    private static void test01(StreamExecutionEnvironment env) {
        List<Tuple2<Long,Long>> list = new ArrayList<>();
        list.add(Tuple2.of(1l,3l));
        list.add(Tuple2.of(1l,7l));
        list.add(Tuple2.of(2l,4l));
        list.add(Tuple2.of(1l,5l));
        list.add(Tuple2.of(1l,2l));
        list.add(Tuple2.of(2l,3l));
        list.add(Tuple2.of(2l,5l));

        env.fromCollection(list).keyBy(x->x.f0).flatMap(new AvgWithValueState()).print();
    }


    static class AvgWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {

        //求平均数:总和/记录条数
        //关键字transient:,序列化对象的时候,这个属性就不会被序列化。
        private transient ValueState<Tuple2<Long, Long>> sum;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple2<Long,Long>> descriptor = new ValueStateDescriptor<Tuple2<Long, Long>>("avg", Types.TUPLE(Types.LONG,Types.LONG));
            sum = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
            //in.
            Tuple2<Long,Long> currentState = sum.value();
            if (currentState==null){
                currentState = Tuple2.of(0L,0L);
            }
            currentState.f0 += 1;  //求数总
            currentState.f1 += value.f1; // 求和
            sum.update(currentState);

            //达到3条数据,就算出平均数
            if (sum.value().f0>=3){
                out.collect(Tuple2.of(value.f0,currentState.f1/currentState.f0.doubleValue()));
                sum.clear();//清理
            }
        }
    }
}

相关文章

  • Flink-ValueState实例

    功能描述 当计数到达3时求平均数,并清空已计算过的数值 知识ValueStateDescriptorValueState

  • SQL C语言基本操作

    相关API 打开 实例 关闭 实例 获取错误消息 操作表 实例创建 实例插入 实例修改 实例删除 实例回调查询 非回调

  • Python-数据类型及其操作方法

    数字类型 代码实例: 字符串类型 代码实例: 列表 代码实例: 元组 代码实例 字典: 代码实例 集合 代码实例:

  • HTML基础-03

    HTML 标题 实例 HTML 段落 实例 HTML 链接 实例 HTML 图像 实例

  • Python 类属性、实例属性、类方法、实例方法

    1、实例属性 实例属性,就是赋给由类创建的实例的属性,实例属性属于它所属的实例,不同实例之间的实例属性可以不同。 ...

  • STL算法之常用拷贝和替换

    copy API 实例 replace API 实例 replace_if API 实例 swap API 实例

  • Vue 基础

    Vue 实例 1. Vue实例 2. 实例属性 3. 实例方法/数据 4. 实例方法/事件 5. 实例方法/生命周...

  • 类中的方法

    1.实例方法的调用方式 实例对象.实例方法() 类对象.实例方法(实例对象) 例如: class Student ...

  • C语言100例

    C 练习实例01C 练习实例02C 练习实例03C 练习实例04C 练习实例05C 练习实例06 C 练习实例07...

  • AWS云计算助手级架构师认证之EC2-实例购买类型

    在购买EC2实例的时候,这里有三种购买类型需要理解:按需实例,预留实例,计划实例,竞价实例。 按需实例: ...

网友评论

      本文标题:Flink-ValueState实例

      本文链接:https://www.haomeiwen.com/subject/aikafrtx.html