美文网首页
Hadoop开发--MapReduce编程--示例代码(十一)

Hadoop开发--MapReduce编程--示例代码(十一)

作者: 无剑_君 | 来源:发表于2019-12-12 10:45 被阅读0次

一、单词筛选Top K

/**
 * MapReduce job that keeps the {@link #K} records with the largest counts.
 *
 * <p>Input lines are expected to look like {@code word<TAB>count}. Each mapper keeps only its own
 * top K records and emits them from {@code cleanup}; {@link KReduce} (used both as combiner and
 * as reducer) applies the same selection to whatever it receives.
 */
public class TopK {
    /** Number of records to keep. */
    public static final int K = 2;

    /**
     * Bounded buffer holding the K largest (count, word) pairs offered so far.
     *
     * <p>Words sharing the same count are all retained (up to K in total); a plain
     * {@code TreeMap<Integer, String>} would overwrite them. {@code java.util.List} and
     * {@code java.util.ArrayList} are fully qualified so the class does not depend on imports
     * that are not shown alongside it.
     */
    private static final class TopWords {
        private final TreeMap<Integer, java.util.List<String>> wordsByCount = new TreeMap<>();
        private int size;

        /** Adds a pair and, if the buffer now exceeds K entries, drops one with the lowest count. */
        void offer(int count, String word) {
            java.util.List<String> words = wordsByCount.get(count);
            if (words == null) {
                words = new java.util.ArrayList<>();
                wordsByCount.put(count, words);
            }
            words.add(word);
            size++;
            if (size > K) {
                evictSmallest();
            }
        }

        /** Removes the most recently added word among those with the lowest count. */
        private void evictSmallest() {
            Integer smallest = wordsByCount.firstKey();
            java.util.List<String> words = wordsByCount.get(smallest);
            words.remove(words.size() - 1);
            if (words.isEmpty()) {
                wordsByCount.remove(smallest);
            }
            size--;
        }

        /** Retained counts in ascending order. */
        Iterable<Integer> counts() {
            return wordsByCount.keySet();
        }

        /** Words retained for the given count, in insertion order. */
        Iterable<String> wordsFor(Integer count) {
            return wordsByCount.get(count);
        }
    }

    /** Mapper that parses {@code word<TAB>count} lines and emits its local top K in cleanup. */
    public static class KMap extends Mapper<LongWritable, Text, IntWritable, Text> {
        private final TopWords top = new TopWords();

        /**
         * Parses one line and offers it to the local top-K buffer. Nothing is written here;
         * output happens in {@link #cleanup}.
         *
         * @param key byte offset of the line (unused)
         * @param value the input line
         * @param context task context, used only to count malformed lines
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Blank lines and lines without a tab separator are ignored.
            if (line.trim().length() == 0 || line.indexOf("\t") == -1) {
                return;
            }
            String[] arr = line.split("\t", 2);
            String name = arr[0];
            int num;
            try {
                num = Integer.parseInt(arr[1].trim());
            } catch (NumberFormatException e) {
                // One bad line should not fail the whole task; record it and move on.
                context.getCounter("TopK", "MALFORMED_LINES").increment(1);
                return;
            }
            top.offer(num, name);
        }

        /** Emits the retained records (ascending by count) to the combiner/reducer. */
        @Override
        protected void cleanup(Mapper<LongWritable, Text, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {
            for (Integer num : top.counts()) {
                for (String name : top.wordsFor(num)) {
                    context.write(new IntWritable(num), new Text(name));
                }
            }
        }
    }

    /** Combiner/reducer that keeps the top K of all records it receives and emits them in cleanup. */
    public static class KReduce extends Reducer<IntWritable, Text, IntWritable, Text> {
        private final TopWords top = new TopWords();

        /**
         * Offers every word received for this count to the buffer.
         *
         * @param key the count
         * @param values all words that arrived with this count
         * @param context task context (unused here; output happens in {@link #cleanup})
         */
        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int count = key.get();
            for (Text value : values) {
                // toString() copies the contents, so keeping the String is safe.
                top.offer(count, value.toString());
            }
        }

        /** Writes the retained records, ascending by count. */
        @Override
        protected void cleanup(Reducer<IntWritable, Text, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {
            for (Integer num : top.counts()) {
                for (String name : top.wordsFor(num)) {
                    context.write(new IntWritable(num), new Text(name));
                }
            }
        }
    }

    /**
     * Configures and runs the job, then prints whether it succeeded.
     *
     * @param args unused; input and output paths are hard-coded
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf, "top-K统计");
            job.setJarByClass(TopK.class);

            job.setMapperClass(KMap.class);
            job.setCombinerClass(KReduce.class);
            job.setReducerClass(KReduce.class);
            // A global top K needs every candidate to reach the same reducer.
            job.setNumReduceTasks(1);

            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.setInputPaths(job, new Path("hdfs://hadoopslave1:9000/input"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoopslave1:9000/output"));

            System.out.println(job.waitForCompletion(true));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

二、筛选出文章中出现频率最高的10个词语

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * Two chained MapReduce jobs that output the most frequent words of the input text.
 *
 * <p>Job 1 counts words and writes (word, count) pairs as a sequence file. Job 2 swaps each pair
 * to (count, word), sorts by count in descending order in a single reducer, and writes the first
 * {@link #TOP_N} records.
 */
public class WordCountK {
    /** Number of highest-frequency words written by the sort job. */
    private static final int TOP_N = 10;

