Inserting Data with Flink via Stream Load

Author: wudl | Published 2022-01-04 00:13

1. Scenario

Mock data is generated and sent to Kafka; Flink consumes the Kafka data and then inserts it into Doris via Stream Load.

2. CREATE TABLE statement

 CREATE TABLE `wudl_doris01` (
   `id` int NULL COMMENT "",
   `name` varchar(200) NULL COMMENT "",
   `address` string NULL COMMENT "",
   `city` varchar(2000) NULL COMMENT "",
   `phone` varchar(200) NULL COMMENT ""
 ) ENGINE=OLAP
 DUPLICATE KEY(`id`)
 COMMENT "flink sink  测试表"
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "1",
 "in_memory" = "false",
 "storage_format" = "V2"
 );
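
Each row that reaches Doris is a JSON object whose keys match the columns above. As a reference for the columns and jsonpaths settings used later, a sample record (taken from a comment in the entry class below) looks like:

 {"address":"广东省","city":"海南","id":183,"name":"2022-01-03 00:41:37","phone":"15007840220"}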

3. Flink insertion code

Project structure:

(figure: flink-doris目录.png — project directory layout)

3.1 DorisBean

package com.wudl.flink.doris.bean;


import java.io.Serializable;

/**
 * @author :wudl
 * @date :Created in 2022-01-02 20:31
 * @description:
 * @modified By:
 * @version: 1.0
 */


public class DorisBean  implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String name;
    private String address;
    private String city;
    private String phone;

    @Override
    public String toString() {
        return "DorisBean{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", address='" + address + '\'' +
                ", city='" + city + '\'' +
                ", phone='" + phone + '\'' +
                '}';
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }



    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getPhone() {
        return phone;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public DorisBean() {
    }

    public DorisBean(Integer id, String name, String address, String city, String phone) {
        this.id = id;
        this.name = name;
        this.address = address;
        this.city = city;
        this.phone = phone;
    }
}

3.2 RespContent: the Stream Load response bean

package com.wudl.flink.doris.bean;

import java.io.Serializable;

/**
 *
 *
 * @author wudl
 */
public class RespContent implements Serializable {

    private static final long serialVersionUID = 1L;

    private int TxnId;

    private String Label;

    private String Status;

    private String ExistingJobStatus;

    private String Message;

    private long NumberTotalRows;

    private long NumberLoadedRows;

    private int NumberFilteredRows;

    private int NumberUnselectedRows;

    private long LoadBytes;

    private int LoadTimeMs;

    private int BeginTxnTimeMs;

    private int StreamLoadPutTimeMs;

    private int ReadDataTimeMs;

    private int WriteDataTimeMs;

    private int CommitAndPublishTimeMs;

    private String ErrorURL;

    public int getTxnId() {
        return TxnId;
    }

    public void setTxnId(int txnId) {
        TxnId = txnId;
    }

    public String getLabel() {
        return Label;
    }

    public void setLabel(String label) {
        Label = label;
    }

    public String getStatus() {
        return Status;
    }

    public void setStatus(String status) {
        Status = status;
    }

    public String getExistingJobStatus() {
        return ExistingJobStatus;
    }

    public void setExistingJobStatus(String existingJobStatus) {
        ExistingJobStatus = existingJobStatus;
    }

    public String getMessage() {
        return Message;
    }

    public void setMessage(String message) {
        Message = message;
    }

    public long getNumberTotalRows() {
        return NumberTotalRows;
    }

    public void setNumberTotalRows(long numberTotalRows) {
        NumberTotalRows = numberTotalRows;
    }

    public long getNumberLoadedRows() {
        return NumberLoadedRows;
    }

    public void setNumberLoadedRows(long numberLoadedRows) {
        NumberLoadedRows = numberLoadedRows;
    }

    public int getNumberFilteredRows() {
        return NumberFilteredRows;
    }

    public void setNumberFilteredRows(int numberFilteredRows) {
        NumberFilteredRows = numberFilteredRows;
    }

    public int getNumberUnselectedRows() {
        return NumberUnselectedRows;
    }

    public void setNumberUnselectedRows(int numberUnselectedRows) {
        NumberUnselectedRows = numberUnselectedRows;
    }

    public long getLoadBytes() {
        return LoadBytes;
    }

    public void setLoadBytes(long loadBytes) {
        LoadBytes = loadBytes;
    }

    public int getLoadTimeMs() {
        return LoadTimeMs;
    }

    public void setLoadTimeMs(int loadTimeMs) {
        LoadTimeMs = loadTimeMs;
    }

    public int getBeginTxnTimeMs() {
        return BeginTxnTimeMs;
    }

    public void setBeginTxnTimeMs(int beginTxnTimeMs) {
        BeginTxnTimeMs = beginTxnTimeMs;
    }

    public int getStreamLoadPutTimeMs() {
        return StreamLoadPutTimeMs;
    }

    public void setStreamLoadPutTimeMs(int streamLoadPutTimeMs) {
        StreamLoadPutTimeMs = streamLoadPutTimeMs;
    }

    public int getReadDataTimeMs() {
        return ReadDataTimeMs;
    }

    public void setReadDataTimeMs(int readDataTimeMs) {
        ReadDataTimeMs = readDataTimeMs;
    }

    public int getWriteDataTimeMs() {
        return WriteDataTimeMs;
    }

    public void setWriteDataTimeMs(int writeDataTimeMs) {
        WriteDataTimeMs = writeDataTimeMs;
    }

    public int getCommitAndPublishTimeMs() {
        return CommitAndPublishTimeMs;
    }

    public void setCommitAndPublishTimeMs(int commitAndPublishTimeMs) {
        CommitAndPublishTimeMs = commitAndPublishTimeMs;
    }

    public String getErrorURL() {
        return ErrorURL;
    }

    public void setErrorURL(String errorURL) {
        ErrorURL = errorURL;
    }

    @Override
    public String toString() {
        return "RespContent{" +
                "TxnId=" + TxnId +
                ", Label='" + Label + '\'' +
                ", Status='" + Status + '\'' +
                ", ExistingJobStatus='" + ExistingJobStatus + '\'' +
                ", Message='" + Message + '\'' +
                ", NumberTotalRows=" + NumberTotalRows +
                ", NumberLoadedRows=" + NumberLoadedRows +
                ", NumberFilteredRows=" + NumberFilteredRows +
                ", NumberUnselectedRows=" + NumberUnselectedRows +
                ", LoadBytes=" + LoadBytes +
                ", LoadTimeMs=" + LoadTimeMs +
                ", BeginTxnTimeMs=" + BeginTxnTimeMs +
                ", StreamLoadPutTimeMs=" + StreamLoadPutTimeMs +
                ", ReadDataTimeMs=" + ReadDataTimeMs +
                ", WriteDataTimeMs=" + WriteDataTimeMs +
                ", CommitAndPublishTimeMs=" + CommitAndPublishTimeMs +
                ", ErrorURL='" + ErrorURL + '\'' +
                '}';
    }

}
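
The fields are capitalized to match the JSON keys Doris returns in the Stream Load response body. An abridged sample response (values here are illustrative) looks like:

 {
     "TxnId": 1003,
     "Label": "flink_import_20220103_004137_...",
     "Status": "Success",
     "Message": "OK",
     "NumberTotalRows": 2000,
     "NumberLoadedRows": 2000,
     "NumberFilteredRows": 0,
     "NumberUnselectedRows": 0,
     "LoadBytes": 190520,
     "LoadTimeMs": 112
 }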

3.3 DorisSink

package com.wudl.flink.doris.sink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wudl.flink.doris.bean.RespContent;
import com.wudl.flink.doris.utils.DorisStreamLoad;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @author :wudl
 * @date :Created in 2022-01-02 20:00
 * @description:
 * @modified By:
 * @version: 1.0
 */

public class DorisSink extends RichSinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(DorisSink.class);

    private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));

    private DorisStreamLoad dorisStreamLoad;

    private String columns;

    private String jsonFormat;

    public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }


    /**
     * Check whether the Stream Load succeeded
     *
     * @param respContent the response returned by Stream Load (parsed from JSON)
     * @return true if the load succeeded and all rows were loaded
     */
    public static Boolean checkStreamLoadStatus(RespContent respContent) {
        return DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Extract the "data" array from the incoming {"data": [...]} envelope
        JSONObject data = JSONObject.parseObject(value);
        value = JSON.toJSONString(data.get("data"));
        System.out.println("value----" + value);
        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
        if (loadResponse != null && loadResponse.status == 200) {
            RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
            if (!checkStreamLoadStatus(respContent)) {
                log.error("Stream Load failed: {}", loadResponse);
            } else {
                log.info("Stream Load succeeded: {}", loadResponse);
            }
        } else {
            log.error("Stream Load request failed: {}", loadResponse);
        }
    }

}

3.4 GenerateData: mock data generator

package com.wudl.flink.doris.source;

import com.alibaba.fastjson.JSON;
import com.wudl.flink.doris.bean.DorisBean;
import com.wudl.flink.doris.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @author :wudl
 * @date :Created in 2022-01-01 13:05
 * @description: generate mock data
 * @modified By:
 * @version: 1.0
 */

public class GenerateData implements SourceFunction<String>  {


        private boolean isRunning = true;

        String[] citys = {"北京","广东","山东","江苏","河南","上海","河北","浙江","香港","山西","陕西","湖南","重庆","福建","天津","云南","四川","广西","安徽","海南","江西","湖北","山西","辽宁","内蒙古"};

        Integer i = 0;
        List<DorisBean>  list = new ArrayList<>();
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Random random = new Random();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning) {
                Integer id = i++;
                // use the current timestamp as the name field
                String name = df.format(new Date());
                String address = "1";
                String city = citys[random.nextInt(citys.length)];
                String phone = getTel();
                DorisBean dorisBean = new DorisBean(id, name, address, city, phone);
                list.add(dorisBean);
                // emit a batch of 2000 rows wrapped in a {"data": [...]} envelope
                if (list.size() == 2000) {
                    Map<String, List> map = new HashMap<>();
                    map.put("data", list);
                    String s = JSON.toJSONString(map);
                    System.out.println("map--->" + s);
                    list = new ArrayList<>();
                    ctx.collect(s);
                }
            }
        }

        @Override
        public void cancel() {

            isRunning = false;
        }


    private static String[] telFirst = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");

    /**
     * Generate a random 11-digit Chinese mobile phone number
     */
    private static String getTel() {
        int index = getNum(0, telFirst.length - 1);
        String first = telFirst[index];
        String second = String.valueOf(getNum(1, 888) + 10000).substring(1);
        String third = String.valueOf(getNum(1, 9100) + 10000).substring(1);
        return first + second + third;
    }

    public static int getNum(int start,int end) {
        return (int)(Math.random()*(end-start+1)+start);
    }

    public static void main(String[] args) throws Exception {
         String default_topic = "wudltopicdoris01";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataStreamSource<String> generateSource = env.addSource(new GenerateData());
        generateSource.print("--------");
        generateSource.addSink(MyKafkaUtil.getKafkaProducer(default_topic));

        env.execute();


    }

}

3.5 DorisStreamLoad: Doris load utility class

package com.wudl.flink.doris.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Calendar;
import java.util.UUID;

/**
 * Doris Stream Load utility class
 *
 * @author wudl
 */
public class DorisStreamLoad implements Serializable {
    private static final long serialVersionUID = 1L;

    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
    /**
     * Load URL pattern; requests are sent to the FE, which redirects to a BE
     */
    private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
    /**
     * FE host:port
     */
    private String hostPort;
    /**
     * Target database
     */
    private String db;
    /**
     * Target table
     */
    private String table;
    /**
     * Username
     */
    private String username;
    /**
     * Password
     */
    private String password;
    /**
     * Fully formatted load URL
     */
    private String loadUrlStr;
    /**
     * Basic-auth value (Base64 of "username:password")
     */
    private String authEncoding;

    public DorisStreamLoad(String hostPort, String db, String table, String username, String password) {
        this.hostPort = hostPort;
        this.db = db;
        this.table = table;
        this.username = username;
        this.password = password;
        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, table);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", username, password).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Build an HTTP connection with the Stream Load request headers
     */
    private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        conn.addRequestProperty("max_filter_ratio", "0");
        conn.addRequestProperty("strict_mode", "true");
        conn.addRequestProperty("columns", columns);
        conn.addRequestProperty("format", "json");
        conn.addRequestProperty("jsonpaths", jsonformat);
        conn.addRequestProperty("strip_outer_array", "true");
        conn.setDoOutput(true);
        conn.setDoInput(true);
        return conn;
    }

    public static class LoadResponse {
        public int status;
        public String respMsg;
        public String respContent;

        public LoadResponse(int status, String respMsg, String respContent) {
            this.status = status;
            this.respMsg = respMsg;
            this.respContent = respContent;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("status: ").append(status);
            sb.append(", resp msg: ").append(respMsg);
            sb.append(", resp content: ").append(respContent);
            return sb.toString();
        }
    }

    /**
     * Execute the data load
     */
    public LoadResponse loadBatch(String data, String columns, String jsonformat) {
        Calendar calendar = Calendar.getInstance();
        // load label; must be globally unique
        String label = String.format("flink_import_%s%02d%02d_%02d%02d%02d_%s",
                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replaceAll("-", ""));

        HttpURLConnection feConn = null;
        HttpURLConnection beConn = null;

        try {
            // build request and send to fe
            feConn = getConnection(loadUrlStr, label, columns, jsonformat);
            int status = feConn.getResponseCode();
            // fe send back http response code TEMPORARY_REDIRECT 307 and new be location
            if (status != 307) {
                throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
            }
            String location = feConn.getHeaderField("Location");
            if (location == null) {
                throw new Exception("redirect location is null");
            }
            // build request and send to new be location
            beConn = getConnection(location, label, columns, jsonformat);
            // send data to the BE; use UTF-8 explicitly since rows contain Chinese text
            BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
            bos.write(data.getBytes(StandardCharsets.UTF_8));
            bos.close();

            // get respond
            status = beConn.getResponseCode();
            String respMsg = beConn.getResponseMessage();
            InputStream stream = (InputStream) beConn.getContent();
            BufferedReader br = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                response.append(line);
            }
            return new LoadResponse(status, respMsg, response.toString());

        } catch (Exception e) {
            String err = "Stream Load failed with label: " + label;
            log.warn(err, e);
            return new LoadResponse(-1, e.getMessage(), err);
        } finally {
            if (feConn != null) {
                feConn.disconnect();
            }
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }

}
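
DorisStreamLoad can also be exercised outside of Flink. Below is a minimal standalone sketch, assuming the FE address, database, table, and credentials used by the entry class later in this article; the row values are illustrative:

package com.wudl.flink.doris.utils;

public class DorisStreamLoadDemo {
    public static void main(String[] args) {
        DorisStreamLoad loader = new DorisStreamLoad(
                "192.168.1.161:8090", "wudldb", "wudl_doris01", "root", "");
        // a one-element JSON array; strip_outer_array=true lets Doris unwrap it
        String batch = "[{\"address\":\"1\",\"city\":\"北京\",\"id\":1,"
                + "\"name\":\"2022-01-02 20:31:00\",\"phone\":\"13800000000\"}]";
        DorisStreamLoad.LoadResponse resp = loader.loadBatch(
                batch,
                "address,city,id,name,phone",
                "[\"$.address\",\"$.city\",\"$.id\",\"$.name\",\"$.phone\"]");
        System.out.println(resp);
    }
}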

3.6 MyKafkaUtil: Kafka utility class

package com.wudl.flink.doris.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * @ClassName : MyKafkaUtil
 * @Description : Kafka utility class
 * @Author :wudl
 * @Date: 2021-10-07 21:18
 */

public class MyKafkaUtil {
    private static String brokers = "192.168.1.161:6667";
    private static String default_topic = "wudltopic";

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>(brokers,
                topic,
                new SimpleStringSchema());
    }

    public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> kafkaSerializationSchema) {

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        return new FlinkKafkaProducer<T>(default_topic,
                kafkaSerializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {

        Properties properties = new Properties();

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        return new FlinkKafkaConsumer<String>(topic,
                new SimpleStringSchema(),
                properties);

    }

    /**
     * Build the Kafka connector properties fragment for a Flink SQL DDL
     * @param topic
     * @param groupId
     * @return
     */
    public static String getKafkaDDL(String topic, String groupId) {
        return  " 'connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + brokers + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'latest-offset'  ";
    }


}
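
A hedged usage sketch for getKafkaDDL; the table schema and the tableEnv variable here are illustrative, not part of this project:

        // assumes a StreamTableEnvironment named tableEnv was created elsewhere
        String ddl = "CREATE TABLE kafka_source ( " +
                "  id INT, " +
                "  name STRING " +
                ") WITH ( " +
                MyKafkaUtil.getKafkaDDL("wudltopicdoris01", "flink_doris_group006") +
                ")";
        tableEnv.executeSql(ddl);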

3.7 DorisApp: program entry point

package com.wudl.flink.doris;

import com.wudl.flink.doris.sink.DorisSink;
import com.wudl.flink.doris.source.GenerateData;
import com.wudl.flink.doris.utils.DorisStreamLoad;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * @author :wudl
 * @date :Created in 2022-01-01 13:15
 * @description:
 * @modified By:
 * @version: 1.0
 */

public class DorisApp {

    private static final String bootstrapServer = "192.168.1.161:6667";

    private static final String groupName = "flink_doris_group006";

    private static final String topicName = "wudltopicdoris01";

    private static final String hostPort = "192.168.1.161:8090";

    private static final String dbName = "wudldb";

    private static final String tbName = "wudl_doris01";

    private static final String userName = "root";

    private static final String password = "";

    // fastjson emits bean fields in alphabetical order, e.g.
    // {"address":"广东省","city":"海南","id":183,"name":"2022-01-03 00:41:37","phone":"15007840220"}
    // so columns and jsonpaths must follow that order, not the table's declared order
    private static final String columns = "address,city,id,name,phone";

    private static final String jsonFormat = "[\"$.address\",\"$.city\",\"$.id\",\"$.name\",\"$.phone\"]";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        props.put("auto.offset.reset", "earliest");
//        props.put("max.poll.records", "10000");

        SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//        blinkStreamEnv.setParallelism(1);
        blinkStreamEnv.enableCheckpointing(10000);
        blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topicName,
                simpleStringSchema,
                props);

        // For the Kafka pipeline from the scenario, use the Kafka source:
        // DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);
        // Here the mock-data generator is wired in directly:
        DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(new GenerateData());

        // print each record for debugging
        dataStreamSource.print();


        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);

        dataStreamSource.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));

        blinkStreamEnv.execute("flink kafka to doris");

    }
}

4. Results: after running for about half an hour, roughly 40 million rows were inserted

(figure: doris-count.png — row count in Doris)
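
The count shown in the screenshot can be checked with a query like the following (database and table from section 2):

 SELECT COUNT(*) FROM wudldb.wudl_doris01;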

5. POM file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--    <parent>-->
<!--        <artifactId>Flink-learning</artifactId>-->
<!--        <groupId>com.wudl.flink</groupId>-->
<!--        <version>1.0-SNAPSHOT</version>-->
<!--    </parent>-->

    <groupId>org.wudlflink13</groupId>
    <version>1.0-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>wudl-flink-13</artifactId>

    <!-- Repository locations: aliyun, apache, and cloudera -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>

        <repository>
            <id>spring-plugin</id>
            <url>https://repo.spring.io/plugins-release/</url>
        </repository>

    </repositories>

    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.11</scala.version>
        <flink.version>1.13.3</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.10.3</version>
        </dependency>

        <!-- Scala language dependencies -->


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Blink planner (the default since Flink 1.11) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!-- Flink connectors -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>flink-streaming-java_2.11</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>flink-runtime_2.11</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <!--                <exclusion>-->
                <!--                    <artifactId>flink-core</artifactId>-->
                <!--                    <groupId>org.apache.flink</groupId>-->
                <!--                </exclusion>-->
                <exclusion>
                    <artifactId>flink-java</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>hadoop-hdfs</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.apache.hive</groupId>-->
<!--            <artifactId>hive-exec</artifactId>-->
<!--            <version>2.1.0</version>-->
<!--            <exclusions>-->
<!--                <exclusion>-->
<!--                    <artifactId>slf4j-api</artifactId>-->
<!--                    <groupId>org.slf4j</groupId>-->
<!--                </exclusion>-->
<!--            </exclusions>-->
<!--        </dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
            <!--<version>8.0.20</version>-->
        </dependency>

        <!-- High-performance async components: Vert.x -->
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-jdbc-client</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-redis-client</artifactId>
            <version>3.9.0</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
