如何快速同步第三方平臺數據?
大家好,我是蘇三,又跟大家見面了。
前言
最近知識星球中有位小伙伴問了我一個問題:如何快速同步第三方平臺數據?
他們有個業務需求是:需要同步全國34個省市,多個系統的8種業務數據,到他們公司的系統當中。
他們需求同步全量的數據和增量的數據。
全量的數據主要是針對多個系統的歷史數據,大概有幾千萬數據,只需要初始化一次即可。
而增量的數據,是系統后續變更的數據。
這個需求其實不簡單,至少有以下難點:
- 不能直接訪問第三方數據庫。
- 不能將歷史數據導出到excel中,有泄露數據的風險。
- 如何快速同步歷史數據?
- 增量數據如何處理?
- 接口需要做限流嗎?
- 增量數據如何校驗數據的一致性?
帶著這些問題,開始今天的文章之旅。
1. 如何快速同步歷史數據?
想要快速同步歷史數據,第一個想到的可能是直接同步數據庫中的數據。
但多個第三方系統為了數據安全考慮,不可能直接把他們的數據庫訪問地址和相關賬號密碼告訴你。
即使他們告訴你了,但有很多個系統,你一個個去連數據庫查數據,也非常麻煩。
有些小伙伴可能會說:這好辦,讓第三方系統把他們的歷史數據導出到excel中,我們寫個程序解析去這些excel,就能將數據快速導入到我們的數據庫中。
這是個好辦法,但忽略了一點:這些數據是敏感數據,不能對外暴露。
因此導出excel的方案行不通。
那么,該如何快速同步歷史數據呢?
答:使用SFTP。
不知道你有沒有跟銀行對接過,SFTP在銀行業務中經常會用到。
那么,如何用SFTP同步數據呢?
2. 如何使用SFTP?
說起SFTP,就不得不說一說FTP。
我們都知道,FTP是用來傳送文件的協議。使用FTP實現遠程文件傳輸的同時,還可以保證數據傳輸的可靠性和高效性。
而SFTP是一種可以安全傳輸文件的協議,它是一種基于SSH(Secure Shell)的文件傳輸協議,它允許用戶將文件以加密的形式傳輸到遠程服務器上,以保護文件的安全性。
FTP和SFTP有哪些區別呢?
- 鏈接方式不同:FTP使用TCP的21號端口建立連接。而SFTP是在客戶端和服務器之間通過 SSH 協議 (即TCP22號端口) 建立的安全連接來傳輸文件。
- 安全性不同:SFTP使用加密傳輸認證信息和傳輸的數據,相對于FTP更安全一些。
- 傳輸效率不同:SFTP傳輸文件時使用了加密解密技術,因此傳輸效率比普通的FTP要低一些。
- 使用協議不同:FTP使用了TCP/IP協議,而SFTP使用了SSH協議。
- 安全通道:SFTP協議提供了一個安全通道,用于在網絡上的主機之間傳輸文件。而FTP協議沒有安全通道。
因此可見,我們使用SFTP來傳輸文件還是比較安全的。
那么,如何使用SFTP來實現同步歷史數據的需求呢?
答:這就需要我們做好SFTP的賬號、目錄和文件格式的規劃了。
2.1 賬號權限控制
首先需要運維同學搭建一個SFTP服務器,提供一個可以對外訪問的域名和端口號。
然后需要在根目錄下,創建一個存放文件的目錄,比如:/data。
然后給每個省市的第三方系統都創建一個子目錄,比如:/data/sichuan、/data/shenzhen、/data/beijing等。
接下來,我們需要給每個子目錄創建一個賬號,以及分配權限。
比如有個賬號是:sichuan,密碼是:sisuan123。這個賬號只擁有/data/sichuan目錄讀數據和寫數據的權限。
另外一個賬號是:shenzhen,密碼是:shenzhen123。這個賬號只擁有/data/目錄讀數據和寫數據的權限。
以此類推。
當然大家如果不放心,可以用在線工具,將密碼設置成一個8位的隨機字符串,包含字母、數字和特殊字符,這樣的密碼安全性相對來說要高一些。
這樣相關的第三方系統都有往SFTP自己目錄下讀和寫數據的權限。
在這里溫馨提醒一下:上面這些賬號讀數據的權限,主要是為了后面他們好排查問題用的,不是必須分配的,我們需要根據實際情況而定。
此外,還需要給我們自己分配一個賬號,開通對/data整個目錄的只讀權限。
2.2 統一數據格式
接下來,最關鍵的一步是要制定一個統一的文件格式和數據格式。
文件名稱為:sichuan_20230724.txt。
也就是用 省市拼音_日期.txt 的格式。
這樣大家就能非常清楚的看出,是哪個省市,哪個日期產生的數據。
然后我們需要規定txt文件的格式。
比如:id占20個字符,name占30個字符,金額占10個字符等等。
如果有些列的數據不滿對應的字符長度,前面可以補0。
這樣我們的程序,只需要在解析txt文件時,先讀取一行數據,是一個比較長的字符串,然后按照固定的長度,去解析字符串中每一列的數據即可。
2.3 使用job同步數據
假如第三方系統都按照我們要求,已將歷史數據寫入到指定目錄下的指定文件中。
這時我們需要提供一個job,去讀取/data目錄下,所有子目錄的txt文件,一個個解析里面包含的歷史數據,然后將這些數據,做一些業務邏輯處理,然后寫入我們的數據庫當中。
如圖所示:
圖片
當然如果想快一點處理完,我們可以在job中使用多線程解析和讀取不同的txt文件,然后寫數據。
3. 增量數據如何處理?
對于歷史數據,我們通過上面的方案,可以快速的同步數據。
但對于增量的數據如何處理呢?
增量的數據,對實時性要求比較高。
我們沒辦法跟之前一下,走SFTP同步文件,然后使用job定時解析文件的方案。
為了滿足數據實時性的需求,我們不得不走接口實時數據同步的方案。
那么,是第三方系統提供接口,還是我們這邊提供接口呢?
很顯然,如果讓第三方提供接口,第三方有那么多系統,我們需要對接很多很多接口,非常麻煩。
因此,這個接口必須由我們這邊提供。
我們這邊提供一個統一的數據上報接口,支持傳入批量的數據。
為了防止第三方系統,一次性傳入過多的參數,導致該接口超時,我們需要對單次上傳的數據條數做限制,例如:一次請求,最大允許上傳500條數據。
其實,光限制請求參數還不夠。
我們的這個數據上報接口,可能會被多個系統調用,并發量可能也不小。
為了防止在高并發下,請求量突增把我們的接口搞掛了,我們需要對接口限流。
我們可以使用redis記錄第三方系統請求的url和請求賬號,然后在程序中查詢redis中的次數,是否超過限額。允許每一個第三方系統,在1秒之內調用10次。第三方系統總的請求次數,1秒不超過500次。
如果超過了限額,則數據上報接口提示:請求太頻繁,請稍后再試。
圖片
為了增加數據上報接口的性能,在接收到數據之后,不直接寫庫。
我們可以將接口中接收到的數據作為mq消息,發送到mq服務器。
然后有專門的mq消費者,實時監聽mq服務器的消息,異步讀取消息寫入數據庫。
該方案比較適合,寫庫操作,包含了一些復雜的業務邏輯。
如果消費速度有點慢,我們可以及時調整mq消費者,使用多線程處理,或者增加mq中隊列的數量,增加mq消費者來增加消息的處理速度。
圖片
如果mq消費者在處理mq消息的過程中,由于網絡問題,寫庫失敗了,可以增加自動重試機制。
圖片
一旦mq消費者在mq消費過程中出現失敗的情況,則自動重試3次,如果還是失敗,則將消息寫入死信隊列,目前RocketMQ自帶了失敗重試功能。
然后有個job監控死信隊列,如果一旦發現異常數據,則發報警郵件給相關開發,后面人工處理。
4. 如何校驗數據一致性?
通過上面的方案,我們把歷史數據和增量的數據都已經處理了。
但還有一個問題:如何校驗數據一致性。
對于歷史數據,其實我們好處理,第三方系統已經生成好txt文件上傳到SFTP上了,我們可以直接對比那些文件即可。
但對于增量的數據,是第三方系統調用我們的數據上報接口,去上報的數據,這部分數據如何校驗數據一致性呢?
答:我們可以要求第三方系統,在某日凌晨,生成一份昨日的增量數據到txt文件,然后上傳到SFTP上。
我們有個job,在每天的凌晨1點會讀取第三方系統生成昨日增量數據,跟我們數據庫中昨日的增量數據做對比,校驗數據的差異性。
如果第三方后面產生的增量數據,只有新增,沒有刪除和修改,使用上面的方案是沒有問題的。
但如果增量的數據,包含了刪除和修改的數據,可能會有問題。
因為我們做比較的數據源是昨日的增量數據,而我們的job在比較數據的過程中,萬一第三方系統上報了我們正在對比的數據,更新成了一個新值,跟昨日的值不一樣,這樣對比數據就會產生差異。
那么,該如何解決這個問題呢?
答:我們可以只校驗昨日的數據(就修改時間是昨天),今日產生的增量數據,會在明日凌晨1點的job中會去校驗的。
在比較時,遍歷昨日增量txt文件中的每行數據,跟數據庫中的數據做對比,如果id相同,但是修改時間是今天,則忽略這條數據。
如果id相同,修改時間是昨天,則判斷數據是否一致,如果不一致,則用txt文件中的數據修復我們數據庫中的異常數據。
如果txt文件中的id,在我們數據庫中不存在,則新增一條數據。
圖片
這兩種情況產生的數據變動,修改時間要設置成昨天,不然明日的job又會再重新處理一次這條數據。