diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 3a25811eea..72c436f549 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -515,6 +515,10 @@ public class Config extends ConfigBase { "Default commit interval in ms for group commit"}) public static int group_commit_interval_ms_default_value = 10000; + @ConfField(mutable = false, masterOnly = true, description = {"攒批的默认提交数据量,单位是字节,默认128M", + "Default commit data bytes for group commit"}) + public static int group_commit_data_bytes_default_value = 134217728; + @ConfField(mutable = true, masterOnly = true, description = {"Stream load 的默认超时时间,单位是秒。", "Default timeout for stream load job, in seconds."}) public static int stream_load_default_timeout_second = 86400 * 3; // 3days diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index c49a325487..11ef9d9015 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -500,6 +500,8 @@ public class Alter { .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS) || properties .containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS) + || properties + .containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES) || properties .containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION) || properties diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index b1d08ae0c0..206f028b02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2213,6 +2213,7 @@ public class SchemaChangeHandler extends AlterHandler { && !properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION) && !properties.containsKey(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION) && !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS) + && !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES) && !properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) { LOG.info("Properties already up-to-date"); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index 07529535e6..cc4dede8f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -267,6 +267,21 @@ public class ModifyTablePropertiesClause extends AlterTableClause { } this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)) { + long groupCommitDataBytes; + String groupCommitDataBytesStr = properties.get(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES); + try { + groupCommitDataBytes = Long.parseLong(groupCommitDataBytesStr); + if (groupCommitDataBytes < 0) { + throw new AnalysisException("group_commit_data_bytes can not be less than 0:" + + groupCommitDataBytesStr); + } + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid group_commit_data_bytes format: " + + groupCommitDataBytesStr); + } + this.needTableStable = false; + this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else { throw new AnalysisException("Unknown table property: " + properties.keySet()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 197f5218a1..900c25fb7a 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3411,6 +3411,10 @@ public class Env { sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS).append("\" = \""); sb.append(olapTable.getGroupCommitIntervalMs()).append("\""); + // group commit data bytes + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES).append("\" = \""); + sb.append(olapTable.getGroupCommitDataBytes()).append("\""); + // enable duplicate without keys by default if (olapTable.isDuplicateWithoutKey()) { sb.append(",\n\"") diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index e70b08e6c8..a8d7fc085b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1114,6 +1114,14 @@ public class OlapTable extends Table { return getOrCreatTableProperty().getGroupCommitIntervalMs(); } + public void setGroupCommitDataBytes(int groupCommitInterValMs) { + getOrCreatTableProperty().setGroupCommitDataBytes(groupCommitInterValMs); + } + + public int getGroupCommitDataBytes() { + return getOrCreatTableProperty().getGroupCommitDataBytes(); + } + public Boolean hasSequenceCol() { return getSequenceCol() != null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index 3cbd8a6381..3d5215bc48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -523,6 +523,16 @@ public class TableProperty implements Writable { Integer.toString(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE))); } + public void setGroupCommitDataBytes(int groupCommitDataBytes) { + properties.put(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES, Integer.toString(groupCommitDataBytes)); + } + + public int getGroupCommitDataBytes() { + return Integer.parseInt(properties.getOrDefault( + PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES, + Integer.toString(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE))); + } + public void buildReplicaAllocation() { try { // Must copy the properties because "analyzeReplicaAllocation" will remove the property diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 9c0cfe722f..8699c9db43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -180,6 +180,10 @@ public class PropertyAnalyzer { public static final int PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE = Config.group_commit_interval_ms_default_value; + public static final String PROPERTIES_GROUP_COMMIT_DATA_BYTES = "group_commit_data_bytes"; + public static final int PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE + = Config.group_commit_data_bytes_default_value; + // compaction policy public static final String SIZE_BASED_COMPACTION_POLICY = "size_based"; public static final String TIME_SERIES_COMPACTION_POLICY = "time_series"; @@ -1213,6 +1217,22 @@ public class PropertyAnalyzer { return groupCommitIntervalMs; } + public static int analyzeGroupCommitDateBytes(Map properties) throws AnalysisException { + int groupCommitDataBytes = PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE; + if (properties != null && properties.containsKey(PROPERTIES_GROUP_COMMIT_DATA_BYTES)) { + String groupIntervalCommitDataBytesStr = properties.get(PROPERTIES_GROUP_COMMIT_DATA_BYTES); + try { + groupCommitDataBytes = Integer.parseInt(groupIntervalCommitDataBytesStr); + } catch (Exception e) { + throw new AnalysisException("parse group_commit_interval_ms format error"); + } + + properties.remove(PROPERTIES_GROUP_COMMIT_DATA_BYTES); + } + + return groupCommitDataBytes; + } + /** * Check the type property of the catalog props. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index cc04dd195f..da8afec263 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -2428,7 +2428,7 @@ public class InternalCatalog implements CatalogIf { } // analyse group commit interval ms - int groupCommitIntervalMs = 0; + int groupCommitIntervalMs; try { groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties); olapTable.setGroupCommitIntervalMs(groupCommitIntervalMs); @@ -2436,6 +2436,14 @@ public class InternalCatalog implements CatalogIf { throw new DdlException(e.getMessage()); } + int groupCommitDataBytes; + try { + groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDateBytes(properties); + olapTable.setGroupCommitDataBytes(groupCommitDataBytes); + } catch (Exception e) { + throw new DdlException(e.getMessage()); + } + olapTable.initSchemaColumnUniqueId(); olapTable.initAutoIncrentGenerator(db.getId()); olapTable.rebuildFullSchema(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index cf42cb921f..c69862d998 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1983,8 +1983,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setTableId(parsedStmt.getTargetTable().getId()); result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion()); result.setGroupCommitIntervalMs(((OlapTable) parsedStmt.getTargetTable()).getGroupCommitIntervalMs()); - // TODO get from table property - result.setGroupCommitDataBytes(134217728L); + result.setGroupCommitDataBytes(((OlapTable) parsedStmt.getTargetTable()).getGroupCommitDataBytes()); result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish); } catch (UserException e) { LOG.warn("exec sql error", e); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java index 96df0d9c74..13811d3af4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java @@ -99,7 +99,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showCreateTableByName("select_decimal_table").getResultRows().get(0).get(1)); String selectFromDecimal1 = @@ -123,7 +124,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showCreateTableByName("select_decimal_table_1").getResultRows().get(0).get(1)); } else { @@ -143,7 +145,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showCreateTableByName("select_decimal_table_1").getResultRows().get(0).get(1)); } @@ -181,7 +184,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -208,7 +212,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet1.getResultRows().get(0).get(1)); @@ -237,7 +242,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet2.getResultRows().get(0).get(1)); } @@ -263,7 +269,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet1.getResultRows().get(0).get(1)); String selectAlias2 = "create table `test`.`select_alias_2` PROPERTIES(\"replication_num\" = \"1\") " + "as select userId as alias_name, username from `test`.`varchar_table`"; @@ -285,7 +292,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet2.getResultRows().get(0).get(1)); } @@ -314,7 +322,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); String selectFromJoin1 = "create table `test`.`select_join1` PROPERTIES(\"replication_num\" = \"1\") " @@ -340,7 +349,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet1.getResultRows().get(0).get(1)); } @@ -370,7 +380,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -397,7 +408,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -423,7 +435,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); String selectFromCteAndUnion = "create table `test`.`select_cte_union` PROPERTIES(\"replication_num\" = \"1\")" @@ -445,7 +458,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet1.getResultRows().get(0).get(1)); } @@ -474,7 +488,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -502,7 +517,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -529,7 +545,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -558,7 +575,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -610,7 +628,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", createTableStmts.get(0)); } else { @@ -630,7 +649,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", createTableStmts.get(0)); } @@ -664,7 +684,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" + "\"enable_single_replica_compaction\" = \"false\",\n" - + "\"group_commit_interval_ms\" = \"10000\"\n" + + "\"group_commit_interval_ms\" = \"10000\",\n" + + "\"group_commit_data_bytes\" = \"134217728\"\n" + ");", showStr); } diff --git a/regression-test/data/show_p0/test_show_create_table_and_views.out b/regression-test/data/show_p0/test_show_create_table_and_views.out index 53c1da08b9..95af034fcc 100644 --- a/regression-test/data/show_p0/test_show_create_table_and_views.out +++ b/regression-test/data/show_p0/test_show_create_table_and_views.out @@ -1,6 +1,6 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !show -- -show_create_table_and_views_table CREATE TABLE `show_create_table_and_views_table` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000"\n); +show_create_table_and_views_table CREATE TABLE `show_create_table_and_views_table` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n); -- !select -- 1 1 30 @@ -36,11 +36,11 @@ show_create_table_and_views_view CREATE VIEW `show_create_table_and_views_view` 300 1 -- !show -- -show_create_table_and_views_table CREATE TABLE `show_create_table_and_views_table` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000"\n); +show_create_table_and_views_table CREATE TABLE `show_create_table_and_views_table` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n); -- !show -- -show_create_table_and_views_like CREATE TABLE `show_create_table_and_views_like` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000"\n); +show_create_table_and_views_like CREATE TABLE `show_create_table_and_views_like` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n); -- !show -- -show_create_table_and_views_like_with_rollup CREATE TABLE `show_create_table_and_views_like_with_rollup` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000"\n); +show_create_table_and_views_like_with_rollup CREATE TABLE `show_create_table_and_views_like_with_rollup` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n); diff --git a/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy b/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy new file mode 100644 index 0000000000..b71ebe0d09 --- /dev/null +++ b/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import com.mysql.cj.jdbc.StatementImpl +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_group_commit_data_bytes_property") { + + def dbName = "regression_test_insert_p0" + def tableName = "test_group_commit_data_bytes_property_tbl" + def table = dbName + "." + tableName + + def group_commit_insert = { sql, expected_row_count -> + def stmt = prepareStatement """ ${sql} """ + def result = stmt.executeUpdate() + logger.info("insert result: " + result) + def serverInfo = (((StatementImpl) stmt).results).getServerInfo() + logger.info("result server info: " + serverInfo) + if (result != expected_row_count) { + logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql) + } + // assertEquals(result, expected_row_count) + assertTrue(serverInfo.contains("'status':'PREPARE'")) + assertTrue(serverInfo.contains("'label':'group_commit_")) + return serverInfo + } + + + + for (item in ["legacy", "nereids"]) { + try { + def test_table = table + "_" + item; + sql """ drop table if exists ${test_table} force; """ + sql """ + CREATE table ${test_table} ( + k bigint, + v bigint + ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH (v) BUCKETS 8 + PROPERTIES( + "replication_num" = "1", + "group_commit_data_bytes"="1024" + ); + """ + + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + + sql """ set group_commit = async_mode; """ + + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + //sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + + def res1 = sql """show create table ${test_table}""" + assertTrue(res1.toString().contains("\"group_commit_data_bytes\" = \"1024\"")) + + def msg1 = group_commit_insert """insert into ${test_table} values(1,1); """, 1 + + def msg2 = group_commit_insert """insert into ${test_table} values(2,2) """, 1 + + assertEquals(msg1.substring(msg1.indexOf("group_commit")+11, msg1.indexOf("group_commit")+43), msg2.substring(msg2.indexOf("group_commit")+11, msg2.indexOf("group_commit")+43)); + + sql "ALTER table ${test_table} SET (\"group_commit_data_bytes\"=\"1\"); " + + sleep(10000) + + def res2 = sql """show create table ${test_table}""" + assertTrue(res2.toString().contains("\"group_commit_data_bytes\" = \"1\"")) + + def msg3 = group_commit_insert """insert into ${test_table} values(3,3); """, 1 + + def msg4 = group_commit_insert """insert into ${test_table} values(4,4); """, 1 + + assertNotEquals(msg3.substring(msg3.indexOf("group_commit")+11, msg3.indexOf("group_commit")+43), msg4.substring(msg4.indexOf("group_commit")+11, msg4.indexOf("group_commit")+43)); + + } + } finally { + // try_sql("DROP TABLE ${table}") + } + } +}