das tablet mapper use schema_guard in sql_ctx

This commit is contained in:
leslieyuchen
2023-07-05 08:18:11 +00:00
committed by ob-robot
parent 4c1cda057a
commit 6819b7ed03
10 changed files with 38 additions and 50 deletions

View File

@ -731,7 +731,6 @@ int ObTableLoadCoordinator::drive_sql_stat(ObExecContext *ctx,
const uint64_t table_id = ctx_->ddl_param_.dest_table_id_;
ObSchemaGetterGuard schema_guard;
ObSchemaGetterGuard *tmp_schema_guard = nullptr;
ObSchemaGetterGuard *tmp_schema_guard2 = nullptr;
const ObTableSchema *table_schema = nullptr;
if (OB_UNLIKELY(nullptr == ctx || sql_statistics.is_empty())) {
ret = OB_INVALID_ARGUMENT;
@ -741,9 +740,8 @@ int ObTableLoadCoordinator::drive_sql_stat(ObExecContext *ctx,
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else {
tmp_schema_guard = ctx->get_virtual_table_ctx().schema_guard_;
tmp_schema_guard2 = ctx->get_das_ctx().get_schema_guard();
ctx->get_sql_ctx()->schema_guard_ = &schema_guard;
ctx->get_das_ctx().get_schema_guard() = &schema_guard;
ctx->get_das_ctx().set_sql_ctx(ctx->get_sql_ctx());
}
ObSEArray<ObOptColumnStat *, 64> part_column_stats;
if (OB_FAIL(ret)) {
@ -754,7 +752,7 @@ int ObTableLoadCoordinator::drive_sql_stat(ObExecContext *ctx,
LOG_WARN("fail to drive global stat by direct load", KR(ret));
}
ctx->get_sql_ctx()->schema_guard_ = tmp_schema_guard;
ctx->get_das_ctx().get_schema_guard() = tmp_schema_guard2;
ctx->get_das_ctx().set_sql_ctx(ctx->get_sql_ctx());
return ret;
}

View File

@ -23,7 +23,8 @@ using namespace sql;
using namespace table;
ObTableLoadPartitionCalc::ObTableLoadPartitionCalc()
: is_partition_with_autoinc_(false),
: session_info_(nullptr),
is_partition_with_autoinc_(false),
partition_with_autoinc_idx_(OB_INVALID_INDEX),
tenant_id_(OB_INVALID_ID),
table_id_(OB_INVALID_ID),
@ -42,6 +43,8 @@ int ObTableLoadPartitionCalc::init(uint64_t tenant_id, uint64_t table_id, sql::O
LOG_WARN("ObTableLoadPartitionCalc init twice", KR(ret), KP(this));
} else {
allocator_.set_tenant_id(tenant_id);
sql_ctx_.schema_guard_ = &schema_guard_;
exec_ctx_.set_sql_ctx(&sql_ctx_);
const ObTableSchema *table_schema = nullptr;
ObDataTypeCastParams cast_params(session_info->get_timezone_info());
if (OB_FAIL(time_cvrt_.init(cast_params.get_nls_format(ObDateTimeType)))) {
@ -57,7 +60,6 @@ int ObTableLoadPartitionCalc::init(uint64_t tenant_id, uint64_t table_id, sql::O
LOG_WARN("fail to get tablet and object", KR(ret));
}
} else { // 分区表
exec_ctx_.set_sql_ctx(&sql_ctx_);
// 初始化table_location_
if (OB_FAIL(
table_location_.init_partition_ids_by_rowkey2(exec_ctx_, *session_info, schema_guard_, table_id))) {

View File

@ -51,7 +51,7 @@ private:
public:
struct IndexAndType
{
IndexAndType() : index_(-1) {}
IndexAndType() : index_(-1), column_schema_(nullptr) {}
int64_t index_;
const share::schema::ObColumnSchemaV2 *column_schema_;
TO_STRING_KV(K_(index), KP_(column_schema));

View File

@ -86,36 +86,22 @@ int ObDASCtx::get_das_tablet_mapper(const uint64_t ref_table_id,
if (tablet_mapper.is_non_partition_optimized()) {
// table ids has calced for no partition entity table, continue
} else if (!is_vt) {
if (schema_guard_ == nullptr) {
void *buf = allocator_.alloc(sizeof(ObSchemaGetterGuard));
if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate schema getter guard failed", K(ret));
} else {
schema_guard_ = new (buf) ObSchemaGetterGuard(share::schema::ObSchemaMgrItem::MOD_DAS_CTX);
self_schema_guard_ = true;
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, *schema_guard_))) {
LOG_WARN("get schema guard failed", K(ret));
//release the schema guard when fetch the schema guard throw exception
schema_guard_->~ObSchemaGetterGuard();
schema_guard_ = nullptr;
}
}
}
//get ObTableSchema object corresponding to the table_id from ObSchemaGetterGuard
//record the ObTableSchema into tablet_mapper
//the tablet and partition info come from ObTableSchema in the real table
if (OB_SUCC(ret)) {
if (OB_FAIL(schema_guard_->get_table_schema(tenant_id,
real_table_id, tablet_mapper.table_schema_))) {
LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(real_table_id));
} else if (OB_ISNULL(tablet_mapper.table_schema_)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table schema is not found", K(ret), K(real_table_id));
} else {
tablet_mapper.related_info_.guard_ = schema_guard_;
}
ObSchemaGetterGuard *schema_guard = nullptr;
if (OB_ISNULL(sql_ctx_) || OB_ISNULL(schema_guard = sql_ctx_->schema_guard_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema guard is nullptr", K(ret), K(sql_ctx_), K(schema_guard));
} else if (OB_FAIL(schema_guard->get_table_schema(tenant_id,
real_table_id,
tablet_mapper.table_schema_))) {
LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(real_table_id));
} else if (OB_ISNULL(tablet_mapper.table_schema_)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table schema is not found", K(ret), K(real_table_id));
} else {
tablet_mapper.related_info_.guard_ = schema_guard;
}
} else {
//get all server lists corresponding to the table_id from the tablet location cache

View File

@ -43,12 +43,11 @@ public:
ObDASCtx(common::ObIAllocator &allocator)
: table_locs_(allocator),
external_table_locs_(allocator),
schema_guard_(nullptr),
sql_ctx_(nullptr),
location_router_(allocator),
das_factory_(allocator),
related_tablet_map_(allocator),
allocator_(allocator),
self_schema_guard_(false),
snapshot_(),
savepoint_(0),
del_ctx_list_(allocator),
@ -60,10 +59,6 @@ public:
}
~ObDASCtx()
{
if (schema_guard_ != nullptr && self_schema_guard_) {
schema_guard_->~ObSchemaGetterGuard();
schema_guard_ = nullptr;
}
}
int init(const ObPhysicalPlan &plan, ObExecContext &ctx);
@ -103,7 +98,7 @@ public:
external_table_locs_.clear();
}
ObDASTaskFactory &get_das_factory() { return das_factory_; }
ObSchemaGetterGuard *&get_schema_guard() { return schema_guard_; }
void set_sql_ctx(ObSqlCtx *sql_ctx) { sql_ctx_ = sql_ctx; }
DASRelatedTabletMap &get_related_tablet_map() { return related_tablet_map_; }
bool is_partition_hit();
void unmark_need_check_server();
@ -126,12 +121,11 @@ private:
* external_cached_table_locs_ are "cached values" which only used by QC and do not need to serialized to SQC.
*/
DASTableLocList external_table_locs_;
share::schema::ObSchemaGetterGuard *schema_guard_;
ObSqlCtx *sql_ctx_;
ObDASLocationRouter location_router_;
ObDASTaskFactory das_factory_;
DASRelatedTabletMap related_tablet_map_;
common::ObIAllocator &allocator_;
bool self_schema_guard_;
transaction::ObTxReadSnapshot snapshot_; // Mvcc snapshot
int64_t savepoint_; // DML savepoint
//@todo: save snapshot version

View File

@ -134,6 +134,10 @@ int ObRpcLoadDataShuffleTaskExecuteP::process()
if (OB_FAIL(ObGlobalLoadDataStatMap::getInstance()->get_job_status(task.gid_, job_status))) {
LOG_WARN("fail to get job, main thread has already quit", K(ret), K(task));
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard_))) {
//Confirmed with the load data owner that the inability to calculate the correct tablet_id here will not affect the execution,
//so we use the latest schema version to obtain the guard
LOG_WARN("get tenant schema guard failed", KR(ret));
} else if (OB_ISNULL(job_status)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("job status is null", K(ret));
@ -144,6 +148,7 @@ int ObRpcLoadDataShuffleTaskExecuteP::process()
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("handle is null", K(ret));
} else {
handle->exec_ctx.get_sql_ctx()->schema_guard_ = &schema_guard_;
if (OB_UNLIKELY(THIS_WORKER.is_timeout())) {
ret = OB_TIMEOUT;
LOG_WARN("LOAD DATA shuffle task timeout", K(ret), K(task));

View File

@ -530,6 +530,7 @@ protected:
int process();
private:
const observer::ObGlobalContext &gctx_;
share::schema::ObSchemaGetterGuard schema_guard_;
};
class ObRpcLoadDataShuffleTaskCallBack

View File

@ -322,7 +322,7 @@ public:
// const common::ObInterruptibleTaskID &get_interrupt_id() { return interrupt_id_;}
// void set_interrupt_id(const common::ObInterruptibleTaskID &int_id) { interrupt_id_ = int_id; }
void set_sql_ctx(ObSqlCtx *ctx) { sql_ctx_ = ctx; }
void set_sql_ctx(ObSqlCtx *ctx) { sql_ctx_ = ctx; das_ctx_.set_sql_ctx(ctx); }
ObSqlCtx *get_sql_ctx() { return sql_ctx_; }
const ObSqlCtx *get_sql_ctx() const { return sql_ctx_; }
pl::ObPLContext *get_pl_stack_ctx() { return pl_stack_ctx_; }

View File

@ -1463,7 +1463,6 @@ int ObSql::handle_pl_execute(const ObString &sql,
? PRIV_CHECK_FLAG_DISABLE
: PRIV_CHECK_FLAG_IN_PL;
pctx = ectx.get_physical_plan_ctx();
ectx.get_das_ctx().get_schema_guard() = context.schema_guard_;
int64_t local_tenant_schema_version = -1;
int64_t local_sys_schema_version = -1;
if (OB_ISNULL(context.schema_guard_)) {
@ -1972,7 +1971,6 @@ int ObSql::handle_ps_execute(const ObPsStmtId client_stmt_id,
ObIAllocator &allocator = result.get_mem_pool();
ObSQLSessionInfo &session = result.get_session();
ObExecContext &ectx = result.get_exec_context();
ectx.get_das_ctx().get_schema_guard() = context.schema_guard_;
ParamStore fixed_params( (ObWrapperAllocator(allocator)) );
ParamStore ps_params( (ObWrapperAllocator(allocator)) );
ObPsCache *ps_cache = session.get_ps_cache();
@ -2144,7 +2142,6 @@ int ObSql::handle_remote_query(const ObRemoteSqlInfo &remote_sql_info,
ObIAllocator &allocator = THIS_WORKER.get_sql_arena_allocator();
ObSQLSessionInfo *session = exec_ctx.get_my_session();
exec_ctx.get_das_ctx().get_schema_guard() = context.schema_guard_;
int get_plan_err = OB_SUCCESS; //used for judge whether add plan to plan cache
bool is_from_plan_cache = false;
ObPlanCacheCtx *pc_ctx = NULL;
@ -2359,7 +2356,6 @@ OB_INLINE int ObSql::handle_text_query(const ObString &stmt, ObSqlCtx &context,
ObSQLSessionInfo &session = result.get_session();
const uint64_t tenant_id = session.get_effective_tenant_id();
ObExecContext& ectx = result.get_exec_context();
ectx.get_das_ctx().get_schema_guard() = context.schema_guard_;
int get_plan_err = OB_SUCCESS; //used for judge whether add plan to plan cache
bool use_plan_cache = session.get_local_ob_enable_plan_cache();
ObPlanCacheCtx *pc_ctx = NULL;

View File

@ -1444,11 +1444,12 @@ int ObTableLocation::calculate_partition_ids_by_rows2(ObSQLSessionInfo &session_
ObArenaAllocator allocator(ObModIds::OB_SQL_TABLE_LOCATION);
SMART_VAR(ObExecContext, exec_ctx, allocator) {
ObSqlSchemaGuard sql_schema_guard;
ObSqlCtx sql_ctx;
sql_ctx.schema_guard_ = &schema_guard;
sql_schema_guard.set_schema_guard(&schema_guard);
exec_ctx.set_sql_ctx(&sql_ctx);
exec_ctx.set_my_session(&session_info);
ObDASTabletMapper tablet_mapper;
OZ(OB_FAIL(exec_ctx.get_das_ctx().get_das_tablet_mapper(table_id, tablet_mapper,
&loc_meta_.related_table_ids_)));
if (OB_UNLIKELY(is_virtual_table(table_id))) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "Calculate virtual table partition id with rowkey");
@ -1470,6 +1471,10 @@ int ObTableLocation::calculate_partition_ids_by_rows2(ObSQLSessionInfo &session_
}
}
}
} else if (OB_FAIL(exec_ctx.get_das_ctx().get_das_tablet_mapper(table_id,
tablet_mapper,
&loc_meta_.related_table_ids_))) {
LOG_WARN("get das tablet mapper failed", KR(ret), K(table_id));
} else {//TODO: copied from calc_partition_ids_by_rowkey()
ObSEArray<ObObjectID, 1> tmp_part_ids;
ObSEArray<ObTabletID, 1> tmp_tablet_ids;
@ -1528,6 +1533,7 @@ int ObTableLocation::calculate_partition_ids_by_rowkey(ObSQLSessionInfo &session
SMART_VAR(ObExecContext, exec_ctx, allocator) {
ObSqlSchemaGuard sql_schema_guard;
ObSqlCtx sql_ctx;
sql_ctx.schema_guard_ = &schema_guard;
sql_schema_guard.set_schema_guard(&schema_guard);
exec_ctx.set_my_session(&session_info);
exec_ctx.set_sql_ctx(&sql_ctx);