簡介
背景
Hash join是解決復(fù)雜join的一個重要手段,但其無法避免拉取左右兩端的數(shù)據(jù)到計算層進行計算,導(dǎo)致某些場景下執(zhí)行效率不高。作為一種補充,bka join則可以利用OLTP數(shù)據(jù)庫中的索引,通過join構(gòu)造inner表的predicate命中表索引,在某些場景下有比較好的join效率。PolarDB-X是面向HTAP設(shè)計的分布式數(shù)據(jù)庫,在復(fù)雜查詢時也會重點考慮利用數(shù)據(jù)庫的索引信息來提升join的查詢效率,因此有了本文的semi bka join。
要解決的問題
在這篇源碼解讀中,我們要解決的是類似這樣一條SQL的執(zhí)行效率問題,select * from t1 where c1 in (select c1 from t2);其中t2為一張大表,t2表的c2列上有索引,t1為一張小表,且匹配后數(shù)據(jù)量很小。
Q:這個問題重要么?
A:其實還挺重要的,這代表了一類經(jīng)典的TP型SQL,客戶用的也比較多。
經(jīng)典執(zhí)行模式的不足
比如hash join,sort merge join,nested loop join,都需要把兩張表的數(shù)據(jù)都拉取到計算節(jié)點,而t2是一張大表,導(dǎo)致執(zhí)行效率不高。
解決方案
避免拉取t2這張大表的數(shù)據(jù),引入semi bka join的執(zhí)行模式,后面會詳細展開。
講述方式
為了避免大家在閱讀源碼時迷失在大量的細節(jié)之中,我們會用一種重構(gòu)的形式來逐步構(gòu)建semi bka join。同時,有些內(nèi)容對于理解該部分設(shè)計,可能并沒有太大的作用,比如代碼中對于新分區(qū)表與老分區(qū)表的if eles判斷,因此在重構(gòu)中刪除了該部分;最后,我們對于一些邏輯進行了重構(gòu),以便能夠更加清晰的進行講述。 綜上,感興趣的朋友可以參考我在https://github.com/wcf2333/galaxysql.git的代碼提交記錄,結(jié)合本篇文章閱讀,體驗會更好。
在分支refactor_semi_join分支上,移除了對于全局二級索引和新分區(qū)表的支持,精簡了執(zhí)行器的一些處理;
在wcf2333_build_semi_bka_join分支上,基于refactor_semi_join分支,首先移除了semi bka join的實現(xiàn),然后逐步進行了豐富和實現(xiàn),步驟可見提交記錄,如下。
remove semi bka join and materialized semi joinnnadd the simplest optimizer rule for semi bka joinnnadd the simplest executor for semi bka joinnnsupport multi column in lookup predicatennenhance optimizernnsupport stream lookup joinnnsupport dynamic pruningnnsupport single sharding key when table rule is not simplennsupport lookup predicate with multi-column when pruning
注:本文中bka(batch key access)和lookup表達相同的含義
目標
在寫這篇文章時,是期望大家可以在學(xué)到一些東西之后,能夠在一個暫時沒有實現(xiàn)該功能的系統(tǒng)上真正實現(xiàn)一個semi bka join。它足夠魯棒,包含大量的細節(jié),理想情況下足以在生產(chǎn)環(huán)境中使用。這也是為什么會花挺大力氣采用重構(gòu)的方式來寫這篇文章的原因。 顯然,一篇文章顯然是不足以實現(xiàn)上述目標的,所以不出意外的話陸陸續(xù)續(xù)還會有幾篇,我們期望回答所有重要的細節(jié)。因此,如果大家有什么問題的話,歡迎提問,無論是留言還是直接聯(lián)系我,我們會在后續(xù)的文章中把大家的問題都囊括其中。 提醒:我們會在本篇中介紹一些細節(jié),有些細節(jié)不進行debug可能不太容易理解,所以感興趣的朋友可以搭建debug環(huán)境邊閱讀邊調(diào)試。
前置知識
在理解semi bka join的詳細設(shè)計和實現(xiàn)時,不可避免的要對其相關(guān)組件進行介紹,從中我們挑了兩個重要的來展開介紹一下,hash join的核心設(shè)計與如何接入異步執(zhí)行框架。
HashJoin的核心設(shè)計
我們之所以要講解hash join的核心設(shè)計,是因為bka join與hash join有很多非常像的地方。理解了hash join,有助于我們更好的理解bka join。同時,這樣的對比學(xué)習(xí),可能會讓大家有更多的收獲。
hash join執(zhí)行流程
hash join的左邊是Outer端(探測hash table的一端),右邊是inner端(構(gòu)建hash table的一端),執(zhí)行流程如下所示。

