Hadoop MR ETL Offline Project

Author: 喵星人ZC | Published 2019-04-03 00:40

    I. Requirements and Step Breakdown

    1. Requirements

    Use MapReduce to clean the raw logs, then hand the cleaned data to Hive for statistical analysis.

    2. Step Breakdown

    1. Generate a log file of our own, 50 MB to 100 MB in size, containing the fields (cdn, region, level, time, ip, domain, url, traffic), with time, ip, domain, and traffic varying
    2. Write an MR program to clean the log
    3. Move the cleaned log under the Hive external table's location
    4. Refresh the Hive partition metadata
    5. Query the total traffic per domain
    6. Wrap the whole workflow in a shell script

    II. Generate the Log and Upload It to HDFS

    The log generator:

    package com.ruoze.hadoop.utils;
    
    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Random;
    
    public class GenerateLogUtils {
        public static void main(String[] args) {
            generateLog();
        }
    
        private static void generateLog() {
            try {

                // Create the file only once; doing this inside the loop would be wasteful
                File file = new File("access.log");
                if (!file.exists()) {
                    file.createNewFile();
                }

                // Likewise, open the writer once and append to it; the original code
                // opened and closed a FileWriter on every iteration, which is very slow
                FileWriter fileWriter = new FileWriter(file.getName(), true);
                Random rd = new Random();

                for (int i = 0; i < 1000000; i++) {
                    Date date = randomDate("2019-01-01", "2019-01-31");
    
                    String[] domainStr = new String[]{
                            "v1.go2yd.com",
                            "v2.go2yd.com",
                            "v3.go2yd.com",
                            "v4.go2yd.com",
                            "v5.go2yd.com",
                    };
                    // nextInt(domainStr.length) covers all five domains; the original
                    // nextInt(domainStr.length - 1) could never pick v5.go2yd.com,
                    // which is why only v1-v4 appear in the Hive results later
                    int domainNum = rd.nextInt(domainStr.length);
    
                    String[] trafficStr = new String[]{
                            "136662",
                            "785966",
                            "987422",
                            "975578",
                            "154851",
                            ""
                    };
    
                    // again use the full array so the dirty empty-traffic entry can be selected
                    int trafficNum = rd.nextInt(trafficStr.length);
    
                    StringBuilder builder = new StringBuilder();
                    builder
                            .append("baidu").append("\t")
                            .append("CN").append("\t")
                            .append("2").append("\t")
                            .append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)).append("\t")
                            .append(getRandomIp()).append("\t")
                            .append(domainStr[domainNum]).append("\t")
                            .append("http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4").append("\t")
                            .append(trafficStr[trafficNum]).append("\t");
                    fileWriter.write(builder.toString() + "\n");
                }
                fileWriter.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return "";
        }
    
    
        /**
         * Generate a random Date between beginDate and endDate.
         *
         * @param beginDate start of the range, formatted yyyy-MM-dd
         * @param endDate   end of the range, formatted yyyy-MM-dd
         * @return a random Date in the range, or null if the range is invalid
         */
        private static Date randomDate(String beginDate, String endDate) {
            try {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
                Date start = format.parse(beginDate);
                Date end = format.parse(endDate);
    
                if (start.getTime() >= end.getTime()) {
                    return null;
                }
                long date = random(start.getTime(), end.getTime());
                return new Date(date);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    
        private static long random(long begin, long end) {
            long rtn = begin + (long) (Math.random() * (end - begin));
            if (rtn == begin || rtn == end) {
                return random(begin, end);
            }
            return rtn;
        }
    
    
        /**
         * Generate a random IP address drawn from several real public IP ranges.
         */
        public static String getRandomIp() {
    
            // IP ranges, expressed as signed 32-bit integers
            int[][] range = {{607649792, 608174079},// 36.56.0.0-36.63.255.255
                    {1038614528, 1039007743},// 61.232.0.0-61.237.255.255
                    {1783627776, 1784676351},// 106.80.0.0-106.95.255.255
                    {2035023872, 2035154943},// 121.76.0.0-121.77.255.255
                    {2078801920, 2079064063},// 123.232.0.0-123.235.255.255
                    {-1950089216, -1948778497},// 139.196.0.0-139.215.255.255
                    {-1425539072, -1425014785},// 171.8.0.0-171.15.255.255
                    {-1236271104, -1235419137},// 182.80.0.0-182.92.255.255
                    {-770113536, -768606209},// 210.25.0.0-210.47.255.255
                    {-569376768, -564133889}, // 222.16.0.0-222.95.255.255
            };
    
            Random rdint = new Random();
            int index = rdint.nextInt(range.length);
            return num2ip(range[index][0] + rdint.nextInt(range[index][1] - range[index][0]));
        }
    
        /*
         * Convert a 32-bit integer to a dotted-quad IP address string.
         */
        public static String num2ip(int ip) {
            int[] b = new int[4];
            b[0] = (ip >> 24) & 0xff;
            b[1] = (ip >> 16) & 0xff;
            b[2] = (ip >> 8) & 0xff;
            b[3] = ip & 0xff;
            return b[0] + "." + b[1] + "." + b[2] + "." + b[3];
        }
    
    
    }
    
    

    Upload access.log to its HDFS path:

     hadoop fs -put access.log  /g6/hadoop/accesslog/20190402/
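
    If the date directory does not already exist, create it first (hadoop fs -put does not create missing parent directories):

     hadoop fs -mkdir -p /g6/hadoop/accesslog/20190402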
    

    III. Cleaning with MR

    1. The LogUtils class that cleans a log line

    package com.ruoze.hadoop.utils;
    
    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Locale;
    
    public class LogUtils {
        DateFormat sourceFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.ENGLISH);
        DateFormat targetFormat = new SimpleDateFormat("yyyyMMddHHmmss");
    
        /**
         * Parse a log line and normalize its fields.
         * Fields are separated by \t.
         */
        public String parse(String log) {
            String result = "";
            try {
                String[] splits = log.split("\t");
                String cdn = splits[0];
                String region = splits[1];
                String level = splits[2];
                String timeStr = splits[3];
    
                String time = targetFormat.format(sourceFormat.parse(timeStr));
    
                String ip = splits[4];
                String domain = splits[5];
                String url = splits[6];
                String traffic = splits[7];
    
                StringBuilder builder = new StringBuilder("");
                builder.append(cdn).append("\t")
                        .append(region).append("\t")
                        .append(level).append("\t")
                        .append(time).append("\t")
                        .append(ip).append("\t")
                        .append(domain).append("\t")
                        .append(url).append("\t")
                        .append(traffic);
    
                result = builder.toString();
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return result;
        }
    }
    

    2. A unit test for LogUtils

    package com.ruoze.hadoop.utils;
    
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    public class LogUtilsTest {
        private LogUtils utils;
    
        @Test
        public void testParse() {
    
            String log = "baidu\tCN\t2\t2019-01-10 16:02:54\t121.77.143.199\tv2.go2yd.com\thttp://v3.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4\t97557845";
            String result = utils.parse(log);
            System.out.println(result);
        }
    
        @Before
        public void setUp() {
    
            utils = new LogUtils();
        }
    
        @After
        public void tearDown() {
            utils = null;
        }
    }
    

    The test prints the input line with the timestamp rewritten from "2019-01-10 16:02:54" to "20190110160254" (the original post shows this as a screenshot):

     baidu  CN  2  20190110160254  121.77.143.199  v2.go2yd.com  http://v3.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4  97557845


    3. Mapper

    package com.ruoze.hadoop.mapreduce;
    
    import com.ruoze.hadoop.utils.LogUtils;
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class LogETLMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

        // One parser per mapper; the original created a new LogUtils for every record
        private final LogUtils utils = new LogUtils();

        /**
         * Clean the data record by record in the MapReduce framework's map method:
         * each incoming line is parsed by our rules and the cleaned result is emitted.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split once and check the field count *before* touching field 7; the
            // original indexed splits[7] first, which throws
            // ArrayIndexOutOfBoundsException on short (dirty) records
            String[] splits = value.toString().split("\t");
            if (splits.length == 8 && StringUtils.isNotBlank(splits[7])) {
                String result = utils.parse(value.toString());
                if (StringUtils.isNotBlank(result)) {
                    context.write(NullWritable.get(), new Text(result));
                }
            }
        }
    }
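
    A note on the length check: Java's String.split drops trailing empty strings, so a generated line whose traffic field is empty splits into only 7 fields and is filtered out here. That is how the dirty records produced by the generator get cleaned away.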
    
    

    4. The Driver (Job)

    package com.ruoze.hadoop.mapreduce;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class LogETLDriver {
        public static void main(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("please input 2 params: input output");
                System.exit(1); // non-zero exit code on a usage error
            }
    
            String input = args[0];
            String output = args[1];  
    
            //System.setProperty("hadoop.home.dir", "D:\\Hadoop\\hadoop-2.6.0-cdh5.7.0");
    
            Configuration configuration = new Configuration();
    
            // delete the output path if it already exists, so the job can be rerun
            FileSystem fileSystem = FileSystem.get(configuration);
            Path outputPath = new Path(output);
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
            }
    
            Job job = Job.getInstance(configuration);
            job.setJarByClass(LogETLDriver.class);
            job.setMapperClass(LogETLMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
    
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    
    }
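
    One thing worth noting: no reducer is configured, so Hadoop runs the default identity reducer with a single reduce task. Since this is a pure map-side cleaning job, adding job.setNumReduceTasks(0) before submission would skip the shuffle entirely and write the mapper output straight to HDFS.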
    

    Package the code above and upload the jar to the server:

    [hadoop@hadoop000 lib]$ ll
    total 12
    -rw-r--r-- 1 hadoop hadoop 8754 Mar 29 22:38 hadoop-1.0.jar
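
    (Assuming a standard Maven layout for the project, the jar would be built and shipped roughly like this; adjust the paths to your environment:)

     mvn clean package -DskipTests
     scp target/hadoop-1.0.jar hadoop@hadoop000:/home/hadoop/lib/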
    

    Create the MR program's output path on HDFS (optional: the driver deletes a pre-existing output path before the job runs):

    hadoop fs -mkdir -p /g6/hadoop/access/output/day=20190402
    

    IV. Create the Hive External Table

    create external table g6_access (
    cdn string,
    region string,
    level string,
    time string,
    ip string,
    domain string,
    url string,
    traffic bigint
    ) partitioned by (day string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION '/g6/hadoop/access/clear';
    

    Because the MR program deletes its output path on every run, do not point the Hive table's location at that output path. Instead, move the data under the location after the MR job finishes.

    V. Run the Hadoop MR Program and Test

    1. Run the MR job

    hadoop jar /home/hadoop/lib/hadoop-1.0.jar com.ruoze.hadoop.mapreduce.LogETLDriver /g6/hadoop/accesslog/20190402/ /g6/hadoop/access/output/day=20190402
    

    2. Move the output under the table location

    hadoop fs -mv /g6/hadoop/access/output/day=20190402 /g6/hadoop/access/clear
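
    After the move, the cleaned data sits at /g6/hadoop/access/clear/day=20190402, which matches the day=... partition directory layout the external table expects.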
    

    3. Refresh the Hive partition (without this, Hive cannot see the data)

    alter table g6_access add if not exists partition(day=20190402);
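
    Alternatively, msck repair table g6_access; scans the table location and registers any partition directories missing from the metastore.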
    

    4. Hive analysis: total traffic per domain

    hive (g6_hadoop)> select domain,count(*) from g6_access group by domain;
    Query ID = hadoop_20190402232525_4b5c6115-d9a4-4dbd-8cbd-768f298decb4
    Total jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks not specified. Estimated from input data size: 1
    In order to change the average load for a reducer (in bytes):
      set hive.exec.reducers.bytes.per.reducer=<number>
    In order to limit the maximum number of reducers:
      set hive.exec.reducers.max=<number>
    In order to set a constant number of reducers:
      set mapreduce.job.reduces=<number>
    Starting Job = job_1554215624276_0002, Tracking URL = http://hadoop000:8088/proxy/application_1554215624276_0002/
    Kill Command = /home/hadoop/soul/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop job  -kill job_1554215624276_0002
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
    2019-04-02 23:30:45,007 Stage-1 map = 0%,  reduce = 0%
    2019-04-02 23:30:51,476 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.73 sec
    2019-04-02 23:30:57,940 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.77 sec
    MapReduce Total cumulative CPU time: 2 seconds 770 msec
    Ended Job = job_1554215624276_0002
    MapReduce Jobs Launched: 
    Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 2.77 sec   HDFS Read: 44772154 HDFS Write: 76 SUCCESS
    Total MapReduce CPU Time Spent: 2 seconds 770 msec
    OK
    domain  _c1
    v1.go2yd.com    74908
    v2.go2yd.com    74795
    v3.go2yd.com    75075
    v4.go2yd.com    75222
    Time taken: 21.612 seconds, Fetched: 4 row(s)
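
    Note that the transcript above uses count(*), i.e. the number of records per domain. The stated requirement (step 5) was the total traffic per domain, which would be a sum instead, for example:

     select domain, sum(traffic) from g6_access where day='20190402' group by domain;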
    

    VI. Wrap the Whole Pipeline in a Shell Script

    g6_mr_etl.sh

    #!/bin/bash
    
    source ~/.bash_profile
    
    if [ $# != 1 ] ; then
        echo "Usage: g6_mr_etl.sh <dateString>"
        echo "E.g.: g6_mr_etl.sh 20190402"
        exit 1
    fi
    
    
    process_date=$1
    database=g6_hadoop
    
    echo -e "\033[36m###### step1: MR ETL ######\033[0m"
    hadoop jar /home/hadoop/lib/hadoop-1.0.jar com.ruoze.hadoop.mapreduce.LogETLDriver /g6/hadoop/accesslog/$process_date/ /g6/hadoop/access/output/day=$process_date
    
    
    
    hive -e "use hive;
    create external table if  not exists g6_access (
    cdn string,
    region string,
    level string,
    time string,
    ip string,
    domain string,
    url string,
    traffic bigint
    ) partitioned by (day string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION '/g6/hadoop/access/clear' ;"
    
    echo -e "\033[36m###### step2: Mv Data to DW ######\033[0m"
    hadoop fs -mv /g6/hadoop/access/output/day=$process_date /g6/hadoop/access/clear
    
    
    echo -e "\033[36m###### step3: Alter metadata ######\033[0m"
    hive -e "use ${database}; alter table g6_access add if not exists partition(day=$process_date);"
    
