[FEAT MERGE] Optimize dml performance in multi-local index scenarios

Co-authored-by: Handora <qcdsr970209@gmail.com>
Co-authored-by: Naynahs <cfzy002@126.com>
Co-authored-by: ZenoWang <wzybuaasoft@163.com>
This commit is contained in:
windye 2024-04-16 15:27:19 +00:00 committed by ob-robot
parent 5c28cef93c
commit 7aca4ef065
59 changed files with 696875 additions and 299635 deletions

File diff suppressed because it is too large Load Diff

View File

@ -93,6 +93,7 @@
#include "observer/table/ob_table_session_pool.h"
#include "share/index_usage/ob_index_usage_info_mgr.h"
#include "storage/tenant_snapshot/ob_tenant_snapshot_service.h"
#include "storage/memtable/ob_lock_wait_mgr.h"
namespace oceanbase
{
@ -736,6 +737,8 @@ int MockTenantModuleEnv::init()
MTL_BIND2(mtl_new_default, ObIndexUsageInfoMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(mtl_new_default, storage::ObTabletMemtableMgrPool::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, ObTenantSnapshotService::mtl_init, mtl_start_default, mtl_stop_default, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, ObOptStatMonitorManager::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, memtable::ObLockWaitMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, ObGlobalIteratorPool::mtl_init, nullptr, nullptr, nullptr, ObGlobalIteratorPool::mtl_destroy);
}
if (OB_FAIL(ret)) {

View File

@ -490,7 +490,7 @@ public:
if (OB_FAIL(context.init(query_flag, *wtx, allocator_, trans_version_range))) {
TRANS_LOG(WARN, "Fail to init access context", K(ret));
}
ret = memtable->set(iter_param_, context, columns_, write_row, encrypt_meta_);
ret = memtable->set(iter_param_, context, columns_, write_row, encrypt_meta_, false);
if (ret == -5024) {
TRANS_LOG(ERROR, "nima", K(ret), K(write_row));
}
@ -3404,14 +3404,16 @@ TEST_F(TestMemtableV2, test_seq_set_violation)
context,
columns_,
write_row,
encrypt_meta_)));
encrypt_meta_,
false)));
start_pdml_stmt(wtx, scn_3000, read_seq_no, 1000000000/*expire_time*/);
EXPECT_EQ(OB_ERR_PRIMARY_KEY_DUPLICATE, (ret = memtable->set(iter_param_,
context,
columns_,
write_row,
encrypt_meta_)));
encrypt_meta_,
false)));
memtable->destroy();
}

View File

