From 1cd1c58eee40f2a8ba70350262746466aaf07a05 Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Tue, 21 Nov 2023 20:50:02 +0800 Subject: [PATCH] [Feature](group commit) move group_commit_interval_ms from be.conf to table property (#27116) --- be/src/common/config.cpp | 1 - be/src/common/config.h | 1 - be/src/runtime/group_commit_mgr.cpp | 14 +-- be/src/runtime/group_commit_mgr.h | 7 +- .../java/org/apache/doris/alter/Alter.java | 2 + .../doris/alter/SchemaChangeHandler.java | 1 + .../analysis/ModifyTablePropertiesClause.java | 15 +++ .../java/org/apache/doris/catalog/Env.java | 4 + .../org/apache/doris/catalog/OlapTable.java | 8 ++ .../apache/doris/catalog/TableProperty.java | 10 ++ .../doris/common/util/PropertyAnalyzer.java | 31 ++++++ .../doris/datasource/InternalCatalog.java | 9 ++ .../doris/service/FrontendServiceImpl.java | 1 + .../analysis/CreateTableAsSelectStmtTest.java | 63 +++++++---- gensrc/thrift/FrontendService.thrift | 1 + ...test_group_commit_interval_ms_property.out | 13 +++ .../test_show_create_table_and_views.out | 8 +- ...t_group_commit_interval_ms_property.groovy | 100 ++++++++++++++++++ 18 files changed, 253 insertions(+), 36 deletions(-) create mode 100644 regression-test/data/insert_p0/test_group_commit_interval_ms_property.out create mode 100644 regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5f867429bf..59ff35a3a5 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1080,7 +1080,6 @@ DEFINE_Bool(wait_internal_group_commit_finish, "false"); // the count of thread to group commit insert DEFINE_Int32(group_commit_insert_threads, "10"); -DEFINE_mInt32(group_commit_interval_ms, "10000"); DEFINE_mInt32(scan_thread_nice_value, "0"); DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 6b76d37387..bc93816581 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1149,7 +1149,6 @@ DECLARE_Bool(wait_internal_group_commit_finish); // This config can be set to limit thread number in group commit insert thread pool. DECLARE_mInt32(group_commit_insert_threads); -DECLARE_mInt32(group_commit_interval_ms); // The configuration item is used to lower the priority of the scanner thread, // typically employed to ensure CPU scheduling for write operations. diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 00226d9dd7..3cce3b6f56 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -51,10 +51,10 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo *eos = false; std::unique_lock l(mutex); if (!need_commit) { - auto left_milliseconds = config::group_commit_interval_ms - - std::chrono::duration_cast( - std::chrono::steady_clock::now() - _start_time) - .count(); + auto left_milliseconds = + _group_commit_interval_ms - std::chrono::duration_cast( + std::chrono::steady_clock::now() - _start_time) + .count(); if (left_milliseconds <= 0) { need_commit = true; } @@ -62,9 +62,9 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo while (_status.ok() && _block_queue.empty() && (!need_commit || (need_commit && !_load_ids.empty()))) { CHECK(*_single_block_queue_bytes == 0); - auto left_milliseconds = config::group_commit_interval_ms; + auto left_milliseconds = _group_commit_interval_ms; if (!need_commit) { - left_milliseconds = config::group_commit_interval_ms - + left_milliseconds = _group_commit_interval_ms - std::chrono::duration_cast( std::chrono::steady_clock::now() - _start_time) .count(); @@ -251,7 +251,7 @@ Status GroupCommitTable::_create_group_commit_load( { load_block_queue = std::make_shared( instance_id, label, txn_id, schema_version, _all_block_queues_bytes, - result.wait_internal_group_commit_finish); + result.wait_internal_group_commit_finish, result.group_commit_interval_ms); std::unique_lock l(_lock); _load_block_queues.emplace(instance_id, load_block_queue); _need_plan_fragment = false; diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 9c0a8a047f..2fe3195812 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -38,14 +38,15 @@ public: LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id, int64_t schema_version, std::shared_ptr all_block_queues_bytes, - bool wait_internal_group_commit_finish) + bool wait_internal_group_commit_finish, int64_t group_commit_interval_ms) : load_instance_id(load_instance_id), label(label), txn_id(txn_id), schema_version(schema_version), wait_internal_group_commit_finish(wait_internal_group_commit_finish), _start_time(std::chrono::steady_clock::now()), - _all_block_queues_bytes(all_block_queues_bytes) { + _all_block_queues_bytes(all_block_queues_bytes), + _group_commit_interval_ms(group_commit_interval_ms) { _single_block_queue_bytes = std::make_shared(0); }; @@ -79,6 +80,8 @@ private: std::shared_ptr _all_block_queues_bytes; // memory consumption of one load block queue, used for correctness check. std::shared_ptr _single_block_queue_bytes; + // group commit interval in ms, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");' + int64_t _group_commit_interval_ms; }; class GroupCommitTable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 965ed055df..fdc27bb62d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -504,6 +504,8 @@ public class Alter { .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD) || properties .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS) + || properties + .containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS) || properties .containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION) || properties diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 3aa8e7888e..3a6cae6402 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2193,6 +2193,7 @@ public class SchemaChangeHandler extends AlterHandler { if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null && timeSeriesCompactionConfig.isEmpty() && !properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED) && !properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION) + && !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS) && !properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) { LOG.info("Properties already up-to-date"); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index 59259ba37d..0cbb1d190e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -226,6 +226,21 @@ public class ModifyTablePropertiesClause extends AlterTableClause { } this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)) { + long groupCommitIntervalMs; + String groupCommitIntervalMsStr = properties.get(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS); + try { + groupCommitIntervalMs = Long.parseLong(groupCommitIntervalMsStr); + if (groupCommitIntervalMs < 0) { + throw new AnalysisException("group_commit_interval_ms can not be less than 0:" + + groupCommitIntervalMsStr); + } + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid group_commit_interval_ms format: " + + groupCommitIntervalMsStr); + } + this.needTableStable = false; + this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else { throw new AnalysisException("Unknown table property: " + properties.keySet()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 09e5b1ef0d..941e24599b 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3255,6 +3255,10 @@ public class Env { sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION).append("\" = \""); sb.append(olapTable.enableSingleReplicaCompaction()).append("\""); + // group commit interval ms + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS).append("\" = \""); + sb.append(olapTable.getGroupCommitIntervalMs()).append("\""); + // enable duplicate without keys by default if (olapTable.isDuplicateWithoutKey()) { sb.append(",\n\"") diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index a1ca88bf7c..aacd2080a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1083,6 +1083,14 @@ public class OlapTable extends Table { return null; } + public void setGroupCommitIntervalMs(int groupCommitInterValMs) { + getOrCreatTableProperty().setGroupCommitIntervalMs(groupCommitInterValMs); + } + + public int getGroupCommitIntervalMs() { + return getOrCreatTableProperty().getGroupCommitIntervalMs(); + } + public Boolean hasSequenceCol() { return getSequenceCol() != null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index 07c04a907e..9f299befad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -476,6 +476,16 @@ public class TableProperty implements Writable { + PropertyAnalyzer.PROPERTIES_SEQUENCE_COL); } + public void setGroupCommitIntervalMs(int groupCommitIntervalMs) { + properties.put(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS, Integer.toString(groupCommitIntervalMs)); + } + + public int getGroupCommitIntervalMs() { + return Integer.parseInt(properties.getOrDefault( + PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS, + Integer.toString(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE))); + } + public void buildReplicaAllocation() { try { // Must copy the properties because "analyzeReplicaAllocation" will remove the property diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 5e037c2e3b..46e1ad04c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -169,6 +169,9 @@ public class PropertyAnalyzer { private static final double MAX_FPP = 0.05; private static final double MIN_FPP = 0.0001; + public static final String PROPERTIES_GROUP_COMMIT_INTERVAL_MS = "group_commit_interval_ms"; + public static final int PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE = 10000; + // compaction policy public static final String SIZE_BASED_COMPACTION_POLICY = "size_based"; public static final String TIME_SERIES_COMPACTION_POLICY = "time_series"; @@ -1148,6 +1151,34 @@ public class PropertyAnalyzer { throw new AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE + " must be `true` or `false`"); } + /** + * Found property with "group_commit_interval_ms" prefix and return a time in ms. + * e.g. + * "group_commit_interval_ms"="1000" + * Returns: + * 1000 + * + * @param properties + * @param defaultValue + * @return + * @throws AnalysisException + */ + public static int analyzeGroupCommitIntervalMs(Map properties) throws AnalysisException { + int groupCommitIntervalMs = PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE; + if (properties != null && properties.containsKey(PROPERTIES_GROUP_COMMIT_INTERVAL_MS)) { + String groupIntervalCommitMsStr = properties.get(PROPERTIES_GROUP_COMMIT_INTERVAL_MS); + try { + groupCommitIntervalMs = Integer.parseInt(groupIntervalCommitMsStr); + } catch (Exception e) { + throw new AnalysisException("schema version format error"); + } + + properties.remove(PROPERTIES_GROUP_COMMIT_INTERVAL_MS); + } + + return groupCommitIntervalMs; + } + /** * Check the type property of the catalog props. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index b7ae9f29c2..6ab7dc2588 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -2399,6 +2399,15 @@ public class InternalCatalog implements CatalogIf { throw new DdlException(e.getMessage()); } + // analyse group commit interval ms + int groupCommitIntervalMs = 0; + try { + groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties); + olapTable.setGroupCommitIntervalMs(groupCommitIntervalMs); + } catch (Exception e) { + throw new DdlException(e.getMessage()); + } + olapTable.initSchemaColumnUniqueId(); olapTable.initAutoIncrentGenerator(db.getId()); olapTable.rebuildFullSchema(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index c1e8e3e3f7..e30acf9973 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2187,6 +2187,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setDbId(parsedStmt.getTargetTable().getDatabase().getId()); result.setTableId(parsedStmt.getTargetTable().getId()); result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion()); + result.setGroupCommitIntervalMs(((OlapTable) parsedStmt.getTargetTable()).getGroupCommitIntervalMs()); result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish); } catch (UserException e) { LOG.warn("exec sql error", e); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java index 86db19e1f8..38eb87b057 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java @@ -97,7 +97,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showCreateTableByName("select_decimal_table").getResultRows().get(0).get(1)); String selectFromDecimal1 = @@ -119,7 +120,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showCreateTableByName("select_decimal_table_1").getResultRows().get(0).get(1)); } else { @@ -137,7 +139,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showCreateTableByName("select_decimal_table_1").getResultRows().get(0).get(1)); } @@ -173,7 +176,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -198,7 +202,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet1.getResultRows().get(0).get(1)); @@ -225,7 +230,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet2.getResultRows().get(0).get(1)); } @@ -249,7 +255,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet1.getResultRows().get(0).get(1)); String selectAlias2 = "create table `test`.`select_alias_2` PROPERTIES(\"replication_num\" = \"1\") " + "as select userId as alias_name, username from `test`.`varchar_table`"; @@ -269,7 +276,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet2.getResultRows().get(0).get(1)); } @@ -296,7 +304,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); String selectFromJoin1 = "create table `test`.`select_join1` PROPERTIES(\"replication_num\" = \"1\") " @@ -320,7 +329,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet1.getResultRows().get(0).get(1)); } @@ -348,7 +358,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -373,7 +384,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -397,7 +409,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); String selectFromCteAndUnion = "create table `test`.`select_cte_union` PROPERTIES(\"replication_num\" = \"1\")" @@ -417,7 +430,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet1.getResultRows().get(0).get(1)); } @@ -444,7 +458,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -470,7 +485,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -495,7 +511,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -521,7 +538,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showResultSet.getResultRows().get(0).get(1)); } @@ -571,7 +589,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", createTableStmts.get(0)); } else { @@ -589,7 +608,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", createTableStmts.get(0)); } @@ -621,7 +641,8 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" + "\"disable_auto_compaction\" = \"false\",\n" - + "\"enable_single_replica_compaction\" = \"false\"\n" + + "\"enable_single_replica_compaction\" = \"false\",\n" + + "\"group_commit_interval_ms\" = \"10000\"\n" + ");", showStr); } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index f7c94b37e9..7d67bb1f90 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -654,6 +654,7 @@ struct TStreamLoadPutResult { 5: optional i64 db_id 6: optional i64 table_id 7: optional bool wait_internal_group_commit_finish = false + 8: optional i64 group_commit_interval_ms } struct TStreamLoadMultiTablePutResult { diff --git a/regression-test/data/insert_p0/test_group_commit_interval_ms_property.out b/regression-test/data/insert_p0/test_group_commit_interval_ms_property.out new file mode 100644 index 0000000000..54013be8c1 --- /dev/null +++ b/regression-test/data/insert_p0/test_group_commit_interval_ms_property.out @@ -0,0 +1,13 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !1 -- +test_group_commit_interval_ms_property_tbl CREATE TABLE `test_group_commit_interval_ms_property_tbl` (\n `k` BIGINT NULL,\n `v` BIGINT NULL\n) ENGINE=OLAP\nUNIQUE KEY(`k`)\nCOMMENT 'OLAP'\nDISTRIBUTED BY HASH(`v`) BUCKETS 8\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000"\n); + +-- !2 -- +test_group_commit_interval_ms_property_tbl CREATE TABLE `test_group_commit_interval_ms_property_tbl` (\n `k` BIGINT NULL,\n `v` BIGINT NULL\n) ENGINE=OLAP\nUNIQUE KEY(`k`)\nCOMMENT 'OLAP'\nDISTRIBUTED BY HASH(`v`) BUCKETS 8\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "1000"\n); + +-- !1 -- +test_group_commit_interval_ms_property_tbl CREATE TABLE `test_group_commit_interval_ms_property_tbl` (\n `k` BIGINT NULL,\n `v` BIGINT NULL\n) ENGINE=OLAP\nUNIQUE KEY(`k`)\nCOMMENT 'OLAP'\nDISTRIBUTED BY HASH(`v`) BUCKETS 8\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000"\n); + +-- !2 -- +test_group_commit_interval_ms_property_tbl CREATE TABLE `test_group_commit_interval_ms_property_tbl` (\n `k` BIGINT NULL,\n `v` BIGINT NULL\n) ENGINE=OLAP\nUNIQUE KEY(`k`)\nCOMMENT 'OLAP'\nDISTRIBUTED BY HASH(`v`) BUCKETS 8\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "1000"\n); + diff --git a/regression-test/data/show_p0/test_show_create_table_and_views.out b/regression-test/data/show_p0/test_show_create_table_and_views.out index ccd97f308f..68ca0b517b 100644 --- a/regression-test/data/show_p0/test_show_create_table_and_views.out +++ b/regression-test/data/show_p0/test_show_create_table_and_views.out @@ -1,6 +1,6 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !show -- -show_create_table_and_views_table CREATE TABLE `show_create_table_and_views_table` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false"\n); +show_create_table_and_views_table CREATE TABLE `show_create_table_and_views_table` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000"\n); -- !select -- 1 1 30 @@ -36,11 +36,11 @@ show_create_table_and_views_view CREATE VIEW `show_create_table_and_views_view` 300 1 -- !show -- -show_create_table_and_views_table CREATE TABLE `show_create_table_and_views_table` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false"\n); +show_create_table_and_views_table CREATE TABLE `show_create_table_and_views_table` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000"\n); -- !show -- -show_create_table_and_views_like CREATE TABLE `show_create_table_and_views_like` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false"\n); +show_create_table_and_views_like CREATE TABLE `show_create_table_and_views_like` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000"\n); -- !show -- -show_create_table_and_views_like_with_rollup CREATE TABLE `show_create_table_and_views_like_with_rollup` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false"\n); +show_create_table_and_views_like_with_rollup CREATE TABLE `show_create_table_and_views_like_with_rollup` (\n `user_id` LARGEINT NOT NULL,\n `good_id` LARGEINT NOT NULL,\n `cost` BIGINT SUM NULL DEFAULT "0"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nCOMMENT 'OLAP'\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_format" = "V2",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "9223372036854775807",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000"\n); diff --git a/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy b/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy new file mode 100644 index 0000000000..c53dd32562 --- /dev/null +++ b/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import com.mysql.cj.jdbc.StatementImpl +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_group_commit_interval_ms_property") { + + def dbName = "regression_test_insert_p0" + def tableName = "test_group_commit_interval_ms_property_tbl" + def table = dbName + "." + tableName + + def group_commit_insert = { sql, expected_row_count -> + def stmt = prepareStatement """ ${sql} """ + def result = stmt.executeUpdate() + logger.info("insert result: " + result) + def serverInfo = (((StatementImpl) stmt).results).getServerInfo() + logger.info("result server info: " + serverInfo) + if (result != expected_row_count) { + logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql) + } + // assertEquals(result, expected_row_count) + assertTrue(serverInfo.contains("'status':'PREPARE'")) + assertTrue(serverInfo.contains("'label':'group_commit_")) + return serverInfo + } + + for (item in ["legacy", "nereids"]) { + try { + // create table + sql """ drop table if exists ${table}; """ + + sql """ + CREATE TABLE ${table} ( + k bigint, + v bigint + ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH (v) BUCKETS 8 + PROPERTIES( + "replication_num" = "1", + "group_commit_interval_ms"="10000" + ); + """ + + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + + sql "set enable_insert_group_commit = true;" + + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + + qt_1 "show create table ${table}" + + def msg1 = group_commit_insert """insert into ${table} values(1,1); """, 1 + + Thread.sleep(8000); + + def msg2 = group_commit_insert """insert into ${table} values(2,2) """, 1 + + assertEquals(msg1.substring(msg1.indexOf("group_commit")+11, msg1.indexOf("group_commit")+43), msg2.substring(msg2.indexOf("group_commit")+11, msg2.indexOf("group_commit")+43)); + + sql "ALTER TABLE ${table} SET (\"group_commit_interval_ms\"=\"1000\"); " + + qt_2 "show create table ${table}" + + def msg3 = group_commit_insert """insert into ${table} values(3,3); """, 1 + + Thread.sleep(2000); + + def msg4 = group_commit_insert """insert into ${table} values(4,4); """, 1 + + assertNotEquals(msg3.substring(msg3.indexOf("group_commit")+11, msg3.indexOf("group_commit")+43), msg4.substring(msg4.indexOf("group_commit")+11, msg4.indexOf("group_commit")+43)); + + sql "DROP TABLE ${table}" + } + } finally { + // try_sql("DROP TABLE ${table}") + } + } +}