本系列是Spark系列分享的第三期。第一期分享了Spark Core的一些基础原理和一些基本概念,包括一些核心组件。Spark的所有组件都围绕Spark Core来运转,其中最活跃的一个上层组件是Spark SQL。第二期分享则专门介绍了Spark SQL的基本架构和原理。从第三期开始,后续的分享都围绕着Spark SQL展开,尤其是Spark SQL的优化。Spark作为一个常用的批处理大数据引擎,在各大公司的这个业务线上,存在于离线计算及一些机器查询的场景,而这些场景下最常用的方式是兼具易用性和学习门槛低等特点的 Spark SQL。今天的分享是关于解析层及其优化,解析层处于Spark SQL处理流程的第一个阶段,和后续将要分享的优化内容相比,较为简单且易于大家理解。
首先介绍数新网络与Spark SQL相关的两个基本的产品。第一个是CyberEngine,也叫Cyber数智引擎。Cyber数智引擎旨在构建一个基于云原生的数据湖底座,能支持用户更好地去分析和挖掘数据,提升自身在市场和商业上的竞争力。Spark自从开源以来很快成为大数据领域的事实标准,CyberEngine支持对Spark SQL的管理。数新网络基于Spark SQL实现了统一的SQL查询平台,即CyberSQL。
另一个Spark有关产品是CyberData。CyberData是一个数据开发平台,基于批流一体、湖仓一体、数智一体,支持公有云、私有云、混合云,并且支持各种大数据文件格式,包括结构化、半结构化和非结构化数据,在此基础上,提供了各种数据治理、数据服务、数据调度和数据开发的能力。
Spark SQL的执行流程经过了解析层、优化层、执行计划层,直到最后成为真正可执行的物理进程(例如JVM进程或Native进程)。执行流程的第一步就是解析层,解析层中有一个Spark SQL Parser,作为最前端的组件。该组件封装了很多子组件,其中很多是基于 ANTLR 实现的。在此基础上,Spark SQL 实现了对 SQL 的解析。
ANTLR4 对SQL语言提供支持,第一步是要定义语法模板。ANTLR4语法模板的以.g4作为文件后缀名。例如上图中,简要截取了Spark SQL自定义的SqlBaseLexer.g4和SqlBaseParser.g4这两个.g4文件。在早期,只存在一个.g4文件,即SqlBaseParser.g4文件,该文件包含了词法分析的定义和语法分析的模板。后来Spark社区为了对其在定义上进行解耦,以及便于易于维护,将其分为两个文件。SqlBaseParser.g4文件,需要引用词法分析里定义的一些变量和函数,包括一些token(例如SELECT关键字、ANTLR4的GC文件中定义的函数和变量名)。
首先介绍词法分析,即左边绿色文件的主要内容。该文件包含一个members作用域,定义了很多在Parser代码中复用的变量和函数,此外文件还包含了一些Token定义(例如SELECT、FROM等关键字),在上图中已省略。其次介绍语法分析,即右边蓝色文件主要内容,内容中options引用了左边文件的词法分析的内容,members作用域定义了一些变量和函数,其中还包含了语法定义(如DQL、DEL、DML、Spark SQL自定义语法等等)。
以上是SqlBaseParser.g4文件的摘要。为便于分享展示,从2000行代码中挑选了最易于理解、最关键的语法定义片段。下图展示的是SQL查询语句的语法定义,这中间还包括了singleStatement、statement、query、queryOrganization等语法块。ANTLR4对其进行编译之后,会生成一些接口和抽象类实现,开发者针对这些接口和抽象类,能轻松实现自定义的操作。
AstBuilder中有一个visitQuery方法,这是与SQL中SELECT查询语法最紧密的接口实现,这样的解决方法会调用其他一些方法,最终返回LogicPlan类型。LogicPlan是Spark内部的逻辑计划表示,其本身是一个树形结构,能够理解为AST的一个具体实现。在visitQuery方法中,有一部分是withQueryResultClauses,这部分就是对查询结果从句的处理。下图展示了其关键代码。能够正常的看到,其中有对ORDER BY、SORT BY、DISTRIBUTE BY、CLUSTER BY、窗口函数、OFFSET、LIMIT等语法的处理。
另外,在visitQuery方法中,还有一部分是withCTE,以下是其关键代码。CTE即公用的表表达式,比如针对一个表做了查询,且这个查询在整个SQL中需重复多次使用。那么在此类场景下,CTE语法就很有用,可以把一个针对表的查询定义成表达式或变量以复用。
再介绍一下SqlBaseParser入口。SqlBaseParser的父类AbstractParser定义了parse函数,可以针对DDL、DML等类型的语法做处理。首先进行词法分析(包括Token的划分),接着进行语法解析,返回解析后的结果,具体而言,子类会返回一个LogicPlan。
最后通过一个图来总结ANTLR4和Spark SQL的融合解析流程。左边是两个.g4文件之间的引用关系,经过ANTLR4编译构建之后,生成一些接口和抽象类。右边图例说明其具体实现是交给Spark SQL完成的。
前文介绍了Spark SQL解析层的原理,接下来将围绕这些原理介绍一个优化案例。
本人在Spark 3.2版本向社区陆续贡献了两个语法PR。最早是在语法定义模板里修改了部分定义实现相应功能,增加了percentile_cont和percentile_disc这两个函数作为聚合函数和窗口函数的一些功能。
观察上图左边的代码能够正常的看到,优化之前这两个函数(可作为聚合函数也可以作为窗口函数)的一些代码逻辑定义,处于表达式的模板定义之下。当时为实现这两个函数的这些功能,做了如下操作。针对ANTLR编译之后生成的visitPercentile接口,在AstBuilder中进行了实现,包括解析层、分析层、优化层。这里只展示了解析层的内容。Spark选择了Scala语言来实现,对代码优雅性、代码质量发展要求非常高,这种实现引入了一些额外代码,总共约20-30行,被社区要求改进。观察到Spark的AstBuilder中,有一个较为通用的functionCall函数,以下展示了其具体代码。
把该代码与visitPercentile代码作对比,不难发现很多重复代码,同样是需要去改进的。因此,考虑复用此代码进行功能实现,对visitFunctionCall代码做修改,针对要修改的两个函数做定制化处理。
优化后,在语法层使得原来增加的6-7行代码变为只需一行代码;而针对visitFunctionCall,只需要变动2行代码。从代码角度来说,相比优化前的近30行代码,总共仅需变动3行代码,这样的优化更为合理且代码也更为优雅。在图片中能够正常的看到整个优化过程,是通过复用代码,让代码更加的精巧和优雅。从这个优化能够准确的看出,一个软件尤其是大型软件的性能优势其实都是日积月累起来的。
本次分享介绍了Spark SQL解析层的原理。最重要的包含Spark SQL与ANTLR4之间的协作关系、ANTLR4生成的接口和抽象类、Spark SQL的具体实现类、Spark SQL的解析入口和解析流程等,最后通过一个优化案例介绍了Spark解析层的优化。Spark SQL经过ANTLR4的语法校验和类型校验后,通过parse方法解析得到抽象语法树,并将其交给分析层处理,那么关于分析层优化将在下一期分享继续介绍,之后也会有更多性能相关优化,欢迎各位继续关注。
Q1:如何计算一个Spark SQL的资源消耗,从而对比不同SQL的性价比?
A:现在 Spark SQL基本上大多数组件都是有度量系统的,Spark 内部的Spark Core,Spark SQL 里都有一个度量子架构,其中可以定义很多监控指标,或者称为度量信息。比如一个读数据的scan 相关的算子,在算子里面就会去计算读了多少字节、读了多少行、读了多少时间等。这些 metrics 也可以由用户自定义,通过这一些 metrics 信息能够直接进行性能比较。另一方面,Spark SQL 目前也是支持基于TPC-DS数据集进行性能压测或性能对比。
A2:评价SQL 的优化效果,主要看优化的目的是什么。如果优化目的是提高稳定性,比如优化之前这个 SQL 常常会出现跑不出来、跑失败,优化后能跑出来就达到了预期的优化效果。还有一种是如果优化目的是希望能跑得更快,那么就用时间来评价,所以评价标准主要根据自己的实际需求。
Q3:Spark向量化技术慢慢的变多,老师怎么看?是不是未来的Spark作业都要使用Native引擎?
A3:Spark 使用Scala,其本质也是用JVM 运行的。对于JVM类语言,其天生最大的一个优势或者说在商业上最成功的一个点,就是语言的平台无关性。熟悉大数据运维的同学肯定有这样的体会,对于一些用 Java 语言开发的大数据组件,包括Spark、Flink、Hadoop、MR 这些组件,它们在任何硬件系统和操作系统上都可以跑。那对公司的商业层面来说,它的运维代价、部署代价、维护代价都很低,而且学习成本也很低,这些实际上也是代表着公司的一个核心竞争力。那现在为什么很多公司会追求向量化?一种原因是现在数据体量慢慢的变大,也就从另一方面代表着任务的执行可能耗时会更多,就可能会引起对于硬件 CPU 的占用更多。对于大公司来说,尤其是本身就有云提供能力的厂商,他们都有自己的运维团队,花费的成本还是相对可控的。但是对于一些中小公司来说,就会使用付费的云服务,价格会更昂贵,那成本就会更高。在追求降本增效的环境下,对于大数据引擎,包括Spark及其他采用Java开发的大数据组件而言,它们在Native层面上的优化尝试日益增多。实际上的意思就是指通过深入挖掘计算机硬件和指令集的性能潜力,来实现成本降低与效率提升的双重目标。我认为还是要围绕公司架构,如果是ToC的业务,那么去做向量化或者native,这种更接近底层硬件的性能优化问题不大;但若公司是ToB 的,大数据产品或技术要输出到不同的公司客户,而不一样的客户又选择了不同的操作系统和不同的 CPU 架构,那这可能就会成为这个公司的噩梦。所以从长远看不管是 Spark 的向量化,还是原生的基于JVM方式的 Spark,两种方式各有优劣,针对不同的目的都会有其存在的价值。
A4:作为一个搞开源的人,我建议可以去看ANTLR4官网,能够理解官网的全部内容,会比任何书都更有价值。
A5:我的理解是, Calcite 相比于 ANTLR4,它的功能会更多,它提供了一些优化规则方面的一些处理。ANTLR4其实只是一个解析层的东西,解析出来的东西如果不去进一步处理就没有一点价值,但 Calcite解析完之后,它还附带了一些比较基础的一些优化规则等。从这个方面来说,如果一个公司想要去开发一门语言,基于Calcite也许会比用ANTLR4的开发周期更短。
A6:目前Spark、Flink 都在向着流批一体的方向发展,其实Spark 的优势是在于批处理,而Flink则是在流的部分。因为Flink面对的业务场景较少,则市场占有率就会比较少,所以通过推出流批一体也可以来扩大市场。Spark社区在面对Flink 在批处理上的挑战时,也会去和Flink 在流的市场上做一些争夺,总之二者在这种商业角逐下,很多方面能相互借鉴。
Q7:在数据量大时Spark易出现超出内存被Kill的情况,Shuffle时同样都有溢写到磁盘的功能,为什么 MR 很少出现问题?
A7:因为 MR 不怎么用内存,如果把MR 的JVM内存设置得比较小,也会出现被 Kill 的问题。至于Spark 容易超出内存被Kill,这样的一个问题是因为有可能与第三方资源管理(比如Yarn、K8S)有关。
Q8:SQL 解析是否提供了一些对外的接口?例如公司需要分析离线任务的血缘关系,是否有一些接口能深入解析过程拿到表名的?
A8:现在 Spark SQL 的这些解析层的组件,就是作为公共 API 方式存在的。Spark 社区为便于用户使用,也在积极维护接口的向前兼容性,这其实是考虑到用户只是去做一层解析的使用场景,所以你可完全这么使用的。
电话: 134-5561-6515
地址: 山东省潍坊市昌乐