頂象工程師手記:Cassandra在風控數據處理中的實踐
Cassandra是一套開源分布式NoSQL數據庫系統。由Facebook開發,主要用于儲存收件箱等簡單格式數據,集GoogleBigTable的數據模型與Amazon Dynamo的完全分布式的架構于一身。
2008年,Facebook將 Cassandra 開源,并被Digg、Twitter等知名公司引入,成為了一種流行的分布式結構化數據存儲方案。
Cassandra是一個混合型的非關系的數據庫,類似于Google的BigTable。其主要功能比Dynamo (分布式的Key-Value存儲系統)更豐富,但支持度卻不如文檔存儲MongoDB(介于關系數據庫和非關系數據庫之間的開源產品,是非關系數據庫當中功能最豐富,最像關系數據庫的。支持的數據結構非常松散,是類似json的bjson格式,因此可以存儲比較復雜的數據類型)。

Cassandra 可以用到哪里?
Cassandra具有常見NoSQL 分布式數據庫,其自帶的命令行工具完備,兼容性強,甚至Windows機器都可以安裝,因此維護、升級都相對比較簡單。它有著很人性化的Web管理界面,兼容大部分SQL語法。可以設置多個“中心”,這一點跟Hadoop based的HBase / Hive 很不一樣。
Cassandra讀寫性能和可擴展非常好。由于它是一堆數據庫節點共同構成的一個分布式網絡服務,只要對Cassandra 的一個節點就行寫操作,會被復制到其他節點上去。同理,對Cassandra的一個節點進行讀操作,也會被路由到某個節點上面去讀取。因此,對于一個Cassandra群集來說,擴展性能是比較簡單的事情,只管在群集里面添加節點就可以了。非常適合金融、電商等記錄系統日志、產品的目錄管理、實時數據存儲等等要求高的行業。
我們把Cassandra應用在了數據對象服務的數據提供上,由于有多個“中心”,又減少了對于Hadoop環境的依賴,在維護管理上會非常便捷,又能大幅提升數據的存儲以及查詢性能。
Cassandra docker安裝初體驗
啟動Cassandra docker實例:
$ docker run --name some-cassandra -d cassandra:tag
some-cassandra指定的容器名稱,tag是指定所需Cassandra版本的標記。具體參考docker hub
建立集群以及在已有集群上擴展機器:
假設***臺機器的IP地址是10.42.42.42第二臺機器人的IP地址10.43.43.43,請使用公開的八卦端口啟動***臺機器:
$ docker run --name some-cassandra -d -e CASSANDRA_BROADCAST_ADDRESS=10.42.42.42 -p 7000:7000 cassandra:tag
然后在第二臺機器上啟動一個Cassandra容器,暴露的八卦端口和種子指向***臺機器:
$ docker run --name some-cassandra -d -e CASSANDRA_BROADCAST_ADDRESS=10.43.43.43 -p 7000:7000 -e CASSANDRA_SEEDS=10.42.42.42 cassandra:tag
這樣就建立好了一個倆臺Cassandra機器的集群了,可以進行cqls的體驗了。
Cassandra應用實踐
Cassandra沒有像BigTable或Hbase那樣選擇中心控制節點,而選擇了無中心的P2P架構,網絡中的所有節點都是對等的,它們構成了一個環,節點之間通過P2P協議每秒鐘交換一次數據,這樣每個節點都擁有其它所有節點的信息,包括位置、狀態等,如下圖所示

客戶端可以連接集群中的任一個節點來連接整個集群,和客戶端建立連接的節點叫協作者(coordinator),Cassandra相當于一個代理,負責定位該次請求要發到哪些實際擁有本次請求所需數據的節點上去獲取,但如何獲取并返回,主要根據客戶端要求的一致性級別(Consistency Level)來確定。
比如:ONE指只要有一個節點返回數據就可以對客戶端做出響應,QUONUM指需要返回幾個根據用戶的配置數目,ALL指等于數據復制份數的所有節點都返回結果才能向客戶端做出響應,對于數據一致性要求不是特別高的可以選擇ONE,這是最快的一種方式。
Cassandra的數據分發和復制通常是一起的,數據用表的形式來組織,用主鍵來識別應該存儲到哪些節點上,行的copy稱作replica。當一個集群被創建時,至少要指定如下幾個配置:Virtual Nodes,Partitioner,Replication Strategy,Snitch。
數據復制策略有兩種,一種是SimpleStrategy,適合一個數據中心的情況,***份數據放在Partitioner確定的節點,后面的放在順時針找到的節點上,它不考慮跨數據中心和機架的復制。另外一種是NetworkTopologyStargegy,***份數據和前一種一樣,第二份復制的數據放在不同的機架上,每個數據中心可以有不同數據的replicas。
Partitioner策略有三種,默認是Murmur3Partitioner,使用MurmurHash。RandomPartitioner,使用Md5 Hash。ByteOrderedPartitioner使用數據的字節進行有順分區。Cassandra默認使用MurmurHash,這種有更高的性能。
那么如果Cassandra集群動態擴展怎么辦?數據怎么流動呢?如果是依次按順序流動那么效率非常低下。這里就要提到Cassandra 的一個Vnode (virtual nodes)概念。意思就是把一個節點分成默認是256個Vnode來擁有較多不連續的hash值范圍以達到數據的負載的目的。類似于下圖:

