Background
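When Hive databases and tables are manipulated through Spark SQL, the hook functions configured in hive-site.xml are never executed. The reason is that Spark works with Hive independently: put simply, Spark uses methods from the Hive jars to access the underlying data files directly.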
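So how can permissions be controlled when Spark SQL accesses these tables? The approach is the same as for Hive permission control: parse the SQL and check permissions before the statement is executed. This is done by modifying the sql method in SparkSession: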
/**
 * Executes a SQL query using Spark, returning the result as a `DataFrame`.
 * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
 *
 * @since 2.0.0
 */
def sql(sqlText: String): DataFrame = {
  // The OS user "hadoop" skips the permission check entirely.
  val osName = System.getProperty("user.name")
  if (!"hadoop".equals(osName)) {
    // propertiesFile is assumed to be a var added to SparkSession by this patch (not shown here).
    propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(sys.env))
    val properties = Utils.getPropertiesFromFile(propertiesFile)
    // Map#get returns an Option; orNull keeps the null check below meaningful
    // instead of failing earlier with a NoSuchElementException.
    val clusterType: String = properties.get("permissions.cluster.type").orNull
    val checkUrl: String = properties.get("permissions.check.url").orNull
    if (clusterType == null || checkUrl == null) {
      log.error(s"clusterType is ${clusterType},checkUrl is ${checkUrl}")
      throw new Exception("clusterType or checkUrl is null")
    }
    val sparkUser = sparkContext.sparkUser
    val command = sqlText
    val databaseCurrentName = this.sessionState.catalog.getCurrentDatabase
    log.info("-------------------------")
    log.info("user:" + sparkUser)
    log.info("sqlText:" + sqlText)
    log.info("currentDatabase:" + this.catalog.currentDatabase)
    log.info("sessionState currentDatabase:" + this.sessionState.catalog.getCurrentDatabase)
    log.info("clusterType:" + clusterType)
    log.info("checkUrl:" + checkUrl)
    log.info("-------------------------")
    // Temporary views of this session are sent along so the service can exempt them.
    val tempViews = sessionState.catalog.getTempViews()
    val checkPermissionModel = new CheckPermissionModel
    checkPermissionModel.setCommand(command)
    checkPermissionModel.setUserName(sparkUser)
    checkPermissionModel.setDatabaseCurrentName(databaseCurrentName)
    checkPermissionModel.setTempViews(tempViews)
    checkPermissionModel.setClusterType(ClusterType.valueOf(clusterType))
    checkPermissionModel.setRequestType(RequestType.SPARK_REQUEST)
    // Send the request to the permission check service and decode its reply.
    val post = HttpTools.sendPost(checkUrl, JacksonTools.obj2json(checkPermissionModel))
    val result = JacksonTools.json2pojo(post, classOf[Result])
    val status = result.getStatus
    if ("0" == status) throw new Exception("request validate permission failed...")
    val obj = result.getObj
    val model = JacksonTools.json2pojo(JacksonTools.obj2json(obj), classOf[CheckPermissionModel])
    val validateFlag = model.isValidateFlag
    if (!validateFlag) {
      val msg = "user:" + sparkUser + " has not permissions " + "error message :" + model.getMsg()
      throw new Exception(msg)
    }
  }
  // Only reached when the check passed (or was skipped): run the query as usual.
  Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
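Stripped of the Spark specifics, the flow above is a gate placed in front of query execution. The following is only a minimal sketch of that pattern, not the patch itself: `PermissionRequest`, `PermissionResult`, `PermissionGate` and `guarded` are hypothetical names introduced for illustration, and the remote HTTP call is replaced by a plain function argument so the example runs on its own.

```scala
// Hypothetical types: they mirror the fields sent to the check service above,
// but are not part of Spark or of the original patch.
final case class PermissionRequest(
    user: String,
    command: String,
    currentDatabase: String,
    tempViews: Seq[String])

final case class PermissionResult(allowed: Boolean, message: String)

object PermissionGate {
  /**
   * Runs `run` on the SQL text only if the user is a superuser
   * or the `check` function allows the request.
   */
  def guarded[A](
      request: PermissionRequest,
      superUsers: Set[String],
      check: PermissionRequest => PermissionResult)(run: String => A): A = {
    if (!superUsers.contains(request.user)) {
      val result = check(request)
      if (!result.allowed) {
        throw new SecurityException(
          s"user:${request.user} has no permission, error message: ${result.message}")
      }
    }
    run(request.command)
  }
}
```

In the patched `sql` method, `check` would correspond to the HTTP round trip to the permission service and `run` to `Dataset.ofRows(...)`. Passing both in as functions keeps the gate testable without a cluster.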
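Note that for temporary tables created in Spark, the user who created them has all permissions by default. To support this, modify the SessionCatalog class and add a method that returns all temporary tables created by the current user: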
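/**
 * Returns the names of all local temporary views.
 */
def getTempViews(): util.ArrayList[String] = synchronized {
  // Requires `import java.util` in SessionCatalog.
  // Copy the keys into a Java list so the result serializes easily to JSON.
  val list = new util.ArrayList[String]()
  tempViews.keys.foreach { name =>
    list.add(name)
  }
  list
}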
The permission check service is then called, and it returns the validation result.
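The service side is not shown in this post. As a rough sketch of how such a service might use the temporary view list, the example below skips unqualified names that match a temporary view and checks everything else against a set of grants. All names here (`TempViewAwareCheck`, `deniedTables`, the `grants` set) are assumptions made for illustration. Extracting the table names from the SQL text is assumed to have happened already, and case normalization is left out.

```scala
// Hypothetical helper: not the actual check service, only an illustration.
object TempViewAwareCheck {
  /**
   * Returns the tables the user is not allowed to access.
   * An empty result means the statement passes the check.
   *
   * @param referenced      table names as written in the SQL, e.g. "t" or "db.t"
   * @param currentDatabase database used to qualify bare table names
   * @param tempViews       temporary view names sent by the client
   * @param grants          qualified "db.table" names the user may access
   */
  def deniedTables(
      referenced: Seq[String],
      currentDatabase: String,
      tempViews: Set[String],
      grants: Set[String]): Seq[String] = {
    referenced.flatMap { name =>
      val isQualified = name.contains(".")
      if (!isQualified && tempViews.contains(name)) {
        // Temporary view of the session: its creator has full access.
        None
      } else {
        val qualified = if (isQualified) name else s"$currentDatabase.$name"
        if (grants.contains(qualified)) None else Some(qualified)
      }
    }
  }
}
```

With such a helper, the service would set the validation flag to true only when the returned sequence is empty, and could put the denied table names into the error message.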