美文网首页
尚硅谷大数据技术之电信客服

尚硅谷大数据技术之电信客服

作者: 尚硅谷教育 | 来源:发表于2018-12-25 10:03 被阅读14次
  1. 创建类:MySQLOutputFormat
    package com.atguigu.analysis.format;

import com.atguigu.analysis.converter.impl.DimensionConverter;
import com.atguigu.analysis.kv.base.BaseDimension;
import com.atguigu.analysis.kv.base.BaseValue;
import com.atguigu.analysis.kv.impl.ComDimension;
import com.atguigu.analysis.kv.impl.CountDurationValue;
import com.atguigu.constants.Constants;
import com.atguigu.utils.JDBCCacheBean;
import com.atguigu.utils.JDBCUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MySQLOutputFormat extends OutputFormat<BaseDimension, BaseValue> {
@Override
public RecordWriter<BaseDimension, BaseValue> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
//创建jdbc连接
Connection conn = null;
try {
conn = JDBCCacheBean.getInstance();
//关闭自动提交,以便于批量提交
conn.setAutoCommit(false);
} catch (SQLException e) {
throw new IOException(e);
}
return new MysqlRecordWriter(conn);
}

@Override
public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
    // 校检输出
}

@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
    String name = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR);
    Path output = name == null ? null : new Path(name);
    return new FileOutputCommitter(output, taskAttemptContext);
}

static class MysqlRecordWriter extends RecordWriter<BaseDimension, BaseValue> {
    private Connection conn = null;
    private DimensionConverter dimensionConverter = null;
    private PreparedStatement preparedStatement = null;
    private int batchNumber = 0;
    int count = 0;
    public MysqlRecordWriter(Connection conn) {
        this.conn = conn;
        this.batchNumber = Constants.JDBC_DEFAULT_BATCH_NUMBER;
        this.dimensionConverter = new DimensionConverter();
    }

    @Override
    public void write(BaseDimension key, BaseValue value) throws IOException, InterruptedException {
        try {
            // 统计当前PreparedStatement对象待提交的数据量
            String sql = "INSERT INTO `tb_call`(`id_date_contact`, `id_date_dimension`, `id_contact`, `call_sum`, `call_duration_sum`) VALUES(?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE `id_date_contact` = ? ;";
            if (preparedStatement == null) {
                preparedStatement = conn.prepareStatement(sql);
            }
            // 本次sql
            int i = 0;
            ComDimension comDimension = (ComDimension) key;
            CountDurationValue countDurationValue = (CountDurationValue) value;

            int id_date_dimension = dimensionConverter.getDimensionId(comDimension.getDateDimension());
            int id_contact = dimensionConverter.getDimensionId(comDimension.getContactDimension());
            int call_sum = countDurationValue.getCallSum();
            int call_duration_sum = countDurationValue.getCallDurationSum();

            String id_date_contact = id_date_dimension + "_" + id_contact;

            preparedStatement.setString(++i, id_date_contact);
            preparedStatement.setInt(++i, id_date_dimension);
            preparedStatement.setInt(++i, id_contact);
            preparedStatement.setInt(++i, call_sum);
            preparedStatement.setInt(++i, call_duration_sum);

            preparedStatement.setString(++i, id_date_contact);
            preparedStatement.addBatch();
            //当前缓存了多少个sql语句等待批量执行,计数器
            count++;

            // 批量提交
            if (count >= this.batchNumber) {
                preparedStatement.executeBatch(); // 批量提交
                conn.commit(); // 连接提交
                count = 0;
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        try {
            preparedStatement.executeBatch();
            this.conn.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            JDBCUtil.close(conn, preparedStatement, null);
        }
    }
}

}

  1. 创建类:BaseDimension
    package com.atguigu.analysis.kv.base;

import org.apache.hadoop.io.WritableComparable;

public abstract class BaseDimension implements WritableComparable<BaseDimension> {}

  1. 创建类:BaseValue
    package com.atguigu.analysis.kv.base;

import org.apache.hadoop.io.Writable;

public abstract class BaseValue implements Writable { }

