bitmap_union_count support window function (#2902)

This commit is contained in:
kangkaisen
2020-02-19 14:33:05 +08:00
committed by GitHub
parent 87a84a793e
commit a76f2b8211
13 changed files with 177 additions and 57 deletions

View File

@ -281,6 +281,12 @@ BigIntVal BitmapFunctions::bitmap_finalize(FunctionContext* ctx, const StringVal
return result;
}
// Returns the cardinality of the BitmapValue that src.ptr points to.
// The bitmap is only read here; this function does not release it.
BigIntVal BitmapFunctions::bitmap_get_value(FunctionContext* ctx, const StringVal& src) {
    auto* bitmap = reinterpret_cast<BitmapValue*>(src.ptr);
    return BigIntVal(bitmap->cardinality());
}
void BitmapFunctions::bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
auto dst_bitmap = reinterpret_cast<BitmapValue*>(dst->ptr);
// zero size means the src input is a agg object

View File

@ -46,6 +46,9 @@ public:
// the input src's ptr need to point a BitmapValue, this function will release the
// BitmapValue memory
static BigIntVal bitmap_finalize(FunctionContext* ctx, const StringVal& src);
// Get the bitmap cardinality. Unlike bitmap_finalize, bitmap_get_value does not
// free the BitmapValue memory; it is used as the analytic (window) get_value function.
static BigIntVal bitmap_get_value(FunctionContext* ctx, const StringVal& src);
static void bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst);
static BigIntVal bitmap_count(FunctionContext* ctx, const StringVal& src);

View File

@ -37,7 +37,7 @@ public:
static BigIntVal hll_finalize(FunctionContext*, const StringVal& src);
// Get the hll cardinality, the difference from hll_finalize method is
// hll_get_value method doesn't free memory
// hll_get_value method doesn't free memory, this function is used in analytic get_value function
static BigIntVal hll_get_value(FunctionContext*, const StringVal& src);
static StringVal hll_serialize(FunctionContext* ctx, const StringVal& src);

View File

