主要內(nèi)容本篇主要從FlinkSQL實現(xiàn)的內(nèi)核與原理,工作流等的視角帶大家構(gòu)建一幅FlinkSQL全景圖(以Blink為主介紹),探知背后支撐的“男人們”(組件)。建議收藏,僅此一份。
主要內(nèi)容:
1. Table API 與 SQL
2. Apache Calcite
3. 元數(shù)據(jù)
4. SQL 函數(shù)
5. Flink Planner 與 Blink Planner
6. Blink SQL執(zhí)行過程
7. SQL優(yōu)化器
8. 總結(jié)
Table API 與 Table SQLTable API 和 Table SQL 集成在同一套 API 中。這套 API 的核心概念是Table,用作查詢的輸入和輸出。
Apache Flink 具有兩個關(guān)系型 API - Table API 和 Table SQL - 用于統(tǒng)一的流和批處理。Table API 是 Scala 和 Java 的語言集成查詢 API,它允許用非常直觀的方式從關(guān)系運(yùn)算符(如選擇、過濾和連接)組成查詢。Flink 的 SQL 支持是基于 Apache Calcite,它實現(xiàn)了 SQL 標(biāo)準(zhǔn)。無論輸入是批處理輸入(DataSet)還是流輸入(DataStream),在任一接口中指定的查詢都具有相同的語義,并指定相同的結(jié)果。
Table API 和 SQL 接口與 Flink 的 DataStream 和 DataSet API 緊密集成。你可以很容易地在所有 API 和建立在 API 基礎(chǔ)上的庫之間切換。
Apache CalciteCalcite 是什么
Apache Calcite是一款開源的動態(tài)數(shù)據(jù)管理框架,它提供了標(biāo)準(zhǔn)的 SQL 語言、多種查詢優(yōu)化和連接各種數(shù)據(jù)源的能力,但不包括數(shù)據(jù)存儲、處理數(shù)據(jù)的算法和存儲元數(shù)據(jù)的存儲庫。
Calcite采用的是業(yè)界大數(shù)據(jù)查詢框架的一種通用思路,它的目標(biāo)是“one size fits all(一種方案適應(yīng)所有需求場景)”,希望能為不同計算平臺和數(shù)據(jù)源提供統(tǒng)一的查詢引擎。
Calcite作為一個強(qiáng)大的SQL計算引擎,在Flink內(nèi)部的SQL引擎模塊就是基于Calcite。
Calcite 的特點
支持標(biāo)準(zhǔn)SQL語言;
獨(dú)立于編程語言和數(shù)據(jù)源,可以支持不同的前端和后端;
支持關(guān)系代數(shù)、可定制的邏輯規(guī)則和基于成本模型優(yōu)化的查詢引擎;
支持物化視圖(materialized view)的管理(創(chuàng)建、丟棄、持久化和自動識別);
基于物化視圖的Lattice和Tile機(jī)制,以應(yīng)用于OLAP分析;
支持對流數(shù)據(jù)的查詢。
Calcite 的功能
1. SQL 解析
Calcite 的SQL解析是通過JavaCC實現(xiàn)的,使用JavaCC編寫SQL語法描述文件,將SQL解析成未經(jīng)校驗的AST語法樹。
2. SQL 校驗
無狀態(tài)的校驗:驗證SQL語句是否符合規(guī)范。有狀態(tài)的校驗:通過與元數(shù)據(jù)結(jié)合驗證SQL的Schema,F(xiàn)ield,F(xiàn)unction是否存在,輸入和輸出是否符合。
3. 查詢優(yōu)化
對RelNode和邏輯計劃樹進(jìn)行優(yōu)化,得到優(yōu)化后的生成物理執(zhí)行計劃。
4. SQL 生成器
將物理執(zhí)行計劃生成特定平臺的可執(zhí)行程序,比如Flink,Hive,不同規(guī)則的SQL查詢語句。
5. 執(zhí)行
通過各個執(zhí)行平臺在內(nèi)存中編譯,然后執(zhí)行查詢。
FlinkSQL 結(jié)合 Calcite
一條SQL從提交到Calcite解析,優(yōu)化,到最后的Flink執(zhí)行,一般分以下過程:
1. Sql Parser: 將sql語句通過java cc解析成AST(語法樹),在calcite中用SqlNode表示AST;
2. Sql Validator: 結(jié)合數(shù)字字典(catalog)去驗證sql語法;
3. 生成Logical Plan: 將sqlNode表示的AST轉(zhuǎn)換成LogicalPlan, 用relNode表示;
4. 生成 optimized LogicalPlan: 先基于calcite rules 去優(yōu)化logical Plan,基于flink定制的一些優(yōu)化rules去優(yōu)化logical Plan;
5. 生成Flink PhysicalPlan: 這里也是基于flink里頭的rules將,將optimized LogicalPlan轉(zhuǎn)成成Flink的物理執(zhí)行計劃;
6. 將物理執(zhí)行計劃轉(zhuǎn)成Flink ExecutionPlan: 就是調(diào)用相應(yīng)的tanslateToPlan方法轉(zhuǎn)換和利用CodeGen元編程成Flink的各種算子。
Table API 來提交任務(wù)的話,基本流程和運(yùn)行SQL類似,稍微不同的是:table api parser: flink會把table api表達(dá)的計算邏輯也表示成一顆樹,用treeNode去表式;在這棵樹上的每個節(jié)點的計算邏輯用Expression來表示。
簡單說一下SQL優(yōu)化:RBO(基于規(guī)則)
RBO主要是開發(fā)人員在使用SQL的過程中,有些發(fā)現(xiàn)有些通用的規(guī)則,可以顯著提高SQL執(zhí)行的效率,比如最經(jīng)典的filter下推:
將Filter下推到Join之前執(zhí)行,這樣做的好處是減少了Join的數(shù)量,同時降低了CPU,內(nèi)存,網(wǎng)絡(luò)等方面的開銷,提高效率。
SQL優(yōu)化的發(fā)展,則可以分為兩個階段,即RBO(基于規(guī)則),和CBO(基于代價)
RBO和CBO的區(qū)別大概在于: RBO只為應(yīng)用提供的rule,而CBO會根據(jù)給出的Cost信息,智能應(yīng)用rule,求出一個Cost最低的執(zhí)行計劃。需要糾正很多人誤區(qū)的一點是,CBO其實也是基于rule的,接觸到RBO和CBO這兩個概念的時候,很容易將他們對立起來。但實際上CBO,可以理解為就是加上Cost的RBO。
元數(shù)據(jù)Catalog 提供了元數(shù)據(jù)信息,例如數(shù)據(jù)庫、表、分區(qū)、視圖以及數(shù)據(jù)庫或其他外部系統(tǒng)中存儲的函數(shù)和信息。
數(shù)據(jù)處理最關(guān)鍵的方面之一是管理元數(shù)據(jù)。元數(shù)據(jù)可以是臨時的,例如臨時表、或者通過 TableEnvironment 注冊的 UDF。元數(shù)據(jù)也可以是持久化的,例如 Hive Metastore 中的元數(shù)據(jù)。Catalog 提供了一個統(tǒng)一的API,用于管理元數(shù)據(jù),并使其可以從 Table API 和 SQL 查詢語句中來訪問。
1. 目前支持的類型
(1) GenericInMemoryCatalog
是基于內(nèi)存實現(xiàn)的 Catalog,所有元數(shù)據(jù)只在 session 的生命周期內(nèi)可用。
(2) JdbcCatalog
JdbcCatalog 使得用戶可以將 Flink 通過 JDBC 協(xié)議連接到關(guān)系數(shù)據(jù)庫。PostgresCatalog 是當(dāng)前實現(xiàn)的唯一一種 JDBC Catalog。
(3) HiveCatalog
HiveCatalog 有兩個用途:作為原生 Flink 元數(shù)據(jù)的持久化存儲,以及作為讀寫現(xiàn)有 Hive 元數(shù)據(jù)的接口。
(4) 用戶自定義 Catalog
Catalog 是可擴(kuò)展的,用戶可以通過實現(xiàn) Catalog 接口來開發(fā)自定義 Catalog。想要在 SQL CLI 中使用自定義 Catalog,用戶除了需要實現(xiàn)自定義的 Catalog 之外,還需要為這個 Catalog 實現(xiàn)對應(yīng)的 CatalogFactory 接口。
CatalogFactory 定義了一組屬性,用于 SQL CLI 啟動時配置 Catalog。這組屬性集將傳遞給發(fā)現(xiàn)服務(wù),在該服務(wù)中,服務(wù)會嘗試將屬性關(guān)聯(lián)到 CatalogFactory 并初始化相應(yīng)的 Catalog 實例。
2. 元數(shù)據(jù)分類
catalog定義主要有三種數(shù)據(jù)類型接口,也就是常用到的數(shù)據(jù)庫,表&視圖,函數(shù)。當(dāng)然還有最上層的Catalog容器。
(1) 數(shù)據(jù)庫
等同于數(shù)據(jù)庫中庫的實例,接口定義為CatalogDatabase,定義數(shù)據(jù)庫實例的元數(shù)據(jù),一個數(shù)據(jù)庫實例中包含表,視圖,函數(shù)等多種對象。
(2) 表&視圖
CatalogTable對應(yīng)數(shù)據(jù)庫中的表,CatalogView隊形數(shù)據(jù)庫中的視圖。
表是一種存儲的實體,包換了字段信息,表的分區(qū),屬性,描述信息。其實說白了字段定義和之前印象的數(shù)據(jù)庫很是類似。你可以對比過來。不同的是,拿flink來說,所有的表都是外部數(shù)據(jù)源,除了上面所說的,還需要訪問信息,比如IP端口,mater地址,connector連接類等等。
視圖是一個虛擬概念,本質(zhì)上是一條SQL查詢語句,底層對應(yīng)一張表或者多張表。包含SQL查詢語句,視圖的字段信息,視圖的屬性等等的信息。
(3) 函數(shù)
CatalogFunction是函數(shù)元數(shù)據(jù)的接口。函數(shù)元數(shù)據(jù)包含了所在的類信息和編程語言。
3. 數(shù)據(jù)訪問
Flink的Table API和SQL程序可以連接到其他外部系統(tǒng),用于讀和寫批處理表和流表。source table提供對存儲在外部系統(tǒng)(如數(shù)據(jù)庫、消息隊列或文件系統(tǒng))中的數(shù)據(jù)的訪問。sink table 向外部存儲系統(tǒng)發(fā)送表。根據(jù)source和sink器的類型,它們支持不同的格式,如CSV、Avro、Parquet或ORC。
(1) TableSchema
Table Source 和 Sink需要具備對外數(shù)據(jù)源的描述能力,所以Flink定義了TableSchema對象來定義字段名稱和字段類型,存儲格式等等信息
(2) 時間屬性
支持處理時間和時間時間
(3) Watermark
用來處理亂序的數(shù)據(jù)。
4. Table Source & Table Sink
Flink本地支持各種連接器,可以查看往期總結(jié)
Filesystem
Elasticsearch
Apache Kafka
Amazon Kinesis Data Streams
JDBC
Apache HBase
Apache Hive
幾個主要Table Source與Sink體系
(1) StreamTableSource
流數(shù)據(jù)抽象,區(qū)分了無界數(shù)據(jù)與有界數(shù)據(jù)。
(2) LookupableTableSource
按照J(rèn)oin條件中的字段進(jìn)行關(guān)聯(lián)。
(3) FilterableTableSource
過濾不符合條件的記錄。
(4) LimitableTableSource
限制記錄條數(shù)。
(5) ProjectableTableSource
過濾不會被使用的字段。
(6) AppendStreamTableSink
追加模式的TableSink 支持追加,不支持更新。
(7) RetractStreamTableSink
支持召回模式的TableSink,召回模式其實就是流上的update。
(8) UpsertStreamTableSink
有則更新,無則插入
SQL 函數(shù)臨時函數(shù)和持久化函數(shù)。臨時函數(shù)始終由用戶創(chuàng)建,它容易改變并且僅在會話的生命周期內(nèi)有效。持久化函數(shù)不是由系統(tǒng)提供,就是存儲在 Catalog 中,它在會話的整個生命周期內(nèi)都有效。
內(nèi)置函數(shù)
Table API和SQL為用戶提供了一組用于數(shù)據(jù)轉(zhuǎn)換的內(nèi)置函數(shù)。如果您需要的函數(shù)還不受支持,您可以實現(xiàn)用戶定義的函數(shù)
(1) Comparison Functions(比較型函數(shù))
eg:value1 = value2
(2) Logical Functions(邏輯函數(shù))
eg: boolean1 OR boolean2
(3) Arithmetic Functions(算術(shù)函數(shù))
eg: numeric1 + numeric2
(4) String Functions(字符串函數(shù))
UPPER(string)
(5) Temporal Functions(時間函數(shù))
YEAR(date)
(6) Conditional Functions(有條件的函數(shù))
IF(condition, true_value, false_value)
(7) Type Conversion Functions(類型轉(zhuǎn)換函數(shù))
CAST(value AS type)
(8) Collection Functions(集合函數(shù))
array ‘[’ INT ‘]’
(9) Value Construction Functions , Value Access Functions,Grouping Functions,Hash Functions,Auxiliary Functions,Aggregate Functions,Column Functions (不一一列舉)
自定義函數(shù)
(1) 標(biāo)量函數(shù)(UDF)
標(biāo)量函數(shù) 將標(biāo)量值轉(zhuǎn)換成一個新標(biāo)量值,也就是對一行數(shù)據(jù)中的一個或者多個字段返回一個單值。
(2) 聚合函數(shù)(UDAGG)
自定義聚合函數(shù)(UDAGG)是把一個表(一行或者多行,每行可以有一列或者多列)聚合成一個標(biāo)量值。
(3) 表值函數(shù)(UDTF)
表值函數(shù) 將標(biāo)量值轉(zhuǎn)換成新的行數(shù)據(jù)。可以接收一個或者多個字段作為參數(shù),輸出多行列數(shù)據(jù)。
(4) 表值聚合函數(shù)(UDTAGG)
自定義表值聚合函數(shù)(UDTAGG)可以把一個表(一行或者多行,每行有一列或者多列)聚合成另一張表,結(jié)果中可以有多行多列。
(5) 異步表值函數(shù)
異步表值函數(shù) 是異步查詢外部數(shù)據(jù)系統(tǒng)的特殊函數(shù)。
Planner 與 Blink PlannerFlink Table/SQL體系中的Planner(即查詢處理器)是溝通Flink與Calcite的橋梁,為Table/SQL API提供完整的解析、優(yōu)化和執(zhí)行環(huán)境。
Flink Table 的新架構(gòu)實現(xiàn)了查詢處理器的插件化,項目完整保留原有 Flink Planner (Old Planner),同時又引入了新的 Blink Planner,用戶可以自行選擇使用 Old Planner 還是 Blink Planner。
主要區(qū)別:
Blink做到了真正的流批統(tǒng)一,即將批看做是特殊的流,把處理批的API和處理流的API做成了一樣的。也就是說不管是批數(shù)據(jù)還是流數(shù)據(jù),底層統(tǒng)統(tǒng)都是DataStream。所以使用Blink作為table planner的程序,Table和DataSet是不能相互轉(zhuǎn)換的。
Blink planner是不支持BatchTableSource的,它只支持StreamTableSource。
Blink Planner和Old Planner的FilterableTableSource是不兼容的。Old - Planner會下推PlannerExpression到FilterableTableSource。而Blink planner下推的是Expression。
基于String的鍵值對配置項只能用于Blink Planner
Blink Planner會優(yōu)化多個sink到同一個TableEnvironment和StreamTableEnvironment。而Old Planner會為不同的sink優(yōu)化到自己的DAG中,也就是說有幾個sink就有幾個DAG。
Old Planner 不支持 catalog統(tǒng)計,Blink支持。
Old Planner 不支持版本表(versioned Table)。版本表類似HBASE中版本表的意思,每個key可以記住過去的幾個值。
Blink SQL執(zhí)行過程
SQL執(zhí)行過程分三個階段
(1) 從SQL到 Operation
(2) 從Operation 到 Transformation
(3) 環(huán)境的執(zhí)行階段
從SQL到 Operation
(1) 解析SQL轉(zhuǎn)換為QueryOperation;
(2) SQL解析為SqlNode;
(3) 校驗SqlNode;
(4) 調(diào)用Calcite SQLToRelConvertrt將SqlNode轉(zhuǎn)化為RelNode邏輯樹;
(5) RelNode轉(zhuǎn)化為Operation。
Operation 到 Transformation
(1) DQL(數(shù)據(jù)查詢語言)轉(zhuǎn)換,在flink中作為中間運(yùn)算;
(2) DML(數(shù)據(jù)操作語言),DQL轉(zhuǎn)換。
整個轉(zhuǎn)換從Operation開始,先轉(zhuǎn)換為Calcite的邏輯計劃樹,再轉(zhuǎn)化為Flink的邏輯計劃樹,然后進(jìn)行優(yōu)化。優(yōu)化后的邏輯樹轉(zhuǎn)換為Flink的物理執(zhí)行,物理執(zhí)行生成一系列的算子,udf等等,包裝到Transformation中。
環(huán)境的執(zhí)行階段
有了Transformation后正式進(jìn)入到StreamGraph的過程中,最終交給Flink集群去運(yùn)行。
SQL優(yōu)化器查詢優(yōu)化器
再次提到兩個優(yōu)化器:RBO(基于規(guī)則的優(yōu)化器) 和 CBO(基于代價的優(yōu)化器)
(1) RBO(基于規(guī)則的優(yōu)化器)會將原有表達(dá)式裁剪掉,遍歷一系列規(guī)則(Rule),只要滿足條件就轉(zhuǎn)換,生成最終的執(zhí)行計劃。一些常見的規(guī)則包括分區(qū)裁剪(Partition Prune)、列裁剪、謂詞下推(Predicate Pushdown)、投影下推(Projection Pushdown)、聚合下推、limit下推、sort下推、常量折疊(Constant Folding)、子查詢內(nèi)聯(lián)轉(zhuǎn)join等。
(2) CBO(基于代價的優(yōu)化器)會將原有表達(dá)式保留,基于統(tǒng)計信息和代價模型,嘗試探索生成等價關(guān)系表達(dá)式,最終取代價最小的執(zhí)行計劃。CBO的實現(xiàn)有兩種模型,Volcano模型,Cascades模型。這兩種模型思想很是相似,不同點在于Cascades模型一邊遍歷SQL邏輯樹,一邊優(yōu)化,從而進(jìn)一步裁剪掉一些執(zhí)行計劃。
目前各大數(shù)據(jù)庫和計算引擎傾向于CBO。
總結(jié)在目前情況下,在阿里對Flink社區(qū)的貢獻(xiàn)下,F(xiàn)link包含了Flink SQL 和 Blink SQL體系,F(xiàn)link Planner稱之為 Old Planner,Blink Planner稱之為 New Planner。從中可以發(fā)現(xiàn) Blink Planner是未來,F(xiàn)link Planner將會被淘汰。
FlinkSQL依靠 Calcite提供了一套SQL驗證,解析,優(yōu)化等等操作。同時FlinkSQL提供元數(shù)據(jù)管理,SQL函數(shù),數(shù)據(jù)源的建設(shè)。也自由化地提供了自定義函數(shù),自定義connector連接,豐富了場景的使用。
FlinkSQL你值得擁有!!!
編輯:jq
-
處理器
+關(guān)注
關(guān)注
68文章
19404瀏覽量
230813 -
數(shù)據(jù)管理
+關(guān)注
關(guān)注
1文章
300瀏覽量
19653 -
SQL
+關(guān)注
關(guān)注
1文章
773瀏覽量
44219 -
UDF
+關(guān)注
關(guān)注
0文章
4瀏覽量
6481
原文標(biāo)題:干貨:詳解 FlinkSQL 實現(xiàn)原理
文章出處:【微信號:DBDevs,微信公眾號:數(shù)據(jù)分析與開發(fā)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論