美文网首页
hadoop处理数据模版

hadoop处理数据模版

作者: 肥了个大西瓜 | 来源:发表于2018-11-07 20:43 被阅读0次

1 利用hql处理数据

利用hql处理数据需要用到两个文件,handle.py和handle.sql文件,利用python handle.py运行。

  • handle.py

#!/usr/bin/env python2.7
# -*- coding:gb18030 -*-
from tool import *
import time

name=__file__[2:-3]
date=sys.argv[1]
user = ''

if len(sys.argv) > 2:
    user = sys.argv[2]
os.putenv('JAVA_HOME', 'java_home_path')

system('cat %s.sql'%name)
system('mkdir '+name)
system('queryengine_path --hivevar user=%s --hivevar date=%s --hivevar date2=%s --hivevar date3=%s --hivevar name=%s --sessionconf engine=wing -f %s.sql > %s/%s'%(user,date,date2,date3,name,name,name,date))

handle.py 基本不需要变动,修改handle.sql即可。

  • handle.sql

SET mapred.job.name=${hivevar:user}_${hivevar:date}_${hivevar:name};
SET mapred.job.priority=HIGH;

sessiondb = database 'session:/';
use sessiondb;
create table test_table(user_id string, user_name sring) row format delimited fields terminated by '\t';
load data inpath "data_hdfs_input_path" overwrite into table cuid_sid;  --将hdfs中的数据load为table

INSERT OVERWRITE DIRECTORY 'data_save_path' --存储地址
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' --存储\t分隔

SELECT
    *
FROM
    table

2 利用Map Reduce处理数据

利用Map Reduce处理数据需要用到三个文件,map.py、red.py和run.sh文件,map.py和red.py编写计算逻辑,run.sh文件将数据放到hadoop集群上进行运算。

2.1 Map Reduce处理hdfs中数据:

-map.py

# -*-coding:utf-8 -*-

import sys
import os


input_file = os.getenv('map_input_file')
#print  input_file
day = sys.argv[1]

for line in sys.stdin:
    if str1 in input_file:
        print "%s\t%s" % ("", "")


    if str2 in input_file:
        print "%s\t%s" % ("", "")

-red.py

import sys

current_word = None
current_count = 0

for line in sys.stdin:
    line = line.strip()
    word, count = line.split("\t", 1)

    count = int(count)
    if current_word == word:
         current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_word = word
        current_count = count

if current_word == word:
    print "%s\t%s" % (current_word, current_count)

-run.sh

#!/bin/bash

ipath=""
opath=""

HADOOP=hadoop_path

$HADOOP fs -rmr $opath
$HADOOP  streaming \
        -input  ${ipath} \
        -output ${opath} \
        -partitioner '' \
        -mapper "python map.py" \
        -reducer "python red.py" \
        -file map.py  \
        -file red.py \
        -jobconf mapred.job.priority=VERY_HIGH \
        -jobconf mapred.job.queue.name="" \
        -jobconf mapred.job.name="" \
        -jobconf mapred.reduce.tasks=1000 \
        -jobconf mapred.job.map.capacity=1000 \
        -jobconf mapred.job.reduce.capacity=100 \
        -jobconf num.key.fields.for.partition=1 \
        -jobconf stream.num.map.output.key.fields=1 \
        -cacheArchive ""

if [ $? -ne 0 ];then
    echo "hadoop streaming failed"
    exit 1
else
    ${HADOOP} fs -touchz ${opath}/@manifest
fi

echo "${opath}"
echo "$1 finished"

2.1 Map Reduce处理hive中数据:

  • BdAppCuidSidDau.py (parse数据)

  • map.py

# -*- coding:utf-8 -*-
"""
author: fengshuchao(@baidu.com)
date: 2018/07/25
version: 1.0
"""

import sys
import json
import copy
sys.path.append('path_BdAppCuidSidDau.py')
from BdAppCuidSidDau import RecordParser