流程中的幾個核心問題
1.保存在hash table中的是什么?
public class ConcurrentRawHashTable implements Hash {nn public static final int NOT_EXISTS = -1;nn n private final AtomicIntegerArray keys;nn n private final int mask;nn n private final int n;n}
該行在緩存數(shù)據(jù)中的位置(keys中保存的int值),而非join key的值或者記錄本身(所以為了輸出完整數(shù)據(jù),我們需要緩存build端的數(shù)據(jù),即ChunksIndex buildChunks)
2.如何解決哈希碰撞?
拉鏈的形式來解決哈希碰撞(所以你會看到positionlinks和hashTable總是如影隨形)。根據(jù)保存在哈希表中的位置,獲取build端的相應(yīng)行,進而檢測記錄是否真正匹配。
ConcurrentRawHashTable hashTable;nnint[] positionlinks;nint matchedPosition = hashTable.get(hashCode);nwhile (matchedPosition != LIST_END) {n if (buildKeyChunks.equals(matchedPosition, keyChunk, position)) {n break;n }n matchedPosition = positionlinks[matchedPosition];n}
3.如何匹配結(jié)果并輸出?
build端構(gòu)建hash table之后,probe端開始流式的探測并輸出,可見AbstractBufferedJoinExec的doNextChunk和nextRow方法,整體流程如下:
遍歷probe端的結(jié)果,拿到probe端join key的hash code,首先去hash table中找到對應(yīng)的位置(matchInit方法)
判斷position是否有效(matchValid方法),并檢測join條件是否匹配(checkJoinCondition)
若匹配,則輸出相應(yīng)結(jié)果(buildJoinRow方法)
隨后一直從鏈表中獲取同一hash code對應(yīng)的build chunk中的位置(matchNext方法),跳轉(zhuǎn)至步驟2直至拿到的位置不合法
注:在wcf2333_build_semi_bka_join分支上對AbstractBufferedJoinExec進行了部分重構(gòu),邏輯可能會更清晰一些,有興趣的朋友可以參考。
matchedPosition = matchInit(probeJoinKeyChunk, probeKeyHashCode, probePosition);nfor (; matchValid(matchedPosition);n matchedPosition = matchNext(matchedPosition, probeJoinKeyChunk, probePosition)) n{n if (!checkJoinCondition(buildChunks, probeChunk, probePosition, matchedPosition)) {n continue;n }n buildJoinRow();n}
細節(jié)的強化:我們現(xiàn)在對上述流程中的步驟三(輸出結(jié)果)進行審視,原因是join的類型并非只有inner,還有outer join,semi join與anti join,其各自的特點分別如下:
outer join:如果進行probe的這行無法匹配右表,則右表中相應(yīng)字段填充為null
semi join:只輸出左表內(nèi)容,且一旦匹配,則無需繼續(xù)探測鏈表中余下的記錄是否能夠繼續(xù)匹配
anti join:同semi join,但需要注意的是condition中不包含anti的語義,因此一旦condition匹配,則anti join一定不匹配,但是condition不匹配時,anti join未必匹配,此時需要進一步判斷condition的結(jié)果是否為null。例如where a not in (select b from t2),則condition為a = b,null = 3的結(jié)果為null,這會導(dǎo)致condition不匹配,但是卻不應(yīng)該輸出。此外,還需要注意not exists與not in轉(zhuǎn)換而來的anti join的區(qū)別,整體來講anti join的這部分并不太容易理解,非必要可暫時不對該部分進行理解,下次文章期望能夠?qū)υ摬糠诌M行重構(gòu)。
4. 對于semi和anti時的一點優(yōu)化,即doSpecialCheckForSemiJoin方法 如果構(gòu)建哈希表的結(jié)果為空,則semi join的輸出結(jié)果為空,anti join的輸出結(jié)果為probe端。 如果anti join中為一列且build chunk中該列含有null值時,則anti join的輸出結(jié)果為空。
擴展:如果需要支持多列時anti join的pass nothing優(yōu)化,可以參考如下進行擴展,即檢查該行中參與構(gòu)建哈希表的所有字段是否均為null。
protected static boolean checkNullRecord(Chunk chunk) {n int blockCount = chunk.getBlockCount();n int position = chunk.getBlock(0).getPositionCount();n boolean hasNullRecord = IntStream.range(0, position).anyMatch(n pos -> IntStream.range(0, blockCount).allMatch(t -> chunk.getBlock(t).isNull(pos)));n return hasNullRecord;n}
對于null safe equal的處理? null safe equal:用<=>表示,用于判斷join中null = null是否成立。 null safe equal時將build端的null值放入哈希表中,否則構(gòu)建哈希表時過濾掉null值,如下:
boolean[] nullPos = getNullPos(keyChunk, ignoreNullBlocks);nfor (int offset = 0; offset < keyChunk.getPositionCount(); offset++, position++) {n if (!nullPos[offset]) {n int next = hashTable.put(position, hashes[offset]);n positionlinks[position] = next;n }n}
異步框架下算子狀態(tài)的切換
大致思路:通常情況下,如果我們能夠拿到數(shù)據(jù)進行處理,則該算子一定不會結(jié)束,狀態(tài)為非阻塞狀態(tài);反之,如果拿不到數(shù)據(jù),則應(yīng)進一步判斷為block狀態(tài)還是finish狀態(tài)(當(dāng)然,實際上我們會有Producer和Consumer,將來可以寫一篇文章來聊聊這其中的幾個接口與狀態(tài)切換,非本文重點,按下不表)
Semi BKA join的設(shè)計與實現(xiàn)
好了,現(xiàn)在我們要正式開始一步步的構(gòu)建semi bka join了。
添加整個框架
我們首先實現(xiàn)semi bka join優(yōu)化器部分,其次是執(zhí)行器部分,即將LogicalSemiJoin轉(zhuǎn)換為SemiBkaJoin,涉及的commit如下。
add the simplest optimizer rule for semi bka joinnnadd the simplest executor for semi bka joinnnsupport multi column in lookup predicatennenhance optimizer
優(yōu)化器部分
1.我們需要一個新的RelNode類型,SemiBkaJoin,派生自LogicalSemiJoin。
2.我們需要一個LogicalSemiJointoSemiBkaJoin的規(guī)則,以便將下述的RelNode樹進行轉(zhuǎn)換。

