From 257eda4284906cc8ebaf49179efce62e13e95874 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 14 Apr 2023 08:50:06 +0000 Subject: [PATCH] Fix direct load concurrent access extra buf in px_mode --- .../table_load/ob_table_load_store.cpp | 33 ++++++++++++------- src/observer/table_load/ob_table_load_store.h | 1 - .../table_load/ob_table_load_store_ctx.cpp | 28 ++++++++++++++++ .../table_load/ob_table_load_store_ctx.h | 2 ++ 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/src/observer/table_load/ob_table_load_store.cpp b/src/observer/table_load/ob_table_load_store.cpp index b18eb1b1d..35db7851b 100644 --- a/src/observer/table_load/ob_table_load_store.cpp +++ b/src/observer/table_load/ob_table_load_store.cpp @@ -841,6 +841,9 @@ int ObTableLoadStore::px_start_trans(const ObTableLoadTransId &trans_id) if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(trans_id.segment_id_.id_ > param_.session_count_)) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("not support trans id", KR(ret), K(trans_id), K(param_.session_count_)); } else { ObTableLoadStoreTrans *trans = nullptr; if (OB_FAIL(store_ctx_->start_trans(trans_id, trans))) { @@ -866,8 +869,11 @@ int ObTableLoadStore::px_finish_trans(const ObTableLoadTransId &trans_id) LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this)); } else { ObTableLoadStoreTrans *trans = nullptr; - if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) { - LOG_WARN("fail to get trans", KR(ret)); + if (OB_FAIL(store_ctx_->get_segment_trans(trans_id.segment_id_, trans))) { + LOG_WARN("fail to get segment trans", KR(ret)); + } else if (OB_UNLIKELY(trans_id != trans->get_trans_id())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected trans id", KR(ret), K(trans_id), KPC(trans)); } else if (OB_FAIL(px_flush(trans))) { LOG_WARN("fail to do px flush", KR(ret)); } else if (OB_FAIL(store_ctx_->commit_trans(trans))) { @@ -892,15 +898,15 @@ int ObTableLoadStore::px_abandon_trans(const ObTableLoadTransId &trans_id) } else { LOG_INFO("store px abandon trans", K(trans_id)); ObTableLoadStoreTrans *trans = nullptr; - if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) { + if (OB_FAIL(store_ctx_->get_segment_trans(trans_id.segment_id_, trans))) { if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { - LOG_WARN("fail to get trans", KR(ret)); + LOG_WARN("fail to get segment trans", KR(ret)); } else { ret = OB_SUCCESS; } - } else if (OB_UNLIKELY(trans_id != trans->get_trans_ctx()->trans_id_)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid trans id", KR(ret), K(trans_id), KPC(trans)); + } else if (OB_UNLIKELY(trans_id != trans->get_trans_id())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected trans id", KR(ret), K(trans_id), KPC(trans)); } else if (OB_FAIL(trans->set_trans_status_abort())) { LOG_WARN("fail to set trans status abort", KR(ret)); } else if (OB_FAIL(store_ctx_->abort_trans(trans))) { @@ -926,14 +932,17 @@ int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id, } else { ObTableLoadStoreTrans *trans = nullptr; ObTableLoadTransStoreWriter *store_writer = nullptr; - if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) { - LOG_WARN("fail to get trans", KR(ret)); + if (OB_FAIL(store_ctx_->get_segment_trans(trans_id.segment_id_, trans))) { + LOG_WARN("fail to get segment trans", KR(ret)); + } else if (OB_UNLIKELY(trans_id != trans->get_trans_id())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected trans id", KR(ret), K(trans_id), KPC(trans)); } else if (OB_FAIL(trans->get_store_writer(store_writer))) { LOG_WARN("fail to get store writer", KR(ret)); } else { if (OB_SUCC(trans->check_trans_status(ObTableLoadTransStatusType::RUNNING)) || OB_SUCC(trans->check_trans_status(ObTableLoadTransStatusType::FROZEN))) { - if (OB_FAIL(store_writer->write(PX_DEFAULT_SESSION_ID, tablet_id, row_array))) { + if (OB_FAIL(store_writer->write(trans_id.segment_id_.id_, tablet_id, row_array))) { LOG_WARN("fail to write store", KR(ret)); } else { LOG_DEBUG("succeed to write store", K(trans_id), K(tablet_id)); @@ -962,7 +971,7 @@ int ObTableLoadStore::px_clean_up_trans(ObTableLoadStoreTrans *trans) ObTableLoadTransStoreWriter *store_writer = nullptr; if (OB_FAIL(trans->get_store_writer(store_writer))) { LOG_WARN("fail to get store writer", KR(ret)); - } else if (OB_FAIL(store_writer->clean_up(PX_DEFAULT_SESSION_ID))) { + } else if (OB_FAIL(store_writer->clean_up(trans->get_trans_id().segment_id_.id_))) { LOG_WARN("fail to clean up store writer", KR(ret)); } if (OB_NOT_NULL(store_writer)) { @@ -987,7 +996,7 @@ int ObTableLoadStore::px_flush(ObTableLoadStoreTrans *trans) // after get store writer, avoid early commit else if (OB_FAIL(trans->set_trans_status_frozen())) { LOG_WARN("fail to freeze trans", KR(ret)); - } else if (OB_FAIL(store_writer->flush(PX_DEFAULT_SESSION_ID))) { + } else if (OB_FAIL(store_writer->flush(trans->get_trans_id().segment_id_.id_))) { LOG_WARN("fail to flush store", KR(ret)); } else { LOG_DEBUG("succeed to flush store"); diff --git a/src/observer/table_load/ob_table_load_store.h b/src/observer/table_load/ob_table_load_store.h index 98961415d..cb315baef 100644 --- a/src/observer/table_load/ob_table_load_store.h +++ b/src/observer/table_load/ob_table_load_store.h @@ -71,7 +71,6 @@ private: // px trans interface public: - static const int32_t PX_DEFAULT_SESSION_ID = 1; int px_start_trans(const table::ObTableLoadTransId &trans_id); int px_finish_trans(const table::ObTableLoadTransId &trans_id); int px_abandon_trans(const table::ObTableLoadTransId &trans_id); diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index 611786616..84266c28e 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -813,6 +813,34 @@ int ObTableLoadStoreCtx::get_trans_ctx(const ObTableLoadTransId &trans_id, return ret; } +int ObTableLoadStoreCtx::get_segment_trans(const ObTableLoadSegmentID &segment_id, + ObTableLoadStoreTrans *&trans) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); + } else { + obsys::ObRLockGuard guard(rwlock_); + SegmentCtx *segment_ctx = nullptr; + if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) { + if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { + LOG_WARN("fail to get segment ctx", KR(ret)); + } + } else if (OB_ISNULL(segment_ctx->current_trans_)) { + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("active segment trans not exist", KR(ret), KPC(segment_ctx)); + } else { + trans = segment_ctx->current_trans_; + trans->inc_ref_count(); + } + if (OB_NOT_NULL(segment_ctx)) { + segment_ctx_map_.revert(segment_ctx); + } + } + return ret; +} + int ObTableLoadStoreCtx::get_active_trans_ids(ObIArray &trans_id_array) const { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_store_ctx.h b/src/observer/table_load/ob_table_load_store_ctx.h index f382436d2..01f497646 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.h +++ b/src/observer/table_load/ob_table_load_store_ctx.h @@ -109,6 +109,8 @@ public: int get_trans(const table::ObTableLoadTransId &trans_id, ObTableLoadStoreTrans *&trans); int get_trans_ctx(const table::ObTableLoadTransId &trans_id, ObTableLoadTransCtx *&trans_ctx) const; + int get_segment_trans(const table::ObTableLoadSegmentID &segment_id, + ObTableLoadStoreTrans *&trans); int get_active_trans_ids(common::ObIArray &trans_id_array) const; int get_committed_trans_ids(table::ObTableLoadArray &trans_id_array, common::ObIAllocator &allocator) const;