[4.1] support transaction isolation level for dblink trans
This commit is contained in:
parent
5f602ed6c4
commit
305ad6341a
@ -12,6 +12,7 @@
|
||||
#include "share/ob_define.h"
|
||||
#include "lib/mysqlclient/ob_mysql_proxy.h"
|
||||
#include "observer/ob_server_struct.h"
|
||||
#include "lib/mysqlclient/ob_mysql_connection.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -68,7 +69,7 @@ int ObDBLinkClient::init(const uint32_t index,
|
||||
// 1. if START, return success directly
|
||||
// 2. if IDLE, execute xa start
|
||||
// @param[in] xid
|
||||
int ObDBLinkClient::rm_xa_start(const ObXATransID &xid)
|
||||
int ObDBLinkClient::rm_xa_start(const ObXATransID &xid, const ObTxIsolationLevel isolation)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSpinLockGuard guard(lock_);
|
||||
@ -76,9 +77,9 @@ int ObDBLinkClient::rm_xa_start(const ObXATransID &xid)
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "dblink client is not inited", K(ret), K(xid), K(*this));
|
||||
} else if (!xid.is_valid() || xid.empty()) {
|
||||
} else if (!xid.is_valid() || xid.empty() || ObTxIsolationLevel::INVALID == isolation) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "invalid argument", K(ret), K(xid));
|
||||
TRANS_LOG(WARN, "invalid argument", K(ret), K(xid), K(isolation));
|
||||
} else if (ObDBLinkClientState::IDLE != state_) {
|
||||
if (ObDBLinkClientState::START == state_
|
||||
&& xid.all_equal_to(xid_)) {
|
||||
@ -89,18 +90,22 @@ int ObDBLinkClient::rm_xa_start(const ObXATransID &xid)
|
||||
}
|
||||
// TODO, check connection
|
||||
} else {
|
||||
if (OB_FAIL(init_query_impl_())) {
|
||||
TRANS_LOG(WARN, "fail to init query impl", K(ret), K(xid), K(*this));
|
||||
int64_t flag = ObXAFlag::TMNOFLAGS;
|
||||
if (ObTxIsolationLevel::RR == isolation || ObTxIsolationLevel::SERIAL == isolation) {
|
||||
flag = ObXAFlag::TMSERIALIZABLE;
|
||||
}
|
||||
if (OB_FAIL(init_query_impl_(isolation))) {
|
||||
TRANS_LOG(WARN, "fail to init query impl", K(ret), K(xid), K(isolation), K(*this));
|
||||
} else if (NULL == impl_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "unexpected query impl", K(ret), K(xid), K(*this));
|
||||
} else if (OB_FAIL(impl_->xa_start(xid, ObXAFlag::TMNOFLAGS))) {
|
||||
TRANS_LOG(WARN, "fail to execute query", K(ret), K(xid), K(*this));
|
||||
} else if (OB_FAIL(impl_->xa_start(xid, flag))) {
|
||||
TRANS_LOG(WARN, "fail to execute query", K(ret), K(xid), K(flag), K(*this));
|
||||
} else {
|
||||
xid_ = xid;
|
||||
state_ = ObDBLinkClientState::START;
|
||||
}
|
||||
TRANS_LOG(INFO, "rm xa start for dblink", K(ret), K(xid));
|
||||
TRANS_LOG(INFO, "rm xa start for dblink", K(ret), K(xid), K(isolation), K(flag));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -318,7 +323,7 @@ bool ObDBLinkClient::equal(ObISQLConnection *dblink_conn)
|
||||
return dblink_conn_ == dblink_conn;
|
||||
}
|
||||
|
||||
int ObDBLinkClient::init_query_impl_()
|
||||
int ObDBLinkClient::init_query_impl_(const ObTxIsolationLevel isolation)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (NULL == impl_) {
|
||||
@ -339,8 +344,16 @@ int ObDBLinkClient::init_query_impl_()
|
||||
// set tx variables
|
||||
static const int64_t MIN_TIMEOUT_US = 20 * 1000 * 1000; // 20s
|
||||
const int64_t timeout_us = tx_timeout_us_ + MIN_TIMEOUT_US;
|
||||
if (OB_FAIL(dblink_conn_->set_session_variable("ob_trx_timeout", timeout_us))) {
|
||||
ObMySQLConnection *mysql_conn = dynamic_cast<ObMySQLConnection*>(dblink_conn_);
|
||||
const ObString isolation_str = get_tx_isolation_str(isolation);
|
||||
if (nullptr == mysql_conn) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "unexpected mysql connection", K(ret), K(*this));
|
||||
} else if (OB_FAIL(mysql_conn->set_session_variable("ob_trx_timeout", timeout_us))) {
|
||||
TRANS_LOG(WARN, "fail to set transaction timeout", K(ret), K(timeout_us), K(*this));
|
||||
} else if (OB_FAIL(mysql_conn->set_session_variable("tx_isolation", isolation_str))) {
|
||||
TRANS_LOG(WARN, "fail to set transaction isolation level in session", K(ret),
|
||||
K(timeout_us), K(*this));
|
||||
}
|
||||
}
|
||||
if (OB_SUCCESS != ret) {
|
||||
|
@ -64,7 +64,7 @@ public:
|
||||
const int64_t tx_timeout_us,
|
||||
common::sqlclient::ObISQLConnection *dblink_conn);
|
||||
public:
|
||||
int rm_xa_start(const transaction::ObXATransID &xid);
|
||||
int rm_xa_start(const transaction::ObXATransID &xid, const ObTxIsolationLevel isolation);
|
||||
int rm_xa_end();
|
||||
int rm_xa_prepare();
|
||||
int rm_xa_commit();
|
||||
@ -79,7 +79,7 @@ public:
|
||||
static bool is_valid_dblink_type(const common::sqlclient::DblinkDriverProto dblink_type);
|
||||
private:
|
||||
int rm_xa_end_();
|
||||
int init_query_impl_();
|
||||
int init_query_impl_(const ObTxIsolationLevel isolation);
|
||||
public:
|
||||
TO_STRING_KV(KP(this), K_(is_inited), K_(index), K_(xid), K_(state),
|
||||
K_(dblink_type), KP_(dblink_conn), K_(tx_timeout_us));
|
||||
|
@ -53,6 +53,27 @@ ObTxIsolationLevel tx_isolation_from_str(const ObString &s)
|
||||
return r;
|
||||
}
|
||||
|
||||
const ObString &get_tx_isolation_str(const ObTxIsolationLevel isolation)
|
||||
{
|
||||
static const ObString EMPTY_STR;
|
||||
static const ObString LEVEL_STR[4] =
|
||||
{
|
||||
"READ-UNCOMMITTED",
|
||||
"READ-COMMITTED",
|
||||
"REPEATABLE-READ",
|
||||
"SERIALIZABLE"
|
||||
};
|
||||
const ObString *isolation_str = &EMPTY_STR;
|
||||
switch (isolation) {
|
||||
case ObTxIsolationLevel::RU: isolation_str = &LEVEL_STR[0]; break;
|
||||
case ObTxIsolationLevel::RC: isolation_str = &LEVEL_STR[1]; break;
|
||||
case ObTxIsolationLevel::RR: isolation_str = &LEVEL_STR[2]; break;
|
||||
case ObTxIsolationLevel::SERIAL: isolation_str = &LEVEL_STR[3]; break;
|
||||
default: break;
|
||||
}
|
||||
return *isolation_str;
|
||||
}
|
||||
|
||||
ObTxSavePoint::ObTxSavePoint()
|
||||
: type_(T::INVL), scn_(0), session_id_(0), user_create_(false), name_() {}
|
||||
|
||||
|
@ -150,6 +150,8 @@ enum class ObTxIsolationLevel
|
||||
|
||||
extern ObTxIsolationLevel tx_isolation_from_str(const ObString &s);
|
||||
|
||||
extern const ObString &get_tx_isolation_str(const ObTxIsolationLevel isolation);
|
||||
|
||||
enum class ObTxAccessMode
|
||||
{
|
||||
INVL = -1, RW = 0, RD_ONLY = 1
|
||||
@ -655,6 +657,7 @@ LST_DO(DEF_FREE_ROUTE_DECODE, (;), static, dynamic, parts, extra);
|
||||
bool is_dynamic_changed() { return state_ > State::IDLE && state_change_flags_.DYNAMIC_CHANGED_; }
|
||||
bool is_parts_changed() { return state_change_flags_.PARTS_CHANGED_; };
|
||||
bool is_extra_changed() { return state_change_flags_.EXTRA_CHANGED_; };
|
||||
void set_explicit() { flags_.EXPLICIT_ = true; }
|
||||
};
|
||||
|
||||
// Is used to store and travserse all TxScheduler's Stat information;
|
||||
|
@ -1055,6 +1055,7 @@ int ObXACtx::xa_start_for_dblink(const ObXATransID &xid,
|
||||
} else {
|
||||
// set global trans type to dblink trans
|
||||
tx_desc->set_global_tx_type(ObGlobalTxType::DBLINK_TRANS);
|
||||
tx_desc->set_explicit();
|
||||
}
|
||||
|
||||
TRANS_LOG(INFO, "xa start for dblink", K(ret), K(xid), K(flags), K(need_promote), K(*this));
|
||||
|
@ -433,7 +433,7 @@ int ObXAService::xa_start_for_dblink_client(const DblinkDriverProto dblink_type,
|
||||
} else if (OB_FAIL(ObXAService::generate_xid_with_new_bqual(xid,
|
||||
client->get_index(), remote_xid))) {
|
||||
TRANS_LOG(WARN, "fail to generate xid", K(ret), K(xid), K(tx_id), K(remote_xid));
|
||||
} else if (OB_FAIL(client->rm_xa_start(remote_xid))) {
|
||||
} else if (OB_FAIL(client->rm_xa_start(remote_xid, tx_desc->get_isolation_level()))) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(xa_ctx->remove_dblink_client(client))) {
|
||||
ret = tmp_ret;
|
||||
|
@ -54,8 +54,10 @@ void ObXAQueryObImpl::reset()
|
||||
DBMS_XA.TMNOFLAGS) as result \
|
||||
from dual"
|
||||
|
||||
// NOTE that the input parameter flags is not used
|
||||
int ObXAQueryObImpl::xa_start(const ObXATransID &xid, const int64_t flags)
|
||||
{
|
||||
UNUSED(flags);
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -209,13 +211,16 @@ int ObXAQueryOraImpl::init(ObISQLConnection *conn)
|
||||
return ret;
|
||||
}
|
||||
|
||||
#define OCI_DEFAULT 0x00000000
|
||||
#define OCI_TRANS_NEW 0x00000001
|
||||
#define OCI_TRANS_JOIN 0x00000002
|
||||
#define OCI_TRANS_RESUME 0x00000004
|
||||
#define OCI_TRANS_LOOSE 0x00010000
|
||||
#define OCI_TRANS_TIGHT 0x00020000
|
||||
#define OCI_TRANS_TWOPHASE 0x01000000
|
||||
#define OCI_DEFAULT 0x00000000
|
||||
#define OCI_TRANS_NEW 0x00000001
|
||||
#define OCI_TRANS_JOIN 0x00000002
|
||||
#define OCI_TRANS_RESUME 0x00000004
|
||||
#define OCI_TRANS_READONLY 0x00000100
|
||||
#define OCI_TRANS_READWRITE 0x00000200
|
||||
#define OCI_TRANS_SERIALIZABLE 0x00000400
|
||||
#define OCI_TRANS_LOOSE 0x00010000
|
||||
#define OCI_TRANS_TIGHT 0x00020000
|
||||
#define OCI_TRANS_TWOPHASE 0x01000000
|
||||
|
||||
int ObXAQueryOraImpl::xa_start(const ObXATransID &xid, const int64_t flags)
|
||||
{
|
||||
@ -323,7 +328,7 @@ int ObXAQueryOraImpl::xa_rollback(const ObXATransID &xid)
|
||||
}
|
||||
|
||||
// NOTE that only support
|
||||
// xa start, TMNOFLAGS
|
||||
// xa start, TMNOFLAGS, TMSERIALIZABLE
|
||||
// xa end, TMSUCCESS
|
||||
// xa commit, TMNOFLAGS
|
||||
int ObXAQueryOraImpl::convert_flag_(const int64_t xa_flag,
|
||||
@ -350,6 +355,14 @@ int ObXAQueryOraImpl::convert_flag_(const int64_t xa_flag,
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObXAFlag::TMSERIALIZABLE: {
|
||||
if (ObXAReqType::XA_START == xa_req_type) {
|
||||
oci_flag = OCI_TRANS_SERIALIZABLE;
|
||||
} else {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user