[FIX] serialize tx ctx in tx ctx lock

This commit is contained in:
ZenoWang 2023-08-11 02:12:53 +00:00 committed by ob-robot
parent b56b811398
commit 115f639cd5
5 changed files with 53 additions and 49 deletions

View File

@ -58,8 +58,9 @@ using namespace share;
namespace storage
{
int ObTxCtxMemtableScanIterator::get_next_tx_ctx_table_info_(transaction::ObPartTransCtx *&tx_ctx,
ObTxCtxTableInfo &ctx_info)
int ObTxCtxMemtableScanIterator::serialize_next_tx_ctx_(ObTxLocalBuffer &buffer,
int64_t &serialize_size,
transaction::ObPartTransCtx *&tx_ctx)
{
int ret = OB_SUCCESS;
bool need_retry = true;
@ -69,7 +70,7 @@ int ObTxCtxMemtableScanIterator::get_next_tx_ctx_table_info_(transaction::ObPart
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "ls_tx_ctx_iter_.get_next_tx_ctx failed", K(ret));
}
} else if (OB_FAIL(tx_ctx->get_tx_ctx_table_info(ctx_info))) {
} else if (OB_FAIL(tx_ctx->serialize_tx_ctx_to_buffer(buffer, serialize_size))) {
if (OB_TRANS_CTX_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
@ -82,9 +83,9 @@ int ObTxCtxMemtableScanIterator::get_next_tx_ctx_table_info_(transaction::ObPart
}
if (OB_FAIL(ret)) {
STORAGE_LOG(INFO, "get next tx ctx table info failed", KR(ret), KPC(tx_ctx), K(ctx_info.tx_data_guard_));
STORAGE_LOG(INFO, "get next tx ctx table info failed", KR(ret), KPC(tx_ctx));
} else if (SLEEP_BEFORE_DUMP_TX_CTX) {
fprintf(stdout, "ready to dump tx ctx, undo status node ptr : %p\n", ctx_info.tx_data_guard_.tx_data()->undo_status_list_.head_);
fprintf(stdout, "ready to dump tx ctx, undo status node ptr : %p\n", tx_ctx->ctx_tx_data_.tx_data_guard_.tx_data()->undo_status_list_.head_);
fprintf(stdout, "sleep 20 seconds before dump\n");
HAS_GOT_TX_CTX = true;
SLEEP_BEFORE_DUMP_TX_CTX = false;

View File

@ -1561,9 +1561,11 @@ int ObPartTransCtx::recover_tx_ctx_table_info(ObTxCtxTableInfo &ctx_info)
}
// Checkpoint the tx ctx table
int ObPartTransCtx::get_tx_ctx_table_info(ObTxCtxTableInfo &info)
int ObPartTransCtx::serialize_tx_ctx_to_buffer(ObTxLocalBuffer &buffer, int64_t &serialize_size)
{
int ret = OB_SUCCESS;
ObTxCtxTableInfo ctx_info;
CtxLockGuard guard(lock_);
// 1. Tx ctx has already exited, so it means that it may have no chance to
// push its rec_log_ts to aggre_rec_log_ts, so we must not persist it
@ -1586,16 +1588,25 @@ int ObPartTransCtx::get_tx_ctx_table_info(ObTxCtxTableInfo &info)
TRANS_LOG(INFO, "tx ctx is in complete replay ctx", K(ret), KPC(this));
}
// 3. Fetch the current state of the tx ctx table
} else if (OB_FAIL(get_tx_ctx_table_info_(info))) {
} else if (OB_FAIL(get_tx_ctx_table_info_(ctx_info))) {
TRANS_LOG(WARN, "get tx ctx table info failed", K(ret), K(*this));
} else if (OB_UNLIKELY(!info.is_valid())) {
} else if (OB_UNLIKELY(!ctx_info.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "tx ctx info invalid", K(ret), K(info));
TRANS_LOG(WARN, "tx ctx info invalid", K(ret), K(ctx_info));
// 4. Refresh the rec_log_ts for the next checkpoint
} else if (OB_FAIL(refresh_rec_log_ts_())) {
TRANS_LOG(WARN, "refresh rec log ts failed", K(ret), K(*this));
} else {
is_ctx_table_merged_ = true;
// 5. Do serialize
int64_t pos = 0;
serialize_size = ctx_info.get_serialize_size();
if (OB_FAIL(buffer.reserve(serialize_size))) {
TRANS_LOG(WARN, "Failed to reserve local buffer", KR(ret));
} else if (OB_FAIL(ctx_info.serialize(buffer.get_ptr(), serialize_size, pos))) {
TRANS_LOG(WARN, "failed to serialize ctx_info", KR(ret), K(ctx_info), K(pos));
} else {
is_ctx_table_merged_ = true;
}
}
return ret;

View File

@ -380,6 +380,7 @@ public:
// get_tx_ctx_table_info returns OB_TRANS_CTX_NOT_EXIST if the tx ctx table need not to be
// dumped.
int get_tx_ctx_table_info(ObTxCtxTableInfo &info);
int serialize_tx_ctx_to_buffer(ObTxLocalBuffer &buffer, int64_t &serialize_size);
int recover_tx_ctx_table_info(ObTxCtxTableInfo &ctx_info);
// leader switch related

View File

@ -693,8 +693,9 @@ int ObTxCtxMemtableScanIterator::init(ObTxCtxMemtable *tx_ctx_memtable)
return ret;
}
int ObTxCtxMemtableScanIterator::get_next_tx_ctx_table_info_(transaction::ObPartTransCtx *&tx_ctx,
ObTxCtxTableInfo &ctx_info)
int ObTxCtxMemtableScanIterator::serialize_next_tx_ctx_(ObTxLocalBuffer &buffer,
int64_t &serialize_size,
transaction::ObPartTransCtx *&tx_ctx)
{
int ret = OB_SUCCESS;
bool need_retry = true;
@ -704,7 +705,7 @@ int ObTxCtxMemtableScanIterator::get_next_tx_ctx_table_info_(transaction::ObPart
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "ls_tx_ctx_iter_.get_next_tx_ctx failed", K(ret));
}
} else if (OB_FAIL(tx_ctx->get_tx_ctx_table_info(ctx_info))) {
} else if (OB_FAIL(tx_ctx->serialize_tx_ctx_to_buffer(buffer, serialize_size))) {
if (OB_TRANS_CTX_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
@ -724,7 +725,6 @@ int ObTxCtxMemtableScanIterator::inner_get_next_row(const ObDatumRow *&row)
int ret = OB_SUCCESS;
ObTxCtxTableMeta curr_meta;
ObTxCtxTableInfo ctx_info;
transaction::ObPartTransCtx *tx_ctx = NULL;
char *row_buf = NULL;
int64_t need_merge_length = 0;
@ -733,40 +733,31 @@ int ObTxCtxMemtableScanIterator::inner_get_next_row(const ObDatumRow *&row)
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "tx ctx memtable scan iterator is not inited");
}
if (OB_SUCC(ret)) {
if (has_unmerged_buf_) {
row_buf = buf_.get_ptr() + unmerged_buf_start_pos_;
need_merge_length = prev_meta_.get_tx_ctx_serialize_size() - unmerged_buf_start_pos_;
if (OB_FAIL(prev_meta_.get_multi_row_next_extent(curr_meta))) {
STORAGE_LOG(WARN, "prev_meta_.get_multi_row_next_extent failed", KR(ret), K_(prev_meta));
} else if (has_unmerged_buf_) {
// a single row can not hold the whole tx ctx
row_buf = buf_.get_ptr() + unmerged_buf_start_pos_;
need_merge_length = prev_meta_.get_tx_ctx_serialize_size() - unmerged_buf_start_pos_;
if (OB_FAIL(prev_meta_.get_multi_row_next_extent(curr_meta))) {
STORAGE_LOG(WARN, "prev_meta_.get_multi_row_next_extent failed", KR(ret), K_(prev_meta));
}
STORAGE_LOG(DEBUG, "write prev tx ctx unmerged buffer", K(prev_meta_));
} else {
// get next tx ctx and serialize it into buffer
int64_t serialize_size = 0;
if (OB_FAIL(serialize_next_tx_ctx_(buf_, serialize_size, tx_ctx))) {
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "get_next_tx_ctx_table_info_ failed", K(ret));
}
STORAGE_LOG(DEBUG, "write prev tx ctx unmerged buffer", K(prev_meta_));
} else {
if (OB_FAIL(get_next_tx_ctx_table_info_(tx_ctx, ctx_info))) {
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "get_next_tx_ctx_table_info_ failed", K(ret));
}
} else {
int64_t serialize_size = ctx_info.get_serialize_size();
curr_meta.init(tx_ctx->get_trans_id(), tx_ctx->get_ls_id(), serialize_size,
// ceil((double)serialize_size / MAX_VALUE_LENGTH_)
(serialize_size + MAX_VALUE_LENGTH_ - 1) / MAX_VALUE_LENGTH_, 0);
if (OB_FAIL(buf_.reserve(serialize_size))) {
STORAGE_LOG(WARN, "Failed to reserve local buffer", KR(ret));
} else {
int64_t pos = 0;
if (OB_FAIL(ctx_info.serialize(buf_.get_ptr(), serialize_size, pos))) {
STORAGE_LOG(WARN, "failed to serialize ctx_info", KR(ret), K(ctx_info), K(pos));
} else {
row_buf = buf_.get_ptr();
need_merge_length = serialize_size;
}
}
STORAGE_LOG(DEBUG, "write tx ctx info", K(ctx_info), K(serialize_size));
ls_tx_ctx_iter_.revert_tx_ctx(tx_ctx);
}
(void)curr_meta.init(tx_ctx->get_trans_id(),
tx_ctx->get_ls_id(),
serialize_size,
(serialize_size + MAX_VALUE_LENGTH_ - 1) / MAX_VALUE_LENGTH_ /* row_num */,
0 /* row_idx */);
row_buf = buf_.get_ptr();
need_merge_length = serialize_size;
STORAGE_LOG(DEBUG, "write tx ctx info", KPC(tx_ctx), K(serialize_size));
ls_tx_ctx_iter_.revert_tx_ctx(tx_ctx);
}
}
@ -814,7 +805,7 @@ int ObTxCtxMemtableScanIterator::inner_get_next_row(const ObDatumRow *&row)
row_.set_last_multi_version_row();
row_.set_compacted_multi_version_row();
row = &row_;
STORAGE_LOG(DEBUG, "write tx ctx info", K(ctx_info), K(idx_), K(curr_meta));
STORAGE_LOG(DEBUG, "write tx ctx info", K(idx_), K(curr_meta));
idx_++;
}

View File

@ -265,8 +265,8 @@ public:
return ret;
}
private:
int get_next_tx_ctx_table_info_(transaction::ObPartTransCtx *&tx_ctx,
ObTxCtxTableInfo &ctx_info);
int serialize_next_tx_ctx_(ObTxLocalBuffer &buffer, int64_t &serialize_size, transaction::ObPartTransCtx *&tx_ctx);
private:
const static int64_t TX_CTX_META_BUF_LENGTH = 256;
const static int64_t TX_CTX_BUF_LENGTH = 1000;