美文网首页
自定义sourceAndsink

自定义sourceAndsink

作者: 史鸿福 | 来源:发表于2019-12-22 20:40 被阅读0次

    连接kafka获取数据

    // Flink streaming job: obtain the stream execution environment.
    StreamExecutionEnvironment exe = StreamExecutionEnvironment.getExecutionEnvironment();
            // Create the Properties object that carries the Kafka consumer configuration.
            Properties properties = new Properties();
            // Set the broker address and the consumer group id on the Properties.
            properties.setProperty("bootstrap.servers","node4:9092");
            properties.setProperty("group.id","1");
            // Use FlinkKafkaConsumer011 (choose the connector class that matches your own Kafka version).
            // Constructor arguments here: "jk" (the topic), a SimpleStringSchema, and the properties above.
            FlinkKafkaConsumer011<String> jk = new FlinkKafkaConsumer011<>("jk", new SimpleStringSchema(),properties);
            // Register the Kafka consumer as a source of the streaming job.
            DataStreamSource<String> stringDataStreamSource = exe.addSource(jk);
    

    MySQLsource

                          使用RichSourceFunction自定义source
    /**
     * Custom Flink source that polls the MySQL table {@code t_dev} for rows with
     * {@code dev_state = 0}, emits each row as a {@code Tuple4} built from columns 2-5
     * of the result set, and then marks the emitted rows with {@code dev_state = 1}.
     *
     * <p>The JDBC connection is created in {@link #open} and released in {@link #close}.
     */
    public class MySqlSource extends RichSourceFunction<Tuple4<String, String, Double, Long>> {

        Connection conn;
        Statement stat;
        // volatile: cancel() flips this flag while run() is looping; per Flink's
        // SourceFunction contract cancel() is called from a different thread.
        volatile boolean flag = true;

        /**
         * Loads the MySQL JDBC driver, opens the connection and creates the statement
         * that {@link #run} reuses for every poll.
         *
         * @param parameters Flink configuration passed to the function (not read here)
         * @throws Exception if the driver class cannot be loaded or the connection fails
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            // Load the MySQL driver class by name.
            Class.forName("com.mysql.jdbc.Driver");
            // JDBC URL, user and password of the MySQL instance.
            conn = DriverManager.getConnection("jdbc:mysql://node4:3306/1704e", "root", "123456");
            stat = conn.createStatement();
        }

        /**
         * Polling loop: query unprocessed rows, emit them, mark them processed, sleep.
         *
         * @param ctx source context used to emit records
         * @throws Exception on any JDBC failure or if the sleep is interrupted
         */
        @Override
        public void run(SourceContext<Tuple4<String, String, Double, Long>> ctx) throws Exception {
            while (flag) {
                // Comma-separated ids of the rows emitted in this round.
                StringBuilder ids = new StringBuilder();

                // try-with-resources so the ResultSet is released after every poll
                // instead of lingering until the statement is reused.
                try (ResultSet resultSet = stat.executeQuery("select * from t_dev where dev_state = 0")) {
                    while (resultSet.next()) {
                        // Columns are read by position: 1 = id, 2..5 = the emitted fields.
                        long id = resultSet.getLong(1);
                        String devId = resultSet.getString(2);
                        String metric = resultSet.getString(3);
                        double value = resultSet.getDouble(4);
                        long timestamp = resultSet.getLong(5);

                        if (ids.length() > 0) {
                            ids.append(',');
                        }
                        ids.append(id);

                        ctx.collect(Tuple4.of(devId, metric, value, timestamp));
                    }
                }

                // Only issue the UPDATE when at least one row was emitted.
                if (ids.length() > 0) {
                    String updateSql = "update t_dev set dev_state = 1 where id in (" + ids + ")";
                    System.out.println(updateSql);
                    stat.execute(updateSql);
                }

                // Poll interval: wait 10 seconds before the next query.
                Thread.sleep(10000);
            }
        }

        /** Requests termination of the polling loop in {@link #run}. */
        @Override
        public void cancel() {
            flag = false;
        }

        /**
         * Releases the JDBC statement and connection.
         *
         * @throws Exception if closing either resource fails
         */
        @Override
        public void close() throws Exception {
            try {
                if (stat != null) {
                    stat.close();
                }
            } finally {
                // Close the connection even when closing the statement failed.
                if (conn != null) {
                    conn.close();
                }
            }
        }
    }
    

    HBaseSink

                              //继承RichSinkFunction
    /**
     * Custom Flink sink that writes each incoming {@code Tuple6} as one row of the HBase
     * table {@code ns1:t_dev_data}, storing fields f2, f3 and f4 in column family
     * {@code f1} under the qualifiers {@code max}, {@code min} and {@code avg}.
     *
     * <p>The HBase connection and table handle are created in {@link #open} and released
     * in {@link #close}.
     */
    public class HBaseSink extends RichSinkFunction<Tuple6<String, String, Double, Double, Double, Long>> {

        private org.apache.hadoop.conf.Configuration conf;
        private Connection conn;
        private Table table;
        // Formats the f5 field of each record for use as the row-key suffix.
        private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");

        /**
         * Creates the HBase configuration, connects through the configured ZooKeeper
         * quorum and obtains the target table.
         *
         * @param parameters Flink configuration passed to the function (not read here)
         * @throws Exception if the connection or the table lookup fails
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            // Build the HBase client configuration.
            conf = HBaseConfiguration.create();
            // ZooKeeper quorum used to locate the cluster.
            conf.set("hbase.zookeeper.quorum","node4:2181");
            System.out.println("long...........");
            conn = ConnectionFactory.createConnection(conf);
            System.out.println("ok....................");
            // Obtain the handle of the target table.
            table = conn.getTable(TableName.valueOf("ns1:t_dev_data"));
        }

        /**
         * Writes one record to HBase.
         *
         * <p>The row key is the concatenation of f0, f1 and f5 formatted as
         * {@code yyyyMMddHHmm}.
         * NOTE(review): f0/f1 look like a device id and a metric name, and f5 like an
         * epoch-millisecond timestamp; confirm against the upstream operator.
         *
         * @param value   the record to persist
         * @param context sink context supplied by Flink (not used here)
         * @throws Exception if the HBase put fails
         */
        @Override
        public void invoke(Tuple6<String, String, Double, Double, Double, Long> value, Context context) throws Exception {
            String format = sdf.format(value.f5);
            String rowKeyStr = value.f0 + value.f1 + format;

            byte[] family = Bytes.toBytes("f1");
            Put put = new Put(Bytes.toBytes(rowKeyStr));
            put.addColumn(family, Bytes.toBytes("max"), Bytes.toBytes(value.f2));
            put.addColumn(family, Bytes.toBytes("min"), Bytes.toBytes(value.f3));
            put.addColumn(family, Bytes.toBytes("avg"), Bytes.toBytes(value.f4));

            table.put(put);
        }

        /**
         * Releases the table handle and the HBase connection.
         *
         * @throws Exception if closing either resource fails
         */
        @Override
        public void close() throws Exception {
            try {
                if (table != null) {
                    table.close();
                }
            } finally {
                // Close the connection even when closing the table failed,
                // so a failing table.close() cannot leak the connection.
                if (conn != null) {
                    conn.close();
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:自定义sourceAndsink

          本文链接:https://www.haomeiwen.com/subject/qctanctx.html