cache data version in session for performance optimize
This commit is contained in:
parent
86e6b53016
commit
fd54ded76c
@ -394,7 +394,7 @@ int ObTableApiProcessorBase::init_read_trans(const ObTableConsistencyLevel consi
|
||||
bool strong_read = ObTableConsistencyLevel::STRONG == consistency_level;
|
||||
transaction::ObTransService *txs = MTL(transaction::ObTransService*);
|
||||
|
||||
if (OB_FAIL(txs->acquire_tx(trans_desc_, session().get_sessid()))) {
|
||||
if (OB_FAIL(txs->acquire_tx(trans_desc_, session().get_sessid(), session().get_data_version()))) {
|
||||
LOG_WARN("failed to acquire tx desc", K(ret));
|
||||
} else if (OB_FAIL(setup_tx_snapshot_(*trans_desc_, tx_snapshot_, strong_read, ls_id, timeout_ts))) {
|
||||
LOG_WARN("setup txn snapshot fail", K(ret), KPC_(trans_desc), K(strong_read), K(ls_id), K(timeout_ts));
|
||||
@ -485,7 +485,7 @@ int ObTableApiProcessorBase::start_trans_(bool is_readonly,
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("start_trans is executed", K(ret));
|
||||
} else {
|
||||
if (OB_FAIL(txs->acquire_tx(trans_desc, session().get_sessid()))) {
|
||||
if (OB_FAIL(txs->acquire_tx(trans_desc, session().get_sessid(), session().get_data_version()))) {
|
||||
LOG_WARN("failed to acquire tx desc", K(ret));
|
||||
} else if (OB_FAIL(txs->start_tx(*trans_desc, tx_param))) {
|
||||
LOG_WARN("failed to start trans", K(ret), KPC(trans_desc));
|
||||
|
@ -106,7 +106,7 @@ int ObTMService::tm_rm_start(ObExecContext &exec_ctx,
|
||||
tx_param.timeout_us_ = tx_timeout;
|
||||
tx_param.lock_timeout_us_ = my_session->get_trx_lock_timeout();
|
||||
if (OB_FAIL(xa_service->xa_start_for_tm(0, timeout_seconds, my_session->get_sessid(),
|
||||
tx_param, tx_desc, xid))) {
|
||||
tx_param, tx_desc, xid, my_session->get_data_version()))) {
|
||||
LOG_WARN("xa start for dblink failed", K(ret), K(tx_param));
|
||||
// TODO, reset
|
||||
my_session->reset_first_need_txn_stmt_type();
|
||||
|
@ -99,7 +99,8 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt)
|
||||
my_session->get_xa_end_timeout_seconds(),
|
||||
my_session->get_sessid(),
|
||||
tx_param,
|
||||
tx_desc))) {
|
||||
tx_desc,
|
||||
my_session->get_data_version()))) {
|
||||
LOG_WARN("xa start failed", K(ret), K(tx_param));
|
||||
my_session->reset_tx_variable();
|
||||
my_session->set_early_lock_release(false);
|
||||
|
@ -171,7 +171,7 @@ int ObSqlTransControl::explicit_start_trans(ObExecContext &ctx, const bool read_
|
||||
|
||||
ObTxParam &tx_param = plan_ctx->get_trans_param();
|
||||
OZ (build_tx_param_(session, tx_param, &read_only));
|
||||
OZ (txs->acquire_tx(session->get_tx_desc(), session->get_sessid()));
|
||||
OZ (txs->acquire_tx(session->get_tx_desc(), session->get_sessid(), session->get_data_version()));
|
||||
OZ (txs->start_tx(*session->get_tx_desc(), tx_param), tx_param);
|
||||
OX (tx_id = session->get_tx_desc()->get_tx_id());
|
||||
|
||||
@ -1220,7 +1220,10 @@ int ObSqlTransControl::get_trans_result(ObExecContext &exec_ctx)
|
||||
return get_trans_result(exec_ctx, exec_ctx.get_my_session()->get_trans_result());
|
||||
}
|
||||
|
||||
int ObSqlTransControl::reset_session_tx_state(ObBasicSessionInfo *session, bool reuse_tx_desc, bool reset_trans_variable)
|
||||
int ObSqlTransControl::reset_session_tx_state(ObBasicSessionInfo *session,
|
||||
bool reuse_tx_desc,
|
||||
bool reset_trans_variable,
|
||||
const uint64_t data_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
LOG_DEBUG("reset session tx state", KPC(session->get_tx_desc()), K(lbt()));
|
||||
@ -1233,7 +1236,7 @@ int ObSqlTransControl::reset_session_tx_state(ObBasicSessionInfo *session, bool
|
||||
transaction::ObTransService *txs = NULL;
|
||||
OZ (get_tx_service(session, txs), *session, tx_desc);
|
||||
if (reuse_tx_desc) {
|
||||
if (OB_FAIL(txs->reuse_tx(tx_desc))) {
|
||||
if (OB_FAIL(txs->reuse_tx(tx_desc, data_version))) {
|
||||
LOG_ERROR("reuse txn descriptor fail, will release it", K(ret), KPC(session), K(tx_desc));
|
||||
OZ (txs->release_tx(tx_desc), tx_id);
|
||||
session->get_tx_desc() = NULL;
|
||||
@ -1262,7 +1265,8 @@ int ObSqlTransControl::reset_session_tx_state(ObSQLSessionInfo *session, bool re
|
||||
LOG_WARN_RET(temp_ret, "trx level temporary table clean failed", KR(temp_ret));
|
||||
}
|
||||
}
|
||||
int ret = reset_session_tx_state(static_cast<ObBasicSessionInfo*>(session), reuse_tx_desc, reset_trans_variable);
|
||||
int ret = reset_session_tx_state(static_cast<ObBasicSessionInfo*>(session), reuse_tx_desc,
|
||||
reset_trans_variable, session->get_data_version());
|
||||
return COVER_SUCC(temp_ret);
|
||||
}
|
||||
|
||||
@ -1291,7 +1295,7 @@ int ObSqlTransControl::acquire_tx_if_need_(transaction::ObTransService *txs, ObS
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(session.get_tx_desc())) {
|
||||
OZ (txs->acquire_tx(session.get_tx_desc(), session.get_sessid()), session);
|
||||
OZ (txs->acquire_tx(session.get_tx_desc(), session.get_sessid(), session.get_data_version()), session);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1313,7 +1317,7 @@ int ObSqlTransControl::lock_table(ObExecContext &exec_ctx,
|
||||
OZ (get_tx_service(session, txs));
|
||||
OZ (get_lock_service(session->get_effective_tenant_id(), lock_service));
|
||||
if (OB_SUCC(ret) && OB_ISNULL(session->get_tx_desc())) {
|
||||
OZ (txs->acquire_tx(session->get_tx_desc(), session->get_sessid()), *session);
|
||||
OZ (txs->acquire_tx(session->get_tx_desc(), session->get_sessid(), session->get_data_version()), *session);
|
||||
}
|
||||
ObTxParam tx_param;
|
||||
OZ (build_tx_param_(session, tx_param));
|
||||
@ -1442,7 +1446,7 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
|
||||
bool has_tx_desc = OB_NOT_NULL(tx_desc); \
|
||||
transaction::ObTransID prev_tx_id; \
|
||||
if (has_tx_desc) { prev_tx_id = session.get_tx_id(); } \
|
||||
OZ (txs->txn_free_route__update_##name##_state(session.get_sessid(), tx_desc, session.get_txn_free_route_ctx(), buf, len, pos), session); \
|
||||
OZ (txs->txn_free_route__update_##name##_state(session.get_sessid(), tx_desc, session.get_txn_free_route_ctx(), buf, len, pos, session.get_data_version()), session); \
|
||||
if (OB_SUCC(ret) && has_tx_desc && (OB_ISNULL(tx_desc) || tx_desc->get_tx_id() != prev_tx_id)) { \
|
||||
session.reset_tx_variable(); \
|
||||
} \
|
||||
|
@ -168,7 +168,10 @@ class ObSqlTransControl
|
||||
{
|
||||
public:
|
||||
static int reset_session_tx_state(ObSQLSessionInfo *session, bool reuse_tx_desc = false, bool active_tx_end = true);
|
||||
static int reset_session_tx_state(ObBasicSessionInfo *session, bool reuse_tx_desc = false, bool active_tx_end = true);
|
||||
static int reset_session_tx_state(ObBasicSessionInfo *session,
|
||||
bool reuse_tx_desc = false,
|
||||
bool active_tx_end = true,
|
||||
const uint64_t data_version = 0);
|
||||
static int create_stash_savepoint(ObExecContext &exec_ctx, const ObString &name);
|
||||
static int release_stash_savepoint(ObExecContext &exec_ctx, const ObString &name);
|
||||
static int explicit_start_trans(ObExecContext &exec_ctx, const bool read_only, const ObString hint = ObString());
|
||||
|
@ -2791,6 +2791,13 @@ void ObSQLSessionInfo::ObCachedTenantConfigInfo::refresh()
|
||||
LOG_DEBUG("refresh tenant config where tenant changed",
|
||||
K_(saved_tenant_info), K(effective_tenant_id));
|
||||
ATOMIC_STORE(&saved_tenant_info_, effective_tenant_id);
|
||||
}
|
||||
// 缓存data version 用于性能优化
|
||||
uint64_t data_version = 0;
|
||||
if (OB_TMP_FAIL(GET_MIN_DATA_VERSION(effective_tenant_id, data_version))) {
|
||||
LOG_WARN_RET(tmp_ret, "get data version fail", "ret", tmp_ret, K(effective_tenant_id));
|
||||
} else {
|
||||
ATOMIC_STORE(&data_version_, data_version);
|
||||
}
|
||||
// 1.是否支持外部一致性
|
||||
is_external_consistent_ = transaction::ObTsMgr::get_instance().is_external_consistent(effective_tenant_id);
|
||||
|
@ -689,6 +689,7 @@ public:
|
||||
at_type_(ObAuditTrailType::NONE),
|
||||
sort_area_size_(128*1024*1024),
|
||||
hash_area_size_(128*1024*1024),
|
||||
data_version_(0),
|
||||
enable_query_response_time_stats_(false),
|
||||
enable_user_defined_rewrite_rules_(false),
|
||||
range_optimizer_max_mem_size_(128*1024*1024),
|
||||
@ -708,6 +709,7 @@ public:
|
||||
ObAuditTrailType get_at_type() const { return at_type_; }
|
||||
int64_t get_sort_area_size() const { return ATOMIC_LOAD(&sort_area_size_); }
|
||||
int64_t get_hash_area_size() const { return ATOMIC_LOAD(&hash_area_size_); }
|
||||
uint64_t get_data_version() const { return ATOMIC_LOAD(&data_version_); }
|
||||
bool enable_query_response_time_stats() const { return enable_query_response_time_stats_; }
|
||||
bool enable_udr() const { return ATOMIC_LOAD(&enable_user_defined_rewrite_rules_); }
|
||||
int64_t get_print_sample_ppm() const { return ATOMIC_LOAD(&print_sample_ppm_); }
|
||||
@ -728,6 +730,7 @@ public:
|
||||
ObAuditTrailType at_type_;
|
||||
int64_t sort_area_size_;
|
||||
int64_t hash_area_size_;
|
||||
uint64_t data_version_;
|
||||
bool enable_query_response_time_stats_;
|
||||
bool enable_user_defined_rewrite_rules_;
|
||||
int64_t range_optimizer_max_mem_size_;
|
||||
@ -1280,6 +1283,11 @@ public:
|
||||
cached_tenant_config_info_.refresh();
|
||||
return cached_tenant_config_info_.get_sort_area_size();
|
||||
}
|
||||
uint64_t get_data_version()
|
||||
{
|
||||
cached_tenant_config_info_.refresh();
|
||||
return cached_tenant_config_info_.get_data_version();
|
||||
}
|
||||
bool enable_query_response_time_stats()
|
||||
{
|
||||
cached_tenant_config_info_.refresh();
|
||||
|
@ -195,8 +195,10 @@ TO_STRING_KV(K(is_inited_), K(tenant_id_), KP(this));
|
||||
|
||||
private:
|
||||
int check_ls_status_(const share::ObLSID &ls_id, bool &leader);
|
||||
int init_tx_(ObTxDesc &tx, const uint32_t session_id);
|
||||
int reinit_tx_(ObTxDesc &tx, const uint32_t session_id);
|
||||
int init_tx_(ObTxDesc &tx,
|
||||
const uint32_t session_id,
|
||||
const uint64_t cluster_version);
|
||||
int reinit_tx_(ObTxDesc &tx, const uint32_t session_id, const uint64_t cluster_version);
|
||||
int start_tx_(ObTxDesc &tx);
|
||||
int abort_tx_(ObTxDesc &tx, const int cause, bool cleanup = true);
|
||||
void abort_tx__(ObTxDesc &tx, const bool cleanup);
|
||||
|
@ -60,7 +60,9 @@ using namespace share;
|
||||
|
||||
namespace transaction {
|
||||
|
||||
inline int ObTransService::init_tx_(ObTxDesc &tx, const uint32_t session_id)
|
||||
inline int ObTransService::init_tx_(ObTxDesc &tx,
|
||||
const uint32_t session_id,
|
||||
const uint64_t cluster_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
tx.tenant_id_ = tenant_id_;
|
||||
@ -71,7 +73,9 @@ inline int ObTransService::init_tx_(ObTxDesc &tx, const uint32_t session_id)
|
||||
tx.expire_ts_ = INT64_MAX;
|
||||
tx.op_sn_ = 1;
|
||||
tx.state_ = ObTxDesc::State::IDLE;
|
||||
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, tx.cluster_version_))) {
|
||||
tx.cluster_version_ = cluster_version;
|
||||
// cluster_version is invalid, need to get it
|
||||
if (0 == cluster_version && OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, tx.cluster_version_))) {
|
||||
TRANS_LOG(WARN, "get min data version fail", K(ret), K(tx));
|
||||
} else if (tx.cluster_version_ >= DATA_VERSION_4_3_0_0) {
|
||||
tx.seq_base_ = common::ObSequence::get_max_seq_no() - 1;
|
||||
@ -79,13 +83,15 @@ inline int ObTransService::init_tx_(ObTxDesc &tx, const uint32_t session_id)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransService::acquire_tx(ObTxDesc *&tx, const uint32_t session_id)
|
||||
int ObTransService::acquire_tx(ObTxDesc *&tx,
|
||||
const uint32_t session_id,
|
||||
const uint64_t cluster_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(tx_desc_mgr_.alloc(tx))) {
|
||||
TRANS_LOG(WARN, "alloc tx fail", K(ret));
|
||||
} else {
|
||||
ret = init_tx_(*tx, session_id);
|
||||
ret = init_tx_(*tx, session_id, cluster_version);
|
||||
}
|
||||
TRANS_LOG(TRACE, "acquire tx", KPC(tx), K(session_id));
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -172,7 +178,7 @@ int ObTransService::release_tx(ObTxDesc &tx, const bool is_from_xa)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransService::reuse_tx(ObTxDesc &tx)
|
||||
int ObTransService::reuse_tx(ObTxDesc &tx, const uint64_t data_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int spin_cnt = 0;
|
||||
@ -211,7 +217,7 @@ int ObTransService::reuse_tx(ObTxDesc &tx)
|
||||
#endif
|
||||
}
|
||||
// it is safe to operate tx without lock when not shared
|
||||
ret = reinit_tx_(tx, tx.sess_id_);
|
||||
ret = reinit_tx_(tx, tx.sess_id_, data_version);
|
||||
}
|
||||
TRANS_LOG(TRACE, "reuse tx", K(ret), K(orig_tx_id), K(tx));
|
||||
ObTransTraceLog &tlog = tx.get_tlog();
|
||||
@ -225,10 +231,10 @@ int ObTransService::reuse_tx(ObTxDesc &tx)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransService::reinit_tx_(ObTxDesc &tx, const uint32_t session_id)
|
||||
int ObTransService::reinit_tx_(ObTxDesc &tx, const uint32_t session_id, const uint64_t cluster_version)
|
||||
{
|
||||
tx.reset();
|
||||
return init_tx_(tx, session_id);
|
||||
return init_tx_(tx, session_id, cluster_version);
|
||||
}
|
||||
|
||||
int ObTransService::stop_tx(ObTxDesc &tx)
|
||||
|
@ -10,7 +10,7 @@
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
int acquire_tx(ObTxDesc *&tx, const uint32_t session_id = 0);
|
||||
int acquire_tx(ObTxDesc *&tx, const uint32_t session_id = 0, const uint64_t data_version = 0);
|
||||
|
||||
/**
|
||||
* start_tx - explicit start transaction
|
||||
@ -112,11 +112,12 @@ int release_tx(ObTxDesc &tx, const bool is_from_xa = false);
|
||||
* when txn end, in stead of release txn descriptor, reuse it for
|
||||
* better performance.
|
||||
*
|
||||
* @tx: the target transaction's descriptor
|
||||
* @tx: the target transaction's descriptor
|
||||
* @data_version: tx data_version
|
||||
*
|
||||
* Return: OB_SUCCESS -OK
|
||||
*/
|
||||
int reuse_tx(ObTxDesc &tx);
|
||||
int reuse_tx(ObTxDesc &tx, const uint64_t data_version);
|
||||
|
||||
/**
|
||||
* stop_tx - stop txn immediately (for admin reason)
|
||||
|
@ -395,7 +395,8 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
|
||||
ObTxnFreeRouteCtx &ctx,
|
||||
const char* buf,
|
||||
const int64_t len,
|
||||
int64_t &pos)
|
||||
int64_t &pos,
|
||||
const uint64_t data_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool need_add_tx = false;
|
||||
@ -419,7 +420,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
|
||||
TRANS_LOG(WARN, "handle tx exist fail", K(ret));
|
||||
} else if (OB_ISNULL(tx)) {
|
||||
audit_record.alloc_tx_ = true;
|
||||
if (OB_FAIL(acquire_tx(tx, session_id))) {
|
||||
if (OB_FAIL(acquire_tx(tx, session_id, data_version))) {
|
||||
// if acquire tx failed, it may retryable: alloc-memory failed
|
||||
TRANS_LOG(WARN, "acquire tx for decode failed", K(ret));
|
||||
} else { need_add_tx = true; }
|
||||
@ -435,7 +436,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
|
||||
tx_desc_mgr_.remove(*tx);
|
||||
}
|
||||
// reset tx to cleanup for new txn
|
||||
reinit_tx_(*tx, session_id);
|
||||
reinit_tx_(*tx, session_id, tx->get_cluster_version());
|
||||
need_add_tx = true;
|
||||
} else {
|
||||
// update
|
||||
@ -516,7 +517,8 @@ int ObTransService::txn_free_route__update_dynamic_state(const uint32_t session_
|
||||
ObTxnFreeRouteCtx &ctx,
|
||||
const char* buf,
|
||||
const int64_t len,
|
||||
int64_t &pos)
|
||||
int64_t &pos,
|
||||
const uint64_t data_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTxnFreeRouteAuditRecord &audit_record = ctx.audit_record_;
|
||||
@ -575,7 +577,8 @@ int ObTransService::txn_free_route__update_parts_state(const uint32_t session_id
|
||||
ObTxnFreeRouteCtx &ctx,
|
||||
const char* buf,
|
||||
const int64_t len,
|
||||
int64_t &pos)
|
||||
int64_t &pos,
|
||||
const uint64_t data_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTxnFreeRouteAuditRecord &audit_record = ctx.audit_record_;
|
||||
@ -632,7 +635,8 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
|
||||
ObTxnFreeRouteCtx &ctx,
|
||||
const char* buf,
|
||||
const int64_t len,
|
||||
int64_t &pos)
|
||||
int64_t &pos,
|
||||
const uint64_t data_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t logic_clock = 0;
|
||||
@ -667,7 +671,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
|
||||
} else if (OB_FAIL(update_logic_clock_(logic_clock, NULL, false))) {
|
||||
TRANS_LOG(ERROR, "update logic clock fail", K(ret));
|
||||
}
|
||||
if (OB_SUCC(ret) && add_tx && OB_FAIL(acquire_tx(tx, session_id))) {
|
||||
if (OB_SUCC(ret) && add_tx && OB_FAIL(acquire_tx(tx, session_id, data_version))) {
|
||||
// only has savepoints or txn scope snapshot, txn not started
|
||||
// acquire tx to hold extra info
|
||||
TRANS_LOG(WARN, "acquire tx fail", K(ret));
|
||||
|
@ -11,7 +11,7 @@
|
||||
*/
|
||||
|
||||
#define DEF_FREE_ROUTE_API_(name) \
|
||||
int txn_free_route__update_##name##_state(const uint32_t session_id, ObTxDesc *&tx, ObTxnFreeRouteCtx &ctx, const char* buf, const int64_t len, int64_t &pos); \
|
||||
int txn_free_route__update_##name##_state(const uint32_t session_id, ObTxDesc *&tx, ObTxnFreeRouteCtx &ctx, const char* buf, const int64_t len, int64_t &pos, const uint64_t data_version); \
|
||||
int txn_free_route__serialize_##name##_state(const uint32_t session_id, ObTxDesc *tx, ObTxnFreeRouteCtx &ctx, char* buf, const int64_t len, int64_t &pos); \
|
||||
int64_t txn_free_route__get_##name##_state_serialize_size(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx); \
|
||||
static int64_t txn_free_route__get_##name##_state_size(ObTxDesc *tx); \
|
||||
|
@ -244,7 +244,8 @@ int ObXAService::xa_start_for_tm(const int64_t flags,
|
||||
const uint32_t session_id,
|
||||
const ObTxParam &tx_param,
|
||||
ObTxDesc *&tx_desc,
|
||||
ObXATransID &xid)
|
||||
ObXATransID &xid,
|
||||
const uint64_t data_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -258,7 +259,7 @@ int ObXAService::xa_start_for_tm(const int64_t flags,
|
||||
TRANS_LOG(WARN, "invalid flags for xa start", K(ret), K(xid), K(flags));
|
||||
} else {
|
||||
if (ObXAFlag::is_tmnoflags(flags, ObXAReqType::XA_START)) {
|
||||
if (OB_FAIL(xa_start_for_tm_(flags, timeout_seconds, session_id, tx_param, tx_desc, xid))) {
|
||||
if (OB_FAIL(xa_start_for_tm_(flags, timeout_seconds, session_id, tx_param, tx_desc, xid, data_version))) {
|
||||
TRANS_LOG(WARN, "xa start promotion failed", K(ret), K(flags), K(xid));
|
||||
}
|
||||
} else {
|
||||
@ -283,7 +284,8 @@ int ObXAService::xa_start_for_tm_(const int64_t flags,
|
||||
const uint32_t session_id,
|
||||
const ObTxParam &tx_param,
|
||||
ObTxDesc *&tx_desc,
|
||||
ObXATransID &xid)
|
||||
ObXATransID &xid,
|
||||
const uint64_t data_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -306,7 +308,7 @@ int ObXAService::xa_start_for_tm_(const int64_t flags,
|
||||
}
|
||||
// the first xa start for xa trans with this xid
|
||||
// therefore tx_desc should be allocated
|
||||
if (OB_FAIL(MTL(ObTransService *)->acquire_tx(tx_desc, session_id))) {
|
||||
if (OB_FAIL(MTL(ObTransService *)->acquire_tx(tx_desc, session_id, data_version))) {
|
||||
TRANS_LOG(WARN, "fail acquire trans", K(ret), K(tx_param));
|
||||
} else if (OB_FAIL(MTL(ObTransService *)->start_tx(*tx_desc, tx_param))) {
|
||||
TRANS_LOG(WARN, "fail start trans", K(ret), KPC(tx_desc));
|
||||
|
@ -997,7 +997,8 @@ int ObXAService::xa_start(const ObXATransID &xid,
|
||||
const int64_t timeout_seconds,
|
||||
const uint32_t session_id,
|
||||
const ObTxParam &tx_param,
|
||||
ObTxDesc *&tx_desc)
|
||||
ObTxDesc *&tx_desc,
|
||||
const uint64_t data_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -1010,7 +1011,7 @@ int ObXAService::xa_start(const ObXATransID &xid,
|
||||
ret = OB_TRANS_XA_INVAL;
|
||||
TRANS_LOG(WARN, "invalid flags for xa start", K(ret), K(xid), K(flags));
|
||||
} else if (ObXAFlag::is_tmnoflags(flags, ObXAReqType::XA_START)) {
|
||||
if (OB_FAIL(xa_start_(xid, flags, timeout_seconds, session_id, tx_param, tx_desc))) {
|
||||
if (OB_FAIL(xa_start_(xid, flags, timeout_seconds, session_id, tx_param, tx_desc, data_version))) {
|
||||
TRANS_LOG(WARN, "xa start failed", K(ret), K(flags), K(xid));
|
||||
}
|
||||
} else if (ObXAFlag::is_tmjoin(flags) || ObXAFlag::is_tmresume(flags)) {
|
||||
@ -1043,7 +1044,8 @@ int ObXAService::xa_start_(const ObXATransID &xid,
|
||||
const int64_t timeout_seconds,
|
||||
const uint32_t session_id,
|
||||
const ObTxParam &tx_param,
|
||||
ObTxDesc *&tx_desc)
|
||||
ObTxDesc *&tx_desc,
|
||||
const uint64_t data_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -1105,7 +1107,7 @@ int ObXAService::xa_start_(const ObXATransID &xid,
|
||||
// the first xa start for xa trans with this xid
|
||||
// therefore tx_desc should be allocated
|
||||
// this code may be moved to pl sql level
|
||||
if (OB_FAIL(MTL(ObTransService *)->acquire_tx(tx_desc, session_id))) {
|
||||
if (OB_FAIL(MTL(ObTransService *)->acquire_tx(tx_desc, session_id, data_version))) {
|
||||
TRANS_LOG(WARN, "fail acquire trans", K(ret), K(tx_param));
|
||||
} else if (OB_FAIL(MTL(ObTransService *)->start_tx(*tx_desc, tx_param, trans_id))) {
|
||||
TRANS_LOG(WARN, "fail start trans", K(ret), KPC(tx_desc));
|
||||
|
@ -106,7 +106,8 @@ public:
|
||||
const int64_t timeout_seconds,
|
||||
const uint32_t session_id,
|
||||
const ObTxParam &tx_param,
|
||||
ObTxDesc *&tx_desc);
|
||||
ObTxDesc *&tx_desc,
|
||||
const uint64_t data_version);
|
||||
int xa_end(const ObXATransID &xid,
|
||||
const int64_t flags,
|
||||
ObTxDesc *&tx_desc);
|
||||
@ -143,7 +144,8 @@ public:
|
||||
const uint32_t session_id,
|
||||
const ObTxParam &tx_param,
|
||||
ObTxDesc *&tx_desc,
|
||||
ObXATransID &xid);
|
||||
ObXATransID &xid,
|
||||
const uint64_t data_version);
|
||||
int xa_start_for_dblink_client(const common::sqlclient::DblinkDriverProto dblink_type,
|
||||
common::sqlclient::ObISQLConnection *dblink_conn,
|
||||
ObTxDesc *&tx_desc,
|
||||
@ -247,7 +249,8 @@ private:
|
||||
const int64_t timeout_seconds,
|
||||
const uint32_t session_id,
|
||||
const ObTxParam &tx_param,
|
||||
ObTxDesc *&tx_desc);
|
||||
ObTxDesc *&tx_desc,
|
||||
const uint64_t data_version);
|
||||
int xa_start_join_(const ObXATransID &xid,
|
||||
const int64_t flags,
|
||||
const int64_t timeout_seconds,
|
||||
@ -318,7 +321,8 @@ private:
|
||||
const uint32_t session_id,
|
||||
const ObTxParam &tx_param,
|
||||
ObTxDesc *&tx_desc,
|
||||
ObXATransID &xid);
|
||||
ObXATransID &xid,
|
||||
const uint64_t data_version);
|
||||
public:
|
||||
int xa_scheduler_hb_req();
|
||||
int gc_invalid_xa_record(const uint64_t tenant_id);
|
||||
|
@ -471,7 +471,7 @@ int MockObServer::handle(ObReq &req, ObResp &resp)
|
||||
#define TX_STATE_UPDATE__(T, tn) \
|
||||
case SyncTxState::T: \
|
||||
if (OB_SUCC(ret) && \
|
||||
OB_FAIL(tx_node_.txn_free_route__update_##tn##_state(session_.get_sessid(), tx_desc, free_route_ctx, buf, len, pos))) { \
|
||||
OB_FAIL(tx_node_.txn_free_route__update_##tn##_state(session_.get_sessid(), tx_desc, free_route_ctx, buf, len, pos, session_.get_data_version()))) { \
|
||||
TRANS_LOG(ERROR, "update txn state fail", K(ret), "type", #T); \
|
||||
} else if (pos != len) { \
|
||||
TRANS_LOG(WARN, "[maybe] pos != len, consume buffer incomplete", K(ret), K(pos), K(len), "state_type", #T); \
|
||||
|
Loading…
x
Reference in New Issue
Block a user