美文网首页
Map Join案例实操

Map Join案例实操

作者: bullion | 来源:发表于2019-02-01 16:02 被阅读0次

    DistributedCacheDriver(驱动类:配置并提交 Map Join 作业)

    public class DistributedCacheDriver {

        public static void main(String[] args) throws Exception {

            // 0 根据自己电脑路径重新配置

            args = new String[]{"e:/input/inputtable2", "e:/output1"};

            // 1 获取job信息

            Configuration configuration = new Configuration();

            Job job = Job.getInstance(configuration);

            // 2 设置加载jar包路径

            job.setJarByClass(DistributedCacheDriver.class);

            // 3 关联map

            job.setMapperClass(DistributedCacheMapper.class);

            // 4 设置最终输出数据类型

            job.setOutputKeyClass(Text.class);

            job.setOutputValueClass(NullWritable.class);

            // 5 设置输入输出路径

            FileInputFormat.setInputPaths(job, new Path(args[0]));

            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 6 加载缓存数据

            job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt"));

            // 7 Map端Join(的逻辑不需要Reducer阶段,设置reducerTask数量为0

            job.setNumReduceTasks(0);

            // 8 提交

            boolean result = job.waitForCompletion(true);

            System.exit(result ? 0 : 1);

        }

    }

    DistributedCacheMapper(Mapper 类:在 setup 中缓存小表,在 map 中完成关联)

    public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        HashMap<String, String> pdMap = new HashMap<>();

        @Override

        protected void setup(Context context) throws IOException, InterruptedException {

            //缓存小表

            URI[] cacheFiles = context.getCacheFiles();

            String path = cacheFiles[0].getPath().toString();

            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));

            String line;

            while (StringUtils.isNotEmpty(line = reader.readLine())) {

                // 1 切割

                String[] fileds = line.split("\t");

                pdMap.put(fileds[0], fileds[1]);

            }

            // 2 关闭资源

            IOUtils.closeStream(reader);

        }

        Text k = new Text();

        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // 1 获取一行

            String line = value.toString();

            // 2 切割

            String[] fileds = line.split("\t");

            // 3 获取pid

            String pid = fileds[1];

            // 4 取出pname

            String pname = pdMap.get(pid);

            // 5 拼接

            line = line + "\t" + pname;

            k.set(line);

            // 6 写出

            context.write(k, NullWritable.get());

        }

    }

    相关文章

      网友评论

          本文标题:Map Join案例实操

          本文链接:https://www.haomeiwen.com/subject/mfedsqtx.html