在Spark1.0中所有的Catalyst Optimizer都是基于规则 (rule) 优化的。为了产生比较好的查询规 则,优化器需要理解数据的特性,于是在Spark2.0中引入了基于代价的优化器 (cost-based optimizer),也就是所谓的CBO。然而,CBO也无法解决很多问题,比如:
- 数据统计信息普遍缺失,统计信息的收集代价较高;
- 储存计算分离的架构使得收集到的统计信息可能不再准确;
- Spark部署在某一单一的硬件架构上,cost很难被估计;
- Spark的UDF(User-defined Function)简单易用,种类繁多,但是对于CBO来说是个黑 盒子,无法估计其cost;
总而言之,由于种种限制,Spark的优化器无法产生最好的Plan。
也许你会想:Spark为什么不解决这个问题呢?这里有很多挑战,比如:
- 统计信息的缺失,统计信息的不准确,那么就是默认依据文件大小来预估表的大小,但是文件 往往是压缩的,尤其是列存储格式,比如parquet 和 ORC,而Spark是基于行处理,如果数据连续重复,file size可能和真实的行存储的真实大小,差别非常之大。这也是为何提高 autoBroadcastJoinThreshold,即使不是太大也可能会导致out of memory;
- Filter复杂、UDFs的使用都会使Spark无法准确估计Join输入数据量的大小。当你的queryplan异常大和复杂的时候,这点尤其明显;
- 其中,Spark3.0中基于运行期的统计信息,将Sort Merge Join 转换为Broadcast Hash Join。
基于RBO优化:
left join case
var appSql: String ="""|select| *|from| tab_spark_test as t1|left join tab_spark_test_2 as t2|on t1.id = t2.id|and t1.id > 5+5""".stripMarginsparkSession.sql("use default;")sparkSession.sql(appSql).explain(mode = "extended")
基于CBO优化:
CBO 优化主要在物理计划层面,原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划。
而每个执行节点的代价,分为两个部分:
1、该执行节点对数据集的影响,即该节点输出数据集的大小与分布;
2、该执行节点操作算子的代价。
每个操作算子的代价相对固定,可用规则来描述。而执行节点输出数据集的大小与分布,分为两个部分:
1、初始数据集,也即原始表,其数据集的大小与分布可直接通过统计得到;
2、中间节点输出数据集的大小与分布可由其输入数据集的信息与操作本身的特点推算。
需要先执行特定的 SQL 语句来收集所需的表和列的统计信息。
--表级别统计信息
ANALYZE TABLE 表名 COMPUTE STATISTICS
--生成列级别统计信息
ANALYZE TABLE 表名 COMPUTE STATISTICS FOR COLUMNS 列 1,列 2,列 3--显示统计信息
DESC FORMATTED 表名
--显示列统计信息
DESC FORMATTED 表名 列名
没有执行 ANALYZE状态
执行 ANALYZE后,发现多了很多spark.sql.statistics信息
使用 CBO
通过 "spark.sql.cbo.enabled" 来开启,默认是 false。配置开启 CBO 后,CBO 优化器可以基于表和列的统计信息,进行一系列的估算,最终选择出最优的查询计划。比如:Build 侧选择、优化 Join 类型、优化多表 Join 顺序等。
参数 |
描述 | 默认值 |
spark.sql.cbo.enabled |
true 表示打开,false 表示关闭。 要使用该功能,需确保相关表和列的统计信息已经生成。 |
false |
spark.sql.cbo.joinReorder.enabled |
使用 CBO 来自动调整连续的 inner join 的顺序。 true:表示打开,false:表示关闭 要使用该功能,需确保相关表和列的统计信息已经生成,且 CBO 总开关打开。 |
false |
spark.sql.cbo.joinReorder.dp.threshold |
使用 CBO 来自动调整连续 inner join 的表的个数阈值。 如果超出该阈值,则不会调整 join 顺序。 |
10 |
def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName("CBO").set("spark.sql.cbo.enabled", "true").set("spark.sql.cbo.joinReorder.enabled", "true").setMaster("local[*]")val sparkSession: SparkSession = Util.SparkSession2hive(sparkConf)var appSql: String ="""|select| t1.name,count(1)|from| tab_spark_test as t1|left join tab_spark_test_2 as t2|on t1.id = t2.id|group by t1.name""".stripMarginsparkSession.sql("use default;")sparkSession.sql(appSql).show()while (true) {}}
123