成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Storm入門教程 第五章 一致性事務

開發 前端
Storm是一個分布式的流處理系統,利用anchor和ack機制保證所有tuple都被成功處理。如果tuple出錯,則可以被重傳,但是如何 保證出錯的tuple只被處理一次呢?Storm提供了一套事務性組件Transaction Topology,用來解決這個問題。

Storm是一個分布式的流處理系統,利用anchor和ack機制保證所有tuple都被成功處理。如果tuple出錯,則可以被重傳,但是如何 保證出錯的tuple只被處理一次呢?Storm提供了一套事務性組件Transaction Topology,用來解決這個問題。

 Transactional Topology目前已經不再維護,由Trident來實現事務性topology,但是原理相同。

5.1一致性事務的設計

Storm如何實現即對tuple并行處理,又保證事務性。本節從簡單的事務性實現方法入手,逐步引出Transactional Topology的原理。

5.1.1 簡單設計一:強順序流

保證tuple只被處理一次,最簡單的方法就是將tuple流變成強順序的,并且每次只處理一個tuple。從1開始,給每個tuple都順序加上 一個id。在處理tuple的時候,將處理成功的tuple id和計算結果存在數據庫中。下一個tuple到來的時候,將其id與數據庫中的id做比較。如果相同,則說明這個tuple已經被成功處理過了,忽略 它;如果不同,根據強順序性,說明這個tuple沒有被處理過,將它的id及計算結果更新到數據庫中。

以統計消息總數為例。每來一個tuple,如果數據庫中存儲的id 與當前tuple id不同,則數據庫中的消息總數加1,同時更新數據庫中的當前tuple id值。如圖:

強順序流

         但是這種機制使得系統一次只能處理一個tuple,無法實現分布式計算。

5.1.2 簡單設計二:強順序batch流

為了實現分布式,我們可以每次處理一批tuple,稱為一個batch。一個batch中的tuple可以被并行處理。

我們要保證一個batch只被處理一次,機制和上一節類似。只不過數據庫中存儲的是batch id。batch的中間計算結果先存在局部變量中,當一個batch中的所有tuple都被處理完之后,判斷batch id,如果跟數據庫中的id不同,則將中間計算結果更新到數據庫中。

如何確保一個batch里面的所有tuple都被處理完了呢?可以利用Storm提供的CoordinateBolt。如圖:

順序batches

但是強順序batch流也有局限,每次只能處理一個batch,batch之間無法并行。要想實現真正的分布式事務處理,可以使用storm提供的Transactional Topology。在此之前,我們先詳細介紹一下CoordinateBolt的原理。

5.1.3 CoordinateBolt原理

CoordinateBolt具體原理如下:

  • 真正執行計算的bolt外面封裝了一個CoordinateBolt。真正執行任務的bolt我們稱為real bolt。
  • 每個CoordinateBolt記錄兩個值:有哪些task給我發送了tuple(根據topology的grouping信息);我要給哪些tuple發送信息(同樣根據groping信息)
  •  Real bolt發出一個tuple后,其外層的CoordinateBolt會記錄下這個tuple發送給哪個task了。
  • 等所有的tuple都發送完了之后,CoordinateBolt通過另外一個特殊的stream以emitDirect的方式告訴所有它發送過 tuple的task,它發送了多少tuple給這個task。下游task會將這個數字和自己已經接收到的tuple數量做對比,如果相等,則說明處理 完了所有的tuple。
  • 下游CoordinateBolt會重復上面的步驟,通知其下游。

整個過程如圖所示:

coordinateBolt

CoordinateBolt主要用于兩個場景:

  • DRPC
  • Transactional Topology

CoordinatedBolt對于業務是有侵入的,要使用CoordinatedBolt提供的功能,你必須要保證你的每個bolt發送的每個 tuple的***個field是request-id。 所謂的“我已經處理完我的上游”的意思是說當前這個bolt對于當前這個request-id所需要做的工作做完了。這個request-id在DRPC 里面代表一個DRPC請求;在Transactional Topology里面代表一個batch。

#p#

5.1.4 Trasactional Topology

Storm提供的Transactional Topology將batch計算分為process和commit兩個階段。Process階段可以同時處理多個batch,不用保證順序 性;commit階段保證batch的強順序性,并且一次只能處理一個batch,第1個batch成功提交之前,第2個batch不能被提交。

還是以統計消息總數為例,以下代碼來自storm-starter里面的TransactionalGlobalCount。

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA,new Fields(“word“), PARTITION_TAKE_PER_BATCH);

TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(“global-count“, “spout“, spout, 3);

builder.setBolt(“partial-count“, new BatchCount(), 5).noneGrouping(“spout“);

builder.setBolt(“sum“, new UpdateGlobalCount()).globalGrouping(“partial-count“);

