ETL,是英文 Extract-Transform-Load 的縮寫,用來描述將數據從來源端經過抽取(extract)、轉換(transform)、加載(load)至目的端的過程。
更關鍵的是,數據來源的業務系統也是在不斷地更新維護中的,任何一個變更都會對下游的數據分析程序產生巨大的影響。因此,有了ETL過程作為一個緩沖區,當上游的業務系統變更時,只需要對ETL過程進行相應變更,下游的數據分析就能夠比較穩定,從而降低系統維護成本。
ETL,是英文 Extract-Transform-Load 的縮寫,用來描述將數據從來源端經過抽取(extract)、轉換(transform)、加載(load)至目的端的過程。
1、數據清洗
首先進行數據清洗,對原始數據中的錯誤予以糾正,或者對缺失數據進行補填。譬如,現在要建設一個增值稅發票的數據中臺。這時,系統從許多不同的來源采集與增值稅發票相關的數據。當收集完這些原始數據以后,進行數據清洗工作。增值稅發票的數據結構如下圖所示。

增值稅發票的數據結構圖
在正常的增值稅發票的數據結構中,每張進項發票都應當有至少一條發票明細。然而,可能由于采集的數據不一致,發票與明細經常不是同時到來,可能相差幾天,造成用發票分析的數據與用發票明細分析的數據不一致。這時,必須要先補填一個發票明細,雖然商品名稱與數量不知道,但至少要保證發票明細的金額之和要等于發票金額,才不至于影響后續的分析質量。至于商品名稱,可以暫時補填一個“未知商品”。這樣,當該發票真正的發票明細到來時,再覆蓋原有補填的明細。
此外,原本每張發票都應當有購方納稅人與銷方納稅人,然而由于納稅人信息的基礎數據來源于不同的系統,可能造成該發票的納稅人信息不在納稅人信息表中的情況。這時,必須要補填一條納稅人信息,使得發票表與納稅人能夠對應上,不會造成數據無法關聯而缺失數據。
同理,每個納稅人都應當有各自的稅務機關、地域和行業,這些信息都可能缺失。對于稅務機關和地域,可以通過納稅人社會信用代碼中的內容進行推測。但是,行業信息是無法推測的。即使無法推測,也不能將其置為null,而是填一個默認值X99999,對應到行業表中的“未知行業”。
數據清洗的過程通過SparkSQL來實現。通過SparkSQL從原始表中查詢數據,然后經過以下處理過程,最終寫入ETL臨時表中:
1/**
2
3 * @author fangang
4
5 */
6
7object ZzsfpJx {
8
9 def main(args: Array[String]): Unit = {
10
11 val task = LogUtils.start("zzsfpJxQd")
12
13 try {
14
15 val spark = SparkUtils.init("zzsfpJx")
16
17 val ETLFPMAPNUM = PropertyFile.getProperty("ETLFPMAPNUM").toInt
18
19 spark.udf.register("getJxfpId", (fpdm:String, fphm:String, kprq:String) =>
20 if(null==kprq) fpdm+"X"+fphm+"X" else fpdm+"X"+fphm+"X"+kprq)
21 UdfRegister.fillNsr(spark)
22 UdfRegister.fillSwjg(spark)
23 UdfRegister.cutSL(spark)
24
25
26 val result = spark.sql("SELECT getJxfpId(D.FPDM,D.FPHM,D.KPRQ) JXFP_ID,D.FPDM,D.
27 FPHM,'YB' FP_LB,D.JE JE,cast(cutSL(D.SE/D.JE) as double) SL,D.
28 SE SE,fillNsr(D.XFSBH) XF_NSRSBH, D.XFMC XF_NSRMC, fillNsr(D.GFSBH) GF_NSRSBH,
29 D.GFMC GF_NSRMC,D.KPRQ, D.KPRQ RZSJ, D.XF_QXSWJG_DM SWJG_DM,
30 from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') CZSJ,NSR.SWJG_KEY GF_SWJG_DM,
31 getSwjg(D.XF_QXSWJG_DM,fillNsr(D.XFSBH)) XF_SWJG_DM, case trim(D.fpzt_dm)
32 when '0' then 'N' when '1' then 'N' else 'Y' end ZFBZ,'' SKM,'' SHRSBH,'
33 ' SHRMC,'' FHRSBH,'' FHRMC,'' QYD,'' SKPH, D.JSHJ,'' CZCH,'' CCDW,''
34 YSHWXX,D.BZ,D.tspz_dm as TSPZBZ,CASE WHEN
35 length(trim(D.zfrq))>15 THEN D.zfrq ELSE NULL END ZFSJ FROM dzdz.DZDZ_FPXX_ZZSFP
36 D JOIN DW.DW_DM_NSR NSR ON D.GFSBH = NSR.NSR_KEY and
37 NSR.WDBZ='1'").repartition(ETLFPMAPNUM)
38
39 DataFrameUtils.saveAppend(result, "etl", "etl_jxfp")
40
41 LogUtils.end(task)
42 } catch { case ex:Exception => LogUtils.error(task, ex) }
43 }
44}
45
在以上SparkSQL程序中,首先從原始數據dzdz.DZDZ_FPXX_ZZSFP中查詢數據,通過公用方法UdfRegister.fillNsr(spark)與UdfRegister.fillSwjg(spark)對納稅人與稅務機關進行補填,保證發票在與納稅人信息、稅務機關信息關聯時不會因為數據為null而造成數據缺失。最終,將結果數據寫入etl_jxfp的臨時表中。
此外,在處理發票明細時加入了這樣一段語句:
1val result1 = spark.sql("SELECT getJxfpqdId(R.FPDM,R.FPHM,R.KPRQ,'00','1') JXFPQD_ID,
2
3getJxfpId(R.FPDM,R.FPHM,R.KPRQ) JXFP_ID ,1.0 HH,'YB' FP_LB,
4
5'無商品明細' WP_MC,'' WP_DW,'' WP_XH,1.0 WP_SL,R.JE DJ,R.JE, cast(cutSL(R.SL) as double) SL,R.SE,R.RZSJ, from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') CZSJ,R.KPRQ,'00' QDBZ,'' SKPH,'' SFZHM,'' CD,'' HGZS,'' JKZMSH,'' SJDH,'' FDJHM,
6
7'' CJHM,'' DH,'' ZH,'' KHYH,'' DW,'' XCRS,0.0 JSHJ,'9999999999999999999' spbm "+s"FROM dzdz.DZDZ_HWXX_ZZSFP D
8
9RIGHT JOIN etl.ETL_JXFP R ON (D.FPDM = R.FPDM AND D.FPHM = R.FPHM)
10
11WHERE (D.FPDM is null or D.FPHM is null) and R.FP_LB='YB' ").repartition(ETLFPMAPNUM)
12
13DataFrameUtils.saveAppend(result1, "etl", "etl_jxfp_qd")
14
通過該語句在發票明細中加入了名為“無商品明細”的記錄,保證發票明細、發票的金額與稅額沒有缺失,保障后續數據分析的準確性。
2.數據轉換
以上一系列的數據清洗,可以有效杜絕因為缺失數據或關聯不上造成的數據分析質量問題。接著,就是數據轉換與集成。
數據中臺的數據來源于不同的業務系統,因此數據格式、計算口徑都可能存在差異。當把它們都抽取到數據中臺以后,應當將其轉換成統一口徑,并規范計算口徑。譬如,如何識別代開發票,不同的系統有不同的判斷邏輯,但經過數據轉換以后,可以在表中增加一個“是否代開發票”字段,這樣后續的分析業務就不必再去判斷了,直接看該字段即可。此外,同樣是稅務機關代碼,有的系統是9位,有的系統是11位,應該將它們都統一成11位。以上這些工作就是數據轉換。
3.數據集成
清洗和轉換工作完成以后,將相同或者相似的數據都集成在一起。譬如,從各個不同路徑采集的納稅人信息,包括納稅人的基礎信息、認證信息、核定信息、資格信息,都集成到了納稅人表中;從各個不同路徑采集的各種不同的增值稅發票,如增值稅專票、增值稅普票、機動車統一銷售發票、電子發票等類型的發票,都統一集成到發票信息表中。它們都來源于不同的業務系統,字段與類型都各不相同。因此,在集成的過程中,需要進行轉換或補填,彼此格式一致,并最終存入同一張表中。譬如,其他發票都有發票明細,但機動車統一銷售發票沒有,因此需要給它補填一條發票明細,商品就是那輛汽車,金額與稅額都是那張發票的金額與稅額。
在具體設計實現上,就是為每一種發票都編寫一個發票與發票明細的SparkSQL程序。它們分別從各自的原始數據中獲取,但經過一個SQL語句的轉換,最終都存入名為etl_jxfp與etl_jxfp_qd的發票與發票明細臨時表中。
本書摘編自《架構真意:企業級應用架構設計方法論與實踐》,經出版方授權發布。