Citus 分布式 PostgreSQL 集群 - SQL Reference(查詢分布式表 SQL)
如前幾節所述,Citus 是一個擴展,它擴展了最新的 PostgreSQL 以進行分布式執行。這意味著您可以在 Citus 協調器上使用標準 PostgreSQL SELECT 查詢進行查詢。 Citus 將并行化涉及復雜選擇、分組和排序以及 JOIN 的 SELECT 查詢,以加快查詢性能。在高層次上,Citus 將 SELECT 查詢劃分為更小的查詢片段,將這些查詢片段分配給 worker,監督他們的執行,合并他們的結果(如果需要,對它們進行排序),并將最終結果返回給用戶。
- SELECT
http://www.postgresql.org/docs/current/static/sql-select.html
在以下部分中,我們將討論您可以使用 Citus 運行的不同類型的查詢。
聚合函數
Citus 支持和并行化 PostgreSQL 支持的大多數聚合函數,包括自定義用戶定義的聚合。聚合使用以下三種方法之一執行,優先順序如下:
- 當聚合按表的分布列分組時,Citus 可以將整個查詢的執行下推到每個 worker。在這種情況下支持所有聚合,并在 worker 上并行執行。(任何正在使用的自定義聚合都必須安裝在 worker 身上。)
- 當聚合沒有按表的分布列分組時,Citus 仍然可以根據具體情況進行優化。Citus 對 sum()、avg() 和 count(distinct) 等某些聚合有內部規則,允許它重寫查詢以對 worker 進行部分聚合。例如,為了計算平均值,Citus 從每個 worker 那里獲得一個總和和一個計數,然后 coordinator 節點計算最終的平均值。特殊情況聚合的完整列表:
avg, min, max, sum, count, array_agg, jsonb_agg, jsonb_object_agg, json_agg, json_object_agg, bit_and, bit_or, bool_and, bool_or, every, hll_add_agg, hll_union_agg, topn_add_agg, topn_union_agg, any_value, var_pop(float4), var_pop(float8), var_samp(float4), var_samp(float8), variance(float4), variance(float8) stddev_pop(float4), stddev_pop(float8), stddev_samp(float4), stddev_samp(float8) stddev(float4), stddev(float8) tdigest(double precision, int), tdigest_percentile(double precision, int, double precision), tdigest_percentile(double precision, int, double precision[]), tdigest_percentile(tdigest, double precision), tdigest_percentile(tdigest, double precision[]), tdigest_percentile_of(double precision, int, double precision), tdigest_percentile_of(double precision, int, double precision[]), tdigest_percentile_of(tdigest, double precision), tdigest_percentile_of(tdigest, double precision[])
- 最后的手段:從 worker 中提取所有行并在 coordinator 節點上執行聚合。如果聚合未在分布列上分組,并且不是預定義的特殊情況之一,則 Citus 會退回到這種方法。它會導致網絡開銷,并且如果要聚合的數據集太大,可能會耗盡 coordinator 的資源。(可以禁用此回退,見下文。)
請注意,查詢中的微小更改可能會改變執行模式,從而導致潛在的令人驚訝的低效率。例如,按非分布列分組的 sum(x) 可以使用分布式執行,而 sum(distinct x) 必須將整個輸入記錄集拉到 coordinator。
SELECT sum(value1), sum(distinct value2) FROM distributed_table;
為避免意外將數據拉到 coordinator,可以設置一個 GUC:
SET citus.coordinator_aggregation_strategy TO 'disabled';
請注意,禁用 coordinator 聚合策略將完全阻止 “類型三”(最后的手段) 聚合查詢工作。
Count (Distinct) 聚合
Citus 以多種方式支持 count(distinct) 聚合。如果 count(distinct) 聚合在分布列上,Citus 可以直接將查詢下推給 worker。如果不是,Citus 對每個 worker 運行 select distinct 語句, 并將列表返回給 coordinator,從中獲取最終計數。
請注意,當 worker 擁有更多 distinct 項時,傳輸此數據會變得更慢。對于包含多個 count(distinct) 聚合的查詢尤其如此,例如:
-- multiple distinct counts in one query tend to be slow
SELECT count(distinct a), count(distinct b), count(distinct c)
FROM table_abc;
對于這類查詢,worker 上產生的 select distinct 語句本質上會產生要傳輸到 coordinator 的行的 cross-product(叉積)。
為了提高性能,您可以選擇進行近似計數。請按照以下步驟操作:
- 在所有 PostgreSQL 實例(coordinator 和所有 worker)上下載并安裝 hll 擴展。有關獲取擴展的詳細信息,請訪問 PostgreSQL hll github 存儲庫。
https://github.com/citusdata/postgresql-hll
- 只需從 coordinator 運行以下命令,即可在所有 PostgreSQL 實例上創建 hll 擴展。
CREATE EXTENSION hll;
- 通過設置 Citus.count_distinct_error_rate 配置值啟用計數不同的近似值。此配置設置的較低值預計會提供更準確的結果,但需要更多時間進行計算。我們建議將其設置為 0.005。
SET citus.count_distinct_error_rate to 0.005;
在這一步之后,count(distinct) 聚合會自動切換到使用 HLL,而無需對您的查詢進行任何更改。您應該能夠在表的任何列上運行近似 count distinct 查詢。
HyperLogLog 列
某些用戶已經將他們的數據存儲為 HLL 列。在這種情況下,他們可以通過調用 hll_union_agg(hll_column) 動態匯總這些數據。
估計 Top N 個項
通過應用 count、sort 和 limit 來計算集合中的前 n 個元素很簡單。然而,隨著數據大小的增加,這種方法變得緩慢且資源密集。使用近似值更有效。
Postgres 的開源 TopN 擴展可以快速獲得 “top-n” 查詢的近似結果。該擴展將 top 值具體化為 JSON 數據類型。TopN 可以增量更新這些 top 值,或者在不同的時間間隔內按需合并它們。
- TopN 擴展
https://github.com/citusdata/postgresql-topn
基本操作
在查看 TopN 的實際示例之前,讓我們看看它的一些原始操作是如何工作的。首先 topn_add 更新一個 JSON 對象,其中包含一個 key 被看到的次數:
select topn_add('{}', 'a');
-- => {"a": 1}
-- record the sighting of another "a"
select topn_add(topn_add('{}', 'a'), 'a');
-- => {"a": 2}
該擴展還提供聚合以掃描多個值:
-- for normal_rand
create extension tablefunc;
-- count values from a normal distribution
SELECT topn_add_agg(floor(abs(i))::text)
FROM normal_rand(1000, 5, 0.7) i;
-- => {"2": 1, "3": 74, "4": 420, "5": 425, "6": 77, "7": 3}
如果 distinct 值的數量超過閾值,則聚合會丟棄那些最不常見的信息。這可以控制空間使用。閾值可以由 topn.number_of_counters GUC 控制。它的默認值為 1000。
現實例子
現在來看一個更現實的例子,說明 TopN 在實踐中是如何工作的。讓我們提取 2000 年的亞馬遜產品評論,并使用 TopN 快速查詢。首先下載數據集:
curl -L https://examples.citusdata.com/customer_reviews_2000.csv.gz | \
gunzip > reviews.csv
接下來,將其攝取到分布式表中:
CREATE TABLE customer_reviews
(
customer_id TEXT,
review_date DATE,
review_rating INTEGER,
review_votes INTEGER,
review_helpful_votes INTEGER,
product_id CHAR(10),
product_title TEXT,
product_sales_rank BIGINT,
product_group TEXT,
product_category TEXT,
product_subcategory TEXT,
similar_product_ids CHAR(10)[]
);
SELECT create_distributed_table('customer_reviews', 'product_id');
\COPY customer_reviews FROM 'reviews.csv' WITH CSV
接下來我們將添加擴展,創建一個目標表來存儲 TopN 生成的 json 數據,并應用我們之前看到的 topn_add_agg 函數。
-- run below command from coordinator, it will be propagated to the worker nodes as well
CREATE EXTENSION topn;
-- a table to materialize the daily aggregate
CREATE TABLE reviews_by_day
(
review_date date unique,
agg_data jsonb
);
SELECT create_reference_table('reviews_by_day');
-- materialize how many reviews each product got per day per customer
INSERT INTO reviews_by_day
SELECT review_date, topn_add_agg(product_id)
FROM customer_reviews
GROUP BY review_date;
現在,我們無需在 customer_reviews 上編寫復雜的窗口函數,只需將 TopN 應用于 reviews_by_day。例如,以下查詢查找前五天中每一天最常被評論的產品:
SELECT review_date, (topn(agg_data, 1)).*
FROM reviews_by_day
ORDER BY review_date
LIMIT 5;
┌─────────────┬────────────┬───────────┐
│ review_date │ item │ frequency │
├─────────────┼────────────┼───────────┤
│ 2000-01-01 │ 0939173344 │ 12 │
│ 2000-01-02 │ B000050XY8 │ 11 │
│ 2000-01-03 │ 0375404368 │ 12 │
│ 2000-01-04 │ 0375408738 │ 14 │
│ 2000-01-05 │ B00000J7J4 │ 17 │
└─────────────┴────────────┴───────────┘
TopN 創建的 json 字段可以與 topn_union 和 topn_union_agg 合并。我們可以使用后者來合并整個第一個月的數據,并列出該期間最受好評的五個產品。
SELECT (topn(topn_union_agg(agg_data), 5)).*
FROM reviews_by_day
WHERE review_date >= '2000-01-01' AND review_date < '2000-02-01'
ORDER BY 2 DESC;
┌────────────┬───────────┐
│ item │ frequency │
├────────────┼───────────┤
│ 0375404368 │ 217 │
│ 0345417623 │ 217 │
│ 0375404376 │ 217 │
│ 0375408738 │ 217 │
│ 043936213X │ 204 │
└────────────┴───────────┘
有關更多詳細信息和示例,請參閱 TopN readme。
百分位計算
在大量行上找到精確的百分位數可能會非常昂貴, 因為所有行都必須轉移到 coordinator 以進行最終排序和處理。另一方面,找到近似值可以使用所謂的 sketch 算法在 worker 節點上并行完成。 coordinator 節點然后將壓縮摘要組合到最終結果中,而不是讀取完整的行。
一種流行的百分位數 sketch 算法使用稱為 t-digest 的壓縮數據結構,可在 tdigest 擴展中用于 PostgreSQL。Citus 集成了對此擴展的支持。
以下是在 Citus 中使用 t-digest 的方法:
- 在所有 PostgreSQL 節點( coordinator 和所有 worker)上下載并安裝 tdigest 擴展。tdigest 擴展 github 存儲庫有安裝說明。
https://github.com/tvondra/tdigest
- 在數據庫中創建 tdigest 擴展。在 coordinator 上運行以下命令:
CREATE EXTENSION tdigest;
coordinator 也會將命令傳播給 worker。
當在查詢中使用擴展中定義的任何聚合時,Citus 將重寫查詢以將部分 tdigest 計算下推到適用的 worker。
T-digest 精度可以通過傳遞給聚合的 compression 參數來控制。權衡是準確性與 worker 和 coordinator 之間共享的數據量。有關如何在 tdigest 擴展中使用聚合的完整說明,請查看官方 tdigest github 存儲庫中的文檔。
限制下推
Citus 還盡可能將限制條款下推到 worker 的分片,以最大限度地減少跨網絡傳輸的數據量。
但是,在某些情況下,帶有 LIMIT 子句的 SELECT 查詢可能需要從每個分片中獲取所有行以生成準確的結果。例如,如果查詢需要按聚合列排序,則需要所有分片中該列的結果來確定最終聚合值。由于大量的網絡數據傳輸,這會降低 LIMIT 子句的性能。在這種情況下,如果近似值會產生有意義的結果,Citus 提供了一種用于網絡高效近似 LIMIT 子句的選項。
LIMIT 近似值默認禁用,可以通過設置配置參數 citus.limit_clause_row_fetch_count 來啟用。在這個配置值的基礎上,Citus 會限制每個任務返回的行數,用于在 coordinator 上進行聚合。由于這個 limit,最終結果可能是近似的。增加此 limit 將提高最終結果的準確性,同時仍提供從 worker 中提取的行數的上限。
SET citus.limit_clause_row_fetch_count to 10000;
分布式表的視圖
Citus 支持分布式表的所有視圖。有關視圖的語法和功能的概述,請參閱 CREATE VIEW 的 PostgreSQL 文檔。
- https://www.postgresql.org/docs/current/static/sql-createview.html
請注意,某些視圖導致查詢計劃的效率低于其他視圖。有關檢測和改進不良視圖性能的更多信息,請參閱子查詢/CTE 網絡開銷。(視圖在內部被視為子查詢。)
- https://docs.citusdata.com/en/v11.0beta/performance/performance_tuning.html#subquery-perf
Citus 也支持物化視圖,并將它們作為本地表存儲在 coordinator 節點上。
連接(Join)
Citus 支持任意數量的表之間的 equi-JOIN,無論它們的大小和分布方法如何。查詢計劃器根據表的分布方式選擇最佳連接方法和 join 順序。它評估幾個可能的 join 順序并創建一個 join 計劃,該計劃需要通過網絡傳輸最少的數據。
共置連接
當兩個表共置時,它們可以在它們的公共分布列上有效地 join。co-located join(共置連接) 是 join 兩個大型分布式表的最有效方式。
- https://docs.citusdata.com/en/v11.0-beta/sharding/data_modeling.html#colocation
注意
確保表分布到相同數量的分片中,并且每個表的分布列具有完全匹配的類型。嘗試加入類型略有不同的列(例如 `int` 和 `bigint`)可能會導致問題。
引用表連接
引用表可以用作“維度”表, 以有效地與大型“事實”表連接。因為引用表在所有 worker 上完全復制, 所以 reference join 可以分解為每個 worker 上的本地連接并并行執行。 reference join 就像一個更靈活的 co-located join 版本, 因為引用表沒有分布在任何特定的列上,并且可以自由地 join 到它們的任何列上。
- https://docs.citusdata.com/en/v11.0-beta/develop/reference_ddl.html#reference-tables
引用表也可以與 coordinator 節點本地的表連接。
重新分區連接
在某些情況下,您可能需要在除分布列之外的列上連接兩個表。對于這種情況,Citus 還允許通過動態重新分區查詢的表來連接非分布 key 列。
在這種情況下,要分區的表由查詢優化器根據分布列、連接鍵和表的大小來確定。使用重新分區的表,可以確保只有相關的分片對相互連接,從而大大減少了通過網絡傳輸的數據量。
通常,co-located join 比 repartition join 更有效,因為 repartition join 需要對數據進行混洗。因此,您應該盡可能通過 common join 鍵來分布表。