Integrating Hadoop with Spring Boot

Author: 乙腾 | Published 2020-10-31 22:55

pom

The Hadoop artifacts below exclude slf4j-log4j12 and servlet-api so they do not clash with Spring Boot's Logback logging and embedded Tomcat.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>2.3.5.RELEASE</version>
       <relativePath/> <!-- lookup parent from repository -->
   </parent>
   <groupId>com.hadoop</groupId>
   <artifactId>demo</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>demo</name>
   <description>Demo project for Spring Boot</description>

   <properties>
       <java.version>1.8</java.version>
       <hadoop.version>2.7.2</hadoop.version>
   </properties>

   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <optional>true</optional>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
           <exclusions>
               <exclusion>
                   <groupId>org.junit.vintage</groupId>
                   <artifactId>junit-vintage-engine</artifactId>
               </exclusion>
           </exclusions>
       </dependency>

       <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-client</artifactId>
           <version>${hadoop.version}</version>
           <exclusions>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-log4j12</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>javax.servlet</groupId>
                   <artifactId>servlet-api</artifactId>
               </exclusion>
           </exclusions>
       </dependency>
       <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-common</artifactId>
           <version>${hadoop.version}</version>
           <exclusions>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-log4j12</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>javax.servlet</groupId>
                   <artifactId>servlet-api</artifactId>
               </exclusion>
           </exclusions>
       </dependency>
       <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-hdfs</artifactId>
           <version>${hadoop.version}</version>
           <exclusions>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-log4j12</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>javax.servlet</groupId>
                   <artifactId>servlet-api</artifactId>
               </exclusion>
           </exclusions>
       </dependency>
   </dependencies>

   <build>
       <plugins>
           <plugin>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-maven-plugin</artifactId>
           </plugin>
       </plugins>
   </build>

</project>

application.yml

server:
  port: 8080

# NameNode address
hadoop.name-node: hdfs://192.168.0.105:9000
# HDFS working directory
hadoop.namespace: /mydata

logging:
  level:
    root: warn
    com.hadoop.demo: debug
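
These flat keys can also be bound to a typed class instead of individual @Value fields. A minimal sketch, assuming a new HadoopProperties class that is not part of the original project:

package com.hadoop.demo.component.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Hypothetical alternative to the @Value injection used in HadoopConfig below;
// Spring's relaxed binding maps hadoop.name-node onto nameNode automatically.
@Component
@ConfigurationProperties(prefix = "hadoop")
public class HadoopProperties {

    // bound from hadoop.name-node
    private String nameNode;

    // bound from hadoop.namespace
    private String namespace;

    public String getNameNode() { return nameNode; }

    public void setNameNode(String nameNode) { this.nameNode = nameNode; }

    public String getNamespace() { return namespace; }

    public void setNamespace(String namespace) { this.namespace = namespace; }
}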

DemoApplication

package com.hadoop.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        // On Windows, point hadoop.home.dir at a local Hadoop distribution
        // containing bin/winutils.exe; otherwise the client cannot locate
        // winutils and reports an error at startup.
        System.setProperty("hadoop.home.dir", "D:\\JApplication\\environment\\hadoop-2.7.2");
        SpringApplication.run(DemoApplication.class, args);
    }
    }

}

HadoopConfig

package com.hadoop.demo.component.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URI;

/**
 * Creates the HDFS {@link FileSystem} client bean.
 *
 * @ClassName HadoopConfig
 * @Author pl
 * @Date 2020/10/31
 * @Version V1.0.0
 */
@Configuration
@ConditionalOnProperty(name="hadoop.name-node")
@Slf4j
public class HadoopConfig {
    @Value("${hadoop.name-node}")
    private String nameNode;


    /**
     * When a Configuration object is created, its constructor loads Hadoop's
     * default config files, core-site.xml and hdfs-site.xml, which carry the
     * parameters needed to reach HDFS. The key one is fs.defaultFS (formerly
     * fs.default.name), the address a client uses to connect to HDFS. In
     * short, Configuration holds Hadoop's configuration information.
     * @return the HDFS FileSystem client
     */
    @Bean("fileSystem")
    public FileSystem createFs() throws Exception {
        // Load the default configuration, then override the NameNode address
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set("fs.defaultFS", nameNode);
        conf.set("dfs.replication", "3");
        URI uri = new URI(nameNode.trim());
        // Connect as the "root" user
        FileSystem fs = FileSystem.get(uri, conf, "root");
        log.info("fileSystem bean created");
        log.info("fs.defaultFS: " + conf.get("fs.defaultFS"));
        return fs;
    }


}
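
The HDFS user is hardcoded to "root" above. A minimal sketch of making it configurable, assuming a hadoop.user property that is not in the original application.yml:

    // Hypothetical property; falls back to root when hadoop.user is absent
    @Value("${hadoop.user:root}")
    private String hdfsUser;

    @Bean("fileSystem")
    public FileSystem createFs() throws Exception {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set("fs.defaultFS", nameNode);
        return FileSystem.get(new URI(nameNode.trim()), conf, hdfsUser);
    }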

HadoopTemplate

package com.hadoop.demo.template;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
/**
 * Thin wrapper around the HDFS {@link FileSystem} bean for uploading,
 * downloading and deleting files under the configured namespace.
 *
 * @ClassName HadoopTemplate
 * @Author pl
 * @Date 2020/10/31
 * @Version V1.0.0
 */
@Component
@ConditionalOnClass(FileSystem.class)
@Slf4j
public class HadoopTemplate {

    @Autowired
    private FileSystem fileSystem;

    @Value("${hadoop.name-node}")
    private String nameNode;

    @Value("${hadoop.namespace:/}")
    private String nameSpace;

    @PostConstruct
    public void init(){
        existDir(nameSpace,true);
    }

    public void uploadFile(String srcFile){
        copyFileToHDFS(false,true,srcFile,nameSpace);
    }

    public void uploadFile(boolean del,String srcFile){
        copyFileToHDFS(del,true,srcFile,nameSpace);
    }

    public void uploadFile(String srcFile,String destPath){
        copyFileToHDFS(false,true,srcFile,destPath);
    }

    public void uploadFile(boolean del,String srcFile,String destPath){
        copyFileToHDFS(del,true,srcFile,destPath);
    }

    public void delFile(String fileName){
        rmdir(nameSpace,fileName) ;
    }

    public void delDir(String path){
        rmdir(nameSpace + "/" + path, null);
    }

    public void download(String fileName,String savePath){
        getFile(nameSpace+"/"+fileName,savePath);
    }


    /**
     * Checks whether the given directory exists; when create is true, the
     * directory is created first if it is missing.
     * @param filePath directory path on HDFS
     * @param create whether to create the directory when it does not exist
     * @return true if the path exists and is a directory
     */
    public boolean existDir(String filePath, boolean create){
        boolean flag = false;
        if(StringUtils.isEmpty(filePath)){
            throw new IllegalArgumentException("filePath must not be empty");
        }
        try{
            Path path = new Path(filePath);
            if (create){
                if (!fileSystem.exists(path)){
                    fileSystem.mkdirs(path);
                }
            }
            if (fileSystem.isDirectory(path)){
                flag = true;
            }
        }catch (Exception e){
            log.error("", e);
        }
        return flag;
    }




    /**
     * Uploads a file to HDFS.
     * @param delSrc       whether to delete the source file after upload (default false)
     * @param overwrite    whether to overwrite an existing destination file
     * @param srcFile      local path of the file to upload
     * @param destPath     destination path on HDFS
     */
    public void copyFileToHDFS(boolean delSrc, boolean overwrite, String srcFile, String destPath) {
        // The source path is local; on Windows use a form such as D://hadoop/djt/weibo.txt
        Path srcPath = new Path(srcFile);
        log.info("@@@@@ source file: " + srcFile);
        // Destination path
        if(StringUtils.isNotBlank(nameNode)){
            destPath = nameNode + destPath;
        }
        Path dstPath = new Path(destPath);
        // Perform the upload
        try {
            //fileSystem.copyFromLocalFile(srcPath, dstPath);
            log.info("@@@@@ upload started");
            fileSystem.copyFromLocalFile(delSrc, overwrite, srcPath, dstPath);
            log.info("@@@@@ upload finished");
            // The shared FileSystem bean is not closed here
            //    fileSystem.close();
        } catch (IOException e) {
            log.error("", e);
        }
    }


    /**
     * Deletes a file or a directory (recursively).
     *
     * @param path directory or file path on HDFS
     * @param fileName optional file name under path; may be blank
     */
    public void rmdir(String path, String fileName) {
        try {
            if(StringUtils.isNotBlank(nameNode)){
                path = nameNode + path;
            }
            if(StringUtils.isNotBlank(fileName)){
                path = path + "/" + fileName;
            }
            // delete(Path f) without the recursive flag is deprecated
            fileSystem.delete(new Path(path), true);
        } catch (IllegalArgumentException | IOException e) {
            log.error("", e);
        }
    }

    /**
     * Downloads a file from HDFS.
     *
     * @param hdfsFile file path on HDFS
     * @param destPath local path the file is saved to
     */
    public void getFile(String hdfsFile, String destPath) {
        // Source path on HDFS
        if(StringUtils.isNotBlank(nameNode)){
            hdfsFile = nameNode + hdfsFile;
        }
        Path hdfsPath = new Path(hdfsFile);
        Path dstPath = new Path(destPath);
        try {
            log.info("@@@@@ download started");
            fileSystem.copyToLocalFile(hdfsPath, dstPath);
            log.info("@@@@@ download finished");
            // The shared FileSystem bean is not closed here
            // fs.close();
        } catch (IOException e) {
            log.error("", e);
        }
    }

    public String getNameSpace(){
        return nameSpace;
    }

}
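
A minimal usage sketch from any other Spring-managed bean (the local paths are hypothetical):

    @Autowired
    private HadoopTemplate hadoopTemplate;

    public void demo() {
        hadoopTemplate.uploadFile("D:/tmp/weibo.txt");       // upload into hadoop.namespace
        hadoopTemplate.download("weibo.txt", "D:/tmp/out");  // copy back to the local file system
        hadoopTemplate.delFile("weibo.txt");                 // remove from HDFS
    }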

HdfsController

package com.hadoop.demo.controller;

import com.hadoop.demo.template.HadoopTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoints for uploading, deleting and downloading HDFS files.
 *
 * @ClassName HdfsController
 * @Author pl
 * @Date 2020/10/31
 * @Version V1.0.0
 */
@RestController
public class HdfsController {

    @Autowired
    private HadoopTemplate hadoopTemplate;

    /**
     * Uploads the local file srcFile to HDFS.
     * @param srcFile local file path
     * @return a fixed marker string
     */
    @PostMapping("/upload")
    public String upload(@RequestParam String srcFile){
        hadoopTemplate.uploadFile(srcFile);
        return "copy";
    }

    @DeleteMapping("/delFile")
    public String del(@RequestParam String fileName){
        hadoopTemplate.delFile(fileName);
        return "delFile";
    }

    @GetMapping("/download")
    public String download(@RequestParam String fileName,@RequestParam String savePath){
        hadoopTemplate.download(fileName,savePath);
        return "download";
    }
}
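
With the application running, the endpoints can be exercised directly, for example POST http://localhost:8080/upload?srcFile=D:/tmp/test.txt to upload a local file into the configured namespace, GET http://localhost:8080/download?fileName=test.txt&savePath=D:/tmp/out to copy it back, and DELETE http://localhost:8080/delFile?fileName=test.txt to remove it (the file paths here are hypothetical).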

test

package com.hadoop.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;

@SpringBootTest
@Slf4j
class DemoApplicationTests {

    @Test
    void contextLoads() {
    }

    @Autowired
    private FileSystem fileSystem;

    @Value("${hadoop.namespace:/}")
    private String nameSpace;


    @Test
    public void filesInfo() throws IOException {
        // Recursively list every file under the namespace
        RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path(nameSpace), true);
        while (files.hasNext()){
            LocatedFileStatus fileStatus = files.next();
            log.info("name: " + fileStatus.getPath().getName());
            log.info("group: " + fileStatus.getGroup());
            log.info("length: " + fileStatus.getLen());
            log.info("permissions: " + fileStatus.getPermission());
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            log.info("block count: " + blockLocations.length);
            for (BlockLocation blockLocation : blockLocations) {
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    log.info(fileStatus.getPath().getName() + " block host: " + host);
                }
            }
        }
    }


    @Test
    public void isFile() throws IOException {
        FileStatus[] fileStatuses = fileSystem.listStatus(new Path(nameSpace));
        for (FileStatus fileStatus : fileStatuses) {
            if (fileStatus.isFile()){
                log.info("file: " + fileStatus.getPath().getName());
            }
        }
    }
}
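
Note that these tests talk to a live cluster: the FileSystem bean connects to the address in hadoop.name-node, and HadoopTemplate's @PostConstruct hook creates the namespace directory at startup, so a reachable HDFS instance is required for them to pass.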
