diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index 6da04511b3..3ed8afc755 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -19118,6 +19118,12 @@ int ObDDLService::prepare_hidden_table_schema(const ObTableSchema &orig_table_sc hidden_table_schema.set_association_table_id(orig_table_schema.get_table_id()); // set the hidden attributes of the table hidden_table_schema.set_table_state_flag(ObTableStateFlag::TABLE_STATE_HIDDEN_OFFLINE_DDL); + if (hidden_table_schema.is_ctas_tmp_table()) { + // for CTAS table, clear its session id, otherwise this table schema will not be visble + // to the index rebuiding phase. + hidden_table_schema.set_session_id(0); + LOG_INFO("clear session_id of hidden table copied from CTAS table", K(hidden_table_schema)); + } if (orig_table_schema.get_tenant_id() != hidden_table_schema.get_tenant_id()) { // recover restore table, do not sync log to cdc. hidden_table_schema.set_ddl_ignore_sync_cdc_flag(ObDDLIgnoreSyncCdcFlag::DONT_SYNC_LOG_FOR_CDC); diff --git a/src/sql/engine/cmd/ob_table_executor.cpp b/src/sql/engine/cmd/ob_table_executor.cpp index ad066a442c..9529b25fba 100644 --- a/src/sql/engine/cmd/ob_table_executor.cpp +++ b/src/sql/engine/cmd/ob_table_executor.cpp @@ -289,6 +289,7 @@ int ObCreateTableExecutor::prepare_ins_arg(ObCreateTableStmt &stmt, int ObCreateTableExecutor::prepare_alter_arg(ObCreateTableStmt &stmt, const ObSQLSessionInfo *my_session, const ObString &create_table_name, + bool is_full_direct_insert, obrpc::ObAlterTableArg &alter_table_arg) //out, 最终的alter table arg, set session_id = 0; { int ret = OB_SUCCESS; @@ -312,6 +313,7 @@ int ObCreateTableExecutor::prepare_alter_arg(ObCreateTableStmt &stmt, } else if (OB_FAIL(alter_table_schema->assign(table_schema))) { LOG_WARN("failed to assign alter table schema", K(ret)); } else if (!table_schema.is_mysql_tmp_table() + && !is_full_direct_insert && FALSE_IT(alter_table_schema->set_session_id(0))) { //impossible } else if (OB_FAIL(alter_table_schema->set_origin_table_name(stmt.get_table_name()))) { @@ -323,6 +325,7 @@ int ObCreateTableExecutor::prepare_alter_arg(ObCreateTableStmt &stmt, } else if (OB_FAIL(alter_table_schema->set_database_name(stmt.get_database_name()))) { LOG_WARN("failed to set database name", K(ret)); } else if (!table_schema.is_mysql_tmp_table() + && !is_full_direct_insert && OB_FAIL(alter_table_schema->alter_option_bitset_.add_member(obrpc::ObAlterTableArg::SESSION_ID))) { LOG_WARN("failed to add member SESSION_ID for alter table schema", K(ret), K(alter_table_arg)); } else if (OB_FAIL(alter_table_schema->alter_option_bitset_.add_member(obrpc::ObAlterTableArg::TABLE_NAME))) { @@ -361,6 +364,34 @@ int ObCreateTableExecutor::prepare_drop_arg(const ObCreateTableStmt &stmt, return ret; } +int ObCreateTableExecutor::check_if_ctas_use_full_direct_insert(ObCreateTableStmt &stmt, + bool &is_full_direct_insert) +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = MTL_ID(); + uint64_t data_version = 0; + omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id)); + const ObString &config_str = tenant_config->default_load_mode.get_value_string(); + is_full_direct_insert = false; + + if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) { + LOG_WARN("fail to get data version", KR(ret), K(tenant_id)); + } else if (stmt.get_direct_load_hint().has_no_direct()) { + // when no_direct hint is set, ctas will not use direct load + } else if (stmt.get_has_parallel_hint() && 1 == stmt.get_parallelism()) { + // when user explictly set parallel hint to 1, ctas will not use direct load + } else if ((stmt.get_has_append_hint() || stmt.get_direct_load_hint().is_full_load_method()) && + stmt.get_parallelism() > 1) { + is_full_direct_insert = true; + } else if (DATA_VERSION_4_3_4_0 <= data_version && + tenant_config.is_valid() && + 0 == config_str.case_compare("FULL_DIRECT_WRITE")) { + is_full_direct_insert = true; + } + + return ret; +} + //查询建表的处理, 通过内部session执行查询插入代码参考了 ObTableModify::ObTableModifyCtx::open_inner_conn() 实现 int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx, ObCreateTableStmt &stmt, @@ -398,16 +429,19 @@ int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx, } if (OB_SUCC(ret)) { ObInnerSQLConnectionPool *pool = static_cast(sql_proxy->get_pool()); + bool is_full_direct_insert = false; if (OB_ISNULL(pool)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("pool is null", K(ret)); } else if (OB_FAIL(oracle_sql_proxy.init(pool))) { LOG_WARN("init oracle sql proxy failed", K(ret)); + } else if (OB_FAIL(check_if_ctas_use_full_direct_insert(stmt, is_full_direct_insert))) { + LOG_WARN("failed to check if ctas use full direct insert", K(ret)); } else if (OB_FAIL(prepare_stmt(stmt, *my_session, create_table_name))) { LOG_WARN("failed to prepare stmt", K(ret)); } else if (OB_FAIL(prepare_ins_arg(stmt, my_session, ctx.get_sql_ctx()->schema_guard_, &plan_ctx->get_param_store(), ins_sql))) { //1, 参数准备; LOG_WARN("failed to prepare insert table arg", K(ret)); - } else if (OB_FAIL(prepare_alter_arg(stmt, my_session, create_table_name, alter_table_arg))) { + } else if (OB_FAIL(prepare_alter_arg(stmt, my_session, create_table_name, is_full_direct_insert, alter_table_arg))) { LOG_WARN("failed to prepare alter table arg", K(ret)); } else if (OB_FAIL(prepare_drop_arg(stmt, my_session, table_item, drop_table_arg))) { LOG_WARN("failed to prepare drop table arg", K(ret)); @@ -450,12 +484,6 @@ int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx, if (OB_SUCC(ret)) { bool is_mysql_temp_table = stmt.get_create_table_arg().schema_.is_mysql_tmp_table(); bool in_trans = my_session->is_in_transaction(); - omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID())); - const ObString &config_str = tenant_config->default_load_mode.get_value_string(); - bool is_full_direct_insert = - stmt.get_has_append_hint() || - stmt.get_direct_load_hint().is_full_load_method() || - (tenant_config.is_valid() && 0 == config_str.case_compare("FULL_DIRECT_WRITE")); bool need_start_trans = !is_full_direct_insert && (!is_mysql_temp_table || !in_trans); ObBasicSessionInfo::UserScopeGuard user_scope_guard(my_session->get_sql_scope_flags()); common::sqlclient::ObISQLConnection *conn = NULL; diff --git a/src/sql/engine/cmd/ob_table_executor.h b/src/sql/engine/cmd/ob_table_executor.h index 95c2b7e65a..6d719557df 100644 --- a/src/sql/engine/cmd/ob_table_executor.h +++ b/src/sql/engine/cmd/ob_table_executor.h @@ -89,8 +89,9 @@ private: ObSchemaGetterGuard *schema_guard, const ParamStore *param_store, ObSqlString &ins_sql); - int prepare_alter_arg(ObCreateTableStmt &stmt, const ObSQLSessionInfo *my_session, const ObString &create_table_name, obrpc::ObAlterTableArg &alter_table_arg); + int prepare_alter_arg(ObCreateTableStmt &stmt, const ObSQLSessionInfo *my_session, const ObString &create_table_name, bool is_full_direct_insert, obrpc::ObAlterTableArg &alter_table_arg); int prepare_drop_arg(const ObCreateTableStmt &stmt, const ObSQLSessionInfo *my_session, obrpc::ObTableItem &table_item, obrpc::ObDropTableArg &drop_table_arg); + int check_if_ctas_use_full_direct_insert(ObCreateTableStmt &stmt, bool &is_full_direct_insert); }; class ObAlterTableStmt;