pom 依赖:
<!-- https://mvnrepository.com/artifact/com.hortonworks/shc-core -->
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc-core</artifactId>
<version>1.1.1-2.1-s_2.11</version>
</dependency>
部分代码:
package xxxx.xxxx
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 提交方式
* spark2-submit --name HBASE-CONNECTOR --files /etc/hbase/conf/hbase-site.xml --class xxx.xxxx --master yarn --deploy-mode cluster --driver-memory 2g --driver-cores 2 --executor-memory 2g --executor-cores 1 --num-executors 2 wmanxiety-1.0-SNAPSHOT.jar --day 20180810 --repartition 500 --interval 7
*/
/**
 * Reads trip-signal records from HBase through the Hortonworks
 * Spark-HBase connector (shc-core) and prints them to stdout.
 *
 * Submission (hbase-site.xml must be shipped via --files so executors
 * can reach HBase):
 *
 * spark2-submit --name HBASE-CONNECTOR --files /etc/hbase/conf/hbase-site.xml \
 *   --class xxx.xxxx --master yarn --deploy-mode cluster \
 *   --driver-memory 2g --driver-cores 2 --executor-memory 2g \
 *   --executor-cores 1 --num-executors 2 wmanxiety-1.0-SNAPSHOT.jar \
 *   --day 20180810 --repartition 500 --interval 7
 */
object MileageAnxiety {

  /**
   * SHC catalog: maps the HBase table `default:trip_signal` (column
   * family `info`) onto a DataFrame schema. Every column is read as a
   * string; the row key is exposed as column `rowkey`.
   */
  def cat: String =
    s"""{
       |"table":{"namespace":"default", "name":"trip_signal", "tableCoder":"PrimitiveType"},
       |"rowkey":"key",
       |"columns":{
       |"rowkey" :{"cf":"rowkey", "col":"key", "type":"string"},
       |"vin" :{"cf":"info", "col":"vin", "type":"string"},
       |"tripStatus" :{"cf":"info", "col":"tripStatus", "type":"string"},
       |"tripStartTime":{"cf":"info", "col":"tripStartTime", "type":"string"},
       |"tripEndTime" :{"cf":"info", "col":"tripEndTime", "type":"string"},
       |"tripDistance" :{"cf":"info", "col":"tripDistance", "type":"string"},
       |"startSoc" :{"cf":"info", "col":"startSoc", "type":"string"},
       |"endSoc" :{"cf":"info", "col":"endSoc", "type":"string"},
       |"maxSpeed" :{"cf":"info", "col":"maxSpeed", "type":"string"},
       |"startMileage" :{"cf":"info", "col":"startMileage", "type":"string"},
       |"coordinate" :{"cf":"info", "col":"coordinate", "type":"string"}
       |}
       |}""".stripMargin

  def main(args: Array[String]): Unit = {
    // Silence Spark's verbose INFO logging.
    Logger.getLogger("org").setLevel(Level.WARN)

    // BUG FIX: the original hard-coded .master("local[2]"), which silently
    // overrides the `--master yarn --deploy-mode cluster` passed on the
    // spark2-submit command line above — the job would always run locally.
    // Leave the master unset so spark-submit controls it; for local IDE
    // runs, pass -Dspark.master=local[2] or set spark.master in config.
    val spark = SparkSession.builder()
      .appName(this.getClass.getName)
      .getOrCreate()

    // Builds a DataFrame backed by the HBase table described by `catalog`.
    // (spark.read replaces the redundant sqlContext indirection; the unused
    // `sc` and `sqlContext.implicits._` import were removed.)
    def withCatalog(catalog: String): DataFrame =
      spark.read
        .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()

    val df = withCatalog(cat)
    df.show()
    spark.stop()
  }
}
网友评论