概述
由于要在Flink SQL上二次开发,所以得研究下Flink SQL的SQL编译模块。
SQL解析主流程、Flink对Calcite进行的扩展(例如FlinkSqlParserImpl类就是一个Calcite扩展)。
主要有两个类 Parser和Planner,TableEnvironment的初始化会这两个类的工厂。
Flink SQL解析器的创建
ParserImpl类
作用:Flink抽象出的解析器,主要方法是parse()。
Calcite解析成SqlNode
SqlToOperationConverter用SqlNode和Planner转换算子。
FlinkSqlParserImpl
SqlParser是Flink自定义的config参数传进来。
SqlParser parser = SqlParser.create(sql, config);return parser.parseStmt();
巧妙的运用Supplier,这个其实是工厂方法,每次调用parse时重新创建parser和planner。这么设计的目的是catalog在每次调用时会变化,所以才动态创建对象。
@Overridepublic List<Operation> parse(String statement) {CalciteParser parser = calciteParserSupplier.get();FlinkPlannerImpl planner = validatorSupplier.get();// parse the sql querySqlNode parsed = parser.parse(statement);Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed).orElseThrow(() -> new TableException("Unsupported query: " + statement));return Collections.singletonList(operation);}
自定义的Calcite解析逻辑
在这个目录下:flink/flink-table/flink-sql-parser
参考资料
https://matt33.com/2019/03/07/apache-calcite-process-flow/
https://github.com/quxiucheng/apache-calcite-tutorial
https://miaowenting.site/2019/11/10/Flink-SQL-with-Calcite/
