美文网首页
2020-08-28

2020-08-28

作者: loukey_j | 来源:发表于2020-08-28 11:30 被阅读0次

    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.util.Collector;

    import java.text.SimpleDateFormat;
    import java.time.LocalTime;
    import java.time.format.DateTimeFormatter;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;

    public class FlinkCheckpointTest {
    /**
     * Entry point of the checkpoint experiment.
     *
     * <p>Enables checkpointing every two seconds and wires the pipeline
     * {@code FSource (x4) -> FStart (x1) -> FCombine (x4) -> FSubmit (x1)},
     * then submits it under the job name {@code "test"}.
     *
     * @param args command-line arguments (not read)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // Checkpoint interval in milliseconds.
        final long checkpointIntervalMs = 2 * 1000L;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(checkpointIntervalMs);

        env.addSource(new FSource()).setParallelism(4)
                .transform("开始事务", Types.STRING, new FStart()).setParallelism(1)
                .process(new FCombine()).name("事务预处理").setParallelism(4)
                .addSink(new FSubmit()).name("提交事务").setParallelism(1);

        env.execute("test");
    }

    /**
     * Parallel test source that emits one record per second.
     *
     * <p>Each record has the form {@code <thread id>-<sequence number>}, where the
     * sequence number starts at 1 and is local to this source instance.
     */
    static class FSource extends RichParallelSourceFunction<String> {
        /** Cleared by {@link #cancel()} so that the emit loop in {@code run} terminates. */
        private volatile boolean running = true;

        /**
         * Emits records until the source is cancelled.
         *
         * @param sourceContext context used to emit records downstream
         * @throws Exception if the sleep between records is interrupted
         */
        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            int sequence = 0;
            while (running) {
                sequence++;
                sourceContext.collect(Thread.currentThread().getId() + "-" + sequence);
                Thread.sleep(1000);
            }
        }

        /** Requests the emit loop to stop; the loop exits at its next iteration check. */
        @Override
        public void cancel() {
            running = false;
        }
    }

    /**
     * Operator that tags every incoming record with the id most recently seen in
     * {@link #prepareSnapshotPreBarrier(long)} (initially 0) and forwards it.
     */
    static class FStart extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        /** Id passed to the latest {@code prepareSnapshotPreBarrier} call; 0 before the first call. */
        volatile Long ckid = 0L;

        /**
         * Appends {@code "-" + ckid} to the record value, logs it and emits the record.
         *
         * @param streamRecord the incoming record; it is mutated in place and re-emitted
         */
        @Override
        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            final String tagged = streamRecord.getValue() + "-" + ckid;
            log("收到数据: " + tagged + "..ckid:" + ckid);
            // replace() swaps the payload on the same record instance and returns it.
            output.collect(streamRecord.replace(tagged));
        }

        /**
         * Logs the checkpoint id, remembers it for tagging subsequent records, and
         * delegates to the superclass.
         *
         * @param checkpointId id of the checkpoint about to be taken
         */
        @Override
        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            log("开启事务: " + checkpointId);
            this.ckid = checkpointId;
            super.prepareSnapshotPreBarrier(checkpointId);
        }
    }
    
    /**
     * Buffers incoming records and, on every {@code snapshotState} call, emits the
     * whole buffer as a single bracketed record and clears it.
     */
    static class FCombine extends ProcessFunction<String, String> implements CheckpointedFunction {
        /** Records received since the last {@code snapshotState} call. */
        List<String> ls = new ArrayList<>();
        /**
         * Collector captured from the first {@code processElement} call; {@code null}
         * until then. NOTE(review): reusing it inside {@code snapshotState} assumes the
         * runtime hands out a collector that stays valid between calls — confirm.
         */
        Collector<String> collector = null;
        /** Never reassigned in this class, so it is always 0 in the emitted records. */
        volatile Long ckid = 0L;

        /**
         * Joins the buffered records with a trailing {@code ';'} after each one, waits
         * five seconds, emits {@code "[" + joined + "-" + ckid + "]"} and clears the buffer.
         *
         * <p>If no element has been processed yet there is no collector to emit to
         * (and the buffer is necessarily empty), so the emit step is skipped instead
         * of failing the checkpoint with a {@link NullPointerException}.
         *
         * @param functionSnapshotContext supplies the checkpoint id used in log output
         * @throws Exception if the sleep is interrupted
         */
        @Override
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            final long checkpointId = functionSnapshotContext.getCheckpointId();

            StringBuilder joined = new StringBuilder();
            for (String item : ls) {
                joined.append(item).append(";");
            }
            final String batch = joined.toString();

            log("批处理 " + checkpointId + ": 时收到数据:" + batch);
            // Fixed 5 s delay before emitting; presumably simulates slow batch work — confirm.
            Thread.sleep(5 * 1000);
            if (collector != null) {
                collector.collect("[" + batch + "-" + ckid + "]");
            }
            ls.clear();
            log("批处理 " + checkpointId + " 完成");
        }

        /** No state is restored; the buffer always starts empty. */
        @Override
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        }

        /**
         * Buffers non-blank records and remembers the collector for later use in
         * {@code snapshotState}. Nothing is emitted from here.
         *
         * @param s       the incoming record; blank values are logged but not buffered
         * @param context per-record context (not read)
         * @param out     collector, stored on first call only
         */
        @Override
        public void processElement(String s, Context context, Collector<String> out) throws Exception {
            if (StringUtils.isNotBlank(s)) {
                ls.add(s);
            }
            log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" + ckid);
            if (collector == null) {
                collector = out;
            }
        }
    }
    
    /**
     * Sink that buffers incoming records and logs and clears the buffer both when a
     * snapshot is taken and when a checkpoint completion is notified.
     */
    static class FSubmit extends RichSinkFunction<String> implements CheckpointedFunction, CheckpointListener {
        /** Records received since the buffer was last cleared. */
        List<String> ls = new ArrayList<>();
        /** Id passed to the latest {@code notifyCheckpointComplete} call; 0 before the first call. */
        volatile Long ckid = 0L;

        /**
         * Records the completed checkpoint id, logs the buffered records, blocks for
         * 100 seconds and then clears the buffer.
         *
         * @param l id of the completed checkpoint
         * @throws Exception if the sleep is interrupted
         */
        @Override
        public void notifyCheckpointComplete(long l) throws Exception {
            ckid = l;
            final String detail = joinBuffered();
            log("submit notifyCheckpointComplete " + l + " over data:list size" + ls.size() + "; detail" + detail);
            // NOTE(review): 100 s block; looks like a deliberate experiment on a slow commit — confirm.
            Thread.sleep(100000);
            ls.clear();
        }

        /**
         * Buffers non-blank records and logs every received value.
         *
         * @param value   the incoming record; blank values are logged but not buffered
         * @param context sink context (not read)
         */
        @Override
        public void invoke(String value, Context context) throws Exception {
            if (StringUtils.isNotBlank(value)) {
                ls.add(value);
            }
            log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" + ckid);
        }

        /**
         * Logs the buffered records for this checkpoint and clears the buffer.
         *
         * @param context supplies the checkpoint id used in log output
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            final String detail = joinBuffered();
            log("submit snapshotState " + context.getCheckpointId() + " over data:list size" + ls.size() + "; detail" + detail);
            ls.clear();
        }

        /** No state is restored; the buffer always starts empty. */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
        }

        /** Concatenates the buffered records, appending {@code "||"} after each one. */
        private String joinBuffered() {
            StringBuilder joined = new StringBuilder();
            for (String item : ls) {
                joined.append(item).append("||");
            }
            return joined.toString();
        }
    }
    /** Thread-safe formatter for the {@code HH:mm:ss.SSS} prefix; shared by all calling threads. */
    private static final DateTimeFormatter LOG_TIME_FORMAT = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    /**
     * Prints one line to standard output in the form
     * {@code <HH:mm:ss.SSS>:<current thread name>:<message>}.
     *
     * @param s the message to print
     */
    public static void log(String s) {
        String threadName = Thread.currentThread().getName();
        String timestamp = LocalTime.now().format(LOG_TIME_FORMAT);
        System.out.println(timestamp + ":" + threadName + ":" + s);
    }
    

    }

    相关文章

      网友评论

          本文标题:2020-08-28

          本文链接:https://www.haomeiwen.com/subject/vnxlsktx.html