美文网首页
hive UDF函数中读取mysql配置文件最佳实践

hive UDF函数中读取mysql配置文件最佳实践

作者: DuLaGong | 来源:发表于2019-06-14 13:54 被阅读0次

    上代码:

    <properties>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <hive.version>1.1.0</hive.version>

    <hadoop.version>2.6.0</hadoop.version>

    <mysql.version>5.1.34</mysql.version>

    </properties>

    <dependencies>

    <dependency>

    <groupId>org.apache.hive</groupId>

    <artifactId>hive-exec</artifactId>

    <version>${hive.version}</version>

    </dependency>

    <dependency>

    <groupId>org.apache.hadoop</groupId>

    <artifactId>hadoop-common</artifactId>

    <version>${hadoop.version}</version>

    <exclusions>

    <exclusion>

    <groupId>jdk.tools</groupId>

    <artifactId>jdk.tools</artifactId>

    </exclusion>

    </exclusions>

    </dependency>

    <dependency>

    <groupId>mysql</groupId>

    <artifactId>mysql-connector-java</artifactId>

    <version>${mysql.version}</version>

    </dependency>

    </dependencies>

    java代码:

    import java.io.IOException;

    import java.net.URI;

    import java.sql.Connection;

    import java.sql.DriverManager;

    import java.sql.ResultSet;

    import java.sql.SQLException;

    import java.sql.Statement;

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.FSDataInputStream;

    import org.apache.hadoop.fs.FileSystem;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.hive.ql.exec.UDF;

    public class GetNextTradeDate extends UDF{

    public String evaluate (){

    Properties p=null;

    try {

    p = getPro();

    } catch (Exception e1) {

    e1.printStackTrace();

    }

    String DB_URL=p.getProperty("DB_URL");

    String JDBC_DRIVER=p.getProperty("JDBC_DRIVER");

    String USER=p.getProperty("USER");

    String PASS=p.getProperty("PASS");

    Connection conn = null;

            Statement stmt = null;

            String init_date = "";

            try{

                Class.forName(JDBC_DRIVER);

                conn = DriverManager.getConnection(DB_URL,USER,PASS);

                stmt = conn.createStatement();

                String sql = "SELECT date FROM xxx.xxx limit 1";

                ResultSet rs = stmt.executeQuery(sql);

                while(rs.next()){

                init_date= rs.getString("date");

                }

                rs.close();

                stmt.close();

                conn.close();

            }catch(SQLException se){

                se.printStackTrace();

            }catch(Exception e){

                e.printStackTrace();

            }finally{

                try{

                    if(stmt!=null) stmt.close();

                }catch(SQLException se2){

                }

                try{

                    if(conn!=null) conn.close();

                }catch(SQLException se){

                    se.printStackTrace();

                }

            }

            return init_date;

        }

    private Properties getPro() throws Exception{

            String dsf= "/user/xxx/mysql-for-hive.properties";

            Configuration conf = new Configuration();

            FileSystem fs = FileSystem.get(URI.create(dsf),conf);

            FSDataInputStream inputStream = fs.open(new Path(dsf));

            Properties p = new Properties();

          try {

          p.load(inputStream);

          } catch (IOException e) {

          e.printStackTrace();

          }

    return p;

    }

    }

    将上述代码导出为jar包。

    配置文件mysql-for-hive.properties内容如下:

    JDBC_DRIVER=com.mysql.jdbc.Driver

    DB_URL=jdbc:mysql://192.168.xxx.xxx:3306/xxx

    USER=xxxx

    PASS=xxxx

    这种是创建永久UDF函数,需要把jar包和配置文件上传hdfs;

    hdfs dfs -put -p getNextTradeDate.jar /user/xxx

    hdfs dfs -put -p mysql-for-hive.properties /user/xxx

    创建函数

    CREATE FUNCTION getNextTradeDate AS 'com.xxx.xxx.xxx.GetNextTradeDate' USING JAR 'hdfs://xxxx:8020/user/xxxx/getNextTradeDate.jar';

    删除函数

    drop function getNextTradeDate;

    测试函数:

    select getNextTradeDate() tradeDate from xxxx.xxxx;

    创建临时函数略有不同,临时函数当前session有效:

    如果你的临时函数没有输入,只有输出,会被hive优化为本地执行,不需要集群,这时候读取配置文件就可以读本地,add file后,直接用getResourceAsStream读本地文件即可,不再采用从集群中读取配置文件的方式。

    读入配置文件的语句修改如下:

    InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("./mysql-for-hive.properties");

    然后再Properties p = new Properties();  p.load(inputStream); .........

    打jar包上传,然后打开hive,输入:

    add file /home/mysql-for-hive.properties;

    add jar /home/getNextTradeDate.jar;

    create temporary function getNextTradeDate as "com.xxx.xxx.xxx.GetNextTradeDate";

    select getNextTradeDate() tradeDate from xxxx.xxxx;

    有结果的话就测试成功。

    删除临时函数

    drop temporary function getNextTradeDate;

    相关文章

      网友评论

          本文标题:hive UDF函数中读取mysql配置文件最佳实践

          本文链接:https://www.haomeiwen.com/subject/vpyufctx.html