Some operations on HDFS from Scala, including creating directories, deleting directories, uploading files, reading files, deleting files, appending to files, and so on:
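The utility object below relies on the Hadoop FileSystem API and Apache Commons Lang, and the test object uses JUnit. For orientation, a minimal sbt dependency sketch is shown here; the artifact versions are assumptions (not taken from the original post) and should be matched to your cluster.

// build.sbt (sketch) - versions are assumptions, align them with your Hadoop cluster
libraryDependencies ++= Seq(
  "org.apache.hadoop"  % "hadoop-client" % "2.7.3",       // org.apache.hadoop.fs / conf / io
  "org.apache.commons" % "commons-lang3" % "3.7",         // StringUtils
  "junit"              % "junit"         % "4.12" % Test  // @Test / assertEquals in TestHDFSUtil
)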
import java.io._
import java.net.URI
import java.util.{ArrayList, List}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.io.IOUtils
/**
 * <Description> Operate HDFS from Scala<br>
 *
 * @author Sunny<br>
 * @taskId: <br>
 * @version 1.0<br>
 * @createDate 2018/06/11 9:29 <br>
 * @see com.spark.sunny.hdfs <br>
 */
object HDFSUtil {
  val hdfsUrl = "hdfs://iotsparkmaster:9000"
  var realUrl = ""
  /**
   * make a new dir in the hdfs
   *
   * @param dir the dir may like '/tmp/testdir'
   * @return boolean true-success, false-failed
   */
  def mkdir(dir : String) : Boolean = {
    var result = false
    if (StringUtils.isNotBlank(dir)) {
      realUrl = hdfsUrl + dir
      val config = new Configuration()
      val fs = FileSystem.get(URI.create(realUrl), config)
      val path = new Path(realUrl)
      // report success if the dir already exists or is created now
      result = fs.exists(path) || fs.mkdirs(path)
      fs.close()
    }
    result
  }
  /**
   * delete a dir in the hdfs.
   * if the dir does not exist, nothing is deleted and false is returned
   *
   * @param dir the dir may like '/tmp/testdir'
   * @return boolean true-success, false-failed
   */
  def deleteDir(dir : String) : Boolean = {
    var result = false
    if (StringUtils.isNotBlank(dir)) {
      realUrl = hdfsUrl + dir
      val config = new Configuration()
      val fs = FileSystem.get(URI.create(realUrl), config)
      // recursive delete of the directory and everything under it
      result = fs.delete(new Path(realUrl), true)
      fs.close()
    }
    result
  }
  /**
   * list the names of files/directories/links directly under a directory,
   * without descending into sub-directories
   *
   * @param dir a folder path may like '/tmp/testdir'
   * @return List<String> list of file names
   */
  def listAll(dir : String) : List[String] = {
    val names : List[String] = new ArrayList[String]()
    if (StringUtils.isNotBlank(dir)) {
      realUrl = hdfsUrl + dir
      val config = new Configuration()
      val fs = FileSystem.get(URI.create(realUrl), config)
      // files, directories and symlinks are all listed by their full path
      for (stat <- fs.listStatus(new Path(realUrl))) {
        names.add(stat.getPath.toString)
      }
      fs.close()
    }
    names
  }
  /**
   * upload a local file to the hdfs,
   * notice that the hdfs path is a full path like /tmp/test.txt
   * if the local file does not exist, a FileNotFoundException is thrown
   *
   * @param localFile local file path, may like F:/test.txt or /usr/local/test.txt
   * @param hdfsFile hdfs file path, may like /tmp/dir
   * @return boolean true-success, false-failed
   *
   **/
  def uploadLocalFile2HDFS(localFile : String, hdfsFile : String) : Boolean = {
    var result = false
    if (StringUtils.isNotBlank(localFile) && StringUtils.isNotBlank(hdfsFile)) {
      realUrl = hdfsUrl + hdfsFile
      val config = new Configuration()
      val hdfs = FileSystem.get(URI.create(hdfsUrl), config)
      val src = new Path(localFile)
      val dst = new Path(realUrl)
      hdfs.copyFromLocalFile(src, dst)
      hdfs.close()
      result = true
    }
    result
  }
  /**
   * create a new file in the hdfs and write the content to it.
   * notice that newFile is a full path;
   * if the parent dir does not exist, it will be created
   *
   * @param newFile new file path, a full path name, may like '/tmp/test.txt'
   * @param content file content
   * @return boolean true-success, false-failed
   **/
  def createNewHDFSFile(newFile : String, content : String) : Boolean = {
    var result = false
    if (StringUtils.isNotBlank(newFile) && null != content) {
      realUrl = hdfsUrl + newFile
      val config = new Configuration()
      val hdfs = FileSystem.get(URI.create(realUrl), config)
      val os = hdfs.create(new Path(realUrl))
      os.write(content.getBytes("UTF-8"))
      os.close()
      hdfs.close()
      result = true
    }
    result
  }
  /**
   * delete the hdfs file
   *
   * @param hdfsFile a full path name, may like '/tmp/test.txt'
   * @return boolean true-success, false-failed
   */
  def deleteHDFSFile(hdfsFile : String) : Boolean = {
    var result = false
    if (StringUtils.isNotBlank(hdfsFile)) {
      realUrl = hdfsUrl + hdfsFile
      val config = new Configuration()
      val hdfs = FileSystem.get(URI.create(realUrl), config)
      val path = new Path(realUrl)
      val isDeleted = hdfs.delete(path, true)
      hdfs.close()
      result = isDeleted
    }
    result
  }
  /**
   * read the hdfs file content
   *
   * @param hdfsFile a full path name, may like '/tmp/test.txt'
   * @return byte[] file content
   */
  def readHDFSFile(hdfsFile : String) : Array[Byte] = {
    var result = new Array[Byte](0)
    if (StringUtils.isNotBlank(hdfsFile)) {
      realUrl = hdfsUrl + hdfsFile
      val config = new Configuration()
      val hdfs = FileSystem.get(URI.create(realUrl), config)
      val path = new Path(realUrl)
      if (hdfs.exists(path)) {
        val inputStream = hdfs.open(path)
        val stat = hdfs.getFileStatus(path)
        // read the whole file into a buffer sized by the file length
        // (assumes the file fits into a single Int-sized byte array)
        val buffer = new Array[Byte](stat.getLen.toInt)
        inputStream.readFully(buffer)
        inputStream.close()
        result = buffer
      }
      hdfs.close()
    }
    result
  }
  /**
   * append content to the hdfs file;
   * if the file does not exist yet, it is created with the given content
   *
   * @param hdfsFile a full path name, may like '/tmp/test.txt'
   * @param content string
   * @return boolean true-success, false-failed
   */
  def append(hdfsFile : String, content : String) : Boolean = {
    var result = false
    if (StringUtils.isNotBlank(hdfsFile) && null != content) {
      realUrl = hdfsUrl + hdfsFile
      val config = new Configuration()
      // let appends succeed on small clusters where no replacement datanode is available
      config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
      config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true")
      val hdfs = FileSystem.get(URI.create(realUrl), config)
      val path = new Path(realUrl)
      if (hdfs.exists(path)) {
        val inputStream = new ByteArrayInputStream(content.getBytes("UTF-8"))
        val outputStream = hdfs.append(path)
        // copyBytes with close = true closes both streams once the copy finishes
        IOUtils.copyBytes(inputStream, outputStream, 4096, true)
        hdfs.close()
        result = true
      } else {
        // nothing to append to yet: fall back to creating the file
        hdfs.close()
        result = createNewHDFSFile(hdfsFile, content)
      }
    }
    result
  }
}
The test code is as follows:
import java.util.List
import org.junit.Assert._
import org.junit.Test
/**
 * <Description> Tests for HDFSUtil<br>
 *
 * @author Sunny<br>
 * @taskId: <br>
 * @version 1.0<br>
 * @createDate 2018/06/11 13:16 <br>
 * @see com.spark.sunny.hdfs <br>
 */
object TestHDFSUtil {
  val dir = "/iotcmp/cdr"
  val parentDir = "/iotcmp"
  val hdfsUrl = "hdfs://iotsparkmaster:9000"
  def main(args: Array[String]): Unit = {
    //testDeletedirNormal()
    //testUploadLocalFile2HDFS()
    //testCreateNewHDFSFileNormal()
    testDeleteHDFSFile()
    //testReadHDFSFile()
    //testAppend()
  }
  @Test
  def testMkdirNull(): Unit = {
    try {
      assertEquals(false, HDFSUtil.mkdir(null))
      assertEquals(false, HDFSUtil.mkdir(" "))
      assertEquals(false, HDFSUtil.mkdir(""))
    } catch {
      // a blank dir must not raise an exception, it should just return false
      case ex : Exception => fail(ex.getMessage)
    }
  }
  def testMkdirNormal(): Unit = {
    HDFSUtil.deleteDir(dir)
    val result : Boolean = HDFSUtil.mkdir(dir)
    val listFile: List[String] = HDFSUtil.listAll(parentDir)
    var existFile = false
    for (i <- 0 to listFile.size() - 1) {
      val elem = listFile.get(i)
      if (elem.equals(hdfsUrl + dir)) {
        existFile = true
      }
    }
    println(existFile)
  }
  @Test
  def testDeletedirNull(): Unit = {
    try {
      assertEquals(false, HDFSUtil.deleteDir(null))
      assertEquals(false, HDFSUtil.deleteDir(" "))
      assertEquals(false, HDFSUtil.deleteDir(""))
    } catch {
      // a blank dir must not raise an exception, it should just return false
      case ex : Exception => fail(ex.getMessage)
    }
  }
  def testDeletedirNormal(): Unit = {
    HDFSUtil.deleteDir(dir)
    val listFile: List[String] = HDFSUtil.listAll(parentDir)
    var existFile = false
    for (i <- 0 to listFile.size() - 1) {
      val elem = listFile.get(i)
      if (elem.equals(hdfsUrl + dir)) {
        existFile = true
      }
    }
    println(existFile)
  }
  def testUploadLocalFile2HDFS(): Unit = {
    val localFile = "C:\\Users\\yaj\\Desktop\\CDR\\USAGE_CDR_53_888_0.cdr"
    val remoteFile = dir + "/USAGE_CDR_53_888_0.cdr"
    HDFSUtil.mkdir(dir)
    HDFSUtil.deleteHDFSFile(remoteFile)
    HDFSUtil.uploadLocalFile2HDFS(localFile, remoteFile)
  }
  def testCreateNewHDFSFileNormal(): Unit = {
    val newFile = dir + "/iot.txt"
    val content = "iot file1"
    HDFSUtil.deleteHDFSFile(newFile)
    HDFSUtil.createNewHDFSFile(newFile, content)
    val result = new String(HDFSUtil.readHDFSFile(newFile))
    println(result)
  }
  def testDeleteHDFSFile(): Unit = {
    this.testCreateNewHDFSFileNormal()
    val remoteFile = dir + "/iot.txt"
    val isDeleted : Boolean = HDFSUtil.deleteHDFSFile(remoteFile)
    println(isDeleted)
  }
  def testReadHDFSFile(): Unit = {
    //this.testUploadLocalFile2HDFS()
    val remoteFile = dir + "/USAGE_CDR_53_888_0.cdr"
    val result = new String(HDFSUtil.readHDFSFile(remoteFile))
    println("USAGE_CDR_53_888_0.cdr: " + result)
  }
  def testAppend(): Unit = {
    val newFile = dir + "/iot.txt"
    val content1 = "hello iot append1 \r\n"
    val content2 = "hello iot append2 \r\n"
    HDFSUtil.deleteHDFSFile(newFile)
    HDFSUtil.createNewHDFSFile(newFile, content1)
    HDFSUtil.append(newFile, content2)
    val result = new String(HDFSUtil.readHDFSFile(newFile))
    println(result)
  }
}