pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.hadoop</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<hadoop.version>2.7.2</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
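Each of the Hadoop artifacts excludes slf4j-log4j12 and servlet-api: the former would conflict with Spring Boot's default Logback/SLF4J binding, and the latter with the servlet API already provided by the embedded Tomcat pulled in through spring-boot-starter-web.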
application.yml
server:
  port: 8080
# NameNode address
hadoop.name-node: hdfs://192.168.0.105:9000
# HDFS directory
hadoop.namespace: /mydata
logging:
  level:
    com.example.demo.mapper: debug
    com.jackroy.www: ERROR
    learning: trace
  log4j:
    rootLogger: WARN
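The hadoop.name-node and hadoop.namespace entries are custom keys rather than standard Spring Boot properties; they are read with @Value in the HadoopConfig and HadoopTemplate classes below.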
DemoApplication
package com.hadoop.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
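// Only relevant on Windows: hadoop.home.dir must point to a local Hadoop directory
// whose bin folder contains winutils.exe; adjust the path to your own installation.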
System.setProperty("hadoop.home.dir", "D:\\JApplication\\environment\\hadoop-2.7.2");
SpringApplication.run(DemoApplication.class, args);
}
}
HadoopConfig
package com.hadoop.demo.component.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.URI;
/**
* Registers an HDFS FileSystem bean when hadoop.name-node is configured.
*
* @ClassName HadoopConfig
* @Author pl
* @Date 2020/10/31
* @Version V1.0.0
*/
@Configuration
@ConditionalOnProperty(name="hadoop.name-node")
@Slf4j
public class HadoopConfig {
@Value("${hadoop.name-node}")
private String nameNode;
/**
* Configuration conf = new Configuration();
* When a Configuration object is created, its constructor loads Hadoop's two
* default configuration files, core-site.xml and hdfs-site.xml, if they are on
* the classpath. They hold the parameters needed to reach HDFS, most importantly
* fs.defaultFS (formerly fs.default.name), the address through which the client
* accesses HDFS. In short, Configuration carries Hadoop's configuration settings.
* @return the HDFS FileSystem client
*/
@Bean("fileSystem")
public FileSystem createFs() throws Exception{
// build the client configuration
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("fs.defaultFS", nameNode);
conf.set("dfs.replication", "3");
URI uri = new URI(nameNode.trim());
// connect as user "root"
FileSystem fs = FileSystem.get(uri, conf, "root");
log.info("fileSystem bean created");
System.out.println("fs.defaultFS: " + conf.get("fs.defaultFS"));
return fs;
}
}
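As the comment above notes, new Configuration() also picks up a core-site.xml found on the classpath. A minimal sketch of that alternative, assuming a core-site.xml under src/main/resources that defines fs.defaultFS (this variant and the method name createFsFromClasspath are not part of the original post; it would stand in for createFs() above):
@Bean("fileSystem")
public FileSystem createFsFromClasspath() throws Exception {
// fs.defaultFS is resolved from the classpath core-site.xml, so no address is set in code
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
// the connecting user defaults to the current OS user here
return FileSystem.get(conf);
}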
HadoopTemplate
package com.hadoop.demo.template;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
/**
* Thin wrapper around the HDFS FileSystem for uploading, downloading, and deleting files.
*
* @ClassName HadoopTemplate
* @Author pl
* @Date 2020/10/31
* @Version V1.0.0
*/
@Component
@ConditionalOnClass(FileSystem.class)
@Slf4j
public class HadoopTemplate {
@Autowired
private FileSystem fileSystem;
@Value("${hadoop.name-node}")
private String nameNode;
@Value("${hadoop.namespace:/}")
private String nameSpace;
@PostConstruct
public void init(){
existDir(nameSpace,true);
}
public void uploadFile(String srcFile){
copyFileToHDFS(false,true,srcFile,nameSpace);
}
public void uploadFile(boolean del,String srcFile){
copyFileToHDFS(del,true,srcFile,nameSpace);
}
public void uploadFile(String srcFile,String destPath){
copyFileToHDFS(false,true,srcFile,destPath);
}
public void uploadFile(boolean del,String srcFile,String destPath){
copyFileToHDFS(del,true,srcFile,destPath);
}
public void delFile(String fileName){
rmdir(nameSpace,fileName) ;
}
public void delDir(String path){
// delete a directory under the configured namespace (without mutating the nameSpace field)
rmdir(nameSpace + "/" + path, null);
}
public void download(String fileName,String savePath){
getFile(nameSpace+"/"+fileName,savePath);
}
/**
* Check whether the nameSpace directory exists; create it first if requested.
* @param filePath directory path to check
* @param create whether to create the directory when it does not exist
* @return true if the path exists and is a directory
*/
public boolean existDir(String filePath, boolean create){
boolean flag = false;
if(StringUtils.isEmpty(filePath)){
throw new IllegalArgumentException("filePath must not be empty");
}
try{
Path path = new Path(filePath);
if (create){
if (!fileSystem.exists(path)){
fileSystem.mkdirs(path);
}
}
if (fileSystem.isDirectory(path)){
flag = true;
}
}catch (Exception e){
log.error("", e);
}
return flag;
}
/**
* Upload a local file to HDFS.
* @param delSrc whether to delete the source file after upload (default false)
* @param overwrite whether to overwrite an existing destination file
* @param srcFile local source file path
* @param destPath destination path on HDFS
*/
public void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) {
// The source path is a Linux-style path; when testing on Windows use a Windows-style path, e.g. D://hadoop/djt/weibo.txt
Path srcPath = new Path(srcFile);
log.info("@@@@@ source file path: " + srcFile);
// destination path
if(StringUtils.isNotBlank(nameNode)){
destPath = nameNode + destPath;
}
Path dstPath = new Path(destPath);
// perform the upload
try {
// the FileSystem bean is injected, so it does not need to be obtained here
//fileSystem.copyFromLocalFile(srcPath, dstPath);
log.info("@@@@@ upload started");
fileSystem.copyFromLocalFile(delSrc, overwrite, srcPath, dstPath);
log.info("@@@@@ upload finished");
// do not close the shared FileSystem bean here
// fileSystem.close();
} catch (IOException e) {
log.error("", e);
}
}
/**
* Delete a file or a directory on HDFS.
*
* @param path directory path (prefixed with the NameNode address if configured)
* @param fileName optional file name inside the directory; may be blank
*/
public void rmdir(String path,String fileName) {
try {
// build the fully qualified HDFS path
if(StringUtils.isNotBlank(nameNode)){
path = nameNode + path;
}
if(StringUtils.isNotBlank(fileName)){
path = path + "/" +fileName;
}
// delete the file or directory; the single-argument delete(Path f) is deprecated, so the recursive flag is passed explicitly
fileSystem.delete(new Path(path),true);
} catch (IllegalArgumentException | IOException e) {
log.error("", e);
}
}
/**
* Download a file from HDFS.
*
* @param hdfsFile file path on HDFS
* @param destPath local path where the downloaded file is stored
*/
public void getFile(String hdfsFile,String destPath) {
// source path on HDFS
if(StringUtils.isNotBlank(nameNode)){
hdfsFile = nameNode + hdfsFile;
}
Path hdfsPath = new Path(hdfsFile);
Path dstPath = new Path(destPath);
try {
// copy the HDFS file to the local file system
log.info("@@@@@ download started");
fileSystem.copyToLocalFile(hdfsPath, dstPath);
log.info("@@@@@ download finished");
// do not close the shared FileSystem bean here
// fs.close();
} catch (IOException e) {
log.error("", e);
}
}
public String getNameSpace(){
return nameSpace;
}
}
HdfsController
package com.hadoop.demo.controller;
import com.hadoop.demo.template.HadoopTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* REST endpoints that upload, download, and delete HDFS files via HadoopTemplate.
*
* @ClassName HdfsController
* @Author pl
* @Date 2020/10/31
* @Version V1.0.0
*/
@RestController
public class HdfsController {
@Autowired
private HadoopTemplate hadoopTemplate;
/**
* Upload the local file srcFile to HDFS.
* @param srcFile local file path
* @return a fixed confirmation string
*/
@PostMapping("/upload")
public String upload(String srcFile){
hadoopTemplate.uploadFile(srcFile);
return "copy";
}
@DeleteMapping("/delFile")
public String del(@RequestParam String fileName){
hadoopTemplate.delFile(fileName);
return "delFile";
}
@GetMapping("/download")
public String download(@RequestParam String fileName,@RequestParam String savePath){
hadoopTemplate.download(fileName,savePath);
return "download";
}
}
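With the application running on port 8080, these endpoints can be exercised with plain HTTP requests, for example POST /upload?srcFile=D:/data/test.txt, DELETE /delFile?fileName=test.txt, and GET /download?fileName=test.txt&savePath=D:/tmp (the file paths here are placeholders; substitute your own).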
DemoApplicationTests
package com.hadoop.demo;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
@SpringBootTest
@Slf4j
class DemoApplicationTests {
@Test
void contextLoads() {
}
@Autowired
private FileSystem fileSystem;
@Value("${hadoop.namespace:/}")
private String nameSpace;
@Test
public void filesInfo() throws IOException {
RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path(nameSpace), true);
while (files.hasNext()){
LocatedFileStatus fileStatus = files.next();
log.info("名字: "+fileStatus.getPath().getName());
log.info("文件分组: "+fileStatus.getGroup());
log.info("文件长度: "+String.valueOf(fileStatus.getLen()));
log.info("文件权限: "+String.valueOf(fileStatus.getPermission()));
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
log.info("block 数:"+blockLocations.length);
for (BlockLocation blockLocation : blockLocations) {
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println(fileStatus.getPath().getName() + " block host: " + host);
}
}
}
}
@Test
public void isFile() throws IOException {
FileStatus[] fileStatuses = fileSystem.listStatus(new Path(nameSpace));
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isFile()){
log.info("文件:"+fileStatus.getPath().getName());
}
}
}
}