diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5fa8a8205c..0c3902372d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1813,6 +1813,7 @@ public class StmtExecutor { TransactionStatus txnStatus = TransactionStatus.ABORTED; String errMsg = ""; TableType tblType = insertStmt.getTargetTable().getType(); + boolean isGroupCommit = false; if (context.isTxnModel()) { if (insertStmt.getQueryStmt() instanceof SelectStmt) { if (((SelectStmt) insertStmt.getQueryStmt()).getTableRefs().size() > 0) { @@ -1824,6 +1825,7 @@ public class StmtExecutor { label = context.getTxnEntry().getLabel(); txnId = context.getTxnEntry().getTxnConf().getTxnId(); } else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) { + isGroupCommit = true; NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt; int maxRetry = 3; for (int i = 0; i < maxRetry; i++) { @@ -2013,6 +2015,9 @@ public class StmtExecutor { if (!Strings.isNullOrEmpty(errMsg)) { sb.append(", 'err':'").append(errMsg).append("'"); } + if (isGroupCommit) { + sb.append(", 'query_id':'").append(DebugUtil.printId(context.queryId)).append("'"); + } sb.append("}"); context.getState().setOk(loadedRows, filteredRows, sb.toString()); diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index bea130d106..736e774fbb 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -16,6 +16,7 @@ // under the License. import com.mysql.cj.jdbc.StatementImpl +import org.codehaus.groovy.runtime.IOGroovyMethods suite("insert_group_commit_into") { def dbName = "regression_test_insert_p0" @@ -65,6 +66,7 @@ suite("insert_group_commit_into") { // assertEquals(result, expected_row_count) assertTrue(serverInfo.contains("'status':'PREPARE'")) assertTrue(serverInfo.contains("'label':'group_commit_")) + return serverInfo } def none_group_commit_insert = { sql, expected_row_count -> @@ -211,6 +213,55 @@ suite("insert_group_commit_into") { // try_sql("DROP TABLE ${table}") } + // test connect to observer fe + try { + def fes = sql_return_maparray "show frontends" + logger.info("frontends: ${fes}") + if (fes.size() > 1) { + def observer_fe = null + for (def fe : fes) { + if (fe.IsMaster == "false") { + observer_fe = fe + break + } + } + if (observer_fe != null) { + def url = "jdbc:mysql://${observer_fe.Host}:${observer_fe.QueryPort}/" + logger.info("observer url: " + url) + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = url) { + sql """ set enable_insert_group_commit = true; """ + sql """ set enable_nereids_dml = false; """ + sql """ set enable_profile= true; """ + + // 1. insert into + def server_info = group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + assertTrue(server_info.contains('query_id')) + // get query_id, such as 43f87963586a482a-b0496bcf9e2b5555 + def query_id_index = server_info.indexOf("'query_id':'") + "'query_id':'".length() + def query_id = server_info.substring(query_id_index, query_id_index + 33) + logger.info("query_id: " + query_id) + // 2. check profile + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET -u ${context.config.jdbcUser}:${context.config.jdbcPassword} http://${observer_fe.Host}:${observer_fe.HttpPort}") + sb.append("/api/profile?query_id=").append(query_id) + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); + def out = process.getText() + logger.info("Get profile: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def json = parseJson(out) + assertEquals("success", json.msg.toLowerCase()) + } + } + } else { + logger.info("only one fe, skip test connect to observer fe") + } + } finally { + } + // table with array type tableName = "insert_group_commit_into_duplicate_array" table = dbName + "." + tableName @@ -266,4 +317,4 @@ suite("insert_group_commit_into") { // try_sql("DROP TABLE ${table}") } } -} +} \ No newline at end of file