Cassandra 的寫請求
先為了能夠持久化與宕機恢復會寫入CommitLog
– 對應的配置: commitlog_directory
同時寫入Memtable
– Memtable : 內存之中的數據結構
每當Memtable的數據達到一定條件時將數據Flush到SSTable
– 條件在配置文件之中定義
• memtable_heap_space_in_mb
• memtable_offheap_space_in_mb
– SSTable
• 真正存儲到了硬盤之上:data_file_directories
• SSTable是不可變的,每次會將SSTable完全刪除再新寫一個
Flush之后,CommitLog會被自動刪除

如果客戶端配置了Consistency Level是ONE,意味著只要有一個節點寫入成功,就由代理節點(Coordinator)返回給客戶端寫入完成。當然這中間有可能會出現其它節點寫入失敗的情況,Cassandra自己會通過Hinted Handoff或Read Repair 或者Anti-entropy Node Repair方式保證數據最終一致性。
Cassandra的讀請求
上面提到,Cassandra在讀取數據有優勢。在讀取時,Cassandra首先檢查Bloom filter,每一個SSTable都有一個Bloom filter用來檢查partition key是否在這個SSTable,這一步是在訪問任何磁盤IO的前面就會做掉。如果存在,再檢查partition key cache,然后再做如下操作:
如果在cache中能找到索引,到compression offset map中找擁有這個數據的數據塊,從磁盤上取得壓縮數據并返回結果集。如果在cache中找不到索引,搜索partition summary確定索引在磁盤上的大概位置,然后獲取索引入口,在SSTable上執行一次單獨的尋道和一個順序的列讀取操作,下面也是到compression offset map中找擁有這個數據的數據塊,從磁盤上取得壓縮數據并返回結果集。讀取數據時會合并Memtable中緩存的數據、多個SSTable中的數據,才返回最終的結果。如下圖:

讀請求(Read Request)分兩種,一種是Rirect Read Request,根據客戶端配置的Consistency Level讀取到數據即可返回客戶端結果。一種是Background Read Repair Request,除了直接請求到達的節點外,會被發送到其它復制節點,用于修復之前寫入有問題的節點,保證數據最終一致性。客戶端讀取時,Coordinator首先聯系Consistency Level定義的節點,發送請求到最快響應的復制節點上,返回請求的數據。如果有多個節點被聯系,會在內存比較每個復制節點傳過的數據行,如果不一致選取最近的數據(根據時間戳)返回給客戶端,并在后臺更新過期的復制節點,這個過程被稱作Read Repair。
Cassandra的數據整理
更新操作不會立即更新,這樣會導致隨機讀寫磁盤,效率不高,Cassandra會把數據順序寫入到一個新的SSTable,并打上一個時間戳以標明數據的新舊。它也不會立馬做刪除操作,而是用Tombstone來標記要刪除的數據。Compaction時,將多個SSTable文件中的數據整合到新的SSTable文件中,當舊SSTable上的讀請求一完成,會被立即刪除,空余出來的空間可以重新利用。雖然Compcation沒有隨機的IO訪問,但還是一個重量級的操作,一般在后臺運行,并通過限制它的吞吐量來控制,`compaction throughput mb per sec參數可以設置,默認是16M/s。另外,如果key cache顯示整理后的數據是熱點數據,操作系統會把它放入到page cache里,以提升性能。它的合并的策略有以下兩種:
SizeTieredCompactionStrategy :每次更新不會直接更新原來的數據,這樣會造成隨機訪問磁盤,性能不高,而是在插入或更新直接寫入下一個sstable,這樣是順序寫入速度非常快,適合寫敏感的操作。但是,因為數據分布在多個sstable,讀取時需要多次磁盤尋道,讀取的性能不高。為了避免這樣情況,會定期在后臺將相似大小的sstable進行合并,這個合并速度也會很快,默認情況是4個sstable會合并一次,合并時如果沒有過期的數據要清理掉,會需要一倍的空間,因此最壞情況需要50%的空閑磁盤。
LeveledCompactionStrategy:創建固定大小默認是5M的sstable,最上面一級為L0下面為L1,下面一層是上面一層的10倍大小。這種整理策略讀取非常快,適合讀敏感的情況,最壞只需要10%的空閑磁盤空間,它參考了LevelDB的實現。
后記
從一開始的搭建Cassandra的demo,到擴展到生產環境上,已經充分的體驗到了Cassandra的易用和可擴展性。它極大的降低了環境的配置和解決問題之間的運維時間,能把更多的時間轉到實際開發中。