idx_list = []

output = dict.fromkeys(idx_list, 0)


if __name__ == '__main__':

    record_parser = RecordParser()

    for line in sys.stdin:
        record = record_parser.parser(line)

        #过滤有效record和sid_list

        if type(record).__name__ != 'dict':   #部分bool型
            continue
            
        print '%s$%s$%s\t%s' % (record[''], \
                                   record[''], \
                                   record[''], \
                                   output)
  • red.py

  • run.sh

#!/bin/sh
day=$1
INPUT_TABLE=""
INPUT_COLS="*"
INPUT_PROJECT="${INPUT_TABLE}#filter={event_day=${day}}#inputcols=${INPUT_COLS}"
OUTPUT_DIR="ouput_path"

LIB_DIR=
HADOOP_BIN_PATH=

export HADOOP_CLASSPATH=
export LD_LIBRARY_PATH=

${HADOOP_BIN_PATH}/hadoop fs -rmr ${OUTPUT_DIR}

${HADOOP_BIN_PATH}/hadoop streaming -libjars ${LIB_DIR}/upi-mr.jar \
    -jobconf udw.upi.input="${INPUT_PROJECT}" \
    -input "anything" \
    -output ${OUTPUT_DIR} \
    -inputformat com.baidu.udw.mapred.MultiTableInputFormat \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -mapper "python map.py" \
    -reducer "cat" \
    -file "map.py" \
    -file "BdAppCuidSidDau.py \
    -jobconf mapred.job.queue.name="" \
    -jobconf mapred.job.priority=VERY_HIGH \
    -jobconf mapred.map.tasks=1000 \
    -jobconf mapred.reduce.tasks=1000 \
    -jobconf mapred.job.map.capacity=500 \
    -jobconf mapred.job.reduce.capacity=500 \
    -jobconf num.key.fields.for.partition=1 \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf mapred.job.name="" \
    -cacheArchive  \


if [ $? -ne 0 ];then
    echo "M/R Job Info fails"
    exit 1
fi

echo "~~~sucessful~~~"

相关文章

  • hadoop处理数据模版

    1 利用hql处理数据 利用hql处理数据需要用到两个文件,handle.py和handle.sql文件,利用py...

  • Hadoop篇(三)——Hadoop Streaming完成的数

    1Hadoop Streaming完成的数据处理详解 1.1 Hadoop Streaming完成大数据处理详解 ...

  • Hadoop压缩

    Hadoop 作为一个较通用的海量数据处理平台,每次运算都会需要处理大量数据,我们会在 Hadoop 系统中对数据...

  • 面向对象

    对大数据的认识 大数据:数据量大 全量数据 数据处理模式:流处理 (storm) 批处理(Hadoop、spar...

  • 大数据学习-spark

    spark比较于Hadoop-MapReduce Hadoop 提供的 MapReduce 框架处理大数据的时候,...

  • hadoop生态

    【67】 Hadoop 用于数据存储和分析的分布式系统。 Hadoop数据处理的核心:数据本地化。 为什么需要ha...

  • hadoop发行版本之间的区别

    Hadoop是一个能够对大量数据进行分布式处理的软件框架。Hadoop 以一种可靠、高效、可伸缩的方式进行数据处理...

  • 膜拜!华为内部都在强推的783页大数据处理系统:Hadoop源代

    前言 大数据处理系统:Hadoop源代码情景分析,采用的是Hadoop2.6。如果你有点野心,想对大数据处理系统有...

  • 2018-11-15

    为什么我们选择Docker来构建Crayon的数据处理平台 随着Hadoop成为大数据的事实标准,Hadoop的生...

  • hadoop简介

    Hadoop简介 *Hadoop 擅长处理一次写入,多次读出的数据 hadoop两大核心组件是 HDFS 和 ma...

网友评论

      本文标题:hadoop处理数据模版

      本文链接:https://www.haomeiwen.com/subject/jcjbxqtx.html