Hadoop two-table join

Author: 爱吃秋刀鱼的大猫 | Published 2018-03-25 11:03

We have two tables, a customer table and an order table, joined on a shared cid field.

If the customer table is small, it can be loaded into memory in the mapper's setup phase, and the join can then be done entirely in the mapper (a map-side join).

If both the customer table and the order table are large, a mapper and a reducer are needed (a reduce-side join).
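
For illustration, the hypothetical records below are shaped the way the mappers in this post parse their input (cid is the first field of a customer line and the last field of an order line); the actual test data is in the screenshots at the end.

customer line:   1,tom,12
order line:      101,iphone,999.0,1
joined output:   1,tom,12,101,iphone,999.0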

First, the approach for when the customer table is small.

Mapper

package com.huawei.join;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

public class JoinMapper extends Mapper<LongWritable,Text,Text,NullWritable>{

    // in-memory copy of the small customer table, keyed by cid
    Map<String,String> customsMap = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        FileSystem fs = FileSystem.get(conf);
        // load the small customer table once per map task (hard-coded local path)
        FSDataInputStream fis = fs.open(new Path("/Users/simmucheng/tmp/join_test/customer.txt"));
        BufferedReader br = new BufferedReader(new InputStreamReader(fis));
        String line = null;
        while ((line = br.readLine()) != null) {
            String cid = line.substring(0, line.indexOf(","));
            customsMap.put(cid, line);
        }
        br.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // order line: everything before the last comma is the product info, the last field is the cid
        String str = value.toString();
        String id = str.substring(str.lastIndexOf(",") + 1);
        String productInfo = str.substring(0, str.lastIndexOf(','));
        // look up the matching customer record and emit the joined line
        String customerinfo = customsMap.get(id);
        context.write(new Text(customerinfo + ',' + productInfo), NullWritable.get());
    }
}
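
A note from me rather than the original author: because setup() reads customer.txt from a hard-coded local path, this mapper only works when the job runs against the local file system. A minimal sketch of a more portable variant, assuming Hadoop 2.x and a hypothetical file location, ships the small table through the distributed cache; only setup() changes:

// In the driver, register the small table (hypothetical path):
//     job.addCacheFile(new URI("/data/join_test/customer.txt"));
//
// In JoinMapper (needs java.net.URI in addition to the imports above):
@Override
protected void setup(Context context) throws IOException {
    URI[] cacheFiles = context.getCacheFiles();              // files registered via addCacheFile
    FileSystem fs = FileSystem.get(cacheFiles[0], context.getConfiguration());
    try (BufferedReader br = new BufferedReader(
            new InputStreamReader(fs.open(new Path(cacheFiles[0].getPath()))))) {
        String line;
        while ((line = br.readLine()) != null) {
            customsMap.put(line.substring(0, line.indexOf(",")), line);  // key the map by cid
        }
    }
}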

Driver (main) code

package com.huawei.join;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * The customer table is the small table, so it is loaded into memory in the mapper's
 * setup phase and the join is done directly on the map side; no extra reducer is needed.
 */

public class JoinApp {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // use the local file system for this test run
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // remove the output directory if it already exists
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        Job job = Job.getInstance(conf);

        job.setJarByClass(JoinApp.class);
        job.setMapperClass(JoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setJobName("JoinTest");
        // map-only job: the join happens in the mapper, so no reducers are needed
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);

    }
}
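
As a hedged aside (my example, with made-up paths): since the customer table is loaded in setup(), args[0] should point only at the order data, and args[1] is the output directory, e.g.

hadoop jar join.jar com.huawei.join.JoinApp /Users/simmucheng/tmp/join_test/orders /Users/simmucheng/tmp/join_test/out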

If both the customer table and the order table are large, the approach is as follows.
First, define a custom data class.
JoinBean

package com.huawei.JoinBoth;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class JoinBean implements WritableComparable<JoinBean> {
    private int type;
    private int cid;
    private int pid;
    private String customerInfo="";
    private String productorInfo="";

    /*
    type 0 = customer record, type 1 = order (product) record
     */
    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public int getCid() {
        return cid;
    }

    public void setCid(int cid) {
        this.cid = cid;
    }

    public int getPid() {
        return pid;
    }

    public void setPid(int pid) {
        this.pid = pid;
    }

    public String getCustomerInfo() {
        return customerInfo;
    }

    public void setCustomerInfo(String customerInfo) {
        this.customerInfo = customerInfo;
    }

    public String getProductorInfo() {
        return productorInfo;
    }

    public void setProductorInfo(String productorInfo) {
        this.productorInfo = productorInfo;
    }

    public int compareTo(JoinBean jb) {
        // sort by cid first; within one cid the customer record (type 0) comes before
        // the order records (type 1), and orders are then sorted by pid in descending order
        if (cid == jb.getCid()) {
            if (type != jb.getType()) {
                return type - jb.getType();
            }
            else return -(pid - jb.pid);
        }
        else {
            return (cid - jb.getCid());
        }
    }

    public void write(DataOutput out) throws IOException {

        out.writeInt(type);
        out.writeInt(cid);
        out.writeInt(pid);
        out.writeUTF(customerInfo);
        out.writeUTF(productorInfo);
    }

    public void readFields(DataInput in) throws IOException {

        this.type=in.readInt();
        this.cid=in.readInt();
        this.pid=in.readInt();
        this.customerInfo=in.readUTF();
        this.productorInfo=in.readUTF();
    }
}
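
To see what this ordering gives us, here is a small standalone check of my own (not from the post), using the same hypothetical records as earlier: for a given cid the customer record compares as smaller than its order records, which is exactly what the reducer below relies on. (As an aside, the subtraction-based comparisons can overflow for extreme int values; Integer.compare(x, y) is a safer drop-in if that ever matters.)

package com.huawei.JoinBoth;

public class JoinBeanOrderDemo {
    public static void main(String[] args) {
        JoinBean customer = new JoinBean();
        customer.setType(0);
        customer.setCid(1);
        customer.setCustomerInfo("1,tom,12");

        JoinBean order = new JoinBean();
        order.setType(1);
        order.setCid(1);
        order.setPid(101);
        order.setProductorInfo("101,iphone,999.0");

        // the customer record compares as "smaller", so it sorts before its order records
        System.out.println(customer.compareTo(order) < 0);   // prints true
    }
}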

Custom partitioner class

package com.huawei.JoinBoth;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class JoinPartitioner extends Partitioner<JoinBean,NullWritable>{


    /**
     * Partition by customer id so that all records with the same cid
     * end up in the same reduce task.
     * @param joinBean      the map output key
     * @param nullWritable  the map output value (unused)
     * @param numPartitions the number of reduce tasks
     * @return the partition index for this key
     */
    @Override
    public int getPartition(JoinBean joinBean, NullWritable nullWritable, int numPartitions) {
        return (joinBean.getCid()%numPartitions);
    }
}
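
One caveat I would add: if a cid could ever be negative, getCid() % numPartitions would return a negative partition index and the job would fail. The usual defensive form, the same trick HashPartitioner applies to hashCode(), is:

        return (joinBean.getCid() & Integer.MAX_VALUE) % numPartitions;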

Custom grouping comparator for the reducer

package com.huawei.JoinBoth;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class JoinCompareGroup extends WritableComparator{
    public JoinCompareGroup() {
        // true: let WritableComparator create JoinBean instances so keys can be deserialized and compared
        super(JoinBean.class,true);
    }

    /**
     * Group keys by cid only, so that a customer record and all of the order records
     * with the same cid arrive in a single reduce() call.
     * Both the grouping comparator and the sort comparator override the
     * compare(WritableComparable, WritableComparable) method.
     * @param a the first key
     * @param b the second key
     * @return a comparison based on cid alone
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        JoinBean ja=(JoinBean)a;
        JoinBean jb=(JoinBean)b;
        return (ja.getCid()-jb.getCid());
    }
}
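
To make "grouping by cid only" concrete, consider the hypothetical map output keys below (my example). They are three distinct keys, but the grouping comparator reports them as equal, so they all arrive in a single reduce() call, with the customer record first and the orders in descending pid order thanks to JoinBean.compareTo():

(type=0, cid=1, customerInfo="1,tom,12")
(type=1, cid=1, pid=102, productorInfo="102,ipad,499.0")
(type=1, cid=1, pid=101, productorInfo="101,iphone,999.0")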

Custom sort comparator

package com.huawei.JoinBoth;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class JoinComparator extends WritableComparator{
    // Sort comparator: delegates to JoinBean.compareTo(). Even if it is not registered in
    // the driver (the call there is commented out), Hadoop falls back to JoinBean's own
    // compareTo(), so the sort order is the same either way.
    public JoinComparator() {
        super(JoinBean.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        JoinBean ja=(JoinBean)a;
        JoinBean jb=(JoinBean)b;
        return ja.compareTo(jb);
    }
}

Mapper

package com.huawei.JoinBoth;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class JoinMapper extends Mapper<LongWritable,Text,JoinBean,NullWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String str = value.toString();
        // decide which table this line belongs to from the name of the input file
        FileSplit split = (FileSplit) context.getInputSplit();
        String path = split.getPath().toString();
        JoinBean joinBean = new JoinBean();
        if (path.contains("customer")) {
            // customer line: cid is the first comma-separated field
            joinBean.setType(0);
            String cid = str.substring(0, str.indexOf(","));
            joinBean.setCid(Integer.parseInt(cid));
            joinBean.setCustomerInfo(str);
        } else {
            // order line: pid is the first field, cid is the last field,
            // and everything before the last comma is kept as the product info
            joinBean.setType(1);
            String cid = str.substring(str.lastIndexOf(',') + 1);
            String pid = str.substring(0, str.indexOf(','));
            String proinfo = str.substring(0, str.lastIndexOf(','));
            joinBean.setPid(Integer.parseInt(pid));
            joinBean.setProductorInfo(proinfo);
            joinBean.setCid(Integer.parseInt(cid));
        }

        context.write(joinBean, NullWritable.get());
    }
}
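
Since this mapper tells the two tables apart by checking the file name, it is worth mentioning, as my aside rather than part of the original post, that MultipleInputs offers a cleaner alternative: bind a dedicated mapper class to each input path. CustomerJoinMapper and OrderJoinMapper are hypothetical classes that would each contain just one branch of the map() method above:

// in the driver, instead of a single FileInputFormat.addInputPath(job, ...):
// (requires org.apache.hadoop.mapreduce.lib.input.MultipleInputs; customerDir and ordersDir are hypothetical paths)
MultipleInputs.addInputPath(job, new Path(customerDir), TextInputFormat.class, CustomerJoinMapper.class);
MultipleInputs.addInputPath(job, new Path(ordersDir), TextInputFormat.class, OrderJoinMapper.class);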

Reducer

package com.huawei.JoinBoth;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class JoinReducer extends Reducer<JoinBean,NullWritable,Text,NullWritable>{
    @Override
    protected void reduce(JoinBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        Iterator<NullWritable> it = values.iterator();
        // Because of the grouping comparator (cid only) and the sort order (customer record
        // before order records), the first record in every group is the customer record.
        it.next();
        // Hadoop reuses and refills the key object on every it.next(), so capture the
        // customer info now, before advancing to the order records.
        String custominfo = key.getCustomerInfo();
        while (it.hasNext()) {
            it.next();
            // key now holds an order record with the same cid
            String proinfo = key.getProductorInfo();
            context.write(new Text(custominfo + "," + proinfo), NullWritable.get());
        }
    }
}
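
For the hypothetical cid-1 group listed under the grouping comparator above (one customer record followed by two order records), this reducer would write two lines, in the descending-pid order produced by the sort:

1,tom,12,102,ipad,499.0
1,tom,12,101,iphone,999.0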

Driver (main)

package com.huawei.JoinBoth;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class JoinTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // use the local file system for this test run
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        FileSystem fs = FileSystem.get(conf);
        // remove the output directory if it already exists
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        job.setJarByClass(JoinTest.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapOutputKeyClass(JoinBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);

        // group reduce input by cid only; sorting falls back to JoinBean.compareTo()
        job.setGroupingComparatorClass(JoinCompareGroup.class);
        //job.setSortComparatorClass(JoinComparator.class);
        job.setPartitionerClass(JoinPartitioner.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

    }
}
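
Finally, a note on how this driver expects to be called (my reading of the code, with made-up paths): args[0] must be a single directory containing both the customer file and the order file, because the mapper distinguishes them by file name, and args[1] is the output directory:

hadoop jar join.jar com.huawei.JoinBoth.JoinTest /Users/simmucheng/tmp/join_test /Users/simmucheng/tmp/join_out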
The customer table and the order table look like this:
![屏幕快照 2018-03-25 上午11.09.59.png](https://img.haomeiwen.com/i2111066/fd4f3937ef0e309a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

![屏幕快照 2018-03-25 上午11.10.08.png](https://img.haomeiwen.com/i2111066/653df3ce9afb65aa.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

The result:
![屏幕快照 2018-03-25 上午11.10.16.png](https://img.haomeiwen.com/i2111066/4745975cd51bb15f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
