成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

技術(shù)干貨分享:HBase數(shù)據(jù)遷移到Kafka實戰(zhàn)

存儲 大數(shù)據(jù) Kafka
在實際的應(yīng)用場景中,數(shù)據(jù)存儲在HBase集群中,但是由于一些特殊的原因,需要將數(shù)據(jù)從HBase遷移到Kafka。正常情況下,一般都是源數(shù)據(jù)到Kafka,再有消費者處理數(shù)據(jù),將數(shù)據(jù)寫入HBase。但是,如果逆向處理,如何將HBase的數(shù)據(jù)遷移到Kafka呢?

1.概述

在實際的應(yīng)用場景中,數(shù)據(jù)存儲在HBase集群中,但是由于一些特殊的原因,需要將數(shù)據(jù)從HBase遷移到Kafka。正常情況下,一般都是源數(shù)據(jù)到Kafka,再有消費者處理數(shù)據(jù),將數(shù)據(jù)寫入HBase。但是,如果逆向處理,如何將HBase的數(shù)據(jù)遷移到Kafka呢?今天筆者就給大家來分享一下具體的實現(xiàn)流程。

技術(shù)干貨分享:HBase數(shù)據(jù)遷移到Kafka實戰(zhàn)

2.內(nèi)容

一般業(yè)務(wù)場景如下,數(shù)據(jù)源頭產(chǎn)生數(shù)據(jù),進(jìn)入Kafka,然后由消費者(如Flink、Spark、Kafka API)處理數(shù)據(jù)后進(jìn)入到HBase。這是一個很典型的實時處理流程。流程圖如下:

 

技術(shù)干貨分享:HBase數(shù)據(jù)遷移到Kafka實戰(zhàn)

上述這類實時處理流程,處理數(shù)據(jù)都比較容易,畢竟數(shù)據(jù)流向是順序處理的。但是,如果將這個流程逆向,那么就會遇到一些問題。

2.1 海量數(shù)據(jù)

HBase的分布式特性,集群的橫向拓展,HBase中的數(shù)據(jù)往往都是百億、千億級別,或者數(shù)量級更大。這類級別的數(shù)據(jù),對于這類逆向數(shù)據(jù)流的場景,會有個很麻煩的問題,那就是取數(shù)問題。如何將這海量數(shù)據(jù)從HBase中取出來?

2.2 沒有數(shù)據(jù)分區(qū)

我們知道HBase做數(shù)據(jù)Get或者List很快,也比較容易。而它又沒有類似Hive這類數(shù)據(jù)倉庫分區(qū)的概念,不能提供某段時間內(nèi)的數(shù)據(jù)。如果要提取最近一周的數(shù)據(jù),可能全表掃描,通過過濾時間戳來獲取一周的數(shù)據(jù)。數(shù)量小的時候,可能問題不大,而數(shù)據(jù)量很大的時候,全表去掃描HBase很困難。

3.解決思路

對于這類逆向數(shù)據(jù)流程,如何處理。其實,我們可以利用HBase Get和List的特性來實現(xiàn)。因為HBase通過RowKey來構(gòu)建了一級索引,對于RowKey級別的取數(shù),速度是很快的。實現(xiàn)流程細(xì)節(jié)如下: 

技術(shù)干貨分享:HBase數(shù)據(jù)遷移到Kafka實戰(zhàn)

數(shù)據(jù)流程如上圖所示,下面筆者為大家來剖析每個流程的實現(xiàn)細(xì)節(jié),以及注意事項。

3.1 Rowkey抽取

我們知道HBase針對Rowkey取數(shù)做了一級索引,所以我們可以利用這個特性來展開。我們可以將海量數(shù)據(jù)中的Rowkey從HBase表中抽取,然后按照我們制定的抽取規(guī)則和存儲規(guī)則將抽取的Rowkey存儲到HDFS上。

這里需要注意一個問題,那就是關(guān)于HBase Rowkey的抽取,海量數(shù)據(jù)級別的Rowkey抽取,建議采用MapReduce來實現(xiàn)。這個得益于HBase提供了TableMapReduceUtil類來實現(xiàn),通過MapReduce任務(wù),將HBase中的Rowkey在map階段按照指定的時間范圍進(jìn)行過濾,在reduce階段將rowkey拆分為多個文件,最后存儲到HDFS上。

這里可能會有同學(xué)有疑問,都用MapReduce抽取Rowkey了,為啥不直接在掃描處理列簇下的列數(shù)據(jù)呢?這里,我們在啟動MapReduce任務(wù)的時候,Scan HBase的數(shù)據(jù)時只過濾Rowkey(利用FirstKeyOnlyFilter來實現(xiàn)),不對列簇數(shù)據(jù)做處理,這樣會快很多。對HBase RegionServer的壓力也會小很多。

  • RowColumnrow001info:namerow001info:agerow001info:sexrow001info:sn