    /** Word-count mapper: lower-cases each line, tokenizes it and emits (word, 1). */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        // Reused output key holding the current word.
        private Text word = new Text();

        /**
         * @param key input key (unused)
         * @param value one line of text
         * @param context receives one (word, 1) pair per token
         */
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Lower-case everything so counting is case-insensitive.
            String line = value.toString().toLowerCase();
            StringTokenizer itr = new StringTokenizer(line, " \t\n\f\" . , : ; ? ! [ ] ' - ) (");
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * Mapper that forwards at most the first {@link #TOP_N} "count word" lines it sees.
     *
     * <p>Not referenced by {@link #main}. The limit is per mapper instance, not global.
     */
    public static class TokenizerMapper2 extends Mapper<Object, Text, IntWritable, Text> {
        int c = 0;

        /**
         * @param key input key (unused)
         * @param value a line whose first two whitespace-separated tokens are a count and a word
         * @param context receives (count, word) while fewer than TOP_N records have been emitted
         */
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            if (c >= TOP_N) {
                return;
            }
            StringTokenizer itr = new StringTokenizer(value.toString());
            // Lines with fewer than two tokens cannot be parsed; skip them instead of failing.
            if (itr.countTokens() < 2) {
                return;
            }
            IntWritable a = new IntWritable(Integer.parseInt(itr.nextToken()));
            Text b = new Text(itr.nextToken());
            context.write(a, b);
            c++;
        }
    }

    /** Sums the counts for each word; used as both combiner and reducer of the counting job. */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        /**
         * @param key the word
         * @param values partial counts for the word
         * @param context receives (word, total)
         */
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    /**
     * Reducer for the sort job: passes through the first {@link #TOP_N} (count, word) records it
     * receives and drops the rest. It relies on the job using one reducer and a descending sort
     * comparator, as configured in {@link #main}.
     */
    public static class TopNReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
        private int emitted = 0;

        /**
         * @param key the count
         * @param values words having that count
         * @param context receives (count, word) until TOP_N records have been written
         */
        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text word : values) {
                if (emitted >= TOP_N) {
                    return;
                }
                context.write(key, word);
                emitted++;
            }
        }
    }

    /** Sort comparator that reverses the order of {@code IntWritable.Comparator}. */
    private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
        @Override
        @SuppressWarnings("rawtypes") // the overridden signature uses the raw type
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    /**
     * Runs the counting job, then the sort job. Exits with status 1 if either job fails.
     *
     * @param args unused; all paths are hard-coded
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Output of the counting job, read back by the sort job.
        Path tempath = new Path("hdfs://hadoopslave1:9000/output");

        // Job 1: word count.
        Job job = Job.getInstance(conf, "Word-K统计");
        job.setJarByClass(WordCountK.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://hadoopslave1:9000/input"));
        FileOutputFormat.setOutputPath(job, tempath);
        job.setMapperClass(TokenizerMapper.class);
        // Pre-aggregate on the map side; summing is safe to apply repeatedly.
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Sequence-file output keeps the key/value types so job 2 can read them back directly.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        if (!job.waitForCompletion(false)) {
            // Do not sort the output of a failed counting job.
            System.exit(1);
        }

        // Job 2: swap to (count, word), sort descending, keep the first TOP_N records.
        Job jobsort = Job.getInstance(conf, "Word-K排序");
        // Needed so the comparator and reducer classes of this file can be loaded on the cluster.
        jobsort.setJarByClass(WordCountK.class);
        FileInputFormat.addInputPath(jobsort, tempath);
        jobsort.setInputFormatClass(SequenceFileInputFormat.class);
        jobsort.setMapperClass(InverseMapper.class);
        jobsort.setReducerClass(TopNReducer.class);
        // A single reducer yields one globally sorted output.
        jobsort.setNumReduceTasks(1);
        jobsort.setOutputKeyClass(IntWritable.class);
        jobsort.setOutputValueClass(Text.class);
        jobsort.setSortComparatorClass(IntWritableDecreasingComparator.class);
        Path result = new Path("hdfs://hadoopslave1:9000/result");
        FileOutputFormat.setOutputPath(jobsort, result);

        System.exit(jobsort.waitForCompletion(false) ? 0 : 1);
    }
}

三、单表关联

public class SingleJoin {
    // Map类
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        private static Text child = new Text();
        private static Text parent = new Text();
        private static Text tempChild = new Text();
        private static Text tempParent = new Text();

        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            //使用空白字符作为分隔 包括空格、制表符、换页符等等。等价于 [ \f\n\r\t\v]
            String[] splits = value.toString().split("\\s+");
            if (splits.length != 2) {
                return;
            }
            
            child.set(splits[0]);
            parent.set(splits[1]);
            
            //值添加前缀
            tempChild.set("1" + splits[0]);
            tempParent.set("2" + splits[1]);
            context.write(parent, tempChild);
            context.write(child, tempParent);
        };
    }

    // Reduce类
    public static class ReduceClass extends Reducer<Text, Text, Text, Text> {
        private static Text child = new Text();
        private static Text grand = new Text();
        private static List<String> childs = new ArrayList<String>();
        private static List<String> grands = new ArrayList<String>();

        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws java.io.IOException, InterruptedException {
            //数据处理
            // 1child 2 grand
            for (Text value : values) {
                String temp = value.toString();
                if (temp.startsWith("1"))
                    childs.add(temp.substring(1));
                else
                    grands.add(temp.substring(1));
            }
            // 笛卡尔积
            for (String c : childs) {
                for (String g : grands) {
                    child.set(c);
                    grand.set(g);
                    context.write(child, grand);
                }
            }
            // 清理
            childs.clear();
            grands.clear();
        };
    }

    public static void main(String[] args) throws Exception {
        String inputPath="hdfs://hadoopslave1:9000/input";
        String outputPath="hdfs://hadoopslave1:9000/output";
        Configuration conf = new Configuration();
    
        Job job = Job.getInstance(conf, "单表连接");
        job.setJarByClass(SingleJoin.class);

        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

四、多表连接

public class MultiJoin {
    public static int time = 0;
    /*
     * 在map中先区分输入行属于左表还是右表,然后对两列值进行分割, 
     * 保存连接列在key值,剩余列和左右表标志在value中,最后输出
     */
    public static class MapClass extends Mapper<Object, Text, Text, Text> {
        /* 实现map函数 */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();         /* 每行文件 */
            String relationtype = new String();     /* 左右表标识 */

