(6) Flink SQL: Writing Kafka Data into MySQL, Approach One


Author: NBI大数据可视化分析 | Published 2022-08-11 10:57

    ZooKeeper and Kafka installation and configuration are not covered here.
    (1) First, start ZooKeeper and Kafka (for example, with the zookeeper-server-start and kafka-server-start scripts shipped in Kafka's bin/ directory).



    (2) Define a Kafka producer

    package com.producers;
    
    import com.alibaba.fastjson.JSONObject;
    import com.pojo.WaterSensor;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Properties;
    import java.util.Random;
    
    /**
     * Created by lj on 2022-07-09.
     */
    public class Kafaka_Producer {
        public final static String bootstrapServers = "127.0.0.1:9092";
    
        public static void main(String[] args) {
            Properties props = new Properties();
            // Kafka broker address
            props.put("bootstrap.servers", bootstrapServers);
            // Serializer for the record key
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Serializer for the record value
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
            try {
                int i = 0;
                Random r = new Random();   // no explicit seed
                String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};
    
                while (true) {
                    Thread.sleep(2000);   // emit one record every 2 seconds
                    WaterSensor waterSensor = new WaterSensor(lang[r.nextInt(lang.length)], i, i);
                    i++;
    
                    String msg = JSONObject.toJSONString(waterSensor);
                    System.out.println(msg);
                    // send synchronously: get() blocks until the broker acknowledges the record
                    RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("kafka_data_waterSensor", null, null, msg)).get();
    //                System.out.println("recordMetadata: {" + recordMetadata + "}");
                }
    
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }
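
    Each message is the fastjson serialization of the WaterSensor POJO defined below; a representative sample (both numeric fields advance with the counter):

    {"id":"flink","ts":12,"vc":12}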
    

    (3) Define the message POJO

    package com.pojo;
    
    import java.io.Serializable;
    
    /**
     * Created by lj on 2022-07-05.
     */
    public class WaterSensor implements Serializable {
        private String id;
        private long ts;
        private int vc;
    
        public WaterSensor(){
    
        }
    
        public WaterSensor(String id,long ts,int vc){
            this.id = id;
            this.ts = ts;
            this.vc = vc;
        }
    
        public int getVc() {
            return vc;
        }
    
        public void setVc(int vc) {
            this.vc = vc;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public long getTs() {
            return ts;
        }
    
        public void setTs(long ts) {
            this.ts = ts;
        }
    }
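
    fastjson relies on the no-argument constructor and the getters/setters of this POJO for (de)serialization. A quick round-trip sketch (illustrative, not part of the original article):

    WaterSensor ws = new WaterSensor("flink", 1L, 1);
    String json = JSONObject.toJSONString(ws);                           // {"id":"flink","ts":1,"vc":1}
    WaterSensor back = JSONObject.parseObject(json, WaterSensor.class);  // rebuilt via setters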
    

    (4) Consume the data from Kafka and write it to MySQL (the windowed TVF syntax used below requires Flink 1.13+)

    package com.consumers;   // package and class name are illustrative; the article shows only the main method
    
    import com.alibaba.fastjson.JSONObject;
    import com.pojo.WaterSensor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    import java.util.Properties;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    public class Kafka2MysqlBySql {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            // Kafka consumer configuration
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
            properties.setProperty("group.id", "consumer-group");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("auto.offset.reset", "latest");
    
            // The topic must match the one the producer writes to
            DataStreamSource<String> streamSource = env.addSource(
                    new FlinkKafkaConsumer<String>(
                            "kafka_data_waterSensor",
                            new SimpleStringSchema(),
                            properties)
            );
    
            // Parse each JSON message into a WaterSensor
            SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
                @Override
                public WaterSensor map(String s) throws Exception {
                    JSONObject json = JSONObject.parseObject(s);
                    return new WaterSensor(json.getString("id"), json.getLong("ts"), json.getInteger("vc"));
                }
            });
    
            // Convert the stream into a table; pt is a processing-time attribute
            Table table = tableEnv.fromDataStream(waterDS,
                    $("id"),
                    $("ts"),
                    $("vc"),
                    $("pt").proctime());
    
            tableEnv.createTemporaryView("EventTable", table);
    
            // JDBC sink table, declared with the legacy connector property keys
            tableEnv.executeSql("CREATE TABLE flinksink (" +
                    "componentname STRING," +
                    "componentcount BIGINT NOT NULL," +
                    "componentsum BIGINT" +
                    ") WITH (" +
                    "'connector.type' = 'jdbc'," +
                    "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
                    "'connector.table' = 'flinksink'," +
                    "'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
                    "'connector.username' = 'root'," +
                    "'connector.password' = 'root'," +
                    "'connector.write.flush.max-rows' = '3'" +
                    ")"
            );
            Table mysql_user = tableEnv.from("flinksink");
            mysql_user.printSchema();
    
            // Aggregate per id over 10-second tumbling processing-time windows (TVF syntax, Flink 1.13+)
            Table result = tableEnv.sqlQuery(
                    "SELECT " +
                            "id as componentname, " +
                            "COUNT(ts) as componentcount, SUM(ts) as componentsum " +
                            "FROM TABLE( " +
                            "TUMBLE( TABLE EventTable, " +
                            "DESCRIPTOR(pt), " +
                            "INTERVAL '10' SECOND)) " +
                            "GROUP BY id, window_start, window_end"
            );
    
            // Approach one: write to the database
    //        result.executeInsert("flinksink").print();
    
            // Approach two: write to the database
            tableEnv.createTemporaryView("ResultTable", result);
            tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();
    
    //        tableEnv.toAppendStream(result, Row.class).print("toAppendStream");   // append mode
    
            // Note: executeSql(...) above already submits the streaming job; calling
            // env.execute() here would fail with "No operators defined in streaming
            // topology" because no DataStream sink was attached.
        }
    }
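
    The article never shows the MySQL side. A minimal sketch of a matching sink table in testdb, with column types assumed to line up with the Flink DDL above:

    CREATE TABLE flinksink (
        componentname  VARCHAR(255),
        componentcount BIGINT NOT NULL,
        componentsum   BIGINT
    );

    The WITH clause above uses the legacy connector descriptor keys. On Flink 1.11+ with the flink-connector-jdbc dependency on the classpath, the equivalent declaration (same credentials assumed) would look like:

    CREATE TABLE flinksink (
        componentname STRING,
        componentcount BIGINT NOT NULL,
        componentsum BIGINT
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/testdb',
        'table-name' = 'flinksink',
        'driver' = 'com.mysql.cj.jdbc.Driver',
        'username' = 'root',
        'password' = 'root',
        'sink.buffer-flush.max-rows' = '3'
    );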
    

    (5) Results
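
    To check the pipeline end to end, you can query the sink table directly in MySQL (illustrative query, not from the original article):

    SELECT componentname, componentcount, componentsum FROM flinksink;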