  1. 创建类:ComDimension
    package com.atguigu.analysis.kv.impl;

import com.atguigu.analysis.kv.base.BaseDimension;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ComDimension extends BaseDimension{
//时间维度
private DateDimension dateDimension = new DateDimension();
//联系人维度
private ContactDimension contactDimension = new ContactDimension();

public ComDimension() {
    super();
}

public ComDimension(DateDimension dateDimension, ContactDimension contactDimension) {
    super();
    this.dateDimension = dateDimension;
    this.contactDimension = contactDimension;
}

public DateDimension getDateDimension() {
    return dateDimension;
}

public void setDateDimension(DateDimension dateDimension) {
    this.dateDimension = dateDimension;
}

public ContactDimension getContactDimension() {
    return contactDimension;
}

public void setContactDimension(ContactDimension contactDimension) {
    this.contactDimension = contactDimension;
}

@Override
public int compareTo(BaseDimension o) {
    if(this == o) return 0;
    ComDimension comDimension = (ComDimension) o;

    int tmp = this.dateDimension.compareTo(comDimension.getDateDimension());
    if(tmp != 0) return tmp;

    tmp = this.contactDimension.compareTo(comDimension.getContactDimension());
    return tmp;
}

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    ComDimension that = (ComDimension) o;

    if (dateDimension != null ? !dateDimension.equals(that.dateDimension) : that.dateDimension != null) return false;
    return contactDimension != null ? contactDimension.equals(that.contactDimension) : that.contactDimension == null;
}

@Override
public int hashCode() {
    int result = dateDimension != null ? dateDimension.hashCode() : 0;
    result = 31 * result + (contactDimension != null ? contactDimension.hashCode() : 0);
    return result;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
    this.dateDimension.write(dataOutput);
    this.contactDimension.write(dataOutput);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
    this.dateDimension.readFields(dataInput);
    this.contactDimension.readFields(dataInput);
}

}

  1. 创建类:ContactDimension
    package com.atguigu.analysis.kv.impl;

import com.atguigu.analysis.kv.base.BaseDimension;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ContactDimension extends BaseDimension {
//数据库主键
private int id;
//手机号码
private String telephone;
//姓名
private String name;

public ContactDimension() {
    super();
}

public ContactDimension(String telephone, String name) {
    super();
    this.telephone = telephone;
    this.name = name;
}

public int getId() {
    return id;
}

public void setId(int id) {
    this.id = id;
}

public String getTelephone() {
    return telephone;
}

public void setTelephone(String telephone) {
    this.telephone = telephone;
}

public String getName() {
    return name;
}

public void setName(String name) {
    this.name = name;
}

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    ContactDimension that = (ContactDimension) o;

    if (id != that.id) return false;
    if (telephone != null ? !telephone.equals(that.telephone) : that.telephone != null) return false;
    return name != null ? name.equals(that.name) : that.name == null;
}

@Override
public int hashCode() {
    int result = id;
    result = 31 * result + (telephone != null ? telephone.hashCode() : 0);
    result = 31 * result + (name != null ? name.hashCode() : 0);
    return result;
}

@Override
public int compareTo(BaseDimension o) {
    if (o == this) return 0;
    ContactDimension contactDimension = (ContactDimension) o;

    int tmp = Integer.compare(this.id, contactDimension.getId());
    if (tmp != 0) return tmp;

    tmp = this.telephone.compareTo(contactDimension.getTelephone());
    if (tmp != 0) return tmp;

    return this.name.compareTo(contactDimension.getName());
}

@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeInt(this.id);
    dataOutput.writeUTF(this.telephone);
    dataOutput.writeUTF(this.name);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
    this.id = dataInput.readInt();
    this.telephone = dataInput.readUTF();
    this.name = dataInput.readUTF();
}

@Override
public String toString() {
    return "ContactDimension{" +
            "id=" + id +
            ", telephone=" + telephone +
            ", name='" + name + '\'' +
            '}';
}

}

本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。

相关文章

网友评论

      本文标题:尚硅谷大数据技术之电信客服

      本文链接:https://www.haomeiwen.com/subject/elmxlqtx.html