UV、PV計算,因為業務需求不同,通常會分為兩種場景:
離線計算場景:以T+1為主,計算歷史資料
實時計算場景:實時計算日常新增的資料,對使用者標籤去重
針對離線計算場景,Hologres基於RoaringBitmap,提供超高基數的UV計算,只需進行一次最細粒度的預聚合計算,也只生成一份最細粒度的預聚合結果表,就能達到亞秒級查詢。具體詳情可以參見往期文章>>Hologres如何支援超高基數UV計算(基於RoaringBitmap實現)
對於實時計算場景,可以使用Flink+Hologres方式,並基於RoaringBitmap,實時對使用者標籤去重。這樣的方式,可以較細粒度的實時得到使用者UV、PV資料,同時便於根據需求調整最小統計視窗(如最近5分鐘的UV),實現類似實時監控的效果,更好的在大屏等BI展示。相較於以天、周、月等為單位的去重,更適合在活動日期進行更細粒度的統計,並且透過簡單的聚合,也可以得到較大時間單位的統計結果。
主體思想
Flink將流式資料轉化為表與維表進行JOIN操作,再轉化為流式資料。此舉可以利用Hologres維表的
insertIfNotExists
特性結合自增欄位實現高效的uid對映。
Flink把關聯的結果資料按照時間視窗進行處理,根據查詢維度使用RoaringBitmap進行聚合,並將查詢維度以及聚合的uid存放在聚合結果表,其中聚合出的uid結果放入Hologres的RoaringBitmap型別的欄位中。
查詢時,與離線方式相似,直接按照查詢條件查詢聚合結果表,並對其中關鍵的RoaringBitmap欄位做or運算後並統計基數,即可得出對應使用者數。
處理流程如下圖所示
方案最佳實踐
1.建立相關基礎表
1)建立表uid_mapping為uid對映表,用於對映uid到32位int型別。
RoaringBitmap型別要求使用者ID必須是32位int型別且越稠密越好(即使用者ID最好連續)。常見的業務系統或者埋點中的使用者ID很多是字串型別或Long型別,因此需要使用uid_mapping型別構建一張對映表。對映表利用Hologres的SERIAL型別(自增的32位int)來實現使用者對映的自動管理和穩定對映。
由於是實時資料, 設定該表為行存表,以提高Flink維表實時JOIN的QPS。
BEGIN;CREATE TABLE public。uid_mapping (uid text NOT NULL,uid_int32 serial,PRIMARY KEY (uid));——將uid設為clustering_key和distribution_key便於快速查詢其對應的int32值CALL set_table_property(‘public。uid_mapping’, ‘clustering_key’, ‘uid’);CALL set_table_property(‘public。uid_mapping’, ‘distribution_key’, ‘uid’);CALL set_table_property(‘public。uid_mapping’, ‘orientation’, ‘row’);COMMIT;
2)建立表dws_app為基礎聚合表,用於存放在基礎維度上聚合後的結果。
使用RoaringBitmap前需要建立RoaringBitmap extention,同時也需要Hologres例項為0。10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
為了更好效能,建議根據基礎聚合表資料量合理的設定Shard數,但建議基礎聚合表的Shard數設定不超過計算資源的Core數。推薦使用以下方式透過Table Group來設定Shard數
——新建shard數為16的Table Group,——因為測試資料量百萬級,其中後端計算資源為100core,設定shard數為16BEGIN;CREATE TABLE tg16 (a int); ——Table Group哨兵表call set_table_property(‘tg16’, ‘shard_count’, ‘16’); COMMIT;
相比離線結果表,此結果表增加了時間戳欄位,用於實現以Flink視窗週期為單位的統計。結果表DDL如下:
BEGIN;create table dws_app( country text, prov text, city text, ymd text NOT NULL, ——日期欄位 timetz TIMESTAMPTZ, ——統計時間戳,可以實現以Flink視窗週期為單位的統計 uid32_bitmap roaringbitmap, —— 使用roaringbitmap記錄uv primary key(country, prov, city, ymd, timetz)——查詢維度和時間作為主鍵,防止重複插入資料);CALL set_table_property(‘public。dws_app’, ‘orientation’, ‘column’);——日期欄位設為clustering_key和event_time_column,便於過濾CALL set_table_property(‘public。dws_app’, ‘clustering_key’, ‘ymd’);CALL set_table_property(‘public。dws_app’, ‘event_time_column’, ‘ymd’);——等價於將表放在shard數為16的table groupcall set_table_property(‘public。dws_app’, ‘colocate_with’, ‘tg16’);——group by欄位設為distribution_keyCALL set_table_property(‘public。dws_app’, ‘distribution_key’, ‘country,prov,city’);COMMIT;
2.Flink實時讀取資料並更新dws_app基礎聚合表
完整示例原始碼請見alibabacloud-hologres-connectors examples
1)Flink 流式讀取資料來源(DataStream),並轉化為源表(Table)
//此處使用csv檔案作為資料來源,也可以是kafka等DataStreamSource odsStream = env。createInput(csvInput, typeInfo);// 與維表join需要新增proctime欄位,詳見https://help。aliyun。com/document_detail/62506。htmlTable odsTable = tableEnv。fromDataStream( odsStream, $(“uid”), $(“country”), $(“prov”), $(“city”), $(“ymd”), $(“proctime”)。proctime());// 註冊到catalog環境tableEnv。createTemporaryView(“odsTable”, odsTable);
2)將源表與Hologres維表(uid_mapping)進行關聯
其中維表使用insertIfNotExists引數,即查詢不到資料時自行插入,uid_int32欄位便可以利用Hologres的serial型別自增建立。
// 建立Hologres維表,其中nsertIfNotExists表示查詢不到則自行插入String createUidMappingTable = String。format( “create table uid_mapping_dim(” + “ uid string,” + “ uid_int32 INT” + “) with (” + “ ‘connector’=‘hologres’,” + “ ‘dbname’ = ‘%s’,” //Hologres DB名 + “ ‘tablename’ = ‘%s’,”//Hologres 表名 + “ ‘username’ = ‘%s’,” //當前賬號access id + “ ‘password’ = ‘%s’,” //當前賬號access key + “ ‘endpoint’ = ‘%s’,” //Hologres endpoint + “ ‘insertifnotexists’=‘true’” + “)”, database, dimTableName, username, password, endpoint);tableEnv。executeSql(createUidMappingTable);// 源表與維表joinString odsJoinDim = “SELECT ods。country, ods。prov, ods。city, ods。ymd, dim。uid_int32” + “ FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods。proctime AS dim” + “ ON ods。uid = dim。uid”;Table joinRes = tableEnv。sqlQuery(odsJoinDim);
3)將關聯結果轉化為DataStream,透過Flink時間視窗處理,結合RoaringBitmap進行聚合
DataStream
4)寫入結果表
需要注意的是,Hologres中RoaringBitmap型別在Flink中對應Byte陣列型別
// 計算結果轉換為表Table resTable = tableEnv。fromDataStream( processedSource, $(“country”), $(“prov”), $(“city”), $(“ymd”), $(“timest”), $(“uid32_bitmap”));// 建立Hologres結果表, 其中Hologres的RoaringBitmap型別透過Byte陣列存入String createHologresTable = String。format( “create table sink(” + “ country string,” + “ prov string,” + “ city string,” + “ ymd string,” + “ timetz timestamp,” + “ uid32_bitmap BYTES” + “) with (” + “ ‘connector’=‘hologres’,” + “ ‘dbname’ = ‘%s’,” + “ ‘tablename’ = ‘%s’,” + “ ‘username’ = ‘%s’,” + “ ‘password’ = ‘%s’,” + “ ‘endpoint’ = ‘%s’,” + “ ‘connectionSize’ = ‘%s’,” + “ ‘mutatetype’ = ‘insertOrReplace’” + “)”, database, dwsTableName, username, password, endpoint, connectionSize);tableEnv。executeSql(createHologresTable);// 寫入計算結果到dws表tableEnv。executeSql(“insert into sink select * from ” + resTable);
3.資料查詢
查詢時,從基礎聚合表(dws_app)中按照查詢維度做聚合計算,查詢bitmap基數,得出group by條件下的使用者數
查詢某天內各個城市的uv
——執行下面RB_AGG運算查詢,可執行引數先關閉三階段聚合開關(預設關閉),效能更好set hg_experimental_enable_force_three_stage_agg=off SELECT country ,prov ,city ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uvFROM dws_appWHERE ymd = ‘20210329’GROUP BY country ,prov ,city;
查詢某段時間內各個省份的uv
——執行下面RB_AGG運算查詢,可執行引數先關閉三階段聚合開關(預設關閉),效能更好set hg_experimental_enable_force_three_stage_agg=off SELECT country ,prov ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uvFROM dws_appWHERE time > ‘2021-04-19 18:00:00+08’ and time < ‘2021-04-19 19:00:00+08’GROUP BY country ,prov;
本文為阿里雲原創內容,未經允許不得轉載。