only open direct_load_insert_tablet in inc mode
This commit is contained in:
@ -158,8 +158,14 @@ int ObTableLoadStore::confirm_begin()
|
||||
LOG_INFO("store confirm begin");
|
||||
store_ctx_->heart_beat(); // init heart beat
|
||||
store_ctx_->set_enable_heart_beat_check(true);
|
||||
if (OB_FAIL(open_insert_table_ctx())) {
|
||||
LOG_WARN("fail to open insert_table_ctx", KR(ret));
|
||||
if (ObDirectLoadMethod::is_incremental(param_.method_)) {
|
||||
if (OB_FAIL(open_insert_table_ctx())) {
|
||||
LOG_WARN("fail to open insert_table_ctx", KR(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(ctx_->store_ctx_->set_status_loading())) {
|
||||
LOG_WARN("fail to set store status loading", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -88,6 +88,8 @@ ObDirectLoadInsertTabletContext::ObDirectLoadInsertTabletContext()
|
||||
param_(nullptr),
|
||||
context_id_(0),
|
||||
row_count_(0),
|
||||
open_err_(OB_SUCCESS),
|
||||
is_open_(false),
|
||||
is_create_(false),
|
||||
is_cancel_(false),
|
||||
is_inited_(false)
|
||||
@ -166,43 +168,47 @@ int ObDirectLoadInsertTabletContext::open()
|
||||
} else if (OB_UNLIKELY(is_cancel_)) {
|
||||
ret = OB_CANCELED;
|
||||
LOG_WARN("task is cancel", KR(ret));
|
||||
} else if (is_create_) {
|
||||
} else if (is_open_) {
|
||||
// do nothing
|
||||
} else {
|
||||
ObTenantDirectLoadMgr *sstable_insert_mgr = MTL(ObTenantDirectLoadMgr *);
|
||||
ObTabletDirectLoadInsertParam direct_load_param;
|
||||
direct_load_param.is_replay_ = false;
|
||||
direct_load_param.common_param_.direct_load_type_ =
|
||||
param_->is_incremental_ ? ObDirectLoadType::DIRECT_LOAD_INCREMENTAL
|
||||
: ObDirectLoadType::DIRECT_LOAD_LOAD_DATA;
|
||||
direct_load_param.common_param_.data_format_version_ = param_->data_version_;
|
||||
direct_load_param.common_param_.read_snapshot_ = param_->snapshot_version_;
|
||||
direct_load_param.common_param_.ls_id_ = ls_id_;
|
||||
direct_load_param.common_param_.tablet_id_ = tablet_id_;
|
||||
direct_load_param.runtime_only_param_.exec_ctx_ = nullptr;
|
||||
direct_load_param.runtime_only_param_.task_id_ = param_->ddl_task_id_;
|
||||
direct_load_param.runtime_only_param_.table_id_ = param_->table_id_;
|
||||
direct_load_param.runtime_only_param_.schema_version_ = param_->schema_version_;
|
||||
direct_load_param.runtime_only_param_.task_cnt_ = 1; // default value.
|
||||
direct_load_param.runtime_only_param_.parallel_ = param_->parallel_;
|
||||
direct_load_param.runtime_only_param_.tx_desc_ = param_->trans_param_.tx_desc_;
|
||||
direct_load_param.runtime_only_param_.trans_id_ = param_->trans_param_.tx_id_;
|
||||
direct_load_param.runtime_only_param_.seq_no_ = param_->trans_param_.tx_seq_.cast_to_int();
|
||||
if (OB_FAIL(sstable_insert_mgr->create_tablet_direct_load(
|
||||
context_id_, context_id_ /*execution_id*/, direct_load_param))) {
|
||||
LOG_WARN("create tablet manager failed", KR(ret), K(direct_load_param));
|
||||
} else {
|
||||
is_create_ = true;
|
||||
lib::ObMutexGuard guard(mutex_);
|
||||
if (OB_FAIL(open_err_)) {
|
||||
LOG_WARN("open has error", KR(ret), K(origin_tablet_id_), K(tablet_id_));
|
||||
} else if (!is_open_) {
|
||||
ObTenantDirectLoadMgr *sstable_insert_mgr = MTL(ObTenantDirectLoadMgr *);
|
||||
ObTabletDirectLoadInsertParam direct_load_param;
|
||||
direct_load_param.is_replay_ = false;
|
||||
direct_load_param.common_param_.direct_load_type_ =
|
||||
param_->is_incremental_ ? ObDirectLoadType::DIRECT_LOAD_INCREMENTAL
|
||||
: ObDirectLoadType::DIRECT_LOAD_LOAD_DATA;
|
||||
direct_load_param.common_param_.data_format_version_ = param_->data_version_;
|
||||
direct_load_param.common_param_.read_snapshot_ = param_->snapshot_version_;
|
||||
direct_load_param.common_param_.ls_id_ = ls_id_;
|
||||
direct_load_param.common_param_.tablet_id_ = tablet_id_;
|
||||
direct_load_param.runtime_only_param_.exec_ctx_ = nullptr;
|
||||
direct_load_param.runtime_only_param_.task_id_ = param_->ddl_task_id_;
|
||||
direct_load_param.runtime_only_param_.table_id_ = param_->table_id_;
|
||||
direct_load_param.runtime_only_param_.schema_version_ = param_->schema_version_;
|
||||
direct_load_param.runtime_only_param_.task_cnt_ = 1; // default value.
|
||||
direct_load_param.runtime_only_param_.parallel_ = param_->parallel_;
|
||||
direct_load_param.runtime_only_param_.tx_desc_ = param_->trans_param_.tx_desc_;
|
||||
direct_load_param.runtime_only_param_.trans_id_ = param_->trans_param_.tx_id_;
|
||||
direct_load_param.runtime_only_param_.seq_no_ = param_->trans_param_.tx_seq_.cast_to_int();
|
||||
if (OB_FAIL(sstable_insert_mgr->create_tablet_direct_load(
|
||||
context_id_, context_id_ /*execution_id*/, direct_load_param))) {
|
||||
LOG_WARN("create tablet manager failed", KR(ret), K(direct_load_param));
|
||||
} else if (FALSE_IT(is_create_ = true)) {
|
||||
} else if (OB_FAIL(sstable_insert_mgr->open_tablet_direct_load(
|
||||
!param_->is_incremental_, ls_id_, tablet_id_, context_id_, start_scn_, handle_))) {
|
||||
LOG_WARN("fail to open tablet direct load", KR(ret), K(tablet_id_));
|
||||
} else {
|
||||
is_open_ = true;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
open_err_ = ret; // avoid open repeatedly when failed
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
ObTenantDirectLoadMgr *sstable_insert_mgr = MTL(ObTenantDirectLoadMgr *);
|
||||
if (OB_FAIL(sstable_insert_mgr->open_tablet_direct_load(
|
||||
!param_->is_incremental_, ls_id_, tablet_id_, context_id_, start_scn_, handle_))) {
|
||||
LOG_WARN("fail to open tablet direct load", KR(ret), K(tablet_id_));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -212,9 +218,9 @@ int ObDirectLoadInsertTabletContext::close()
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObDirectLoadInsertTabletContext not init", KR(ret), KP(this));
|
||||
} else if (OB_UNLIKELY(is_cancel_)) {
|
||||
} else if (OB_UNLIKELY(!is_open_ || is_cancel_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected to close", KR(ret), K(is_cancel_));
|
||||
LOG_WARN("unexpected to close", KR(ret), K(is_open_), K(is_cancel_));
|
||||
} else {
|
||||
ObTenantDirectLoadMgr *sstable_insert_mgr = MTL(ObTenantDirectLoadMgr *);
|
||||
if (OB_FAIL(sstable_insert_mgr->close_tablet_direct_load(
|
||||
@ -222,6 +228,7 @@ int ObDirectLoadInsertTabletContext::close()
|
||||
true /*emergent_finish*/))) {
|
||||
LOG_WARN("fail to close tablet direct load", KR(ret), K(ls_id_), K(tablet_id_));
|
||||
} else {
|
||||
is_open_ = false;
|
||||
handle_.reset();
|
||||
}
|
||||
}
|
||||
@ -431,6 +438,9 @@ int ObDirectLoadInsertTabletContext::fill_lob_meta_sstable_slice(const int64_t &
|
||||
} else if (OB_UNLIKELY(is_cancel_)) {
|
||||
ret = OB_CANCELED;
|
||||
LOG_WARN("task is cancel", KR(ret));
|
||||
} else if (OB_UNLIKELY(!is_open_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected not open", KR(ret));
|
||||
} else {
|
||||
ObDirectLoadInsertTabletContext *tablet_ctx = nullptr;
|
||||
ObTenantDirectLoadMgr *sstable_insert_mgr = MTL(ObTenantDirectLoadMgr *);
|
||||
@ -466,6 +476,9 @@ int ObDirectLoadInsertTabletContext::fill_lob_sstable_slice(ObIAllocator &alloca
|
||||
} else if (OB_UNLIKELY(is_cancel_)) {
|
||||
ret = OB_CANCELED;
|
||||
LOG_WARN("task is cancel", KR(ret));
|
||||
} else if (OB_UNLIKELY(!is_open_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected not open", KR(ret));
|
||||
} else {
|
||||
ObDirectLoadSliceInfo slice_info;
|
||||
slice_info.is_full_direct_load_ = !param_->is_incremental_;
|
||||
@ -504,7 +517,9 @@ int ObDirectLoadInsertTabletContext::open_sstable_slice(const ObMacroDataSeq &st
|
||||
slice_info.data_tablet_id_ = tablet_id_;
|
||||
slice_info.slice_id_ = slice_id;
|
||||
slice_info.context_id_ = context_id_;
|
||||
if (OB_FAIL(sstable_insert_mgr->open_sstable_slice(start_seq, slice_info))) {
|
||||
if (OB_FAIL(open())) {
|
||||
LOG_WARN("fail to open tablet direct load", KR(ret));
|
||||
} else if (OB_FAIL(sstable_insert_mgr->open_sstable_slice(start_seq, slice_info))) {
|
||||
LOG_WARN("fail to construct sstable slice writer", KR(ret), K(slice_info.data_tablet_id_));
|
||||
} else {
|
||||
slice_id = slice_info.slice_id_;
|
||||
@ -532,7 +547,9 @@ int ObDirectLoadInsertTabletContext::open_lob_sstable_slice(const ObMacroDataSeq
|
||||
slice_info.data_tablet_id_ = tablet_id_;
|
||||
slice_info.slice_id_ = slice_id;
|
||||
slice_info.context_id_ = context_id_;
|
||||
if (OB_FAIL(sstable_insert_mgr->open_sstable_slice(start_seq, slice_info))) {
|
||||
if (OB_FAIL(open())) {
|
||||
LOG_WARN("fail to open tablet direct load", KR(ret));
|
||||
} else if (OB_FAIL(sstable_insert_mgr->open_sstable_slice(start_seq, slice_info))) {
|
||||
LOG_WARN("fail to construct sstable slice writer", KR(ret), K(slice_info.data_tablet_id_));
|
||||
} else {
|
||||
slice_id = slice_info.slice_id_;
|
||||
|
@ -176,6 +176,9 @@ public:
|
||||
K_(start_scn),
|
||||
K_(handle),
|
||||
K_(row_count),
|
||||
K_(open_err),
|
||||
K_(is_open),
|
||||
K_(is_create),
|
||||
K_(is_cancel));
|
||||
private:
|
||||
int get_pk_interval(uint64_t count, share::ObTabletCacheInterval &pk_interval);
|
||||
@ -197,6 +200,8 @@ private:
|
||||
share::SCN start_scn_;
|
||||
ObTabletDirectLoadMgrHandle handle_;
|
||||
int64_t row_count_;
|
||||
int open_err_;
|
||||
volatile bool is_open_;
|
||||
bool is_create_;
|
||||
bool is_cancel_;
|
||||
bool is_inited_;
|
||||
|
Reference in New Issue
Block a user