            /* 输入文件首行,不处理 */
            if (line.contains("factoryname") == true|| line.contains("addressed") == true) {
                return;
            }

            /* 输入的一行预处理文本 */
            StringTokenizer itr = new StringTokenizer(line);
            String mapkey = new String();
            String mapvalue = new String();
            
            int i = 0;
            while (itr.hasMoreTokens()) {
                /* 先读取一个单词 */
                String token = itr.nextToken();
                /* 判断该地址ID就把存到"values[0]" */
                //根据 第一列是否是0-9判断是左表还是右表
                if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {
                    mapkey = token;
                    if (i > 0) {
                        relationtype = "1";
                    } else {
                        relationtype = "2";
                    }
                    continue;
                }

                /* 存工厂名 */
                mapvalue += token + " ";
                i++;
            }
            /* 输出左右表 */
            context.write(new Text(mapkey), new Text(relationtype + "+" + mapvalue));
        }
    }

    /*
     * reduce解析map输出,将value中数据按照左右表分别保存,
     * 然后求出笛卡尔积,并输出。
     */
    public static class ReduceClass extends Reducer<Text, Text, Text, Text> {
        /* 实现reduce函数 */
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            /* 输出表头 */
            if (0 == time) {
                context.write(new Text("factoryname"), new Text("addressname"));
                time++;
            }
            int factorynum = 0;
            String[] factory = new String[10];
            int addressnum = 0;
            String[] address = new String[10];
            Iterator ite = values.iterator();
            while (ite.hasNext()) {
                String record = ite.next().toString();
                int len = record.length();
                int i = 2;
                if (0 == len) {
                    continue;
                }
                /* 取得左右表标识 */
                char relationtype = record.charAt(0);
                /* 左表 */
                if ('1' == relationtype) {
                    factory[factorynum] = record.substring(i);
                    factorynum++;
                }
                /* 右表 */
                if ('2' == relationtype) {
                    address[addressnum] = record.substring(i);
                    addressnum++;
                }
            }
            /* 求笛卡尔积 */
            if (0 != factorynum && 0 != addressnum) {
                for (int m = 0; m < factorynum; m++) {
                    for (int n = 0; n < addressnum; n++) {
                        /* 输出结果 */
                        context.write(new Text(factory[m]),
                                new Text(address[n]));
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String inputPath0="hdfs://hadoopslave1:9000/factoy";
                String inputPath1="hdfs://hadoopslave1:9000/address";
                Path[] paths=new Pah[]{new Paht(inputPath0),new Path(inputPath1)};

        String outputPath="hdfs://hadoopslave1:9000/output";

        Job job = Job.getInstance(conf, "多表连接");
        job.setJarByClass(MultiJoin.class);
        /* 设置Map和Reduce处理类 */
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);
        /* 设置输出类型 */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        /* 设置输入和输出目录 */
        // FileInputFormat.setInputPaths(job, new Path(inputPath0),new Path(inputPath1));
                 FileInputFormat.setInputPaths(job,paths );
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

相关文章

网友评论

      本文标题:Hadoop开发--MapReduce编程--示例代码(十一)

      本文链接:https://www.haomeiwen.com/subject/jvljwctx.html