美文网首页
自定义输出多个MySQL表的OutputFormat

自定义输出多个MySQL表的OutputFormat

作者: _helloliang | 来源:发表于2016-12-13 20:52 被阅读262次

    输出MySQL的表

    需要向 MySQL 的 stats_visitor_basic 表和 stats_visitor_browser 表中插入数据,插入数据的 SQL 语句定义在如下配置文件中:

    query-mapping.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        <property>
            <name>new_visitor_basic</name>
            <value>
                INSERT INTO `stats_visitor_basic`(
                `platform_dimension_id`,
                `date_dimension_id`,
                `new_install_users`,
                `created`)
                VALUES(?, ?, ?, ?)
                ON DUPLICATE KEY UPDATE `new_install_users` = ?
            </value>
        </property>
    
        <property>
            <name>new_visitor_browser</name>
            <value>
                INSERT INTO `stats_visitor_browser`(
                `platform_dimension_id`,
                `date_dimension_id`,
                `browser_dimension_id`,
                `new_install_users`,
                `created`)
                VALUES(?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE
                `new_install_users` = ?
            </value>
        </property>
    </configuration>
    

    sql语句参数设置

    不同sql语句需要设置的参数个数不同。因此针对不同的表,需要使用不同的参数设置方法。

    参数设置接口

    /**
     * Contract for filling in the parameters of the SQL statement that a
     * MapReduce job uses to write one kind of output record.
     *
     * @author liangxw
     */
    public interface IOutputCollector {

        /**
         * Binds the values taken from one output record to the given statement.
         *
         * @param conf  job configuration
         * @param key   dimension key of the record
         * @param value output value of the record
         * @param pstmt prepared statement whose parameters are to be set
         * @param idh   handler used to look up dimension ids
         * @throws IOException if the parameters cannot be set
         */
        void setArgs(
                Configuration conf,
                Dimension key,
                OutputValue value,
                PreparedStatement pstmt,
                IDimensionHandler idh) throws IOException;
    }
    

    具体的参数设置方式

    对于stats_visitor_basic表的参数设置:

    /**
     * {@link IOutputCollector} that binds the five placeholders of the insert
     * statement for the stats_visitor_basic table.
     */
    public class NewVisitorBasicCollector implements IOutputCollector {

        /**
         * Sets the statement parameters from the record and queues them as one
         * batch entry.
         *
         * @throws IOException wrapping any {@link SQLException} raised while binding
         */
        @Override
        public void setArgs(Configuration conf, Dimension key, OutputValue value,
                            PreparedStatement pstmt, IDimensionHandler rpcConn)
                throws IOException {

            final UserStatsDimension stats = (UserStatsDimension) key;
            // The count is stored in the number map under the key -1.
            final IntWritable newUsers = (IntWritable) value.getNumberMap().get(new IntWritable(-1));

            try {
                // Placeholders 1-4 are bound in order: platform id, date id, count, running date.
                pstmt.setInt(1, rpcConn.getDimensionId(stats.getCommonDimension().getPlatformDimension()));
                pstmt.setInt(2, rpcConn.getDimensionId(stats.getCommonDimension().getDateDimension()));
                pstmt.setInt(3, newUsers.get());
                pstmt.setString(4, conf.get(GlobalConstant.RUNNING_DATE));
                // Placeholder 5 receives the same count a second time.
                pstmt.setInt(5, newUsers.get());

                // Only queues this parameter set; nothing is executed here.
                pstmt.addBatch();
            } catch (SQLException sqlError) {
                throw new IOException("sql异常", sqlError);
            }
        }

    }
    

    对于stats_visitor_browser表的参数设置:

    /**
     * {@link IOutputCollector} that binds the six placeholders of the insert
     * statement for the stats_visitor_browser table.
     */
    public class NewVisitorBrowserCollector implements IOutputCollector {

        /**
         * Sets the statement parameters from the record and queues them as one
         * batch entry.
         *
         * @throws IOException wrapping any {@link SQLException} raised while binding
         */
        @Override
        public void setArgs(Configuration conf, Dimension key, OutputValue value, PreparedStatement pstmt,
                            IDimensionHandler convertere) throws IOException {

            final UserStatsDimension stats = (UserStatsDimension) key;
            // The count is stored in the number map under the key -1.
            final IntWritable newUsers = (IntWritable) value.getNumberMap().get(new IntWritable(-1));

            try {
                // Placeholders 1-5 are bound in order: platform id, date id, browser id, count, running date.
                pstmt.setInt(1, convertere.getDimensionId(stats.getCommonDimension().getPlatformDimension()));
                pstmt.setInt(2, convertere.getDimensionId(stats.getCommonDimension().getDateDimension()));
                pstmt.setInt(3, convertere.getDimensionId(stats.getBrowserD()));
                pstmt.setInt(4, newUsers.get());
                pstmt.setString(5, conf.get(GlobalConstant.RUNNING_DATE));
                // Placeholder 6 receives the same count a second time.
                pstmt.setInt(6, newUsers.get());

                // Only queues this parameter set; nothing is executed here.
                pstmt.addBatch();
            } catch (SQLException sqlError) {
                throw new IOException("sql异常", sqlError);
            }
        }

    }
    

    自定义输出到MySQL的OutputFormat类

    /**
     * {@link OutputFormat} that writes job output to MySQL. The {@link KpiType}
     * of each record selects both the SQL statement (read from the job
     * configuration under the KPI name) and the {@link IOutputCollector} that
     * binds the record to that statement, so one job can feed several tables.
     *
     * @author liangxw
     */
    public class MysqlOutputFormat extends OutputFormat<Dimension, OutputValue> {

        /**
         * Creates the record writer for one task attempt. Opens the MySQL
         * connection with auto-commit disabled and the dimension RPC client.
         *
         * @throws RuntimeException if the database connection cannot be set up
         */
        @Override
        public RecordWriter<Dimension, OutputValue> getRecordWriter(TaskAttemptContext context)
                throws IOException, InterruptedException {

            Configuration conf = context.getConfiguration();
            // Client for the remote dimension RPC service.
            IDimensionHandler rpcConn = DimensionHandlerClient.createDimensionConnector(conf);

            Connection conn = null;
            try {
                conn = JdbcManager.getConnection(GlobalConstant.MYSQL_DATABASE);
                conn.setAutoCommit(false); // commits are issued explicitly by the writer
            } catch (Exception e) {
                // No writer will exist to release these, so release them here.
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException ignored) {
                        // best effort; the setup failure below is what matters
                    }
                }
                try {
                    DimensionHandlerClient.stopDimensionHandlerProxy(rpcConn);
                } catch (Exception ignored) {
                    // best effort; must not mask the setup failure below
                }
                throw new RuntimeException("获取数据库连接失败", e);
            }
            return new MySQLRecordWriter(conn, conf, rpcConn);
        }

        @Override
        public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
            // Intentionally empty. At most one could verify here that the target tables exist.
        }

        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
            return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
        }

        /**
         * Record writer that batches inserts per KPI and commits each full batch.
         * Static because it needs nothing from the enclosing instance.
         */
        private static class MySQLRecordWriter extends RecordWriter<Dimension, OutputValue> {

            private final Connection conn;
            private final Configuration conf;
            private final IDimensionHandler rpcConn;
            // A non-positive value makes every record flush immediately.
            private final int batchSize;

            // KPI -> prepared statement created from the SQL configured for that KPI.
            private final Map<KpiType, PreparedStatement> kpiSqlMap = new HashMap<>();
            // KPI -> number of rows queued on its statement since the last flush.
            private final Map<KpiType, Integer> kpiNumberMap = new HashMap<>();
            // KPI -> collector instance, created once instead of once per record.
            // NOTE(review): assumes collectors keep no per-record state - confirm for custom collectors.
            private final Map<KpiType, IOutputCollector> collectorMap = new HashMap<>();

            MySQLRecordWriter(Connection conn, Configuration conf, IDimensionHandler rpcConn) {
                super();
                this.conn = conn;
                this.conf = conf;
                this.rpcConn = rpcConn;
                this.batchSize = conf.getInt(
                        GlobalConstant.JDBC_BATCH_NUMBER, GlobalConstant.DEFAULT_JDBC_BATCH_NUMBER);
            }

            /**
             * Queues one record on the statement of its KPI and, once
             * {@code batchSize} rows are pending for that KPI, executes the batch
             * and commits.
             *
             * @throws IOException wrapping any failure while binding or executing
             */
            @Override
            public void write(Dimension key, OutputValue value) throws IOException, InterruptedException {
                KpiType kpiType = value.getKpiType();
                try {
                    PreparedStatement pstmt = statementFor(kpiType);
                    collectorFor(kpiType).setArgs(conf, key, value, pstmt, rpcConn);

                    // Count only after the row was queued successfully.
                    Integer queued = kpiNumberMap.get(kpiType);
                    int count = (queued == null) ? 1 : queued + 1;

                    if (count >= batchSize) {
                        pstmt.executeBatch();
                        conn.commit();
                        // Nothing pending for this KPI any more.
                        kpiNumberMap.remove(kpiType);
                    } else {
                        kpiNumberMap.put(kpiType, count);
                    }
                } catch (Exception e) {
                    throw new IOException("数据输出产生异常", e);
                }
            }

            /** Returns the cached statement for the KPI, preparing it on first use. */
            private PreparedStatement statementFor(KpiType kpiType) throws SQLException {
                PreparedStatement pstmt = kpiSqlMap.get(kpiType);
                if (pstmt == null) {
                    // The SQL text is stored in the configuration under the KPI name.
                    String sql = conf.get(kpiType.name);
                    if (sql == null) {
                        throw new IllegalStateException("No SQL configured for KPI " + kpiType.name);
                    }
                    pstmt = conn.prepareStatement(sql);
                    kpiSqlMap.put(kpiType, pstmt);
                }
                return pstmt;
            }

            /**
             * Returns the cached collector for the KPI, instantiating it on first
             * use. The collector class must have an accessible no-argument constructor.
             */
            private IOutputCollector collectorFor(KpiType kpiType) throws ReflectiveOperationException {
                IOutputCollector collector = collectorMap.get(kpiType);
                if (collector == null) {
                    String confKey = GlobalConstant.OUTPUT_COLLECTOR_PREFIX + kpiType.name;
                    String className = conf.get(confKey);
                    if (className == null) {
                        throw new IllegalStateException("No output collector configured under " + confKey);
                    }
                    collector = (IOutputCollector) Class.forName(className)
                            .getDeclaredConstructor()
                            .newInstance();
                    collectorMap.put(kpiType, collector);
                }
                return collector;
            }

            /**
             * Flushes and commits whatever is still queued, then releases the
             * statements, the connection and the RPC proxy regardless of the outcome.
             *
             * @throws IOException if the final flush or commit fails
             */
            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                try {
                    flushAndCommit();
                } finally {
                    try {
                        releaseJdbcResources();
                    } finally {
                        // Shut down the remote dimension connection.
                        DimensionHandlerClient.stopDimensionHandlerProxy(rpcConn);
                    }
                }
            }

            /** Executes every pending batch and commits; rolls back and reports on failure. */
            private void flushAndCommit() throws IOException {
                if (conn == null) {
                    return;
                }
                try {
                    for (PreparedStatement pstmt : kpiSqlMap.values()) {
                        pstmt.executeBatch();
                    }
                    conn.commit();
                    kpiNumberMap.clear();
                } catch (SQLException e) {
                    try {
                        conn.rollback();
                    } catch (SQLException ignored) {
                        // best effort; the flush failure below is what matters
                    }
                    throw new IOException("输出数据出现异常", e);
                }
            }

            /** Closes all statements and the connection, ignoring close failures. */
            private void releaseJdbcResources() {
                if (conn == null) {
                    return;
                }
                for (PreparedStatement pstmt : kpiSqlMap.values()) {
                    try {
                        pstmt.close();
                    } catch (SQLException ignored) {
                        // best effort; the connection is closed right after
                    }
                }
                try {
                    conn.close();
                } catch (SQLException ignored) {
                    // best effort; nothing more can be done with this connection
                }
            }

        }

    }
    

    相关文章

      网友评论

          本文标题:自定义输出多个MySQL表的OutputFormat

          本文链接:https://www.haomeiwen.com/subject/yqvbmttx.html