Flink 1.12 (Part 2): Simulating a Real-Time Stream for Testing

Author: _大叔_ | Published: 2021-06-09 18:46

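When developing on Windows, download netcat (nc) from https://eternallybored.org/misc/netcat/ and copy nc.exe into C:\Users\<current login user>, the directory in which cmd opens by default.
Then run the following command in cmd. -L makes nc listen (and keep listening after a client disconnects), and -p sets the port, here 7777.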

nc -Lp 7777
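
If nc is not at hand, a small Java program can play the same role. The following is only a sketch under assumptions, not part of the original setup: the class name LineServer and the method serveOnce are hypothetical. It listens on a port, waits for a single client (for example the Flink job), and sends the given lines one by one with a pause in between.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: a tiny stand-in for "nc -Lp 7777".
class LineServer {

    // Accepts one client on the given server socket and sends each line,
    // terminated by "\n", sleeping delayMillis after every line to imitate
    // data that arrives over time. The connection is closed afterwards.
    static void serveOnce(ServerSocket server, List<String> lines, long delayMillis)
            throws IOException, InterruptedException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true)) {
            for (String line : lines) {
                out.print(line + "\n");
                out.flush();
                Thread.sleep(delayMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = Arrays.asList(
                "1,1623051400,test data,1",
                "1,1623051401,test data,1",
                "1,1623051402,test data,1");
        try (ServerSocket server = new ServerSocket(7777)) {
            System.out.println("Listening on port 7777, waiting for a client to connect");
            serveOnce(server, lines, 1000L);
        }
    }
}
```

Unlike nc -L, this sketch serves one connection and then exits, so it has to be started again for every test run.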

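With nc listening, start the following program to simulate a real-time stream. The Maven dependencies, the EventData bean, and the main class are listed in that order.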

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.2</version>
        </dependency>

    </dependencies>
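
package com.example.demo.bean;

// Simple bean holding one parsed input line
public class EventData {

    private Integer id;
    // Event timestamp as it appears in the test data (e.g. 1623051400)
    private Long eventTime;
    private String data;
    private Integer num;

    // Public no-argument constructor, required for Flink to treat this class as a POJO
    public EventData() {
    }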

    public Integer getNum() {
        return num;
    }

    public void setNum(Integer num) {
        this.num = num;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Long getEventTime() {
        return eventTime;
    }

    public void setEventTime(Long eventTime) {
        this.eventTime = eventTime;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "EventData{" +
                "id=" + id +
                ", eventTime=" + eventTime +
                ", data='" + data + '\'' +
                ", num=" + num +
                '}';
    }
}

package com.example.demo;

import com.example.demo.bean.EventData;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author big uncle
 * @date 2021/6/5 15:36
 * @module
 **/
public class EventTime {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
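        // Connect to the nc listener; replace this address with your own machine's IP
        DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
        // Parse each comma-separated line into an EventData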
        dataStream.map(new MapFunction<String, EventData>() {
            @Override
            public EventData map(String value) throws Exception {
                String[] strs = value.split(",");
                EventData eventData = new EventData();
                eventData.setId(Integer.valueOf(strs[0]));
                eventData.setEventTime(Long.valueOf(strs[1]));
                eventData.setData(strs[2]);
                eventData.setNum(Integer.valueOf(strs[3]));
                return eventData;

            }
        }).print();
        env.execute("test");
    }
}
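
The map function above assumes that every line has four well-formed fields. A blank or mistyped line in the nc window makes it throw, and an uncaught exception in a user function fails the job. A more forgiving parser might look like the following sketch. The names EventLineParser and ParsedEvent are hypothetical, and ParsedEvent is only a stand-in for EventData so that the example compiles on its own, without Flink on the classpath.

```java
import java.util.Optional;

// Hypothetical helper: tolerant parsing of "id,eventTime,data,num" lines.
class EventLineParser {

    // Minimal stand-in for EventData so this sketch is self-contained.
    static final class ParsedEvent {
        final int id;
        final long eventTime;
        final String data;
        final int num;

        ParsedEvent(int id, long eventTime, String data, int num) {
            this.id = id;
            this.eventTime = eventTime;
            this.data = data;
            this.num = num;
        }
    }

    // Returns the parsed event, or Optional.empty() when the line is null,
    // does not have exactly four fields, or a numeric field is not a number.
    static Optional<ParsedEvent> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.trim().split(",", -1);
        if (parts.length != 4) {
            return Optional.empty();
        }
        try {
            return Optional.of(new ParsedEvent(
                    Integer.parseInt(parts[0].trim()),
                    Long.parseLong(parts[1].trim()),
                    parts[2],
                    Integer.parseInt(parts[3].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1,1623051400,test data,1").isPresent()); // true
        System.out.println(parse("").isPresent());                         // false
        System.out.println(parse("1,abc,test data,1").isPresent());        // false
    }
}
```

In the Flink job, one way to use such a parser would be to swap map for flatMap and emit a record only when parsing succeeds, so that bad lines are skipped instead of stopping the stream.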

Test data (type or paste these lines into the nc window once the job has connected)

1,1623051400,test data,1
1,1623051401,test data,1
1,1623051402,test data,1
1,1623051405,test data,3
1,1623051406,test data,3
1,1623051409,test data,3
1,1623051410,test data,5
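
The second field of each line looks like a Unix timestamp in seconds. The post does not state the unit, so this is an inference from the 10-digit values. Flink expresses event-time timestamps in milliseconds since the epoch, so if the field is later used as an event-time timestamp it would need to be multiplied by 1000. A small sketch of that conversion follows; the class name EpochSeconds and the method toMillis are hypothetical.

```java
import java.time.Instant;

// Hypothetical helper for interpreting the eventTime field of the test data.
class EpochSeconds {

    // Converts epoch seconds to epoch milliseconds, failing loudly on overflow.
    static long toMillis(long epochSeconds) {
        return Math.multiplyExact(epochSeconds, 1000L);
    }

    public static void main(String[] args) {
        long first = 1623051400L; // eventTime of the first test line
        System.out.println(Instant.ofEpochSecond(first)); // 2021-06-07T07:36:40Z
        System.out.println(toMillis(first));              // 1623051400000
    }
}
```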
