美文网首页
自定义输出多个MySQL表的OutputFormat

自定义输出多个MySQL表的OutputFormat

作者: _helloliang | 来源:发表于2016-12-13 20:52 被阅读262次

输出MySQL的表

需要向MySQL的stats_visitor_basic表和stats_visitor_browser表中插入数据,插入数据的SQL语句配置在query-mapping.xml中:

query-mapping.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  SQL statements used by the MySQL output. Each "?" is a JDBC placeholder whose value is
  bound by the IOutputCollector implementation registered for the same KPI.
-->
<configuration>
    <!-- stats_visitor_basic: 4 VALUES placeholders + 1 for ON DUPLICATE KEY UPDATE (5 in total) -->
    <property>
        <name>new_visitor_basic</name>
        <value>
            INSERT INTO `stats_visitor_basic`(
            `platform_dimension_id`,
            `date_dimension_id`,
            `new_install_users`,
            `created`)
            VALUES(?, ?, ?, ?)
            ON DUPLICATE KEY UPDATE `new_install_users` = ?
        </value>
    </property>

    <!-- stats_visitor_browser: 5 VALUES placeholders + 1 for ON DUPLICATE KEY UPDATE (6 in total) -->
    <property>
        <name>new_visitor_browser</name>
        <value>
            INSERT INTO `stats_visitor_browser`(
            `platform_dimension_id`,
            `date_dimension_id`,
            `browser_dimension_id`,
            `new_install_users`,
            `created`)
            VALUES(?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE
            `new_install_users` = ?
        </value>
    </property>
</configuration>

sql语句参数设置

不同的SQL语句需要设置的参数个数不同(例如stats_visitor_basic的语句有5个占位符,而stats_visitor_browser的语句有6个)。因此针对不同的表,需要使用不同的参数设置方法。

参数设置接口

/**
 * Strategy that binds one reducer output record to the parameters of a KPI-specific
 * {@link PreparedStatement}.
 *
 * <p>Every target table uses its own SQL statement with its own number and order of
 * placeholders, so each table gets its own implementation. Implementations are instantiated
 * reflectively and therefore need a no-argument constructor.
 *
 * @author liangxw
 */
public interface IOutputCollector {

    /**
     * Fills the placeholders of {@code pstmt} for a single output record.
     *
     * @param conf  the job configuration
     * @param key   the output key (dimension combination) of the record
     * @param value the output value of the record
     * @param pstmt the prepared statement whose parameters are set
     * @param idh   handler used to translate dimensions into their database ids
     * @throws IOException if a parameter cannot be resolved or bound
     */
    void setArgs(Configuration conf,
                 Dimension key,
                 OutputValue value,
                 PreparedStatement pstmt,
                 IDimensionHandler idh) throws IOException;
}

具体的参数设置方式

对于stats_visitor_basic表的参数设置:

/**
 * Binds the parameters of the {@code stats_visitor_basic} insert for one output record and adds
 * the row to the statement's batch.
 *
 * <p>Parameter order: platform dimension id, date dimension id, new install users, created
 * (running date), and new install users again for the {@code ON DUPLICATE KEY UPDATE} clause.
 */
public class NewVisitorBasicCollector implements IOutputCollector {

    /**
     * @param conf    job configuration; supplies the running date for the {@code created} column
     * @param key     output key; cast to {@code UserStatsDimension}
     * @param value   output value; its number map must hold the new-visitor count under key -1
     * @param pstmt   the prepared {@code stats_visitor_basic} insert statement
     * @param rpcConn handler used to resolve dimension ids
     * @throws IOException if the count is missing or a parameter cannot be bound
     */
    @Override
    public void setArgs(Configuration conf, Dimension key, OutputValue value,
                        PreparedStatement pstmt, IDimensionHandler rpcConn)
            throws IOException {

        UserStatsDimension userBehavior = (UserStatsDimension) key;
        IntWritable newVisitorsBasic = (IntWritable) value.getNumberMap().get(new IntWritable(-1));
        if (newVisitorsBasic == null) {
            // Fail with context instead of an anonymous NullPointerException on get() below
            throw new IOException("missing new visitor count (key -1) in output value for " + key);
        }

        int i = 0;
        try {
            pstmt.setInt(++i, rpcConn.getDimensionId(userBehavior.getCommonDimension().getPlatformDimension()));
            pstmt.setInt(++i, rpcConn.getDimensionId(userBehavior.getCommonDimension().getDateDimension()));
            pstmt.setInt(++i, newVisitorsBasic.get());
            pstmt.setString(++i, conf.get(GlobalConstant.RUNNING_DATE));
            // Value for ON DUPLICATE KEY UPDATE `new_install_users` = ?
            pstmt.setInt(++i, newVisitorsBasic.get());

            // Queue this row; the writer decides when the batch is executed
            pstmt.addBatch();
        } catch (SQLException e) {
            throw new IOException("sql异常", e);
        }
    }

}

对于stats_visitor_browser表的参数设置:

/**
 * Binds the parameters of the {@code stats_visitor_browser} insert for one output record and adds
 * the row to the statement's batch.
 *
 * <p>Parameter order: platform dimension id, date dimension id, browser dimension id, new install
 * users, created (running date), and new install users again for the
 * {@code ON DUPLICATE KEY UPDATE} clause.
 */
public class NewVisitorBrowserCollector implements IOutputCollector {

    /**
     * @param conf       job configuration; supplies the running date for the {@code created} column
     * @param key        output key; cast to {@code UserStatsDimension}
     * @param value      output value; its number map must hold the new-visitor count under key -1
     * @param pstmt      the prepared {@code stats_visitor_browser} insert statement
     * @param convertere handler used to resolve dimension ids
     * @throws IOException if the count is missing or a parameter cannot be bound
     */
    @Override
    public void setArgs(Configuration conf, Dimension key, OutputValue value, PreparedStatement pstmt,
                        IDimensionHandler convertere) throws IOException {

        UserStatsDimension userBehavior = (UserStatsDimension) key;
        IntWritable newVisitorsBrowser = (IntWritable) value.getNumberMap().get(new IntWritable(-1));
        if (newVisitorsBrowser == null) {
            // Fail with context instead of an anonymous NullPointerException on get() below
            throw new IOException("missing new visitor count (key -1) in output value for " + key);
        }

        int i = 0;
        try {
            pstmt.setInt(++i, convertere.getDimensionId(userBehavior.getCommonDimension().getPlatformDimension()));
            pstmt.setInt(++i, convertere.getDimensionId(userBehavior.getCommonDimension().getDateDimension()));
            pstmt.setInt(++i, convertere.getDimensionId(userBehavior.getBrowserD()));
            pstmt.setInt(++i, newVisitorsBrowser.get());
            pstmt.setString(++i, conf.get(GlobalConstant.RUNNING_DATE));
            // Value for ON DUPLICATE KEY UPDATE `new_install_users` = ?
            pstmt.setInt(++i, newVisitorsBrowser.get());

            // Queue this row; the writer decides when the batch is executed
            pstmt.addBatch();
        } catch (SQLException e) {
            throw new IOException("sql异常", e);
        }
    }

}

自定义输出到MySQL的OutputFormat类

/**
 * 自定义输出到mysql的OutputFormat类
 *
 * @author liangxw
 */
public class MysqlOutputFormat extends OutputFormat<Dimension, OutputValue> {

    /**
     * Returns the record writer that persists this task attempt's output into MySQL.
     *
     * <p>Opens the dimension-handler RPC client and a JDBC connection with auto-commit disabled.
     * If the connection cannot be set up, the connection (when already opened) and the RPC client
     * are released before the failure is reported, so neither is leaked.
     *
     * @throws RuntimeException if the database connection cannot be obtained or configured
     */
    @Override
    public RecordWriter<Dimension, OutputValue> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {

        Configuration conf = context.getConfiguration();
        // Remote RPC client used to translate dimensions into database ids
        IDimensionHandler rpcConn = DimensionHandlerClient.createDimensionConnector(conf);

        Connection conn = null;
        try {
            conn = JdbcManager.getConnection(GlobalConstant.MYSQL_DATABASE);
            conn.setAutoCommit(false); // batches are committed explicitly by the writer
        } catch (Exception e) {
            closeQuietly(conn);
            DimensionHandlerClient.stopDimensionHandlerProxy(rpcConn);
            throw new RuntimeException("获取数据库连接失败", e);
        }
        return new MySQLRecordWriter(conn, conf, rpcConn);
    }

    /**
     * No output specification is validated. At most, an implementation could check that the
     * target tables exist.
     */
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // intentionally empty
    }

    /**
     * Uses a {@link FileOutputCommitter} on the job's configured output path for the task
     * lifecycle. The MySQL rows themselves are committed through JDBC by the record writer.
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /** Closes {@code conn} if it is non-null, ignoring close failures (best-effort cleanup). */
    private static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException ignored) {
            // best-effort cleanup: nothing useful can be done with a failing close
        }
    }

    /**
     * Record writer that routes each record to the prepared statement of its KPI.
     *
     * <p>For each {@link KpiType}, the writer looks up two configuration entries:
     * <ul>
     *   <li>the SQL statement, under {@code kpiType.name};</li>
     *   <li>the {@link IOutputCollector} class that binds that statement's parameters, under
     *       {@code GlobalConstant.OUTPUT_COLLECTOR_PREFIX + kpiType.name}.</li>
     * </ul>
     * Rows are batched per KPI. Every {@code batchSize} rows of a KPI, its batch is executed and
     * committed. All remaining rows are executed and committed in {@link #close}.
     */
    private static class MySQLRecordWriter extends RecordWriter<Dimension, OutputValue> {

        private final Connection conn;
        private final Configuration conf;
        private final IDimensionHandler rpcConn;
        // Rows per KPI batch; non-positive configured values fall back to flushing every row
        private final int batchSize;

        // Prepared statement per KPI (each KPI maps to one SQL statement / target table)
        private final Map<KpiType, PreparedStatement> kpiSqlMap = new HashMap<>();
        // Number of rows added to the current, not yet executed batch of each KPI
        private final Map<KpiType, Integer> kpiNumberMap = new HashMap<>();
        // Parameter-binding collector per KPI, instantiated once instead of once per record
        private final Map<KpiType, IOutputCollector> collectorMap = new HashMap<>();

        MySQLRecordWriter(Connection conn, Configuration conf, IDimensionHandler rpcConn) {
            super();
            this.conn = conn;
            this.conf = conf;
            this.rpcConn = rpcConn;
            this.batchSize = Math.max(1,
                    conf.getInt(GlobalConstant.JDBC_BATCH_NUMBER, GlobalConstant.DEFAULT_JDBC_BATCH_NUMBER));
        }

        /**
         * Adds one record to its KPI's batch. When the batch reaches {@code batchSize} rows, the
         * batch is executed and committed. Called for every {@code context.write()} in the reducer.
         *
         * @throws IOException if the KPI is not configured or binding/executing the SQL fails
         */
        @Override
        public void write(Dimension key, OutputValue value) throws IOException, InterruptedException {
            KpiType kpiType = value.getKpiType();
            try {
                PreparedStatement pstmt = getStatement(kpiType);
                // Different KPIs (tables) bind their parameters differently
                getCollector(kpiType).setArgs(conf, key, value, pstmt, rpcConn);

                Integer pending = kpiNumberMap.get(kpiType);
                int count = (pending == null ? 0 : pending) + 1;
                if (count >= batchSize) {
                    pstmt.executeBatch();
                    conn.commit();
                    // Start the next batch of this KPI from zero
                    kpiNumberMap.remove(kpiType);
                } else {
                    kpiNumberMap.put(kpiType, count);
                }
            } catch (IOException e) {
                throw e;
            } catch (Exception e) {
                throw new IOException("数据输出产生异常", e);
            }
        }

        /** Returns the cached statement for {@code kpiType}, preparing it on first use. */
        private PreparedStatement getStatement(KpiType kpiType) throws IOException, SQLException {
            PreparedStatement pstmt = kpiSqlMap.get(kpiType);
            if (pstmt == null) {
                // SQL text comes from the job configuration (e.g. loaded from query-mapping.xml)
                String sql = conf.get(kpiType.name);
                if (sql == null) {
                    throw new IOException("no SQL statement configured for kpi: " + kpiType.name);
                }
                pstmt = conn.prepareStatement(sql);
                kpiSqlMap.put(kpiType, pstmt);
            }
            return pstmt;
        }

        /**
         * Returns the cached collector for {@code kpiType}, instantiating the configured class on
         * first use. The class must have a no-argument constructor.
         */
        private IOutputCollector getCollector(KpiType kpiType) throws Exception {
            IOutputCollector collector = collectorMap.get(kpiType);
            if (collector == null) {
                String collectorClassName = conf.get(GlobalConstant.OUTPUT_COLLECTOR_PREFIX + kpiType.name);
                if (collectorClassName == null) {
                    throw new IOException("no output collector configured for kpi: " + kpiType.name);
                }
                collector = (IOutputCollector) Class.forName(collectorClassName)
                        .getDeclaredConstructor().newInstance();
                collectorMap.put(kpiType, collector);
            }
            return collector;
        }

        /**
         * Executes all remaining batches, commits, and releases every resource. The statements,
         * the connection, and the RPC client are released even when flushing fails.
         *
         * @throws IOException if executing a batch or committing fails (the transaction is then
         *                     rolled back)
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            try {
                flushAndCommit();
            } finally {
                try {
                    closeStatements();
                    closeQuietly(conn);
                } finally {
                    // Close the remote RPC connection last
                    DimensionHandlerClient.stopDimensionHandlerProxy(rpcConn);
                }
            }
        }

        /**
         * Executes every pending batch and commits. On failure, rolls back and rethrows, so that
         * a failed final commit fails the task instead of silently losing rows.
         */
        private void flushAndCommit() throws IOException {
            if (conn == null) {
                return;
            }
            try {
                for (PreparedStatement pstmt : kpiSqlMap.values()) {
                    pstmt.executeBatch();
                }
                conn.commit();
            } catch (Exception e) {
                rollbackQuietly();
                throw new IOException("输出数据出现异常", e);
            }
            kpiNumberMap.clear();
        }

        /** Rolls back the current transaction; a rollback failure is ignored. */
        private void rollbackQuietly() {
            try {
                conn.rollback();
            } catch (SQLException ignored) {
                // the original failure is reported by the caller
            }
        }

        /** Closes every prepared statement, ignoring individual close failures. */
        private void closeStatements() {
            for (PreparedStatement pstmt : kpiSqlMap.values()) {
                try {
                    pstmt.close();
                } catch (SQLException ignored) {
                    // best-effort cleanup
                }
            }
            kpiSqlMap.clear();
        }
    }

}

相关文章

网友评论

      本文标题:自定义输出多个MySQL表的OutputFormat

      本文链接:https://www.haomeiwen.com/subject/yqvbmttx.html