這里舉個例子,比如上表中的數(shù)據(jù),其實我們只需要取出Rowkey(row001)。但是,實際業(yè)務(wù)數(shù)據(jù)中,HBase表描述一條數(shù)據(jù)可能有很多特征屬性(例如姓名、性別、年齡、身份證等等),可能有些業(yè)務(wù)數(shù)據(jù)一個列簇下有十幾個特征,但是他們卻只有一個Rowkey,我們也只需要這一個Rowkey。那么,我們使用FirstKeyOnlyFilter來實現(xiàn)就很合適了。 

  1. /** 
  2.  * A filter that will only return the first KV from each row. 
  3.  * <p> 
  4.  * This filter can be used to more efficiently perform row count operations. 
  5.  */ 

這個是FirstKeyOnlyFilter的一段功能描述,它用于返回第一條KV數(shù)據(jù),官方其實用它來做計數(shù)使用,這里我們稍加改進(jìn),把FirstKeyOnlyFilter用來做抽取Rowkey。

3.2 Rowkey生成

抽取的Rowkey如何生成,這里可能根據(jù)實際的數(shù)量級來確認(rèn)Reduce個數(shù)。建議生成Rowkey文件時,切合實際的數(shù)據(jù)量來算Reduce的個數(shù)。盡量不用為了使用方便就一個HDFS文件,這樣后面不好維護(hù)。舉個例子,比如HBase表有100GB,我們可以拆分為100個文件。

3.3 數(shù)據(jù)處理

在步驟1中,按照抽取規(guī)則和存儲規(guī)則,將數(shù)據(jù)從HBase中通過MapReduce抽取Rowkey并存儲到HDFS上。然后,我們在通過MapReduce任務(wù)讀取HDFS上的Rowkey文件,通過List的方式去HBase中獲取數(shù)據(jù)。拆解細(xì)節(jié)如下: 

技術(shù)干貨分享:HBase數(shù)據(jù)遷移到Kafka實戰(zhàn)

Map階段,我們從HDFS讀取Rowkey的數(shù)據(jù)文件,然后通過批量Get的方式從HBase取數(shù),然后組裝數(shù)據(jù)發(fā)送到Reduce階段。在Reduce階段,獲取來自Map階段的數(shù)據(jù),寫數(shù)據(jù)到Kafka,通過Kafka生產(chǎn)者回調(diào)函數(shù),獲取寫入Kafka狀態(tài)信息,根據(jù)狀態(tài)信息判斷數(shù)據(jù)是否寫入成功。如果成功,記錄成功的Rowkey到HDFS,便于統(tǒng)計成功的進(jìn)度;如果失敗,記錄失敗的Rowkey到HDFS,便于統(tǒng)計失敗的進(jìn)度。

3.4 失敗重跑

通過MapReduce任務(wù)寫數(shù)據(jù)到Kafka中,可能會有失敗的情況,對于失敗的情況,我們只需要記錄Rowkey到HDFS上,當(dāng)任務(wù)執(zhí)行完成后,再去程序檢查HDFS上是否存在失敗的Rowkey文件,如果存在,那么再次啟動步驟3,即讀取HDFS上失敗的Rowkey文件,然后再List HBase中的數(shù)據(jù),進(jìn)行數(shù)據(jù)處理后,最后再寫Kafka,以此類推,直到HDFS上失敗的Rowkey處理完成為止。

4.實現(xiàn)代碼

