Spark内部针对DataSource表的查询做了缓存优化,使得在同一任务中多次访问同一张DataSource表场景下可以跳过重复的获取表meta数据过程,以提升表读取性能。缓存的内容是表名和其对应的LogicalRelation
。
缓存机制:
SQL语法解析后进行Analyzer的过程,因为我们关注表的缓存机制,所以只看表分析中的一个关键Rule:ResolveRelations
。Analyzer 对Parsed Logical Plan
进行遍历,发现UnresolvedRelation
后,就对它启动合规性检查:HiveMetastoreCatalog
的lookupRelation
方法。
首先判断该表是否是DataSource表:
1)从externalCatalog中getTable,如获取不到就抛出表不存在异常;
2)判断是否是DataSource表:表的data source provider
(比如parquet,json,hive等)不为空,并且不等于Hive;
3)不是DataSource表不进行缓存优化。
如果判定是DataSource表就会进行表的Cache逻辑,以下是Cache实例:
protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan]
LoadingCache
是guava
中的一个缓存接口,spark里面多次用到。
缓存的内容是库表名和对应的LogicalPlan(其实是LogicalRelation,LogicalRelation是叶子节点的LogicalPlan)
val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
逻辑是:如果表存在于缓存,返回表对应的LogicalRelation信息,如果不存在,通过CacheLoader重写的load方法获取LogicalRelation信息,载入缓存。
上面的代码底层调用了LocalCache的getOrLoad方法。
1)如果表是第一次访问,那么缓存中是不存在表的缓存信息,通过load方法生成LogicalRelation,存入缓存;
2)表之前访问过,并于缓存中存在,直接返回缓存中对应该表的LogicalRelation。
缓存何时失效
两种情况下表的缓存会失效:
- 超出缓存实例配置的缓存个数阈值;
- 主动执行refresh table操作。
缓存阈值
spark在定义cachedDataSourceTables缓存实例的时候指定了表的缓存上限:1000
CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
当缓存接近上限时,CacheBuild会通过一定的机制将一些表缓存信息驱逐出LoadingCache。
refresh table
主动执行refresh table会使cachedDataSourceTables中表的缓存信息失效:先remove掉之前的缓存数据,再重新load。
下面场景会被动触发refresh table:
1)TRUNCATE TABLE
2)ANALYZE TABLE
3)DROP TABLE
4)ALTER TABLE table RECOVER PARTITIONS
5)Insert into table
问题解决
如果有常驻的app对DataSource表进行查询,而又有外部的app对同一张DataSource表进行写入,那么由于缓存机制,spark没有去主动感知表底层元数据的变化,查询跟实际数据会不一致。有下面两种方案可以解决:
方案:
- 在每条语句前加refresh table语句;
- 将缓存上限设置为0,不缓存表的信息。
两种方式都是重新加载表的meta信息,性能上都有损耗。个人感觉方案2较为合适,比较轻量,不需要增加执行逻辑。
网友评论