Storm-kafka Project

Author: 小左伯爵 | Published 2021-01-17 22:02

    1. Project Architecture

    [Architecture diagram omitted]
    • 1. A producer generates records and sends them to Kafka. Each record is tab-separated in the following format (see the parsing sketch after this list):
      100 29448-000005 2021-01-17 21:05:21 2
      sequence number | cell ID | timestamp | drop code (0 = normal, 1 = dropped call, 2 = disconnected)
    • 2. Storm acts as the consumer, pulling the records from Kafka as the spout's data source.
    • 3. After processing in Storm, the results are stored in HBase.
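
    As a quick reference for the record layout, here is a minimal parsing sketch; the class name is made up for illustration, and the real parsing in this project happens in CellFilterBolt (section 4):

    // Illustrative only: split one tab-separated record into its fields.
    // Field layout matches the producer in section 3.2: seq \t cellId \t timestamp \t dropCode.
    public class RecordParseSketch {
        public static void main(String[] args) {
            String record = "100\t29448-000005\t2021-01-17 21:05:21\t2";
            String[] fields = record.split("\t");
            String seq = fields[0];        // "100"
            String cellId = fields[1];     // "29448-000005"
            String timestamp = fields[2];  // "2021-01-17 21:05:21"
            String dropCode = fields[3];   // "2" -> 0 normal, 1 dropped call, 2 disconnected
            System.out.println(cellId + " @ " + timestamp + " drop=" + dropCode);
        }
    }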

    2. Project File Structure

    [Project file structure screenshot omitted]

    2.1 pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.itbin</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
      
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <!-- Skip tests -->
            <skipTests>true</skipTests>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka-client</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.6.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>2.1.0</version>
            </dependency>
        </dependencies>
    </project>
    

    2.2 hbase-site.xml

    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!--
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    -->
    <configuration>
        <property>
            <name>hbase.rootdir</name>
            <value>hdfs://mycluster/hbase</value>
        </property>
        <property>
            <name>hbase.cluster.distributed</name>
            <value>true</value>
        </property>
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>node02,node03,node04</value>
        </property>
    </configuration>
    
    

    2.3 hdfs-site.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!--
      Licensed under the Apache License, Version 2.0 (the "License");
      you may not use this file except in compliance with the License.
      You may obtain a copy of the License at
    
        http://www.apache.org/licenses/LICENSE-2.0
    
      Unless required by applicable law or agreed to in writing, software
      distributed under the License is distributed on an "AS IS" BASIS,
      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      See the License for the specific language governing permissions and
      limitations under the License. See accompanying LICENSE file.
    -->
    
    <!-- Put site-specific property overrides in this file. -->
    
    <configuration>
    <property>
            <name>dfs.replication</name>
            <value>3</value>
    </property>
    <property>
      <name>dfs.nameservices</name>
      <value>mycluster</value>
    </property>
    <property>
      <name>dfs.ha.namenodes.mycluster</name>
      <value>nn1,nn2,nn3</value>
    </property>
    <!-- RPC addresses -->
    <property>
      <name>dfs.namenode.rpc-address.mycluster.nn1</name>
      <value>node01:8020</value>
    </property>
    <property>
      <name>dfs.namenode.rpc-address.mycluster.nn2</name>
      <value>node02:8020</value>
    </property>
    <property>
      <name>dfs.namenode.rpc-address.mycluster.nn3</name>
      <value>node03:8020</value>
    </property>
    <!-- HTTP addresses -->
    <property>
      <name>dfs.namenode.http-address.mycluster.nn1</name>
      <value>node01:9870</value>
    </property>
    <property>
      <name>dfs.namenode.http-address.mycluster.nn2</name>
      <value>node02:9870</value>
    </property>
    <property>
      <name>dfs.namenode.http-address.mycluster.nn3</name>
      <value>node03:9870</value>
    </property>
    <property>
      <name>dfs.namenode.shared.edits.dir</name>
      <value>qjournal://node01:8485;node02:8485;node03:8485/mycluster</value>
    </property>
    <property>
      <name>dfs.client.failover.proxy.provider.mycluster</name>
      <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <property>
      <name>dfs.ha.fencing.methods</name>
      <value>sshfence</value>
    </property>
    
    <property>
       <name>dfs.ha.fencing.ssh.private-key-files</name>
       <value>/root/.ssh/id_rsa</value>
    </property>
    <property>
      <name>dfs.journalnode.edits.dir</name>
      <value>/opt/hadoop/full/data</value>
    </property>
    <property>
       <name>dfs.ha.automatic-failover.enabled</name>
       <value>true</value>
    </property>
    <property>
            <name>dfs.permissions</name>
            <value>false</value>
    </property>
    </configuration>
    
    

    3. Produce Data and Send It to Kafka

    3.1 Start the Kafka Cluster

    #1. First, start the ZooKeeper cluster
    [root@node02 ~]# zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@node02 ~]# jps
    15724 Jps
    15677 QuorumPeerMain
    
    [root@node03 ~]# zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@node03 ~]# jps
    11398 Jps
    11351 QuorumPeerMain
    
    [root@node04 ~]# zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@node04 ~]# jps
    6138 QuorumPeerMain
    6191 Jps
    
    #2. Then start the Kafka brokers
    [root@node02 ~]# kafka-server-start.sh /opt/kafka/kafka_2.13-2.6.0/config/server.properties
    [root@node03 ~]# kafka-server-start.sh /opt/kafka/kafka_2.13-2.6.0/config/server.properties
    [root@node04 ~]# kafka-server-start.sh /opt/kafka/kafka_2.13-2.6.0/config/server.properties
    
    [root@node02 ~]# jps
    16164 Jps
    15739 Kafka
    15677 QuorumPeerMain
    
    [root@node03 ~]# jps
    11414 Kafka
    11351 QuorumPeerMain
    11835 Jps
    
    [root@node04 ~]# jps
    6629 Jps
    6138 QuorumPeerMain
    6207 Kafka
    
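    The topology below consumes from the test topic, so that topic must exist before producing. A hedged sketch of creating it with the Kafka AdminClient from kafka-clients 2.6.0; the partition and replication counts are assumptions, not values from this article, and the kafka-topics.sh script works just as well:

    // Illustrative sketch: create the "test" topic with the Kafka AdminClient.
    // The broker list matches Constant.BROKER_LIST defined in section 3.2.1.
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.Collections;
    import java.util.Properties;

    public class CreateTopicSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node02:9092,node03:9092,node04:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // 3 partitions, replication factor 2 (assumed values, not from the article)
                NewTopic topic = new NewTopic("test", 3, (short) 2);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }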

    3.2 Producer Code

    package cn.itbin.kafka;
    
    import cn.itbin.constants.Constant;
    import cn.itbin.tools.DateFmt;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.common.utils.Utils;
    
    import java.util.Properties;
    import java.util.Random;
    
    /**
     * @author chenxiaogao
     * @className CellProducer
     * @description TODO
     * @date 2021/1/15
     **/
    public class CellProducer extends Thread {
    
        private final Producer<String, String> producer;
        private final String topic;
        private final Properties props = new Properties();
    
    
        public CellProducer(String topic) {
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.BROKER_LIST);
            producer = new KafkaProducer<>(props);
            this.topic = topic;
        }
    
        @Override
        public void run() {
            Random random = new Random();
            String[] cell_num = { "29448-37062", "29448-51331", "29448-51331", "29448-51333", "29448-51343" };
            // 0 = normal; 1 = dropped call (intermittent signal); 2 = disconnected (call completely cut off)
            String[] drop_num = { "0", "1", "2" };
            int i = 0;
            while (true) {
                i++;
                String testStr = String.format("%06d", random.nextInt(10) + 1);
                // messageStr: 2494 29448-000003 2016-01-05 10:25:17 1
                String messageStr = i + "\t" + ("29448-" + testStr) + "\t" + DateFmt.getCountDate(null, DateFmt.date_long)
                        + "\t" + drop_num[random.nextInt(drop_num.length)];
                System.out.println("product:" + messageStr);
                producer.send(new ProducerRecord<String, String>(topic, messageStr));
                producer.flush();
                Utils.sleep(5000);
            }
        }
    
        public static void main(String[] args) {
            CellProducer test = new CellProducer(Constant.TOPIC_NAME);
            test.start();
        }
    
    
    
    }
    
    
    

    3.2.1 Constant

    package cn.itbin.constants;
    
    /**
     * @author chenxiaogao
     * @className Constant
     * @description TODO
     * @date 2021/1/17
     **/
    public class Constant {
    
        public static final String BROKER_LIST = "node02:9092,node03:9092,node04:9092";
    
        public static final String TOPIC_NAME = "test";
    }
    

    3.3 Verify the Produced Data

    #Start a Kafka console consumer subscribed to the test topic
    [root@node02 ~]# kafka-console-consumer.sh --bootstrap-server node02:9092,node03:9092,node04:9092 --from-beginning --topic test  
    
    #2. Run the CellProducer code from section 3.2
    #3. The consumer output below confirms the data is arriving correctly
    1       29448-000010    2021-01-17 21:45:48     2
    2       29448-000008    2021-01-17 21:45:54     0
    3       29448-000008    2021-01-17 21:45:59     1
    4       29448-000010    2021-01-17 21:46:04     2
    5       29448-000009    2021-01-17 21:46:09     0
    6       29448-000007    2021-01-17 21:46:14     1
    7       29448-000003    2021-01-17 21:46:19     1
    8       29448-000001    2021-01-17 21:46:24     2
    
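    The same check can be done from Java with a plain KafkaConsumer instead of the console script; a minimal sketch, with an arbitrary group id chosen only for this illustration:

    // Illustrative Java equivalent of the console consumer used above.
    import cn.itbin.constants.Constant;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class CellConsoleConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.BROKER_LIST);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "verify-consumer-group"); // assumed group id
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singleton(Constant.TOPIC_NAME));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            }
        }
    }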

    4. Storm Pulls Data from Kafka

    SkcTopo

    package cn.itbin.skc;
    
    import cn.itbin.blot.CellDaoltBolt;
    import cn.itbin.blot.CellFilterBolt;
    import cn.itbin.constants.Constant;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    /**
     * @author chenxiaogao
     * @className SkcTopo
     * @description TODO
     * @date 2021/1/16
     **/
    public class SkcTopo {
    
        public static void main(String[] args) throws Exception {
    
            final TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(Constant.BROKER_LIST, Constant.TOPIC_NAME)), 3);
            builder.setBolt("cell_filter", new CellFilterBolt(),3).shuffleGrouping("kafka_spout");
            builder.setBolt("cell_dao", new CellDaoltBolt(),5).fieldsGrouping("cell_filter", new Fields("cell_num"));
    
            // If "cluster" is passed as the first argument, submit to the production cluster; otherwise run locally
            if (args.length > 0 && args[0].equals("cluster")) {
                try {
                    StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("LocalReadingFromKafkaApp",
                        new Config(), builder.createTopology());
            }
        }
    
        private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
            return KafkaSpoutConfig.builder(bootstrapServers, topic)
                    // Apart from the group ID, the settings below are optional. The group ID must be set, otherwise an InvalidGroupIdException is thrown
                    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
                    // Interval at which offsets are committed periodically; the default is 15s
                    .setOffsetCommitPeriodMs(10_000)
                    .build();
        }
    }
    
    
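    If the spout needs further tuning, the same builder accepts additional consumer properties and an initial-offset strategy. A hedged variant of getKafkaSpoutConfig, assuming the storm-kafka-client 2.2.0 builder API; the extra values are illustrative, not settings used by this project:

    // Illustrative variant of getKafkaSpoutConfig with a few extra, assumed settings.
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;

    public class SpoutConfigSketch {
        public static KafkaSpoutConfig<String, String> getTunedKafkaSpoutConfig(String bootstrapServers, String topic) {
            return KafkaSpoutConfig.builder(bootstrapServers, topic)
                    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
                    // Where to start when the group has no committed offset yet (assumed choice)
                    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                    // Cap the number of records fetched per poll (assumed value)
                    .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)
                    .setOffsetCommitPeriodMs(10_000)
                    .build();
        }
    }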

    CellFilterBolt

    package cn.itbin.blot;
    
    import cn.itbin.tools.DateFmt;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.IBasicBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    
    
    public class CellFilterBolt implements IBasicBolt {
    
        /**
         * 
         */
        private static final long serialVersionUID = 1L;
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
    
            List<Object> values = input.getValues();
            String value = null;
            if (values.size() > 4) {
                value = (String) values.get(4);
            }
            try {
                if (value != null) {
                    String arr[] = value.split("\\t");
                    System.out.println("arr = " + arr.length);
                    String s = Arrays.toString(arr);
                    System.out.println("s = " + s);
                    // messageStr format: sequence number \t cell ID \t time \t drop code
                    // e.g.:   2494  29448-000003  2016-01-05 10:25:17  1
                    // DateFmt.date_short is yyyy-MM-dd, so 2016-01-05 10:25:17 becomes 2016-01-05
                    // Emitted tuple: (date, cell ID, drop code)
                    collector.emit(new Values(DateFmt.getCountDate(arr[2], DateFmt.date_short), arr[1], arr[3]));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("date", "cell_num", "drop_num"));
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
        @Override
        public void cleanup() {
            // TODO Auto-generated method stub
        }
    
        @Override
        public void prepare(Map map, TopologyContext arg1) {
            // TODO Auto-generated method stub
        }
    
    }
    
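    Most of the IBasicBolt methods above are empty stubs, so the same filter could extend BaseBasicBolt instead, which supplies no-op defaults. A hedged sketch of that alternative with unchanged behaviour (the class name is made up):

    // Illustrative alternative: same filtering logic, but extending BaseBasicBolt
    // so the empty prepare/cleanup/getComponentConfiguration overrides can be dropped.
    import cn.itbin.tools.DateFmt;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class CellFilterBasicBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            // The KafkaSpout emits (topic, partition, offset, key, value); the message is field 4.
            String value = input.getValues().size() > 4 ? (String) input.getValues().get(4) : null;
            if (value == null) {
                return;
            }
            String[] arr = value.split("\\t");
            if (arr.length >= 4) {
                // Emit (date, cell ID, drop code), matching CellFilterBolt above.
                collector.emit(new Values(DateFmt.getCountDate(arr[2], DateFmt.date_short), arr[1], arr[3]));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("date", "cell_num", "drop_num"));
        }
    }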
    

    CellDaoltBolt

    package cn.itbin.blot;
    
    import cn.itbin.hbase.HBaseDAO;
    import cn.itbin.hbase.impl.HBaseDAOImpl;
    import cn.itbin.tools.DateFmt;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.IBasicBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.Calendar;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    
    
    public class CellDaoltBolt implements IBasicBolt {
    
        private static final long serialVersionUID = 1L;
    
        HBaseDAO dao = null;
    
        long beginTime = System.currentTimeMillis();
        long endTime = 0;
    
        // total call count per cell
        Map<String, Long> cellCountMap = new HashMap<String, Long>();
        // dropped-call count per cell
        Map<String, Long> cellDropCountMap = new HashMap<String, Long>();
    
        String todayStr = null;
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            // input looks like: 2016-01-05, 29448-000003, 1
            if (input != null) {
                String dateStr = input.getString(0);
                String cellNum = input.getString(1);
                String dropNum = input.getString(2);
    
                // Check whether the record is from the current day; if not, clear the maps so memory stays bounded.
                // A city the size of Beijing has roughly 50,000-100,000 cell sites.
                // http://bbs.c114.net/thread-793707-1-1.html
                todayStr = DateFmt.getCountDate(null, DateFmt.date_short);

                // Day rollover: once data newer than "today" arrives, clear both maps.
                // Note: if the process crashes the maps are lost; otherwise the same cell ID keeps accumulating.
                // This check is a weak design; a background thread that clears the maps periodically
                // would be better (see the sketch after this class).
                if (!todayStr.equals(dateStr) && todayStr.compareTo(dateStr) < 0) {
                    cellCountMap.clear();
                    cellDropCountMap.clear();
                }
    
                // count this call for the current cell ID
                Long cellAll = cellCountMap.get(cellNum);
                if (cellAll == null) {
                    cellAll = 0L;
                }
                cellCountMap.put(cellNum, ++cellAll);
    
                // count dropped calls: any drop code greater than 0 counts as a drop
                Long cellDropAll = cellDropCountMap.get(cellNum);
                int t = Integer.parseInt(dropNum);
                if (t > 0) {
                    if (cellDropAll == null) {
                        cellDropAll = 0L;
                    }
                    cellDropCountMap.put(cellNum, ++cellDropAll);
                }
    
                // 1. Write to the store on a timer: to avoid writing too often, flush once per interval.
                // 2. Alternatively, flush when the maps reach a certain size.
                // 3. Other designs are possible; using Redis would also work well.
                // 4. Storm tick tuples are another way to trigger periodic writes.
                endTime = System.currentTimeMillis();
    
                // Possible extensions (e.g. integrating Flume with Kafka) for the drop statistics:
                // 1. dropped calls per hour
                // 2. total calls per hour
                // 3. drop rate per hour
                // 4. yesterday's trend for comparison
                // 5. the same day last year (if that data exists)

                // HBase stores the data column-oriented, for on the order of 100,000 cells.
                // rowkey = cellnum + "_" + day
                if (endTime - beginTime >= 5000) {
                    // flush to HBase every 5 seconds
                    if (cellCountMap.size() > 0 && cellDropCountMap.size() > 0) {
                        // x-axis: [hour:minute label, numeric hour offset within the day]
                        String arr[] = this.getAxsi();
    
                        // current date
                        String today = DateFmt.getCountDate(null, DateFmt.date_short);
                        // current minute
                        String today_minute = DateFmt.getCountDate(null, DateFmt.date_minute);
    
                        // cellCountMap holds the per-cell call counts
                        Set<String> keys = cellCountMap.keySet();
                        for (Iterator iterator = keys.iterator(); iterator.hasNext();) {
                            
                            String key_cellnum = (String) iterator.next();
                            
                            System.out.println("key_cellnum: " + key_cellnum + "***" 
                                    + arr[0] + "---" 
                                    + arr[1] + "---"
                                    + cellCountMap.get(key_cellnum) + "----" 
                                    + cellDropCountMap.get(key_cellnum));
                            
                            // Write to HBase; sample value: {time_title:"10:45",xAxis:10.759722222222223,call_num:140,call_drop_num:91}
                            
                            dao.insert("cell_monitor_table", 
                                    key_cellnum + "_" + today, 
                                    "cf", 
                                    new String[] { today_minute },
                                    new String[] { "{" + "time_title:\"" + arr[0] + "\",xAxis:" + arr[1] + ",call_num:"
                                            + cellCountMap.get(key_cellnum) + ",call_drop_num:" + cellDropCountMap.get(key_cellnum) + "}" }
                                    );
                        }
                    }
                    // reset the start time of the write interval
                    beginTime = System.currentTimeMillis();
                }
            }
        }
    
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            dao = new HBaseDAOImpl();
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // TODO Auto-generated method stub
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    
        // Compute the X coordinate: the current time of day expressed in hours
        public String[] getAxsi() {
            // current time
            Calendar c = Calendar.getInstance();
            int hour = c.get(Calendar.HOUR_OF_DAY);
            int minute = c.get(Calendar.MINUTE);
            int sec = c.get(Calendar.SECOND);
            // total seconds since midnight
            int curSecNum = hour * 3600 + minute * 60 + sec;
    
            // (12*3600+30*60+0)/3600=12.5
            Double xValue = (double) curSecNum / 3600;
            // [hour:minute label, numeric hour offset]
            String[] end = { hour + ":" + minute, xValue.toString() };
            return end;
        }
    
        @Override
        public void cleanup() {
        }
    }
    
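    The comment in execute() above notes that clearing the maps inline on a date comparison is fragile. A hedged sketch of the suggested alternative, clearing the counters from a scheduled background task; the class name and the once-per-day schedule are assumptions, not part of the article's code:

    // Illustrative sketch: reset the per-day counters from a background task
    // instead of checking the date on every tuple.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class DailyCounterReset {

        private final Map<String, Long> cellCountMap = new ConcurrentHashMap<>();
        private final Map<String, Long> cellDropCountMap = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        /** Start a task that clears both maps once per day. */
        public void start() {
            scheduler.scheduleAtFixedRate(() -> {
                cellCountMap.clear();
                cellDropCountMap.clear();
            }, 1, 1, TimeUnit.DAYS);
        }

        public void stop() {
            scheduler.shutdown();
        }
    }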

    5. Save Data to HBase

    #1. Start the Hadoop cluster
    [root@node01 ~]# start-all.sh
    WARNING: HADOOP_SECURE_DN_USER has been replaced by HDFS_DATANODE_SECURE_USER. Using value of HADOOP_SECURE_DN_USER.
    Starting namenodes on [node01 node02 node03]
    Starting datanodes
    Starting journal nodes [node01 node02 node03]
    Starting ZK Failover Controllers on NN hosts [node01 node02 node03]
    Starting resourcemanagers on [ node01 node02 node03]
    Starting nodemanagers
    [root@node01 ~]# jps
    26368 NameNode
    26854 DFSZKFailoverController
    27639 Jps
    27272 ResourceManager
    26652 JournalNode
    
    #2. Start the HBase cluster
    [root@node01 ~]# start-hbase.sh 
    node04: running regionserver, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-regionserver-node04.out
    node03: running regionserver, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-regionserver-node03.out
    node02: running regionserver, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-regionserver-node02.out
    node02: running master, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-master-node02.out
    
    [root@node01 ~]# hbase shell
    Version 2.3.3, r3e4bf4bee3a08b25591b9c22fea0518686a7e834, Wed Oct 28 06:36:25 UTC 2020
    Took 0.0005 seconds                                                                                                              
    hbase(main):001:0> status
    1 active master, 1 backup masters, 1 servers, 0 dead, 0.0000 average load
    Took 1.3308 seconds   
    
    Everything started normally.

    HBaseDAO

    package cn.itbin.hbase;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    
    import java.util.List;
    
    public interface HBaseDAO {
    
        public void save(Put put, String tableName);
    
        public void insert(String tableName, String rowKey, String family, String quailifer, String value);
    
        public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]);
    
        public void save(List<Put> Put, String tableName);
    
        public Result getOneRow(String tableName, String rowKey);
    
        public List<Result> getRows(String tableName, String rowKey_like);
    
        public List<Result> getRows(String tableName, String rowKeyLike, String cols[]);
    
        public List<Result> getRows(String tableName, String startRow, String stopRow);
    
        public void deleteRecords(String tableName, String rowKeyLike);
    
        public void deleteTable(String tableName);
    
        public void createTable(String tableName,  String columnFamilies);
    }
    
    HBaseDAOImpl

    package cn.itbin.hbase.impl;
    
    import cn.itbin.hbase.HBaseDAO;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.IOException;
    import java.util.List;
    
    /**
     * @author chenxiaogao
     * @className HBaseDAOImpl
     * @description TODO
     * @date 2021/1/17
     **/
    public class HBaseDAOImpl implements HBaseDAO {
    
        private static final String TABLE_NAME = "PHONE";
        private static final String CF_DEFAULT = "DEFAULT_COLUMN_PHONE";
        Admin admin = null;
        Connection connection = null;
    
        public HBaseDAOImpl()  {
            try {
                Configuration conf = HBaseConfiguration.create();
                // Load the HBase and HDFS client configs bundled under src/main/resources
                conf.addResource(new Path("D:\\ideawork\\storm-kafka\\src\\main\\resources\\hbase-site.xml"));
                conf.addResource(new Path("D:\\ideawork\\storm-kafka\\src\\main\\resources\\hdfs-site.xml"));
                connection = ConnectionFactory.createConnection(conf);
                admin = connection.getAdmin();
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        @Override
        public void save(Put put, String tableName) {
    
        }
    
        @Override
        public void insert(String tableName, String rowKey, String family, String quailifer, String value) {
            Table table  = null;
            try {
                 table = connection.getTable(TableName.valueOf(tableName));
                Put put = new Put(rowKey.getBytes());
                put.addColumn(family.getBytes(), quailifer.getBytes(), value.getBytes());
                table.put(put);
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @Override
        public void insert(String tableName, String rowKey, String family, String[] quailifer, String[] value) {
            Table table = null;
            try {
                table = connection.getTable(TableName.valueOf(tableName));
                Put put = new Put(rowKey.getBytes());
                // add each qualifier/value pair to the same Put (one batched row write)
                for (int i = 0; i < quailifer.length; i++) {
                    String col = quailifer[i];
                    String val = value[i];
                    put.addColumn(family.getBytes(), col.getBytes(), val.getBytes());
                }
                table.put(put);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        @Override
        public void save(List<Put> Put, String tableName) {
    
        }
    
        @Override
        public Result getOneRow(String tableName, String rowKey) {
            return null;
        }
    
        @Override
        public List<Result> getRows(String tableName, String rowKey_like) {
            return null;
        }
    
        @Override
        public List<Result> getRows(String tableName, String rowKeyLike, String[] cols) {
            return null;
        }
    
        @Override
        public List<Result> getRows(String tableName, String startRow, String stopRow) {
            return null;
        }
    
        @Override
        public void deleteRecords(String tableName, String rowKeyLike) {
    
        }
    
        @Override
        public void deleteTable(String tableName) {
    
        }
    
        @Override
        public void createTable(String tableName, String columnFamilies) {
            TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(columnFamilies))
                    .build();
            try {
                admin.createTable(td);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.print("Creating table. ");
        }
    }
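
    With this DAO in place, the cell_monitor_table used by CellDaoltBolt (created via the HBase shell in section 6) could also be created from Java; a small, hedged one-off sketch:

    // Illustrative one-off: create the monitoring table from Java instead of the HBase shell.
    import cn.itbin.hbase.impl.HBaseDAOImpl;

    public class CreateMonitorTable {
        public static void main(String[] args) {
            // Same table name and column family as used by CellDaoltBolt and in section 6
            new HBaseDAOImpl().createTable("cell_monitor_table", "cf");
        }
    }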
    
    DateFmt

    package cn.itbin.tools;
    
    import cn.itbin.hbase.impl.HBaseDAOImpl;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    
    public class DateFmt {
    
        public static final String date_long = "yyyy-MM-dd HH:mm:ss";
        public static final String date_short = "yyyy-MM-dd";
        public static final String date_minute = "yyyyMMddHHmm";
    
        public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);
    
    
        public static String getCountDate(String date, String pattern) {
            SimpleDateFormat sdf = new SimpleDateFormat(pattern);
            Calendar cal = Calendar.getInstance();
            if (date != null) {
                try {
                    // 2016-01-05 10:25:17
                    cal.setTime(sdf.parse(date));
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
            return sdf.format(cal.getTime());
        }
    
        public static String getCountDate(String date, String pattern, int step) {
            SimpleDateFormat sdf = new SimpleDateFormat(pattern);
            Calendar cal = Calendar.getInstance();
            if (date != null) {
                try {
                    cal.setTime(sdf.parse(date));
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
            cal.add(Calendar.DAY_OF_MONTH, step);
            return sdf.format(cal.getTime());
        }
    
        public static Date parseDate(String dateStr) throws Exception {
            return sdf.parse(dateStr);
        }
    
        public static void main(String[] args) throws Exception {
            
        }
    
    }
    
    
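    For reference, a short example of what DateFmt returns for the patterns used in this pipeline; the demo class name is made up, and the expected outputs are noted in the comments:

    // Illustrative usage of DateFmt with the patterns used by the bolts.
    import cn.itbin.tools.DateFmt;

    public class DateFmtDemo {
        public static void main(String[] args) {
            // Truncate a record timestamp to the day, as CellFilterBolt does: prints "2021-01-17"
            System.out.println(DateFmt.getCountDate("2021-01-17 21:05:21", DateFmt.date_short));
            // With a null date the current time is formatted, as CellDaoltBolt does for the
            // minute-level HBase column qualifier, e.g. "202101172102"
            System.out.println(DateFmt.getCountDate(null, DateFmt.date_minute));
        }
    }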

    6. Test

    #Create the HBase table
    hbase(main):002:0> create 'cell_monitor_table','cf'
    #Run the topology, then check the data
    hbase(main):004:0> scan 'cell_monitor_table'
    ROW                               COLUMN+CELL                                                                                    
     01                               column=cf:cf01, timestamp=2021-01-18T00:34:46.235, value=test_value                            
     29448-000001_2021-01-17          column=cf:202101171737, timestamp=2021-01-18T01:37:35.925, value={time_title:"17:37",xAxis:17.6
                                      2638888888889,call_num:4,call_drop_num:4}  
    
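    To read one of these rows back from Java rather than the shell, a hedged sketch using the same hbase-client API as HBaseDAOImpl; the row key and column qualifier are simply the ones from the scan output above, and HBaseConfiguration.create() is assumed to pick up hbase-site.xml from the classpath:

    // Illustrative read of a single monitoring row written by CellDaoltBolt.
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReadCellMonitorRow {
        public static void main(String[] args) throws Exception {
            try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                 Table table = conn.getTable(TableName.valueOf("cell_monitor_table"))) {
                Get get = new Get(Bytes.toBytes("29448-000001_2021-01-17"));
                Result result = table.get(get);
                byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("202101171737"));
                System.out.println(value == null ? "no value" : Bytes.toString(value));
            }
        }
    }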
