美文网首页
03_一个基于java,hadoop的数据采集程序demo

03_一个基于java,hadoop的数据采集程序demo

作者: 会摄影的程序员 | 来源:发表于2019-01-28 22:22 被阅读0次

    1.文件分布

    文件分布

    2. 环境搭建

    参考:02_在window环境开发hadoop_HDFS

    3 需要采集的目录

    image.png

    4 说明:

    1. 需要采集的文件在accesslog里面并且以.log结尾
    2. 采集的思路是
      1. 先从accesslog移动进temp文件夹
      2. 从temp向HDFS上传
      3. 将temp里面的文件移动进backup文件夹
    3. 此demo是采用读取配置文件来得到路径以及配置信息
    4. 读取配置文件采用单列模式的懒汉式并考虑了线程安全

    5. code

    5.1 config.properties

    LOG_SOURCE_DIR=H:/logs/accesslog/
    LOG_TEMP_DIR=H:/logs/temp/
    LOG_BACKUP_DIR=H:/logs/backup/
    
    LOG_TIMEOUT=24
    LOG_NAME=.log
    
    HDFS_URL=hdfs://vm01:9000/
    HDFS_DIR=/logs/
    HDFS_F_NAME=access_log_
    HDFS_L_NAME=.log
    

    5.2 PropertiesHolderLaze

    package com.looc.D02数据采集demo;
    
    import java.io.IOException;
    import java.util.Properties;
    
    /**
     * 单列模式:懒汉式——并考虑线程安全
     * @author chenPeng
     * @version 1.0.0
     * @ClassName PropertiesHolderLaze.java
     * @Description TODO
     * @createTime 2019年01月28日 19:47:00
     */
    public class PropertiesHolderLaze {
        private static Properties pros = null;
    
        public static Properties getPros() throws IOException {
            if (pros==null){
                synchronized (PropertiesHolderLaze.class){
                    if (pros==null){
                        pros = new Properties();
                        pros.load(PropertiesHolderLaze.class.getClassLoader().getResourceAsStream(
                                "config.properties"));
                    }
                }
            }
            return pros;
        }
    }
    

    5.3 CollectionData

    package com.looc.D02数据采集demo;
    
    
    import org.apache.commons.io.FileUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    import java.io.File;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Properties;
    import java.util.TimerTask;
    import java.util.UUID;
    
    /**
     * @author chenPeng
     * @version 1.0.0
     * @ClassName CollectionData.java
     * @Description TODO
     * @createTime 2019年01月28日 19:18:00
     */
    public class CollectionData extends TimerTask {
    
        private String LOG_SOURCE_DIR;
        private String LOG_TEMP_DIR;
        private String LOG_BACKUP_DIR;
        private String LOG_TIMEOUT;
        private String LOG_NAME;
        private String HDFS_URL;
        private String HDFS_DIR;
        private String HDFS_F_NAME;
        private String HDFS_L_NAME;
        private String TIME_MATE = "yyyy-MM-dd";
    
    
        /**
         * The action to be performed by this timer task.
         */
        @Override
        public void run() {
            //移动数据到temp
            //temp上传到hdfs
            //temp移动到backup
            try {
                init();
                //移动之前判断是否有文件
                if (!isNullDir()){
                    moveToTemp();
                    setUpToHdfs();
                    tempToBackup();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 初始化得到配置文件
         * @Author chenpeng
         * @Description //TODO
         * @Date 20:19
         * @Param []
         * @return void
         **/
        public void init() throws IOException {
            Properties pros = PropertiesHolderLaze.getPros();
    
            LOG_SOURCE_DIR = pros.getProperty("LOG_SOURCE_DIR");
            LOG_TEMP_DIR = pros.getProperty("LOG_TEMP_DIR");
            LOG_BACKUP_DIR = pros.getProperty("LOG_BACKUP_DIR");
            LOG_TIMEOUT = pros.getProperty("LOG_TIMEOUT");
            LOG_NAME = pros.getProperty("LOG_NAME");
            HDFS_URL = pros.getProperty("HDFS_URL");
            HDFS_DIR = pros.getProperty("HDFS_DIR");
            HDFS_F_NAME = pros.getProperty("HDFS_F_NAME");
            HDFS_L_NAME = pros.getProperty("HDFS_L_NAME");
        }
    
    
        /**
         * 判断文件夹是否为空
         * @Author chenpeng
         * @Description //TODO
         * @Date 21:19
         * @Param []
         * @return boolean
         **/
        public boolean isNullDir(){
            if (new File(LOG_SOURCE_DIR).list().length==0){
                return true;
            }
            return false;
        }
        /**
         * 移动数据到temp
         * @Author chenpeng
         * @Description //TODO
         * @Date 20:17
         * @Param []
         * @return void
         **/
        public void moveToTemp() throws IOException {
            //拿到文件
            File[] logSourceArray = new File(LOG_SOURCE_DIR).listFiles();
    
            //移动文件
            for (File file : logSourceArray) {
                FileUtils.moveFileToDirectory(file,new File(LOG_TEMP_DIR),true);
            }
        }
    
        /**
         * temp上传到hdfs
         * @Author chenpeng
         * @Description //TODO
         * @Date 20:18
         * @Param []
         * @return void
         **/
        public void setUpToHdfs() throws URISyntaxException, IOException, InterruptedException {
            //拿到全部的文件
            File[] logTempArray = new File(LOG_TEMP_DIR).listFiles();
    
            //获取hdfs链接
            FileSystem fileSystem =
                    FileSystem.get(new URI(HDFS_URL),new Configuration(),"root");
    
            //获取一个时间
            SimpleDateFormat sdf = new SimpleDateFormat(TIME_MATE);
            Calendar calendar = Calendar.getInstance();
            String time = sdf.format(calendar.getTime());
    
            //上传文件
            for (File file : logTempArray) {
                fileSystem.copyFromLocalFile(
                        new Path(file.getAbsolutePath()),
                        new Path(HDFS_DIR+"/"+time+"/"+
                                HDFS_F_NAME+time+UUID.randomUUID()+HDFS_L_NAME));
            }
    
            //关闭流
            fileSystem.close();
        }
    
        /**
         * temp移动到backup
         * @Author chenpeng
         * @Description //TODO
         * @Date 20:18
         * @Param []
         * @return void
         **/
        public void tempToBackup() throws IOException {
            //拿到全部的文件
            File[] logTempArray = new File(LOG_TEMP_DIR).listFiles();
    
            //拿到时间
            SimpleDateFormat sdf = new SimpleDateFormat(TIME_MATE);
            Calendar calendar = Calendar.getInstance();
            String time = sdf.format(calendar.getTime());
    
            for (File file : logTempArray) {
                FileUtils.moveFileToDirectory(
                        file,new File(LOG_BACKUP_DIR+"/"+time),true);
            }
        }
    }
    

    5.4 ClearDate

    package com.looc.D02数据采集demo;
    
    import org.apache.commons.io.FileUtils;
    
    import java.io.File;
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Properties;
    import java.util.TimerTask;
    
    /**
     * @author chenPeng
     * @version 1.0.0
     * @ClassName ClearDate.java
     * @Description TODO
     * @createTime 2019年01月28日 19:19:00
     */
    public class ClearDate extends TimerTask {
    
        private String TIME_MATE = "yyyy-MM-dd";
        private String LOG_BACKUP_DIR;
    
        /**
         * The action to be performed by this timer task.
         */
        @Override
        public void run() {
            try {
                init();
                fondTimeOutFile();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 初始化路径
         * @Author chenpeng
         * @Description //TODO
         * @Date 21:25
         * @Param []
         * @return void
         **/
        public void init() throws IOException {
            Properties properties = PropertiesHolderLaze.getPros();
    
            LOG_BACKUP_DIR = properties.getProperty("LOG_BACKUP_DIR");
        }
    
        /**
         * 扫描文件夹是否存在超时文件
         * @Author chenpeng
         * @Description //TODO
         * @Date 21:26
         * @Param []
         * @return void
         **/
        public void fondTimeOutFile() throws IOException, ParseException {
            //拿到路径
            File[] fileArray = new File(LOG_BACKUP_DIR).listFiles();
    
            //拿到时间
            Calendar calendar = Calendar.getInstance();
            calendar.add(Calendar.DAY_OF_MONTH,-1);
            SimpleDateFormat sdf = new SimpleDateFormat(TIME_MATE);
    
            for (File file : fileArray) {
    
                if (calendar.getTime().getTime() > sdf.parse(file.getName()).getTime()){
                    //删除
                    FileUtils.deleteDirectory(file);
                }
            }
        }
    }
    

    5.5 Main

    package com.looc.D02数据采集demo;
    
    
    import java.io.IOException;
    import java.util.Properties;
    import java.util.Timer;
    
    /**
     * 程序入口
     * @author chenPeng
     * @version 1.0.0
     * @ClassName Main.java
     * @Description TODO
     * @createTime 2019年01月28日 20:08:00
     */
    public class Main {
        private static Integer LOG_TIMEOUT;
    
        public static void init() throws IOException {
            Properties properties = PropertiesHolderLaze.getPros();
            LOG_TIMEOUT = Integer.parseInt(properties.getProperty("LOG_TIMEOUT"));
        }
    
        public static void main(String[] args){
            Timer timer = new Timer();
            timer.schedule(new CollectionData(),0,LOG_TIMEOUT*60*60*1000L);
            timer.schedule(new ClearDate(),0,LOG_TIMEOUT*60*60*1000L);
        }
    }
    
    

    相关文章

      网友评论

          本文标题:03_一个基于java,hadoop的数据采集程序demo

          本文链接:https://www.haomeiwen.com/subject/htizjqtx.html