創建SSIS包:ETL中典型的數據清洗
這個例子的情景是一個信用卡公司,目前正著手于拓展Florida州新成立的一些公司的業務。市場部門每周都會向這些公司發送一些郵件,我們要為所有的郵件準備抽取數據。假設Florida州提供的一個上面這個dat文件,它是從老的計算機系統里面得到的,它是定長分隔的,這意味著文件中沒有分隔符,必須手工設置分隔列的長度。從下面的連接下載這個.dat文件:010305c.dat。如果使用工具查看,它們的模樣類似下面的:
- 01 ANNUAL_MICRO_DATA_REC.
- 03 ANNUAL_COR_NUMBER PIC X(12).
- 03 ANNUAL_COR_NAME PIC X(48).
- 03 ANNUAL_COR_STATUS PIC X(01).
- 03 ANNUAL_COR_FILING_TYPE PIC X(15).
- 03 ANNUAL_COR_2ND_MAIL_ADD_1 PIC X(42).
- 03 ANNUAL_COR_2ND_MAIL_ADD_2 PIC X(42).
- 03 ANNUAL_COR_2ND_MAIL_CITY PIC X(28).
- 03 ANNUAL_COR_2ND_MAIL_STATE PIC X(02).
- 03 ANNUAL_COR_2ND_MAIL_ZIP PIC X(10).
- 03 ANNUAL_COR_2ND_MAIL_COUNTRY PIC X(02).
- 03 ANNUAL_COR_FILE_DATE PIC X(08).
- 03 ANNUAL_COR_FEI_NUMBER PIC X(14).
- 03 ANNUAL_MORE_THAN_SIX_OFF_FLAG PIC X(01).
- 03 ANNUAL_LAST_TRX_DATE PIC X(08).
- 03 ANNUAL_STATE_COUNTRY PIC X(02).
- 03 ANNUAL_REPORT_YEAR_1 PIC X(04).
- 03 ANNUAL_HOUSE_FLAG_1 PIC X(01).
- 03 ANNUAL_REPORT_DATE_1 PIC X(08).
- 03 ANNUAL_REPORT_YEAR_2 PIC X(04).
- 03 ANNUAL_HOUSE_FLAG_2 PIC X(01).
- 03 ANNUAL_REPORT_DATE_2 PIC X(08).
- 03 ANNUAL_REPORT_YEAR_3 PIC X(04).
- 03 ANNUAL_HOUSE_FLAG_3 PIC X(01).
- 03 ANNUAL_REPORT_DATE_3 PIC X(08).
- 03 ANNUAL_RA_NAME PIC X(42).
- 03 ANNUAL_RA_NAME_TYPE PIC X(01).
- 03 ANNUAL_RA_ADD_1 PIC X(42).
- 03 ANNUAL_RA_CITY PIC X(28).
- 03 ANNUAL_RA_STATE PIC X(02).
- 03 ANNUAL_RA_ZIP5 PIC X(05).
- 03 ANNUAL_RA_ZIP4 PIC X(04).
- 03 ANNUAL_PRINCIPALS OCCURS 6 TIMES.
- 05 ANNUAL_PRINC_TITLE PIC X(04).
- 05 ANNUAL_PRINC_NAME_TYPE PIC X(01).
- 05 ANNUAL_PRINC_NAME PIC X(42).
- 05 ANNUAL_PRINC_ADD_1 PIC X(42).
- 05 ANNUAL_PRINC_CITY PIC X(28).
- 05 ANNUAL_PRINC_STATE PIC X(02).
- 05 ANNUAL_PRINC_ZIP5 PIC X(05).
- 05 ANNUAL_PRINC_ZIP4 PIC X(04).
- 03 FILLER PIC X(04).
創建數據源
這個文件的內容看上去不知所云,不可能像普通的文本文件一樣處理它們。下面要建一個package來清洗和這個類似的數據,得到有用的信息。package完成下面的任務:
從010305c.dat的存放路徑下將文件內容抽取出來存放到本地數據庫
將文件歸檔避免多次下載
當一列數據丟失,這一列需要自動重新添加
當一行數據丟失,需要輸出錯誤的行
新添加一個Package,重命名為CorporationLoad.dtsx,右擊Connection Managers選擇新添加一個連接,選擇AdventureWorks。創建一個新的Flat File Connection連接,重命名為Corporation Extract連接到010305c.dat。這里不需要設置分隔符,而是選擇定長格式,也不需要設置列分隔符,也沒有必要選擇第一行設置為列名選擇項。
定長格式文件意味著每一列不是由分隔符來分隔,必須手動設置每一列的開始和結束。大多數的大型機文件都是這種格式,你會發現這種設置會有些繁瑣。打開文件界面并推斷每一列的開始位置。點擊Column標簽界面如圖5-6。在Row Widh欄內輸入1172字符(這個數字表示文件的開始)。
圖5-6
下一步,在列上設置豎直線標示每一列。在標尺刻度上左擊設置豎直線,在這個例子中,可以使用下面的表中的提示來設置列。在豎直線上雙擊可以刪除,左擊拖動可以移動豎直線。
標尺刻度值 |
列名 |
12 |
CorporateNumber |
60 |
CorporationName |
61 |
CorporationStatus |
65 |
FilingType |
118 |
MailingAddressLine1 |
160 |
MailingAddressLine2 |
188 |
City |
190 |
State |
200 |
ZipCode |
202 |
Country |
210 |
FilingDate |
224 |
FEINumber |
1172 |
Records you will throw out |
在這個表中可以看到丟棄了大部分數據。添加完豎直線之后,點擊Advanced標簽界面,點擊Suggest Types,在Suggest Column Types對話框中接受默認設置,點擊OK。默認數據類型設置會滿足大部分的數據類型需要,但是也會有一些錯誤。在Column 8(ZipCode column)需要修改DataType選擇項為String[DT_STR],OutputColumnWidth設置為10。最后修改Column 10為String[DT_STR],OutputColumnWidth保持默認,點擊OK保存設置。
創建數據流
在Control Flow 界面內拖放一個Data Flow task,雙擊進入數據流標簽界面。在界面中拖放一個Flat File數據源重命名為Uncleansed Corporate Data,雙擊并選擇上文中新建的數據連接,點擊進入Columns標簽界面反選Column 11和Column 12,這意味著市場部門不需要這兩列數據。在后面的工作中將添加Destination和數據轉換任務。
處理臟數據
在進行下一步操作之前,先暫停來了解一下數據。我們創建了數據連接,你可能會注意到有一些數據行是空白的 ,例如city和state的一些記錄是沒有的。為了解決這個問題,需要使用一些任務將規范的的數據送到一個路徑,有缺損的數據送到另一個路徑。然后嘗試清洗壞的數據并送回到主要路徑中。也可能有一些不能清洗的數據,需要寫入錯誤日志。
首先,設置郵政編碼為5位字符,一些包含破折號的有10位字符,還有9位的。使用Derived Column轉換來標準化,從工具欄中拖放一個Derived Column數據轉換任務重命名為Standardize Zip Code。
使用箭頭連線將Load Corporate Data和Standardize Zip Code連接起來,雙擊Standardize Zip Code打開編輯界面,展開左邊欄中的Column樹形結構,點擊Column 8拖放到下方的表格內,這里會在表格內預先填入一些信息。為了輸出5位郵政編碼需要編寫一個表達式只截取5位。使用SUBSTRING函數可以實現這種功能,代碼如下:
SUBSTRING([Column 8],1,5)
在表格Expression列中輸入上面代碼,在Derived列中選擇replace the existing Column 8,最后可以看到界面如圖5-7,完成編輯后點擊OK退出界面。
圖5-7
用Conditional Split進行數據轉換
現在數據規范化了,從工具欄中拖放一個Conditional Split數據轉換任務,使用箭頭連線把它和Standardize Zip Code連接起來,將Conditional Split重命名為Find Bad Record。Conditional Split將把一些不符合要求的數據進行清洗。
為了去掉沒有city或state的數據行,需要編寫條件將缺失city或state的數據轉移到一個數據流。雙擊Find Bad Record打開編輯界面,新建一個Missing State or City條件,在Output Name列內輸入該名字。編寫一個表達式來查找空的記錄。一種方法是使用LTRIM函數。兩個豎線|用來實現邏輯或。下面的代碼用來查找Column 6和Column7。
LTRIM([Column 6]) == "" || LTRIM([Column 7]) == ""
最后要給不滿足條件的數據命名。這里不滿足上述條件的數據命名為Good Data,如圖5-8
圖5-8
使用Look Up轉換數據
從工具欄中拖放LookUp數據轉換重命名為Fix Bad Records,當你把它和上一個數據轉換Find Bad Record連接起來的時候,將會彈出Input Output Selection對話框如圖5-9。下拉列表框中選擇Missing State or City選擇項,點擊OK。這將有缺損的數據從Find Bad Record中送出。
圖5-9
LookUp轉換可以根據數據庫中ZipCode表中的數據來補全數據行中缺失的city和state。雙擊打開編輯界面,點擊Connection標簽界面,選擇AdventureWorks數據源和ZipCode數據表。點擊Columns標簽界面,點擊Column 8不放拖放到右邊ZipCode列上,這樣在兩邊的表上建立一個連線如圖5-10。然后再右邊表中選中State和City列,這兩列會出現在下方的表格中,ZipName會替換Column 6,State會替換Column 7 如圖5-10。點擊OK退出編輯界面。
圖5-10
使用Union All合并
現在數據被清洗,要使用Union All轉換將清洗后的數據送回到主要數據流中。在工具欄中拖放一個Union All轉換,從Fix Bad Records向Union All拖放一個連線,從Find Bad Records向Union拖放一個連線,Union All不再需要其他配置。
最后設置
最后需要將數據流送到一個OLE DB 目的中。從工具欄中拖放一個OLE DB Destination,重命名為Mail Merge Table。從Union All向它拖放一個連線,雙擊選擇AdventureWorks數據源,Use a Table or View選擇項中點擊New button。默認的建表語句使用的是表名是Mail Merge Table,數據類型可能有些不是很合適的,代碼如下:
- CREATE TABLE [Mail Merge Table] (
- [Column 0] VARCHAR(12),
- [Column 1] VARCHAR(48),
- [Column 2] VARCHAR(1),
- [Column 3] VARCHAR(4),
- [Column 4] VARCHAR(53),
- [Column 5] VARCHAR(42),
- [Column 6] VARCHAR(28),
- [Column 7] VARCHAR(2),
- [Column 8] VARCHAR(10),
- [Column 9] VARCHAR(2),
- [Column 10] VARCHAR(10)
- )
修改代碼中的表名和列名,修改后的代碼如下:
- CREATE TABLE MarketingCorporation(
- CorporateNumber varchar(12),
- CorporationName varchar(48),
- FilingStatus char(1),
- FilingType char(4),
- AddressLine1 varchar(53),
- AddressLine2 varchar(42),
- City varchar(28),
- State char(2),
- ZipCode varchar(10),
- Country char(2),
- FilingDate varchar(50) NULL
- )
由于列名是不同的需要點擊Mapping標簽將列對應起來。
處理更多的臟數據
這個Package基本上完成了,但是這里有一個致命的缺陷。010305c.dat這個文件中有一些多余的數據,在Find Bad Records和Fix Bad Records之間添加一個data viewer可以查看這些多余的數據。
這樣可以查看在010305c.dat文件中有4條數據被Fix Bad Records處理,只有2條被清洗。另外2條不能被定為到Fix Bad Records中。在這個Package的需求中有一條是為市場部門提供一份地址列表用在郵件內容中。下圖5-11中顯示了錯誤所在。
圖5-11
在輸出界面中你可以看到如下的錯誤信息:
Error: 0xC020901E at Load Corporate Data, Fix Bad Records [87]: Row yielded no
match
during lookup.
Error: 0xC0209029 at Load Corporate Data, Fix Bad Records [87]: The "component "Fix
Bad
Records" (87)" failed because error code 0xC020901E occurred, and the error row
disposition on "output "Lookup Output" (89)" specifies failure on error. An error
occurred on the specified object of the specified component.
Error: 0xC0047022 at Load Corporate Data, DTS.Pipeline: The ProcessInput method on
component "Fix Bad Records" (87) failed with error code 0xC0209029. The identified
component returned an error from the ProcessInput method. The error is specific to
the
component, but the error is fatal and will cause the Data Flow task to stop
running.
Error: 0xC0047021 at Load Corporate Data, DTS.Pipeline: Thread "WorkThread0" has
exited
with error code 0xC0209029.
Error: 0xC0047039 at Load Corporate Data, DTS.Pipeline: Thread "WorkThread1"
received a
shutdown signal and is terminating. The user requested a shutdown, or an error in
another
thread is causing the pipeline to shutdown.
Error: 0xC0047021 at Load Corporate Data, DTS.Pipeline: Thread "WorkThread1" has
exited
with error code 0xC0047039.
不能因為這些錯誤提示而放棄這個Package,需要將錯誤信息輸入到錯誤消息隊列中以備查看。需要創建一個ErrorQueue表,從工具欄中拖放一個Audit轉換,重命名為Add Auditing Info,從Fix Bad Records中拖拽紅色箭頭連線連接到Add Auditing Info。
如圖5-12,可以看到Configure Error Output對話框,在這個對話框中配置如果錯誤出現時SSIS如何反應。Truncation列表明如果一行太長而不能加入到轉換中時所作的反應。Error列表明遇到轉換錯誤時如何反應。在Description列中可以看到期望捕獲的錯誤。例如對于Lookup轉換,需要捕獲的錯誤是lookup failure,意思是lookup轉換不能找到對應的輸入。在這個例子中選擇錯誤類型如圖5-10。默認情況會使任務失敗,結果會使整個Package失敗。也可以從下拉列表中選擇完全忽略錯誤。
圖5-10
完成配置之后點擊OK退出界面。
錯誤處理之后,雙擊Audit transform打開編輯界面,添加兩列。繼續添加兩列,在Audit Type列中選擇Task Name和Package Name,Output Column Name默認同名,去掉名字中的空格,如圖5-13。由于可能有多個Package向表中寫入數據,所以這些信息是必須的。
圖5-13
最后的工作是將臟數據送入到SQL Server中的ErrorQueue表中,從工具欄中拖放另外一個OLE DB目的,重命名為Error Queue,雙擊選擇AdventureWorks數據源,點擊New新添加一個表,重命名表名ErrorQueue,代碼如下:
- CREATE TABLE ErrorQueue(
- CorporateNumber varchar(12),
- CorporationName varchar(48),
- FilingStatus char(1),
- FilingType char(4),
- AddressLine1 varchar(53),
- AddressLine2 varchar(42),
- City varchar(28),
- State char(2),
- ZipCode varchar(10),
- Country char(2),
- FilingDate varchar(10) NULL,
- ErrorCode INT,
- ErrorColumn INT,
- TaskName NVARCHAR(19),
- PackageName NVARCHAR(30)
- )
注意:可以看到這個表中的信息是很籠統的。
這次需要點擊mapping將列一一對應起來,點擊OK退出編輯界面。現在可以執行這個Package了,4條記錄被清洗,2條送到error queue。執行成功之后的界面如圖5-14。
圖5-14
原文鏈接:http://www.cnblogs.com/tylerdonet/archive/2011/04/22/2023843.html