Fix direct load commit early due to concurrency issues
This commit is contained in:
@ -369,8 +369,6 @@ int ObTableLoadStore::pre_finish_trans(const ObTableLoadTransId &trans_id)
|
||||
ObTableLoadStoreTrans *trans = nullptr;
|
||||
if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) {
|
||||
LOG_WARN("fail to get trans", KR(ret));
|
||||
} else if (OB_FAIL(trans->set_trans_status_frozen())) {
|
||||
LOG_WARN("fail to freeze trans", KR(ret));
|
||||
} else if (OB_FAIL(flush(trans))) {
|
||||
LOG_WARN("fail to flush", KR(ret));
|
||||
}
|
||||
@ -497,7 +495,7 @@ int ObTableLoadStore::clean_up_trans(ObTableLoadStoreTrans *trans)
|
||||
LOG_DEBUG("store clean up trans");
|
||||
ObTableLoadTransStoreWriter *store_writer = nullptr;
|
||||
// 取出当前store_writer
|
||||
if (OB_FAIL(trans->get_store_writer_for_clean_up(store_writer))) {
|
||||
if (OB_FAIL(trans->get_store_writer(store_writer))) {
|
||||
LOG_WARN("fail to get store writer", KR(ret));
|
||||
} else {
|
||||
for (int32_t session_id = 1; OB_SUCC(ret) && session_id <= param_.session_count_;
|
||||
@ -660,7 +658,7 @@ int ObTableLoadStore::write(const ObTableLoadTransId &trans_id, int32_t session_
|
||||
LOG_WARN("fail to get trans", KR(ret));
|
||||
}
|
||||
// 取出store_writer
|
||||
else if (OB_FAIL(trans->get_store_writer_for_write(store_writer))) {
|
||||
else if (OB_FAIL(trans->get_store_writer(store_writer))) {
|
||||
LOG_WARN("fail to get store writer", KR(ret));
|
||||
//} else if (OB_FAIL(store_writer->advance_sequence_no(session_id, partition_id, sequence_no, guard))) {
|
||||
// if (OB_UNLIKELY(OB_ENTRY_EXIST != ret)) {
|
||||
@ -796,8 +794,12 @@ int ObTableLoadStore::flush(ObTableLoadStoreTrans *trans)
|
||||
LOG_DEBUG("store flush");
|
||||
ObTableLoadTransStoreWriter *store_writer = nullptr;
|
||||
// 取出当前store_writer
|
||||
if (OB_FAIL(trans->get_store_writer_for_flush(store_writer))) {
|
||||
if (OB_FAIL(trans->get_store_writer(store_writer))) {
|
||||
LOG_WARN("fail to get store writer", KR(ret));
|
||||
}
|
||||
// after get store writer, avoid early commit
|
||||
else if (OB_FAIL(trans->set_trans_status_frozen())) {
|
||||
LOG_WARN("fail to freeze trans", KR(ret));
|
||||
} else {
|
||||
for (int32_t session_id = 1; OB_SUCC(ret) && session_id <= param_.session_count_; ++session_id) {
|
||||
ObTableLoadTask *task = nullptr;
|
||||
@ -866,8 +868,6 @@ int ObTableLoadStore::px_finish_trans(const ObTableLoadTransId &trans_id)
|
||||
ObTableLoadStoreTrans *trans = nullptr;
|
||||
if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) {
|
||||
LOG_WARN("fail to get trans", KR(ret));
|
||||
} else if (OB_FAIL(trans->set_trans_status_frozen())) {
|
||||
LOG_WARN("fail to freeze trans", KR(ret));
|
||||
} else if (OB_FAIL(px_flush(trans))) {
|
||||
LOG_WARN("fail to do px flush", KR(ret));
|
||||
} else if (OB_FAIL(store_ctx_->commit_trans(trans))) {
|
||||
@ -928,7 +928,7 @@ int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id,
|
||||
ObTableLoadTransStoreWriter *store_writer = nullptr;
|
||||
if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) {
|
||||
LOG_WARN("fail to get trans", KR(ret));
|
||||
} else if (OB_FAIL(trans->get_store_writer_for_write(store_writer))) {
|
||||
} else if (OB_FAIL(trans->get_store_writer(store_writer))) {
|
||||
LOG_WARN("fail to get store writer", KR(ret));
|
||||
} else {
|
||||
if (OB_SUCC(trans->check_trans_status(ObTableLoadTransStatusType::RUNNING)) ||
|
||||
@ -960,8 +960,8 @@ int ObTableLoadStore::px_clean_up_trans(ObTableLoadStoreTrans *trans)
|
||||
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
|
||||
} else {
|
||||
ObTableLoadTransStoreWriter *store_writer = nullptr;
|
||||
if (OB_FAIL(trans->get_store_writer_for_clean_up(store_writer))) {
|
||||
LOG_WARN("fail to get store writer for clean up", KR(ret));
|
||||
if (OB_FAIL(trans->get_store_writer(store_writer))) {
|
||||
LOG_WARN("fail to get store writer", KR(ret));
|
||||
} else if (OB_FAIL(store_writer->clean_up(PX_DEFAULT_SESSION_ID))) {
|
||||
LOG_WARN("fail to clean up store writer", KR(ret));
|
||||
}
|
||||
@ -981,8 +981,12 @@ int ObTableLoadStore::px_flush(ObTableLoadStoreTrans *trans)
|
||||
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
|
||||
} else {
|
||||
ObTableLoadTransStoreWriter *store_writer = nullptr;
|
||||
if (OB_FAIL(trans->get_store_writer_for_flush(store_writer))) {
|
||||
if (OB_FAIL(trans->get_store_writer(store_writer))) {
|
||||
LOG_WARN("fail to get store writer", KR(ret));
|
||||
}
|
||||
// after get store writer, avoid early commit
|
||||
else if (OB_FAIL(trans->set_trans_status_frozen())) {
|
||||
LOG_WARN("fail to freeze trans", KR(ret));
|
||||
} else if (OB_FAIL(store_writer->flush(PX_DEFAULT_SESSION_ID))) {
|
||||
LOG_WARN("fail to flush store", KR(ret));
|
||||
} else {
|
||||
|
@ -106,69 +106,13 @@ int ObTableLoadStoreTrans::set_trans_status_abort()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadStoreTrans::get_store_writer_for_write(
|
||||
ObTableLoadTransStoreWriter *&store_writer) const
|
||||
int ObTableLoadStoreTrans::get_store_writer(ObTableLoadTransStoreWriter *&store_writer) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
store_writer = nullptr;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreTrans not init", KR(ret), KP(this));
|
||||
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::RUNNING))) {
|
||||
LOG_WARN("fail to check trans status", KR(ret));
|
||||
} else {
|
||||
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
|
||||
if (OB_ISNULL(trans_store_writer_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null store writer", KR(ret));
|
||||
} else if (OB_UNLIKELY(trans_store_writer_->is_flush())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("trans store writer is flush", KR(ret));
|
||||
} else {
|
||||
store_writer = trans_store_writer_;
|
||||
store_writer->inc_ref_count();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadStoreTrans::get_store_writer_for_flush(
|
||||
ObTableLoadTransStoreWriter *&store_writer) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
store_writer = nullptr;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreTrans not init", KR(ret), KP(this));
|
||||
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::FROZEN))) {
|
||||
LOG_WARN("fail to check trans status", KR(ret));
|
||||
} else {
|
||||
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
|
||||
if (OB_ISNULL(trans_store_writer_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null store writer", KR(ret));
|
||||
} else if (OB_UNLIKELY(trans_store_writer_->is_flush())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("trans store writer is flush", KR(ret));
|
||||
} else {
|
||||
trans_store_writer_->set_is_flush();
|
||||
store_writer = trans_store_writer_;
|
||||
store_writer->inc_ref_count();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadStoreTrans::get_store_writer_for_clean_up(
|
||||
ObTableLoadTransStoreWriter *&store_writer) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
store_writer = nullptr;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreTrans not init", KR(ret), KP(this));
|
||||
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::ABORT))) {
|
||||
LOG_WARN("fail to check trans status", KR(ret));
|
||||
} else {
|
||||
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
|
||||
if (OB_ISNULL(trans_store_writer_)) {
|
||||
|
@ -57,9 +57,7 @@ public:
|
||||
private:
|
||||
int advance_trans_status(table::ObTableLoadTransStatusType trans_status);
|
||||
public:
|
||||
int get_store_writer_for_write(ObTableLoadTransStoreWriter *&store_writer) const;
|
||||
int get_store_writer_for_flush(ObTableLoadTransStoreWriter *&store_writer) const;
|
||||
int get_store_writer_for_clean_up(ObTableLoadTransStoreWriter *&store_writer) const;
|
||||
int get_store_writer(ObTableLoadTransStoreWriter *&store_writer) const;
|
||||
void put_store_writer(ObTableLoadTransStoreWriter *store_writer);
|
||||
// 取出store
|
||||
int output_store(ObTableLoadTransStore *&trans_store);
|
||||
|
@ -107,7 +107,6 @@ ObTableLoadTransStoreWriter::ObTableLoadTransStoreWriter(ObTableLoadTransStore *
|
||||
allocator_("TLD_TSWriter", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_),
|
||||
table_data_desc_(nullptr),
|
||||
ref_count_(0),
|
||||
is_flush_(false),
|
||||
is_inited_(false)
|
||||
{
|
||||
}
|
||||
|
@ -64,8 +64,6 @@ public:
|
||||
int flush(int32_t session_id);
|
||||
int clean_up(int32_t session_id);
|
||||
public:
|
||||
void set_is_flush() { is_flush_ = true; }
|
||||
bool is_flush() const { return is_flush_; }
|
||||
int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); }
|
||||
int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); }
|
||||
int64_t dec_ref_count() { return ATOMIC_AAF(&ref_count_, -1); }
|
||||
@ -112,7 +110,6 @@ private:
|
||||
};
|
||||
SessionContext *session_ctx_array_;
|
||||
int64_t ref_count_ CACHE_ALIGNED;
|
||||
bool is_flush_;
|
||||
bool is_inited_;
|
||||
ObSchemaGetterGuard schema_guard_;
|
||||
};
|
||||
|
Reference in New Issue
Block a user