@ -124,6 +124,7 @@ void TestTableScanPureDataTable::insert_data_to_tablet(MockObAccessService *acce
ASSERT_EQ(OB_SUCCESS, tx_service->get_read_snapshot(*tx_desc, isolation, expire_ts, read_snapshot));
// 4. storage dml
ObStoreCtxGuard store_ctx_guard;
ObDMLBaseParam dml_param;
dml_param.timeout_ = ObTimeUtility::current_time() + TestDmlCommon::TX_EXPIRE_TIME_US;
dml_param.is_total_quantity_log_ = false;
@ -133,6 +134,7 @@ void TestTableScanPureDataTable::insert_data_to_tablet(MockObAccessService *acce
dml_param.tenant_schema_version_ = share::OB_CORE_SCHEMA_VERSION + 1;
dml_param.encrypt_meta_ = &dml_param.encrypt_meta_legacy_;
dml_param.snapshot_ = read_snapshot;
dml_param.store_ctx_guard_ = &store_ctx_guard;
ObArenaAllocator allocator;
share::schema::ObTableDMLParam table_dml_param(allocator);
@ -144,10 +146,16 @@ void TestTableScanPureDataTable::insert_data_to_tablet(MockObAccessService *acce
ASSERT_EQ(OB_SUCCESS, table_dml_param.convert(&table_schema, 1, column_ids));
dml_param.table_param_ = &table_dml_param;
ASSERT_EQ(OB_SUCCESS, access_service->get_write_store_ctx_guard(ls_id_,
dml_param.timeout_,
*tx_desc,
read_snapshot,
0,/*branch_id*/
store_ctx_guard));
int64_t affected_rows = 0;
ASSERT_EQ(OB_SUCCESS, access_service->insert_rows(ls_id_, tablet_id_,
*tx_desc, dml_param, column_ids, &mock_iter, affected_rows));
store_ctx_guard.reset();
ASSERT_EQ(12, affected_rows);
@ -215,7 +223,7 @@ void TestTableScanPureDataTable::table_scan(
if (OB_SUCCESS == ret) {
++cnt;
}
LOG_INFO("table scan row", KPC(row));
LOG_INFO("table scan row", KPC(row), K(ret));
}
ASSERT_EQ(12, cnt);

View File

@ -174,15 +174,23 @@ void TestTrans::insert_rows(ObLSID &ls_id, ObTabletID &tablet_id, ObTxDesc &tx_d
ObSEArray<uint64_t, 1> column_ids;
column_ids.push_back(16); // pk
ASSERT_EQ(OB_SUCCESS, table_dml_param.convert(&table_schema_, 1000, column_ids));
ObStoreCtxGuard store_ctx_guard;
ObDMLBaseParam dml_param;
dml_param.timeout_ = ObTimeUtility::current_time() + 100000000;
dml_param.schema_version_ = 1000;
dml_param.table_param_ = &table_dml_param;
dml_param.snapshot_ = snapshot;
dml_param.store_ctx_guard_ = &store_ctx_guard;
auto das = MTL(ObAccessService*);
auto as = MTL(ObAccessService*);
LOG_INFO("storage access by dml");
ASSERT_EQ(OB_SUCCESS, das->insert_rows(ls_id,
ASSERT_EQ(OB_SUCCESS, as->get_write_store_ctx_guard(ls_id,
dml_param.timeout_,
tx_desc,
snapshot,
0,/*branch_id*/
store_ctx_guard));
ASSERT_EQ(OB_SUCCESS, as->insert_rows(ls_id,
tablet_id,
tx_desc,
dml_param,

View File

@ -70,8 +70,14 @@ int ObLSTabletService::insert_tablet_rows(
// Check write conflict in memtable + sstable.
// Check uniqueness constraint in sstable only.
if (OB_SUCC(ret)) {
if (OB_FAIL(tablet_handle.get_obj()->insert_rows(table, run_ctx.store_ctx_, rows, rows_info,
check_exists, *run_ctx.col_descs_, row_count, run_ctx.dml_param_.encrypt_meta_))) {
if (OB_FAIL(tablet_handle.get_obj()->insert_rows(table,
run_ctx.store_ctx_,
rows,
rows_info,
check_exists,
*run_ctx.col_descs_,
row_count,
run_ctx.dml_param_.encrypt_meta_))) {
if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) {
blocksstable::ObDatumRowkey &duplicate_rowkey = rows_info.get_conflict_rowkey();
TRANS_LOG(WARN, "Rowkey already exist", K(ret), K(table), K(duplicate_rowkey),

File diff suppressed because it is too large Load Diff

View File

@ -70,6 +70,7 @@ public:
OB_INLINE bool is_unique_index() const { return ObTableSchema::is_unique_index(index_type_); }
OB_INLINE bool is_domain_index() const { return ObTableSchema::is_domain_index(index_type_); }
OB_INLINE bool is_spatial_index() const { return ObTableSchema::is_spatial_index(index_type_); }
OB_INLINE bool is_index_local_storage() const { return share::schema::is_index_local_storage(index_type_); }
int is_rowkey_column(const uint64_t column_id, bool &is_rowkey) const;
int is_column_nullable_for_write(const uint64_t column_id, bool &is_nullable_for_write) const;

View File

@ -1031,7 +1031,8 @@ int ObTableSqlService::insert_single_column(
}
if (OB_SUCC(ret)) {
if (new_column_schema.is_autoincrement()) {
if (OB_FAIL(add_sequence(new_table_schema.get_tenant_id(),
if (OB_FAIL(add_sequence(sql_client,
new_table_schema.get_tenant_id(),
new_table_schema.get_table_id(),
new_column_schema.get_column_id(),
new_table_schema.get_auto_increment(),
@ -1092,7 +1093,8 @@ int ObTableSqlService::update_single_column(
if (new_column_schema.is_autoincrement()) {
if (origin_table_schema.get_autoinc_column_id() == 0 &&
new_table_schema.get_autoinc_column_id() == new_column_schema.get_column_id()) {
if (OB_FAIL(add_sequence(tenant_id,
if (OB_FAIL(add_sequence(sql_client,
tenant_id,
new_table_schema.get_table_id(),
new_column_schema.get_column_id(),
new_table_schema.get_auto_increment(),
@ -2450,7 +2452,7 @@ int ObTableSqlService::create_table(ObTableSchema &table,
LOG_WARN("check ddl allowd failed", K(ret), K(table));
}
if (OB_SUCCESS == ret && 0 != table.get_autoinc_column_id()) {
if (OB_FAIL(add_sequence(tenant_id, table.get_table_id(),
if (OB_FAIL(add_sequence(sql_client, tenant_id, table.get_table_id(),
table.get_autoinc_column_id(), table.get_auto_increment(),
table.get_truncate_version()))) {
LOG_WARN("insert sequence record faild", K(ret), K(table));
@ -3915,56 +3917,45 @@ int ObTableSqlService::update_data_table_schema_version(
return ret;
}
int ObTableSqlService::add_sequence(const uint64_t tenant_id,
int ObTableSqlService::add_sequence(ObISQLClient &sql_client,
const uint64_t tenant_id,
const uint64_t table_id,
const uint64_t column_id,
const uint64_t auto_increment,
const int64_t truncate_version)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
// FIXME:__all_time_zone contains auto increment column. Cyclic dependence may occur.
const uint64_t exec_tenant_id = tenant_id;
if (OB_FAIL(trans.start(sql_proxy_, tenant_id, false))) {
LOG_WARN("failed to start trans, ", K(ret), K(tenant_id));
ObSqlString sql;
ObSqlString values;
if (OB_FAIL(sql.append_fmt("INSERT IGNORE INTO %s (", OB_ALL_AUTO_INCREMENT_TNAME))) {
LOG_WARN("append table name failed, ", K(ret));
} else {
ObSqlString sql;
ObSqlString values;
if (OB_FAIL(sql.append_fmt("INSERT IGNORE INTO %s (", OB_ALL_AUTO_INCREMENT_TNAME))) {
LOG_WARN("append table name failed, ", K(ret));
} else {
SQL_COL_APPEND_VALUE(sql, values, ObSchemaUtils::get_extract_tenant_id(
exec_tenant_id, tenant_id), "tenant_id", "%lu");
SQL_COL_APPEND_VALUE(sql, values, ObSchemaUtils::get_extract_schema_id(
exec_tenant_id, table_id), "sequence_key", "%lu");
SQL_COL_APPEND_VALUE(sql, values, column_id, "column_id", "%lu");
SQL_COL_APPEND_VALUE(sql, values, 0 == auto_increment ? 1 : auto_increment, "sequence_value", "%lu");
SQL_COL_APPEND_VALUE(sql, values, 0 == auto_increment ? 0 : auto_increment - 1, "sync_value", "%lu");
SQL_COL_APPEND_VALUE(sql, values, truncate_version, "truncate_version", "%ld");
}
if (OB_SUCC(ret)) {
if (OB_FAIL(sql.append_fmt(", gmt_modified) VALUES (%.*s, now(6))",
static_cast<int32_t>(values.length()), values.ptr()))) {
LOG_WARN("append sql failed, ", K(ret));
}
}
if (OB_SUCC(ret)) {
int64_t affected_rows = 0;
if (OB_FAIL(trans.write(exec_tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("fail to execute. ", "sql", sql.ptr(), K(ret));
} else {
if (!is_zero_row(affected_rows) && !is_single_row(affected_rows)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected value", K(affected_rows), "sql", sql.ptr(), K(ret));
}
}
SQL_COL_APPEND_VALUE(sql, values, ObSchemaUtils::get_extract_tenant_id(
exec_tenant_id, tenant_id), "tenant_id", "%lu");
SQL_COL_APPEND_VALUE(sql, values, ObSchemaUtils::get_extract_schema_id(
exec_tenant_id, table_id), "sequence_key", "%lu");
SQL_COL_APPEND_VALUE(sql, values, column_id, "column_id", "%lu");
SQL_COL_APPEND_VALUE(sql, values, 0 == auto_increment ? 1 : auto_increment, "sequence_value", "%lu");
SQL_COL_APPEND_VALUE(sql, values, 0 == auto_increment ? 0 : auto_increment - 1, "sync_value", "%lu");
SQL_COL_APPEND_VALUE(sql, values, truncate_version, "truncate_version", "%ld");
}
if (OB_SUCC(ret)) {
if (OB_FAIL(sql.append_fmt(", gmt_modified) VALUES (%.*s, now(6))",
static_cast<int32_t>(values.length()), values.ptr()))) {
LOG_WARN("append sql failed, ", K(ret));
}
}
if (trans.is_started()) {
int temp_ret = OB_SUCCESS;
if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret)))) {
LOG_WARN("trans end failed", "is_commit", OB_SUCCESS == ret, K(temp_ret));
ret = (OB_SUCC(ret)) ? temp_ret : ret;
if (OB_SUCC(ret)) {
int64_t affected_rows = 0;
if (OB_FAIL(sql_client.write(exec_tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("fail to execute. ", "sql", sql.ptr(), K(ret));
} else {
if (!is_zero_row(affected_rows) && !is_single_row(affected_rows)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected value", K(affected_rows), "sql", sql.ptr(), K(ret));
}
}
}
return ret;

View File

@ -314,7 +314,8 @@ private:
int delete_constraint(common::ObISQLClient &sql_client,
const ObTableSchema &table_schema,
const int64_t new_schema_version);
int add_sequence(const uint64_t tenant_id,
int add_sequence(common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const uint64_t table_id,
const uint64_t column_id,
const uint64_t auto_increment,

View File

@ -455,7 +455,7 @@ private:
class ObDASDMLIterator : public common::ObNewRowIterator
{
public:
static const int64_t DEFAULT_BATCH_SIZE = 1;
static const int64_t DEFAULT_BATCH_SIZE = 256;
public:
ObDASDMLIterator(const ObDASDMLBaseCtDef *das_ctdef,
ObDASWriteBuffer &write_buffer,

View File

@ -157,10 +157,20 @@ int ObDASInsertOp::insert_row_with_fetch()
ObAccessService *as = MTL(ObAccessService *);
ObDMLBaseParam dml_param;
ObDASDMLIterator dml_iter(ins_ctdef_, insert_buffer_, op_alloc_);
storage::ObStoreCtxGuard store_ctx_guard;
if (ins_ctdef_->table_rowkey_types_.empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table_rowkey_types is invalid", K(ret));
} else if (OB_FAIL(ObDMLService::init_dml_param(*ins_ctdef_, *ins_rtdef_, *snapshot_, write_branch_id_, op_alloc_, dml_param))) {
} else if (OB_FAIL(as->get_write_store_ctx_guard(ls_id_,
ins_rtdef_->timeout_ts_,
*trans_desc_,
*snapshot_,
write_branch_id_,
store_ctx_guard))) {
LOG_WARN("fail to get_write_store_ctx_guard", K(ret), K(ls_id_));
} else if (OB_FAIL(ObDMLService::init_dml_param(*ins_ctdef_, *ins_rtdef_,
*snapshot_, write_branch_id_, op_alloc_, store_ctx_guard, dml_param))) {
LOG_WARN("init dml param failed", K(ret), KPC_(ins_ctdef), KPC_(ins_rtdef));
} else if (OB_ISNULL(buf = op_alloc_.alloc(sizeof(ObDASConflictIterator)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -226,6 +236,7 @@ int ObDASInsertOp::insert_row_with_fetch()
*snapshot_,
write_branch_id_,
op_alloc_,
store_ctx_guard,
dml_param))) {
LOG_WARN("init index dml param failed", K(ret), KPC(index_ins_ctdef), KPC(index_ins_rtdef));
}

View File

@ -55,7 +55,17 @@ int ObDASLockOp::open_op()
ObDASDMLIterator dml_iter(lock_ctdef_, lock_buffer_, op_alloc_);
ObAccessService *as = MTL(ObAccessService *);
if (OB_FAIL(ObDMLService::init_dml_param(*lock_ctdef_, *lock_rtdef_, *snapshot_, write_branch_id_, op_alloc_, dml_param))) {
storage::ObStoreCtxGuard store_ctx_guard;
if (OB_FAIL(as->get_write_store_ctx_guard(ls_id_,
lock_rtdef_->timeout_ts_,
*trans_desc_,
*snapshot_,
write_branch_id_,
store_ctx_guard))) {
LOG_WARN("fail to get_write_access_tx_ctx_guard", K(ret), K(ls_id_));
} else if (OB_FAIL(ObDMLService::init_dml_param(
*lock_ctdef_, *lock_rtdef_, *snapshot_, write_branch_id_, op_alloc_, store_ctx_guard, dml_param))) {
LOG_WARN("init dml param failed", K(ret));
} else if (OB_FAIL(as->lock_rows(ls_id_,
tablet_id_,

View File

@ -1184,6 +1184,7 @@ int ObDMLService::init_dml_param(const ObDASDMLBaseCtDef &base_ctdef,
transaction::ObTxReadSnapshot &snapshot,
const int16_t write_branch_id,
ObIAllocator &das_alloc,
storage::ObStoreCtxGuard &store_ctx_gurad,
storage::ObDMLBaseParam &dml_param)
{
int ret = OB_SUCCESS;
@ -1200,6 +1201,7 @@ int ObDMLService::init_dml_param(const ObDASDMLBaseCtDef &base_ctdef,
dml_param.dml_allocator_ = &das_alloc;
dml_param.snapshot_ = snapshot;
dml_param.branch_id_ = write_branch_id;
dml_param.store_ctx_guard_ = &store_ctx_gurad;
if (base_ctdef.is_batch_stmt_) {
dml_param.write_flag_.set_is_dml_batch_opt();
}

View File

@ -15,6 +15,8 @@
#include "sql/engine/dml/ob_dml_ctx_define.h"
#include "sql/das/ob_das_context.h"
#include "ob_table_modify_op.h"
#include "storage/tx_storage/ob_access_service.h"
namespace oceanbase
{
namespace sql
@ -144,6 +146,7 @@ public:
transaction::ObTxReadSnapshot &snapshot,
const int16_t write_branch_id,
common::ObIAllocator &das_alloc,
storage::ObStoreCtxGuard &store_ctx_gurad,
storage::ObDMLBaseParam &dml_param);
static int init_das_dml_rtdef(ObDMLRtCtx &dml_rtctx,
const ObDASDMLBaseCtDef &das_ctdef,
@ -331,40 +334,54 @@ int ObDASIndexDMLAdaptor<N, DMLIterator>::write_tablet(DMLIterator &iter, int64_
if (OB_FAIL(write_tablet_with_ignore(iter, affected_rows))) {
LOG_WARN("write tablet with ignore failed", K(ret));
}
} else if (OB_FAIL(ObDMLService::init_dml_param(*ctdef_, *rtdef_, *snapshot_, write_branch_id_, *das_allocator_, dml_param_))) {
SQL_DAS_LOG(WARN, "init dml param failed", K(ret), K(ctdef_->table_id_), K(ctdef_->index_tid_));
} else if (OB_FAIL(write_rows(ls_id_, tablet_id_, *ctdef_, *rtdef_, iter, affected_rows))) {
SQL_DAS_LOG(WARN, "write rows failed", K(ret),
K(ls_id_), K(tablet_id_), K(ctdef_->table_id_), K(ctdef_->index_tid_));
} else if (related_ctdefs_ != nullptr && !related_ctdefs_->empty()) {
//write local index
for (int64_t i = 0; OB_SUCC(ret) && i < related_ctdefs_->count(); ++i) {
const CtDefType *related_ctdef = static_cast<const CtDefType*>(related_ctdefs_->at(i));
RtDefType *related_rtdef = static_cast<RtDefType*>(related_rtdefs_->at(i));
ObTabletID related_tablet_id = related_tablet_ids_->at(i);
int64_t index_affected_rows = 0;
SQL_DAS_LOG(DEBUG, "rewind iterator and write local index tablet",
K(ls_id_), K(related_tablet_id), K(related_ctdef->table_id_), K(related_ctdef->index_tid_));
if (OB_FAIL(iter.rewind(related_ctdef))) {
SQL_DAS_LOG(WARN, "rewind iterator failed", K(ret));
} else if (OB_FAIL(ObDMLService::init_dml_param(*related_ctdef, *related_rtdef, *snapshot_, write_branch_id_, *das_allocator_, dml_param_))) {
SQL_DAS_LOG(WARN, "init index dml param failed", K(ret),
K(related_ctdef->table_id_), K(related_ctdef->index_tid_));
} else if (OB_FAIL(write_rows(ls_id_,
related_tablet_id,
*related_ctdef,
*related_rtdef,
iter,
index_affected_rows))) {
SQL_DAS_LOG(WARN, "write local index rows failed", K(ret),
K(related_tablet_id), K(related_ctdef->table_id_), K(related_ctdef->index_tid_));
} else if (OB_FAIL(ObDMLService::check_local_index_affected_rows(affected_rows,
index_affected_rows,
*ctdef_,
*rtdef_,
*related_ctdef,
*related_rtdef))) {
SQL_DAS_LOG(WARN, "check local index affected rows failed", K(ret));
} else {
ObAccessService *as = MTL(ObAccessService *);
storage::ObStoreCtxGuard store_ctx_guard;
if (OB_FAIL(as->get_write_store_ctx_guard(ls_id_,
rtdef_->timeout_ts_,
*tx_desc_,
*snapshot_,
write_branch_id_,
store_ctx_guard))) {
LOG_WARN("fail to get_write_store_ctx_guard", K(ret), K(ls_id_));
} else if (OB_FAIL(ObDMLService::init_dml_param(
*ctdef_, *rtdef_, *snapshot_, write_branch_id_, *das_allocator_, store_ctx_guard, dml_param_))) {
SQL_DAS_LOG(WARN, "init dml param failed", K(ret), K(ctdef_->table_id_), K(ctdef_->index_tid_));
} else if (OB_FAIL(write_rows(ls_id_, tablet_id_, *ctdef_, *rtdef_, iter, affected_rows))) {
SQL_DAS_LOG(WARN, "write rows failed", K(ret),
K(ls_id_), K(tablet_id_), K(ctdef_->table_id_), K(ctdef_->index_tid_));
} else if (related_ctdefs_ != nullptr && !related_ctdefs_->empty()) {
//write local index
for (int64_t i = 0; OB_SUCC(ret) && i < related_ctdefs_->count(); ++i) {
const CtDefType *related_ctdef = static_cast<const CtDefType*>(related_ctdefs_->at(i));
RtDefType *related_rtdef = static_cast<RtDefType*>(related_rtdefs_->at(i));
ObTabletID related_tablet_id = related_tablet_ids_->at(i);
int64_t index_affected_rows = 0;
SQL_DAS_LOG(DEBUG, "rewind iterator and write local index tablet",
K(ls_id_), K(related_tablet_id), K(related_ctdef->table_id_), K(related_ctdef->index_tid_));
if (OB_FAIL(iter.rewind(related_ctdef))) {
SQL_DAS_LOG(WARN, "rewind iterator failed", K(ret));
} else if (OB_FAIL(ObDMLService::init_dml_param(*related_ctdef, *related_rtdef,
*snapshot_, write_branch_id_, *das_allocator_, store_ctx_guard, dml_param_))) {
SQL_DAS_LOG(WARN, "init index dml param failed", K(ret),
K(related_ctdef->table_id_), K(related_ctdef->index_tid_));
} else if (OB_FAIL(write_rows(ls_id_,
related_tablet_id,
*related_ctdef,
*related_rtdef,
iter,
index_affected_rows))) {
SQL_DAS_LOG(WARN, "write local index rows failed", K(ret),
K(related_tablet_id), K(related_ctdef->table_id_), K(related_ctdef->index_tid_));
} else if (OB_FAIL(ObDMLService::check_local_index_affected_rows(affected_rows,
index_affected_rows,
*ctdef_,
*rtdef_,
*related_ctdef,
*related_rtdef))) {
SQL_DAS_LOG(WARN, "check local index affected rows failed", K(ret));
}
}
}
}
@ -379,6 +396,7 @@ int ObDASIndexDMLAdaptor<N, DMLIterator>::write_tablet_with_ignore(DMLIterator &
affected_rows = 0;
const ObDASWriteBuffer::DmlRow *dml_row = nullptr;
ObDASWriteBuffer::Iterator write_iter;
ObAccessService *as = MTL(ObAccessService *);
const bool with_local_index = related_ctdefs_ != nullptr && !related_ctdefs_->empty();
if (OB_FAIL(iter.get_write_buffer().begin(write_iter))) {
LOG_WARN("begin write iterator failed", K(ret));
@ -405,7 +423,17 @@ int ObDASIndexDMLAdaptor<N, DMLIterator>::write_tablet_with_ignore(DMLIterator &
SQL_DAS_LOG(TRACE, "write table dml row with ignore", KPC(dml_row), K(ls_id_), K(tablet_id_),
K(ctdef_->table_id_), K(ctdef_->index_tid_));
DMLIterator single_row_iter(ctdef_, single_row_buffer, *das_allocator_);
if (OB_FAIL(ObDMLService::init_dml_param(*ctdef_, *rtdef_, *snapshot_, write_branch_id_, *das_allocator_, dml_param_))) {
storage::ObStoreCtxGuard store_ctx_guard;
if (OB_FAIL(as->get_write_store_ctx_guard(ls_id_,
rtdef_->timeout_ts_,
*tx_desc_,
*snapshot_,
write_branch_id_,
store_ctx_guard))) {
LOG_WARN("fail to get_write_store_ctx_guard", K(ret), K(ls_id_));
} else if (OB_FAIL(ObDMLService::init_dml_param(*ctdef_, *rtdef_, *snapshot_, write_branch_id_,
*das_allocator_, store_ctx_guard, dml_param_))) {
SQL_DAS_LOG(WARN, "init dml param failed", K(ret), KPC_(ctdef), KPC_(rtdef));
} else if (with_local_index && FALSE_IT(dml_param_.write_flag_.set_skip_flush_redo())) {
} else if (OB_FAIL(write_rows(ls_id_,
@ -432,6 +460,7 @@ int ObDASIndexDMLAdaptor<N, DMLIterator>::write_tablet_with_ignore(DMLIterator &
*snapshot_,
write_branch_id_,
*das_allocator_,
store_ctx_guard,
dml_param_))) {
SQL_DAS_LOG(WARN, "init index dml param failed", K(ret),
KPC(related_ctdef), KPC(related_rtdef));

View File

@ -730,31 +730,41 @@ int ObSqlTransControl::stmt_sanity_check_(ObSQLSessionInfo *session,
ObPhysicalPlanCtx *plan_ctx)
{
int ret = OB_SUCCESS;
auto current_consist_level = plan_ctx->get_consistency_level();
ObConsistencyLevel current_consist_level = plan_ctx->get_consistency_level();
CK (current_consist_level != ObConsistencyLevel::INVALID_CONSISTENCY);
bool is_plain_select = plan->is_plain_select();
// adjust stmt's consistency level
if (OB_SUCC(ret)) {
// Weak read statement with inner table should be converted to strong read.
// For example, schema refresh statement;
if (plan->is_contain_inner_table() ||
(!is_plain_select && current_consist_level != ObConsistencyLevel::STRONG)) {
(!plan->is_plain_select() && current_consist_level != ObConsistencyLevel::STRONG)) {
plan_ctx->set_consistency_level(ObConsistencyLevel::STRONG);
}
}
// check isolation with consistency type
if (OB_SUCC(ret) && session->is_in_transaction()) {
// check consistency type volatile
ObConsistencyLevel current_consist_level = plan_ctx->get_consistency_level();
if (current_consist_level == ObConsistencyLevel::WEAK) {
// read write transaction
if (!session->get_tx_desc()->is_clean()) {
plan_ctx->set_consistency_level(ObConsistencyLevel::STRONG);
}
}
// check isolation with consistency type
auto iso = session->get_tx_desc()->get_isolation_level();
auto cl = plan_ctx->get_consistency_level();
if (ObConsistencyLevel::WEAK == cl && (iso == ObTxIsolationLevel::SERIAL || iso == ObTxIsolationLevel::RR)) {
if (ObConsistencyLevel::WEAK == cl &&
(iso == ObTxIsolationLevel::SERIAL || iso == ObTxIsolationLevel::RR)) {
ret = OB_NOT_SUPPORTED;
TRANS_LOG(ERROR, "statement of weak consistency is not allowed under SERIALIZABLE isolation",
KR(ret), "trans_id", session->get_tx_id(), "consistency_level", cl);
LOG_USER_ERROR(OB_NOT_SUPPORTED, "weak consistency under SERIALIZABLE and REPEATABLE-READ isolation level");
}
}
return ret;
}

View File

@ -46,6 +46,7 @@ struct ObStorageDatum;
}
namespace storage
{
class ObStoreCtxGuard;
//
// Project storage output row to expression array, the core project logic is:
@ -177,6 +178,7 @@ struct ObDMLBaseParam
prelock_(false),
is_batch_stmt_(false),
dml_allocator_(nullptr),
store_ctx_guard_(nullptr),
encrypt_meta_(NULL),
encrypt_meta_legacy_(),
spec_seq_no_(),
@ -203,7 +205,7 @@ struct ObDMLBaseParam
bool prelock_;
bool is_batch_stmt_;
mutable common::ObIAllocator *dml_allocator_;
mutable ObStoreCtxGuard *store_ctx_guard_;
// table_id_, local_index_id_ and its encrypt_meta
const common::ObIArray<transaction::ObEncryptMetaCache> *encrypt_meta_;
common::ObSEArray<transaction::ObEncryptMetaCache, 1> encrypt_meta_legacy_;
@ -218,7 +220,7 @@ struct ObDMLBaseParam
// write flag for inner write processing
concurrent_control::ObWriteFlag write_flag_;
bool check_schema_version_;
bool is_valid() const { return (timeout_ > 0 && schema_version_ >= 0); }
bool is_valid() const { return (timeout_ > 0 && schema_version_ >= 0) && nullptr != store_ctx_guard_; }
bool is_direct_insert() const { return (direct_insert_task_id_ > 0); }
DECLARE_TO_STRING;
};

View File

@ -67,7 +67,7 @@ public:
return is_inited_ && exist_helper_.is_valid() && delete_count_ >= 0
&& rowkeys_.count() >= delete_count_ && OB_NOT_NULL(rows_);
}
OB_INLINE int32_t get_rowkey_cnt() const
OB_INLINE int64_t get_rowkey_cnt() const
{
return rowkeys_.count();
}
@ -169,6 +169,11 @@ public:
{
rowkeys_[idx].marked_rowkey_.mark_row_non_existent();
}
inline bool is_row_checked(const int64_t idx) const
{
const blocksstable::ObMarkedRowkey &marked_rowkey = rowkeys_[idx].marked_rowkey_;
return marked_rowkey.is_checked();
}
inline bool is_row_skipped(const int64_t idx) const
{
const blocksstable::ObMarkedRowkey &marked_rowkey = rowkeys_[idx].marked_rowkey_;
@ -267,7 +272,7 @@ private:
const blocksstable::ObStorageDatumUtils *datum_utils_;
blocksstable::ObDatumRowkey min_key_;
int64_t conflict_rowkey_idx_;
int64_t error_code_;
int error_code_;
int64_t delete_count_;
int16_t rowkey_column_num_;
bool is_inited_;

View File

@ -52,6 +52,7 @@ ObTableIterParam::ObTableIterParam()
has_lob_column_out_(false),
is_for_foreign_check_(false),
limit_prefetch_(false),
is_non_unique_local_index_(false),
ss_rowkey_prefix_cnt_(0),
pd_storage_flag_()
{
@ -98,6 +99,7 @@ void ObTableIterParam::reset()
has_lob_column_out_ = false;
is_for_foreign_check_ = false;
limit_prefetch_ = false;
is_non_unique_local_index_ = false;
ObSSTableIndexFilterFactory::destroy_sstable_index_filter(sstable_index_filter_);
}
@ -181,6 +183,7 @@ DEF_TO_STRING(ObTableIterParam)
K_(has_lob_column_out),
K_(is_for_foreign_check),
K_(limit_prefetch),
K_(is_non_unique_local_index),
K_(ss_rowkey_prefix_cnt));
J_OBJ_END();
return pos;

View File

@ -205,6 +205,7 @@ public:
bool has_lob_column_out_;
bool is_for_foreign_check_;
bool limit_prefetch_;
bool is_non_unique_local_index_;
int64_t ss_rowkey_prefix_cnt_;
sql::ObStoragePushdownFlag pd_storage_flag_;
};

View File

@ -60,7 +60,7 @@ public:
void reuse();
void reset_for_switch();
virtual void reset();
ObAccessService::ObStoreCtxGuard &get_ctx_guard() { return ctx_guard_; }
ObStoreCtxGuard &get_ctx_guard() { return ctx_guard_; }
// A offline ls will disable replay status and kill all part_ctx on the follower.
// We can not read the uncommitted data which has not replay commit log yet.
@ -114,7 +114,7 @@ private:
ObTableAccessContext main_table_ctx_;
ObGetTableParam get_table_param_;
ObAccessService::ObStoreCtxGuard ctx_guard_;
ObStoreCtxGuard ctx_guard_;
ObTableScanParam *scan_param_;
ObTableScanRange table_scan_range_;
ObQueryRowIterator *main_iter_;

View File

@ -381,12 +381,16 @@ int ObPersistentLobApator::prepare_lob_meta_dml(
int ret = OB_SUCCESS;
if (param.dml_base_param_ == nullptr) {
share::schema::ObTableDMLParam* table_dml_param = nullptr;
void *buf = param.allocator_->alloc(sizeof(ObDMLBaseParam));
ObStoreCtxGuard *store_ctx_guard = nullptr;
void *buf = param.allocator_->alloc(sizeof(ObDMLBaseParam) + sizeof(ObStoreCtxGuard));
if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc dml base param", K(ret));
} else {
param.dml_base_param_ = new(buf)ObDMLBaseParam();
store_ctx_guard = new((char*)buf + sizeof(ObDMLBaseParam)) ObStoreCtxGuard();
buf = param.allocator_->alloc(sizeof(share::schema::ObTableDMLParam));
if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -397,7 +401,8 @@ int ObPersistentLobApator::prepare_lob_meta_dml(
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(build_lob_meta_table_dml(param, tenant_id, table_dml_param,
*param.dml_base_param_, param.column_ids_, data_tablet, lob_meta_tablet))) {
*param.dml_base_param_, store_ctx_guard,
param.column_ids_, data_tablet, lob_meta_tablet))) {
LOG_WARN("failed to build meta schema", K(ret), K(data_tablet), K(lob_meta_tablet));
}
} else {
@ -415,6 +420,19 @@ int ObPersistentLobApator::prepare_lob_meta_dml(
LOG_WARN("invalid seq no from param.", K(ret), K(param));
}
}
if (OB_SUCC(ret)) {
param.dml_base_param_->store_ctx_guard_->reset();
ObAccessService *oas = MTL(ObAccessService *);
if (OB_FAIL(oas->get_write_store_ctx_guard(param.ls_id_,
param.timeout_,
*param.tx_desc_,
param.snapshot_,
0,/*branch_id*/
*param.dml_base_param_->store_ctx_guard_,
param.dml_base_param_->spec_seq_no_ ))) {
LOG_WARN("fail to get write store tx ctx guard", K(ret), K(param));
}
}
return ret;
}
@ -423,6 +441,7 @@ int ObPersistentLobApator::build_lob_meta_table_dml(
const uint64_t tenant_id,
ObTableDMLParam* dml_param,
ObDMLBaseParam& dml_base_param,
ObStoreCtxGuard *store_ctx_guard,
ObSEArray<uint64_t, 6>& column_ids,
const ObTabletHandle& data_tablet,
const ObTabletHandle& lob_meta_tablet)
@ -436,8 +455,10 @@ int ObPersistentLobApator::build_lob_meta_table_dml(
dml_base_param.encrypt_meta_ = &dml_base_param.encrypt_meta_legacy_;
dml_base_param.snapshot_ = param.snapshot_;
dml_base_param.check_schema_version_ = false; // lob tablet should not check schema version
dml_base_param.store_ctx_guard_ = store_ctx_guard;
dml_base_param.write_flag_.set_is_insert_up();
dml_base_param.write_flag_.set_lob_aux();
if (param.seq_no_st_.is_valid()) {
if (param.used_seq_cnt_ < param.total_seq_cnt_) {
dml_base_param.spec_seq_no_ = param.seq_no_st_ + param.used_seq_cnt_;

View File

@ -23,6 +23,8 @@ namespace oceanbase
namespace storage
{
class ObStoreCtxGuard;
class ObLobUpdIterator : public ObNewRowIterator
{
public:
@ -137,6 +139,7 @@ private:
const uint64_t tenant_id,
ObTableDMLParam* dml_param,
ObDMLBaseParam& dml_base_param,
ObStoreCtxGuard *store_ctx_guard,
ObSEArray<uint64_t, 6>& column_ids,
const ObTabletHandle& data_tablet,
const ObTabletHandle& lob_meta_tablet);

View File

@ -17,6 +17,7 @@
#include "storage/tx/ob_trans_service.h"
#include "storage/blocksstable/ob_datum_row.h"
#include "ob_lob_meta.h"
#include "storage/tx_storage/ob_access_service.h"
namespace oceanbase
{
@ -26,6 +27,16 @@ using namespace transaction;
namespace storage
{
ObLobAccessParam::~ObLobAccessParam()
{
if (OB_NOT_NULL(dml_base_param_)) {
if (OB_NOT_NULL(dml_base_param_->store_ctx_guard_)) {
dml_base_param_->store_ctx_guard_->~ObStoreCtxGuard();
}
dml_base_param_->~ObDMLBaseParam();
}
}
ObCollationType ObLobCharsetUtil::get_collation_type(ObObjType type, ObCollationType ori_coll_type)
{
ObCollationType coll_type = ori_coll_type;

View File

@ -67,11 +67,8 @@ struct ObLobAccessParam {
inrow_read_nocopy_(false), inrow_threshold_(OB_DEFAULT_LOB_INROW_THRESHOLD), schema_chunk_size_(OB_DEFAULT_LOB_CHUNK_SIZE), spec_lob_id_(),
remote_query_ctx_(nullptr)
{}
~ObLobAccessParam() {
if (OB_NOT_NULL(dml_base_param_)) {
dml_base_param_->~ObDMLBaseParam();
}
}
~ObLobAccessParam();
public:
bool is_full_read() const { return op_type_ == ObLobDataOutRowCtx::OpType::SQL && 0 == offset_ && (len_ == byte_size_ || INT64_MAX == len_ || UINT64_MAX == len_); }

View File

@ -620,12 +620,33 @@ int ObFreezer::ls_freeze_task()
FLOG_INFO("[Freezer] logstream_freeze success", K(ls_id), K(freeze_clock));
stat_.end_set_freeze_stat(ObFreezeState::FINISH, ObTimeUtility::current_time(), ret);
unset_freeze_();
(void)try_freeze_tx_data_();
return ret;
}
void ObFreezer::try_freeze_tx_data_()
{
int ret = OB_SUCCESS;
const int64_t MAX_RETRY_DURATION = 10LL * 1000LL * 1000LL; // 10 seconds
int64_t retry_times = 0;
int64_t start_freeze_ts = ObClockGenerator::getClock();
do {
if (OB_FAIL(ls_->get_tx_table()->self_freeze_task())) {
if (OB_EAGAIN == ret) {
// sleep and retry
retry_times++;
usleep(100);
} else {
STORAGE_LOG(WARN, "freeze tx data table failed", KR(ret), K(get_ls_id()));
}
}
} while (OB_EAGAIN == ret && ObClockGenerator::getClock() - start_freeze_ts < MAX_RETRY_DURATION);
STORAGE_LOG(INFO, "freeze tx data after logstream freeze", KR(ret), K(retry_times), KTIME(start_freeze_ts));
}
// must be used under the protection of ls_lock
int ObFreezer::check_ls_state()
{

View File

@ -262,6 +262,7 @@ private:
int inc_freeze_clock();
void unset_freeze_();
void undo_freeze_();
void try_freeze_tx_data_();
/* inner subfunctions for freeze process */
int inner_logstream_freeze(ObFuture<int> *result);

View File

@ -2879,7 +2879,6 @@ int ObLSTabletService::insert_row(
lob_allocator,
ObDmlFlag::DF_INSERT);
ObIAllocator &work_allocator = run_ctx.allocator_;
duplicated_rows = nullptr;
ObStoreRow &tbl_row = run_ctx.tbl_row_;
const ObRelativeTable &data_table = run_ctx.relative_table_;
if (OB_FAIL(prepare_dml_running_ctx(&column_ids, nullptr, tablet_handle, run_ctx))) {
@ -2887,49 +2886,39 @@ int ObLSTabletService::insert_row(
} else {
tbl_row.flag_.set_flag(ObDmlFlag::DF_INSERT);
tbl_row.row_val_ = row;
if (!dml_param.table_param_->get_data_table().is_mlog_table()
&& OB_FAIL(get_conflict_rows(tablet_handle,
run_ctx,
flag,
duplicated_column_ids,
tbl_row.row_val_,
duplicated_rows))) {
LOG_WARN("failed to get conflict row(s)", K(ret), K(duplicated_column_ids), K(row));
} else if (nullptr == duplicated_rows) {
if (OB_FAIL(insert_row_to_tablet(tablet_handle, run_ctx, tbl_row))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to write row", K(ret));
}
if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) {
int tmp_ret = OB_SUCCESS;
// For primary key conflicts caused by concurrent insertions within
// a statement, we need to return the corresponding duplicated_rows.
// However, under circumstances where an exception may unexpectedly
// prevent us from reading the conflicting rows within statements,
// at such times, it becomes necessary for us to mock the rows.
if (OB_TMP_FAIL(get_conflict_rows(tablet_handle,
run_ctx,
flag,
duplicated_column_ids,
tbl_row.row_val_,
duplicated_rows))) {
LOG_WARN("failed to get conflict row(s)", K(ret), K(duplicated_column_ids), K(row));
if (OB_FAIL(insert_row_to_tablet(true /*check_exist*/,
tablet_handle,
run_ctx,
tbl_row))) {
if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) {
int tmp_ret = OB_SUCCESS;
// For primary key conflicts caused by concurrent insertions within
// a statement, we need to return the corresponding duplicated_rows.
// However, under circumstances where an exception may unexpectedly
// prevent us from reading the conflicting rows within statements,
// at such times, it becomes necessary for us to mock the rows.
if (OB_TMP_FAIL(get_conflict_rows(tablet_handle,
run_ctx,
flag,
duplicated_column_ids,
tbl_row.row_val_,
duplicated_rows))) {
LOG_WARN("failed to get conflict row(s)", K(ret), K(duplicated_column_ids), K(row));
ret = tmp_ret;
} else if (nullptr == duplicated_rows) {
if (OB_TMP_FAIL(mock_duplicated_rows_(duplicated_rows))) {
LOG_WARN("failed to mock duplicated row(s)", K(ret), K(duplicated_column_ids), K(row));
ret = tmp_ret;
} else if (nullptr == duplicated_rows) {
if (OB_TMP_FAIL(mock_duplicated_rows_(duplicated_rows))) {
LOG_WARN("failed to mock duplicated row(s)", K(ret), K(duplicated_column_ids), K(row));
ret = tmp_ret;
}
}
}
} else {
LOG_DEBUG("succeeded to insert row", K(ret), K(row));
affected_rows = 1;
EVENT_INC(STORAGE_INSERT_ROW_COUNT);
}
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to write row", K(ret));
}
} else {
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
LOG_DEBUG("succeeded to insert row", K(ret), K(row));
affected_rows = 1;
EVENT_INC(STORAGE_INSERT_ROW_COUNT);
}
}
lob_allocator.reset();
@ -3191,7 +3180,10 @@ int ObLSTabletService::put_rows(
if (cur_time > dml_param.timeout_) {
ret = OB_TIMEOUT;
LOG_WARN("query timeout", K(ret), K(cur_time), K(dml_param));
} else if (OB_FAIL(insert_row_to_tablet(tmp_handle, run_ctx, tbl_row))) {
} else if (OB_FAIL(insert_row_to_tablet(false/*check_exist*/,
tmp_handle,
run_ctx,
tbl_row))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) {
LOG_WARN("failed to write row", K(ret));
}
@ -4068,7 +4060,7 @@ int ObLSTabletService::insert_tablet_rows(
int ret = OB_SUCCESS;
ObRelativeTable &table = run_ctx.relative_table_;
const bool check_exists = !table.is_storage_index_table() || table.is_unique_index();
bool exists = false;
// 1. Defensive checking of new rows.
if (GCONF.enable_defensive_check()) {
for (int64_t i = 0; OB_SUCC(ret) && i < row_count; i++) {
@ -4079,19 +4071,18 @@ int ObLSTabletService::insert_tablet_rows(
}
}
// 2. Check uniqueness constraint in memetable only(active + frozen).
// It would be more efficient and elegant to completely merge the uniqueness constraint
// and write conflict checking, but the implementation currently is to minimize intrusion
// into the memtable.
if (OB_FAIL(ret)) {
} else if (check_exists && OB_FAIL(tablet_handle.get_obj()->rowkeys_exists(run_ctx.store_ctx_, table,
rows_info, exists))) {
LOG_WARN("Failed to check the uniqueness constraint", K(ret), K(rows_info));
} else if (exists) {
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
blocksstable::ObDatumRowkey &duplicate_rowkey = rows_info.get_conflict_rowkey();
LOG_WARN("Rowkey already exist", K(ret), K(table), K(duplicate_rowkey));
}
// 2. Skip check uniqueness constraint on both memetables and sstables It
// would be more efficient and elegant to completely merge the uniqueness
// constraint and write conflict checking.
//
// if (check_exists && OB_FAIL(tablet_handle.get_obj()->rowkeys_exists(run_ctx.store_ctx_, table,
// rows_info, exists))) {
// LOG_WARN("Failed to check the uniqueness constraint", K(ret), K(rows_info));
// } else if (exists) {
// ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
// blocksstable::ObDatumRowkey &duplicate_rowkey = rows_info.get_conflict_rowkey();
// LOG_WARN("Rowkey already exist", K(ret), K(table), K(duplicate_rowkey));
// }
// 3. Insert rows with uniqueness constraint and write conflict checking.
// Check write conflict in memtable + sstable.
@ -4820,7 +4811,12 @@ int ObLSTabletService::process_old_row(
del_row.flag_.set_flag(ObDmlFlag::DF_UPDATE);
ObSEArray<int64_t, 8> update_idx;
if (OB_FAIL(tablet_handle.get_obj()->update_row(relative_table,
run_ctx.store_ctx_, col_descs, update_idx, del_row, new_tbl_row, run_ctx.dml_param_.encrypt_meta_))) {
run_ctx.store_ctx_,
col_descs,
update_idx,
del_row,
new_tbl_row,
run_ctx.dml_param_.encrypt_meta_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) {
LOG_WARN("failed to write data tablet row", K(ret), K(del_row), K(new_tbl_row));
}
@ -4947,7 +4943,11 @@ int ObLSTabletService::process_data_table_row(
}
} else {
if (OB_FAIL(data_tablet.get_obj()->insert_row_without_rowkey_check(relative_table,
ctx, col_descs, new_row, run_ctx.dml_param_.encrypt_meta_))) {
ctx,
false /*check_exist*/,
col_descs,
new_row,
run_ctx.dml_param_.encrypt_meta_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) {
LOG_WARN("failed to update to row", K(ret), K(new_row));
}
@ -5332,9 +5332,10 @@ int ObLSTabletService::get_next_row_from_iter(
}
int ObLSTabletService::insert_row_to_tablet(
ObTabletHandle &tablet_handle,
ObDMLRunningCtx &run_ctx,
ObStoreRow &tbl_row)
const bool check_exist,
ObTabletHandle &tablet_handle,
ObDMLRunningCtx &run_ctx,
ObStoreRow &tbl_row)
{
int ret = OB_SUCCESS;
ObStoreCtx &store_ctx = run_ctx.store_ctx_;
@ -5357,6 +5358,7 @@ int ObLSTabletService::insert_row_to_tablet(
if (OB_FAIL(tablet_handle.get_obj()->insert_row_without_rowkey_check(
relative_table,
store_ctx,
check_exist /*check_exist*/,
col_descs,
tbl_row,
run_ctx.dml_param_.encrypt_meta_))) {

View File

@ -748,6 +748,7 @@ private:
ObStoreRow &store_row,
const bool need_copy_cells);
static int insert_row_to_tablet(
const bool check_exist,
ObTabletHandle &tablet_handle,
ObDMLRunningCtx &run_ctx,
ObStoreRow &tbl_row);

View File

@ -176,6 +176,14 @@ public:
tx_table_guards_.src_tx_table_guard_ = tx_table_guard;
tx_table_guards_.src_ls_handle_ = src_ls_handle;
}
void set_write_flag(const concurrent_control::ObWriteFlag write_flag)
{
write_flag_ = write_flag;
}
void set_abs_lock_timeout_ts(const int64_t abs_lock_timeout)
{
abs_lock_timeout_ts_ = abs_lock_timeout;
}
void init_replay(transaction::ObPartTransCtx &tx_ctx,
ObMemtableCtx &mem_ctx,
const transaction::ObTransID &tx_id)

View File

@ -53,7 +53,8 @@ int ObIMvccCtx::register_row_commit_cb(
const ObRowData *old_row,
ObMemtable *memtable,
const transaction::ObTxSEQ seq_no,
const int64_t column_cnt)
const int64_t column_cnt,
const bool is_non_unique_local_index)
{
int ret = OB_SUCCESS;
const bool is_replay = false;
@ -80,7 +81,8 @@ int ObIMvccCtx::register_row_commit_cb(
old_row,
is_replay,
seq_no,
column_cnt);
column_cnt,
is_non_unique_local_index);
cb->set_is_link();
if (OB_FAIL(append_callback(cb))) {
@ -122,7 +124,8 @@ int ObIMvccCtx::register_row_replay_cb(
NULL,
is_replay,
seq_no,
column_cnt);
column_cnt,
false/*is_non_unique_local_index_cb, not setted correctly now, fix later*/);
{
ObRowLatchGuard guard(value->latch_);
cb->link_trans_node();

View File

@ -91,6 +91,7 @@ public: // for mvcc engine invoke
virtual void inc_lock_for_read_retry_count() = 0;
virtual void add_lock_for_read_elapse(const int64_t n) = 0;
virtual int64_t get_lock_for_read_elapse() const = 0;
virtual void on_key_duplication_retry(const ObMemtableKey& key) = 0;
virtual void on_tsc_retry(const ObMemtableKey& key,
const share::SCN snapshot_version,
const share::SCN max_trans_version,
@ -154,7 +155,8 @@ public:
const ObRowData *old_row,
ObMemtable *memtable,
const transaction::ObTxSEQ seq_no,
const int64_t column_cnt);
const int64_t column_cnt,
const bool is_non_unique_local_index);
int register_row_replay_cb(
const ObMemtableKey *key,
ObMvccRow *value,

View File

@ -233,7 +233,8 @@ int ObMvccEngine::estimate_scan_row_count(
int ObMvccEngine::check_row_locked(ObMvccAccessCtx &ctx,
const ObMemtableKey *key,
ObStoreRowLockState &lock_state)
ObStoreRowLockState &lock_state,
ObRowState &row_state)
{
int ret = OB_SUCCESS;
ObMemtableKey stored_key;
@ -247,7 +248,7 @@ int ObMvccEngine::check_row_locked(ObMvccAccessCtx &ctx,
// rewrite ret
ret = OB_SUCCESS;
}
} else if (OB_FAIL(value->check_row_locked(ctx, lock_state))) {
} else if (OB_FAIL(value->check_row_locked(ctx, lock_state, row_state))) {
TRANS_LOG(WARN, "check row locked fail", K(ret), KPC(value), K(ctx), K(lock_state));
}

View File

@ -109,7 +109,8 @@ public:
// also returns tx_id for same txn write and max_trans_version for TSC check
int check_row_locked(ObMvccAccessCtx &ctx,
const ObMemtableKey *key,
storage::ObStoreRowLockState &lock_state);
storage::ObStoreRowLockState &lock_state,
storage::ObRowState &row_state);
// estimate_scan_row_count estimate the row count for the range
int estimate_scan_row_count(const transaction::ObTransID &tx_id,
const ObMvccScanRange &range,

View File

@ -20,6 +20,7 @@
#include "storage/memtable/ob_row_conflict_handler.h"
#include "storage/tx/ob_trans_ctx.h"
#include "storage/ls/ob_ls.h"
#include "storage/access/ob_rows_info.h"
#include "common/ob_clock_generator.h"
namespace oceanbase
@ -348,13 +349,15 @@ void ObMvccValueIterator::move_to_next_node_()
int ObMvccValueIterator::check_row_locked(ObStoreRowLockState &lock_state)
{
int ret = OB_SUCCESS;
storage::ObRowState row_state;
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "not init", KP(this));
ret = OB_NOT_INIT;
} else if (OB_ISNULL(value_)) {
ret = OB_SUCCESS;
TRANS_LOG(WARN, "get value iter but mvcc row in it is null", K(ret));
} else if (OB_FAIL(value_->check_row_locked(*ctx_, lock_state))){
} else if (OB_FAIL(value_->check_row_locked(*ctx_, lock_state, row_state))){
TRANS_LOG(WARN, "check row locked fail", K(ret), KPC(value_), KPC(ctx_), K(lock_state));
}
return ret;

View File

@ -30,6 +30,7 @@
#include "storage/tx/ob_trans_event.h"
#include "storage/memtable/mvcc/ob_mvcc_trans_ctx.h"
#include "storage/blocksstable/ob_datum_row.h"
#include "storage/access/ob_rows_info.h"
namespace oceanbase
{
@ -737,7 +738,11 @@ int ObMvccRow::remove_callback(ObMvccRowCallback &cb)
tx_scheduler = static_cast<transaction::ObPartTransCtx*>(tx_ctx)->get_scheduler();
}
MTL(ObLockWaitMgr*)->transform_row_lock_to_tx_lock(cb.get_tablet_id(), *cb.get_key(), ObTransID(node->tx_id_), tx_scheduler);
MTL(ObLockWaitMgr*)->reset_hash_holder(cb.get_tablet_id(), *cb.get_key(), ObTransID(node->tx_id_));
if (cb.is_non_unique_local_index_cb()) {
// row lock holder is no need to set for non-unique local index, so the reset can be skipped
} else {
MTL(ObLockWaitMgr*)->reset_hash_holder(cb.get_tablet_id(), *cb.get_key(), ObTransID(node->tx_id_));
}
}
}
return ret;
@ -776,6 +781,7 @@ int ObMvccRow::mvcc_write_(ObStoreCtx &ctx,
bool &need_insert = res.need_insert_;
bool &is_new_locked = res.is_new_locked_;
ObStoreRowLockState &lock_state = res.lock_state_;
ObExistFlag &exist_flag = lock_state.exist_flag_;
bool need_retry = true;
while (OB_SUCC(ret) && need_retry) {
@ -784,6 +790,7 @@ int ObMvccRow::mvcc_write_(ObStoreCtx &ctx,
can_insert = true;
need_insert = true;
is_new_locked = true;
exist_flag = ObExistFlag::UNKNOWN;
need_retry = false;
} else {
// Tip 1: The newest node is either delayed cleanout or not depending on
@ -811,6 +818,8 @@ int ObMvccRow::mvcc_write_(ObStoreCtx &ctx,
can_insert = true;
need_insert = true;
is_new_locked = true;
exist_flag =
extract_exist_flag_from_dml_flag(iter->get_dml_flag());
need_retry = false;
} else if (iter->is_aborted()) {
// Case 3: the newest node is aborted and the node must be unlinked,
@ -829,12 +838,16 @@ int ObMvccRow::mvcc_write_(ObStoreCtx &ctx,
can_insert = true;
need_insert = false;
is_new_locked = false;
exist_flag =
extract_exist_flag_from_dml_flag(iter->get_dml_flag());
need_retry = false;
} else {
// Case 4.2: the writer node is not lock node, so we do not insert into it
can_insert = true;
need_insert = true;
is_new_locked = false;
exist_flag =
extract_exist_flag_from_dml_flag(iter->get_dml_flag());
need_retry = false;
}
} else {
@ -851,6 +864,8 @@ int ObMvccRow::mvcc_write_(ObStoreCtx &ctx,
lock_state.is_delayed_cleanout_ = iter->is_delayed_cleanout();
lock_state.mvcc_row_ = this;
lock_state.trans_scn_ = iter->get_scn();
exist_flag =
extract_exist_flag_from_dml_flag(iter->get_dml_flag());
}
}
}
@ -970,6 +985,18 @@ int ObMvccRow::mvcc_write(ObStoreCtx &ctx,
// Tip1: mvcc_write guarantee the tnode will not be inserted if error is reported
(void)mvcc_undo();
}
} else if (node.get_dml_flag() == blocksstable::ObDmlFlag::DF_INSERT &&
res.lock_state_.row_exist()) {
// Case 4. successfully locked while insert into exist row
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
TRANS_LOG(WARN, "duplicated primary key found", K(ret), K(ctx), K(node),
K(*this), K(res));
if (!res.has_insert()) {
// It may not inserted due to primary key duplicated
} else {
// Tip1: mvcc_write guarantee the tnode will not be inserted if error is reported
(void)mvcc_undo();
}
}
return ret;
}
@ -989,13 +1016,16 @@ int ObMvccRow::mvcc_write(ObStoreCtx &ctx,
* return:
* - OB_SUCCESS
*/
int ObMvccRow::check_row_locked(ObMvccAccessCtx &ctx, ObStoreRowLockState &lock_state)
int ObMvccRow::check_row_locked(ObMvccAccessCtx &ctx,
ObStoreRowLockState &lock_state,
ObRowState &row_state)
{
int ret = OB_SUCCESS;
ObRowLatchGuard guard(latch_);
auto iter = ATOMIC_LOAD(&list_head_);
ObTxTableGuards &tx_table_guards = ctx.get_tx_table_guards();
transaction::ObTxSnapshot &snapshot = ctx.snapshot_;
const SCN snapshot_version = snapshot.version_;
const ObTransID checker_tx_id = ctx.get_tx_id();
ObMvccTransNode *iter = ATOMIC_LOAD(&list_head_);
bool need_retry = true;
while (OB_SUCC(ret) && need_retry) {
@ -1004,21 +1034,24 @@ int ObMvccRow::check_row_locked(ObMvccAccessCtx &ctx, ObStoreRowLockState &lock_
lock_state.is_locked_ = false;
lock_state.trans_version_.set_min();
lock_state.lock_trans_id_.reset();
lock_state.exist_flag_ = ObExistFlag::UNKNOWN;
need_retry = false;
} else {
auto data_tx_id = iter->tx_id_;
if (!(iter->is_committed() || iter->is_aborted())
&& iter->is_delayed_cleanout()
&& OB_FAIL(tx_table_guards.cleanout_tx_node(data_tx_id,
*this,
*iter,
false /*need_row_latch*/))) {
&& OB_FAIL(ctx.get_tx_table_guards().cleanout_tx_node(data_tx_id,
*this,
*iter,
false /*need_row_latch*/))) {
TRANS_LOG(WARN, "cleanout tx state failed", K(ret), K(*this));
} else if (iter->is_committed() || iter->is_elr()) {
// Case 2: the newest node is decided, so node currently is not be locked
lock_state.is_locked_ = false;
lock_state.trans_version_ = get_max_trans_version();
lock_state.lock_trans_id_.reset();
lock_state.exist_flag_ =
extract_exist_flag_from_dml_flag(iter->get_dml_flag());
need_retry = false;
} else if (iter->is_aborted()) {
iter = iter->prev_;
@ -1031,12 +1064,27 @@ int ObMvccRow::check_row_locked(ObMvccAccessCtx &ctx, ObStoreRowLockState &lock_
lock_state.lock_dml_flag_ = iter->get_dml_flag();
lock_state.is_delayed_cleanout_ = iter->is_delayed_cleanout();
lock_state.trans_scn_ = iter->get_scn();
lock_state.exist_flag_ =
extract_exist_flag_from_dml_flag(iter->get_dml_flag());
need_retry = false;
}
}
}
if (OB_SUCC(ret)) {
lock_state.mvcc_row_ = this;
// just for temporary enable the batch insert, so the following code will be
// optimized in the future
if (!lock_state.is_lock_decided()) {
// row is not exist
} else if (lock_state.is_locked(checker_tx_id) ||
lock_state.trans_version_ > snapshot_version) {
// row is locked or tsc
} else {
if (OB_NOT_NULL(iter)) {
row_state.row_dml_flag_ = iter->get_dml_flag();
}
}
}
return ret;
}

View File

@ -21,11 +21,14 @@
#include "storage/ob_i_store.h"
#include "ob_row_latch.h"
#include "storage/memtable/ob_memtable_data.h"
#include "storage/ob_i_store.h"
#include "storage/memtable/mvcc/ob_mvcc_define.h"
namespace oceanbase
{
namespace storage
{
class ObRowState;
}
namespace memtable
{
@ -295,7 +298,9 @@ struct ObMvccRow
// key is the row key for lock
// ctx is the write txn's context, currently the tx_table is the only required field
// lock_state is the check's result
int check_row_locked(ObMvccAccessCtx &ctx, storage::ObStoreRowLockState &lock_state);
int check_row_locked(ObMvccAccessCtx &ctx,
storage::ObStoreRowLockState &lock_state,
storage::ObRowState &row_state);
// insert_trans_node insert the tx node for replay
// ctx is the write txn's context

View File

@ -450,6 +450,7 @@ public:
memtable_(memtable),
is_link_(false),
not_calc_checksum_(false),
is_non_unique_local_index_cb_(false),
seq_no_(),
column_cnt_(0)
{}
@ -462,6 +463,7 @@ public:
memtable_(memtable),
is_link_(cb.is_link_),
not_calc_checksum_(cb.not_calc_checksum_),
is_non_unique_local_index_cb_(cb.is_non_unique_local_index_cb_),
seq_no_(cb.seq_no_),
column_cnt_(cb.column_cnt_)
{
@ -478,7 +480,8 @@ public:
const ObRowData *old_row,
const bool is_replay,
const transaction::ObTxSEQ seq_no,
const int64_t column_cnt)
const int64_t column_cnt,
const bool is_non_unique_local_index_cb)
{
UNUSED(is_replay);
@ -501,7 +504,9 @@ public:
tnode_->set_seq_no(seq_no_);
}
column_cnt_ = column_cnt;
is_non_unique_local_index_cb_ = is_non_unique_local_index_cb;
}
bool is_non_unique_local_index_cb() const { return is_non_unique_local_index_cb_;}
bool on_memtable(const ObIMemtable * const memtable) override;
ObIMemtable *get_memtable() const override;
virtual MutatorType get_mutator_type() const override;
@ -566,6 +571,9 @@ private:
struct {
bool is_link_ : 1;
bool not_calc_checksum_ : 1;
// this flag is currently only used to skip reset_hash_holder of ObLockWaitMgr,
// but it is not set correctly in the replay path which will be fixed later.
bool is_non_unique_local_index_cb_ : 1;
};
transaction::ObTxSEQ seq_no_;
int64_t column_cnt_;

View File

@ -59,10 +59,12 @@ int check_sequence_set_violation(const concurrent_control::ObWriteFlag write_fla
// 1. reader seq no is bigger or equal than the seq no of the last statements
if (reader_seq_no < locker_seq_no) {
// Case 1: It may happens that two pdml unique index tasks insert the same
// row concurrently, so we report duplicate key under the case to prevent
// the insertion.
// row concurrently or two px update with one update(one doesnot change
// the rowkey) and one insert(one changes the rowkey), so we report
// duplicate key under the case to prevent the insertion.
if (blocksstable::ObDmlFlag::DF_INSERT == writer_dml_flag
&& blocksstable::ObDmlFlag::DF_INSERT == locker_dml_flag) {
&& (blocksstable::ObDmlFlag::DF_INSERT == locker_dml_flag
|| blocksstable::ObDmlFlag::DF_UPDATE == locker_dml_flag)) {
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
TRANS_LOG(WARN, "pdml duplicate primary key found", K(ret),
K(writer_tx_id), K(writer_dml_flag), K(writer_seq_no),

View File

@ -20,6 +20,7 @@
#include "lib/time/ob_time_utility.h"
#include "lib/worker.h"
#include "share/rc/ob_context.h"
#include "share/stat/ob_opt_stat_monitor_manager.h"
#include "storage/memtable/mvcc/ob_mvcc_engine.h"
#include "storage/memtable/mvcc/ob_mvcc_iterator.h"
@ -111,6 +112,8 @@ ObMemtable::ObMemtable()
local_allocator_(*this),
query_engine_(local_allocator_),
mvcc_engine_(),
mt_stat_(),
reported_dml_stat_(),
max_schema_version_(0),
max_data_schema_version_(0),
pending_cb_cnt_(0),
@ -139,7 +142,6 @@ ObMemtable::ObMemtable()
encrypt_meta_lock_(ObLatchIds::DEFAULT_SPIN_RWLOCK),
max_column_cnt_(0)
{
mt_stat_.reset();
migration_clog_checkpoint_scn_.set_min();
}
@ -260,6 +262,7 @@ void ObMemtable::destroy()
max_data_schema_version_ = 0;
max_column_cnt_ = 0;
mt_stat_.reset();
reported_dml_stat_.reset();
freeze_state_ = ObMemtableFreezeState::INVALID;
unsubmitted_cnt_ = 0;
logging_blocked_ = false;
@ -418,23 +421,33 @@ int ObMemtable::multi_set(
if (row_count > 1) {
ret = multi_set_(param, columns, rows, row_count, check_exist, mtk_generator, context, rows_info);
} else {
ret = set_(param, columns, rows[0], nullptr, nullptr, mtk_generator[0], context, nullptr,
check_exist && !rows_info.is_row_exist_checked(0));
ret = set_(param,
columns,
rows[0],
nullptr, /*old_row*/
nullptr, /*update_idx*/
mtk_generator[0],
check_exist,
context,
nullptr /*mvcc_row*/);
}
guard.set_memtable(this);
}
if (OB_SUCC(ret)) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(try_report_dml_stat_(param.table_id_))) {
TRANS_LOG_RET(WARN, tmp_ret, "fail to report dml stat", K_(reported_dml_stat));
}
/*****[for deadlock]*****/
// recored this row is hold by this trans for deadlock detector
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
if (OB_ISNULL(p_lock_wait_mgr)) {
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
if (param.is_non_unique_local_index_) {
// no need to detect deadlock for non-unique local index table
} else {
for (int64_t idx = 0; idx < mtk_generator.count(); ++idx) {
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
mtk_generator[idx],
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
MTL(ObLockWaitMgr*)->set_hash_holder(key_.get_tablet_id(),
mtk_generator[idx],
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
}
}
/***********************/
@ -456,10 +469,14 @@ int ObMemtable::check_rows_locked(
for (int64_t i = 0; OB_SUCC(ret) && i < rows_info.rowkeys_.count(); i++) {
const blocksstable::ObDatumRowkey &rowkey = rows_info.get_rowkey(i);
ObStoreRowLockState &lock_state = rows_info.get_row_lock_state(i);
if (rows_info.is_row_lock_checked(i)) {
ObRowState row_state;
if (rows_info.is_row_checked(i)) {
} else if (OB_FAIL(mtk.encode(param.get_read_info()->get_columns_desc(), &rowkey.get_store_rowkey()))) {
TRANS_LOG(WARN, "Failed to enocde memtable key", K(ret));
} else if (OB_FAIL(get_mvcc_engine().check_row_locked(ctx.mvcc_acc_ctx_, &mtk, lock_state))) {
} else if (OB_FAIL(get_mvcc_engine().check_row_locked(ctx.mvcc_acc_ctx_,
&mtk,
lock_state,
row_state))) {
TRANS_LOG(WARN, "Failed to check row lock in mvcc engine", K(ret), K(mtk));
} else if (lock_state.is_lock_decided()) {
if (lock_state.is_locked_ && lock_state.lock_trans_id_ != my_tx_id) {
@ -470,8 +487,59 @@ int ObMemtable::check_rows_locked(
rows_info.set_conflict_rowkey(i);
rows_info.set_error_code(OB_TRANSACTION_SET_VIOLATION);
break;
} else if (check_exist && !row_state.is_delete()) {
rows_info.set_conflict_rowkey(i);
rows_info.set_error_code(OB_ERR_PRIMARY_KEY_DUPLICATE);
break;
} else {
rows_info.set_row_lock_checked(i, check_exist);
rows_info.set_row_checked(i);
}
}
}
return ret;
}
// TODO(handora.qc): remove after ddl merge sstable finished
// the interface of batch_check_row_lock is not supported on ddl merge sstable
int ObMemtable::check_rows_locked_on_ddl_merge_sstable(
ObSSTable *sstable,
const bool check_exist,
const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
ObRowsInfo &rows_info)
{
int ret = OB_SUCCESS;
ObStoreCtx &ctx = *(context.store_ctx_);
ObTransID my_tx_id = ctx.mvcc_acc_ctx_.get_tx_id();
SCN snapshot_version = ctx.mvcc_acc_ctx_.get_snapshot_version();
for (int64_t i = 0; OB_SUCC(ret) && i < rows_info.rowkeys_.count(); i++) {
const blocksstable::ObDatumRowkey &rowkey = rows_info.get_rowkey(i);
ObStoreRowLockState &lock_state = rows_info.get_row_lock_state(i);
ObRowState row_state;
if (rows_info.is_row_checked(i)) {
} else if (OB_FAIL(sstable->check_row_locked(param,
rowkey,
context,
lock_state,
row_state,
check_exist))) {
TRANS_LOG(WARN, "Failed to check row lock in sstable", K(ret), KPC(this));
} else if (lock_state.is_lock_decided()) {
if (lock_state.is_locked_ && lock_state.lock_trans_id_ != my_tx_id) {
rows_info.set_conflict_rowkey(i);
rows_info.set_error_code(OB_TRY_LOCK_ROW_CONFLICT);
break;
} else if (lock_state.trans_version_ > snapshot_version) {
rows_info.set_conflict_rowkey(i);
rows_info.set_error_code(OB_TRANSACTION_SET_VIOLATION);
break;
} else if (check_exist && !row_state.is_delete()) {
rows_info.set_conflict_rowkey(i);
rows_info.set_error_code(OB_ERR_PRIMARY_KEY_DUPLICATE);
break;
} else {
rows_info.set_row_checked(i);
}
}
}
@ -479,11 +547,12 @@ int ObMemtable::check_rows_locked(
}
int ObMemtable::set(
const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
const common::ObIArray<share::schema::ObColDesc> &columns,
const storage::ObStoreRow &row,
const share::ObEncryptMeta *encrypt_meta)
const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
const common::ObIArray<share::schema::ObColDesc> &columns,
const storage::ObStoreRow &row,
const share::ObEncryptMeta *encrypt_meta,
const bool check_exist)
{
int ret = OB_SUCCESS;
ObMvccWriteGuard guard(ret);
@ -518,20 +587,32 @@ int ObMemtable::set(
} else {
lib::CompatModeGuard compat_guard(mode_);
ret = set_(param, columns, row, NULL, NULL, mtk_generator[0], context);
ret = set_(param,
columns,
row,
NULL, /*old_row*/
NULL, /*update_idx*/
mtk_generator[0],
check_exist,
context,
nullptr /*mvcc_row*/);
guard.set_memtable(this);
}
if (OB_SUCC(ret)) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(try_report_dml_stat_(param.table_id_))) {
TRANS_LOG_RET(WARN, tmp_ret, "fail to report dml stat", K_(reported_dml_stat));
}
/*****[for deadlock]*****/
// recored this row is hold by this trans for deadlock detector
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
if (OB_ISNULL(p_lock_wait_mgr)) {
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
if (param.is_non_unique_local_index_) {
// no need to detect deadlock for non-unique local index table
} else {
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
mtk_generator[0],
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
MTL(ObLockWaitMgr*)->set_hash_holder(key_.get_tablet_id(),
mtk_generator[0],
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
}
/***********************/
}
@ -578,20 +659,31 @@ int ObMemtable::set(
} else {
lib::CompatModeGuard compat_guard(mode_);
ret = set_(param, columns, new_row, &old_row, &update_idx, mtk_generator[0], context);
ret = set_(param,
columns,
new_row,
&old_row,
&update_idx,
mtk_generator[0],
false/*check_exist*/,
context,
nullptr /*mvcc_row*/);
guard.set_memtable(this);
}
if (OB_SUCC(ret)) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(try_report_dml_stat_(param.table_id_))) {
TRANS_LOG_RET(WARN, tmp_ret, "fail to report dml stat", K_(reported_dml_stat));
}
/*****[for deadlock]*****/
// recored this row is hold by this trans for deadlock detector
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
if (OB_ISNULL(p_lock_wait_mgr)) {
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
if (param.is_non_unique_local_index_) {
// no need to detect deadlock for local index table
} else {
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
mtk_generator[0],
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
MTL(ObLockWaitMgr*)->set_hash_holder(key_.get_tablet_id(),
mtk_generator[0],
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
}
/***********************/
}
@ -615,6 +707,11 @@ int ObMemtable::lock(
|| row.count_ < param.get_schema_rowkey_count()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid param", K(ret), K(row), K(param));
} else if (OB_UNLIKELY(param.is_non_unique_local_index_)) {
// since checking lock on non-unique local index is optimized out, so do a defensive judgment here,
// actually, there is no circumstance in where locking the index table is need.
ret = OB_NOT_SUPPORTED;
TRANS_LOG(WARN, "locking the non-unique local index is not supported", K(ret), K(row), K(param));
} else if (OB_FAIL(guard.write_auth(*context.store_ctx_))) {
TRANS_LOG(WARN, "not allow to write", K(*context.store_ctx_));
} else if (OB_FAIL(tmp_key.assign(row.cells_, param.get_schema_rowkey_count()))) {
@ -635,14 +732,9 @@ int ObMemtable::lock(
if (OB_SUCC(ret)) {
/*****[for deadlock]*****/
// recored this row is hold by this trans for deadlock detector
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
if (OB_ISNULL(p_lock_wait_mgr)) {
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
} else {
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
mtk,
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
}
MTL(ObLockWaitMgr*)->set_hash_holder(key_.get_tablet_id(),
mtk,
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
/***********************/
}
return ret;
@ -663,6 +755,11 @@ int ObMemtable::lock(
} else if (!context.store_ctx_->mvcc_acc_ctx_.is_write() || !rowkey.is_memtable_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid param", K(ret), K(rowkey));
} else if (OB_UNLIKELY(param.is_non_unique_local_index_)) {
// since checking lock on non-unique local index is optimized out, so do a defensive judgment here,
// actually, there is no circumstance in where locking the index table is need.
ret = OB_NOT_SUPPORTED;
TRANS_LOG(WARN, "locking the non-unique local index is not supported", K(ret), K(param));
} else if (OB_FAIL(guard.write_auth(*context.store_ctx_))) {
TRANS_LOG(WARN, "not allow to write", K(*context.store_ctx_));
} else if (OB_FAIL(mtk.encode(param.get_read_info()->get_columns_desc(), &rowkey.get_store_rowkey()))) {
@ -680,14 +777,9 @@ int ObMemtable::lock(
if (OB_SUCC(ret)) {
/*****[for deadlock]*****/
// recored this row is hold by this trans for deadlock detector
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
if (OB_ISNULL(p_lock_wait_mgr)) {
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
} else {
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
mtk,
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
}
MTL(ObLockWaitMgr*)->set_hash_holder(key_.get_tablet_id(),
mtk,
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
/***********************/
}
return ret;
@ -1266,13 +1358,19 @@ int ObMemtable::lock_row_on_frozen_stores_(
ObStoreRowLockState &lock_state = res.lock_state_;
ObStoreCtx &ctx = *(context.store_ctx_);
const ObTxSEQ reader_seq_no = ctx.mvcc_acc_ctx_.snapshot_.scn_;
if (OB_ISNULL(value) || !ctx.mvcc_acc_ctx_.is_write() || NULL == key) {
TRANS_LOG(WARN, "invalid param", KP(value), K(ctx), KP(key));
ret = OB_INVALID_ARGUMENT;
} else if (OB_ISNULL(ctx.table_iter_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tables handle or iterator in context is null", K(ret), K(ctx));
} else if (lock_state.row_exist_decided()) {
// in the suitation, row already exists, so we need not examine any
// existance or lock status of the row in the following tables
} else if (!check_exist && value->is_lower_lock_scaned()) {
} else if (!check_exist && param.is_non_unique_local_index_) {
// skip if it is non-unique index for which the lock has been checked in primary table
} else {
ObRowState row_state;
ObStoreRowLockState tmp_lock_state;
@ -1280,6 +1378,7 @@ int ObMemtable::lock_row_on_frozen_stores_(
common::ObSEArray<ObITable *, 4> iter_tables;
ctx.table_iter_->resume();
ObTransID my_tx_id = ctx.mvcc_acc_ctx_.get_tx_id();
while (OB_SUCC(ret)) {
ObITable *table_ptr = nullptr;
if (OB_FAIL(ctx.table_iter_->get_next(table_ptr))) {
@ -1320,8 +1419,14 @@ int ObMemtable::lock_row_on_frozen_stores_(
// TRANS_LOG(WARN, "mvcc engine check row lock fail", K(ret), K(lock_state));
// }
if (OB_FAIL(mvcc_engine.check_row_locked(ctx.mvcc_acc_ctx_, key, tmp_lock_state))) {
if (OB_FAIL(mvcc_engine.check_row_locked(ctx.mvcc_acc_ctx_,
key,
tmp_lock_state,
row_state))) {
TRANS_LOG(WARN, "mvcc engine check row lock fail", K(ret), K(lock_state), K(tmp_lock_state));
} else {
TRANS_LOG(DEBUG, "check_row_locked meet memtable", K(ret), K(check_exist), K(*key), K(*memtable),
K(lock_state), K(tmp_lock_state), K(row_state));
}
} else if (stores->at(i)->is_sstable()) {
blocksstable::ObDatumRowkeyHelper rowkey_converter;
@ -1329,14 +1434,17 @@ int ObMemtable::lock_row_on_frozen_stores_(
ObITable *sstable = stores->at(i);
if (OB_FAIL(rowkey_converter.convert_datum_rowkey(key->get_rowkey()->get_rowkey(), datum_rowkey))) {
STORAGE_LOG(WARN, "Failed to convert datum rowkey", K(ret), KPC(key));
} else if (OB_FAIL(static_cast<ObSSTable *>(sstable)->check_row_locked(param, datum_rowkey,
context, tmp_lock_state,
row_state, check_exist))) {
} else if (OB_FAIL(static_cast<ObSSTable *>(sstable)->check_row_locked(param,
datum_rowkey,
context,
tmp_lock_state,
row_state,
check_exist))) {
TRANS_LOG(WARN, "sstable check row lock fail", K(ret), K(datum_rowkey), K(*key), K(lock_state),
K(tmp_lock_state), K(row_state));
}
TRANS_LOG(DEBUG, "check_row_locked meet sstable", K(ret), K(check_exist), K(*key), K(*sstable), K(lock_state), K(tmp_lock_state),
K(row_state));
TRANS_LOG(DEBUG, "check_row_locked meet sstable", K(ret), K(check_exist), K(*key), K(*sstable),
K(lock_state), K(tmp_lock_state), K(row_state));
} else {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unknown store type", K(ret), K(*stores), K(i));
@ -1512,6 +1620,9 @@ int ObMemtable::lock_rows_on_frozen_stores_(
if (OB_ISNULL(ctx.table_iter_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "Table iterator in context is null", K(ret), K(ctx));
} else if (!check_exist && param.is_non_unique_local_index_) {
// skip if it is non-unique index table for which the transaction conflict has checked in primary table,
// so there is no need to check transaction conflict again.
} else {
common::ObSEArray<ObITable *, 4> iter_tables;
ctx.table_iter_->resume();
@ -1577,20 +1688,34 @@ int ObMemtable::internal_lock_rows_on_frozen_stores_(
for (int64_t i = iter_tables.count() - 2; OB_SUCC(ret) && i >= 0; i--) {
ObITable *i_table = iter_tables.at(i);
if (i_table->is_memtable()) {
auto *memtable = static_cast<ObMemtable *>(i_table);
ObMemtable *memtable = static_cast<ObMemtable *>(i_table);
if (OB_FAIL(memtable->check_rows_locked(check_exist, param, context, rows_info))) {
TRANS_LOG(WARN, "Failed to check rows locked and duplication in memtable", K(ret), K(i),
K(iter_tables));
}
} else if (i_table->is_sstable()) {
auto *sstable = static_cast<ObSSTable *>(i_table);
if (OB_FAIL(sstable->check_rows_locked(check_exist, context, max_trans_version, rows_info))) {
TRANS_LOG(WARN, "Failed to check rows locked for sstable", K(ret), K(i), K(iter_tables));
ObSSTable *sstable = static_cast<ObSSTable *>(i_table);
if (sstable->is_ddl_merge_sstable()) {
if (OB_FAIL(check_rows_locked_on_ddl_merge_sstable(sstable,
check_exist,
param,
context,
rows_info))) {
TRANS_LOG(WARN, "Failed to check rows locked for sstable", K(ret), K(i), K(iter_tables));
}
} else {
if (OB_FAIL(sstable->check_rows_locked(check_exist,
context,
max_trans_version,
rows_info))) {
TRANS_LOG(WARN, "Failed to check rows locked for sstable", K(ret), K(i), K(iter_tables));
}
}
} else {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "Unknown store type", K(ret), K(iter_tables), K(i));
}
if (OB_SUCC(ret) && (rows_info.all_rows_found() || rows_info.have_conflict())) {
break;
}
@ -2103,6 +2228,11 @@ bool ObMemtable::ready_for_flush()
bool bool_ret = ready_for_flush_();
if (bool_ret) {
int tmp_ret = OB_SUCCESS;
// dml stat is periodically reported, so need to report residual stat when freeze finished
if (OB_TMP_FAIL(report_residual_dml_stat_())) {
TRANS_LOG_RET(WARN, tmp_ret, "fail to report dml stat", K_(reported_dml_stat));
}
local_allocator_.set_frozen();
}
@ -2944,24 +3074,37 @@ int ObMemtable::multi_set_(
}
// 1. Check write conflict in memtables.
for (int64_t i = 0 ; OB_SUCC(ret) && i < row_count; ++i) {
const uint32_t permutation_idx = rows_info.get_permutation_idx(i);
if (OB_FAIL(set_(param, columns, rows[i], nullptr, nullptr, memtable_keys[i], context, &(mvcc_rows[permutation_idx]), check_exist))) {
if (OB_UNLIKELY(OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret)) {
TRANS_LOG(WARN, "Failed to insert new row", K(ret), K(i), K(permutation_idx), K(rows[i]));
if (OB_SUCC(ret)) {
for (int64_t i = 0; OB_SUCC(ret) && i < row_count; ++i) {
const uint32_t permutation_idx = rows_info.get_permutation_idx(i);
if (OB_FAIL(set_(param,
columns,
rows[i],
nullptr, /*old_row*/
nullptr, /*update_idx*/
memtable_keys[i],
check_exist,
context,
&(mvcc_rows[permutation_idx])))) {
if (OB_UNLIKELY(OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret)) {
TRANS_LOG(WARN, "Failed to insert new row", K(ret), K(i), K(permutation_idx), K(rows[i]));
}
rows_info.set_conflict_rowkey(permutation_idx);
} else {
row_size_stat += mvcc_rows[permutation_idx].write_result_.tx_node_->get_data_size();
}
rows_info.set_conflict_rowkey(permutation_idx);
} else {
row_size_stat += mvcc_rows[permutation_idx].write_result_.tx_node_->get_data_size();
}
}
if (OB_SUCC(ret)) {
for (int64_t i = 0 ; i < row_count; ++i) {
ObMvccRowAndWriteResult &result = mvcc_rows[i];
rows_info.set_row_lock_state(i, &(result.write_result_.lock_state_));
if (result.mvcc_row_->is_lower_lock_scaned()) {
if (result.write_result_.lock_state_.row_exist_decided()) {
rows_info.set_row_checked(i);
} else if (result.mvcc_row_->is_lower_lock_scaned()) {
rows_info.set_row_lock_checked(i, check_exist);
}
TRANS_LOG(DEBUG, "set row lock state", K(result), K(i), K(rows_info));
rows_info.set_row_lock_state(i, &(result.write_result_.lock_state_));
}
}
@ -2998,23 +3141,12 @@ int ObMemtable::multi_set_(
}
if (OB_TRANSACTION_SET_VIOLATION == ret) {
auto iso = ctx.mvcc_acc_ctx_.tx_desc_->get_isolation_level();
ObTxIsolationLevel iso = ctx.mvcc_acc_ctx_.tx_desc_->get_isolation_level();
if (ObTxIsolationLevel::SERIAL == iso || ObTxIsolationLevel::RR == iso) {
ret = OB_TRANS_CANNOT_SERIALIZE;
}
}
if (OB_SUCC(ret) && mvcc_rows.count() > 0) {
const blocksstable::ObDmlFlag &dml_flag = mvcc_rows[0].write_result_.tx_node_->get_dml_flag();
mt_stat_.row_size_ += row_size_stat;
if (OB_LIKELY(blocksstable::ObDmlFlag::DF_INSERT == dml_flag)) {
mt_stat_.insert_row_count_ += row_count;
} else if (blocksstable::ObDmlFlag::DF_UPDATE == dml_flag) {
mt_stat_.update_row_count_ += row_count;
} else if (blocksstable::ObDmlFlag::DF_DELETE == dml_flag) {
mt_stat_.delete_row_count_ += row_count;
}
}
return ret;
}
@ -3025,9 +3157,9 @@ int ObMemtable::set_(
const storage::ObStoreRow *old_row,
const common::ObIArray<int64_t> *update_idx,
const ObMemtableKey &mtk,
const bool check_exist,
storage::ObTableAccessContext &context,
ObMvccRowAndWriteResult *mvcc_row,
bool check_exist)
ObMvccRowAndWriteResult *mvcc_row)
{
int ret = OB_SUCCESS;
blocksstable::ObRowWriter row_writer;
@ -3071,9 +3203,14 @@ int ObMemtable::set_(
NULL == old_row ? NULL : &old_row_data,
timestamp_, /*memstore_version*/
ctx.mvcc_acc_ctx_.tx_scn_, /*seq_no*/
new_row.row_val_.count_ /*column_cnt*/
);
if (OB_FAIL(mvcc_write_(param, context, &mtk, arg, is_new_locked, mvcc_row, check_exist))) {
new_row.row_val_.count_ /*column_cnt*/);
if (OB_FAIL(mvcc_write_(param,
context,
&mtk,
arg,
check_exist,
is_new_locked,
mvcc_row))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret &&
OB_TRANSACTION_SET_VIOLATION != ret &&
OB_ERR_PRIMARY_KEY_DUPLICATE != ret) {
@ -3081,15 +3218,16 @@ int ObMemtable::set_(
}
} else {
TRANS_LOG(DEBUG, "set end, success",
"ret", ret,
"tablet_id_", key_.tablet_id_,
"dml_flag", new_row.flag_.get_dml_flag(),
"columns", strarray<ObColDesc>(columns),
"old_row", to_cstring(old_row),
"new_row", to_cstring(new_row),
"update_idx", (update_idx == NULL ? "" : to_cstring(update_idx)),
"mtd", to_cstring(mtd),
K(arg));
"ret", ret,
"tablet_id_", key_.tablet_id_,
"dml_flag", new_row.flag_.get_dml_flag(),
"columns", strarray<ObColDesc>(columns),
"old_row", to_cstring(old_row),
"new_row", to_cstring(new_row),
"update_idx", (update_idx == NULL ? "" : to_cstring(update_idx)),
"mtd", to_cstring(mtd),
K(arg),
KPC(mvcc_row));
}
}
}
@ -3156,7 +3294,13 @@ int ObMemtable::lock_(
TRANS_LOG(WARN, "meet unexpected return code in check_row_locked", K(ret), K(context), K(mtk));
}
}
} else if (OB_FAIL(mvcc_write_(param, context, &mtk, arg, is_new_locked))) {
} else if (OB_FAIL(mvcc_write_(param,
context,
&mtk,
arg,
false, /*check_exist*/
is_new_locked,
nullptr /*mvcc_row*/))) {
} else if (OB_UNLIKELY(!is_new_locked)) {
TRANS_LOG(DEBUG, "lock twice, no need to store lock trans node");
}
@ -3212,9 +3356,9 @@ int ObMemtable::mvcc_write_(
storage::ObTableAccessContext &context,
const ObMemtableKey *key,
const ObTxNodeArg &arg,
const bool check_exist,
bool &is_new_locked,
ObMvccRowAndWriteResult *mvcc_row,
bool check_exist)
ObMvccRowAndWriteResult *mvcc_row)
{
int ret = OB_SUCCESS;
bool is_new_add = false;
@ -3251,6 +3395,8 @@ int ObMemtable::mvcc_write_(
snapshot_version,
value->get_max_trans_version(),
value->get_max_trans_id());
} else if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) {
mem_ctx->on_key_duplication_retry(*key);
} else {
TRANS_LOG(WARN, "mvcc write fail", K(ret));
}
@ -3276,7 +3422,8 @@ int ObMemtable::mvcc_write_(
arg.old_row_,
this,
arg.seq_no_,
arg.column_cnt_))) {
arg.column_cnt_,
param.is_non_unique_local_index_))) {
(void)mvcc_engine_.mvcc_undo(value);
res.is_mvcc_undo_ = true;
TRANS_LOG(WARN, "register row commit failed", K(ret));
@ -3342,6 +3489,7 @@ int ObMemtable::post_row_write_conflict_(ObMvccAccessCtx &acc_ctx,
ObFunction<int(bool&, bool&)> recheck_func([&](bool &locked, bool &wait_on_row) -> int {
int ret = OB_SUCCESS;
lock_state.is_locked_ = false;
storage::ObRowState row_state;
if (lock_state.is_delayed_cleanout_) {
transaction::ObTxSEQ lock_data_sequence = lock_state.lock_data_sequence_;
storage::ObTxTableGuards &tx_table_guards = acc_ctx.get_tx_table_guards();
@ -3350,7 +3498,7 @@ int ObMemtable::post_row_write_conflict_(ObMvccAccessCtx &acc_ctx,
TRANS_LOG(WARN, "re-check row locked via tx_table fail", K(ret), K(tx_id), K(lock_state));
}
} else {
if (OB_FAIL(lock_state.mvcc_row_->check_row_locked(acc_ctx, lock_state))) {
if (OB_FAIL(lock_state.mvcc_row_->check_row_locked(acc_ctx, lock_state, row_state))) {
TRANS_LOG(WARN, "re-check row locked via mvcc_row fail", K(ret), K(tx_id), K(lock_state));
}
}
@ -3405,6 +3553,40 @@ storage::ObTabletMemtableMgr *ObMemtable::get_memtable_mgr_()
return static_cast<ObTabletMemtableMgr *>(memtable_mgr_handle_.get_memtable_mgr());
}
int ObMemtable::try_report_dml_stat_(const int64_t table_id)
{
int ret = OB_SUCCESS;
const int64_t current_ts = common::ObClockGenerator::getClock();
reported_dml_stat_.table_id_ = table_id; // record the table id for reporting residual dml stat
if (current_ts - reported_dml_stat_.last_report_time_ > ObReportedDmlStat::REPORT_INTERVAL) {
if (ATOMIC_BCAS(&reported_dml_stat_.is_reporting_, false, true)) {
// double check
if (current_ts - reported_dml_stat_.last_report_time_ > ObReportedDmlStat::REPORT_INTERVAL) {
ObOptDmlStat dml_stat;
dml_stat.tenant_id_ = MTL_ID();
dml_stat.table_id_ = table_id;
dml_stat.tablet_id_ = get_tablet_id().id();
const int64_t current_insert_row_cnt = mt_stat_.insert_row_count_;
const int64_t current_update_row_cnt = mt_stat_.update_row_count_;
const int64_t current_delete_row_cnt = mt_stat_.delete_row_count_;
dml_stat.insert_row_count_ = current_insert_row_cnt - reported_dml_stat_.insert_row_count_;
dml_stat.update_row_count_ = current_update_row_cnt - reported_dml_stat_.update_row_count_;
dml_stat.delete_row_count_ = current_delete_row_cnt - reported_dml_stat_.delete_row_count_;
if (OB_FAIL(MTL(ObOptStatMonitorManager*)->update_local_cache(dml_stat))) {
TRANS_LOG(WARN, "failed to update local cache", K(ret), K(dml_stat));
} else {
reported_dml_stat_.insert_row_count_ = current_insert_row_cnt;
reported_dml_stat_.update_row_count_ = current_update_row_cnt;
reported_dml_stat_.delete_row_count_ = current_delete_row_cnt;
reported_dml_stat_.last_report_time_ = current_ts;
}
}
ATOMIC_STORE(&reported_dml_stat_.is_reporting_, false);
}
}
return ret;
}
int ObMemtable::finish_freeze()
{
int ret = OB_SUCCESS;
@ -3416,5 +3598,31 @@ int ObMemtable::finish_freeze()
return ret;
}
int ObMemtable::report_residual_dml_stat_()
{
int ret = OB_SUCCESS;
if (reported_dml_stat_.table_id_ != OB_INVALID_ID) {
if (mt_stat_.insert_row_count_ > reported_dml_stat_.insert_row_count_ ||
mt_stat_.update_row_count_ > reported_dml_stat_.update_row_count_ ||
mt_stat_.delete_row_count_ > reported_dml_stat_.delete_row_count_) {
ObOptDmlStat dml_stat;
dml_stat.tenant_id_ = MTL_ID();
dml_stat.table_id_ = reported_dml_stat_.table_id_;
dml_stat.tablet_id_ = get_tablet_id().id();
dml_stat.insert_row_count_ = mt_stat_.insert_row_count_ - reported_dml_stat_.insert_row_count_;
dml_stat.update_row_count_ = mt_stat_.update_row_count_ - reported_dml_stat_.update_row_count_;
dml_stat.delete_row_count_ = mt_stat_.delete_row_count_ - reported_dml_stat_.delete_row_count_;
if (OB_FAIL(MTL(ObOptStatMonitorManager*)->update_local_cache(dml_stat))) {
TRANS_LOG(WARN, "failed to update local cache", K(ret), K(dml_stat), K(reported_dml_stat_));
} else {
reported_dml_stat_.insert_row_count_ = mt_stat_.insert_row_count_;
reported_dml_stat_.update_row_count_ = mt_stat_.update_row_count_;
reported_dml_stat_.delete_row_count_ = mt_stat_.delete_row_count_;
}
}
}
return ret;
}
} // namespace memtable
} // namespace ocenabase

View File

@ -29,6 +29,7 @@
#include "storage/checkpoint/ob_freeze_checkpoint.h"
#include "storage/compaction/ob_medium_compaction_mgr.h"
#include "storage/tx_storage/ob_ls_handle.h" //ObLSHandle
#include "storage/blocksstable/ob_sstable.h"
#include "storage/checkpoint/ob_checkpoint_diagnose.h"
namespace oceanbase
@ -85,6 +86,34 @@ struct ObMtStat
int64_t row_size_;
};
// report the dml stat to ObOptStatMonitorManager
struct ObReportedDmlStat
{
static constexpr int64_t REPORT_INTERVAL = 1_s;
ObReportedDmlStat() { reset(); }
~ObReportedDmlStat() = default;
void reset() {
last_report_time_ = 0;
insert_row_count_ = 0;
update_row_count_ = 0;
delete_row_count_ = 0;
table_id_ = OB_INVALID_ID;
is_reporting_ = false;
}
int64_t last_report_time_;
int64_t insert_row_count_;
int64_t update_row_count_;
int64_t delete_row_count_;
// record the table_id for report the residual dml stat when memtable freeze,
// in which case the table_id can't be acquired
int64_t table_id_;
bool is_reporting_;
TO_STRING_KV(K_(last_report_time), K_(insert_row_count),
K_(update_row_count), K_(delete_row_count), K_(table_id), K_(is_reporting));
};
struct ObMvccRowAndWriteResult
{
ObMvccRow *mvcc_row_;
@ -290,7 +319,8 @@ public:
storage::ObTableAccessContext &context,
const common::ObIArray<share::schema::ObColDesc> &columns, // TODO: remove columns
const storage::ObStoreRow &row,
const share::ObEncryptMeta *encrypt_meta);
const share::ObEncryptMeta *encrypt_meta,
const bool check_exist);
virtual int set(
const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
@ -313,6 +343,13 @@ public:
const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
ObRowsInfo &rows_info);
int check_rows_locked_on_ddl_merge_sstable(
blocksstable::ObSSTable *sstable,
const bool check_exist,
const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
ObRowsInfo &rows_info);
// lock is used to lock the row(s)
// ctx is the locker tx's context, we need the tx_id, version and scn to do the concurrent control(mvcc_write)
@ -627,9 +664,9 @@ private:
storage::ObTableAccessContext &context,
const ObMemtableKey *key,
const ObTxNodeArg &arg,
const bool check_exist,
bool &is_new_locked,
ObMvccRowAndWriteResult *mvcc_row = nullptr,
bool check_exist = false);
ObMvccRowAndWriteResult *mvcc_row = nullptr);
int mvcc_replay_(storage::ObStoreCtx &ctx,
const ObMemtableKey *key,
@ -692,9 +729,9 @@ private:
const storage::ObStoreRow *old_row,
const common::ObIArray<int64_t> *update_idx,
const ObMemtableKey &mtk,
const bool check_exist,
storage::ObTableAccessContext &context,
ObMvccRowAndWriteResult *mvcc_row = nullptr,
bool check_exist = false);
ObMvccRowAndWriteResult *mvcc_row = nullptr);
int multi_set_(
const storage::ObTableIterParam &param,
const common::ObIArray<share::schema::ObColDesc> &columns,
@ -725,6 +762,8 @@ private:
const int64_t range_count,
ObIAllocator &allocator,
ObIArray<blocksstable::ObDatumRange> &sample_memtable_ranges);
int try_report_dml_stat_(const int64_t table_id);
int report_residual_dml_stat_();
private:
DISALLOW_COPY_AND_ASSIGN(ObMemtable);
@ -738,6 +777,7 @@ private:
ObQueryEngine query_engine_;
ObMvccEngine mvcc_engine_;
mutable ObMtStat mt_stat_;
mutable ObReportedDmlStat reported_dml_stat_;
int64_t max_schema_version_; // to record the max schema version of memtable & schema_change_clog
int64_t max_data_schema_version_; // to record the max schema version of write data
int64_t pending_cb_cnt_; // number of transactions have to sync log

View File

@ -310,6 +310,13 @@ void ObMemtableCtx::on_wlock_retry(const ObMemtableKey& key, const transaction::
retry_info_.on_conflict();
}
void ObMemtableCtx::on_key_duplication_retry(const ObMemtableKey& key)
{
if (retry_info_.need_print()) {
TRANS_LOG_RET(WARN, OB_SUCCESS, "primary key duplication conflict", K(key), KPC(this));
}
}
void ObMemtableCtx::on_tsc_retry(const ObMemtableKey& key,
const SCN snapshot_version,
const SCN max_trans_version,

View File

@ -430,6 +430,7 @@ public:
int clean_unlog_callbacks();
int check_tx_mem_size_overflow(bool &is_overflow);
public:
void on_key_duplication_retry(const ObMemtableKey& key);
void on_tsc_retry(const ObMemtableKey& key,
const share::SCN snapshot_version,
const share::SCN max_trans_version,

View File

@ -112,7 +112,10 @@ int ObRowConflictHandler::check_row_locked(const storage::ObTableIterParam &para
TRANS_LOG(WARN, "ObIStore is null", K(ret), K(i));
} else if (stores->at(i)->is_data_memtable()) {
ObMemtable *memtable = static_cast<ObMemtable *>(stores->at(i));
if (OB_FAIL(memtable->get_mvcc_engine().check_row_locked(ctx->mvcc_acc_ctx_, &mtk, lock_state))) {
if (OB_FAIL(memtable->get_mvcc_engine().check_row_locked(ctx->mvcc_acc_ctx_,
&mtk,
lock_state,
row_state))) {
TRANS_LOG(WARN, "mvcc engine check row lock fail", K(ret), K(mtk));
} else if (lock_state.is_locked_) {
break;
@ -176,10 +179,11 @@ int ObRowConflictHandler::check_foreign_key_constraint_for_memtable(ObMvccAccess
ObStoreRowLockState &lock_state)
{
int ret = OB_SUCCESS;
storage::ObRowState row_state;
if (OB_ISNULL(value)) {
ret = OB_BAD_NULL_ERROR;
TRANS_LOG(ERROR, "the ObMvccValueIterator is null", K(ret));
} else if (OB_FAIL(value->check_row_locked(ctx, lock_state))) {
} else if (OB_FAIL(value->check_row_locked(ctx, lock_state, row_state))) {
TRANS_LOG(WARN, "check row locked fail", K(ret), K(lock_state));
} else {
const ObTransID my_tx_id = ctx.get_tx_id();
@ -291,7 +295,8 @@ int ObRowConflictHandler::post_row_read_conflict(ObMvccAccessCtx &acc_ctx,
TRANS_LOG(WARN, "re-check row locked via tx_table fail", K(ret), K(tx_id), K(lock_state));
}
} else {
if (OB_FAIL(lock_state.mvcc_row_->check_row_locked(acc_ctx, lock_state))) {
storage::ObRowState row_state;
if (OB_FAIL(lock_state.mvcc_row_->check_row_locked(acc_ctx, lock_state, row_state))) {
TRANS_LOG(WARN, "re-check row locked via mvcc_row fail", K(ret), K(tx_id), K(lock_state));
}
}

View File

@ -139,6 +139,7 @@ void ObStoreRowLockState::reset()
lock_data_sequence_.reset();
lock_dml_flag_ = blocksstable::ObDmlFlag::DF_NOT_EXIST;
is_delayed_cleanout_ = false;
exist_flag_ = ObExistFlag::UNKNOWN;
mvcc_row_ = NULL;
trans_scn_ = SCN::max_scn();
}

View File

@ -212,18 +212,59 @@ private:
int64_t row_version_;
};
enum class ObExistFlag : uint8_t
{
EXIST = 0,
UNKNOWN = 1,
NOT_EXIST = 2,
};
static ObExistFlag extract_exist_flag_from_dml_flag(const blocksstable::ObDmlFlag dml_flag)
{
ObExistFlag exist_flag = ObExistFlag::UNKNOWN;
switch (dml_flag) {
case blocksstable::ObDmlFlag::DF_NOT_EXIST:
exist_flag = ObExistFlag::UNKNOWN;
break;
case blocksstable::ObDmlFlag::DF_LOCK:
exist_flag = ObExistFlag::EXIST;
break;
case blocksstable::ObDmlFlag::DF_UPDATE:
case blocksstable::ObDmlFlag::DF_INSERT:
exist_flag = ObExistFlag::EXIST;
break;
case blocksstable::ObDmlFlag::DF_DELETE:
exist_flag = ObExistFlag::NOT_EXIST;
break;
default:
ob_abort();
break;
}
return exist_flag;
}
struct ObStoreRowLockState
{
public:
ObStoreRowLockState()
: is_locked_(false),
trans_version_(share::SCN::min_scn()),
lock_trans_id_(),
lock_data_sequence_(),
lock_dml_flag_(blocksstable::ObDmlFlag::DF_NOT_EXIST),
is_delayed_cleanout_(false),
mvcc_row_(nullptr),
trans_scn_(share::SCN::max_scn()) {}
trans_version_(share::SCN::min_scn()),
lock_trans_id_(),
lock_data_sequence_(),
lock_dml_flag_(blocksstable::ObDmlFlag::DF_NOT_EXIST),
is_delayed_cleanout_(false),
exist_flag_(ObExistFlag::UNKNOWN),
mvcc_row_(NULL),
trans_scn_(share::SCN::max_scn()) {}
inline bool row_exist_decided() const
{
return ObExistFlag::EXIST == exist_flag_
|| ObExistFlag::NOT_EXIST == exist_flag_;
}
inline bool row_exist() const
{
return ObExistFlag::EXIST == exist_flag_;
}
inline bool is_lock_decided() const
{
return is_locked_ || !trans_version_.is_min();
@ -239,6 +280,7 @@ public:
K_(lock_data_sequence),
K_(lock_dml_flag),
K_(is_delayed_cleanout),
K_(exist_flag),
KP_(mvcc_row),
K_(trans_scn));
@ -248,6 +290,7 @@ public:
transaction::ObTxSEQ lock_data_sequence_;
blocksstable::ObDmlFlag lock_dml_flag_;
bool is_delayed_cleanout_;
ObExistFlag exist_flag_;
memtable::ObMvccRow *mvcc_row_;
share::SCN trans_scn_; // sstable takes end_scn, memtable takes scn_ of ObMvccTransNode
};

View File

@ -453,6 +453,11 @@ bool ObRelativeTable::is_storage_index_table() const
return schema_param_->is_storage_index_table();
}
bool ObRelativeTable::is_index_local_storage() const
{
return schema_param_->is_index_local_storage();
}
bool ObRelativeTable::can_read_index() const
{
return schema_param_->can_read_index();

View File

@ -86,6 +86,7 @@ public:
bool is_index_table() const;
bool is_lob_meta_table() const;
bool is_storage_index_table() const;
bool is_index_local_storage() const;
bool can_read_index() const;
bool is_unique_index() const;
bool is_domain_index() const;

View File

@ -3678,39 +3678,6 @@ int ObTablet::update_upper_trans_version(ObLS &ls, bool &is_updated)
return ret;
}
int ObTablet::insert_row(
ObRelativeTable &relative_table,
ObStoreCtx &store_ctx,
const ObColDescIArray &col_descs,
const ObStoreRow &row)
{
int ret = OB_SUCCESS;
bool b_exist = false;
common::ObIArray<transaction::ObEncryptMetaCache> *encrypt_meta_arr = NULL;
if (OB_UNLIKELY(!store_ctx.is_valid() || col_descs.count() <= 0 || !row.is_valid()
|| !relative_table.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(store_ctx), K(col_descs), K(row), K(ret));
} else {
const bool check_exists = !relative_table.is_storage_index_table()
|| relative_table.is_unique_index();
if (OB_FAIL(ret)) {
LOG_WARN("failed to get rowkey columns");
} else if (check_exists
&& OB_FAIL(rowkey_exists(relative_table, store_ctx, row.row_val_, b_exist))) {
LOG_WARN("failed to check whether row exists", K(row), K(ret));
} else if (OB_UNLIKELY(b_exist)) {
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
LOG_WARN("rowkey already exists", K(relative_table.get_table_id()), K(row), K(ret));
} else if (OB_FAIL(insert_row_without_rowkey_check(relative_table, store_ctx, col_descs, row, encrypt_meta_arr))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to set row", K(row), K(ret));
}
}
}
return ret;
}
int ObTablet::update_row(
ObRelativeTable &relative_table,
storage::ObStoreCtx &store_ctx,
@ -3757,7 +3724,13 @@ int ObTablet::update_row(
ObTableAccessContext context;
if (OB_FAIL(prepare_param_ctx(allocator, relative_table, store_ctx, param, context))) {
LOG_WARN("prepare param ctx fail, ", K(ret));
} else if (OB_FAIL(write_memtable->set(param, context, col_descs, update_idx, old_row, new_row, encrypt_meta))) {
} else if (OB_FAIL(write_memtable->set(param,
context,
col_descs,
update_idx,
old_row,
new_row,
encrypt_meta))) {
LOG_WARN("failed to set memtable, ", K(ret));
}
}
@ -3816,6 +3789,7 @@ int ObTablet::insert_rows(
int ObTablet::insert_row_without_rowkey_check(
ObRelativeTable &relative_table,
ObStoreCtx &store_ctx,
const bool check_exist,
const common::ObIArray<share::schema::ObColDesc> &col_descs,
const storage::ObStoreRow &row,
const common::ObIArray<transaction::ObEncryptMetaCache> *encrypt_meta_arr)
@ -3854,7 +3828,12 @@ int ObTablet::insert_row_without_rowkey_check(
ObTableAccessContext context;
if (OB_FAIL(prepare_param_ctx(allocator, relative_table, store_ctx, param, context))) {
LOG_WARN("prepare param ctx fail, ", K(ret));
} else if (OB_FAIL(write_memtable->set(param, context, col_descs, row, encrypt_meta))) {
} else if (OB_FAIL(write_memtable->set(param,
context,
col_descs,
row,
encrypt_meta,
check_exist))) {
LOG_WARN("fail to set memtable", K(ret));
}
}
@ -5810,6 +5789,8 @@ int ObTablet::prepare_param(
param.tablet_id_ = tablet_meta_.tablet_id_;
param.read_info_ = rowkey_read_info_;
param.set_tablet_handle(relative_table.get_tablet_handle());
param.is_non_unique_local_index_ = relative_table.is_storage_index_table() &&
relative_table.is_index_local_storage() && !relative_table.is_unique_index();
return ret;
}

View File

@ -316,11 +316,6 @@ public:
void trim_tablet_list();
// dml operation
int insert_row(
ObRelativeTable &relative_table,
ObStoreCtx &store_ctx,
const ObColDescIArray &col_descs,
const ObStoreRow &row);
int insert_rows(
ObRelativeTable &relative_table,
ObStoreCtx &store_ctx,
@ -333,6 +328,7 @@ public:
int insert_row_without_rowkey_check(
ObRelativeTable &relative_table,
ObStoreCtx &store_ctx,
const bool check_exist,
const ObColDescIArray &col_descs,
const storage::ObStoreRow &row,
const common::ObIArray<transaction::ObEncryptMetaCache> *encrypt_meta_arr);

View File

@ -34,6 +34,46 @@ using namespace logservice::coordinator;
namespace storage
{
void ObStoreCtxGuard::reset()
{
int ret = OB_SUCCESS;
static const int64_t WARN_TIME_US = 5 * 1000 * 1000;
if (IS_INIT) {
if (OB_NOT_NULL(handle_.get_ls())) {
if (ctx_.is_valid() && OB_FAIL(handle_.get_ls()->revert_store_ctx(ctx_))) {
LOG_WARN("revert transaction context fail", K(ret), K_(ls_id));
}
handle_.reset();
}
const int64_t guard_used_us = ObClockGenerator::getClock() - init_ts_;
if (guard_used_us >= WARN_TIME_US) {
LOG_WARN_RET(OB_ERR_TOO_MUCH_TIME, "guard used too much time", K(guard_used_us), K_(ls_id), K(lbt()));
}
ctx_.reset();
ls_id_.reset();
is_inited_ = false;
}
}
int ObStoreCtxGuard::init(const share::ObLSID &ls_id)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_UNLIKELY(!ls_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument(s)", K(ret), K(ls_id));
} else {
ls_id_ = ls_id;
ctx_.reset();
ctx_.ls_id_ = ls_id;
is_inited_ = true;
init_ts_ = ObClockGenerator::getClock();
}
return ret;
}
ObAccessService::ObAccessService()
: is_inited_(false),
tenant_id_(OB_INVALID_ID),
@ -361,6 +401,29 @@ int ObAccessService::table_rescan(
}
return ret;
}
int ObAccessService::get_write_store_ctx_guard(
const share::ObLSID &ls_id,
const int64_t timeout,
transaction::ObTxDesc &tx_desc,
const transaction::ObTxReadSnapshot &snapshot,
const int16_t branch_id,
ObStoreCtxGuard &ctx_guard,
const transaction::ObTxSEQ &spec_seq_no)
{
int ret = OB_SUCCESS;
ObLS *ls = nullptr;
// the write_flag is for tablet and does not need to be set here, just use default value,
// it will be set by dml param in check_write_allowed_ when doing dml operations
concurrent_control::ObWriteFlag default_write_flag;
if (OB_UNLIKELY(!ls_id.is_valid() || !tx_desc.is_valid() || !snapshot.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ls_id), K(tx_desc), K(snapshot));
} else if (OB_FAIL(get_write_store_ctx_guard_(
ls_id, timeout, tx_desc, snapshot, branch_id, default_write_flag, ctx_guard, spec_seq_no))) {
LOG_WARN("fail to get write store ctx gurad", K(ret), K(ls_id), K(tx_desc));
}
return ret;
}
int ObAccessService::get_write_store_ctx_guard_(
const share::ObLSID &ls_id,
@ -414,18 +477,21 @@ int ObAccessService::get_source_ls_tx_table_guard_(
} else if (ObTabletStatus::TRANSFER_IN != user_data.tablet_status_ || !user_data.transfer_ls_id_.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet status is unexpected", K(ret), K(user_data));
} else if (ctx_guard.get_store_ctx().mvcc_acc_ctx_.get_tx_table_guards().is_src_valid()) {
// The main tablet and local index tablets use the same mvcc_acc_ctx, if the src_tx_table_guard
// has been set, you do not need to set it again and must skip start_request_for_transfer,
// because it only call end_request_for_transfer once when revert store ctx.
ObTxTableGuards &tx_table_guards = ctx_guard.get_store_ctx().mvcc_acc_ctx_.get_tx_table_guards();
if (OB_UNLIKELY(tx_table_guards.src_ls_handle_.get_ls()->get_ls_id() != user_data.transfer_ls_id_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("main tablet and local index tablet must have same src ls", K(ret), K(tx_table_guards), K(user_data));
}
} else {
ObLS *src_ls = nullptr;
ObLSService *ls_service = nullptr;
ObLSService *ls_service = MTL(ObLSService*);
ObLSHandle ls_handle;
ObTxTableGuard src_tx_table_guard;
if (!user_data.transfer_ls_id_.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table type is unexpected", K(ret), K(user_data));
} else if (OB_ISNULL(ls_service = MTL(ObLSService*))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get ObLSService from MTL", K(ret), KP(ls_service));
} else if (OB_FAIL(ls_service->get_ls(user_data.transfer_ls_id_, ls_handle, ObLSGetMod::HA_MOD))) {
if (OB_FAIL(ls_service->get_ls(user_data.transfer_ls_id_, ls_handle, ObLSGetMod::HA_MOD))) {
LOG_WARN("failed to get ls", K(ret), K(user_data));
} else if (OB_ISNULL(src_ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
@ -613,7 +679,7 @@ int ObAccessService::check_write_allowed_(
const ObTableLockOwnerID lock_owner(0);
const bool is_deadlock_avoid_enabled = false;
bool is_try_lock = lock_wait_timeout_ts <= 0;
int64_t abs_timeout_ts = MIN(lock_wait_timeout_ts, tx_desc.get_expire_ts());
const int64_t abs_timeout_ts = MIN(lock_wait_timeout_ts, tx_desc.get_expire_ts());
bool enable_table_lock = true;
ret = OB_E(EventTable::EN_ENABLE_TABLE_LOCK) OB_SUCCESS;
if (OB_ERR_UNEXPECTED == ret) {
@ -631,24 +697,26 @@ int ObAccessService::check_write_allowed_(
} else if (is_disk_full) {
ret = OB_USER_OUTOF_DATA_DISK_SPACE;
LOG_WARN("data disk full, you should not do io now", K(ret));
} else if (OB_FAIL(get_write_store_ctx_guard_(ls_id,
abs_timeout_ts,
tx_desc,
dml_param.snapshot_,
dml_param.branch_id_,
dml_param.write_flag_,
ctx_guard,
dml_param.spec_seq_no_))) {
LOG_WARN("get write store ctx failed", K(ret), K(ls_id), K(dml_param), K(tx_desc));
} else if (FALSE_IT(ctx_guard.get_store_ctx().tablet_id_ = tablet_id)) {
} else if (OB_ISNULL(ls = ctx_guard.get_ls_handle().get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls should not be null", K(ret), K(ls_id), K_(tenant_id));
} else if (!enable_table_lock) {
// do nothing
} else {
int64_t lock_expired_ts = MIN(dml_param.timeout_, tx_desc.get_expire_ts());
if (OB_FAIL(get_lock_id(tablet_id, lock_id))) {
ObStoreCtx &store_ctx = ctx_guard.get_store_ctx();
store_ctx.tablet_id_ = tablet_id;
store_ctx.timeout_ = abs_timeout_ts;
store_ctx.mvcc_acc_ctx_.set_write_flag(dml_param.write_flag_);
store_ctx.mvcc_acc_ctx_.set_abs_lock_timeout_ts(abs_timeout_ts);
store_ctx.tablet_stat_.reset();
const int64_t lock_expired_ts = MIN(dml_param.timeout_, tx_desc.get_expire_ts());
const ObTableSchemaParam &schema_param = dml_param.table_param_->get_data_table();
const bool is_local_index_table = schema_param.is_index_table() && schema_param.is_index_local_storage();
if (!enable_table_lock) {
// do nothing
} else if (dml_param.is_direct_insert() || is_local_index_table) {
// skip table lock
} else if (OB_FAIL(get_lock_id(tablet_id, lock_id))) {
LOG_WARN("get lock id failed", K(ret), K(tablet_id));
} else if (OB_FAIL(lock_param.set(lock_id,
lock_mode,
@ -664,9 +732,8 @@ int ObAccessService::check_write_allowed_(
// so it will lead to incorrect error
lock_expired_ts))) {
LOG_WARN("get lock param failed", K(ret), K(lock_id));
} // When locking the table, the tablet is not detected to be deleted.
else if (!dml_param.is_direct_insert()
&& OB_FAIL(ls->lock(ctx_guard.get_store_ctx(), lock_param))) {
// When locking the table, the tablet is not detected to be deleted.
} else if (OB_FAIL(ls->lock(ctx_guard.get_store_ctx(), lock_param))) {
LOG_WARN("lock tablet failed", K(ret), K(lock_param));
} else {
// do nothing
@ -678,9 +745,6 @@ int ObAccessService::check_write_allowed_(
share::SCN::max_scn(), tablet_handle, ctx_guard))) {
LOG_WARN("failed to check replica allow to read", K(ret), K(tablet_id));
}
if (OB_FAIL(ret)) {
ctx_guard.reset();
}
return ret;
}
@ -696,7 +760,6 @@ int ObAccessService::delete_rows(
ACTIVE_SESSION_FLAG_SETTER_GUARD(in_storage_write);
int ret = OB_SUCCESS;
DISABLE_SQL_MEMLEAK_GUARD;
ObStoreCtxGuard ctx_guard;
ObLS *ls = nullptr;
ObLSTabletService *tablet_service = nullptr;
// Attention!!! This handle is only used for ObLSTabletService, will be reset inside ObLSTabletService.
@ -720,9 +783,9 @@ int ObAccessService::delete_rows(
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {
*dml_param.store_ctx_guard_))) {
LOG_WARN("fail to check query allowed", K(ret), K(ls_id), K(tablet_id));
} else if (OB_ISNULL(ls = ctx_guard.get_ls_handle().get_ls())) {
} else if (OB_ISNULL(ls = dml_param.store_ctx_guard_->get_ls_handle().get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls should not be null", K(ret), KP(ls));
} else if (OB_ISNULL(tablet_service = ls->get_tablet_svr())) {
@ -730,17 +793,11 @@ int ObAccessService::delete_rows(
LOG_ERROR("tablet service should not be null.", K(ret), K(ls_id));
} else {
ret = tablet_service->delete_rows(tablet_handle,
ctx_guard.get_store_ctx(),
dml_param.store_ctx_guard_->get_store_ctx(),
dml_param,
column_ids,
row_iter,
affected_rows);
if (OB_SUCC(ret)) {
int tmp_ret = audit_tablet_opt_dml_stat(dml_param,
tablet_id,
ObOptDmlStatType::TABLET_OPT_DELETE_STAT,
affected_rows);
}
}
return ret;
}
@ -756,7 +813,6 @@ int ObAccessService::put_rows(
{
ACTIVE_SESSION_FLAG_SETTER_GUARD(in_storage_write);
int ret = OB_SUCCESS;
ObStoreCtxGuard ctx_guard;
ObLS *ls = nullptr;
ObLSTabletService *tablet_service = nullptr;
// Attention!!! This handle is only used for ObLSTabletService, will be reset inside ObLSTabletService.
@ -780,9 +836,9 @@ int ObAccessService::put_rows(
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {
*dml_param.store_ctx_guard_))) {
LOG_WARN("fail to check query allowed", K(ret), K(ls_id), K(tablet_id));
} else if (OB_ISNULL(ls = ctx_guard.get_ls_handle().get_ls())) {
} else if (OB_ISNULL(ls = dml_param.store_ctx_guard_->get_ls_handle().get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls should not be null", K(ret), KP(ls));
} else if (OB_ISNULL(tablet_service = ls->get_tablet_svr())) {
@ -790,17 +846,11 @@ int ObAccessService::put_rows(
LOG_ERROR("tablet service should not be null.", K(ret), K(ls_id));
} else {
ret = tablet_service->put_rows(tablet_handle,
ctx_guard.get_store_ctx(),
dml_param.store_ctx_guard_->get_store_ctx(),
dml_param,
column_ids,
row_iter,
affected_rows);
if (OB_SUCC(ret)) {
int tmp_ret = audit_tablet_opt_dml_stat(dml_param,
tablet_id,
ObOptDmlStatType::TABLET_OPT_INSERT_STAT,
affected_rows);
}
}
return ret;
}
@ -817,7 +867,6 @@ int ObAccessService::insert_rows(
ACTIVE_SESSION_FLAG_SETTER_GUARD(in_storage_write);
int ret = OB_SUCCESS;
DISABLE_SQL_MEMLEAK_GUARD;
ObStoreCtxGuard ctx_guard;
ObLS *ls = nullptr;
ObLSTabletService *tablet_service = nullptr;
// Attention!!! This handle is only used for ObLSTabletService, will be reset inside ObLSTabletService.
@ -841,9 +890,9 @@ int ObAccessService::insert_rows(
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {
*dml_param.store_ctx_guard_))) {
LOG_WARN("fail to check query allowed", K(ret), K(ls_id), K(tablet_id));
} else if (OB_ISNULL(ls = ctx_guard.get_ls_handle().get_ls())) {
} else if (OB_ISNULL(ls = dml_param.store_ctx_guard_->get_ls_handle().get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls should not be null", K(ret), KP(ls));
} else if (OB_ISNULL(tablet_service = ls->get_tablet_svr())) {
@ -851,17 +900,11 @@ int ObAccessService::insert_rows(
LOG_ERROR("tablet service should not be null.", K(ret), K(ls_id));
} else {
ret = tablet_service->insert_rows(tablet_handle,
ctx_guard.get_store_ctx(),
dml_param.store_ctx_guard_->get_store_ctx(),
dml_param,
column_ids,
row_iter,
affected_rows);
if (OB_SUCC(ret) && !dml_param.is_direct_insert()) {
int tmp_ret = audit_tablet_opt_dml_stat(dml_param,
tablet_id,
ObOptDmlStatType::TABLET_OPT_INSERT_STAT,
affected_rows);
}
}
return ret;
}
@ -881,7 +924,6 @@ int ObAccessService::insert_row(
ACTIVE_SESSION_FLAG_SETTER_GUARD(in_storage_write);
int ret = OB_SUCCESS;
DISABLE_SQL_MEMLEAK_GUARD;
ObStoreCtxGuard ctx_guard;
ObLS *ls = nullptr;
ObLSTabletService *tablet_service = nullptr;
// Attention!!! This handle is only used for ObLSTabletService, will be reset inside ObLSTabletService.
@ -906,9 +948,9 @@ int ObAccessService::insert_row(
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {
*dml_param.store_ctx_guard_))) {
LOG_WARN("fail to check query allowed", K(ret), K(ls_id), K(tablet_id));
} else if (OB_ISNULL(ls = ctx_guard.get_ls_handle().get_ls())) {
} else if (OB_ISNULL(ls = dml_param.store_ctx_guard_->get_ls_handle().get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls should not be null", K(ret), KP(ls));
} else if (OB_ISNULL(tablet_service = ls->get_tablet_svr())) {
@ -916,7 +958,7 @@ int ObAccessService::insert_row(
LOG_ERROR("tablet service should not be null.", K(ret), K(ls_id));
} else {
ret = tablet_service->insert_row(tablet_handle,
ctx_guard.get_store_ctx(),
dml_param.store_ctx_guard_->get_store_ctx(),
dml_param,
column_ids,
duplicated_column_ids,
@ -924,12 +966,6 @@ int ObAccessService::insert_row(
flag,
affected_rows,
duplicated_rows);
if (OB_SUCC(ret)) {
int tmp_ret = audit_tablet_opt_dml_stat(dml_param,
tablet_id,
ObOptDmlStatType::TABLET_OPT_INSERT_STAT,
affected_rows);
}
}
return ret;
}
@ -957,7 +993,6 @@ int ObAccessService::update_rows(
ACTIVE_SESSION_FLAG_SETTER_GUARD(in_storage_write);
int ret = OB_SUCCESS;
DISABLE_SQL_MEMLEAK_GUARD;
ObStoreCtxGuard ctx_guard;
ObLS *ls = nullptr;
ObLSTabletService *tablet_service = nullptr;
// Attention!!! This handle is only used for ObLSTabletService, will be reset inside ObLSTabletService.
@ -981,9 +1016,9 @@ int ObAccessService::update_rows(
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {
*dml_param.store_ctx_guard_))) {
LOG_WARN("fail to check query allowed", K(ret), K(ls_id), K(tablet_id));
} else if (OB_ISNULL(ls = ctx_guard.get_ls_handle().get_ls())) {
} else if (OB_ISNULL(ls = dml_param.store_ctx_guard_->get_ls_handle().get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls should not be null", K(ret), KP(ls));
} else if (OB_ISNULL(tablet_service = ls->get_tablet_svr())) {
@ -991,18 +1026,12 @@ int ObAccessService::update_rows(
LOG_ERROR("tablet service should not be null.", K(ret), K(ls_id));
} else {
ret = tablet_service->update_rows(tablet_handle,
ctx_guard.get_store_ctx(),
dml_param.store_ctx_guard_->get_store_ctx(),
dml_param,
column_ids,
updated_column_ids,
row_iter,
affected_rows);
if (OB_SUCC(ret)) {
int tmp_ret = audit_tablet_opt_dml_stat(dml_param,
tablet_id,
ObOptDmlStatType::TABLET_OPT_UPDATE_STAT,
affected_rows);
}
}
return ret;
}
@ -1020,7 +1049,6 @@ int ObAccessService::lock_rows(
ACTIVE_SESSION_FLAG_SETTER_GUARD(in_storage_write);
int ret = OB_SUCCESS;
DISABLE_SQL_MEMLEAK_GUARD;
ObStoreCtxGuard ctx_guard;
ObLS *ls = nullptr;
ObLSTabletService *tablet_service = nullptr;
// Attention!!! This handle is only used for ObLSTabletService, will be reset inside ObLSTabletService.
@ -1044,9 +1072,9 @@ int ObAccessService::lock_rows(
lock_wait_timeout_ts,
tx_desc,
tablet_handle,
ctx_guard))) {
*dml_param.store_ctx_guard_))) {
LOG_WARN("fail to check query allowed", K(ret), K(ls_id), K(tablet_id));
} else if (OB_ISNULL(ls = ctx_guard.get_ls_handle().get_ls())) {
} else if (OB_ISNULL(ls = dml_param.store_ctx_guard_->get_ls_handle().get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls should not be null", K(ret), KP(ls));
} else if (OB_ISNULL(tablet_service = ls->get_tablet_svr())) {
@ -1054,7 +1082,7 @@ int ObAccessService::lock_rows(
LOG_ERROR("tablet service should not be null.", K(ret), K(ls_id));
} else {
ret = tablet_service->lock_rows(tablet_handle,
ctx_guard.get_store_ctx(),
dml_param.store_ctx_guard_->get_store_ctx(),
dml_param,
lock_flag,
false,
@ -1075,7 +1103,6 @@ int ObAccessService::lock_row(
{
ACTIVE_SESSION_FLAG_SETTER_GUARD(in_storage_write);
int ret = OB_SUCCESS;
ObStoreCtxGuard ctx_guard;
ObLS *ls = nullptr;
ObLSTabletService *tablet_service = nullptr;
// Attention!!! This handle is only used for ObLSTabletService, will be reset inside ObLSTabletService.
@ -1099,9 +1126,9 @@ int ObAccessService::lock_row(
lock_wait_timeout_ts,
tx_desc,
tablet_handle,
ctx_guard))) {
*dml_param.store_ctx_guard_))) {
LOG_WARN("fail to check query allowed", K(ret), K(ls_id), K(tablet_id));
} else if (OB_ISNULL(ls = ctx_guard.get_ls_handle().get_ls())) {
} else if (OB_ISNULL(ls = dml_param.store_ctx_guard_->get_ls_handle().get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls should not be null", K(ret), KP(ls));
} else if (OB_ISNULL(tablet_service = ls->get_tablet_svr())) {
@ -1109,7 +1136,7 @@ int ObAccessService::lock_row(
LOG_ERROR("tablet service should not be null.", K(ret), K(ls_id));
} else {
ret = tablet_service->lock_row(tablet_handle,
ctx_guard.get_store_ctx(),
dml_param.store_ctx_guard_->get_store_ctx(),
dml_param,
row,
lock_flag,
@ -1303,84 +1330,5 @@ int ObAccessService::split_multi_ranges(
return ret;
}
void ObAccessService::ObStoreCtxGuard::reset()
{
int ret = OB_SUCCESS;
static const int64_t WARN_TIME_US = 5 * 1000 * 1000;
if (IS_INIT) {
if (OB_NOT_NULL(handle_.get_ls())) {
if (ctx_.is_valid() && OB_FAIL(handle_.get_ls()->revert_store_ctx(ctx_))) {
LOG_WARN("revert transaction context fail", K(ret), K_(ls_id));
}
handle_.reset();
}
const int64_t guard_used_us = ObClockGenerator::getClock() - init_ts_;
if (guard_used_us >= WARN_TIME_US) {
LOG_WARN_RET(OB_ERR_TOO_MUCH_TIME, "guard used too much time", K(guard_used_us), K_(ls_id), K(lbt()));
}
ctx_.reset();
ls_id_.reset();
is_inited_ = false;
}
}
int ObAccessService::ObStoreCtxGuard::init(const share::ObLSID &ls_id)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_UNLIKELY(!ls_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument(s)", K(ret), K(ls_id));
} else {
ls_id_ = ls_id;
ctx_.reset();
ctx_.ls_id_ = ls_id;
is_inited_ = true;
init_ts_ = ObClockGenerator::getClock();
}
return ret;
}
int ObAccessService::audit_tablet_opt_dml_stat(
const ObDMLBaseParam &dml_param,
const common::ObTabletID &tablet_id,
const ObOptDmlStatType dml_stat_type,
const int64_t affected_rows)
{
int ret = OB_SUCCESS;
//static __thread int64_t last_access_ts = 0;
//if (!GCONF.enable_defensive_check() && ObClockGenerator::getClock() - last_access_ts < 1000000) {
// do nothing
if (OB_ISNULL(dml_param.table_param_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret), K(dml_param.table_param_));
} else if (dml_stat_type == ObOptDmlStatType::TABLET_OPT_INSERT_STAT ||
dml_stat_type == ObOptDmlStatType::TABLET_OPT_UPDATE_STAT ||
dml_stat_type == ObOptDmlStatType::TABLET_OPT_DELETE_STAT) {
ObOptDmlStat dml_stat;
dml_stat.tenant_id_ = tenant_id_;
dml_stat.table_id_ = dml_param.table_param_->get_data_table().get_table_id();
dml_stat.tablet_id_ = tablet_id.id();
if (dml_stat_type == ObOptDmlStatType::TABLET_OPT_INSERT_STAT) {
dml_stat.insert_row_count_ = affected_rows;
} else if (dml_stat_type == ObOptDmlStatType::TABLET_OPT_UPDATE_STAT) {
dml_stat.update_row_count_ = affected_rows;
} else {
dml_stat.delete_row_count_ = affected_rows;
}
if (MTL(ObOptStatMonitorManager*) != NULL) {
if (OB_FAIL(MTL(ObOptStatMonitorManager*)->update_local_cache(dml_stat))) {
LOG_WARN("failed to update local cache", K(ret));
} else {
LOG_TRACE("succeed to update dml stat local cache", K(dml_stat));
}
}
//last_access_ts = ObClockGenerator::getClock();
}
return ret;
}
}
}

View File

@ -39,6 +39,30 @@ struct ObDMLBaseParam;
class ObLSService;
class ObStoreCtx;
class ObStoreCtxGuard
{
public:
ObStoreCtxGuard() : is_inited_(false), init_ts_(0)
{
}
~ObStoreCtxGuard()
{
reset();
}
int init(const share::ObLSID &ls_id);
void reset();
ObStoreCtx &get_store_ctx() { return ctx_; }
ObLSHandle &get_ls_handle() { return handle_; }
private:
bool is_inited_;
ObStoreCtx ctx_;
share::ObLSID ls_id_;
ObLSHandle handle_;
int64_t init_ts_;
DISALLOW_COPY_AND_ASSIGN(ObStoreCtxGuard);
};
class ObAccessService : public common::ObITabletScan
{
public:
@ -49,28 +73,6 @@ public:
void destroy();
void stop();
public:
class ObStoreCtxGuard
{
public:
ObStoreCtxGuard() : is_inited_(false), init_ts_(0)
{
}
~ObStoreCtxGuard()
{
reset();
}
int init(const share::ObLSID &ls_id);
void reset();
ObStoreCtx &get_store_ctx() { return ctx_; }
ObLSHandle &get_ls_handle() { return handle_; }
private:
bool is_inited_;
ObStoreCtx ctx_;
share::ObLSID ls_id_;
ObLSHandle handle_;
int64_t init_ts_;
};
public:
// pre_check_lock
// @param [in] ls_id, this check op will be processed at which logstream.
@ -119,6 +121,14 @@ public:
const int64_t expected_task_count,
common::ObIAllocator &allocator,
common::ObArrayArray<ObStoreRange> &multi_range_split_array) override;
int get_write_store_ctx_guard(
const share::ObLSID &ls_id,
const int64_t timeout,
transaction::ObTxDesc &tx_desc,
const transaction::ObTxReadSnapshot &snapshot,
const int16_t branch_id,
ObStoreCtxGuard &ctx_guard,
const transaction::ObTxSEQ &spec_seq_no = transaction::ObTxSEQ::INVL());
// DML interface
int delete_rows(
@ -231,11 +241,6 @@ protected:
transaction::ObTxDesc &tx_desc,
ObTabletHandle &tablet_handle,
ObStoreCtxGuard &ctx_guard);
int audit_tablet_opt_dml_stat(
const ObDMLBaseParam &dml_param,
const common::ObTabletID &tablet_id,
const common::ObOptDmlStatType dml_stat_type,
const int64_t affected_rows);
int get_source_ls_tx_table_guard_(
const ObTabletHandle &tablet_handle,
ObStoreCtxGuard &ctx_guard);

View File

@ -512,9 +512,6 @@ int ObTenantFreezer::check_and_freeze_normal_data_(ObTenantFreezeCtx &ctx)
if (OB_TMP_FAIL(do_minor_freeze_(ctx))) {
LOG_WARN("[TenantFreezer] fail to do minor freeze", K(tmp_ret));
}
if (OB_TMP_FAIL(post_tx_data_freeze_request_())) {
LOG_WARN("[TenantFreezer] fail to do tx data self freeze", KR(tmp_ret));
}
}
}
return ret;

View File

@ -921,7 +921,7 @@ int ObTxDataMemtable::flush(const int64_t trace_id)
param.merge_type_ = compaction::MINI_MERGE;
param.merge_version_ = ObVersionRange::MIN_VERSION;
set_trace_id(trace_id);
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tx_table_merge_dag(param))) {
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tx_table_merge_dag(param, true /* is_emergency */))) {
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
STORAGE_LOG(WARN, "failed to schedule tablet merge dag", K(ret));
}

View File

@ -807,14 +807,14 @@ int ObTxDataTable::self_freeze_task()
{
int ret = OB_SUCCESS;
STORAGE_LOG(INFO, "start tx data table self freeze task", K(get_ls_id()));
STORAGE_LOG(DEBUG, "start tx data table self freeze task", K(get_ls_id()));
if (OB_FAIL(memtable_mgr_->flush(SCN::max_scn(), checkpoint::INVALID_TRACE_ID, true))) {
share::ObLSID ls_id = get_ls_id();
STORAGE_LOG(WARN, "self freeze of tx data memtable failed.", KR(ret), K(ls_id), KPC(memtable_mgr_));
}
STORAGE_LOG(INFO, "finish tx data table self freeze task", KR(ret), K(get_ls_id()));
STORAGE_LOG(DEBUG, "finish tx data table self freeze task", KR(ret), K(get_ls_id()));
return ret;
}

View File

@ -60,6 +60,7 @@ public:
}
bool is_valid() const { return tx_table_guard_.is_valid(); }
bool is_src_valid() const { return src_ls_handle_.is_valid() && src_tx_table_guard_.is_valid(); }
int check_row_locked(
const transaction::ObTransID &read_tx_id,

View File

@ -36,8 +36,6 @@ int MockObAccessService::insert_rows(
int64_t &affected_rows)
{
int ret = OB_SUCCESS;
ObStoreCtxGuard ctx_guard;
ObLSHandle handle;
ObTabletHandle tablet_handle;
if (OB_UNLIKELY(!ls_id.is_valid())
@ -59,11 +57,11 @@ int MockObAccessService::insert_rows(
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {
*dml_param.store_ctx_guard_))) {
LOG_WARN("fail to check query allowed", K(ret), K(ls_id), K(tablet_id));
} else {
ret = tablet_service_->insert_rows(tablet_handle,
ctx_guard.get_store_ctx(),
dml_param.store_ctx_guard_->get_store_ctx(),
dml_param,
column_ids,
row_iter,

View File

@ -151,6 +151,10 @@ ObTxNode::ObTxNode(const int64_t ls_id,
&schema_service_,
&server_tracer_));
tenant_.set(&txs_);
OZ(fake_opt_stat_mgr_.init(tenant_id_));
tenant_.set(&fake_opt_stat_mgr_);
OZ(fake_lock_wait_mgr_.init());
tenant_.set(&fake_lock_wait_mgr_);
OZ (create_memtable_(100000, memtable_));
{
ObColDesc col_desc;
@ -662,7 +666,7 @@ int ObTxNode::write(ObTxDesc &tx,
param.read_info_ = &read_info;
context.init(query_flag, write_store_ctx, allocator, trans_version_range);
OZ(memtable_->set(param, context, columns_, row, encrypt_meta));
OZ(memtable_->set(param, context, columns_, row, encrypt_meta, false));
OZ(txs_.revert_store_ctx(write_store_ctx));
delete iter;
return ret;
@ -700,7 +704,7 @@ int ObTxNode::write_one_row(ObStoreCtx& write_store_ctx, const int64_t key, cons
read_info.init(allocator, 2, 1, false, columns_, nullptr/*storage_cols_index*/);
ObStoreRow row;
ObObj cols[2] = {ObObj(key), ObObj(value)};
row.flag_ = blocksstable::ObDmlFlag::DF_INSERT;
row.flag_ = blocksstable::ObDmlFlag::DF_UPDATE;
row.row_val_.cells_ = cols;
row.row_val_.count_ = 2;
@ -723,7 +727,7 @@ int ObTxNode::write_one_row(ObStoreCtx& write_store_ctx, const int64_t key, cons
OZ(context.init(query_flag, write_store_ctx, allocator, trans_version_range));
OZ(memtable_->set(param, context, columns_, row, encrypt_meta));
OZ(memtable_->set(param, context, columns_, row, encrypt_meta, false));
return ret;
}

View File

@ -35,6 +35,8 @@
#include "../mock_utils/basic_fake_define.h"
#include "../mock_utils/ob_fake_tx_rpc.h"
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
#include "share/stat/ob_opt_stat_monitor_manager.h"
#include "storage/memtable/ob_lock_wait_mgr.h"
namespace oceanbase {
using namespace transaction;
@ -305,6 +307,8 @@ public:
ObTxNodeRole role_;
ObFakeTxLogAdapter* fake_tx_log_adapter_;
ObTabletMemtableMgr fake_memtable_mgr_;
ObOptStatMonitorManager fake_opt_stat_mgr_;
ObLockWaitMgr fake_lock_wait_mgr_;
storage::ObLS mock_ls_; // TODO mock required member on LS
common::hash::ObHashSet<int16_t> drop_msg_type_set_;
ObLSMap fake_ls_map_;