使用Spark Streaming SQL進行PV/UV統計
1.背景介紹
PV/UV統計是流式分析一個常見的場景。通過PV可以對訪問的網站做流量或熱點分析,例如廣告主可以通過PV值預估投放廣告網頁所帶來的流量以及廣告收入。另外一些場景需要對訪問的用戶作分析,比如分析用戶的網頁點擊行為,此時就需要對UV做統計。
使用Spark Streaming SQL,并結合Redis可以很方便進行PV/UV的統計。本文將介紹通過Streaming SQL消費Loghub中存儲的用戶訪問信息,對過去1分鐘內的數據進行PV/UV統計,將結果存入Redis中。
2.準備工作
- 創建E-MapReduce 3.23.0以上版本的Hadoop集群。
- 下載并編譯E-MapReduce-SDK包
- git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git
- cd aliyun-emapreduce-sdk
- git checkout -b master-2.x origin/master-2.x
- mvn clean package -DskipTests
編譯完后, assembly/target目錄下會生成emr-datasources_shaded_${version}.jar,其中${version}為sdk的版本。
數據源
本文采用Loghub作為數據源,有關日志采集、日志解析請參考日志服務。
3.統計PV/UV
一般場景下需要將統計出的PV/UV以及相應的統計時間存入Redis。其他一些業務場景中,也會只保存最新結果,用新的結果不斷覆蓋更新舊的數據。以下首先介紹第一種情況的操作流程。
3.1啟動客戶端
命令行啟動streaming-sql客戶端
- streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar
也可以創建SQL語句文件,通過streaming-sql -f的方式運行。
3.1定義數據表
數據源表定義如下
- CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)
- USING loghub
- OPTIONS(
- sls.project=${sls.project},
- sls.store=${sls.store},
- access.key.id=${access.key.id},
- access.key.secret=${access.key.secret},
- endpoint=${endpoint});
其中,數據源表包含user_ip和__time__兩個字段,分別代表用戶的IP地址和loghub上的時間列。OPTIONS中配置項的值根據實際配置。
結果表定義如下
- CREATE TABLE redis_sink
- USING redis
- OPTIONS(
- table='statistic_info',
- host=${redis_host},
- key.column='interval');
其中,statistic_info為Redis存儲結果的表名,interval對應統計結果中的interval字段;配置項${redis_host}的值根據實際配置。
3.2創建流作業
- CREATE SCAN loghub_scan
- ON loghub_source
- USING STREAM
- OPTIONS(
- watermark.column='__time__',
- watermark.delayThreshold='10 second');
- CREATE STREAM job
- OPTIONS(
- checkpointLocation=${checkpoint_location})
- INSERT INTO redis_sink
- SELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval
- FROM loghub_scan
- GROUP BY TUMBLING(__time__, interval 1 minute), window;
4.3查看統計結果
最終的統計結果如下圖所示
可以看到,每隔一分鐘都會生成一條數據,key的形式為表名:interval,value為pv和uv的值。
3.4實現覆蓋更新
將結果表的配置項key.column修改為一個固定的值,例如定義如下
- CREATE TABLE redis_sink
- USING redis
- OPTIONS(
- table='statistic_info',
- host=${redis_host},
- key.column='statistic_type');
創建流作業的SQL改為
- CREATE STREAM job
- OPTIONS(
- checkpointLocation='/tmp/spark-test/checkpoint')
- INSERT INTO redis_sink
- SELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval
- FROM loghub_scan
- GROUP BY TUMBLING(__time__, interval 1 minute), window;
最終的統計結果如下圖所示
可以看到,Redis中值保留了一個值,這個值每分鐘都被更新,value包含pv、uv和interval的值。
4.總結
本文簡要介紹了使用Streaming SQL結合Redis實現流式處理中統計PV/UV的需求。后續文章,我將介紹Spark Streaming SQL的更多內容。