美文网首页大数据运维程序员
大数据运维问题记录(五)

大数据运维问题记录(五)

作者: 火车飞侠 | 来源:发表于2016-07-28 21:40 被阅读94次

问题描述:集群中原有采集程序从源文件入hbase出现积压,优化修改程序都无济于事,需要赶紧出个方案进行解决

问题解决:集群中的采集程序也有一条线是从源文件入到hdfs的,所以计划以hdfs里的数据为源数据采用mapreduce生成hfile后通过bulkload的方式入hbase避免了原始数据的清洗操作

以下是开发的程序

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HFileGenerator {

    public static class HFileMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String symbol = "_";
            if ("".equals(line)||null == line) {
                return;
            }
            String[] items = line.split("\\|", -1);
            //根据业务需要组合rowkey
            byte[] row = Bytes.toBytes(items[0]+symbol+items[1]+symbol+items[2]+symbol+items[3]+symbol+items[4]);  
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(row);
            System.out.println(rowkey);
            KeyValue kv = new KeyValue(row,
                    "f1".getBytes(), "column1".getBytes(),
                    System.currentTimeMillis(), Bytes.toBytes(line));
            if (null != kv) {
                System.out.println("kv"+kv);
                context.write(rowkey, kv);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Table table = null;
        try{
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TableName.valueOf("hbase_test"));
        Job job = Job.getInstance(conf);
        job.setJobName("HFile bulk load test");
        job.setJarByClass(HFileGenerator.class);

        job.setOutputKeyClass(ImmutableBytesWritable.class);    
        job.setOutputValueClass(KeyValue.class);
        
        job.setMapperClass(HFileMapper.class);
        job.setReducerClass(KeyValueSortReducer.class);

        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
         // 判断output文件夹是否存在,如果存在则删除  
        Path path = new Path("hdfs://lip1:8020/user/lipeng/hbase/output");
        FileSystem fileSystem = path.getFileSystem(conf); 
        if (fileSystem.exists(path)) {  
            fileSystem.delete(path, true); 
        }  
       Path path1 =  new Path("hdfs://lip1:8020/user/lipeng/hbase/output");
        FileInputFormat.addInputPath(job, new Path("hdfs://lip1:8020/user/lipeng/hbase/input"));
        FileOutputFormat.setOutputPath(job, path1);
        HFileOutputFormat.configureIncrementalLoad(job, (HTable) table);
         if (job.waitForCompletion(true)) {  
             FsShell shell = new FsShell(conf);  
             try {  
                 //将该目录赋予777权限
                 shell.run(new String[]{"-chmod", "-R", "777", "hdfs://lip1:8020/user/lipeng/hbase/output"});  
             } catch (Exception e) {  
                 throw new IOException(e);  
             }  
             //加载到hbase表  
             LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);  
             loader.doBulkLoad(path1, (HTable) table);  
         } else {  
             System.exit(1);  
         }  
        }catch(Exception e){
           e.printStackTrace();
        }finally{
             if (table != null) {  
                 table.close();
                }  
        }
    }
}

执行的时候需要将hbase的classpath添加到hadoop的hadoop-env.sh中,要不然会报找不到hbase相关的类的错

相关文章

  • 大数据运维问题记录(五)

    问题描述:集群中原有采集程序从源文件入hbase出现积压,优化修改程序都无济于事,需要赶紧出个方案进行解决 问题解...

  • 教你制作最强运维监控大屏

    IT新一代运维大屏,数据驱动运维智能化 重新定义,运维监控平台 运维数据融合集成,构建最强运维平台 用户知识图谱化...

  • 大数据运维问题记录(九)

    问题描述:部门承接的某运营商的CRM项目日志查询出现延迟,最高延迟达半小时,严重影响业务运行。问题解决:首先对他们...

  • 大数据运维问题记录(一)

    问题描述:在hive里运行select count(1)操作一个表时,老是失败,但是其它sql确不报错能正常跑出结...

  • 大数据运维问题记录(三)

    问题描述:在resourcemanager页面查看到一些节点的Health report中报 1/4 local-...

  • 大数据运维问题记录(二)

    问题描述:一个项目组的同事反应他们的集群hive突然出现了问题,走mr就报错 问题解决:首先问了下他们最近做了些什...

  • 大数据运维问题记录(四)

    问题描述:有个hadoop集群,跑hive任务的时候慢,而且经常跑的跑的就挂了,报内存不够等等的相关异常,需要我们...

  • 大数据运维问题记录(六)

    问题描述:公司之前的采集产品由于对大数据这块水土不服,入库慢等原因,再加上负责这个产品的团队全部走光,导致现在出了...

  • 大数据运维问题记录(八)

    问题描述:一个hbase集群出现数据倾斜,并且服务器经常会宕机问题解决:登录60010界面查看region的分布情...

  • 大数据运维问题记录(七)

    问题描述:公司中一个项目我们用netty接收厂商提供的数据入kafka,接收速度较慢,入kafka也比较慢,需要对...

网友评论

    本文标题:大数据运维问题记录(五)

    本文链接:https://www.haomeiwen.com/subject/tebajttx.html