Fix direct load concurrent access extra buf in px_mode
This commit is contained in:
parent
ebf454f130
commit
257eda4284
@ -841,6 +841,9 @@ int ObTableLoadStore::px_start_trans(const ObTableLoadTransId &trans_id)
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
|
||||
} else if (OB_UNLIKELY(trans_id.segment_id_.id_ > param_.session_count_)) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("not support trans id", KR(ret), K(trans_id), K(param_.session_count_));
|
||||
} else {
|
||||
ObTableLoadStoreTrans *trans = nullptr;
|
||||
if (OB_FAIL(store_ctx_->start_trans(trans_id, trans))) {
|
||||
@ -866,8 +869,11 @@ int ObTableLoadStore::px_finish_trans(const ObTableLoadTransId &trans_id)
|
||||
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
|
||||
} else {
|
||||
ObTableLoadStoreTrans *trans = nullptr;
|
||||
if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) {
|
||||
LOG_WARN("fail to get trans", KR(ret));
|
||||
if (OB_FAIL(store_ctx_->get_segment_trans(trans_id.segment_id_, trans))) {
|
||||
LOG_WARN("fail to get segment trans", KR(ret));
|
||||
} else if (OB_UNLIKELY(trans_id != trans->get_trans_id())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected trans id", KR(ret), K(trans_id), KPC(trans));
|
||||
} else if (OB_FAIL(px_flush(trans))) {
|
||||
LOG_WARN("fail to do px flush", KR(ret));
|
||||
} else if (OB_FAIL(store_ctx_->commit_trans(trans))) {
|
||||
@ -892,15 +898,15 @@ int ObTableLoadStore::px_abandon_trans(const ObTableLoadTransId &trans_id)
|
||||
} else {
|
||||
LOG_INFO("store px abandon trans", K(trans_id));
|
||||
ObTableLoadStoreTrans *trans = nullptr;
|
||||
if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) {
|
||||
if (OB_FAIL(store_ctx_->get_segment_trans(trans_id.segment_id_, trans))) {
|
||||
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
|
||||
LOG_WARN("fail to get trans", KR(ret));
|
||||
LOG_WARN("fail to get segment trans", KR(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else if (OB_UNLIKELY(trans_id != trans->get_trans_ctx()->trans_id_)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid trans id", KR(ret), K(trans_id), KPC(trans));
|
||||
} else if (OB_UNLIKELY(trans_id != trans->get_trans_id())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected trans id", KR(ret), K(trans_id), KPC(trans));
|
||||
} else if (OB_FAIL(trans->set_trans_status_abort())) {
|
||||
LOG_WARN("fail to set trans status abort", KR(ret));
|
||||
} else if (OB_FAIL(store_ctx_->abort_trans(trans))) {
|
||||
@ -926,14 +932,17 @@ int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id,
|
||||
} else {
|
||||
ObTableLoadStoreTrans *trans = nullptr;
|
||||
ObTableLoadTransStoreWriter *store_writer = nullptr;
|
||||
if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) {
|
||||
LOG_WARN("fail to get trans", KR(ret));
|
||||
if (OB_FAIL(store_ctx_->get_segment_trans(trans_id.segment_id_, trans))) {
|
||||
LOG_WARN("fail to get segment trans", KR(ret));
|
||||
} else if (OB_UNLIKELY(trans_id != trans->get_trans_id())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected trans id", KR(ret), K(trans_id), KPC(trans));
|
||||
} else if (OB_FAIL(trans->get_store_writer(store_writer))) {
|
||||
LOG_WARN("fail to get store writer", KR(ret));
|
||||
} else {
|
||||
if (OB_SUCC(trans->check_trans_status(ObTableLoadTransStatusType::RUNNING)) ||
|
||||
OB_SUCC(trans->check_trans_status(ObTableLoadTransStatusType::FROZEN))) {
|
||||
if (OB_FAIL(store_writer->write(PX_DEFAULT_SESSION_ID, tablet_id, row_array))) {
|
||||
if (OB_FAIL(store_writer->write(trans_id.segment_id_.id_, tablet_id, row_array))) {
|
||||
LOG_WARN("fail to write store", KR(ret));
|
||||
} else {
|
||||
LOG_DEBUG("succeed to write store", K(trans_id), K(tablet_id));
|
||||
@ -962,7 +971,7 @@ int ObTableLoadStore::px_clean_up_trans(ObTableLoadStoreTrans *trans)
|
||||
ObTableLoadTransStoreWriter *store_writer = nullptr;
|
||||
if (OB_FAIL(trans->get_store_writer(store_writer))) {
|
||||
LOG_WARN("fail to get store writer", KR(ret));
|
||||
} else if (OB_FAIL(store_writer->clean_up(PX_DEFAULT_SESSION_ID))) {
|
||||
} else if (OB_FAIL(store_writer->clean_up(trans->get_trans_id().segment_id_.id_))) {
|
||||
LOG_WARN("fail to clean up store writer", KR(ret));
|
||||
}
|
||||
if (OB_NOT_NULL(store_writer)) {
|
||||
@ -987,7 +996,7 @@ int ObTableLoadStore::px_flush(ObTableLoadStoreTrans *trans)
|
||||
// after get store writer, avoid early commit
|
||||
else if (OB_FAIL(trans->set_trans_status_frozen())) {
|
||||
LOG_WARN("fail to freeze trans", KR(ret));
|
||||
} else if (OB_FAIL(store_writer->flush(PX_DEFAULT_SESSION_ID))) {
|
||||
} else if (OB_FAIL(store_writer->flush(trans->get_trans_id().segment_id_.id_))) {
|
||||
LOG_WARN("fail to flush store", KR(ret));
|
||||
} else {
|
||||
LOG_DEBUG("succeed to flush store");
|
||||
|
@ -71,7 +71,6 @@ private:
|
||||
|
||||
// px trans interface
|
||||
public:
|
||||
static const int32_t PX_DEFAULT_SESSION_ID = 1;
|
||||
int px_start_trans(const table::ObTableLoadTransId &trans_id);
|
||||
int px_finish_trans(const table::ObTableLoadTransId &trans_id);
|
||||
int px_abandon_trans(const table::ObTableLoadTransId &trans_id);
|
||||
|
@ -813,6 +813,34 @@ int ObTableLoadStoreCtx::get_trans_ctx(const ObTableLoadTransId &trans_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadStoreCtx::get_segment_trans(const ObTableLoadSegmentID &segment_id,
|
||||
ObTableLoadStoreTrans *&trans)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreCtx not init", KR(ret));
|
||||
} else {
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
SegmentCtx *segment_ctx = nullptr;
|
||||
if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) {
|
||||
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
|
||||
LOG_WARN("fail to get segment ctx", KR(ret));
|
||||
}
|
||||
} else if (OB_ISNULL(segment_ctx->current_trans_)) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
LOG_WARN("active segment trans not exist", KR(ret), KPC(segment_ctx));
|
||||
} else {
|
||||
trans = segment_ctx->current_trans_;
|
||||
trans->inc_ref_count();
|
||||
}
|
||||
if (OB_NOT_NULL(segment_ctx)) {
|
||||
segment_ctx_map_.revert(segment_ctx);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadStoreCtx::get_active_trans_ids(ObIArray<ObTableLoadTransId> &trans_id_array) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -109,6 +109,8 @@ public:
|
||||
int get_trans(const table::ObTableLoadTransId &trans_id, ObTableLoadStoreTrans *&trans);
|
||||
int get_trans_ctx(const table::ObTableLoadTransId &trans_id,
|
||||
ObTableLoadTransCtx *&trans_ctx) const;
|
||||
int get_segment_trans(const table::ObTableLoadSegmentID &segment_id,
|
||||
ObTableLoadStoreTrans *&trans);
|
||||
int get_active_trans_ids(common::ObIArray<table::ObTableLoadTransId> &trans_id_array) const;
|
||||
int get_committed_trans_ids(table::ObTableLoadArray<table::ObTableLoadTransId> &trans_id_array,
|
||||
common::ObIAllocator &allocator) const;
|
||||
|
Loading…
x
Reference in New Issue
Block a user