
Writing a JavaPairDStream to HBase in Java

Author: 涓涓自然卷 | Published 2020-03-04 18:41

    Integrating Spark with HBase

    • 1. Add the POM dependencies (see the note on ${hbase.version} after the list):
            <!-- hbase -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase</artifactId>
                <version>${hbase.version}</version>
                <type>pom</type>
            </dependency>
    
            <!-- hbase-client -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <!-- hbase-server -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>${hbase.version}</version>
            </dependency>
    
            <!-- hbase-common -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-common</artifactId>
                <version>${hbase.version}</version>
            </dependency>
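
    The snippets above reference a ${hbase.version} property that the article never defines; a minimal sketch of the corresponding <properties> entry (the version number here is only an example, use the one matching your cluster):

            <properties>
                <!-- Example only: align with the HBase version of your cluster -->
                <hbase.version>1.2.0</hbase.version>
            </properties>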
    
    

    • 2. Main program code:

    public class Demo04ReadFromKafkaToHbaseOk {
        private static final Logger logger = LoggerFactory.getLogger(Demo04ReadFromKafkaToHbaseOk.class);
    
        public static void main(String[] args) {
        // Build the Spark Streaming configuration
            SparkConf conf = new SparkConf()
                    .setAppName("Demo04ReadFromKafkaToHbaseOk")
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .registerKryoClasses(new Class[]{Demo04ReadFromKafkaToHbaseOk.class})
                    .set("spark.kryoserializer.buffer.mb", "256")
                    .set("spark.kryoserializer.buffer.max", "512");
    
            SparkUtils.setMaster(conf);
    
        // Create the streaming context with a 5-second batch interval
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        // Checkpoint directory
    //        jssc.checkpoint(ConfigurationManager.getProperty(Constants.STREAMING_CHECKPOINT_DIR));
            jssc.checkpoint("./streaming-checkpoint");
    
        // Build the Kafka parameter map:
        // mainly the addresses of the Kafka cluster to connect to (the broker list)
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS));
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", ConfigurationManager.getProperty(Constants.GROUP_ID));
            kafkaParams.put("auto.offset.reset", "latest");
        // If true, the consumer periodically commits each partition's offset back to Kafka; disabled here
            kafkaParams.put("enable.auto.commit", false);
    
        // Build the topic set
            String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS);
            String[] kafkaTopicsSplited = kafkaTopics.split(",");
    
            Collection<String> topics = new HashSet<>();
            for (String kafkaTopic : kafkaTopicsSplited) {
                topics.add(kafkaTopic);
            }
    
            try {
            // Create a direct stream reading the data from Kafka
                final JavaInputDStream<ConsumerRecord<String, String>> stream =
                        KafkaUtils.createDirectStream(
                                jssc,
                                LocationStrategies.PreferConsistent(),
                                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                        );
    
    
            // Extract the words
                JavaDStream<String> words = getWords(stream);
    //            words.print();
    
            // Map to (word, 1) pairs
                JavaPairDStream<String, Integer> wordsAndOne = getWordsAndOne(words);
    
            // Aggregate the data pulled in this 5s batch
                final JavaPairDStream<String, Integer> wordsCount = getWordsCount(wordsAndOne);
    //            wordsCount.print();
    
            // Running historical total; checkpointed every 30 seconds
                DStream<Tuple2<String, Integer>> result = getResults60s(wordsAndOne);
    
    //            logger.info("==result print==");
    //            result.print();
    
            // Window function: computed every 15 seconds over the previous 15 seconds of data
                JavaPairDStream<String, Integer> result2 = getResults15s(wordsAndOne);
    
                logger.info("==result2 print==");
                result2.print();
    
            // Write to HBase
                writeToHbase(result2);
    
                jssc.start();
                jssc.awaitTermination();
                jssc.close();
    
        } catch (Exception e) {
            logger.error("Streaming job failed", e);
        }
        }
    
    /**
     * Extract the message values from the Kafka stream
     *
     * @param stream : JavaInputDStream
     * @return :
     */
        private static JavaDStream<String> getWords(JavaInputDStream<ConsumerRecord<String, String>> stream) {
            return stream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
                @Override
                public Iterator<String> call(ConsumerRecord<String, String> s) throws Exception {
                    List<String> list = new ArrayList<>();
                // TODO: each Kafka record is available here for further processing
                    long offset = s.offset();
                    logger.info("ConsumerRecord<String, String> s =" + s);
                    logger.info("offset = " + offset); // offset = 180
                    logger.info("*** " + s.value() + " ***"); // *** kafka ***
                    list.add(s.value());
                    return list.iterator();
                }
            });
        }
    
    /**
     * Map each word to a Tuple2<>(wordSplit, 1)
     *
     * @param words :JavaDStream<String>
     * @return :JavaPairDStream<String, Integer>
     */
        private static JavaPairDStream<String, Integer> getWordsAndOne(JavaDStream<String> words) {
        return words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                String wordSplit = "未知"; // default value, "unknown"
                if (word.contains(" ")) {
                    // NOTE: this keeps only the LAST whitespace-separated token of the line
                    String[] s = word.split(" ");
                    for (int i = 0; i < s.length; i++) {
                        wordSplit = s[i];
                    }
                } else {
                    wordSplit = word;
                }
    
    //                    return new Tuple2<>(word, 1);
                    return new Tuple2<>(wordSplit, 1);
                }
            });
        }
    
    /**
     * Aggregate the data pulled in this 5s batch
     *
     * @param wordsAndOne :
     * @return :
     */
        private static JavaPairDStream<String, Integer> getWordsCount(JavaPairDStream<String, Integer> wordsAndOne) {
            return wordsAndOne.reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
    
        }
    
    /**
     * Running historical total; checkpointed every 30 seconds
     *
     * @param wordsAndOne :
     * @return :
     */
        private static DStream<Tuple2<String, Integer>> getResults60s(JavaPairDStream<String, Integer> wordsAndOne) {
            return wordsAndOne.updateStateByKey(((Function2<List<Integer>, org.apache.spark.api.java.Optional<Integer>, org.apache.spark.api.java.Optional<Integer>>) (values, state) -> {
            Integer updatedValue = 0;
            if (state.isPresent()) {
                updatedValue = state.get(); // previous accumulated count for this key
                logger.info("updatedValue = {}", updatedValue);
            }
            for (Integer value : values) {
                updatedValue += value;
            }
            return org.apache.spark.api.java.Optional.of(updatedValue);
            })).checkpoint(Durations.seconds(30));
        }
    
    /**
     * Window function: computed every 15 seconds, aggregating the previous 15 seconds of data
     * (a tumbling window; an illustrative sliding-window variant follows this method)
     *
     * @param wordsAndOne :
     * @return :
     */
        private static JavaPairDStream<String, Integer> getResults15s(JavaPairDStream<String, Integer> wordsAndOne) {
            return wordsAndOne.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            }, Durations.seconds(15), Durations.seconds(15));
        }
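
    // Illustrative sketch, not from the original article: a sliding-window alternative
    // that recomputes every 5 seconds over the last 15 seconds. The inverse function
    // subtracts the counts sliding out of the window instead of recomputing the whole
    // window; this form requires checkpointing, which is enabled in main.
    private static JavaPairDStream<String, Integer> getResultsSliding(JavaPairDStream<String, Integer> wordsAndOne) {
        return wordsAndOne.reduceByKeyAndWindow(
                (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2, // add counts entering the window
                (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 - v2, // subtract counts leaving the window
                Durations.seconds(15),  // window length
                Durations.seconds(5));  // slide interval
    }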
    
    /**
     * Write to HBase (a hypothetical per-partition variant is sketched after this class)
     *
     * @param result2 : JavaPairDStream<String, Integer>
     */
        private static void writeToHbase(JavaPairDStream<String, Integer> result2) {
            result2.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
                @Override
                public void call(JavaPairRDD<String, Integer> v1, Time v2) throws Exception {
                v1.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    @Override
                    public void call(Tuple2<String, Integer> tuple2) throws Exception {

                        String s1 = tuple2._1;
                        String s2 = tuple2._2.toString();

                        String rowKey = s1 + "00000";

                        TableName tableName = TableName.valueOf("wxj-test4");

                        // Get a Table from the shared connection; close it once the write is done
                        try (Table table = HBasePoolConnection.getConnection().getTable(tableName)) {
                            Put put = new Put(Bytes.toBytes(rowKey));
                            put.addColumn(Bytes.toBytes("word")
                                    , Bytes.toBytes("name"), Bytes.toBytes(s1));
                            put.addColumn(Bytes.toBytes("count")
                                    , Bytes.toBytes("number"), Bytes.toBytes(s2));

                            table.put(put);
                            logger.info("write to hbase success!");
                        }
                    }
                });
                }
            });
        }
    
    }
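
    Opening a Table and issuing one Put per element works, but pays per-record overhead. A common optimization is to buffer the Puts of each partition and write them in a single batched call. The following is a hypothetical variant of writeToHbase, not part of the original article, under the same assumptions (table wxj-test4, column families word and count); it would live in the same class:

        /**
         * Hypothetical per-partition variant: buffer the Puts of a partition and write them in one batch
         */
        private static void writeToHbaseByPartition(JavaPairDStream<String, Integer> result) {
            result.foreachRDD((rdd, time) -> rdd.foreachPartition(records -> {
                List<Put> puts = new ArrayList<>();
                while (records.hasNext()) {
                    Tuple2<String, Integer> t = records.next();
                    Put put = new Put(Bytes.toBytes(t._1 + "00000")); // same rowkey scheme as above
                    put.addColumn(Bytes.toBytes("word"), Bytes.toBytes("name"), Bytes.toBytes(t._1));
                    put.addColumn(Bytes.toBytes("count"), Bytes.toBytes("number"), Bytes.toBytes(t._2.toString()));
                    puts.add(put);
                }
                if (!puts.isEmpty()) {
                    try (Table table = HBasePoolConnection.getConnection().getTable(TableName.valueOf("wxj-test4"))) {
                        table.put(puts); // one batched write per partition
                    }
                }
            }));
        }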
    
    • 3. SparkUtils code:
    public class SparkUtils {
    /**
     * Set the SparkConf master depending on whether
     * the configuration marks this as a local test run
     */
        public static void setMaster(SparkConf conf) {
            boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
        if (local) {
                conf.setMaster("local");
            }
        }
    }
    
    
    • 4. HBasePoolConnection class:
    /**
     * HBase connection pool
     */
    public class HBasePoolConnection {
        private HBasePoolConnection() {
        }
    
    // Shared connection (an HBase Connection is heavyweight and thread-safe, so one instance is reused)
    private static Connection connection = null;
    // Configuration
    static Configuration hbaseConfiguration = HBaseConfiguration.create();
    
    public static synchronized Connection getConnection() { // synchronized so the lazy init is safe across threads
        if (connection == null) {
            ExecutorService pool = Executors.newFixedThreadPool(10); // fixed-size thread pool used by the connection
            hbaseConfiguration.addResource("hbase-site.xml");
            try {
                connection = ConnectionFactory.createConnection(hbaseConfiguration, pool); // create the connection with the config and pool
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return connection;
    }
    
    }
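
    The write path above assumes that table wxj-test4 already exists with the column families word and count. As a one-off setup step, something like the following sketch could be added to HBasePoolConnection (it is not part of the original article, and uses the classic HTableDescriptor API, which HBase 2.x deprecates in favor of TableDescriptorBuilder):

        // Hypothetical helper: create 'wxj-test4' with the two column families used by the job
        public static void createTableIfMissing() throws IOException {
            try (Admin admin = getConnection().getAdmin()) {
                TableName tableName = TableName.valueOf("wxj-test4");
                if (!admin.tableExists(tableName)) {
                    HTableDescriptor desc = new HTableDescriptor(tableName);
                    desc.addFamily(new HColumnDescriptor("word"));  // family for the word itself
                    desc.addFamily(new HColumnDescriptor("count")); // family for the running count
                    admin.createTable(desc);
                }
            }
        }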
    
    • 5. Configuration management component
    public class ConfigurationManager {
        private static Properties prop = new Properties();
    
    static {
        try (InputStream in = ConfigurationManager.class
                .getClassLoader().getResourceAsStream("application.properties")) {
            prop.load(in);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * Get a property value
     *
     * @param key :
     * @return :
     */
        public static String getProperty(String key) {
            return prop.getProperty(key);
        }
    
    /**
     * Get an integer-valued configuration entry
     *
     * @param key :
     * @return value
     */
        public static Integer getInteger(String key) {
            String value = getProperty(key);
            try {
                return Integer.valueOf(value);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return 0;
        }
    
    /**
     * Get a boolean-valued configuration entry
     *
     * @param key :
     * @return :value
     */
        public static Boolean getBoolean(String key) {
            String value = getProperty(key);
            try {
                return Boolean.valueOf(value);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
    /**
     * Get a Long-valued configuration entry
     *
     * @param key :
     * @return :
     */
        public static Long getLong(String key) {
            String value = getProperty(key);
            try {
                return Long.valueOf(value);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return 0L;
    }
}

    • 6. Constants interface
    /**
     * Constants interface
     */
    public interface Constants {
    
    // * Project configuration constants
        // Cluster Mode Config :
        String SPARK_LOCAL = "spark.local";
    
    // * Spark job constants
    String SPARK_SERIALIZER = "spark.serializer"; // serializer type
    
        // * KAFKA
        String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
        String GROUP_ID = "group.id";
        String KAFKA_TOPICS = "kafka.topics";
        String STREAMING_CHECKPOINT_DIR = "streaming.checkpoint.dir";
    
    // * HBase
        String HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
        String HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "hbase.zookeeper.property.clientPort";
    
    }
    
    • 7. application.properties (the ${...} placeholders are resolved at build time by Maven resource filtering; see the note after section 8)
    # Spark Config #
    spark.local = ${spark.local}
    spark.serializer = ${spark.serializer}
    # KAFKA
    kafka.bootstrap.servers = ${bootstrap.servers}
    group.id = ${group.id}
    kafka.topics = ${kafka.topics}
    streaming.checkpoint.dir = ${streaming.checkpoint.dir}
    # HBASE
    hbase.zookeeper.quorum = ${hbase.zookeeper.quorum}
    hbase.zookeeper.property.clientPort = ${hbase.zookeeper.property.clientPort}
    # END Config #
    myvalue=${myvalue}
    
    • 8. dev.properties
    # Spark Config #
    spark.local = true
    spark.serializer = org.apache.spark.serializer.KryoSerializer
    
    # KAFKA
    bootstrap.servers=×××.×××.×××.×××:9092,×××.×××.×××.×××:9092,×××.×××.×××.×××:9092
    #bootstrap.servers=localhost:9092
    
    kafka.topics=test-wxj1
    #kafka.topics=myFlumeKafka
    group.id=wxj
    streaming.checkpoint.dir=hdfs://×××.×××.×××.×××:8022/streaming-checkpoint
    # HBASE
    hbase.zookeeper.quorum=cdh1,cdh2,cdh3
    hbase.zookeeper.property.clientPort=2181
    # end config #
    myvalue=dev
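
    The ${...} placeholders in application.properties are filled in at build time, with dev.properties supplying the environment-specific values. The article does not show the build configuration; a minimal sketch of the Maven resource-filtering setup this implies (the filter path is an assumption):

        <build>
            <filters>
                <!-- Assumed location of the environment-specific values -->
                <filter>src/main/filters/dev.properties</filter>
            </filters>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <!-- Substitute ${...} placeholders in application.properties during the build -->
                    <filtering>true</filtering>
                </resource>
            </resources>
        </build>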
    
