- 创建类:MySQLOutputFormat
package com.atguigu.analysis.format;
import com.atguigu.analysis.converter.impl.DimensionConverter;
import com.atguigu.analysis.kv.base.BaseDimension;
import com.atguigu.analysis.kv.base.BaseValue;
import com.atguigu.analysis.kv.impl.ComDimension;
import com.atguigu.analysis.kv.impl.CountDurationValue;
import com.atguigu.constants.Constants;
import com.atguigu.utils.JDBCCacheBean;
import com.atguigu.utils.JDBCUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * Hadoop {@link OutputFormat} that writes aggregated call statistics into the MySQL table
 * {@code tb_call} instead of files.
 *
 * <p>Each task attempt obtains a JDBC connection from {@link JDBCCacheBean}, disables
 * auto-commit and inserts rows in batches of {@code Constants.JDBC_DEFAULT_BATCH_NUMBER}.
 */
public class MySQLOutputFormat extends OutputFormat<BaseDimension, BaseValue> {

    /**
     * Creates the record writer for one task attempt.
     *
     * @param taskAttemptContext context of the current task attempt (not used)
     * @return a writer that batches inserts into {@code tb_call}
     * @throws IOException if the JDBC connection cannot be obtained or configured
     */
    @Override
    public RecordWriter<BaseDimension, BaseValue> getRecordWriter(TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        Connection conn;
        try {
            conn = JDBCCacheBean.getInstance();
            // Disable auto-commit so rows are committed per batch rather than per statement.
            conn.setAutoCommit(false);
        } catch (SQLException e) {
            throw new IOException("Failed to prepare JDBC connection for MySQLOutputFormat", e);
        }
        return new MysqlRecordWriter(conn);
    }

