[fix](regression) Add regression for group commit executed on observe… (#26692)

This commit is contained in:
meiyi
2023-11-10 18:53:45 +08:00
committed by GitHub
parent 70fdd1f1af
commit ca47d75e83
2 changed files with 57 additions and 1 deletions

View File

@ -1813,6 +1813,7 @@ public class StmtExecutor {
TransactionStatus txnStatus = TransactionStatus.ABORTED;
String errMsg = "";
TableType tblType = insertStmt.getTargetTable().getType();
boolean isGroupCommit = false;
if (context.isTxnModel()) {
if (insertStmt.getQueryStmt() instanceof SelectStmt) {
if (((SelectStmt) insertStmt.getQueryStmt()).getTableRefs().size() > 0) {
@ -1824,6 +1825,7 @@ public class StmtExecutor {
label = context.getTxnEntry().getLabel();
txnId = context.getTxnEntry().getTxnConf().getTxnId();
} else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) {
isGroupCommit = true;
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
int maxRetry = 3;
for (int i = 0; i < maxRetry; i++) {
@ -2013,6 +2015,9 @@ public class StmtExecutor {
if (!Strings.isNullOrEmpty(errMsg)) {
sb.append(", 'err':'").append(errMsg).append("'");
}
if (isGroupCommit) {
sb.append(", 'query_id':'").append(DebugUtil.printId(context.queryId)).append("'");
}
sb.append("}");
context.getState().setOk(loadedRows, filteredRows, sb.toString());

View File

@ -16,6 +16,7 @@
// under the License.
import com.mysql.cj.jdbc.StatementImpl
import org.codehaus.groovy.runtime.IOGroovyMethods
suite("insert_group_commit_into") {
def dbName = "regression_test_insert_p0"
@ -65,6 +66,7 @@ suite("insert_group_commit_into") {
// assertEquals(result, expected_row_count)
assertTrue(serverInfo.contains("'status':'PREPARE'"))
assertTrue(serverInfo.contains("'label':'group_commit_"))
return serverInfo
}
def none_group_commit_insert = { sql, expected_row_count ->
@ -211,6 +213,55 @@ suite("insert_group_commit_into") {
// try_sql("DROP TABLE ${table}")
}
// test connect to observer fe
try {
def fes = sql_return_maparray "show frontends"
logger.info("frontends: ${fes}")
if (fes.size() > 1) {
def observer_fe = null
for (def fe : fes) {
if (fe.IsMaster == "false") {
observer_fe = fe
break
}
}
if (observer_fe != null) {
def url = "jdbc:mysql://${observer_fe.Host}:${observer_fe.QueryPort}/"
logger.info("observer url: " + url)
connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = url) {
sql """ set enable_insert_group_commit = true; """
sql """ set enable_nereids_dml = false; """
sql """ set enable_profile= true; """
// 1. insert into
def server_info = group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
assertTrue(server_info.contains('query_id'))
// get query_id, such as 43f87963586a482a-b0496bcf9e2b5555
def query_id_index = server_info.indexOf("'query_id':'") + "'query_id':'".length()
def query_id = server_info.substring(query_id_index, query_id_index + 33)
logger.info("query_id: " + query_id)
// 2. check profile
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET -u ${context.config.jdbcUser}:${context.config.jdbcPassword} http://${observer_fe.Host}:${observer_fe.HttpPort}")
sb.append("/api/profile?query_id=").append(query_id)
String command = sb.toString()
logger.info(command)
def process = command.execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
logger.info("Get profile: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def json = parseJson(out)
assertEquals("success", json.msg.toLowerCase())
}
}
} else {
logger.info("only one fe, skip test connect to observer fe")
}
} finally {
}
// table with array type
tableName = "insert_group_commit_into_duplicate_array"
table = dbName + "." + tableName
@ -266,4 +317,4 @@ suite("insert_group_commit_into") {
// try_sql("DROP TABLE ${table}")
}
}
}
}