Querying sharded MySQL tables with Spark

Author: Cu提 | Published on 2018-11-30 20:34

    Our cluster runs on UCloud, and the Spark version is 1.6. That is a fairly old release whose MySQL support is quite limited; for example, it still cannot use an index for IN queries. Yet a large part of our analysis work depends on MySQL, and the backend frequently shards a data set across multiple tables.

    We therefore decided to implement an RDD that queries multiple tables as a single data source.
    Implementing an RDD requires three elements (a minimal sketch follows this list):
    1. partitions — the partitions that make up the RDD
    2. dependencies — the RDD's dependencies on other RDDs
    3. iterator — computes the elements inside each partition (by overriding compute)
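
    As a minimal sketch (with invented names, not the article's code), the three pieces map onto the RDD API like this: the partitions come from getPartitions, the dependencies are the Nil passed to the RDD constructor, and the per-partition iterator is produced by compute.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partition, SparkContext, TaskContext}

    // 1. partitions: each partition only needs to carry its own index
    case class IndexPartition(idx: Int) extends Partition {
      override def index: Int = idx
    }

    // 2. dependencies: the Nil passed to RDD(sc, Nil) declares "no parent RDDs"
    class SkeletonRDD(sc: SparkContext, numPartitions: Int) extends RDD[Int](sc, Nil) {

      override protected def getPartitions: Array[Partition] =
        Range(0, numPartitions).map(i => IndexPartition(i)).toArray

      // 3. iterator: compute produces the elements of one partition
      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator.single(split.index)
    }

    The actual implementation, which reads one MySQL shard per partition, follows.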

    import java.util.Properties
    
    import com.kkworld.common.dao.DBPool
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partition, SparkContext, TaskContext}
    
    import scala.collection.mutable.ListBuffer
    import scala.reflect.ClassTag
    
    /**
      * Created by cuti on 2018/11/30.
      */
    
    class MultiTableRDD[T: ClassTag](sqlUrl: String,
                                     sqlProp: Properties,
                                     sql: String,
                                     tableNumber: Int,
                                     sc: SparkContext,
                                     classTag: Class[T]) extends RDD[T](sc, Nil) {
    
    
      if (tableNumber == 1) {
        // single-table query: the SQL must not contain the {index} placeholder
        assert(!sql.contains("{index}"), "a single table does not need a table index")
      } else {
        // sharded tables: the SQL must contain the {index} placeholder
        assert(sql.contains("{index}"), "more than one table needs a table index")
      }
    
    
      override def compute(split: Partition, context: TaskContext): Iterator[T] = {
        val map = Jdbc2BeanUtil.getSchemaMap(classTag)
        val pool = new DBPool(sqlUrl, sqlProp)
        val listBuffer = ListBuffer[T]()
        DBPool.withConnectQuery(pool, statement => {
          // substitute this partition's index into the {index} placeholder to target the right shard
          val partitionedSql = sql.replace("{index}", split.index.toString)
          val resultSet = statement.executeQuery(partitionedSql)
          while (resultSet.next()) {
            val result = Jdbc2BeanUtil.composeResult[T](map, resultSet, classTag)
            listBuffer.+=(result)
          }
        })
        listBuffer.iterator
      }
    
      override protected def getPartitions: Array[Partition] = {
        Range(0, tableNumber).map(index => MysqlPartition(index)).toArray
      }
    
    }
    
    case class MysqlPartition(tableIndex: Int) extends Partition {
      override def index: Int = tableIndex
    }
    import com.google.common.collect.Maps;
    
    import java.lang.reflect.Field;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Created by cuti on 2018/11/30.
     */
    public class Jdbc2BeanUtil {
    
        public static <T> HashMap<String, AnnonationHandle<Class>> getSchemaMap(Class<T> classTag) {
            HashMap<String, AnnonationHandle<Class>> map = Maps.newHashMap();
            // getFields() only returns public fields; skip any that lack the @JDBCField annotation.
            for (Field field : classTag.getFields()) {
                String name = field.getName();
                JDBCField annotation = field.getAnnotation(JDBCField.class);
                if (annotation == null) {
                    continue;
                }
                String description = annotation.description();
                map.put(name, new AnnonationHandle(field, description, field.getType()));
            }
            // Debug output: show the resolved field -> column mapping.
            for (Map.Entry<String, AnnonationHandle<Class>> stringAnnonationHandleEntry : map.entrySet()) {
                System.out.println(stringAnnonationHandleEntry);
            }
            return map;
        }
    
    
        public static <T> T composeResult(HashMap<String, AnnonationHandle<Class>> map, ResultSet resultset, Class<T> classType) {

            T bean = null;

            try {
                bean = classType.newInstance();
                for (Map.Entry<String, AnnonationHandle<Class>> entry : map.entrySet()) {
                    // Read the column named in the annotation and write it directly into the bean field.
                    Object object = resultset.getObject(entry.getValue().annonationName);
                    entry.getValue().field.set(bean, object);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
            return bean;
        }
    
    
    }
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    /**
     * Created by cuti on 2018/11/7.
     */
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface JDBCField {
        // The database column name that this bean field maps to.
        String description() default "field name in db";
    
    }
    import java.io.Serializable;
    import java.lang.reflect.Field;
    
    /**
     * Created by cuti on 2018/11/30.
     */
    public class AnnonationHandle<T> implements Serializable{
    
        Field field;
        String annonationName;
        Class<T> fieldType;
    
        public AnnonationHandle(Field field, String annonationName, Class<T> fieldType) {
            this.field = field;
            this.annonationName = annonationName;
            this.fieldType = fieldType;
        }
    }
    
    

    Implementation details:
    1. Reflection and annotations are used to pull the value of each mapped field out of the ResultSet.
    2. Each table corresponds to one partition; the special {index} token in the SQL is replaced with that partition's index (see the usage sketch below).
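
    As a usage sketch only: the shard count, table name, JDBC URL, credentials, and the UserRecord bean below are all invented for illustration. UserRecord is assumed to be a Java class whose public fields are annotated with @JDBCField(description = "column_name"), since Jdbc2BeanUtil reads annotations from public fields.

    import java.util.Properties

    import org.apache.spark.{SparkConf, SparkContext}

    object MultiTableQueryExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("multi-table-query"))

        val props = new Properties()
        props.setProperty("user", "reader")       // hypothetical credentials
        props.setProperty("password", "secret")

        // {index} is replaced with 0..15, so this RDD reads user_0 .. user_15, one shard per partition.
        val rdd = new MultiTableRDD[UserRecord](
          sqlUrl = "jdbc:mysql://mysql-host:3306/analytics",
          sqlProp = props,
          sql = "select * from user_{index} where status = 1",
          tableNumber = 16,
          sc = sc,
          classTag = classOf[UserRecord])

        println(rdd.count())
        sc.stop()
      }
    }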
