[master] fix ut: test_tx
This commit is contained in:
@ -74,27 +74,12 @@ TEST_F(ObTestTx, basic)
|
|||||||
GCONF._ob_trans_rpc_timeout = 50;
|
GCONF._ob_trans_rpc_timeout = 50;
|
||||||
ObTxNode::reset_localtion_adapter();
|
ObTxNode::reset_localtion_adapter();
|
||||||
|
|
||||||
auto n1 = new ObTxNode(1, ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 8888), bus_);
|
START_TWO_TX_NODE(n1, n2);
|
||||||
auto n2 = new ObTxNode(2, ObAddr(ObAddr::VER::IPV4, "127.0.0.2", 8888), bus_);
|
PREPARE_TX(n1, tx);
|
||||||
|
PREPARE_TX_PARAM(tx_param);
|
||||||
DEFER(delete(n1));
|
GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot);
|
||||||
DEFER(delete(n2));
|
CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp0);
|
||||||
|
CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp1);
|
||||||
ASSERT_EQ(OB_SUCCESS, n1->start());
|
|
||||||
ASSERT_EQ(OB_SUCCESS, n2->start());
|
|
||||||
ObTxDesc *tx_ptr = NULL;
|
|
||||||
ASSERT_EQ(OB_SUCCESS, n1->acquire_tx(tx_ptr));
|
|
||||||
ObTxDesc &tx = *tx_ptr;
|
|
||||||
ObTxParam tx_param;
|
|
||||||
tx_param.timeout_us_ = 5000000;
|
|
||||||
tx_param.access_mode_ = ObTxAccessMode::RW;
|
|
||||||
tx_param.isolation_ = ObTxIsolationLevel::RC;
|
|
||||||
tx_param.cluster_id_ = 100;
|
|
||||||
int64_t sp0 = 0, sp1 = 0;
|
|
||||||
ObTxReadSnapshot snapshot;
|
|
||||||
ASSERT_EQ(OB_SUCCESS, n1->get_read_snapshot(tx, tx_param.isolation_, n1->ts_after_ms(100), snapshot));
|
|
||||||
ASSERT_EQ(OB_SUCCESS, n1->create_implicit_savepoint(tx, tx_param, sp0));
|
|
||||||
ASSERT_EQ(OB_SUCCESS, n1->create_implicit_savepoint(tx, tx_param, sp1));
|
|
||||||
ASSERT_EQ(OB_SUCCESS, n1->write(tx, snapshot, 100, 112));
|
ASSERT_EQ(OB_SUCCESS, n1->write(tx, snapshot, 100, 112));
|
||||||
ASSERT_EQ(OB_SUCCESS, n2->write(tx, snapshot, 101, 113));
|
ASSERT_EQ(OB_SUCCESS, n2->write(tx, snapshot, 101, 113));
|
||||||
int64_t val1 = 0, val2 = 0;
|
int64_t val1 = 0, val2 = 0;
|
||||||
@ -103,7 +88,7 @@ TEST_F(ObTestTx, basic)
|
|||||||
ASSERT_EQ(112, val1);
|
ASSERT_EQ(112, val1);
|
||||||
ASSERT_EQ(113, val2);
|
ASSERT_EQ(113, val2);
|
||||||
// rollback to savepoint
|
// rollback to savepoint
|
||||||
ASSERT_EQ(OB_SUCCESS, n1->rollback_to_implicit_savepoint(tx, sp1, n1->ts_after_ms(1000), nullptr));
|
ROLLBACK_TO_IMPLICIT_SAVEPOINT(n1, tx, sp1, 1000 * 1000);
|
||||||
ASSERT_EQ(OB_ENTRY_NOT_EXIST, n1->read(tx, 100, val1));
|
ASSERT_EQ(OB_ENTRY_NOT_EXIST, n1->read(tx, 100, val1));
|
||||||
ASSERT_EQ(OB_ENTRY_NOT_EXIST, n2->read(tx, 101, val2));
|
ASSERT_EQ(OB_ENTRY_NOT_EXIST, n2->read(tx, 101, val2));
|
||||||
// write after rollback
|
// write after rollback
|
||||||
@ -114,9 +99,7 @@ TEST_F(ObTestTx, basic)
|
|||||||
ASSERT_EQ(OB_SUCCESS, n2->read(tx, 101, val2));
|
ASSERT_EQ(OB_SUCCESS, n2->read(tx, 101, val2));
|
||||||
ASSERT_EQ(114, val1);
|
ASSERT_EQ(114, val1);
|
||||||
ASSERT_EQ(115, val2);
|
ASSERT_EQ(115, val2);
|
||||||
ASSERT_EQ(OB_SUCCESS, n1->commit_tx(tx, n1->ts_after_ms(500)));
|
COMMIT_TX(n1, tx, 500 * 1000);
|
||||||
|
|
||||||
ASSERT_EQ(OB_SUCCESS, n1->release_tx(tx));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(ObTestTx, start_trans_expired)
|
TEST_F(ObTestTx, start_trans_expired)
|
||||||
|
|||||||
@ -36,6 +36,9 @@
|
|||||||
tx_param.access_mode_ = ObTxAccessMode::RW; \
|
tx_param.access_mode_ = ObTxAccessMode::RW; \
|
||||||
tx_param.isolation_ = ObTxIsolationLevel::RC; \
|
tx_param.isolation_ = ObTxIsolationLevel::RC; \
|
||||||
tx_param.cluster_id_ = 100;
|
tx_param.cluster_id_ = 100;
|
||||||
|
#define GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot) \
|
||||||
|
ObTxReadSnapshot snapshot; \
|
||||||
|
ASSERT_EQ(OB_SUCCESS, n1->get_read_snapshot(tx, tx_param.isolation_, n1->ts_after_ms(100), snapshot));
|
||||||
|
|
||||||
#define CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp) \
|
#define CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp) \
|
||||||
int64_t sp = 0; \
|
int64_t sp = 0; \
|
||||||
@ -73,3 +76,6 @@
|
|||||||
|
|
||||||
#define ROLLBACK_TX(n1, tx) \
|
#define ROLLBACK_TX(n1, tx) \
|
||||||
n1->rollback_tx(tx);
|
n1->rollback_tx(tx);
|
||||||
|
|
||||||
|
#define COMMIT_TX(n1, tx, timeout_us) \
|
||||||
|
n1->commit_tx(tx, n1->ts_after_us(timeout_us));
|
||||||
|
|||||||
@ -477,9 +477,10 @@ int ObTxNode::read(ObTxDesc &tx, const int64_t key, int64_t &value, const ObTxIs
|
|||||||
ObTxReadSnapshot snapshot;
|
ObTxReadSnapshot snapshot;
|
||||||
OZ(get_read_snapshot(tx,
|
OZ(get_read_snapshot(tx,
|
||||||
iso,
|
iso,
|
||||||
ts_after_ms(5),
|
ts_after_ms(50),
|
||||||
snapshot));
|
snapshot));
|
||||||
return read(snapshot, key, value);
|
OZ(read(snapshot, key, value));
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
int ObTxNode::read(const ObTxReadSnapshot &snapshot,
|
int ObTxNode::read(const ObTxReadSnapshot &snapshot,
|
||||||
const int64_t key,
|
const int64_t key,
|
||||||
@ -567,7 +568,7 @@ int ObTxNode::write(ObTxDesc &tx, const int64_t key, const int64_t value)
|
|||||||
ObTxReadSnapshot snapshot;
|
ObTxReadSnapshot snapshot;
|
||||||
OZ(get_read_snapshot(tx,
|
OZ(get_read_snapshot(tx,
|
||||||
tx.isolation_,
|
tx.isolation_,
|
||||||
ts_after_ms(5),
|
ts_after_ms(50),
|
||||||
snapshot));
|
snapshot));
|
||||||
OZ(write(tx, snapshot, key, value));
|
OZ(write(tx, snapshot, key, value));
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
Reference in New Issue
Block a user