上传文件
PutFile.java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class PutFile {
public static void main(String[] args) throws IOException,URISyntaxException {
Configuration conf = new Configuration();
URI uri = new URI("hdfs://192.168.56.31:9000");
FileSystem fs = FileSystem.get(uri,conf);
//本地文件
Path src = new Path("D:\\scala\\文档\\63\\access.txt");
//HDFS存放位置
Path dst = new Path("/");
fs.copyFromLocalFile(src, dst);
System.out.println("Upload to " + conf.get("fs.defaultFS"));
// 以下相当于执行hdfs dfs -ls /
FileStatus files[] = fs.listStatus(dst);
for (FileStatus file:files) {
System.out.println(file.getPath());
}
}
}
创建文件
CreateFile.java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CreateFile {
public static void main(String[] args) throws Exception {
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
// 定义新文件
Path dfs = new Path("/hdfsfile");
// 创建新文件,如果有则覆盖(true)
FSDataOutputStream create = fs.create(dfs,true);
create.writeBytes("Hello,HDFS !");
}
}
查看文件详细信息
FileLocation.java
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class FileLocation {
public static void main(String[] args) throws Exception {
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
Path fpath = new Path("/access.txt");
FileStatus filestatus = fs.getFileStatus(fpath);
/*
* 获取文件在HDFS集群位置:
* FileSystem.getFileBlockLocation(FileStatus file,long start, long len)"
* 可查找指定文件在HDFS集群上的位置,其中file为文件的完整路径,start和len来标识查找文件的路径
*/
BlockLocation[]blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
filestatus.getAccessTime();
for(int i=0;i<blkLocations.length;i++) {
String[] hosts = blkLocations[i].getHosts();
System.out.println("block_"+i+"_location:"+hosts[0]);
}
// 格式化日期输出
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 获取文件访问时间,返回long
long accessTime = filestatus.getAccessTime();
System.out.println("access:"+formatter.format(new Date(accessTime)));
// 获取文件修改时间,返回long
long modificationTime = filestatus.getModificationTime();
System.out.println("modification:"+formatter.format(new Date(modificationTime)));
// 获取块大小,单位B
long blockSize = filestatus.getBlockSize();
System.out.println("blockSize:"+blockSize);
// 获取文件大小,单位B
long len = filestatus.getLen();
System.out.println("length:"+len);
// 获取文件所在用户组
String group = filestatus.getGroup();
System.out.println("group:"+group);
// 获取文件拥有者
String owner = filestatus.getOwner();
System.out.println("owner:"+owner);
// 获取文件拷贝数
short replication = filestatus.getReplication();
System.out.println("replication:"+replication);
}
}
下载文件
GetFile.java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class GetFile {
public static void main(String[] args) throws Exception {
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
//hdfs上文件
Path src = new Path("/access.txt");
// 下载到本地的文件名
Path dst = new Path("D:\\scala\\文档\\63\\newfile.txt");
fs.copyToLocalFile(src, dst);
}
}
RPC通信
反射机制
Student.java
interface people{
public void study();
}
public class Student implements people {
private String name; //名字;
private int age;
//构造方法1;
public Student() {}
// 构造方法2;
public Student(String name,int age) {
this.name = name;
this.age = age;
}
//set和get方法;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public void study() {
System.out.println("正在学习");
}
// 程序的主方法;
public static void main(String[] args) {
//
Class<? extends Student> tmp=Student.class;
String cName = tmp.getName();
System.out.println("类的名字是"+cName);
try {
// 动态加载指定类名
Class c = Class.forName(cName);
//得到类中的方法;
java.lang.reflect.Method[] ms = c.getMethods();
for(java.lang.reflect.Method m:ms) {
System.out.println("方法的名字是"+m.getName());
System.out.println("方法的返回值类型是"+m.getReturnType().toString());
System.out.println("方法的参数类型是"+m.getParameterTypes());
}
//得到属性
java.lang.reflect.Field[] fields = c.getFields();
for(java.lang.reflect.Field f:fields) {
System.out.println("参数类型是"+f.getType());
}
// 得到父接口
Class[] is = c.getInterfaces();
for(Class s:is) {
System.out.println("父接口的名字是"+s.getName());
}
// 判断是否是数组
System.out.println("数组:"+c.isArray());
String CLName = c.getClassLoader().getClass().getName();
System.out.println("类加载器:"+CLName);
// 实例化构造器
java.lang.reflect.Constructor cons = c.getConstructor(String.class,int.class);
Student stu = (Student) cons.newInstance("hadoop",23);
System.out.println(stu.getName()+":"+stu.getAge());
}catch (Exception e) {
e.printStackTrace();
}
}
}
MapReduce实现技术
WordMapper.java
package wordcount;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// 创建一个WordMapper类继承于Mapper抽象类
public class WordMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
//Mapper抽象类的核心方法,三个参数
public void map( Object key, //首字符偏移量
Text value, //文件的一行内容
Context context) //Mapper端的上下文,与outputCollector和 Reporter的功能类似
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
WordReduce.java
package wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// 创建一个WordReducer类继承于Reducer抽象类
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable(); //记录词频
//Reducer 抽象类的核心方法,3个参数
public void reduce( Text key, //Map端输出的key值
Iterable<IntWritable> values, // Map端输出的Value集合
Context context)
throws IOException,InterruptedException {
int sum = 0;
for (IntWritable val : values) //遍历values集合,并把值相加
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
WordMain.java
package wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordMain {
public static void main(String[] args) throws Exception {
//Configuration类:读取Hadoop的配置文件,如core-site.xml...;
// 也可用set方法重新设置(会覆盖):conf.set("fs.default.name",//"hdfs://xxxx:9000")
Configuration conf = new Configuration();
// 将命令行中参数自动设置到变量conf中
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
// conf.set("fs.defaultFS", "hdfs://192.168.56.31:9000");
// conf.set("hadoop.job.user", "root");
// conf.set("mapreduce.framework.name", "yarn");
// conf.set("mapreduce.jobtracker.address", "192.168.56.31:9001");
// conf.set("yarn.resourcemanager.hostname", "192.168.56.31");
// conf.set("yarn.resourcemanager.admin.address", "192.168.56.31:8033");
// conf.set("yarn.resourcemanager.address", "192.168.56.31:80312");
// conf.set("yarn.resourcemanager.resource-tracker.address", "192.168.56.31:8031");
// conf.set("yarn.resourcemanager.scheduler.address", "192.168.56.31:8030");
if(otherArgs.length != 2)
{
System.err.println("Usage: wordcount <in><out>");
System.exit(2);
}
Job job = new Job(conf, "word count"); // 新建一个job,传入配置信息
job.setJarByClass(WordMain.class); //设置主类
job.setMapperClass(WordMapper.class); //设置Mapper类
job.setCombinerClass(WordReducer.class); //设置作业合成类
job.setReducerClass(WordReducer.class); //设置Reducer类
job.setOutputKeyClass(Text.class); //设置输出数据的关键类
job.setOutputValueClass(IntWritable.class); //设置输出值类
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //文件输入
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 文件输出
System.exit(job.waitForCompletion(true) ? 0 : 1); // 等待完成退出
}
}
打包上传
hdfs dfs -mkdir /user/hadoop
hdfs dfs -mkdir /user/hadoop/input
hdfs dfs -put file* /user/hadoop/input
hdfs dfs -ls /user/hadoop/input
hadoop jar wordcount.jar wordcount.WordMain /user/hadoop/input/file* /user/hadoop/output
hdfs dfs -ls /user/hadoop/output
hdfs dfs -text /user/hadoop/output/part-r-00000
hdfs://192.168.56.31:9000/user/hadoop/input
hdfs://192.168.56.31:9000/user/hadoop/output2
image.png
WordCount2.java
package wordcount2;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount2 {
public static class TokenizerMapper extends Mapper<Object, Text, Text,IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void Map(Object key,Text value, Context context) throws IOException,InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text, IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key,Iterable<IntWritable>value, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val: value)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.56.31:9000");
conf.set("hadoop.job.user", "root");
conf.set("mapreduce.framework.name", "yarn");
conf.set("mapreduce.jobtracker.address", "192.168.56.31:9001");
conf.set("yarn.resourcemanager.hostname", "192.168.56.31");
conf.set("yarn.resourcemanager.admin.address", "192.168.56.31:8033");
conf.set("yarn.resourcemanager.address", "192.168.56.31:80312");
conf.set("yarn.resourcemanager.resource-tracker.address", "192.168.56.31:8031");
conf.set("yarn.resourcemanager.scheduler.address", "192.168.56.31:8030");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
{
System.err.println("Usage:wordcount <in><out>");
System.exit(2);
}
Job job = new Job(conf, "word count2"); // 新建一个job,传入配置信息
job.setJarByClass(WordCount2.class); //设置主类
job.setMapperClass(TokenizerMapper.class); //设置Mapper类
job.setCombinerClass(IntSumReducer.class); //设置作业合成类
job.setReducerClass(IntSumReducer.class); //设置Reducer类
job.setOutputKeyClass(Text.class); //设置输出数据的关键类
job.setOutputValueClass(IntWritable.class); //设置输出值类
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //文件输入
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 文件输出
boolean flag = job.waitForCompletion(true);
System.out.println("SUCCEED !"+flag); //任务完成提示
System.exit(flag ? 0 : 1);
System.out.println();
}
}
网友评论