認識MongoDB 4.0的新特性——事務(Transactions)
前言
相信使用過主流的關系型數據庫的朋友對“事務(Transactions)”不會太陌生,它可以讓我們把對多張表的多次數據庫操作整合為一次原子操作,這在高并發場景下可以保證多個數據操作之間的互不干擾;并且一旦在這些操作過程任一環節中出現了錯誤,事務會中止并且讓數據回滾,這使得同時在多張表中修改數據的時候保證了數據的一致性。
以前 MongoDB 是不支持事務的,因此開發者在需要用到事務的時候,不得不借用其他工具,在業務代碼層面去彌補數據庫的不足。隨著 4.0 版本的發布,MongoDB 也為我們帶來了原生的事務操作,下面就讓我們一起來認識它,并通過簡單的例子了解如何去使用。
介紹
事務和副本集(Replica Sets)
副本集是 MongoDB 的一種主副節點架構,它使數據得到***的可用性,避免單點故障引起的整個服務不能訪問的情況的發生。目前 MongoDB 的多表事務操作僅支持在副本集上運行,想要在本地環境安裝運行副本集可以借助一個工具包——run-rs,以下的文章中有詳細的使用說明:
https://thecodebarbarian.com/...
事務和會話(Sessions)
事務和會話(Sessions)關聯,一個會話同一時刻只能開啟一個事務操作,當一個會話斷開,這個會話中的事務也會結束。
事務中的函數
- Session.startTransaction()
在當前會話中開始一次事務,事務開啟后就可以開始進行數據操作。在事務中執行的數據操作是對外隔離的,也就是說事務中的操作是原子性的。
- Session.commitTransaction()
提交事務,將事務中對數據的修改進行保存,然后結束當前事務,一次事務在提交之前的數據操作對外都是不可見的。
- Session.abortTransaction()
中止當前的事務,并將事務中執行過的數據修改回滾。
重試
當事務運行中報錯,catch 到的錯誤對象中會包含一個屬性名為 errorLabels 的數組,當這個數組中包含以下2個元素的時候,代表我們可以重新發起相應的事務操作。
- TransientTransactionError:出現在事務開啟以及隨后的數據操作階段
- UnknownTransactionCommitResult:出現在提交事務階段
示例
經過上面的鋪墊,你是不是已經迫不及待想知道究竟應該怎么寫代碼去完成一次完整的事務操作?下面我們就簡單寫一個例子:
場景描述: 假設一個交易系統中有2張表——記錄商品的名稱、庫存數量等信息的表 commodities,和記錄訂單的表 orders。當用戶下單的時候,首先要找到 commodities 表中對應的商品,判斷庫存數量是否滿足該筆訂單的需求,是的話則減去相應的值,然后在 orders 表中插入一條訂單數據。在高并發場景下,可能在查詢庫存數量和減少庫存的過程中,又收到了一次新的創建訂單請求,這個時候可能就會出問題,因為新的請求在查詢庫存的時候,上一次操作還未完成減少庫存的操作,這個時候查詢到的庫存數量可能是充足的,于是開始執行后續的操作,實際上可能上一次操作減少了庫存后,庫存的數量就已經不足了,于是新的下單請求可能就會導致實際創建的訂單數量超過庫存數量。
以往要解決這個問題,我們可以用給商品數據“加鎖”的方式,比如基于 Redis 的各種鎖,同一時刻只允許一個訂單操作一個商品數據,這種方案能解決問題,缺點就是代碼更復雜了,并且性能會比較低。如果用數據庫事務的方式就可以簡潔很多:
commodities 表數據(stock 為庫存):
- { "_id" : ObjectId("5af0776263426f87dd69319a"), "name" : "滅霸原味手套", "stock" : 5 }
- { "_id" : ObjectId("5af0776263426f87dd693198"), "name" : "雷神專用鐵錘", "stock" : 2 }
orders 表數據:
- { "_id" : ObjectId("5af07daa051d92f02462644c"), "commodity": ObjectId("5af0776263426f87dd69319a"), "amount": 2 }
- { "_id" : ObjectId("5af07daa051d92f02462644b"), "commodity": ObjectId("5af0776263426f87dd693198"), "amount": 3 }
通過一次事務完成創建訂單操作(mongo Shell):
- // 執行 txnFunc 并且在遇到 TransientTransactionError 的時候重試
- function runTransactionWithRetry(txnFunc, session) {
- while (true) {
- try {
- txnFunc(session); // 執行事務
- break;
- } catch (error) {
- if (
- error.hasOwnProperty('errorLabels') &&
- error.errorLabels.includes('TransientTransactionError')
- ) {
- print('TransientTransactionError, retrying transaction ...');
- continue;
- } else {
- throw error;
- }
- }
- }
- }
- // 提交事務并且在遇到 UnknownTransactionCommitResult 的時候重試
- function commitWithRetry(session) {
- while (true) {
- try {
- session.commitTransaction();
- print('Transaction committed.');
- break;
- } catch (error) {
- if (
- error.hasOwnProperty('errorLabels') &&
- error.errorLabels.includes('UnknownTransactionCommitResult')
- ) {
- print('UnknownTransactionCommitResult, retrying commit operation ...');
- continue;
- } else {
- print('Error during commit ...');
- throw error;
- }
- }
- }
- }
- // 在一次事務中完成創建訂單操作
- function createOrder(session) {
- var commoditiesCollection = session.getDatabase('mall').commodities;
- var ordersCollection = session.getDatabase('mall').orders;
- // 假設該筆訂單中商品的數量
- var orderAmount = 3;
- // 假設商品的ID
- var commodityID = ObjectId('5af0776263426f87dd69319a');
- session.startTransaction({
- readConcern: { level: 'snapshot' },
- writeConcern: { w: 'majority' },
- });
- try {
- var { stock } = commoditiesCollection.findOne({ _id: commodityID });
- if (stock < orderAmount) {
- print('Stock is not enough');
- session.abortTransaction();
- throw new Error('Stock is not enough');
- }
- commoditiesCollection.updateOne(
- { _id: commodityID },
- { $inc: { stock: -orderAmount } }
- );
- ordersCollection.insertOne({
- commodity: commodityID,
- amount: orderAmount,
- });
- } catch (error) {
- print('Caught exception during transaction, aborting.');
- session.abortTransaction();
- throw error;
- }
- commitWithRetry(session);
- }
- // 發起一次會話
- var session = db.getMongo().startSession({ readPreference: { mode: 'primary' } });
- try {
- runTransactionWithRetry(createOrder, session);
- } catch (error) {
- // 錯誤處理
- } finally {
- session.endSession();
- }
上面的代碼看著感覺很多,其實 runTransactionWithRetry 和 commitWithRetry 這兩個函數都是可以抽離出來成為公共函數的,不需要每次操作都重復書寫。用上了事務之后,因為事務中的數據操作都是一次原子操作,所以我們就不需要考慮分布并發導致的數據一致性的問題,是不是感覺簡單了許多?
你可能注意到了,代碼中在執行 startTransaction 的時候設置了兩個參數——readConcern 和 writeConcern,這是 MongoDB 讀寫操作的確認級別,在這里用于在副本集中平衡數據讀寫操作的可靠性和性能,如果在這里展開就太多了,所以感興趣的朋友建議去閱讀官方文檔了解一下:
readConcern:
https://docs.mongodb.com/mast...
writeConcern: