美文网首页
Hive的Transform和UDF

Hive的Transform和UDF

作者: 这个该叫什么呢 | 来源:发表于2018-01-31 22:42 被阅读0次

    UDTF

    UDAF

    Hive中的TRANSFORM:使用脚本完成Map/Reduce

    转自: http://www.coder4.com/archives/4052

    首先来看一下数据:

    hive> select * from test;
    OK
    1       3
    2       2
    3       1
    

    假设,我们要输出每一列的md5值。在目前的hive中是没有这个udf的。

    我们看一下Python的代码:

    #!/home/tops/bin/python
    
    import sys
    import hashlib
    
    for line in sys.stdin:
        line = line.strip()
        arr = line.split()
        md5_arr = []
        for a in arr:
            md5_arr.append(hashlib.md5(a).hexdigest())
        print "\t".join(md5_arr)
    

    在Hive中,使用脚本,首先要将他们加入:

    add file /xxxx/test.py
    

    然后,在调用时,使用TRANSFORM语法。

    SELECT 
        TRANSFORM (col1, col2) 
        USING './test.py' 
        AS (new1, new2) 
    FORM 
        test;
    

    这里,我们使用了AS,指定输出的若干个列,分别对应到哪个列名。如果省略这句,则Hive会将第1个tab前的结果作为key,后面其余作为value。

    这里有一个小坑:有时候,我们结合INSERT OVERWRITE使用上述TRANSFORM,而目标表,其分割副可能不是\t。但是请牢记:TRANSFORM的分割符号,传入、传出脚本的,永远是\t。不要考虑外面其他的分割符号!

    最后,解释一下MAP、REDUCE。

    在有的Hive语句中,大家可能会看到SELECT MAP (…) USING ‘xx.py’这样的语法。

    然而,在Hive中,MAP、REDUCE只不过是TRANSFORM的别名,Hive不保证一定会在map/reduce中调用脚本。看看官方文档是怎么说的:

    Formally, MAP ... and REDUCE ... are syntactic transformations of SELECT TRANSFORM ( ... ). In other words, they serve as comments or notes to the reader of the query. BEWARE: Use of these keywords may be dangerous as (e.g.) typing "REDUCE" does not force a reduce phase to occur and typing "MAP" does not force a new map phase!
    

    所以、混用map reduce语法关键字,甚至会引起混淆,所以建议大家还是都用TRANSFORM吧。

    友情提示:如果脚本不是Python,而是awk、sed等系统内置命令,可以直接使用,而不用add file。

    如果表中有MAP,ARRAY等复杂类型,怎么用TRANSFORM生成?

    例如:

    CREATE TABLE features
    (
        id BIGINT,
        norm_features MAP<STRING, FLOAT> 
    );
    

    答案是,要在脚本的输出中,对特殊字段按照HDFS文件中的格式输出即可。

    例如,以上面的表结构为例,每行输出应为:

    1^Ifeature1^C1.0^Bfeature2^C2.0
    

    其中I是tab键,这是TRANSFORM要求的分割符号。B和^C是Hive存储时MAP类型的KV分割符。

    另外,在Hive的TRANSFORM语句的时候,要注意AS中加上类型声明:

    SELECT TRANSFORM(stuff)
    USING 'script'
    AS (thing1 INT, thing2 MAP<STRING, FLOAT>)
    

    Hive中的TRANSFORM:自定义Mapper和Reducer完成Map/Reduce

    /**
     * Mapper.
     */
    public interface Mapper {
      /**
       * Maps a single row into an intermediate rows.
       * 
       * @param record
       *          input record
       * @param output
       *          collect mapped rows.
       * @throws Exception
       *           on error
       */
      void map(String[] record, Output output) throws Exception;
    }
    

    可以将一列拆分为多列

    使用样例:

    public class ExecuteMap {
    
        private static final String FULL_PATH_CLASS = "com.***.dpop.ods.mr.impl.";
    
        private static final Map<String, Mapper> mappers = new HashMap<String, Mapper>();
    
        public static void main(String[] args) throws Exception {
            if (args.length < 1) {
                throw new Exception("Process class must be given");
            }
    
            new GenericMR().map(System.in, System.out,
                    getMapper(args[0], Arrays.copyOfRange(args, 1, args.length)));
        }
    
        private static Mapper getMapper(String parserClass, String[] args)
                throws ClassNotFoundException {
            if (mappers.containsKey(parserClass)) {
                return mappers.get(parserClass);
            }
    
            Class[] classes = new Class[args.length];
            for (int i = 0; i < classes.length; ++i) {
                classes[i] = String.class;
            }
            try {
                Mapper mapper = (Mapper) Class.forName(FULL_PATH_CLASS + parserClass).getConstructor(classes).newInstance(args);
                mappers.put(parserClass, mapper);
                return mapper;
            } catch (ClassNotFoundException e) {
                throw new ClassNotFoundException("Unknown MapperClass:" + parserClass, e);
            } catch (Exception e) {
                throw new  ClassNotFoundException("Error Constructing processor", e);
            }
    
        }
    }
    
    MR_USING=" USING 'java -Xmx512m -Xms512m -cp ods-mr-1.0.jar:hive-contrib-2.3.33.jar com.***.dpop.ods.mr.api.ExecuteMap "
    
    COMMAND="FROM dw_rtb.event_fact_adx_auction "
    COMMAND="${COMMAND} INSERT overwrite TABLE dw_rtb.event_fact_mid_adx_auction_ad PARTITION(yymmdd=${CURRENT_DATE}) SELECT transform(search_id, print_time, pthread_id, ad_s) ${MR_USING} EventFactMidAdxAuctionAdMapper' as search_id, print_time, pthread_id, ad_s, ssp_id WHERE $INSERT_PARTITION and original = 'exinternal' "
    

    Hive Python Streaming的原理及写法

    http://www.tuicool.com/articles/vmumUjA

    相关文章

      网友评论

          本文标题:Hive的Transform和UDF

          本文链接:https://www.haomeiwen.com/subject/hxhrzxtx.html