這里實現(xiàn)的代碼量也并不復(fù)雜,下面提供一個偽代碼,可以在此基礎(chǔ)上進(jìn)行改造(例如Rowkey的抽取、MapReduce讀取Rowkey并批量Get HBase表,然后在寫入Kafka等)。示例代碼如下:

  1. public class MRROW2HDFS { 
  2.  public static void main(String[] args) throws Exception { 
  3.  Configuration config = HBaseConfiguration.create(); // HBase Config info 
  4.  Job job = Job.getInstance(config, "MRROW2HDFS"); 
  5.  job.setJarByClass(MRROW2HDFS.class); 
  6.  job.setReducerClass(ROWReducer.class); 
  7.  String hbaseTableName = "hbase_tbl_name"
  8.  Scan scan = new Scan(); 
  9.  scan.setCaching(1000); 
  10.  scan.setCacheBlocks(false); 
  11.  scan.setFilter(new FirstKeyOnlyFilter()); 
  12.  TableMapReduceUtil.initTableMapperJob(hbaseTableName, scan, ROWMapper.class, Text.class, Text.class, job); 
  13.  FileOutputFormat.setOutputPath(job, new Path("/tmp/rowkey.list")); // input you storage rowkey hdfs path 
  14.  System.exit(job.waitForCompletion(true) ? 0 : 1); 
  15.  } 
  16.  public static class ROWMapper extends TableMapper<Text, Text> { 
  17.  @Override 
  18.  protected void map(ImmutableBytesWritable key, Result value, 
  19.  Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) 
  20.  throws IOException, InterruptedException { 
  21.  for (Cell cell : value.rawCells()) { 
  22.  // Filter date range 
  23.  // context.write(...); 
  24.  } 
  25.  } 
  26.  } 
  27.   
  28.  public static class ROWReducer extends Reducer<Text,Text,Text,Text>{ 
  29.  private Text result = new Text(); 
  30.   
  31.  @Override 
  32.  protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException { 
  33.  for(Text val:values){ 
  34.  result.set(val); 
  35.  context.write(key, result); 
  36.  } 
  37.  } 
  38.  } 

5.總結(jié)

整個逆向數(shù)據(jù)處理流程,并不算復(fù)雜,實現(xiàn)也是很基本的MapReduce邏輯,沒有太復(fù)雜的邏輯處理。在處理的過程中,需要幾個細(xì)節(jié)問題,Rowkey生成到HDFS上時,可能存在行位空格的情況,在讀取HDFS上Rowkey文件去List時,最好對每條數(shù)據(jù)做個過濾空格處理。另外,就是對于成功處理Rowkey和失敗處理Rowkey的記錄,這樣便于任務(wù)失敗重跑和數(shù)據(jù)對賬。可以知曉數(shù)據(jù)遷移進(jìn)度和完成情況。同時,我們可以使用 Kafka Eagle 監(jiān)控工具來查看Kafka寫入進(jìn)度。

 

責(zé)任編輯:未麗燕 來源: 今日頭條
相關(guān)推薦

2010-08-20 11:18:49

Exchange Se

2022-07-27 22:48:29

消息中間件RocketMQ架構(gòu)設(shè)計

2012-05-21 10:23:36

2021-07-26 12:10:37

FacebookMySQL 8.0數(shù)據(jù)庫

2020-06-11 08:02:38

VMwareHyper-VOpenStack

2018-07-25 08:57:42

存儲數(shù)據(jù)遷移

2010-11-09 11:12:23

2009-08-06 09:20:30

2016-12-12 19:16:43

數(shù)據(jù)云端

2019-08-08 15:05:26

HBase數(shù)據(jù)遷移命令

2019-03-25 12:20:29

數(shù)據(jù)MySQL性能測試

2018-02-02 16:15:02

Hadoop數(shù)據(jù)遷移集群

2010-11-17 09:18:47

私有云遷移

2010-08-12 15:10:17

Flex4

2013-11-29 10:45:03

MySQLNoSQLHBase

2010-08-09 12:47:00

Flex4beta

2018-10-29 13:07:15

HBase存儲遷移

2011-11-10 13:44:13

VMwareKVM遷移

2019-07-15 16:10:00

技術(shù)研發(fā)指標(biāo)

2012-05-18 10:03:32

VMware
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 玖玖国产精品视频 | 波多野吉衣久久 | 91xxx在线观看 | 久久免费资源 | 91久久久久| 精品乱人伦一区二区三区 | 亚洲欧美日韩中文字幕一区二区三区 | 午夜精品在线观看 | 欧美日韩一区二区三区视频 | 欧美一级片在线观看 | 国产福利在线 | 国产1区2区3区 | 偷拍亚洲色图 | 一区二区三区免费观看 | 精品国产乱码久久久久久闺蜜 | 韩日在线| 日韩一区二区三区精品 | 国产午夜精品一区二区三区嫩草 | 男人的天堂久久 | 在线观看久草 | 日韩欧美国产不卡 | 久久亚洲一区 | 完全免费av在线 | 欧美久久精品一级c片 | 在线综合视频 | 久久黄色 | 91亚洲国产成人久久精品网站 | 成人午夜免费福利视频 | 三级成人在线 | 羞羞在线观看视频 | 国产精品毛片一区二区三区 | 日韩精品视频一区二区三区 | 色视频欧美 | 国产一区二区影院 | 国产网站在线播放 | 日韩国产欧美 | 黄色免费在线观看网址 | 日本在线播放一区二区 | 中文字幕人成乱码在线观看 | 欧美一区二区三区精品 | 影音先锋成人资源 |