diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index b66ac04fcb..cdff675af4 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -731,7 +731,6 @@ int ObTableLoadCoordinator::drive_sql_stat(ObExecContext *ctx, const uint64_t table_id = ctx_->ddl_param_.dest_table_id_; ObSchemaGetterGuard schema_guard; ObSchemaGetterGuard *tmp_schema_guard = nullptr; - ObSchemaGetterGuard *tmp_schema_guard2 = nullptr; const ObTableSchema *table_schema = nullptr; if (OB_UNLIKELY(nullptr == ctx || sql_statistics.is_empty())) { ret = OB_INVALID_ARGUMENT; @@ -741,9 +740,8 @@ int ObTableLoadCoordinator::drive_sql_stat(ObExecContext *ctx, LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); } else { tmp_schema_guard = ctx->get_virtual_table_ctx().schema_guard_; - tmp_schema_guard2 = ctx->get_das_ctx().get_schema_guard(); ctx->get_sql_ctx()->schema_guard_ = &schema_guard; - ctx->get_das_ctx().get_schema_guard() = &schema_guard; + ctx->get_das_ctx().set_sql_ctx(ctx->get_sql_ctx()); } ObSEArray part_column_stats; if (OB_FAIL(ret)) { @@ -754,7 +752,7 @@ int ObTableLoadCoordinator::drive_sql_stat(ObExecContext *ctx, LOG_WARN("fail to drive global stat by direct load", KR(ret)); } ctx->get_sql_ctx()->schema_guard_ = tmp_schema_guard; - ctx->get_das_ctx().get_schema_guard() = tmp_schema_guard2; + ctx->get_das_ctx().set_sql_ctx(ctx->get_sql_ctx()); return ret; } diff --git a/src/observer/table_load/ob_table_load_partition_calc.cpp b/src/observer/table_load/ob_table_load_partition_calc.cpp index 93cebc5149..d73d4dc49d 100644 --- a/src/observer/table_load/ob_table_load_partition_calc.cpp +++ b/src/observer/table_load/ob_table_load_partition_calc.cpp @@ -23,7 +23,8 @@ using namespace sql; using namespace table; ObTableLoadPartitionCalc::ObTableLoadPartitionCalc() - : is_partition_with_autoinc_(false), + : session_info_(nullptr), + is_partition_with_autoinc_(false), partition_with_autoinc_idx_(OB_INVALID_INDEX), tenant_id_(OB_INVALID_ID), table_id_(OB_INVALID_ID), @@ -42,6 +43,8 @@ int ObTableLoadPartitionCalc::init(uint64_t tenant_id, uint64_t table_id, sql::O LOG_WARN("ObTableLoadPartitionCalc init twice", KR(ret), KP(this)); } else { allocator_.set_tenant_id(tenant_id); + sql_ctx_.schema_guard_ = &schema_guard_; + exec_ctx_.set_sql_ctx(&sql_ctx_); const ObTableSchema *table_schema = nullptr; ObDataTypeCastParams cast_params(session_info->get_timezone_info()); if (OB_FAIL(time_cvrt_.init(cast_params.get_nls_format(ObDateTimeType)))) { @@ -57,7 +60,6 @@ int ObTableLoadPartitionCalc::init(uint64_t tenant_id, uint64_t table_id, sql::O LOG_WARN("fail to get tablet and object", KR(ret)); } } else { // 分区表 - exec_ctx_.set_sql_ctx(&sql_ctx_); // 初始化table_location_ if (OB_FAIL( table_location_.init_partition_ids_by_rowkey2(exec_ctx_, *session_info, schema_guard_, table_id))) { diff --git a/src/observer/table_load/ob_table_load_partition_calc.h b/src/observer/table_load/ob_table_load_partition_calc.h index b79177b559..20a680ab01 100644 --- a/src/observer/table_load/ob_table_load_partition_calc.h +++ b/src/observer/table_load/ob_table_load_partition_calc.h @@ -51,7 +51,7 @@ private: public: struct IndexAndType { - IndexAndType() : index_(-1) {} + IndexAndType() : index_(-1), column_schema_(nullptr) {} int64_t index_; const share::schema::ObColumnSchemaV2 *column_schema_; TO_STRING_KV(K_(index), KP_(column_schema)); diff --git a/src/sql/das/ob_das_context.cpp b/src/sql/das/ob_das_context.cpp index 78b62d0980..d63d136a42 100644 --- a/src/sql/das/ob_das_context.cpp +++ b/src/sql/das/ob_das_context.cpp @@ -86,36 +86,22 @@ int ObDASCtx::get_das_tablet_mapper(const uint64_t ref_table_id, if (tablet_mapper.is_non_partition_optimized()) { // table ids has calced for no partition entity table, continue } else if (!is_vt) { - if (schema_guard_ == nullptr) { - void *buf = allocator_.alloc(sizeof(ObSchemaGetterGuard)); - if (OB_ISNULL(buf)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("allocate schema getter guard failed", K(ret)); - } else { - schema_guard_ = new (buf) ObSchemaGetterGuard(share::schema::ObSchemaMgrItem::MOD_DAS_CTX); - self_schema_guard_ = true; - if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, *schema_guard_))) { - LOG_WARN("get schema guard failed", K(ret)); - //release the schema guard when fetch the schema guard throw exception - schema_guard_->~ObSchemaGetterGuard(); - schema_guard_ = nullptr; - } - } - } //get ObTableSchema object corresponding to the table_id from ObSchemaGetterGuard //record the ObTableSchema into tablet_mapper //the tablet and partition info come from ObTableSchema in the real table - - if (OB_SUCC(ret)) { - if (OB_FAIL(schema_guard_->get_table_schema(tenant_id, - real_table_id, tablet_mapper.table_schema_))) { - LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(real_table_id)); - } else if (OB_ISNULL(tablet_mapper.table_schema_)) { - ret = OB_TABLE_NOT_EXIST; - LOG_WARN("table schema is not found", K(ret), K(real_table_id)); - } else { - tablet_mapper.related_info_.guard_ = schema_guard_; - } + ObSchemaGetterGuard *schema_guard = nullptr; + if (OB_ISNULL(sql_ctx_) || OB_ISNULL(schema_guard = sql_ctx_->schema_guard_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("schema guard is nullptr", K(ret), K(sql_ctx_), K(schema_guard)); + } else if (OB_FAIL(schema_guard->get_table_schema(tenant_id, + real_table_id, + tablet_mapper.table_schema_))) { + LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(real_table_id)); + } else if (OB_ISNULL(tablet_mapper.table_schema_)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("table schema is not found", K(ret), K(real_table_id)); + } else { + tablet_mapper.related_info_.guard_ = schema_guard; } } else { //get all server lists corresponding to the table_id from the tablet location cache diff --git a/src/sql/das/ob_das_context.h b/src/sql/das/ob_das_context.h index 09025168d9..620c7240e4 100644 --- a/src/sql/das/ob_das_context.h +++ b/src/sql/das/ob_das_context.h @@ -43,12 +43,11 @@ public: ObDASCtx(common::ObIAllocator &allocator) : table_locs_(allocator), external_table_locs_(allocator), - schema_guard_(nullptr), + sql_ctx_(nullptr), location_router_(allocator), das_factory_(allocator), related_tablet_map_(allocator), allocator_(allocator), - self_schema_guard_(false), snapshot_(), savepoint_(0), del_ctx_list_(allocator), @@ -60,10 +59,6 @@ public: } ~ObDASCtx() { - if (schema_guard_ != nullptr && self_schema_guard_) { - schema_guard_->~ObSchemaGetterGuard(); - schema_guard_ = nullptr; - } } int init(const ObPhysicalPlan &plan, ObExecContext &ctx); @@ -103,7 +98,7 @@ public: external_table_locs_.clear(); } ObDASTaskFactory &get_das_factory() { return das_factory_; } - ObSchemaGetterGuard *&get_schema_guard() { return schema_guard_; } + void set_sql_ctx(ObSqlCtx *sql_ctx) { sql_ctx_ = sql_ctx; } DASRelatedTabletMap &get_related_tablet_map() { return related_tablet_map_; } bool is_partition_hit(); void unmark_need_check_server(); @@ -126,12 +121,11 @@ private: * external_cached_table_locs_ are "cached values" which only used by QC and do not need to serialized to SQC. */ DASTableLocList external_table_locs_; - share::schema::ObSchemaGetterGuard *schema_guard_; + ObSqlCtx *sql_ctx_; ObDASLocationRouter location_router_; ObDASTaskFactory das_factory_; DASRelatedTabletMap related_tablet_map_; common::ObIAllocator &allocator_; - bool self_schema_guard_; transaction::ObTxReadSnapshot snapshot_; // Mvcc snapshot int64_t savepoint_; // DML savepoint //@todo: save snapshot version diff --git a/src/sql/engine/cmd/ob_load_data_rpc.cpp b/src/sql/engine/cmd/ob_load_data_rpc.cpp index e7d0f9e001..d2ad19d6ec 100644 --- a/src/sql/engine/cmd/ob_load_data_rpc.cpp +++ b/src/sql/engine/cmd/ob_load_data_rpc.cpp @@ -134,6 +134,10 @@ int ObRpcLoadDataShuffleTaskExecuteP::process() if (OB_FAIL(ObGlobalLoadDataStatMap::getInstance()->get_job_status(task.gid_, job_status))) { LOG_WARN("fail to get job, main thread has already quit", K(ret), K(task)); + } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard_))) { + //Confirmed with the load data owner that the inability to calculate the correct tablet_id here will not affect the execution, + //so we use the latest schema version to obtain the guard + LOG_WARN("get tenant schema guard failed", KR(ret)); } else if (OB_ISNULL(job_status)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("job status is null", K(ret)); @@ -144,6 +148,7 @@ int ObRpcLoadDataShuffleTaskExecuteP::process() ret = OB_ERR_UNEXPECTED; LOG_ERROR("handle is null", K(ret)); } else { + handle->exec_ctx.get_sql_ctx()->schema_guard_ = &schema_guard_; if (OB_UNLIKELY(THIS_WORKER.is_timeout())) { ret = OB_TIMEOUT; LOG_WARN("LOAD DATA shuffle task timeout", K(ret), K(task)); diff --git a/src/sql/engine/cmd/ob_load_data_rpc.h b/src/sql/engine/cmd/ob_load_data_rpc.h index b08cf1481b..87afe79626 100644 --- a/src/sql/engine/cmd/ob_load_data_rpc.h +++ b/src/sql/engine/cmd/ob_load_data_rpc.h @@ -530,6 +530,7 @@ protected: int process(); private: const observer::ObGlobalContext &gctx_; + share::schema::ObSchemaGetterGuard schema_guard_; }; class ObRpcLoadDataShuffleTaskCallBack diff --git a/src/sql/engine/ob_exec_context.h b/src/sql/engine/ob_exec_context.h index 67c6085a7f..75e139139d 100644 --- a/src/sql/engine/ob_exec_context.h +++ b/src/sql/engine/ob_exec_context.h @@ -322,7 +322,7 @@ public: // const common::ObInterruptibleTaskID &get_interrupt_id() { return interrupt_id_;} // void set_interrupt_id(const common::ObInterruptibleTaskID &int_id) { interrupt_id_ = int_id; } - void set_sql_ctx(ObSqlCtx *ctx) { sql_ctx_ = ctx; } + void set_sql_ctx(ObSqlCtx *ctx) { sql_ctx_ = ctx; das_ctx_.set_sql_ctx(ctx); } ObSqlCtx *get_sql_ctx() { return sql_ctx_; } const ObSqlCtx *get_sql_ctx() const { return sql_ctx_; } pl::ObPLContext *get_pl_stack_ctx() { return pl_stack_ctx_; } diff --git a/src/sql/ob_sql.cpp b/src/sql/ob_sql.cpp index 2d2f6635c1..a290db9755 100644 --- a/src/sql/ob_sql.cpp +++ b/src/sql/ob_sql.cpp @@ -1463,7 +1463,6 @@ int ObSql::handle_pl_execute(const ObString &sql, ? PRIV_CHECK_FLAG_DISABLE : PRIV_CHECK_FLAG_IN_PL; pctx = ectx.get_physical_plan_ctx(); - ectx.get_das_ctx().get_schema_guard() = context.schema_guard_; int64_t local_tenant_schema_version = -1; int64_t local_sys_schema_version = -1; if (OB_ISNULL(context.schema_guard_)) { @@ -1972,7 +1971,6 @@ int ObSql::handle_ps_execute(const ObPsStmtId client_stmt_id, ObIAllocator &allocator = result.get_mem_pool(); ObSQLSessionInfo &session = result.get_session(); ObExecContext &ectx = result.get_exec_context(); - ectx.get_das_ctx().get_schema_guard() = context.schema_guard_; ParamStore fixed_params( (ObWrapperAllocator(allocator)) ); ParamStore ps_params( (ObWrapperAllocator(allocator)) ); ObPsCache *ps_cache = session.get_ps_cache(); @@ -2144,7 +2142,6 @@ int ObSql::handle_remote_query(const ObRemoteSqlInfo &remote_sql_info, ObIAllocator &allocator = THIS_WORKER.get_sql_arena_allocator(); ObSQLSessionInfo *session = exec_ctx.get_my_session(); - exec_ctx.get_das_ctx().get_schema_guard() = context.schema_guard_; int get_plan_err = OB_SUCCESS; //used for judge whether add plan to plan cache bool is_from_plan_cache = false; ObPlanCacheCtx *pc_ctx = NULL; @@ -2359,7 +2356,6 @@ OB_INLINE int ObSql::handle_text_query(const ObString &stmt, ObSqlCtx &context, ObSQLSessionInfo &session = result.get_session(); const uint64_t tenant_id = session.get_effective_tenant_id(); ObExecContext& ectx = result.get_exec_context(); - ectx.get_das_ctx().get_schema_guard() = context.schema_guard_; int get_plan_err = OB_SUCCESS; //used for judge whether add plan to plan cache bool use_plan_cache = session.get_local_ob_enable_plan_cache(); ObPlanCacheCtx *pc_ctx = NULL; diff --git a/src/sql/optimizer/ob_table_location.cpp b/src/sql/optimizer/ob_table_location.cpp index 100d0dbaa8..b414b4279e 100644 --- a/src/sql/optimizer/ob_table_location.cpp +++ b/src/sql/optimizer/ob_table_location.cpp @@ -1444,11 +1444,12 @@ int ObTableLocation::calculate_partition_ids_by_rows2(ObSQLSessionInfo &session_ ObArenaAllocator allocator(ObModIds::OB_SQL_TABLE_LOCATION); SMART_VAR(ObExecContext, exec_ctx, allocator) { ObSqlSchemaGuard sql_schema_guard; + ObSqlCtx sql_ctx; + sql_ctx.schema_guard_ = &schema_guard; sql_schema_guard.set_schema_guard(&schema_guard); + exec_ctx.set_sql_ctx(&sql_ctx); exec_ctx.set_my_session(&session_info); ObDASTabletMapper tablet_mapper; - OZ(OB_FAIL(exec_ctx.get_das_ctx().get_das_tablet_mapper(table_id, tablet_mapper, - &loc_meta_.related_table_ids_))); if (OB_UNLIKELY(is_virtual_table(table_id))) { ret = OB_NOT_SUPPORTED; LOG_USER_ERROR(OB_NOT_SUPPORTED, "Calculate virtual table partition id with rowkey"); @@ -1470,6 +1471,10 @@ int ObTableLocation::calculate_partition_ids_by_rows2(ObSQLSessionInfo &session_ } } } + } else if (OB_FAIL(exec_ctx.get_das_ctx().get_das_tablet_mapper(table_id, + tablet_mapper, + &loc_meta_.related_table_ids_))) { + LOG_WARN("get das tablet mapper failed", KR(ret), K(table_id)); } else {//TODO: copied from calc_partition_ids_by_rowkey() ObSEArray tmp_part_ids; ObSEArray tmp_tablet_ids; @@ -1528,6 +1533,7 @@ int ObTableLocation::calculate_partition_ids_by_rowkey(ObSQLSessionInfo &session SMART_VAR(ObExecContext, exec_ctx, allocator) { ObSqlSchemaGuard sql_schema_guard; ObSqlCtx sql_ctx; + sql_ctx.schema_guard_ = &schema_guard; sql_schema_guard.set_schema_guard(&schema_guard); exec_ctx.set_my_session(&session_info); exec_ctx.set_sql_ctx(&sql_ctx);