TransactionalTopologyBuilder共接收四個參數。

  • 這個Transactional Topology的id。Id用來在Zookeeper中保存當前topology的進度,如果這個topology重啟,可以繼續之前的進度執行。
  •  Spout在這個topology中的id
  • 一個TransactionalSpout。一個Trasactional Topology中只能有一個TrasactionalSpout.在本例中是一個MemoryTransactionalSpout,從一個內存變量(DATA)中讀取數據。
  • TransactionalSpout的并行度(可選)。

下面是BatchCount的定義:

  1. public static class BatchCount extends BaseBatchBolt { 
  2.  
  3.         Object _id; 
  4.  
  5.         BatchOutputCollector _collector; 
  6.  
  7.         int _count = 0
  8.  
  9.         @Override 
  10.  
  11.         public void prepare(Map conf, TopologyContext context, 
  12.  
  13.               BatchOutputCollector collector, Object id) { 
  14.  
  15.             _collector = collector; 
  16.  
  17.             _id = id; 
  18.  
  19.         } 
  20.  
  21.         @Override 
  22.  
  23.         public void execute(Tuple tuple) { 
  24.  
  25.             _count++; 
  26.  
  27.         } 
  28.  
  29.         @Override 
  30.  
  31.         public void finishBatch() { 
  32.  
  33.             _collector.emit(new Values(_id, _count)); 
  34.  
  35.         } 
  36.  
  37.         @Override 
  38.  
  39.         public void declareOutputFields(OutputFieldsDeclarer declarer) { 
  40.  
  41.             declarer.declare(new Fields(“id“, “count“)); 
  42.  
  43.         } 
  44.  

 BatchCount的prepare方法的***一個參數是batch id,在Transactional Tolpoloyg里面這id是一個TransactionAttempt對象。

Transactional Topology里發送的tuple都必須以TransactionAttempt作為***個field,storm根據這個field來判斷tuple屬于哪一個batch。

TransactionAttempt包含兩個值:一個transaction id,一個attempt id。transaction id的作用就是我們上面介紹的對于每個batch中的tuple是唯一的,而且不管這個batch replay多少次都是一樣的。attempt id是對于每個batch唯一的一個id, 但是對于同一個batch,它replay之后的attempt id跟replay之前就不一樣了, 我們可以把attempt id理解成replay-times, storm利用這個id來區別一個batch發射的tuple的不同版本。

execute方法會為batch里面的每個tuple執行一次,你應該把這個batch里面的計算狀態保持在一個本地變量里面。對于這個例子來說, 它在execute方法里面遞增tuple的個數。

***, 當這個bolt接收到某個batch的所有的tuple之后, finishBatch方法會被調用。這個例子里面的BatchCount類會在這個時候發射它的局部數量到它的輸出流里面去。

#p#

下面是UpdateGlobalCount類的定義:

  1. public static class UpdateGlobalCount extends BaseTransactionalBolt 
  2.  
  3. implements ICommitter { 
  4.  
  5.         TransactionAttempt _attempt; 
  6.  
  7.         BatchOutputCollector _collector; 
  8.  
  9.         int _sum = 0
  10.  
  11.         @Override 
  12.  
  13.         public void prepare(Map conf, TopologyContext context, 
  14.  
  15. BatchOutputCollector collector, TransactionAttempt attempt) { 
  16.  
  17.             _collector = collector; 
  18.  
  19.             _attempt = attempt; 
  20.  
  21.         } 
  22.  
  23.         @Override 
  24.  
  25.         public void execute(Tuple tuple) { 
  26.  
  27.             _sum+=tuple.getInteger(1); 
  28.  
  29.         } 
  30.  
  31.         @Override 
  32.  
  33.         public void finishBatch() { 
  34.  
  35.             Value val = DATABASE.get(GLOBAL_COUNT_KEY); 
  36.  
  37.             Value newval; 
  38.  
  39.             if(val == null || !val.txid.equals(_attempt.getTransactionId())) { 
  40.  
  41.                 newval = new Value(); 
  42.  
  43.                 newval.txid = _attempt.getTransactionId(); 
  44.  
  45.                 if(val==null) { 
  46.  
  47.                     newval.count = _sum; 
  48.  
  49.                 } else { 
  50.  
  51.                     newval.count = _sum + val.count; 
  52.  
  53.                 } 
  54.  
  55.                 DATABASE.put(GLOBAL_COUNT_KEY, newval); 
  56.  
  57.             } else { 
  58.  
  59.                 newval = val; 
  60.  
  61.             } 
  62.  
  63.             _collector.emit(new Values(_attempt, newval.count)); 
  64.  
  65.         } 
  66.  
  67.         @Override 
  68.  
  69.         public void declareOutputFields(OutputFieldsDeclarer declarer) { 
  70.  
  71.             declarer.declare(new Fields(“id“, “sum“)); 
  72.  
  73.         } 
  74.  

UpdateGlobalCount實現了ICommitter接口,所以storm只會在commit階段執行finishBatch方法。而execute方法可以在任何階段完成。

在UpdateGlobalCount的finishBatch方法中,將當前的transaction id與數據庫中存儲的id做比較。如果相同,則忽略這個batch;如果不同,則把這個batch的計算結果加到總結果中,并更新數據庫。

Transactional Topolgy運行示意圖如下:

transactional topology

下面總結一下Transactional Topology的一些特性

  •  Transactional Topology將事務性機制都封裝好了,其內部使用CoordinateBolt來保證一個batch中的tuple被處理完。
  •  TransactionalSpout只能有一個,它將所有tuple分為一個一個的batch,而且保證同一個batch的transaction id始終一樣。
  •  BatchBolt處理batch在一起的tuples。對于每一個tuple調用execute方法,而在整個batch處理完成的時候調用finishBatch方法。
  •  如果BatchBolt被標記成Committer,則只能在commit階段調用finishBolt方法。一個batch的commit階 段由storm保證只在前一個batch成功提交之后才會執行。并且它會重試直到topology里面的所有bolt在commit完成提交。
  •  Transactional Topology隱藏了anchor/ack框架,它提供一個不同的機制來fail一個batch,從而使得這個batch被replay。

5.2 Trident介紹

Trident是Storm之上的高級抽象,提供了joins,grouping,aggregations,fuctions和filters等接口。如果你使用過Pig或Cascading,對這些接口就不會陌生。

Trident將stream中的tuples分成batches進行處理,API封裝了對這些batches的處理過程,保證tuple只被處理一次。處理batches中間結果存儲在TridentState對象中。

Trident事務性原理這里不詳細介紹,有興趣的讀者請自行查閱資料。

參考:http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/

http://xumingming.sinaapp.com/811/twitter-storm-code-analysis-coordinated-bolt/

https://github.com/nathanmarz/storm/wiki/Trident-tutorial

原文鏈接:http://blog.linezing.com/2013/01/storm%E5%85%A5%E9%97%A8%E6%95%99%E7%A8%8B-%E7%AC%AC%E4%BA%94%E7%AB%A0-%E4%B8

責任編輯:陳四芳 來源: blog.linezing.com
相關推薦

2013-12-12 16:37:45

Storm入門教程一致性事務

2014-01-13 11:22:28

storm

2013-08-29 14:12:52

Storm分布式實時計算

2014-01-16 11:14:37

StormTopology

2014-01-16 14:30:43

storm安裝部署

2013-08-29 14:28:09

StormHadoop

2022-08-29 08:38:00

事務一致性

2021-08-13 07:56:13

Raft算法日志

2022-08-11 07:55:05

數據庫Mysql

2017-07-25 14:38:56

數據庫一致性非鎖定讀一致性鎖定讀

2013-09-18 14:46:32

StormStorm集群

2013-12-12 16:14:21

storm入門教程storm消息處理

2014-01-16 15:48:49

storm

2019-09-18 08:41:53

并發扣減一致性redis

2021-03-04 06:49:53

RocketMQ事務

2009-06-18 09:18:08

Oracle檢索數據數據一致性事務恢復

2023-12-01 13:51:21

數據一致性數據庫

2022-12-14 08:23:30

2025-03-05 09:10:00

session開發Web
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 97精品超碰一区二区三区 | 一区二区三区在线免费观看视频 | 亚洲 欧美 日韩 在线 | 91视频www.| 色网在线看 | 一区二区三区在线播放视频 | 黄色三级免费网站 | 91久久精品一区二区二区 | 天天躁日日躁狠狠的躁天龙影院 | 日韩精品专区在线影院重磅 | 成人精品鲁一区一区二区 | 日韩免费一区二区 | v片网站| 中文字幕成人在线 | 亚洲精品久久久久久宅男 | 日韩三级在线 | 国产精品久久久久久久久大全 | 精品欧美色视频网站在线观看 | 免费久久精品 | 波多野结衣电影一区 | 欧美一级精品片在线看 | 亚洲欧美日韩精品久久亚洲区 | 在线观看成人 | 国产日韩欧美在线 | 91文字幕巨乱亚洲香蕉 | av一区二区在线观看 | 欧美啪啪 | 黄视频网址| 国产av毛片 | 欧美精品在线播放 | 精品国产一区二区久久 | 国产美女在线观看 | 国产一区二区视频在线观看 | 日韩a v在线免费观看 | 一区二区三区成人 | 亚洲国产一区二区视频 | 一区二区免费视频 | av一级久久 | 日韩三级| 黄色毛片免费看 | 中文字幕一区二区三区四区 |