spark-shell
// Copy hive-site.xml into spark/conf, and put the MySQL driver jar into spark/jars
$ spark-shell --master local[2]
scala> spark.table("emp").show
+---+------+
| id| name|
+---+------+
| 1|baozi1|
| 2|baozi2|
| 3|baozi3|
+---+------+
scala> spark.sql("show tables").show;
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| emp| false|
+--------+---------+-----------+
scala> spark.sql("select * from emp").show;
+---+------+
| id| name|
+---+------+
| 1|baozi1|
| 2|baozi2|
| 3|baozi3|
+---+------+
If you see an Error about hive.metastore.schema.verification at startup, edit hive-site.xml under Spark's conf directory to disable the check:
<property>
  <name>hive.metastore.schema.verification</name>
  <value>false</value>
</property>
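The same Hive tables are also reachable from a standalone application; below is a minimal sketch (the object name HiveReadApp is illustrative), assuming hive-site.xml is on the application classpath so that enableHiveSupport() can reach the metastore:
import org.apache.spark.sql.SparkSession

object HiveReadApp {
  def main(args: Array[String]): Unit = {
    // enableHiveSupport() lets spark.table/spark.sql resolve names
    // against the Hive metastore described by hive-site.xml
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("HiveReadApp")
      .enableHiveSupport()
      .getOrCreate()

    spark.table("emp").show()

    spark.stop()
  }
}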
spark-sql
Start it:
$ spark-sql --master local[2]
Test it:
> select * from emp;
Startup prints a flood of INFO messages; you can silence them:
$ cp ~/tools/spark/conf/log4j.properties.template ~/tools/spark/conf/log4j.properties
$ vim ~/tools/spark/conf/log4j.properties
In the copy, change the root logger level from INFO to WARN (the log4j.rootCategory line).
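Alternatively, inside spark-shell the level can be changed for the current session only, without editing any file:
scala> sc.setLogLevel("WARN")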
thriftserver/beeline
Start the thriftserver
$ start-thriftserver.sh --master local[2]
Start beeline. The thriftserver listens on port 10000 by default; to change it, pass --hiveconf hive.server2.thrift.port=<listening-port> when starting the thriftserver.
$ beeline -u jdbc:hive2://localhost:10000 -n user000
Usage example
0: jdbc:hive2://localhost:10000> show tables;
+-----------+------------+--------------+--+
| database | tableName | isTemporary |
+-----------+------------+--------------+--+
| default | emp | false |
+-----------+------------+--------------+--+
0: jdbc:hive2://localhost:10000> select * from emp;
+-----+---------+--+
| id | name |
+-----+---------+--+
| 1 | baozi1 |
| 2 | baozi2 |
| 3 | baozi3 |
+-----+---------+--+
Differences between thriftserver and spark-shell/spark-sql
1. Each spark-shell or spark-sql session runs as its own separate Spark application.
2. The thriftserver, by contrast, is a single Spark application no matter how many clients (beeline/code) connect to it, which solves the problem of sharing data across clients (e.g. a table cached in one session is visible to the others).
thriftserver/jdbc
Don't forget to start the thriftserver first.
<!-- hive-jdbc: the org.apache.hive groupId works as well -->
<dependency>
  <groupId>org.spark-project.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>1.2.1.spark2</version>
</dependency>
import java.sql.DriverManager

/**
 * Access Spark SQL through the thriftserver via JDBC.
 */
object ThriftServerApp {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // the same parameters used to start beeline (URL, user, password) go here
    val conn = DriverManager.getConnection("jdbc:hive2://host000:10000", "user000", "")
    val pstmt = conn.prepareStatement("select * from emp")
    val rs = pstmt.executeQuery()
    while (rs.next()) {
      printf("id:%d,name:%s \n", rs.getInt("id"), rs.getString("name"))
    }
    rs.close()
    pstmt.close()
    conn.close()
  }
}
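With the sample emp table above, this prints lines such as id:1,name:baozi1.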
Writing out to Hive
scala> spark.table("emp").show
+---+------+
| id| name|
+---+------+
| 1|baozi1|
| 2|baozi2|
| 3|baozi3|
+---+------+
scala> spark.table("emp").write.saveAsTable("emp1")
scala> spark.table("emp1").show
+---+------+
| id| name|
+---+------+
| 1|baozi1|
| 2|baozi2|
| 3|baozi3|
+---+------+
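Note that saveAsTable fails if the target table already exists (the default save mode is ErrorIfExists); an explicit mode can be set, for example:
scala> spark.table("emp").write.mode("overwrite").saveAsTable("emp1")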
Setting the partition count for writes: spark.sql.shuffle.partitions (default 200) controls how many partitions Spark produces after a shuffle, and therefore how many files a shuffled write creates.
scala> spark.sqlContext.setConf("spark.sql.shuffle.partitions","10")
scala> spark.sqlContext.getConf("spark.sql.shuffle.partitions")
res6: String = 10
scala> spark.table("emp").write.saveAsTable("emp2")
scala> spark.table("emp2").show
+---+------+
| id| name|
+---+------+
| 1|baozi1|
| 2|baozi2|
| 3|baozi3|
+---+------+
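A straight copy like this has no shuffle in its plan, so spark.sql.shuffle.partitions will not necessarily change the number of files written here; to pin the output file count directly, repartition before writing — a sketch (the table name emp3 is illustrative):
scala> spark.table("emp").repartition(2).write.saveAsTable("emp3")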