DistributedCacheDriver
/**
 * Driver for a map-side join: the small product table is shipped to every mapper through the
 * distributed cache and joined against the order records in {@link DistributedCacheMapper}.
 *
 * <p>Usage: {@code DistributedCacheDriver [input] [output] [cacheFileUri]}. Any argument that is
 * omitted falls back to the local development default below.
 */
public class DistributedCacheDriver {

    /** Default order-table input directory (adjust for your machine). */
    private static final String DEFAULT_INPUT = "e:/input/inputtable2";

    /** Default output directory (adjust for your machine); must not exist before the job runs. */
    private static final String DEFAULT_OUTPUT = "e:/output1";

    /** Default URI of the small product table that is placed in the distributed cache. */
    private static final String DEFAULT_CACHE_FILE = "file:///e:/input/inputcache/pd.txt";

    /**
     * Configures and submits the map-only join job.
     *
     * @param args optional {@code [input, output, cacheFileUri]}; missing entries use the defaults
     * @throws Exception if the job cannot be configured, the cache URI is malformed, or submission fails
     */
    public static void main(String[] args) throws Exception {
        // 0 Honour command-line arguments; only fall back to the hard-coded defaults when absent,
        //   instead of always discarding what the caller passed.
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;
        String cacheFile = args.length > 2 ? args[2] : DEFAULT_CACHE_FILE;

        // 1 Get the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Set the jar by the driver class
        job.setJarByClass(DistributedCacheDriver.class);

        // 3 Attach the mapper
        job.setMapperClass(DistributedCacheMapper.class);

        // 4 Final output key/value types (also the map output types, since there is no reducer)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 5 Input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 6 Register the small table in the distributed cache
        job.addCacheFile(new URI(cacheFile));

        // 7 The join is done entirely on the map side, so no reduce phase is needed
        job.setNumReduceTasks(0);

        // 8 Submit and wait; exit code reflects success
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
DistributedCacheMapper
/**
 * Map-side join mapper. During {@link #setup} it loads a tab-separated {@code pid \t pname} table
 * from the first distributed-cache file into memory. Then, for every tab-separated input record
 * whose second field is a pid, it emits the record with the matching pname appended.
 */
public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /** Counter group used to report skipped, malformed lines. */
    private static final String COUNTER_GROUP = "DistributedCacheMapper";

    /** Cached small table: first column (pid) -> second column (pname). */
    HashMap<String, String> pdMap = new HashMap<>();

    /** Output key reused across map() calls to avoid allocating a Text per record. */
    Text k = new Text();

    /**
     * Loads the cached product table into {@link #pdMap}.
     *
     * @throws IOException if no cache file was registered or the file cannot be read
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null || cacheFiles.length == 0) {
            throw new IOException("No distributed cache file registered; expected the product table "
                    + "to be added with Job.addCacheFile(...)");
        }
        // NOTE(review): opening the URI's path directly works when it refers to a local file
        // (e.g. a file:/// URI in local mode); on a cluster the localized copy is typically opened
        // by file name in the task's working directory — confirm for your deployment.
        String path = cacheFiles[0].getPath();

        // try-with-resources guarantees the reader is closed even if parsing throws.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new FileInputStream(path), java.nio.charset.StandardCharsets.UTF_8))) {
            String line;
            // Read to EOF; a blank line must not terminate loading and truncate the table.
            while ((line = reader.readLine()) != null) {
                if (StringUtils.isBlank(line)) {
                    continue;
                }
                // 1 Split into columns
                String[] fields = line.split("\t");
                if (fields.length < 2) {
                    context.getCounter(COUNTER_GROUP, "MALFORMED_CACHE_LINES").increment(1);
                    continue;
                }
                pdMap.put(fields[0], fields[1]);
            }
        }
    }

    /**
     * Joins one input record with the cached table and writes {@code record \t pname}.
     * Records with fewer than two tab-separated fields are skipped and counted.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Get the line
        String line = value.toString();

        // 2 Split into columns
        String[] fields = line.split("\t");
        if (fields.length < 2) {
            context.getCounter(COUNTER_GROUP, "MALFORMED_INPUT_LINES").increment(1);
            return;
        }

        // 3 The pid is the second column
        String pid = fields[1];

        // 4 Look up pname; an unknown pid yields null, which the concatenation below renders as "null"
        String pname = pdMap.get(pid);

        // 5 Append pname to the original record
        k.set(line + "\t" + pname);

        // 6 Emit
        context.write(k, NullWritable.get());
    }
}
网友评论