diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index a5ba48d337..1a7da1471e 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1092,6 +1092,7 @@ DEFINE_Int32(group_commit_sync_wal_batch, "10"); // the count of thread to group commit insert DEFINE_Int32(group_commit_insert_threads, "10"); +DEFINE_mInt32(group_commit_interval_seconds, "10"); DEFINE_mInt32(scan_thread_nice_value, "0"); DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 3010cf4976..224ba26f67 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1156,6 +1156,7 @@ DECLARE_Int32(group_commit_sync_wal_batch); // This config can be set to limit thread number in group commit insert thread pool. DECLARE_mInt32(group_commit_insert_threads); +DECLARE_mInt32(group_commit_interval_seconds); // The configuration item is used to lower the priority of the scanner thread, // typically employed to ensure CPU scheduling for write operations. diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index a876a055e9..5043d6f58a 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -62,21 +62,22 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo *eos = false; std::unique_lock l(*_mutex); if (!need_commit) { - auto left_seconds = 10 - std::chrono::duration_cast( - std::chrono::steady_clock::now() - _start_time) - .count(); + auto left_seconds = config::group_commit_interval_seconds - + std::chrono::duration_cast( + std::chrono::steady_clock::now() - _start_time) + .count(); if (left_seconds <= 0) { need_commit = true; } } while (_status.ok() && _block_queue.empty() && (!need_commit || (need_commit && !_load_ids.empty()))) { - // TODO make 10s as a config - auto left_seconds = 10; + auto left_seconds = config::group_commit_interval_seconds; if (!need_commit) { - left_seconds = 10 - std::chrono::duration_cast( - std::chrono::steady_clock::now() - _start_time) - .count(); + left_seconds = config::group_commit_interval_seconds - + std::chrono::duration_cast( + std::chrono::steady_clock::now() - _start_time) + .count(); if (left_seconds <= 0) { need_commit = true; break; @@ -470,10 +471,10 @@ Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan, response->set_txn_id(load_block_queue->txn_id); } // TODO what to do if add one block error - RETURN_IF_ERROR(load_block_queue->add_block(future_block)); if (future_block->rows() > 0) { future_blocks.emplace_back(future_block); } + RETURN_IF_ERROR(load_block_queue->add_block(future_block)); first = false; } if (!runtime_state->get_error_log_file_path().empty()) { diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy index 687aebd2e0..eed1a26f14 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy @@ -15,8 +15,12 @@ // specific language governing permissions and limitations // under the License. +import com.mysql.cj.jdbc.StatementImpl + suite("insert_group_commit_into_duplicate") { - def table = "insert_group_commit_into_duplicate" + def dbName = "regression_test_insert_p0" + def tableName = "insert_group_commit_into_duplicate" + def table = dbName + "." + tableName def getRowCount = { expectedRowCount -> def retry = 0 @@ -33,11 +37,12 @@ suite("insert_group_commit_into_duplicate") { def getAlterTableState = { def retry = 0 + sql "use ${dbName};" while (true) { sleep(2000) - def state = sql "show alter table column where tablename = '${table}' order by CreateTime desc " + def state = sql " show alter table column where tablename = '${tableName}' order by CreateTime desc " logger.info("alter table state: ${state}") - if (state.size()> 0 && state[0][9] == "FINISHED") { + if (state.size() > 0 && state[0][9] == "FINISHED") { return true } retry++ @@ -53,7 +58,7 @@ suite("insert_group_commit_into_duplicate") { sql """ drop table if exists ${table}; """ sql """ - CREATE TABLE `${table}` ( + CREATE TABLE ${table} ( `id` int(11) NOT NULL, `name` varchar(50) NULL, `score` int(11) NULL default "-1" @@ -69,104 +74,107 @@ suite("insert_group_commit_into_duplicate") { ); """ - sql """ set enable_insert_group_commit = true; """ + def group_commit_insert = { sql, expected_row_count -> + def stmt = prepareStatement """ ${sql} """ + def result = stmt.executeUpdate() + logger.info("insert result: " + result) + def serverInfo = (((StatementImpl) stmt).results).getServerInfo() + logger.info("result server info: " + serverInfo) + if (result != expected_row_count) { + logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql) + } + // assertEquals(result, expected_row_count) + assertTrue(serverInfo.contains("'status':'PREPARE'")) + assertTrue(serverInfo.contains("'label':'group_commit_")) + } - // 1. insert into - def result = sql """ insert into ${table}(name, id) values('c', 3); """ - logger.info("insert result: " + result) - assertEquals(1, result.size()) - assertEquals(1, result[0].size()) - assertEquals(1, result[0][0]) - result = sql """ insert into ${table}(id) values(4); """ - logger.info("insert result: " + result) - result = sql """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """ - logger.info("insert result: " + result) - assertEquals(1, result.size()) - assertEquals(1, result[0].size()) - assertEquals(2, result[0][0]) - result = sql """ insert into ${table}(id, name) values(2, 'b'); """ - logger.info("insert result: " + result) - result = sql """ insert into ${table}(id) select 6; """ - logger.info("insert result: " + result) + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + // TODO + sql """ set enable_nereids_dml = false; """ - getRowCount(6) - qt_sql """ select * from ${table} order by id, name, score asc; """ + // 1. insert into + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${table}(id) values(4); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 - // 2. insert into and delete - sql """ delete from ${table} where id = 4; """ - sql """ insert into ${table}(name, id) values('c', 3); """ - /*sql """ insert into ${table}(id, name) values(4, 'd1'); """ - sql """ insert into ${table}(id, name) values(4, 'd1'); """ - sql """ delete from ${table} where id = 4; """*/ - sql """ insert into ${table}(id, name) values(4, 'e1'); """ - sql """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """ - sql """ insert into ${table}(id, name) values(2, 'b'); """ - sql """ insert into ${table}(id) select 6; """ + getRowCount(6) + qt_sql """ select * from ${table} order by id, name, score asc; """ - getRowCount(11) - qt_sql """ select * from ${table} order by id, name, score asc; """ + // 2. insert into and delete + sql """ delete from ${table} where id = 4; """ + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + /*sql """ insert into ${table}(id, name) values(4, 'd1'); """ + sql """ insert into ${table}(id, name) values(4, 'd1'); """ + sql """ delete from ${table} where id = 4; """*/ + group_commit_insert """ insert into ${table}(id, name) values(4, 'e1'); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 - // 3. insert into and light schema change: add column - sql """ insert into ${table}(name, id) values('c', 3); """ - sql """ insert into ${table}(id) values(4); """ - sql """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """ - sql """ alter table ${table} ADD column age int after name; """ - sql """ insert into ${table}(id, name) values(2, 'b'); """ - sql """ insert into ${table}(id) select 6; """ + getRowCount(11) + qt_sql """ select * from ${table} order by id, name, score asc; """ - assertTrue(getAlterTableState(), "add column should success") - getRowCount(17) - qt_sql """ select * from ${table} order by id, name,score asc; """ + // 3. insert into and light schema change: add column + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${table}(id) values(4); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 + sql """ alter table ${table} ADD column age int after name; """ + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 - // 4. insert into and truncate table - /*sql """ insert into ${table}(name, id) values('c', 3); """ - sql """ insert into ${table}(id) values(4); """ - sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """*/ - sql """ truncate table ${table}; """ - sql """ insert into ${table}(id, name) values(2, 'b'); """ - sql """ insert into ${table}(id) select 6; """ + assertTrue(getAlterTableState(), "add column should success") + getRowCount(17) + qt_sql """ select * from ${table} order by id, name,score asc; """ - getRowCount(2) - qt_sql """ select * from ${table} order by id, name, score asc; """ + // 4. insert into and truncate table + /*sql """ insert into ${table}(name, id) values('c', 3); """ + sql """ insert into ${table}(id) values(4); """ + sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """*/ + sql """ truncate table ${table}; """ + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 - // 5. insert into and schema change: modify column order - sql """ insert into ${table}(name, id) values('c', 3); """ - sql """ insert into ${table}(id) values(4); """ - sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """ - // sql """ alter table ${table} order by (id, name, score, age); """ - sql """ insert into ${table}(id, name) values(2, 'b'); """ - sql """ insert into ${table}(id) select 6; """ + getRowCount(2) + qt_sql """ select * from ${table} order by id, name, score asc; """ - // assertTrue(getAlterTableState(), "modify column order should success") - getRowCount(8) - qt_sql """ select id, name, score, age from ${table} order by id, name, score asc; """ + // 5. insert into and schema change: modify column order + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${table}(id) values(4); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 + // sql """ alter table ${table} order by (id, name, score, age); """ + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 - // 6. insert into and light schema change: drop column - sql """ insert into ${table}(name, id) values('c', 3); """ - sql """ insert into ${table}(id) values(4); """ - sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """ - sql """ alter table ${table} DROP column age; """ - sql """ insert into ${table}(id, name) values(2, 'b'); """ - sql """ insert into ${table}(id) select 6; """ + // assertTrue(getAlterTableState(), "modify column order should success") + getRowCount(8) + qt_sql """ select id, name, score, age from ${table} order by id, name, score asc; """ - assertTrue(getAlterTableState(), "drop column should success") - getRowCount(14) - qt_sql """ select * from ${table} order by id, name, score asc; """ + // 6. insert into and light schema change: drop column + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${table}(id) values(4); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 + sql """ alter table ${table} DROP column age; """ + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 - // 7. insert into and add rollup - sql """ insert into ${table}(name, id) values('c', 3); """ - sql """ insert into ${table}(id) values(4); """ - result = sql """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """ - logger.info("insert result: " + result) - assertEquals(1, result.size()) - assertEquals(1, result[0].size()) - assertEquals(2, result[0][0]) - // sql """ alter table ${table} ADD ROLLUP r1(name, score); """ - sql """ insert into ${table}(id, name) values(2, 'b'); """ - sql """ insert into ${table}(id) select 6; """ + assertTrue(getAlterTableState(), "drop column should success") + getRowCount(14) + qt_sql """ select * from ${table} order by id, name, score asc; """ - getRowCount(20) - qt_sql """ select name, score from ${table} order by name asc; """ + // 7. insert into and add rollup + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${table}(id) values(4); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """, 2 + // sql """ alter table ${table} ADD ROLLUP r1(name, score); """ + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 + + getRowCount(20) + qt_sql """ select name, score from ${table} order by name asc; """ + } } finally { // try_sql("DROP TABLE ${table}") } diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy index 349bc0b852..3eeaeb797f 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy @@ -71,6 +71,8 @@ suite("insert_group_commit_with_exception") { """ sql """ set enable_insert_group_commit = true; """ + // TODO + sql """ set enable_nereids_dml = false; """ // insert into without column try { diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy index 8a2f5f91b8..1805905a2a 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +import com.mysql.cj.jdbc.StatementImpl + suite("insert_group_commit_with_large_data") { + def db = "regression_test_insert_p0" def table = "insert_group_commit_with_large_data" def getRowCount = { expectedRowCount -> @@ -31,6 +34,20 @@ suite("insert_group_commit_with_large_data") { } } + def group_commit_insert = { sql, expected_row_count -> + def stmt = prepareStatement """ ${sql} """ + def result = stmt.executeUpdate() + logger.info("insert result: " + result) + def serverInfo = (((StatementImpl) stmt).results).getServerInfo() + logger.info("result server info: " + serverInfo) + if (result != expected_row_count) { + logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql) + } + assertEquals(result, expected_row_count) + assertTrue(serverInfo.contains("'status':'PREPARE'")) + assertTrue(serverInfo.contains("'label':'group_commit_")) + } + try { // create table sql """ drop table if exists ${table}; """ @@ -48,35 +65,34 @@ suite("insert_group_commit_with_large_data") { ); """ - sql """ set enable_insert_group_commit = true; """ + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + // TODO + sql """ set enable_nereids_dml = false; """ + sql """ use ${db}; """ - // insert into 5000 rows - def insert_sql = """ insert into ${table} values(1, 'a', 10) """ - for (def i in 2..5000) { - insert_sql += """, (${i}, 'a', 10) """ - } - def result = sql """ ${insert_sql} """ - logger.info("insert result: " + result) - assertEquals(1, result.size()) - assertEquals(1, result[0].size()) - assertEquals(5000, result[0][0]) - getRowCount(5000) + // insert into 5000 rows + def insert_sql = """ insert into ${table} values(1, 'a', 10) """ + for (def i in 2..5000) { + insert_sql += """, (${i}, 'a', 10) """ + } + group_commit_insert insert_sql, 5000 + getRowCount(5000) - // data size is large than 4MB, need " set global max_allowed_packet = 5508950 " - /*def name_value = "" - for (def i in 0..1024) { - name_value += 'a' + // data size is large than 4MB, need " set global max_allowed_packet = 5508950 " + /*def name_value = "" + for (def i in 0..1024) { + name_value += 'a' + } + insert_sql = """ insert into ${table} values(1, '${name_value}', 10) """ + for (def i in 2..5000) { + insert_sql += """, (${i}, '${name_value}', 10) """ + } + result = sql """ ${insert_sql} """ + group_commit_insert insert_sql, 5000 + getRowCount(10000) + */ } - insert_sql = """ insert into ${table} values(1, '${name_value}', 10) """ - for (def i in 2..5000) { - insert_sql += """, (${i}, '${name_value}', 10) """ - } - result = sql """ ${insert_sql} """ - logger.info("insert result: " + result) - assertEquals(1, result.size()) - assertEquals(1, result[0].size()) - assertEquals(5000, result[0][0]) - getRowCount(10000)*/ } finally { // try_sql("DROP TABLE ${table}") } diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy index fd8d5309fb..b9748dc8d1 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +import com.mysql.cj.jdbc.StatementImpl + suite("insert_group_commit_with_prepare_stmt") { def user = context.config.jdbcUser def password = context.config.jdbcPassword @@ -63,6 +65,19 @@ suite("insert_group_commit_with_prepare_stmt") { stmt.addBatch() } + def group_commit_insert = { stmt, expected_row_count -> + def result = stmt.executeBatch() + logger.info("insert result: " + result) + def serverInfo = (((StatementImpl) stmt).results).getServerInfo() + logger.info("result server info: " + serverInfo) + if (result != expected_row_count) { + logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count) + } + // assertEquals(result, expected_row_count) + assertTrue(serverInfo.contains("'status':'PREPARE'")) + assertTrue(serverInfo.contains("'label':'group_commit_")) + } + def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb) logger.info("url: " + url) @@ -91,19 +106,16 @@ suite("insert_group_commit_with_prepare_stmt") { assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, insert_stmt.class) insert_prepared insert_stmt, 1, "a", 10 - def result = insert_stmt.executeBatch() - logger.info("execute result: ${result}") + group_commit_insert insert_stmt, 1 insert_prepared insert_stmt, 2, null, 20 insert_prepared insert_stmt, 3, "c", null insert_prepared insert_stmt, 4, "d", 40 - result = insert_stmt.executeBatch() - logger.info("execute result: ${result}") + group_commit_insert insert_stmt, 3 insert_prepared insert_stmt, 5, "e", null insert_prepared insert_stmt, 6, "f", 40 - result = insert_stmt.executeBatch() - logger.info("execute result: ${result}") + group_commit_insert insert_stmt, 2 getRowCount(6) qt_sql """ select * from ${table} order by id asc; """ @@ -113,13 +125,11 @@ suite("insert_group_commit_with_prepare_stmt") { assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, insert_stmt.class) insert_prepared_partial insert_stmt, 'a', 1, 1 - result = insert_stmt.executeBatch() - logger.info("execute result: ${result}") + group_commit_insert insert_stmt, 1 insert_prepared_partial insert_stmt, 'e', 7, 0 insert_prepared_partial insert_stmt, null, 8, 0 - result = insert_stmt.executeBatch() - logger.info("execute result: ${result}") + group_commit_insert insert_stmt, 2 getRowCount(7) qt_sql """ select * from ${table} order by id, name, score asc; """ @@ -155,19 +165,16 @@ suite("insert_group_commit_with_prepare_stmt") { assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, insert_stmt.class) insert_prepared insert_stmt, 1, "a", 10 - def result = insert_stmt.executeBatch() - logger.info("execute result: " + result) + group_commit_insert insert_stmt, 1 insert_prepared insert_stmt, 2, null, 20 insert_prepared insert_stmt, 3, "c", null insert_prepared insert_stmt, 4, "d", 40 - result = insert_stmt.executeBatch() - logger.info("execute result: " + result) + group_commit_insert insert_stmt, 3 insert_prepared insert_stmt, 5, "e", null insert_prepared insert_stmt, 6, "f", 40 - result = insert_stmt.executeBatch() - logger.info("execute result: " + result) + group_commit_insert insert_stmt, 2 getRowCount(6) qt_sql """ select * from ${table} order by id asc; """ @@ -177,13 +184,11 @@ suite("insert_group_commit_with_prepare_stmt") { assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, insert_stmt.class) insert_prepared_partial_dup insert_stmt, 'a', 1 - result = insert_stmt.executeBatch() - logger.info("execute result: " + result) + group_commit_insert insert_stmt, 1 insert_prepared_partial_dup insert_stmt, 'e', 7 insert_prepared_partial_dup insert_stmt, null, 8 - result = insert_stmt.executeBatch() - logger.info("execute result: " + result) + group_commit_insert insert_stmt, 2 getRowCount(9) qt_sql """ select * from ${table} order by id, name, score asc; """