奧推網

選單
財經

Flink+Hologres 億級使用者實時UV精確去重最佳實踐

UV、PV計算,因為業務需求不同,通常會分為兩種場景:

離線計算場景:以T+1為主,計算歷史資料

實時計算場景:實時計算日常新增的資料,對使用者標籤去重

針對離線計算場景,Hologres基於RoaringBitmap,提供超高基數的UV計算,只需進行一次最細粒度的預聚合計算,也只生成一份最細粒度的預聚合結果表,就能達到亞秒級查詢。具體詳情可以參見往期文章>>Hologres如何支援超高基數UV計算(基於RoaringBitmap實現)

對於實時計算場景,可以使用Flink+Hologres方式,並基於RoaringBitmap,實時對使用者標籤去重。這樣的方式,可以較細粒度的實時得到使用者UV、PV資料,同時便於根據需求調整最小統計視窗(如最近5分鐘的UV),實現類似實時監控的效果,更好的在大屏等BI展示。相較於以天、周、月等為單位的去重,更適合在活動日期進行更細粒度的統計,並且透過簡單的聚合,也可以得到較大時間單位的統計結果。

主體思想

Flink將流式資料轉化為表與維表進行JOIN操作,再轉化為流式資料。此舉可以利用Hologres維表的

insertIfNotExists

特性結合自增欄位實現高效的uid對映。

Flink把關聯的結果資料按照時間視窗進行處理,根據查詢維度使用RoaringBitmap進行聚合,並將查詢維度以及聚合的uid存放在聚合結果表,其中聚合出的uid結果放入Hologres的RoaringBitmap型別的欄位中。

查詢時,與離線方式相似,直接按照查詢條件查詢聚合結果表,並對其中關鍵的RoaringBitmap欄位做or運算後並統計基數,即可得出對應使用者數。

處理流程如下圖所示

方案最佳實踐

1.建立相關基礎表

1)建立表uid_mapping為uid對映表,用於對映uid到32位int型別。

RoaringBitmap型別要求使用者ID必須是32位int型別且越稠密越好(即使用者ID最好連續)。常見的業務系統或者埋點中的使用者ID很多是字串型別或Long型別,因此需要使用uid_mapping型別構建一張對映表。對映表利用Hologres的SERIAL型別(自增的32位int)來實現使用者對映的自動管理和穩定對映。

由於是實時資料, 設定該表為行存表,以提高Flink維表實時JOIN的QPS。

BEGIN;CREATE TABLE public。uid_mapping (uid text NOT NULL,uid_int32 serial,PRIMARY KEY (uid));——將uid設為clustering_key和distribution_key便於快速查詢其對應的int32值CALL set_table_property(‘public。uid_mapping’, ‘clustering_key’, ‘uid’);CALL set_table_property(‘public。uid_mapping’, ‘distribution_key’, ‘uid’);CALL set_table_property(‘public。uid_mapping’, ‘orientation’, ‘row’);COMMIT;

2)建立表dws_app為基礎聚合表,用於存放在基礎維度上聚合後的結果。

使用RoaringBitmap前需要建立RoaringBitmap extention,同時也需要Hologres例項為0。10版本

CREATE EXTENSION IF NOT EXISTS roaringbitmap;

為了更好效能,建議根據基礎聚合表資料量合理的設定Shard數,但建議基礎聚合表的Shard數設定不超過計算資源的Core數。推薦使用以下方式透過Table Group來設定Shard數

——新建shard數為16的Table Group,——因為測試資料量百萬級,其中後端計算資源為100core,設定shard數為16BEGIN;CREATE TABLE tg16 (a int); ——Table Group哨兵表call set_table_property(‘tg16’, ‘shard_count’, ‘16’); COMMIT;

相比離線結果表,此結果表增加了時間戳欄位,用於實現以Flink視窗週期為單位的統計。結果表DDL如下:

BEGIN;create table dws_app( country text, prov text, city text, ymd text NOT NULL, ——日期欄位 timetz TIMESTAMPTZ, ——統計時間戳,可以實現以Flink視窗週期為單位的統計 uid32_bitmap roaringbitmap, —— 使用roaringbitmap記錄uv primary key(country, prov, city, ymd, timetz)——查詢維度和時間作為主鍵,防止重複插入資料);CALL set_table_property(‘public。dws_app’, ‘orientation’, ‘column’);——日期欄位設為clustering_key和event_time_column,便於過濾CALL set_table_property(‘public。dws_app’, ‘clustering_key’, ‘ymd’);CALL set_table_property(‘public。dws_app’, ‘event_time_column’, ‘ymd’);——等價於將表放在shard數為16的table groupcall set_table_property(‘public。dws_app’, ‘colocate_with’, ‘tg16’);——group by欄位設為distribution_keyCALL set_table_property(‘public。dws_app’, ‘distribution_key’, ‘country,prov,city’);COMMIT;

2.Flink實時讀取資料並更新dws_app基礎聚合表

完整示例原始碼請見alibabacloud-hologres-connectors examples

1)Flink 流式讀取資料來源(DataStream),並轉化為源表(Table)

//此處使用csv檔案作為資料來源,也可以是kafka等DataStreamSource odsStream = env。createInput(csvInput, typeInfo);// 與維表join需要新增proctime欄位,詳見https://help。aliyun。com/document_detail/62506。htmlTable odsTable = tableEnv。fromDataStream( odsStream, $(“uid”), $(“country”), $(“prov”), $(“city”), $(“ymd”), $(“proctime”)。proctime());// 註冊到catalog環境tableEnv。createTemporaryView(“odsTable”, odsTable);

2)將源表與Hologres維表(uid_mapping)進行關聯

