前提
- spark 2.4.4,依赖Scala 2.12
- CDH集群为6.0.1(hadoop对应版本为3.0.0)
安装scala
- 下载地址 https://www.scala-lang.org/files/archive/scala-2.12.10.tgz
- 配置环境变量
SCALA_HOME
和path
安装maven
安装比较简单,过程略。
安装winutils(windows才需要安装)
参考https://www.jianshu.com/p/6b0367a3bd42
idea安装scala插件
打开settings 安装scala插件安装完成后重启idea
idea配置
打开Project Structure添加scala Libraries
选择scala目录
idea 创建项目
创建maven项目点击“Next”
输入项目信息
点击“Next”
完成
点击“Finish”即可
项目创建好以后,暂时删除test目录,把java文件夹修改为scala(这一步不是必须,看个人爱好)
spark local模式连接远程hadoop集群
- 创建test.txt文件,写入几行数据,上传到hdfs中
- 把hadoop集群的配置文件
core-site.xml
和hdfs-site.xml
复制到项目的resources
目录下
经过测试,linux下运行idea,没有这两个文件也能连上hadoop集群
- 创建scala代码
package hdfs
import org.apache.spark.sql.SparkSession
object HdfsTest {
def main(args: Array[String]): Unit = {
// 如果在windows本地跑,需要从widnows访问HDFS,需要指定一个合法的身份
// System.setProperty("HADOOP_USER_NAME", "hdfs")
val spark = SparkSession.builder()
.appName("hdfs-test")
.master("local")
// 设置参数
.config("dfs.client.use.datanode.hostname", "false")
.getOrCreate();
// spark.sparkContext.setLogLevel("DEBUG")
//支持通配符路径,支持压缩文件读取
val path = "hdfs://10.121.138.145:8020/test.txt"
val rdd = spark.read.textFile(path)
//统计数量
println("count = "+rdd.count())
//停止spark
spark.stop()
}
}
重点说明:
- 如果在windows下运行,请添加
System.setProperty("HADOOP_USER_NAME", "hdfs")
代码,否则会提示Permission Denied
- CDH默认
dfs.client.use.datanode.hostname
为true
,意思为使用hostname连接hdfs,如果我们不修改本机的host文件,本地是无法连接datanode机器。有三种方式解决
- 修改本机的host,需要配置上所有的hadoop的host及ip
- 修改
hdfs-site.xml
配置中的dfs.client.use.datanode.hostname
为`false - 代码中通过
sparkSession
设置.config("dfs.client.use.datanode.hostname", "false")
(推荐)
直接运行此代码即可,如果输出test.txt中的文本行数,恭喜你实验成功!!
spark local模式连接远程hive集群
连接hive集群,没有找到类似
dfs.client.use.datanode.hostname
hive相关的配置,所以只能通过hosts文件来解决。
第一步:修改本机hosts文件,配置hive集群的hostname及ip
修改本机hosts文件,让本机能够解析hive集群中的机器名及ip
比如我的配置:
10.121.138.145 lfg00
10.121.138.146 lfg01
10.121.138.147 lfg02
10.121.138.148 lfg03
第二步:连接hive
这步有两种方式:
- 复制hive的配置文件hive-site.xml至项目的resources文件夹下
- 在代码中增加
.config("hive.metastore.uris","thrift://lfg00:9083")
第三步:代码
package hdfs
import org.apache.spark.sql.SparkSession
import utils.GeoHash
/**
* 从hadoop读取数据,计算geohash并写入hadoop/hive
*/
object GeohashTest {
def main(args: Array[String]): Unit = {
// 如果在windows本地跑,需要从widnows访问HDFS,需要指定一个合法的身份
// System.setProperty("HADOOP_USER_NAME", "hdfs")
val spark = SparkSession.builder()
.appName("hdfs-test")
.master("local")
// use ip
.config("dfs.client.use.datanode.hostname", "false")
// 如果输出目录已存在,则直接覆盖
.config("spark.hadoop.validateOutputSpecs", "false")
// 连接hive地址,如果复制hive-site.xml到resources下,则不需要此配置
// .config("hive.metastore.uris","thrift://lfg00:9083")
// 启用hive支持
.enableHiveSupport()
.getOrCreate();
// spark.sparkContext.setLogLevel("DEBUG")
//支持通配符路径,支持压缩文件读取
val inputPath = "hdfs://10.121.138.145:8020/test/put2/*"
writeToHive(spark,inputPath)
//停止spark
spark.stop()
}
/**
* 写入hive
* @param spark
* @param inputPath
*/
def writeToHive(spark:SparkSession, inputPath:String): Unit = {
val cols = List("lat", "lng", "value", "hash")
val rdd = spark.read.textFile(inputPath)
import spark.implicits._
val out = rdd.map(line => {
val columns = line.split(",")
// 计算geohash
val hash = GeoHash.encode(columns(0).toDouble, columns(1).toDouble, 8)
(columns(0), columns(1), columns(2), hash)
}).toDF(cols: _*)
out.printSchema()
// 创建临时表
out.createOrReplaceTempView("tmp_data")
// 把临时表数据写入hive
spark.sql(
"""
| insert overwrite table unicomm_poc.test_geohash select * from tmp_data
|""".stripMargin)
}
}
网友评论