tips:在一條規(guī)則中放一個開關(guān),總不是一件太壞的事情,這樣如果該條規(guī)則有問題,可以更快的禁掉。
3.在RuleToUse中添加該規(guī)則,使得優(yōu)化器的RBO階段可以使用該規(guī)則。
執(zhí)行器部分
我們先來對比一下hash join和lookup join的執(zhí)行流程。
hash join

lookup join

通過對上圖的執(zhí)行過程進行分析,我們可以將bka join的執(zhí)行過程分為如下幾個階段。原來的代碼中這幾個階段不是非常清晰,我在之前提到的代碼分支上對這部分代碼進行了重構(gòu),感興趣的同學(xué)可以參考。
protected enum LookupJoinStatus {n CONSUMING_OUTER,n INIT_INNER_LOOKUP,n CACHE_INNER_RESULT,n BUILD_HASH_TABLE,n PROBE_AND_OUTPUTn}
接下來我們對這幾個關(guān)鍵狀態(tài)進行一些解釋
CONSUMING_OUTER:拉取outer端數(shù)據(jù)并進行緩存
INIT_INNER_LOOKUP:根據(jù)outer端數(shù)據(jù)構(gòu)建lookup predicate并下發(fā)物理SQL,拉取過濾后的數(shù)據(jù)
CACHE_INNER_RESULT:不停的獲取inner端數(shù)據(jù)并緩存
BUILD_HASH_TABLE:根據(jù)緩存的inner端數(shù)據(jù)構(gòu)建哈希表
PROBE_AND_OUTPUT:根據(jù)緩存的outer端數(shù)據(jù),探測并輸出
Q:現(xiàn)在我們來想一下如何接入異步框架,也就是算子何時會處于阻塞,非阻塞以及最終的結(jié)束狀態(tài)。
A:首先,只有在PROBE_AND_OUTPUT階段或者outer端無數(shù)據(jù)時,該算子才可能是結(jié)束狀態(tài);其次,只有CONSUMING_OUTER和CACHE_INNER_RESULT可能會出現(xiàn)block狀態(tài),block狀態(tài)主要是為了避免獲取DN數(shù)據(jù)時阻塞,整體流程圖如下。

