美文网首页
2020-08-28

2020-08-28

作者: loukey_j | 来源:发表于2020-08-28 11:30 被阅读0次

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class FlinkCheckpointTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment steamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
steamEnv.enableCheckpointing(1000L*2);
steamEnv
.addSource(new FSource()).setParallelism(4)
.transform("开始事务", Types.STRING,new FStart()).setParallelism(1)
.process(new FCombine()).name("事务预处理").setParallelism(4)
.addSink(new FSubmit()).name("提交事务").setParallelism(1)
;
steamEnv.execute("test");
}

static class FSource extends RichParallelSourceFunction<String>{
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
int I =0;
while (true){
I = I + 1;
sourceContext.collect(Thread.currentThread().getId() +"-" +I);
Thread.sleep(1000);
}
}
@Override
public void cancel() {}
}

static class FStart extends AbstractStreamOperator<String> implements OneInputStreamOperator<String,String>{
   volatile Long ckid = 0L;
    @Override
    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        String value = streamRecord.getValue() + "-" + ckid;
        streamRecord.replace(value);
        log("收到数据: " + value + "..ckid:" + ckid);
        output.collect(streamRecord);
    }
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        log("开启事务: " + checkpointId);
        ckid = checkpointId;
        super.prepareSnapshotPreBarrier(checkpointId);
    }
}

static class FCombine extends ProcessFunction<String,String> implements CheckpointedFunction {
    List ls = new ArrayList<String>();
    Collector<String> collector =null;
    volatile Long ckid = 0L;

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        StringBuffer sb = new StringBuffer();
        ls.forEach(x->{sb.append(x).append(";");});
        log("批处理 " + functionSnapshotContext.getCheckpointId() + ": 时收到数据:" + sb.toString());
        Thread.sleep(5*1000);
        collector.collect("["+sb.toString() + "-"+ckid+"]");
        ls.clear();
        log("批处理 " + functionSnapshotContext.getCheckpointId() + " 完成");
        //Thread.sleep(5*1000);
        //Thread.sleep(20*1000);
    }
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {        }
    @Override
    public void processElement(String s, Context context, Collector<String> out) throws Exception {
        if(StringUtils.isNotBlank(s)){
            ls.add(s);
        }
        log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" + ckid);
        if(collector ==null){
            collector = out;
        }
    }
}

static class FSubmit extends RichSinkFunction<String> implements  CheckpointedFunction, CheckpointListener {
    List ls = new ArrayList<String>();
    volatile Long ckid = 0L;
    @Override
    public void notifyCheckpointComplete(long l) throws Exception {
        ckid = l;
        StringBuffer sb = new StringBuffer();
        ls.forEach(x->{sb.append(x).append("||");});
        log("submit notifyCheckpointComplete " + l + " over data:list size" + ls.size()+ "; detail" + sb.toString());
        Thread.sleep(100000);
        ls.clear();
    }
    @Override
    public void invoke(String value, Context context) throws Exception {
        if(StringUtils.isNotBlank(value)){
            ls.add(value);
        }
        log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" + ckid);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        StringBuffer sb = new StringBuffer();
        ls.forEach(x->{sb.append(x).append("||");});
        log("submit snapshotState " + context.getCheckpointId() + " over data:list size" + ls.size()+ "; detail" + sb.toString());
        ls.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

    }
}
public static void log(String s){
    String name = Thread.currentThread().getName();
    System.out.println(new SimpleDateFormat("HH:mm:ss.SSS").format(new Date())+":"+name + ":" + s);
}

}

相关文章

  • 2020-08-27springcloud

    2020-08-27 无操作 先学一下dubbo,zookeeper,分布式基础 2020-08-28 dubbo...

  • docker 相关问题

    作者时间雨中星辰2020-08-28 1. 将本地镜像推送到远程仓库,报x509: certificate sig...

  • 重读--一位市委副书记的高考:一名搬运工通过高考改变命运

    【我的读后感】 2020-08-28 重读安永全先生是1945 年生人,1961年8月霍县搬运公司工人;经过刻苦努...

  • 2020-08-29

    2020-08-28 姓名 :曹静杰 企业名称 : 辽宁辽阳丛迪服装有限公司 组别 388期 反省1组 志工529...

  • 写给陌生人的第602封信

    写给陌生人的第602封信 陌生人: 你好。见信佳!周五快乐。 我是写信人。 这封信写于2020-08-28。 这是...

  • 2020-08-28

    2020-08-28 吸引力法则七期二阶 教练-卢颖丽 一、照镜子:美女早上好!我爱你!亲吻自己! 感悟:喜欢自己...

  • 若水日记/26/ 等待

    2020-08-28 星期五 小雨转多云 夜凉如水,早晨起来仍飘着微雨。 秋的气息已渗入到骨骼,我将身上的短袖衣服...

  • 慢性腹泻体质的调理日记

    2020-08-28 慢性腹泻很多年了,记忆力是从研究生开始,研二夏天就开始了,有的时候拉的严重,肚子酸溜溜,工作...

  • 爱一样吗?

    中原焦点团队中19胡利娜坚持分享第409天2020-08-28 今天早上6点30开始叫孩子们起床,7点钟准时出门。...

  • 杨紫中餐厅吃剩菜,背后你不知道的真相

    杨紫中餐厅吃剩菜,背后你不知道的真相 风中划过的飞鸿2020-08-28 17:32:40 1 《中餐厅》的一期节...

网友评论

      本文标题:2020-08-28

      本文链接:https://www.haomeiwen.com/subject/vnxlsktx.html