@ -119,6 +119,28 @@ TEST_F(BitmapFunctionsTest, bitmap_union_int) {
ASSERT_EQ(expected, result);
}
// bitmap_get_value must report the current cardinality of the aggregate state
// after every update, and bitmap_finalize must agree with the last value seen.
TEST_F(BitmapFunctionsTest, bitmap_get_value) {
    StringVal state;
    BitmapFunctions::bitmap_init(ctx, &state);

    // One distinct value in the bitmap -> cardinality 1.
    IntVal first_value(1);
    BitmapFunctions::bitmap_update_int(ctx, first_value, &state);
    BigIntVal cardinality = BitmapFunctions::bitmap_get_value(ctx, state);
    ASSERT_EQ(BigIntVal(1), cardinality);

    // A second distinct value -> cardinality 2; the state is still usable
    // after the previous bitmap_get_value call.
    IntVal second_value(1234567);
    BitmapFunctions::bitmap_update_int(ctx, second_value, &state);
    cardinality = BitmapFunctions::bitmap_get_value(ctx, state);
    ASSERT_EQ(BigIntVal(2), cardinality);

    // Finalizing the same state yields the same count.
    BigIntVal finalized = BitmapFunctions::bitmap_finalize(ctx, state);
    ASSERT_EQ(cardinality, finalized);
}
TEST_F(BitmapFunctionsTest, bitmap_union) {
StringVal dst;
BitmapFunctions::bitmap_init(ctx, &dst);

View File

@ -23,7 +23,6 @@ import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
@ -578,10 +577,6 @@ public class InsertStmt extends DdlStmt {
checkHllCompatibility(col, expr);
}
if (col.getAggregationType() == AggregateType.BITMAP_UNION) {
checkBitmapCompatibility(col, expr);
}
if (expr instanceof DefaultValueExpr) {
if (targetColumns.get(i).getDefaultValue() == null) {
throw new AnalysisException("Column has no default value, column=" + targetColumns.get(i).getName());
@ -591,6 +586,10 @@ public class InsertStmt extends DdlStmt {
expr.analyze(analyzer);
if (col.getAggregationType() == AggregateType.BITMAP_UNION) {
checkBitmapCompatibility(col, expr);
}
row.set(i, checkTypeCompatibility(col, expr));
}
}
@ -643,33 +642,10 @@ public class InsertStmt extends DdlStmt {
}
private void checkBitmapCompatibility(Column col, Expr expr) throws AnalysisException {
boolean isCompatible = false;
final String bitmapMismatchLog = "Column's type is BITMAP,"
+ " SelectList must contains BITMAP column, or function return type must be BITMAP, " +
" column=" + col.getName();
if (expr instanceof SlotRef) {
final SlotRef slot = (SlotRef) expr;
Column column = slot.getDesc().getColumn();
if (column != null && column.getAggregationType() == AggregateType.BITMAP_UNION) {
isCompatible = true; // select * from bitmap_table
} else if (slot.getDesc().getSourceExprs().size() == 1) {
Expr sourceExpr = slot.getDesc().getSourceExprs().get(0);
if (sourceExpr instanceof FunctionCallExpr) {
FunctionCallExpr functionExpr = (FunctionCallExpr) sourceExpr;
if (functionExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.BITMAP_UNION)) {
isCompatible = true; // select id, bitmap_union(id2) from bitmap_table group by id
}
}
}
} else if (expr instanceof FunctionCallExpr) {
final FunctionCallExpr functionExpr = (FunctionCallExpr) expr;
if (functionExpr.getFn().getReturnType() == Type.BITMAP) {
isCompatible = true;
}
}
if (!isCompatible) {
throw new AnalysisException(bitmapMismatchLog);
String errorMsg = String.format("bitmap column %s require the function return type is BITMAP",
col.getName());
if (!expr.getType().isBitmapType()) {
throw new AnalysisException(errorMsg);
}
}

View File

@ -1161,8 +1161,10 @@ public class FunctionSet {
"_ZN5doris15BitmapFunctions12bitmap_unionEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_",
"_ZN5doris15BitmapFunctions12bitmap_unionEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_",
"_ZN5doris15BitmapFunctions16bitmap_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
"_ZN5doris15BitmapFunctions16bitmap_get_valueEPN9doris_udf15FunctionContextERKNS1_9StringValE",
null,
"_ZN5doris15BitmapFunctions15bitmap_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
true, false, true));
true, true, true));
//PercentileApprox
addBuiltin(AggregateFunction.createBuiltin("percentile_approx",

View File

@ -292,7 +292,7 @@ public class BrokerScanNode extends LoadScanNode {
expr.setType(Type.HLL);
}
checkBitmapCompatibility(destSlotDesc, expr);
checkBitmapCompatibility(analyzer, destSlotDesc, expr);
// analyze negative
if (isNegative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) {

View File

@ -20,7 +20,6 @@ package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
@ -75,21 +74,15 @@ public abstract class LoadScanNode extends ScanNode {
addConjuncts(whereExpr.getConjuncts());
}
protected void checkBitmapCompatibility(SlotDescriptor slotDesc, Expr expr) throws AnalysisException {
boolean isCompatible = true;
protected void checkBitmapCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr) throws AnalysisException {
if (slotDesc.getColumn().getAggregationType() == AggregateType.BITMAP_UNION) {
if (!(expr instanceof FunctionCallExpr)) {
isCompatible = false;
} else {
FunctionCallExpr fn = (FunctionCallExpr) expr;
if (fn.getFn().getReturnType() != Type.BITMAP) {
isCompatible = false;
}
expr.analyze(analyzer);
if (!expr.getType().isBitmapType()) {
String errorMsg = String.format("bitmap column %s require the function return type is BITMAP",
slotDesc.getColumn().getName());
throw new AnalysisException(errorMsg);
}
}
if (!isCompatible) {
throw new AnalysisException("bitmap column require the function return type is BITMAP");
}
}
}

View File

@ -27,8 +27,6 @@ import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TQueryOptions;
@ -74,7 +72,7 @@ public class Planner {
}
public void plan(StatementBase queryStmt, Analyzer analyzer, TQueryOptions queryOptions)
throws NotImplementedException, UserException, AnalysisException {
throws UserException {
createPlanFragments(queryStmt, analyzer, queryOptions);
}
@ -133,7 +131,7 @@ public class Planner {
* a list such that element i of that list can only consume output of the following fragments j > i.
*/
public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQueryOptions queryOptions)
throws NotImplementedException, UserException, AnalysisException {
throws UserException {
QueryStmt queryStmt;
if (statement instanceof InsertStmt) {
queryStmt = ((InsertStmt) statement).getQueryStmt();

View File

@ -194,7 +194,7 @@ public class StreamLoadScanNode extends LoadScanNode {
expr.setType(Type.HLL);
}
checkBitmapCompatibility(dstSlotDesc, expr);
checkBitmapCompatibility(analyzer, dstSlotDesc, expr);
if (negative && dstSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) {
expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1));

View File

@ -234,7 +234,6 @@ public class StmtExecutor {
}
if (!context.getMysqlChannel().isSend()) {
LOG.warn("retry {} times. stmt: {}", (i + 1), context.getStmtId());
continue;
} else {
throw e;
}
@ -456,8 +455,6 @@ public class StmtExecutor {
}
// TODO(zc):
// Preconditions.checkState(!analyzer.hasUnassignedConjuncts());
} catch (AnalysisException e) {
throw e;
} catch (UserException e) {
throw e;
} catch (Exception e) {

View File

@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.utframe;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.qe.ConnectContext;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.UUID;
/**
 * Plans INSERT INTO statements that target a table with a BITMAP_UNION column
 * against a mocked Doris cluster, and checks either the resulting explain
 * string or the reported analysis error.
 */
public class BitmapFunctionTest {
    // A unique directory per run keeps this test from conflicting with other
    // unit tests that may also start a mocked Frontend.
    private static final String RUNNING_DIR = "fe/mocked/BitmapFunctionTest/" + UUID.randomUUID().toString() + "/";

    private static ConnectContext connectContext;

    /** Starts the mocked cluster and creates database {@code test} with two bitmap tables. */
    @BeforeClass
    public static void beforeClass() throws Exception {
        UtFrameUtils.createMinDorisCluster(RUNNING_DIR);
        connectContext = UtFrameUtils.createDefaultCtx();

        CreateDbStmt createDbStmt =
                (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt("create database test;", connectContext);
        Catalog.getCurrentCatalog().createDb(createDbStmt);

        createBitmapTable("bitmap_table");
        createBitmapTable("bitmap_table_2");
    }

    /** Creates {@code test.<tableName>} with an int key {@code id} and a bitmap_union column {@code id2}. */
    private static void createBitmapTable(String tableName) throws Exception {
        String ddl = "CREATE TABLE test." + tableName + " (\n"
                + " `id` int(11) NULL COMMENT \"\",\n"
                + " `id2` bitmap bitmap_union NULL\n"
                + ") ENGINE=OLAP\n"
                + "AGGREGATE KEY(`id`)\n"
                + "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
                + "PROPERTIES (\n"
                + " \"replication_num\" = \"1\"\n"
                + ");";
        CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(ddl, connectContext);
        Catalog.getCurrentCatalog().createTable(createTableStmt);
    }

    /** Runs {@code sql} through the shared context and returns its plan or its error message. */
    private static String planOrError(String sql) throws Exception {
        return UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, sql);
    }

    /** Asserts, in order, that {@code text} contains each of {@code fragments}. */
    private static void assertContainsAll(String text, String... fragments) {
        for (String fragment : fragments) {
            Assert.assertTrue(text.contains(fragment));
        }
    }

    @Test
    public void testBitmapInsertInto() throws Exception {
        // Literal rows built with to_bitmap.
        assertContainsAll(
                planOrError("explain INSERT INTO test.bitmap_table (id, id2) VALUES (1001, to_bitmap(1000)), (1001, to_bitmap(2000));"),
                "OLAP TABLE SINK");

        // Aggregated bitmap_union over another bitmap table.
        assertContainsAll(
                planOrError("explain insert into test.bitmap_table select id, bitmap_union(id2) from test.bitmap_table_2 group by id;"),
                "OLAP TABLE SINK",
                "bitmap_union",
                "1:AGGREGATE",
                "0:OlapScanNode");

        // Plain bitmap column passed straight through.
        assertContainsAll(
                planOrError("explain insert into test.bitmap_table select id, id2 from test.bitmap_table_2;"),
                "OLAP TABLE SINK",
                "OUTPUT EXPRS:`id` | `id2`",
                "0:OlapScanNode");

        // Scalar functions applied to the column: to_bitmap and bitmap_hash.
        assertContainsAll(
                planOrError("explain insert into test.bitmap_table select id, to_bitmap(id2) from test.bitmap_table_2;"),
                "OLAP TABLE SINK",
                "OUTPUT EXPRS:`id` | to_bitmap(`id2`)",
                "0:OlapScanNode");

        assertContainsAll(
                planOrError("explain insert into test.bitmap_table select id, bitmap_hash(id2) from test.bitmap_table_2;"),
                "OLAP TABLE SINK",
                "OUTPUT EXPRS:`id` | bitmap_hash(`id2`)",
                "0:OlapScanNode");

        // An int expression for the bitmap column must be rejected.
        assertContainsAll(
                planOrError("explain insert into test.bitmap_table select id, id from test.bitmap_table_2;"),
                "bitmap column id2 require the function return type is BITMAP");
    }
}

View File

@ -26,8 +26,12 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.planner.Planner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.utframe.MockedBackendFactory.DefaultBeThriftServiceImpl;
import org.apache.doris.utframe.MockedBackendFactory.DefaultHeartbeatServiceImpl;
@ -49,7 +53,6 @@ import java.util.List;
import java.util.Map;
public class UtFrameUtils {
// Help to create a mocked ConnectContext.
public static ConnectContext createDefaultCtx() throws IOException {
SocketChannel channel = SocketChannel.open();
@ -139,4 +142,16 @@ public class UtFrameUtils {
}
}
}
/**
 * Executes {@code queryStr} on {@code ctx} and returns a textual result for assertions.
 *
 * <p>The context's query state is reset before execution. If the state type after
 * execution is {@code ERR}, the state's error message is returned; otherwise the
 * executor's planner is asked for the VERBOSE explain string of its fragments.
 *
 * @param ctx connect context the statement runs in
 * @param queryStr SQL text to execute
 * @return the explain string, or the error message when execution ended in the ERR state
 * @throws Exception whatever {@code StmtExecutor.execute()} propagates
 */
public static String getSQLPlanOrErrorMsg(ConnectContext ctx, String queryStr) throws Exception {
    ctx.getState().reset();

    StmtExecutor executor = new StmtExecutor(ctx, queryStr);
    executor.execute();

    // Failure path first: hand back the error text instead of a plan.
    if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
        return ctx.getState().getErrorMessage();
    }

    Planner planner = executor.planner();
    return planner.getExplainString(planner.getFragments(), TExplainLevel.VERBOSE);
}
}