SerDe是Serialize/Deserilize的简称,目的是用于序列化和反序列化。
序列化作用
序列化是对象转换为字节序列的过程。
序列化是字节序列恢复为对象的过程。
对象的序列化主要有两种用途:对象的持久化,即把对象转换成字节序列后保存到文件中;对象数据的网络传送。
除了上面两点, hive的序列化的作用还包括:Hive的反序列化是对key/value反序列化成hive table的每个列的值。Hive可以方便的将数据加载到表中而不需要对数据进行转换,这样在处理海量数据时可以节省大量的时间。
SerDe说明hive如何去处理一条记录,包括Serialize/Deserilize两个功能, Serialize把hive使用的java object转换成能写入hdfs的字节序列,或者其他系统能识别的流文件。Deserilize把字符串或者二进制流转换成hive能识别的java object对象。比如:select语句会用到Serialize对象, 把hdfs数据解析出来;insert语句会使用Deserilize,数据写入hdfs系统,需要把数据序列化。
SerDe使用
hive创建表时, 通过自定义的SerDe或使用Hive内置的SerDe类型指定数据的序列化和反序列化方式。
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)]
INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
如上创建表语句, 使用row format 参数说明SerDe的类型。
SerDe包括内置类型
Avro
ORC
RegEx
Thrift
Parquet
CSV
JsonSerDe
自定义类型
自定义类型使用的步骤:
- 定义一个类, 继承抽象类AbstractSerDe, 实现initialize, deserialize和
package com.coder4.hive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
public class MySerDe extends AbstractSerDe {
// params
private List<String> columnNames = null;
private List<TypeInfo> columnTypes = null;
private ObjectInspector objectInspector = null;
// seperator
private String nullString = null;
private String lineSep = null;
private String kvSep = null;
@Override
public void initialize(Configuration conf, Properties tbl)
throws SerDeException {
// Read sep
lineSep = "\n";
kvSep = "=";
nullString = tbl.getProperty(Constants.SERIALIZATION_NULL_FORMAT, "");
// Read Column Names
String columnNameProp = tbl.getProperty(Constants.LIST_COLUMNS);
if (columnNameProp != null && columnNameProp.length() > 0) {
columnNames = Arrays.asList(columnNameProp.split(","));
} else {
columnNames = new ArrayList<String>();
}
// Read Column Types
String columnTypeProp = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
// default all string
if (columnTypeProp == null) {
String[] types = new String[columnNames.size()];
Arrays.fill(types, 0, types.length, Constants.STRING_TYPE_NAME);
columnTypeProp = StringUtils.join(types, ":");
}
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProp);
// Check column and types equals
if (columnTypes.size() != columnNames.size()) {
throw new SerDeException("len(columnNames) != len(columntTypes)");
}
// Create ObjectInspectors from the type information for each column
List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>();
ObjectInspector oi;
for (int c = 0; c < columnNames.size(); c++) {
oi = TypeInfoUtils
.getStandardJavaObjectInspectorFromTypeInfo(columnTypes
.get(c));
columnOIs.add(oi);
}
objectInspector = ObjectInspectorFactory
.getStandardStructObjectInspector(columnNames, columnOIs);
}
@Override
public Object deserialize(Writable wr) throws SerDeException {
// Split to kv pair
if (wr == null)
return null;
Map<String, String> kvMap = new HashMap<String, String>();
Text text = (Text) wr;
for (String kv : text.toString().split(lineSep)) {
String[] pair = kv.split(kvSep);
if (pair.length == 2) {
kvMap.put(pair[0], pair[1]);
}
}
// Set according to col_names and col_types
ArrayList<Object> row = new ArrayList<Object>();
String colName = null;
TypeInfo type_info = null;
Object obj = null;
for (int i = 0; i < columnNames.size(); i++) {
colName = columnNames.get(i);
type_info = columnTypes.get(i);
obj = null;
if (type_info.getCategory() == ObjectInspector.Category.PRIMITIVE) {
PrimitiveTypeInfo p_type_info = (PrimitiveTypeInfo) type_info;
switch (p_type_info.getPrimitiveCategory()) {
case STRING:
obj = StringUtils.defaultString(kvMap.get(colName), "");
break;
case LONG:
case INT:
try {
obj = Long.parseLong(kvMap.get(colName));
} catch (Exception e) {
}
}
}
row.add(obj);
}
return row;
}
@Override
public ObjectInspector getObjectInspector() throws SerDeException {
return objectInspector;
}
@Override
public SerDeStats getSerDeStats() {
return null;
}
@Override
public Class<? extends Writable> getSerializedClass() {
return Text.class;
}
@Override
public Writable serialize(Object arg0, ObjectInspector arg1)
throws SerDeException {
return null;
}
}
- 添加自定义的SerDe类的jar包
hive > add jar MySerDe.jar
- 创建表格时属性row fromat指定自定义的SerDe类
CREATE EXTERNAL TABLE IF NOT EXISTS teacher (
id BIGINT,
name STRING,
age INT)
ROW FORMAT SERDE 'com.coder4.hive.MySerDe'
STORED AS TEXTFILE
LOCATION '/usr/hive/text/'
网友评论
参看官网:
https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide#DeveloperGuide-HiveSerDe
HDFS files --> InputFileFormat --> <key, value> --> Deserializer --> Row object
Row object --> Serializer --> <key, value> --> OutputFileFormat --> HDFS files