美文网首页
spark查询分表mysql

spark查询分表mysql

作者: Cu提 | 来源:发表于2018-11-30 20:34 被阅读0次

我们的集群使用的是 ucloud的集群,spark版本是1.6。是一个较低的版本,对于mysql的支持非常有限,比如还不能支持in的索引查询等。但是我们分析工作很大一部分都依托于mysql,而且服务端数据经常会对数据分表存储,如下图所示
实现如下图


因此便考虑实现一个查询多表数据源的RDD。
实现一个RDD有三个必要元素。
1、partitions组成改rdd的每个partition
2、dependencies 该RDD的依赖关系
3、iterator 计算每个分区内的元素

import java.util.Properties

import com.kkworld.common.dao.DBPool
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkContext, TaskContext}

import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag

/**
  * Created by cuti on 2018/11/30.
  */

class MultiTableRDD[T: ClassTag](sqlUrl: String,
                                 sqlProp: Properties,
                                 sql: String,
                                 tableNumber: Int,
                                 sc: SparkContext,
                                 classTag: Class[T]) extends RDD[T](sc, Nil) {


  if (tableNumber == 1) {
    //单表查询
    assert(!sql.contains("{index}"), "one table do not need table index")
  } else {
    assert(sql.contains("{index}"), "more than one  table  need table index")
  }


  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val map = Jdbc2BeanUtil.getSchemaMap(classTag)
    val pool = new DBPool(sqlUrl, sqlProp)
    val listBuffer = ListBuffer[T]()
    DBPool.withConnectQuery(pool, statement => {
      val partiionedSql = sql.replace("{index}", split.index.toString)
      val resultSet = statement.executeQuery(partiionedSql)
      while (resultSet.next()) {
        val result = Jdbc2BeanUtil.composeResult[T](map, resultSet, classTag)
        listBuffer.+=(result)
      }
    })
    listBuffer.iterator
  }

  override protected def getPartitions: Array[Partition] = {
    Range(0, tableNumber).map(index => MysqlPartition(index)).toArray
  }

}

case class MysqlPartition(tableIndex: Int) extends Partition {
  override def index: Int = tableIndex
}
import com.google.common.collect.Maps;

import java.lang.reflect.Field;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by cuti on 2018/11/30.
 */
public class Jdbc2BeanUtil {

    public static <T> HashMap<String, AnnonationHandle<Class>> getSchemaMap(Class<T> classTag) {
        HashMap<String, AnnonationHandle<Class>> map = Maps.newHashMap();
        for (Field field : classTag.getFields()) {
            String name = field.getName();
            JDBCField annotation = field.getAnnotation(JDBCField.class);
            String description = annotation.description();
            map.put(name, new AnnonationHandle(field, description, field.getType()));
        }
        for (Map.Entry<String, AnnonationHandle<Class>> stringAnnonationHandleEntry : map.entrySet()) {

            System.out.println(stringAnnonationHandleEntry);
        }
        return map;
    }


    public static <T> T composeResult(HashMap<String, AnnonationHandle<Class>> map, ResultSet resultset, Class<T> classType) {

        T student = null;

        try {
            student = classType.newInstance();
            for (Map.Entry<String, AnnonationHandle<Class>> entry : map.entrySet()) {
                Object object = resultset.getObject(entry.getValue().annonationName);
                entry.getValue().field.set(student, object);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        return student;
    }


}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Created by cuti on 2018/11/7.
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface JDBCField {
    String description() default "field name in db";

}
import java.io.Serializable;
import java.lang.reflect.Field;

/**
 * Created by cuti on 2018/11/30.
 */
public class AnnonationHandle<T> implements Serializable{

    Field field;
    String annonationName;
    Class<T> fieldType;

    public AnnonationHandle(Field field, String annonationName, Class<T> fieldType) {
        this.field = field;
        this.annonationName = annonationName;
        this.fieldType = fieldType;
    }
}

实现细节:
1、使用反射和注解实现了如何从resultset中取出对应字段的数据。
2、每个table对应一个分区。将sql中的{index}特殊字段替换成对应的index

相关文章

  • spark查询分表mysql

    我们的集群使用的是 ucloud的集群,spark版本是1.6。是一个较低的版本,对于mysql的支持非常有限,比...

  • mysql优化

    Mysql分库分表方案 Mysql分库分表方案 1.为什么要分表: 当一张表的数据达到几千万时,你查询一次所花的时...

  • 2019-02-22

    mySQL Navicat for mySQL 关系型数据库:用表传数据 如何建表:查询→新建查询 注释: -- ...

  • 2021-01-20

    Mysql查询excel表

  • mysql2——11-21

    mysql2 mysql查询 查询整个表 从表查精确查询字段 精确查询多个字段AND为必传字段,key和value...

  • Mysql分库分表方案

    Mysql分库分表方案 1.为什么要分表: 当一张表的数据达到几千万时,你查询一次所花的时间会变多,如果有联合查询...

  • Mysql分库分表方案

    Mysql分库分表方案 1.为什么要分表: 当一张表的数据达到几千万时,你查询一次所花的时间会变多,如果有联合查询...

  • 查询MYSQL表注释以及字段注释

    查询MYSQL数据库所有表名以及表注释 查询MYSQL数据库所有字段名以及字段注释

  • mysql表关系

    mysql数据库 知识要点: 单表查询 子查询 联表查询 事务 在进行查询之前,我们要先建好关系表,并往数据表中插...

  • mysql查询和创建数据表,排序

    mysql基本查询:查询所有: select * from 表名; 创建数据表:(举例)create table...

网友评论

      本文标题:spark查询分表mysql

      本文链接:https://www.haomeiwen.com/subject/bolkcqtx.html