Q:lookup join時下發(fā)的物理SQL只有在執(zhí)行時才真正確定,這與通常情況很不同,因為通常下發(fā)的物理SQL在拿到執(zhí)行計劃時就已經(jīng)確定了,如何解決?
A:第一步,生成物理SQL時,檢查是否帶有l(wèi)ookup的標,如果有,則在where條件中添加一個'bka_magic' = 'bka_magic'的占位符;第二步,執(zhí)行時用lookup condition對占位符'bka_magic' = 'bka_magic'進行替換。
一個思考:有興趣的朋友可以看一下,LogicalView中的isMGetEnabled用于控制執(zhí)行時生成lookup所需的物理SQL,expandView用于控制lookup執(zhí)行時對于DN數(shù)據(jù)的掃描,是否真的有必要使用兩個開關(guān)?
支持多列in查詢
例如,select * from t1 where (c1,c2) in (select (c1,c3) from t2); 可以參考LookupConditionBuilder.buildMultiCondition方法,比較簡單,不再贅述。
優(yōu)化器增強:限制使用Semi BKA join的場景
在這里我們需要考慮的一個核心問題是,哪些場景不能使用Semi BKA join?
比如select * from t1 where c1 in (select md5(c2) from t2),調(diào)用相應(yīng)的接口我們會發(fā)現(xiàn)md5(c2)并非是t2中的列,因此這種情況我們不再使用BKA join。
腦洞一下:這里面其實有個可以思考的問題,下發(fā)select xx_func(c2) from t2 where xx_func(c2) in (...)這樣的SQL,在某些情況下應(yīng)該也是會有比較好的性能的,比如存儲節(jié)點上具有函數(shù)索引。但這需要對優(yōu)化器和執(zhí)行器進行更為精細的設(shè)計,同時還需要考慮收益和可維護性的問題,但不管怎么樣,我覺得這是個有意思的問題。
優(yōu)化Semi BKA join
其實至此,我們已經(jīng)拿到了一個可以用的Semi BKA join了,但它的性能在某些場景下會有一定的優(yōu)化空間,因此接下來我們對其進行一些優(yōu)化,涉及如下commit。
support stream lookup join
support dynamic pruning
support single sharding key when table rule is not simple
support lookup predicate with multi-column when pruning
增強執(zhí)行器:增加分批處理的能力
如上所述,我們需要先拉取outer端的所有數(shù)據(jù),然后構(gòu)造predicate獲取inner端數(shù)據(jù),獲取完所有的inner端數(shù)據(jù)并使用其構(gòu)建了哈希表之后,才可以使用緩存的outer端數(shù)據(jù)進行探測并向上層流式的吐數(shù)據(jù)。
Q:這會帶來一定的問題,比如,這使得相比于hash join,吐數(shù)據(jù)的時間被延后了;再比如,這樣構(gòu)造出來的predicate中的in值一般會非常多,存儲節(jié)點會更傾向于全表掃描而非索引掃描。
A:既然我們覺得outer端完全阻塞住了不太好,那就讓他流式起來好了,即outer端的數(shù)據(jù)量一旦超過一個閾值,我們就先拿這部分的數(shù)據(jù)走一個完整的join流程并對外輸出結(jié)果。 于是我們涉及到如下這樣一個略微復(fù)雜一點的流程:

