Fix table load task concurrency problem

This commit is contained in:
obdev
2023-02-17 08:11:54 +00:00
committed by ob-robot
parent b0b540090f
commit 38ec0851ad
36 changed files with 1258 additions and 751 deletions

View File

@ -23,12 +23,11 @@ using namespace sql;
using namespace table;
using namespace obrpc;
ObTableLoadTableCtx::ObTableLoadTableCtx(const ObTableLoadParam &param)
: param_(param),
coordinator_ctx_(nullptr),
ObTableLoadTableCtx::ObTableLoadTableCtx()
: coordinator_ctx_(nullptr),
store_ctx_(nullptr),
job_stat_(nullptr),
allocator_("TLD_TableCtx", OB_MALLOC_NORMAL_BLOCK_SIZE, param.tenant_id_),
allocator_("TLD_TableCtx"),
ref_count_(0),
is_dirty_(false),
is_inited_(false)
@ -40,13 +39,19 @@ ObTableLoadTableCtx::~ObTableLoadTableCtx()
destroy();
}
int ObTableLoadTableCtx::init()
int ObTableLoadTableCtx::init(const ObTableLoadParam &param, const ObTableLoadDDLParam &ddl_param)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObTableLoadTableCtx init twice", KR(ret));
} else if (OB_UNLIKELY(!param.is_valid() || !ddl_param.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(param), K(ddl_param));
} else {
param_ = param;
ddl_param_ = ddl_param;
allocator_.set_tenant_id(MTL_ID());
if (OB_FAIL(schema_.init(param_.tenant_id_, param_.table_id_))) {
LOG_WARN("fail to init table load schema", KR(ret), K(param_.tenant_id_),
K(param_.table_id_));
@ -142,24 +147,20 @@ void ObTableLoadTableCtx::unregister_job_stat()
}
}
int ObTableLoadTableCtx::init_coordinator_ctx(const ObIArray<int64_t> &idx_array,
ObSQLSessionInfo *session_info)
int ObTableLoadTableCtx::init_coordinator_ctx(const ObIArray<int64_t> &idx_array, uint64_t user_id)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadTableCtx not init", KR(ret));
} else if (OB_NOT_NULL(coordinator_ctx_)) {
ret = OB_ENTRY_EXIST;
ret = OB_ERR_UNEXPECTED;
LOG_WARN("coordinator ctx already exist", KR(ret));
} else {
if (session_info == nullptr) {
session_info = &session_info_;
}
if (OB_ISNULL(coordinator_ctx_ = OB_NEWx(ObTableLoadCoordinatorCtx, (&allocator_), this))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadCoordinatorCtx", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->init(session_info, idx_array))) {
} else if (OB_FAIL(coordinator_ctx_->init(idx_array, user_id))) {
LOG_WARN("fail to init coordinator ctx", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->set_status_inited())) {
LOG_WARN("fail to set coordinator status inited", KR(ret));
@ -168,20 +169,7 @@ int ObTableLoadTableCtx::init_coordinator_ctx(const ObIArray<int64_t> &idx_array
return ret;
}
int ObTableLoadTableCtx::init_session_info(uint64_t user_id)
{
int ret = OB_SUCCESS;
if (OB_FAIL(session_info_.init(0, 0, nullptr, nullptr, ObTimeUtility::current_time(), MTL_ID()))) {
LOG_WARN("fail to init session info", KR(ret));
}
OZ (session_info_.load_default_sys_variable(false, false)); //加载默认的session参数
OZ (session_info_.load_default_configs_in_pc());
OX (session_info_.set_priv_user_id(user_id));
return ret;
}
int ObTableLoadTableCtx::init_store_ctx(
int64_t ddl_task_id,
const ObTableLoadArray<ObTableLoadLSIdAndPartitionId> &partition_id_array,
const ObTableLoadArray<ObTableLoadLSIdAndPartitionId> &target_partition_id_array)
{
@ -196,7 +184,7 @@ int ObTableLoadTableCtx::init_store_ctx(
if (OB_ISNULL(store_ctx_ = OB_NEWx(ObTableLoadStoreCtx, (&allocator_), this))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadStoreCtx", KR(ret));
} else if (OB_FAIL(store_ctx_->init(ddl_task_id, partition_id_array, target_partition_id_array))) {
} else if (OB_FAIL(store_ctx_->init(partition_id_array, target_partition_id_array))) {
LOG_WARN("fail to init store ctx", KR(ret));
} else if (OB_FAIL(store_ctx_->set_status_inited())) {
LOG_WARN("fail to set store status inited", KR(ret));