fix hang when use direct load in ctas with index

This commit is contained in:
fforkboat
2024-10-17 06:13:41 +00:00
committed by ob-robot
parent 3bc20bdf71
commit a788fa9169
3 changed files with 43 additions and 8 deletions

View File

@ -289,6 +289,7 @@ int ObCreateTableExecutor::prepare_ins_arg(ObCreateTableStmt &stmt,
int ObCreateTableExecutor::prepare_alter_arg(ObCreateTableStmt &stmt,
const ObSQLSessionInfo *my_session,
const ObString &create_table_name,
bool is_full_direct_insert,
obrpc::ObAlterTableArg &alter_table_arg) //out, 最终的alter table arg, set session_id = 0;
{
int ret = OB_SUCCESS;
@ -312,6 +313,7 @@ int ObCreateTableExecutor::prepare_alter_arg(ObCreateTableStmt &stmt,
} else if (OB_FAIL(alter_table_schema->assign(table_schema))) {
LOG_WARN("failed to assign alter table schema", K(ret));
} else if (!table_schema.is_mysql_tmp_table()
&& !is_full_direct_insert
&& FALSE_IT(alter_table_schema->set_session_id(0))) {
//impossible
} else if (OB_FAIL(alter_table_schema->set_origin_table_name(stmt.get_table_name()))) {
@ -323,6 +325,7 @@ int ObCreateTableExecutor::prepare_alter_arg(ObCreateTableStmt &stmt,
} else if (OB_FAIL(alter_table_schema->set_database_name(stmt.get_database_name()))) {
LOG_WARN("failed to set database name", K(ret));
} else if (!table_schema.is_mysql_tmp_table()
&& !is_full_direct_insert
&& OB_FAIL(alter_table_schema->alter_option_bitset_.add_member(obrpc::ObAlterTableArg::SESSION_ID))) {
LOG_WARN("failed to add member SESSION_ID for alter table schema", K(ret), K(alter_table_arg));
} else if (OB_FAIL(alter_table_schema->alter_option_bitset_.add_member(obrpc::ObAlterTableArg::TABLE_NAME))) {
@ -361,6 +364,34 @@ int ObCreateTableExecutor::prepare_drop_arg(const ObCreateTableStmt &stmt,
return ret;
}
int ObCreateTableExecutor::check_if_ctas_use_full_direct_insert(ObCreateTableStmt &stmt,
bool &is_full_direct_insert)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
uint64_t data_version = 0;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
const ObString &config_str = tenant_config->default_load_mode.get_value_string();
is_full_direct_insert = false;
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
LOG_WARN("fail to get data version", KR(ret), K(tenant_id));
} else if (stmt.get_direct_load_hint().has_no_direct()) {
// when no_direct hint is set, ctas will not use direct load
} else if (stmt.get_has_parallel_hint() && 1 == stmt.get_parallelism()) {
// when user explictly set parallel hint to 1, ctas will not use direct load
} else if ((stmt.get_has_append_hint() || stmt.get_direct_load_hint().is_full_load_method()) &&
stmt.get_parallelism() > 1) {
is_full_direct_insert = true;
} else if (DATA_VERSION_4_3_4_0 <= data_version &&
tenant_config.is_valid() &&
0 == config_str.case_compare("FULL_DIRECT_WRITE")) {
is_full_direct_insert = true;
}
return ret;
}
//查询建表的处理, 通过内部session执行查询插入代码参考了 ObTableModify::ObTableModifyCtx::open_inner_conn() 实现
int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx,
ObCreateTableStmt &stmt,
@ -398,16 +429,19 @@ int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx,
}
if (OB_SUCC(ret)) {
ObInnerSQLConnectionPool *pool = static_cast<observer::ObInnerSQLConnectionPool*>(sql_proxy->get_pool());
bool is_full_direct_insert = false;
if (OB_ISNULL(pool)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool is null", K(ret));
} else if (OB_FAIL(oracle_sql_proxy.init(pool))) {
LOG_WARN("init oracle sql proxy failed", K(ret));
} else if (OB_FAIL(check_if_ctas_use_full_direct_insert(stmt, is_full_direct_insert))) {
LOG_WARN("failed to check if ctas use full direct insert", K(ret));
} else if (OB_FAIL(prepare_stmt(stmt, *my_session, create_table_name))) {
LOG_WARN("failed to prepare stmt", K(ret));
} else if (OB_FAIL(prepare_ins_arg(stmt, my_session, ctx.get_sql_ctx()->schema_guard_, &plan_ctx->get_param_store(), ins_sql))) { //1, 参数准备;
LOG_WARN("failed to prepare insert table arg", K(ret));
} else if (OB_FAIL(prepare_alter_arg(stmt, my_session, create_table_name, alter_table_arg))) {
} else if (OB_FAIL(prepare_alter_arg(stmt, my_session, create_table_name, is_full_direct_insert, alter_table_arg))) {
LOG_WARN("failed to prepare alter table arg", K(ret));
} else if (OB_FAIL(prepare_drop_arg(stmt, my_session, table_item, drop_table_arg))) {
LOG_WARN("failed to prepare drop table arg", K(ret));
@ -450,12 +484,6 @@ int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx,
if (OB_SUCC(ret)) {
bool is_mysql_temp_table = stmt.get_create_table_arg().schema_.is_mysql_tmp_table();
bool in_trans = my_session->is_in_transaction();
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
const ObString &config_str = tenant_config->default_load_mode.get_value_string();
bool is_full_direct_insert =
stmt.get_has_append_hint() ||
stmt.get_direct_load_hint().is_full_load_method() ||
(tenant_config.is_valid() && 0 == config_str.case_compare("FULL_DIRECT_WRITE"));
bool need_start_trans = !is_full_direct_insert && (!is_mysql_temp_table || !in_trans);
ObBasicSessionInfo::UserScopeGuard user_scope_guard(my_session->get_sql_scope_flags());
common::sqlclient::ObISQLConnection *conn = NULL;