其中維表使用insertIfNotExists引數,即查詢不到資料時自行插入,uid_int32欄位便可以利用Hologres的serial型別自增建立。

// 建立Hologres維表,其中nsertIfNotExists表示查詢不到則自行插入String createUidMappingTable = String。format( “create table uid_mapping_dim(” + “ uid string,” + “ uid_int32 INT” + “) with (” + “ ‘connector’=‘hologres’,” + “ ‘dbname’ = ‘%s’,” //Hologres DB名 + “ ‘tablename’ = ‘%s’,”//Hologres 表名 + “ ‘username’ = ‘%s’,” //當前賬號access id + “ ‘password’ = ‘%s’,” //當前賬號access key + “ ‘endpoint’ = ‘%s’,” //Hologres endpoint + “ ‘insertifnotexists’=‘true’” + “)”, database, dimTableName, username, password, endpoint);tableEnv。executeSql(createUidMappingTable);// 源表與維表joinString odsJoinDim = “SELECT ods。country, ods。prov, ods。city, ods。ymd, dim。uid_int32” + “ FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods。proctime AS dim” + “ ON ods。uid = dim。uid”;Table joinRes = tableEnv。sqlQuery(odsJoinDim);

3)將關聯結果轉化為DataStream,透過Flink時間視窗處理,結合RoaringBitmap進行聚合

DataStream> processedSource = source // 篩選需要統計的維度(country, prov, city, ymd) 。keyBy(0, 1, 2, 3) // 滾動時間視窗;此處由於使用讀取csv模擬輸入流,採用ProcessingTime,實際使用中可使用EventTime 。window(TumblingProcessingTimeWindows。of(Time。minutes(5))) // 觸發器,可以在視窗未結束時獲取聚合結果 。trigger(ContinuousProcessingTimeTrigger。of(Time。minutes(1))) 。aggregate( // 聚合函式,根據key By篩選的維度,進行聚合 new AggregateFunction< Tuple5, RoaringBitmap, RoaringBitmap>() { @Override public RoaringBitmap createAccumulator() { return new RoaringBitmap(); } @Override public RoaringBitmap add( Tuple5 in, RoaringBitmap acc) { // 將32位的uid新增到RoaringBitmap進行去重 acc。add(in。f4); return acc; } @Override public RoaringBitmap getResult(RoaringBitmap acc) { return acc; } @Override public RoaringBitmap merge( RoaringBitmap acc1, RoaringBitmap acc2) { return RoaringBitmap。or(acc1, acc2); } }, //視窗函式,輸出聚合結果 new WindowFunction< RoaringBitmap, Tuple6, Tuple, TimeWindow>() { @Override public void apply( Tuple keys, TimeWindow timeWindow, Iterable iterable, Collector< Tuple6> out) throws Exception { RoaringBitmap result = iterable。iterator()。next(); // 最佳化RoaringBitmap result。runOptimize(); // 將RoaringBitmap轉化為位元組陣列以存入Holo中 byte[] byteArray = new byte[result。serializedSizeInBytes()]; result。serialize(ByteBuffer。wrap(byteArray)); // 其中 Tuple6。f4(Timestamp) 欄位表示以視窗長度為週期進行統計,以秒為單位 out。collect( new Tuple6<>( keys。getField(0), keys。getField(1), keys。getField(2), keys。getField(3), new Timestamp( timeWindow。getEnd() / 1000 * 1000), byteArray)); } });

4)寫入結果表

需要注意的是,Hologres中RoaringBitmap型別在Flink中對應Byte陣列型別

// 計算結果轉換為表Table resTable = tableEnv。fromDataStream( processedSource, $(“country”), $(“prov”), $(“city”), $(“ymd”), $(“timest”), $(“uid32_bitmap”));// 建立Hologres結果表, 其中Hologres的RoaringBitmap型別透過Byte陣列存入String createHologresTable = String。format( “create table sink(” + “ country string,” + “ prov string,” + “ city string,” + “ ymd string,” + “ timetz timestamp,” + “ uid32_bitmap BYTES” + “) with (” + “ ‘connector’=‘hologres’,” + “ ‘dbname’ = ‘%s’,” + “ ‘tablename’ = ‘%s’,” + “ ‘username’ = ‘%s’,” + “ ‘password’ = ‘%s’,” + “ ‘endpoint’ = ‘%s’,” + “ ‘connectionSize’ = ‘%s’,” + “ ‘mutatetype’ = ‘insertOrReplace’” + “)”, database, dwsTableName, username, password, endpoint, connectionSize);tableEnv。executeSql(createHologresTable);// 寫入計算結果到dws表tableEnv。executeSql(“insert into sink select * from ” + resTable);

3.資料查詢

查詢時,從基礎聚合表(dws_app)中按照查詢維度做聚合計算,查詢bitmap基數,得出group by條件下的使用者數

查詢某天內各個城市的uv

——執行下面RB_AGG運算查詢,可執行引數先關閉三階段聚合開關(預設關閉),效能更好set hg_experimental_enable_force_three_stage_agg=off SELECT country ,prov ,city ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uvFROM dws_appWHERE ymd = ‘20210329’GROUP BY country ,prov ,city;

查詢某段時間內各個省份的uv

——執行下面RB_AGG運算查詢,可執行引數先關閉三階段聚合開關(預設關閉),效能更好set hg_experimental_enable_force_three_stage_agg=off SELECT country ,prov ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uvFROM dws_appWHERE time > ‘2021-04-19 18:00:00+08’ and time < ‘2021-04-19 19:00:00+08’GROUP BY country ,prov;

本文為阿里雲原創內容,未經允許不得轉載。