Our cluster runs on UCloud with Spark 1.6. This is a fairly old release whose MySQL support is quite limited; for example, it cannot yet use indexes for IN queries. A large part of our analysis work, however, relies on MySQL, and the server side frequently shards its data across multiple tables.
We therefore decided to implement an RDD that reads a set of sharded tables as a single data source.
Implementing an RDD requires three essential elements:
1. partitions: the partitions that make up the RDD
2. dependencies: this RDD's dependencies on its parents
3. compute (invoked through iterator): the logic that produces the elements of each partition
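In the RDD API, the first and third correspond to the getPartitions and compute overrides, while the dependencies are passed to the RDD constructor (Nil for an RDD that reads an external source directly). A minimal sketch, with illustrative names (SkeletonRDD is not part of the original code):

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkContext, TaskContext}

import scala.reflect.ClassTag

case class SkeletonPartition(idx: Int) extends Partition {
  override def index: Int = idx
}

// A source RDD with no parents: dependencies are Nil in the constructor.
class SkeletonRDD[T: ClassTag](sc: SparkContext, numPartitions: Int)
  extends RDD[T](sc, Nil) {

  // 1. partitions: one Partition object per slice of the data
  override protected def getPartitions: Array[Partition] =
    Range(0, numPartitions).map(i => SkeletonPartition(i)).toArray

  // 2. dependencies: already declared via the constructor's Nil argument

  // 3. compute: produce the elements of one partition
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    Iterator.empty // a real implementation reads the data addressed by split.index
}

The actual implementation follows the same shape: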
import java.util.Properties

import com.kkworld.common.dao.DBPool
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkContext, TaskContext}

import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag

/**
  * Created by cuti on 2018/11/30.
  */
class MultiTableRDD[T: ClassTag](sqlUrl: String,
                                 sqlProp: Properties,
                                 sql: String,
                                 tableNumber: Int,
                                 sc: SparkContext,
                                 classTag: Class[T]) extends RDD[T](sc, Nil) {

  if (tableNumber == 1) {
    // single-table query: the SQL must not carry the shard placeholder
    assert(!sql.contains("{index}"), "one table does not need a table index")
  } else {
    // sharded query: the SQL must carry the {index} placeholder
    assert(sql.contains("{index}"), "more than one table needs a table index")
  }

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    // build the column-to-field mapping on the executor, once per partition
    val map = Jdbc2BeanUtil.getSchemaMap(classTag)
    // DBPool is the project's own connection-pool helper
    val pool = new DBPool(sqlUrl, sqlProp)
    val listBuffer = ListBuffer[T]()
    DBPool.withConnectQuery(pool, statement => {
      // replace the {index} placeholder with this partition's table index
      val partitionedSql = sql.replace("{index}", split.index.toString)
      val resultSet = statement.executeQuery(partitionedSql)
      while (resultSet.next()) {
        val result = Jdbc2BeanUtil.composeResult[T](map, resultSet, classTag)
        listBuffer += result
      }
    })
    // note: the whole shard table is buffered in memory before the iterator is returned
    listBuffer.iterator
  }

  override protected def getPartitions: Array[Partition] = {
    // one partition per physical table
    Range(0, tableNumber).map(index => MysqlPartition(index)).toArray
  }
}

case class MysqlPartition(tableIndex: Int) extends Partition {
  override def index: Int = tableIndex
}
import com.google.common.collect.Maps;

import java.lang.reflect.Field;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by cuti on 2018/11/30.
 */
public class Jdbc2BeanUtil {

    /**
     * Builds a field-name -> handle map from the @JDBCField annotations
     * on the public fields of the bean class.
     */
    public static <T> HashMap<String, AnnonationHandle<Class>> getSchemaMap(Class<T> classTag) {
        HashMap<String, AnnonationHandle<Class>> map = Maps.newHashMap();
        for (Field field : classTag.getFields()) { // public fields only
            JDBCField annotation = field.getAnnotation(JDBCField.class);
            if (annotation == null) {
                continue; // skip fields without the @JDBCField annotation
            }
            map.put(field.getName(), new AnnonationHandle(field, annotation.description(), field.getType()));
        }
        return map;
    }

    /**
     * Reads one row of the ResultSet into a fresh bean instance,
     * matching columns by the annotated column names.
     */
    public static <T> T composeResult(HashMap<String, AnnonationHandle<Class>> map, ResultSet resultSet, Class<T> classType) {
        T bean = null;
        try {
            bean = classType.newInstance();
            for (Map.Entry<String, AnnonationHandle<Class>> entry : map.entrySet()) {
                Object value = resultSet.getObject(entry.getValue().annonationName);
                entry.getValue().field.set(bean, value);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return bean;
    }
}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Created by cuti on 2018/11/7.
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface JDBCField {
    String description() default "field name in db";
}
import java.io.Serializable;
import java.lang.reflect.Field;

/**
 * Created by cuti on 2018/11/30.
 */
public class AnnonationHandle<T> implements Serializable {
    // note: java.lang.reflect.Field itself is not serializable; the schema map
    // is built inside compute(), on the executor, so handles are never shipped
    // from the driver
    Field field;
    String annonationName; // column name taken from @JDBCField.description()
    Class<T> fieldType;

    public AnnonationHandle(Field field, String annonationName, Class<T> fieldType) {
        this.field = field;
        this.annonationName = annonationName;
        this.fieldType = fieldType;
    }
}
Implementation details:
1. Reflection plus the @JDBCField annotation determine how each field's value is pulled out of the ResultSet.
2. Each table maps to one partition, and the special {index} placeholder in the SQL is replaced with that partition's index, as the usage sketch below shows.
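To close, a minimal usage sketch, assuming an existing SparkContext sc, 16 shard tables named user_log_0 through user_log_15, and a bean class UserLog; the table names, columns, credentials, and connection URL here are all hypothetical. The bean is written in Java so that its annotated fields are public, which is what Jdbc2BeanUtil.getSchemaMap expects from Class#getFields:

// Hypothetical Java bean (Scala vals/vars compile to private fields,
// so getFields() would not see them):
//
//   public class UserLog implements java.io.Serializable {
//       @JDBCField(description = "uid")      public Long uid;
//       @JDBCField(description = "duration") public Long duration;
//   }

import java.util.Properties

val prop = new Properties()
prop.setProperty("user", "reader")     // hypothetical credentials
prop.setProperty("password", "secret")

// {index} expands to 0..15, one shard table per partition
val sql = "SELECT uid, duration FROM user_log_{index}"

val rdd = new MultiTableRDD[UserLog](
  "jdbc:mysql://127.0.0.1:3306/analytics", // hypothetical connection URL
  prop, sql, 16, sc, classOf[UserLog])

println(rdd.count()) // total rows across all 16 shard tables

Since compute buffers a whole shard table in memory before returning its iterator, this approach fits tables of moderate size; very large shards would call for a streaming iterator instead.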