    /**
     * Performs no output validation; the target is a database table, not an output directory.
     */
    @Override
    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
        // Intentionally empty: nothing to validate for a JDBC sink.
    }

    /**
     * Returns a {@link FileOutputCommitter} for the configured output directory (or {@code null}
     * path when none is configured).
     *
     * <p>NOTE(review): presumably present only to satisfy the MapReduce framework; the actual
     * database writes are committed by {@link MysqlRecordWriter} itself.
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        String name = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR);
        Path output = name == null ? null : new Path(name);
        return new FileOutputCommitter(output, taskAttemptContext);
    }

    /**
     * Record writer that converts each (ComDimension, CountDurationValue) pair into one row of
     * {@code tb_call} and executes the inserts as JDBC batches.
     */
    static class MysqlRecordWriter extends RecordWriter<BaseDimension, BaseValue> {

        /**
         * Upsert statement for one output row.
         *
         * <p>NOTE(review): on a duplicate key this only re-assigns {@code id_date_contact} to the
         * same value, so an existing row's {@code call_sum}/{@code call_duration_sum} are left
         * unchanged. Confirm this is intended.
         */
        private static final String INSERT_SQL =
                "INSERT INTO `tb_call`(`id_date_contact`, `id_date_dimension`, `id_contact`, `call_sum`, `call_duration_sum`) VALUES(?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE `id_date_contact` = ? ;";

        private final Connection conn;
        private final DimensionConverter dimensionConverter;
        /** Number of buffered statements that triggers an executeBatch + commit. */
        private final int batchNumber;
        /** Lazily prepared on the first write; stays null if no record is ever written. */
        private PreparedStatement preparedStatement = null;
        /** Number of statements currently buffered in the batch and not yet executed. */
        int count = 0;

        /**
         * @param conn connection with auto-commit disabled; closed by {@link #close}
         */
        public MysqlRecordWriter(Connection conn) {
            this.conn = conn;
            this.batchNumber = Constants.JDBC_DEFAULT_BATCH_NUMBER;
            this.dimensionConverter = new DimensionConverter();
        }

        /**
         * Buffers one row; executes and commits the batch once {@code batchNumber} rows are pending.
         *
         * @param key   expected to be a {@link ComDimension}
         * @param value expected to be a {@link CountDurationValue}
         * @throws IOException if preparing, binding or executing the statement fails; the cause
         *                     is the underlying {@link SQLException}
         */
        @Override
        public void write(BaseDimension key, BaseValue value) throws IOException, InterruptedException {
            ComDimension comDimension = (ComDimension) key;
            CountDurationValue countDurationValue = (CountDurationValue) value;
            try {
                if (preparedStatement == null) {
                    preparedStatement = conn.prepareStatement(INSERT_SQL);
                }
                // Resolve dimension objects to their database ids.
                int idDateDimension = dimensionConverter.getDimensionId(comDimension.getDateDimension());
                int idContact = dimensionConverter.getDimensionId(comDimension.getContactDimension());
                int callSum = countDurationValue.getCallSum();
                int callDurationSum = countDurationValue.getCallDurationSum();
                String idDateContact = idDateDimension + "_" + idContact;

                int i = 0;
                preparedStatement.setString(++i, idDateContact);
                preparedStatement.setInt(++i, idDateDimension);
                preparedStatement.setInt(++i, idContact);
                preparedStatement.setInt(++i, callSum);
                preparedStatement.setInt(++i, callDurationSum);
                preparedStatement.setString(++i, idDateContact); // ON DUPLICATE KEY UPDATE value
                preparedStatement.addBatch();
                count++;

                if (count >= this.batchNumber) {
                    preparedStatement.executeBatch();
                    conn.commit();
                    count = 0;
                }
            } catch (SQLException e) {
                // Propagate instead of swallowing, so the task fails rather than silently losing rows.
                throw new IOException("Failed to write record to tb_call", e);
            }
        }

        /**
         * Flushes any remaining buffered rows, commits, and always releases the JDBC resources.
         *
         * @throws IOException if the final flush or commit fails
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            try {
                // preparedStatement is null when write() was never called.
                if (preparedStatement != null && count > 0) {
                    preparedStatement.executeBatch();
                    count = 0;
                }
                this.conn.commit();
            } catch (SQLException e) {
                throw new IOException("Failed to flush final batch to tb_call", e);
            } finally {
                JDBCUtil.close(conn, preparedStatement, null);
            }
        }
    }
}
- 创建类:BaseDimension
package com.atguigu.analysis.kv.base;
import org.apache.hadoop.io.WritableComparable;
/**
 * Common abstract supertype for dimension keys.
 *
 * <p>Subclasses must be Hadoop-serializable and comparable with other
 * {@code BaseDimension} instances, as required by {@link WritableComparable}.
 */
public abstract class BaseDimension
        implements WritableComparable<BaseDimension> {
    // No shared state or behaviour; concrete dimensions supply everything.
}
- 创建类:BaseValue
package com.atguigu.analysis.kv.base;
import org.apache.hadoop.io.Writable;
/**
 * Common abstract supertype for value objects.
 *
 * <p>Subclasses must be Hadoop-serializable, as required by {@link Writable}.
 */
public abstract class BaseValue
        implements Writable {
    // No shared state or behaviour; concrete values supply everything.
}
- 创建类:ComDimension
package com.atguigu.analysis.kv.impl;
import com.atguigu.analysis.kv.base.BaseDimension;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Composite key made of a date dimension and a contact dimension.
 *
 * <p>Ordering compares the date dimension first and the contact dimension second;
 * serialization writes the two parts in the same order.
 */
public class ComDimension extends BaseDimension {

    /** Date (time) part of the key. */
    private DateDimension dateDimension = new DateDimension();

    /** Contact part of the key. */
    private ContactDimension contactDimension = new ContactDimension();

    /** Creates a key whose parts are freshly constructed, empty dimensions. */
    public ComDimension() {
        super();
    }

    /**
     * Creates a key from the given parts.
     *
     * @param dateDimension    date part
     * @param contactDimension contact part
     */
    public ComDimension(DateDimension dateDimension, ContactDimension contactDimension) {
        super();
        this.dateDimension = dateDimension;
        this.contactDimension = contactDimension;
    }

    public DateDimension getDateDimension() {
        return dateDimension;
    }

    public void setDateDimension(DateDimension dateDimension) {
        this.dateDimension = dateDimension;
    }

    public ContactDimension getContactDimension() {
        return contactDimension;
    }

    public void setContactDimension(ContactDimension contactDimension) {
        this.contactDimension = contactDimension;
    }

    /**
     * Orders by date dimension, then by contact dimension.
     *
     * @param o must be a {@code ComDimension}; any other type fails the cast
     */
    @Override
    public int compareTo(BaseDimension o) {
        if (o == this) {
            return 0;
        }
        ComDimension other = (ComDimension) o;
        int byDate = dateDimension.compareTo(other.getDateDimension());
        return byDate != 0
                ? byDate
                : contactDimension.compareTo(other.getContactDimension());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ComDimension other = (ComDimension) o;
        boolean sameDate = (dateDimension == null)
                ? other.dateDimension == null
                : dateDimension.equals(other.dateDimension);
        if (!sameDate) {
            return false;
        }
        return (contactDimension == null)
                ? other.contactDimension == null
                : contactDimension.equals(other.contactDimension);
    }

    @Override
    public int hashCode() {
        // Same formula as before (31 * h(date) + h(contact)); keeps hash partitioning stable.
        int dateHash = (dateDimension == null) ? 0 : dateDimension.hashCode();
        int contactHash = (contactDimension == null) ? 0 : contactDimension.hashCode();
        return 31 * dateHash + contactHash;
    }

    /** Serializes the date part followed by the contact part. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dateDimension.write(dataOutput);
        contactDimension.write(dataOutput);
    }

    /** Deserializes into the existing part objects, in the order written by {@link #write}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        dateDimension.readFields(dataInput);
        contactDimension.readFields(dataInput);
    }
}
- 创建类:ContactDimension
package com.atguigu.analysis.kv.impl;
import com.atguigu.analysis.kv.base.BaseDimension;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Contact dimension: a person identified by telephone number and name, plus the database
 * primary key assigned to that contact.
 *
 * <p>Instances built with the no-arg constructor have {@code null} telephone and name until
 * populated. Serialization writes a {@code null} string as {@code ""}, because
 * {@link DataOutput#writeUTF} rejects {@code null}. Such a value therefore reads back as {@code ""}.
 */
public class ContactDimension extends BaseDimension {
    /** Database primary key of the contact. */
    private int id;
    /** Telephone number. */
    private String telephone;
    /** Contact name. */
    private String name;

    public ContactDimension() {
        super();
    }

    public ContactDimension(String telephone, String name) {
        super();
        this.telephone = telephone;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getTelephone() {
        return telephone;
    }

    public void setTelephone(String telephone) {
        this.telephone = telephone;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ContactDimension that = (ContactDimension) o;
        if (id != that.id) return false;
        if (telephone != null ? !telephone.equals(that.telephone) : that.telephone != null) return false;
        return name != null ? name.equals(that.name) : that.name == null;
    }

    @Override
    public int hashCode() {
        int result = id;
        result = 31 * result + (telephone != null ? telephone.hashCode() : 0);
        result = 31 * result + (name != null ? name.hashCode() : 0);
        return result;
    }

    /**
     * Orders by id, then telephone, then name; {@code null} strings sort before non-null ones.
     *
     * @param o must be a {@code ContactDimension}; any other type fails the cast
     */
    @Override
    public int compareTo(BaseDimension o) {
        if (o == this) return 0;
        ContactDimension other = (ContactDimension) o;
        int tmp = Integer.compare(this.id, other.getId());
        if (tmp != 0) return tmp;
        tmp = compareNullable(this.telephone, other.getTelephone());
        if (tmp != 0) return tmp;
        return compareNullable(this.name, other.getName());
    }

    /** Null-safe natural-order comparison of two strings, with nulls first. */
    private static int compareNullable(String a, String b) {
        if (a == null) {
            return b == null ? 0 : -1;
        }
        return b == null ? 1 : a.compareTo(b);
    }

    /** Writes id, telephone and name; a {@code null} string is written as {@code ""}. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.id);
        // writeUTF(null) throws NullPointerException; the no-arg constructor leaves these null.
        dataOutput.writeUTF(this.telephone == null ? "" : this.telephone);
        dataOutput.writeUTF(this.name == null ? "" : this.name);
    }

    /** Reads the fields in the order written by {@link #write}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readInt();
        this.telephone = dataInput.readUTF();
        this.name = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return "ContactDimension{" +
                "id=" + id +
                ", telephone=" + telephone +
                ", name='" + name + '\'' +
                '}';
    }
}
本教程由尚硅谷教育大数据研究院出品，如需转载请注明来源，欢迎大家关注尚硅谷公众号（atguigu）了解更多。
网友评论