From a6a84b8ecc324834f79f3a61e5e587366890a06e Mon Sep 17 00:00:00 2001 From: gnehil Date: Wed, 26 Jun 2024 20:12:40 +0800 Subject: [PATCH] [improvement](stream load)(cherry-pick) support hll_from_base64 for stream load column mapping (#36819) picked from https://github.com/apache/doris/pull/35923 --- .../org/apache/doris/catalog/FunctionSet.java | 1 + .../doris/planner/FileLoadScanNode.java | 4 +- .../load_p0/http_stream/test_http_stream.out | 12 ++++++ .../stream_load/test_stream_load_hll_type.csv | 10 +++++ .../stream_load/test_stream_load_new.out | 12 ++++++ .../http_stream/test_http_stream.groovy | 41 ++++++++++++++++++ .../stream_load/test_stream_load_new.groovy | 42 +++++++++++++++++++ 7 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java index b0d4c65453..2db943993d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java @@ -178,6 +178,7 @@ public class FunctionSet { public static final String HLL_UNION_AGG = "hll_union_agg"; public static final String HLL_RAW_AGG = "hll_raw_agg"; public static final String HLL_CARDINALITY = "hll_cardinality"; + public static final String HLL_FROM_BASE64 = "hll_from_base64"; public static final String TO_BITMAP = "to_bitmap"; public static final String TO_BITMAP_WITH_CHECK = "to_bitmap_with_check"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java index ca0324a51d..0d674a7051 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java @@ -280,9 +280,11 @@ public class FileLoadScanNode extends FileScanNode { } FunctionCallExpr fn = (FunctionCallExpr) expr; if (!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) && !fn.getFnName() - .getFunction().equalsIgnoreCase("hll_empty")) { + .getFunction().equalsIgnoreCase("hll_empty") + && !fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_FROM_BASE64)) { throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx) or " + + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_FROM_BASE64 + "(xxx) or " + destSlotDesc.getColumn().getName() + "=hll_empty()"); } expr.setType(org.apache.doris.catalog.Type.HLL); diff --git a/regression-test/data/load_p0/http_stream/test_http_stream.out b/regression-test/data/load_p0/http_stream/test_http_stream.out index 7ce24eea09..2475ed2496 100644 --- a/regression-test/data/load_p0/http_stream/test_http_stream.out +++ b/regression-test/data/load_p0/http_stream/test_http_stream.out @@ -620,3 +620,15 @@ 1 test 2 test +-- !sql19 -- +buag 1 1 +huang 1 1 +jfin 1 1 +koga 1 1 +kon 1 1 +lofn 1 1 +lojn 1 1 +nfubg 1 1 +nhga 1 1 +nijg 1 1 + diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv b/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv new file mode 100644 index 0000000000..0b1d798782 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv @@ -0,0 +1,10 @@ +1001,koga,AQEMYSmSmfh+mA== +1002,nijg,AQGs1RXTaA+hkQ== +1003,lojn,AQFyJr4rwn+S0A== +1004,lofn,AQFvE0bU6Pc9uw== +1005,jfin,AQEmxbO3VGItCA== +1006,kon,AQEm5d0Gw4uvZw== +1007,nhga,AQHOpocenFnBwQ== +1008,nfubg,AQFzYsFz+NIgUg== +1009,huang,AQH2slI7qAUmYA== +1010,buag,AQGBXZ3xnU79YA== \ No newline at end of file diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_new.out b/regression-test/data/load_p0/stream_load/test_stream_load_new.out index 52440d9843..f251042a9d 100644 --- a/regression-test/data/load_p0/stream_load/test_stream_load_new.out +++ b/regression-test/data/load_p0/stream_load/test_stream_load_new.out @@ -124,3 +124,15 @@ 10009 jj 10010 kk +-- !sql13 -- +buag 1 1 +huang 1 1 +jfin 1 1 +koga 1 1 +kon 1 1 +lofn 1 1 +lojn 1 1 +nfubg 1 1 +nhga 1 1 +nijg 1 1 + diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy index 781732988e..5411224c20 100644 --- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy @@ -854,5 +854,46 @@ suite("test_http_stream", "p0") { } finally { try_sql "DROP TABLE IF EXISTS ${tableName18}" } + + // test load hll type + def tableName19 = "test_http_stream_hll_type" + + try { + sql """ + CREATE TABLE IF NOT EXISTS ${tableName19} ( + type_id int, + type_name varchar(10), + pv_hash hll hll_union not null, + pv_base64 hll hll_union not null + ) + AGGREGATE KEY(type_id,type_name) + DISTRIBUTED BY HASH(type_id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${db}.${tableName19} select c1,c2,hll_hash(c1),hll_from_base64(c3) from http_stream("format"="csv", "column_separator"=",") + """ + time 10000 + file '../stream_load/test_stream_load_hll_type.csv' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("http_stream result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + + qt_sql19 "select type_name, hll_union_agg(pv_hash), hll_union_agg(pv_base64) from ${tableName19} group by type_name order by type_name" + } finally { + try_sql "DROP TABLE IF EXISTS ${tableName19}" + } + } diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy index 7df57ebbd1..48c3e5f965 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy @@ -540,5 +540,47 @@ suite("test_stream_load_new", "p0") { } finally { try_sql "DROP TABLE IF EXISTS ${tableName12}" } + + // 13. test stream load hll type + def tableName13 = "test_stream_load_hll_type" + + try { + sql """ + CREATE TABLE IF NOT EXISTS ${tableName13} ( + type_id int, + type_name varchar(10), + pv_hash hll hll_union not null, + pv_base64 hll hll_union not null + ) + AGGREGATE KEY(type_id,type_name) + DISTRIBUTED BY HASH(type_id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + streamLoad { + set 'column_separator', ',' + set 'columns', 'type_id,type_name,type_id_base64,pv_hash=hll_hash(type_id),pv_base64=hll_from_base64(type_id_base64)' + table "${tableName13}" + time 10000 + file 'test_stream_load_hll_type.csv' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(10, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + sql """ sync; """ + qt_sql13 "select type_name, hll_union_agg(pv_hash), hll_union_agg(pv_base64) from ${tableName13} group by type_name order by type_name" + } finally { + try_sql "DROP TABLE IF EXISTS ${tableName13}" + } + }