【大數(shù)據(jù)】Hive 小文件治理和 HDFS 數(shù)據(jù)平衡講解
一、Hive 小文件概述
在Hive中,所謂的小文件是指文件大小遠(yuǎn)小于HDFS塊大小的文件,通常小于128 MB,甚至更少。這些小文件可能是Hive表的一部分,每個小文件都包含一個或幾個表的記錄,它們以文本格式存儲。
Hive通常用于分析大量數(shù)據(jù),但它在處理小文件方面表現(xiàn)不佳,Hive中存在大量小文件會引起以下問題:
- 存儲空間占用過多:在Hadoop生態(tài)系統(tǒng)中,每個小文件都將占用一定的存儲空間,而且每個小文件也需要一個塊來存儲。如果存在大量的小文件,將浪費(fèi)大量的存儲空間。
- 處理延遲:小文件數(shù)量過多,會引起大量IO操作,導(dǎo)致處理延遲。
- 查詢性能下降:小文件用于分區(qū)和表劃分,可能導(dǎo)致查詢延遲并降低查詢性能。此外,小文件還會增加元數(shù)據(jù)的數(shù)量,使得Hive在查詢元數(shù)據(jù)時變得更加緩慢。
- 數(shù)據(jù)傾斜:如果數(shù)據(jù)分布不均勻,會導(dǎo)致一些Reduce任務(wù)處理了完全不同的分區(qū),這會使某些Reduce任務(wù)的運(yùn)行速度與其他Reduce任務(wù)相比非常慢。
因此,為了避免這些問題,我們需要對Hive中小文件的處理進(jìn)行優(yōu)化,減少小文件數(shù)量和大小,以提高數(shù)據(jù)處理效率和準(zhǔn)確性。
二、Hive 小文件產(chǎn)生的背景
Hive中小文件產(chǎn)生的背景主要是因為以下兩個原因:
- 數(shù)據(jù)寫入頻率較高:如果表的寫入頻率較高,也就意味著會頻繁地添加、更新或刪除記錄,這可能會導(dǎo)致小文件的產(chǎn)生。由于Hive表被映射到HDFS文件,因此如果頻繁地寫入數(shù)據(jù),它們可能以小文件的形式存在。
- 映射表的切分限制:Hive表映射為HDFS文件時會按照數(shù)據(jù)塊大小進(jìn)行切分和管理。如果表中存在小于單個數(shù)據(jù)塊大小的數(shù)據(jù),生成的文件就會比數(shù)據(jù)塊小。這可能會導(dǎo)致大量小文件的產(chǎn)生,
綜上所述,Hive中小文件的存在與數(shù)據(jù)寫入頻率高和表映射為HDFS文件的切分方式有關(guān)。為了處理小文件問題,我們需要了解這些背景并針對其原因來優(yōu)化處理。
三、環(huán)境準(zhǔn)備
如果已經(jīng)有了環(huán)境了,可以忽略,如果想快速部署環(huán)境進(jìn)行測試可以參考我這篇文章:通過 docker-compose 快速部署 Hive 詳細(xì)教程
# 登錄容器
docker exec -it hive-hiveserver2 bash
# 連接hive
beeline -u jdbc:hive2://hive-hiveserver2:10000 -n hadoop
四、Hive 小文件治理
為了處理Hive中的小文件問題,可以采取以下一些有效措施:
- 文件合并:將多個小文件合并成一個大文件,采用 Hadoop 文件合并API可以將多個小文件合并成一個大文件。合并文件后,可以減少小文件數(shù)量,減少Hadoop文件管理負(fù)擔(dān),減少HDFS元數(shù)據(jù)和NameNode內(nèi)存消耗。
- 壓縮文件:可以使用壓縮算法(如gzip、bzip2等)對小文件進(jìn)行壓縮,這樣可以減少磁盤空間和網(wǎng)絡(luò)帶寬的使用,并減少小文件損壞的可能性。
- 存儲格式優(yōu)化:Hive支持多種存儲格式,如ORC、Parquet、Avro等。這些格式允許將多個小文件壓縮并序列化成一個大文件,存儲時占用更少的磁盤和網(wǎng)絡(luò)帶寬。存儲格式優(yōu)化對于處理小文件問題非常有效。
- 分區(qū)表:對于一些常變動的數(shù)據(jù),推薦使用分區(qū)表。分區(qū)表將數(shù)據(jù)按照不同的分區(qū)值存儲在不同的目錄中。這減少了小文件數(shù)量并提高了查詢效率。
- 垃圾回收:如果一個表舊數(shù)據(jù)經(jīng)常更新或刪除,就會產(chǎn)生大量無用的小文件,因此建議進(jìn)行垃圾回收。可以定期執(zhí)行HDFS文件刪除命令或者設(shè)置TTL等機(jī)制,定期刪除冗余數(shù)據(jù)以減少HDFS文件、元數(shù)據(jù)和NameNode內(nèi)存的消耗。
通過采取上述措施中的一種或多種,可以極大地減少Hive中小文件數(shù)量,優(yōu)化Hive表的表現(xiàn)并提高查詢效率。
1)小文件合并(常用)
可以使用以下命令將 Hive 表中的小文件合并為一個大文件:
set hive.merge.size.per.task=256000000;
set hive.merge.smallfiles.avgsize=16000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.mapfiles=true;
# 未分區(qū)
INSERT OVERWRITE TABLE table_new SELECT * FROM table_old;
# 分區(qū)
INSERT OVERWRITE TABLE table_new SELECT column1,column2 FROM table_old where partitions;
上述代碼中的參數(shù)含義如下:
- hive.merge.size.per.task:設(shè)置MapReduce任務(wù)處理的最大數(shù)據(jù)大小,單位是字節(jié),默認(rèn)為256MB。
- hive.merge.smallfiles.avgsize:設(shè)置如果小于該平均大小的文件需要合并在一起,以減小小文件的數(shù)量和規(guī)模,單位是字節(jié),默認(rèn)為16MB。
- hive.input.format:使用 CombinHiveInputFormat 作為輸入格式合并小文件。
- hive.merge.mapfiles:合并Map文件(.mapred或.mapreduce)以減少小文件的數(shù)量。
1、示例演示一(非分區(qū)表)
# 非分區(qū)表
CREATE TABLE student (
id INT,
name STRING,
age INT,
address STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 添加數(shù)據(jù),這里多執(zhí)行幾次,會生成多個文件,方便下面文件合并實(shí)驗
INSERT INTO TABLE student VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
-- 也可使用LOAD DATA LOCAL
LOAD DATA LOCAL INPATH './stu.txt' INTO TABLE student;
從上圖可看到已經(jīng)有很多小文件了,接下來就是進(jìn)行合并了。執(zhí)行以下命令即可:
INSERT OVERWRITE TABLE student SELECT * FROM student;
已經(jīng)將多個文件合并成一個文件了,達(dá)到了小文件合并的效果了。
2、示例演示二(分區(qū)表)
其實(shí)用的多的還是按分區(qū)進(jìn)行合并,一般表都是有分區(qū)的,按分區(qū)合并的好處就是減少讀寫壓力,數(shù)據(jù)量大的情況下分批合并是非常友好的。
# 分區(qū)表
CREATE TABLE student_patitions (
id INT,
name STRING,
age INT,
address STRING
)
PARTITIONED BY (year string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 開啟動態(tài)分區(qū),默認(rèn)是false
set hive.exec.dynamic.partition=true;
-- 開啟允許所有分區(qū)都是動態(tài)的,否則必須要有靜態(tài)分區(qū)才能使用。
set hive.exec.dynamic.partition.mode=nostrick;
-- Hive默認(rèn)情況下設(shè)置的最大動態(tài)分區(qū)創(chuàng)建數(shù)是100。
set hive.exec.max.dynamic.partitions=10000;
-- 添加數(shù)據(jù),這里多執(zhí)行幾次,會生成多個文件,方便下面文件合并實(shí)驗
INSERT INTO TABLE student_patitions PARTITION (year=2019) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
INSERT INTO TABLE student_patitions PARTITION (year=2023) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
-- 也可使用LOAD DATA LOCAL
LOAD DATA LOCAL INPATH './stu_pt.txt' INTO TABLE student_patitions PARTITION (year=2020);
從上圖可看到已經(jīng)有很多小文件了,接下來就是進(jìn)行合并了。執(zhí)行以下命令即可:
-- 按分區(qū)合并
insert overwrite table student_patitions partition(year=2019)
select id, name, age, address from student_patitions where year=2019;
-- 動態(tài)分區(qū)合并,有些版本不支持*,
-- *
insert overwrite table student_patitions partition(year) select * from student_patitions;
-- insert overwrite table student_patitions partition(year) select id, name, age, address from student_patitions;
-- 也可以通過load data方式
load data local inpath './stu_pt.txt' overwrite into table student_patitions partition(year=2019);
3、示例演示三(臨時表)
還有一個更靠譜的方案就是通過將現(xiàn)有的表數(shù)據(jù)合并寫到另外一張臨時新表,然后確認(rèn)合并無誤后,將原始表和表數(shù)據(jù)刪除,再將新表名改成舊表名。
示例如下:
-- 分區(qū)表
CREATE TABLE student_patitions2 (
id INT,
name STRING,
age INT,
address STRING
)
PARTITIONED BY (year string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 開啟動態(tài)分區(qū),默認(rèn)是false
set hive.exec.dynamic.partition=true;
-- 開啟允許所有分區(qū)都是動態(tài)的,否則必須要有靜態(tài)分區(qū)才能使用。
set hive.exec.dynamic.partition.mode=nostrick;
-- Hive默認(rèn)情況下設(shè)置的最大動態(tài)分區(qū)創(chuàng)建數(shù)是100。
set hive.exec.max.dynamic.partitions=10000;
-- 添加數(shù)據(jù),這里多執(zhí)行幾次,會生成多個文件,方便下面文件合并實(shí)驗
INSERT INTO TABLE student_patitions2 PARTITION (year=2019) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
INSERT INTO TABLE student_patitions2 PARTITION (year=2023) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
-- 也可使用LOAD DATA LOCAL
LOAD DATA LOCAL INPATH './stu_pt.txt' INTO TABLE student_patitions2 PARTITION (year=2020);
創(chuàng)建臨時表并將添加合并數(shù)據(jù)
CREATE TABLE student_patitions2_temp (
id INT,
name STRING,
age INT,
address STRING
)
PARTITIONED BY (year string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 按分區(qū)合并,有些版本不支持*
insert overwrite table student_patitions2_temp partition(year)
select * from student_patitions2;
-- insert overwrite table student_patitions2_temp partition(year) select id, name, age, address from student_patitions2;
-- 也可以通過load data方式
load data local inpath './stu_pt.txt' overwrite into table student_patitions2_temp partition(year=2019);
刪除舊表,修改表表名稱
# 刪表,如果是外部表還是刪除數(shù)據(jù)文件
DROP TABLE student_patitions2;
ALTER TABLE student_patitions2_temp RENAME TO student_patitions2;
2)文件壓縮
可以使用以下命令將表中的小文件進(jìn)行壓縮:
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
INSERT OVERWRITE TABLE table_new SELECT * FROM table_old;
3)存儲格式優(yōu)化
使用存儲格式進(jìn)行優(yōu)化,可以將多個小文件壓縮和序列化成一個大文件。以下是使用ORC格式的實(shí)現(xiàn)示例:
SET hive.exec.compress.output=true;
SET orc.compress=SNAPPY;
SET hive.exec.orc.default.compress=SNAPPY;
CREATE TABLE table_new STORED AS ORC AS SELECT * FROM table_old;
上述代碼中的參數(shù)含義如下:
- hive.exec.compress.output:指定是否開啟壓縮,如果啟用則會對輸出進(jìn)行壓縮,以節(jié)省存儲空間和網(wǎng)絡(luò)帶寬。
- orc.compress:設(shè)置壓縮算法,這里使用SNAPPY。
- hive.exec.orc.default.compress:設(shè)置ORC文件默認(rèn)壓縮算法,這里使用SNAPPY。
4)分區(qū)表
可以使用以下SQL語句創(chuàng)建分區(qū)表:
CREATE TABLE table_new(
column1 INT,
column2 STRING
)
PARTITIONED BY (
day STRING
)
ROW FORMAT DELIMITED
STORED AS TEXTFILE;
這里將表按照分區(qū)值進(jìn)行存儲,可以提高查詢效率,減少小文件數(shù)量。
5)垃圾回收
刪除HDFS中過期的小文件可以減少 HDFS 的存儲開銷。
可以使用如下命令進(jìn)行刪除操作:
hdfs dfs -rm /path/to/file-*
也可以使用 HiveQL 參數(shù) EXPIRE 進(jìn)行垃圾回收,以將無用的文件從HDFS中刪除:
ALTER TABLE table_old DROP PARTITION (day '2016-01-01') PURGE;
上述代碼中將刪除舊的分區(qū)并從HDFS中永久刪除不再需要的數(shù)據(jù)。
綜上所述,可以通過上述方式來處理Hive中小文件問題,以提高Hive的查詢效率和性能。
五、HDFS 數(shù)據(jù)平衡
1)HDFS 數(shù)據(jù)傾斜
HDFS數(shù)據(jù)傾斜是指存在一些數(shù)據(jù)塊的大小明顯大于其他數(shù)據(jù)塊,導(dǎo)致作業(yè)在運(yùn)行時的處理時間和性能嚴(yán)重不平衡。這通常是由于數(shù)據(jù)分布不均勻,或者任務(wù)負(fù)載不均勻?qū)е碌摹ive的MapReduce作業(yè)經(jīng)常面臨HDFS數(shù)據(jù)傾斜的問題,這會導(dǎo)致一部分Mapper處理的數(shù)據(jù)量很大,而其他Mapper卻沒有得到充分利用。
以下是一些緩解HDFS數(shù)據(jù)傾斜的方法:
- 增大文件塊大小:如果您的作業(yè)經(jīng)常面臨數(shù)據(jù)傾斜問題,可以嘗試增大數(shù)據(jù)塊的大小。這樣可以降低Mapper需要處理的數(shù)據(jù)塊數(shù)量,從而減少數(shù)據(jù)塊分配不均衡的可能性。
- 數(shù)據(jù)合并:如果您的作業(yè)中存在大量較小的文件,可以嘗試將它們合并為幾個較大的文件。這樣可以減少地圖任務(wù)的數(shù)目,并有助于均衡任務(wù)的負(fù)載。
- 數(shù)據(jù)重分區(qū):如果在您的作業(yè)中數(shù)據(jù)分布極不均勻,可以嘗試使用數(shù)據(jù)重分區(qū)(例如Hive中的 CLUSTER BY 或 DISTRIBUTE BY 語句)來重新組織數(shù)據(jù)。這可以幫助將相似的數(shù)據(jù)放在同一個分區(qū)中,從而減少數(shù)據(jù)傾斜的可能性。
- 動態(tài)分區(qū):在Hive中,動態(tài)分區(qū)可用于根據(jù)數(shù)據(jù)中實(shí)際的分區(qū)鍵動態(tài)創(chuàng)建分區(qū)。它可以使用較小的數(shù)據(jù)塊大小來提高作業(yè)的并行性。動態(tài)分區(qū)還可以通過確保數(shù)據(jù)分配均衡來緩解數(shù)據(jù)傾斜的問題。
- 壓縮:使用壓縮技術(shù)可以減小數(shù)據(jù)塊大小,并減少傾斜問題的可能性。常用的壓縮格式包括Gzip、Snappy、LZO等。
HDFS數(shù)據(jù)傾斜不僅可能出現(xiàn)在數(shù)據(jù)塊的大小上,還可能出現(xiàn)在數(shù)據(jù)節(jié)點(diǎn)(Datanode)的負(fù)載上。如果一個Datanode存儲的數(shù)據(jù)塊遠(yuǎn)遠(yuǎn)多于其他Datanode,那么它處理作業(yè)時的負(fù)載將遠(yuǎn)高于其他節(jié)點(diǎn),從而導(dǎo)致整個集群性能下降。下面是一些緩解HDFS數(shù)據(jù)節(jié)點(diǎn)傾斜問題的方法:
- 增加節(jié)點(diǎn):可以向集群中添加更多的節(jié)點(diǎn),以增加存儲能力。這樣可以分散節(jié)點(diǎn)的負(fù)載,避免單個節(jié)點(diǎn)負(fù)載過高。盡管這樣做可能會增加集群的維護(hù)成本,但它可以提高集群的性能和可靠性。一般增加完新節(jié)點(diǎn)需要做數(shù)據(jù)平衡,要不然新節(jié)點(diǎn)磁盤使用率遠(yuǎn)低于其它節(jié)點(diǎn)的磁盤。
- 均衡數(shù)據(jù)分布:您可以使用HDFS中的均衡命令(hdfs balancer)來均衡數(shù)據(jù)分布。該命令將根據(jù)需要將塊移動到不同的節(jié)點(diǎn),以保持所有節(jié)點(diǎn)的負(fù)載相對均衡。
- 更改塊大小:當(dāng)塊大小不均衡時,您可以嘗試根據(jù)每個節(jié)點(diǎn)的存儲容量增加或減少塊大小,以確保每個節(jié)點(diǎn)的負(fù)載相對均衡。例如,如果一個節(jié)點(diǎn)存儲大量的小文件,則可以將塊大小增加到更適合這種情況的大小(例如512MB或1GB),以減少每個節(jié)點(diǎn)的塊數(shù)。
- 數(shù)據(jù)遷移:如果一個節(jié)點(diǎn)負(fù)載過高,您可以從該節(jié)點(diǎn)中移動一些塊到其他節(jié)點(diǎn)中,以減輕該節(jié)點(diǎn)的負(fù)載。這可以通過將塊從一個節(jié)點(diǎn)復(fù)制到另一個節(jié)點(diǎn)來實(shí)現(xiàn)。需要注意的是,這樣做可能會影響作業(yè)的性能,因此建議在維護(hù)合適的性能的同時進(jìn)行數(shù)據(jù)遷移。
需要注意的是,緩解HDFS數(shù)據(jù)節(jié)點(diǎn)傾斜問題需要綜合考慮多種因素,包括數(shù)據(jù)分布、集群規(guī)模、硬件配置等。根據(jù)具體情況,您可以采取不同的措施來緩解數(shù)據(jù)節(jié)點(diǎn)傾斜的問題。
2)HDFS 數(shù)據(jù)平衡
HDFS提供了 hdfs balancer 命令來進(jìn)行數(shù)據(jù)平衡呢。hdfs balancer命令可以讓HDFS集群重新均衡分布數(shù)據(jù)塊,保證HDFS集群中數(shù)據(jù)塊在各個節(jié)點(diǎn)上均衡分布。
hdfs balancer 命令的語法如下:
hdfs balancer -help
Usage: java Balancer
[-policy <policy>] the balancing policy: datanode or blockpool
[-threshold <threshold>] Percentage of disk capacity
[-exclude [-f <hosts-file> | comma-sperated list of hosts]] Excludes the specified datanodes.
[-include [-f <hosts-file> | comma-sperated list of hosts]] Includes only the specified datanodes.
參數(shù)詳解:
- -threshold:某datanode的使用率和整個集群使用率的百分比差值閾值,達(dá)到這個閾值就啟動hdfs balancer,取值從1到100,不宜太小,因為在平衡過程中也有數(shù)據(jù)寫入,太小無法達(dá)到平衡,默認(rèn)值:10
- -policy:分為blockpool和datanode,前者是block pool級別的平衡后者是datanode級別的平衡,BlockPool 策略平衡了塊池級別和 DataNode 級別的存儲。BlockPool 策略僅適用于 Federated HDFS 服務(wù)
- -exclude:不為空,則不在這些機(jī)器上進(jìn)行平衡
- -include:不為空,則僅在這些機(jī)器上進(jìn)行平衡-idleiterations:最大迭代次數(shù)
另外還有兩個常用的參數(shù):
- dfs.datanode.balance.bandwidthPerSec :HDFS做均衡時使用的最大帶寬,默認(rèn)為1048576,即1MB/s,對大多數(shù)千兆甚至萬兆帶寬的集群來說過小。不過該值可以在啟動balancer腳本時再設(shè)置,可以不修改集群層面默認(rèn)值。目前目前我們產(chǎn)線環(huán)境設(shè)置的是50M/s~100M/s。
- dfs.balancer.block-move.timeout:是一個Hadoop數(shù)據(jù)平衡命令hdfs balancer的選項之一,用于設(shè)置數(shù)據(jù)塊移動的最長時間。該選項指定了塊移動操作在多長時間內(nèi)必須完成。該選項默認(rèn)值為120000毫秒(即2分鐘),可以通過以下命令進(jìn)行修改:
簡單使用:
# 啟動數(shù)據(jù)平衡,默認(rèn)閾值為 10%
hdfs balancer
# 默認(rèn)相差值為10% 帶寬速率為10M/s,超時時間10分鐘,過程信息會直接打印在客戶端 ctrl+c即可中止
hdfs balancer -Ddfs.balancer.block-move.timeout=600000
#可以手動設(shè)置相差值 一般相差值越小 需要平衡的時間就越長,//設(shè)置為20% 這個參數(shù)本身就是百分比 不用帶%
hdfs balancer -threshold 20
#如果怕影響業(yè)務(wù)可以動態(tài)設(shè)置一下帶寬再執(zhí)行上述命令,1M/s
hdfs dfsadmin -setBalancerBandwidth 1048576
#或者直接帶參運(yùn)行,帶寬為1M/s
hdfs balancer -Ddfs.datanode.balance.bandwidthPerSec=1048576 -Ddfs.balancer.block-move.timeout=600000