思考一下:相比于把batchSize做成BufferQueue的字段,調(diào)用BufferQueue.pop()拉取batchSize行數(shù)據(jù),我們還有一種方式,BufferQueue中不包含batchSize字段,使用BufferQueue.pop(batchSize)的方式拉取batchSize行數(shù)據(jù)。為什么要提這個呢,因為這樣可以讓BufferQueue更加純粹,且為執(zhí)行時batchSize的動態(tài)調(diào)整留下了空間,雖然這種自適應(yīng)的調(diào)整不一定很靠譜:((這是另一個有意思的問題了)。
支持動態(tài)裁剪
上述方案中,執(zhí)行時predicate中in列表中的值會下發(fā)到每一個分片,比如outer端查出來的數(shù)據(jù)為1,2,拼成的物理SQL為select c1 from t2_physical_table_name where c1 in (1,2),且需要下發(fā)到所有分片。
我們進一步假設(shè)t2有八個分片,標號為t2_{00-08},c1為分片鍵,c1值為1的記錄只可能出現(xiàn)在t2_01分片,c1值為2的記錄只可能出現(xiàn)在t2_02分片,則理想情況下我們只需要下發(fā)兩條物理SQL。
即一條至t2_01分片的select c1 from t2_01 where c1 in (1),一條至t2_02分片的select c1 from t2_02 where c1 in (2)。也就是我們可以做分片裁剪和物理SQL中in值的裁剪。
篇幅有限,我們決定把對這部分設(shè)計的詳細介紹放到下次,有興趣的朋友可以先自行結(jié)合以下三個commit和給出的SQL例子進行對照理解。
support dynamic pruningnnsupport single sharding key when table rule is not simplennsupport lookup predicate with multi-column when pruningn select * from t1 where c1 in (select c2 from t2);n select * from t1 where c1 in (select c2 from t3);n select * from t1 where (c1,c3) in (select c2,c3 from t3);nCREATE TABLE `t1` (n `c1` int(11) DEFAULT NULL,n `c2` int(11) DEFAULT NULL,n `c3` int(11) DEFAULT NULL,n KEY `auto_shard_key_c1` USING BTREE (`c1`),n KEY `auto_shard_key_c2` USING BTREE (`c2`)n) ENGINE = InnoDB dbpartition by hash(`c1`) tbpartition by hash(`c2`) tbpartitions 2;nnCREATE TABLE `t2` (n `c1` int(11) DEFAULT NULL,n `c2` int(11) DEFAULT NULL,n `c3` int(11) DEFAULT NULL,n KEY `auto_shard_key_c2` USING BTREE (`c2`)n) ENGINE = InnoDB dbpartition by hash(`c2`) tbpartition by hash(`c2`) tbpartitions 2;nnCREATE TABLE `t3` (n `c1` int(11) DEFAULT NULL,n `c2` int(11) DEFAULT NULL,n `c3` int(11) DEFAULT NULL,n KEY `auto_shard_key_c2` USING BTREE (`c2`),n KEY `auto_shard_key_c3` USING BTREE (`c3`)n) ENGINE = InnoDB dbpartition by hash(`c2`) tbpartition by hash(`c3`) tbpartitions 2;nninsert into t1 values (1,1,1), (2,2,2), (null, null, null);nninsert into t2 values (1,1,1), (1,1,1), (null, null, null);nninsert into t3 values (1,1,1), (null, null, null);
如何讓CBO選擇BKA join?
Q:首先,為什么相比于hash join,BKA join會更快,以及在哪些場景下會快?
A:相比于hash join,bka join在某些情況下可以避免拉取大量的數(shù)據(jù),本質(zhì)上在于hash join無法避免拉取兩張表的數(shù)據(jù),唯一能決定的是使用小表構(gòu)建哈希表,大表流式探測;而BKA join可以通過構(gòu)造predicate的形式,只拉取小表的全量數(shù)據(jù),同時只拉取大表中匹配的數(shù)據(jù)。但是分批之后,bka join會導(dǎo)致網(wǎng)絡(luò)交互次數(shù)增多,同時需要評估下發(fā)的物理SQL在存儲節(jié)點上的執(zhí)行效率。
Q:其次,semi bka join相比于semi hash join,為什么會快?
A:看起來這個問題和上面是相同的,其實有一些差異。因為semi join時輸出的一定是左端,在現(xiàn)有的實現(xiàn)下,semi hash join時一定會使用右端構(gòu)建哈希表,當(dāng)右端數(shù)據(jù)量大時,代價會很大。
思考:Semi hash join如何實現(xiàn)小表構(gòu)建哈希表,而非永遠使用子查詢中的表構(gòu)建哈希表? 在我們現(xiàn)在的執(zhí)行器模式之下,這是無法實現(xiàn)的,大家有興趣的可以debug并思考一下原因,以及如何支持這種執(zhí)行模式。
如何測試
測試還是一個蠻重要的東西,我覺得可能至少應(yīng)該包括如下場景:
- 執(zhí)行模式:tp_local, ap_local, mpp, cursor下推
- 左/右表數(shù)據(jù):空表,三行普通值,三行null值,三行普通值+三行null值,隨機數(shù)據(jù)(包含隨機比例的重復(fù)值+null值)
- 列的個數(shù):單列(sharding列),單列(非sharding列),兩列(全部為sharding列),兩列(一列sharding列+一列非sharding列)
- 子查詢的類型,in, exists, some, any, all, not in, not exists, scalar_query
- 子查詢中列的對齊情況,對齊,不對齊,主要測試涉及到下推場景時是否正確
- 子查詢中列是否嚴格非null,測試優(yōu)化器對于一些場景的處理是否符合預(yù)期
- 子查詢中的條件,某些條件下可為空,只有join key等值,join key等值+join key非等值,join key等值+普通condition
- 子查詢中涉及列的類型,全類型測試 9. 保護性case:比如> all轉(zhuǎn)成的anti join絕不允許使用物化semi join等
總結(jié)
在本文中,我們主要介紹了hash join的執(zhí)行流程,并從近乎零開始構(gòu)建了semi bka join的執(zhí)行模式。同時,我們在文中提到了很多問題,有興趣的朋友可以進行思考和交流。當(dāng)然,如文中所述,我們還遺留了一些內(nèi)容,關(guān)于這部分內(nèi)容,我們會在下次文章中填坑,同時會結(jié)合更多場景優(yōu)化更多細節(jié)。
作者:越寒
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。