[DeadLock] fix memory leak when call multi_set()
This commit is contained in:

committed by
ant-ob-hengtang

parent
1049d14a7d
commit
a17991664f
@ -625,6 +625,7 @@ ob_set_subtarget(ob_storage concurrency_control
|
|||||||
ob_set_subtarget(ob_storage memtable
|
ob_set_subtarget(ob_storage memtable
|
||||||
memtable/ob_lock_wait_mgr.cpp
|
memtable/ob_lock_wait_mgr.cpp
|
||||||
memtable/ob_memtable.cpp
|
memtable/ob_memtable.cpp
|
||||||
|
memtable/ob_memtable_key.cpp
|
||||||
memtable/ob_memtable_compact_writer.cpp
|
memtable/ob_memtable_compact_writer.cpp
|
||||||
memtable/ob_memtable_context.cpp
|
memtable/ob_memtable_context.cpp
|
||||||
memtable/ob_memtable_interface.cpp
|
memtable/ob_memtable_interface.cpp
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
|
|
||||||
#include "storage/memtable/mvcc/ob_mvcc_engine.h"
|
#include "storage/memtable/mvcc/ob_mvcc_engine.h"
|
||||||
#include "storage/memtable/mvcc/ob_mvcc_iterator.h"
|
#include "storage/memtable/mvcc/ob_mvcc_iterator.h"
|
||||||
|
#include "storage/memtable/mvcc/ob_mvcc_row.h"
|
||||||
|
|
||||||
#include "storage/memtable/ob_memtable_compact_writer.h"
|
#include "storage/memtable/ob_memtable_compact_writer.h"
|
||||||
#include "storage/memtable/ob_memtable_iterator.h"
|
#include "storage/memtable/ob_memtable_iterator.h"
|
||||||
@ -333,6 +334,7 @@ int ObMemtable::multi_set(
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObMvccWriteGuard guard;
|
ObMvccWriteGuard guard;
|
||||||
|
ObMemtableKeyGenerator mtk_generator;
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
TRANS_LOG(WARN, "Not inited", K(*this));
|
TRANS_LOG(WARN, "Not inited", K(*this));
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -351,6 +353,8 @@ int ObMemtable::multi_set(
|
|||||||
} else if (need_for_save(encrypt_meta) && OB_FAIL(save_encrypt_meta(param.table_id_, encrypt_meta))) {
|
} else if (need_for_save(encrypt_meta) && OB_FAIL(save_encrypt_meta(param.table_id_, encrypt_meta))) {
|
||||||
TRANS_LOG(WARN, "store encrypt meta to memtable failed", KPC(encrypt_meta), KR(ret));
|
TRANS_LOG(WARN, "store encrypt meta to memtable failed", KPC(encrypt_meta), KR(ret));
|
||||||
#endif
|
#endif
|
||||||
|
} else if (OB_FAIL(mtk_generator.init(rows, row_count, param.get_schema_rowkey_count(), columns))) {
|
||||||
|
TRANS_LOG(WARN, "fail to generate memtable keys", KPC(encrypt_meta), K(*context.store_ctx_), KR(ret));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
@ -359,13 +363,29 @@ int ObMemtable::multi_set(
|
|||||||
} else {
|
} else {
|
||||||
lib::CompatModeGuard compat_guard(mode_);
|
lib::CompatModeGuard compat_guard(mode_);
|
||||||
if (row_count > 1) {
|
if (row_count > 1) {
|
||||||
ret = multi_set_(param, columns, rows, row_count, check_exist, context, rows_info);
|
ret = multi_set_(param, columns, rows, row_count, check_exist, mtk_generator, context, rows_info);
|
||||||
} else {
|
} else {
|
||||||
ret = set_(param, columns, rows[0], nullptr, nullptr, context, nullptr,
|
ret = set_(param, columns, rows[0], nullptr, nullptr, mtk_generator[0], context, nullptr,
|
||||||
check_exist && !rows_info.is_row_exist_checked(0));
|
check_exist && !rows_info.is_row_exist_checked(0));
|
||||||
}
|
}
|
||||||
guard.set_memtable(this);
|
guard.set_memtable(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
/*****[for deadlock]*****/
|
||||||
|
// recored this row is hold by this trans for deadlock detector
|
||||||
|
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
||||||
|
if (OB_ISNULL(p_lock_wait_mgr)) {
|
||||||
|
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
||||||
|
} else {
|
||||||
|
for (int64_t idx = 0; idx < mtk_generator.count(); ++idx) {
|
||||||
|
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
|
||||||
|
mtk_generator[idx],
|
||||||
|
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/***********************/
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -414,6 +434,7 @@ int ObMemtable::set(
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObMvccWriteGuard guard;
|
ObMvccWriteGuard guard;
|
||||||
|
ObMemtableKeyGenerator mtk_generator;
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
TRANS_LOG(WARN, "not init", K(*this));
|
TRANS_LOG(WARN, "not init", K(*this));
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -434,6 +455,8 @@ int ObMemtable::set(
|
|||||||
} else if (need_for_save(encrypt_meta) && OB_FAIL(save_encrypt_meta(param.table_id_, encrypt_meta))) {
|
} else if (need_for_save(encrypt_meta) && OB_FAIL(save_encrypt_meta(param.table_id_, encrypt_meta))) {
|
||||||
TRANS_LOG(WARN, "store encrypt meta to memtable failed", KPC(encrypt_meta), KR(ret));
|
TRANS_LOG(WARN, "store encrypt meta to memtable failed", KPC(encrypt_meta), KR(ret));
|
||||||
#endif
|
#endif
|
||||||
|
} else if (OB_FAIL(mtk_generator.init(&row, 1, param.get_schema_rowkey_count(), columns))) {
|
||||||
|
TRANS_LOG(WARN, "fail to generate memtable keys", KPC(encrypt_meta), K(*context.store_ctx_), KR(ret));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
@ -442,9 +465,23 @@ int ObMemtable::set(
|
|||||||
} else {
|
} else {
|
||||||
lib::CompatModeGuard compat_guard(mode_);
|
lib::CompatModeGuard compat_guard(mode_);
|
||||||
|
|
||||||
ret = set_(param, columns, row, NULL, NULL, context);
|
ret = set_(param, columns, row, NULL, NULL, mtk_generator[0], context);
|
||||||
guard.set_memtable(this);
|
guard.set_memtable(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
/*****[for deadlock]*****/
|
||||||
|
// recored this row is hold by this trans for deadlock detector
|
||||||
|
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
||||||
|
if (OB_ISNULL(p_lock_wait_mgr)) {
|
||||||
|
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
||||||
|
} else {
|
||||||
|
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
|
||||||
|
mtk_generator[0],
|
||||||
|
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
|
||||||
|
}
|
||||||
|
/***********************/
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -459,6 +496,7 @@ int ObMemtable::set(
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObMvccWriteGuard guard;
|
ObMvccWriteGuard guard;
|
||||||
|
ObMemtableKeyGenerator mtk_generator;
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
TRANS_LOG(WARN, "not init", K(*this));
|
TRANS_LOG(WARN, "not init", K(*this));
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -477,6 +515,8 @@ int ObMemtable::set(
|
|||||||
} else if (need_for_save(encrypt_meta) && OB_FAIL(save_encrypt_meta(param.table_id_, encrypt_meta))) {
|
} else if (need_for_save(encrypt_meta) && OB_FAIL(save_encrypt_meta(param.table_id_, encrypt_meta))) {
|
||||||
TRANS_LOG(WARN, "store encrypt meta to memtable failed", KPC(encrypt_meta), KR(ret));
|
TRANS_LOG(WARN, "store encrypt meta to memtable failed", KPC(encrypt_meta), KR(ret));
|
||||||
#endif
|
#endif
|
||||||
|
} else if (OB_FAIL(mtk_generator.init(&new_row, 1, param.get_schema_rowkey_count(), columns))) {
|
||||||
|
TRANS_LOG(WARN, "fail to generate memtable keys", KPC(encrypt_meta), K(*context.store_ctx_), KR(ret));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_FAIL(ret)){
|
if (OB_FAIL(ret)){
|
||||||
@ -485,9 +525,23 @@ int ObMemtable::set(
|
|||||||
} else {
|
} else {
|
||||||
lib::CompatModeGuard compat_guard(mode_);
|
lib::CompatModeGuard compat_guard(mode_);
|
||||||
|
|
||||||
ret = set_(param, columns, new_row, &old_row, &update_idx, context);
|
ret = set_(param, columns, new_row, &old_row, &update_idx, mtk_generator[0], context);
|
||||||
guard.set_memtable(this);
|
guard.set_memtable(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
/*****[for deadlock]*****/
|
||||||
|
// recored this row is hold by this trans for deadlock detector
|
||||||
|
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
||||||
|
if (OB_ISNULL(p_lock_wait_mgr)) {
|
||||||
|
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
||||||
|
} else {
|
||||||
|
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
|
||||||
|
mtk_generator[0],
|
||||||
|
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
|
||||||
|
}
|
||||||
|
/***********************/
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -512,7 +566,9 @@ int ObMemtable::lock(
|
|||||||
TRANS_LOG(WARN, "not allow to write", K(*context.store_ctx_));
|
TRANS_LOG(WARN, "not allow to write", K(*context.store_ctx_));
|
||||||
} else if (OB_FAIL(tmp_key.assign(row.cells_, param.get_schema_rowkey_count()))) {
|
} else if (OB_FAIL(tmp_key.assign(row.cells_, param.get_schema_rowkey_count()))) {
|
||||||
TRANS_LOG(WARN, "Failed to assign rowkey", K(row), K(param));
|
TRANS_LOG(WARN, "Failed to assign rowkey", K(row), K(param));
|
||||||
} else if (OB_FAIL(lock_(param, context, tmp_key))) {
|
} else if (OB_FAIL(mtk.encode(param.get_read_info()->get_columns_desc(), &tmp_key))) {
|
||||||
|
TRANS_LOG(WARN, "encode mtk failed", K(ret), K(param));
|
||||||
|
} else if (OB_FAIL(lock_(param, context, tmp_key, mtk))) {
|
||||||
TRANS_LOG(WARN, "lock_ failed", K(ret), K(param));
|
TRANS_LOG(WARN, "lock_ failed", K(ret), K(param));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -520,6 +576,20 @@ int ObMemtable::lock(
|
|||||||
if (OB_FAIL(ret) && (OB_TRY_LOCK_ROW_CONFLICT != ret)) {
|
if (OB_FAIL(ret) && (OB_TRY_LOCK_ROW_CONFLICT != ret)) {
|
||||||
TRANS_LOG(WARN, "lock fail", K(ret), K(row), K(mtk));
|
TRANS_LOG(WARN, "lock fail", K(ret), K(row), K(mtk));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
/*****[for deadlock]*****/
|
||||||
|
// recored this row is hold by this trans for deadlock detector
|
||||||
|
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
||||||
|
if (OB_ISNULL(p_lock_wait_mgr)) {
|
||||||
|
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
||||||
|
} else {
|
||||||
|
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
|
||||||
|
mtk,
|
||||||
|
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
|
||||||
|
}
|
||||||
|
/***********************/
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -530,6 +600,7 @@ int ObMemtable::lock(
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObMvccWriteGuard guard;
|
ObMvccWriteGuard guard;
|
||||||
|
ObMemtableKey mtk;
|
||||||
|
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
TRANS_LOG(WARN, "not init", K(*this));
|
TRANS_LOG(WARN, "not init", K(*this));
|
||||||
@ -539,12 +610,29 @@ int ObMemtable::lock(
|
|||||||
TRANS_LOG(WARN, "invalid param", K(ret), K(rowkey));
|
TRANS_LOG(WARN, "invalid param", K(ret), K(rowkey));
|
||||||
} else if (OB_FAIL(guard.write_auth(*context.store_ctx_))) {
|
} else if (OB_FAIL(guard.write_auth(*context.store_ctx_))) {
|
||||||
TRANS_LOG(WARN, "not allow to write", K(*context.store_ctx_));
|
TRANS_LOG(WARN, "not allow to write", K(*context.store_ctx_));
|
||||||
} else if (OB_FAIL(lock_(param, context, rowkey.get_store_rowkey()))) {
|
} else if (OB_FAIL(mtk.encode(param.get_read_info()->get_columns_desc(), &rowkey.get_store_rowkey()))) {
|
||||||
|
TRANS_LOG(WARN, "encode mtk failed", K(ret), K(param));
|
||||||
|
} else if (OB_FAIL(lock_(param, context, rowkey.get_store_rowkey(), mtk))) {
|
||||||
|
TRANS_LOG(WARN, "lock_ failed", K(ret), K(param));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_FAIL(ret) && (OB_TRY_LOCK_ROW_CONFLICT != ret) && (OB_TRANSACTION_SET_VIOLATION != ret)) {
|
if (OB_FAIL(ret) && (OB_TRY_LOCK_ROW_CONFLICT != ret) && (OB_TRANSACTION_SET_VIOLATION != ret)) {
|
||||||
TRANS_LOG(WARN, "lock fail", K(ret), K(rowkey));
|
TRANS_LOG(WARN, "lock fail", K(ret), K(rowkey));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
/*****[for deadlock]*****/
|
||||||
|
// recored this row is hold by this trans for deadlock detector
|
||||||
|
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
||||||
|
if (OB_ISNULL(p_lock_wait_mgr)) {
|
||||||
|
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
||||||
|
} else {
|
||||||
|
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
|
||||||
|
mtk,
|
||||||
|
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
|
||||||
|
}
|
||||||
|
/***********************/
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1357,6 +1445,7 @@ void ObMemtable::lock_row_on_frozen_stores_on_failure(
|
|||||||
int ObMemtable::lock_rows_on_frozen_stores_(
|
int ObMemtable::lock_rows_on_frozen_stores_(
|
||||||
const bool check_exist,
|
const bool check_exist,
|
||||||
const storage::ObTableIterParam ¶m,
|
const storage::ObTableIterParam ¶m,
|
||||||
|
const ObMemtableKeyGenerator &memtable_keys,
|
||||||
storage::ObTableAccessContext &context,
|
storage::ObTableAccessContext &context,
|
||||||
ObMvccRowAndWriteResults &mvcc_rows,
|
ObMvccRowAndWriteResults &mvcc_rows,
|
||||||
ObRowsInfo &rows_info)
|
ObRowsInfo &rows_info)
|
||||||
@ -2832,6 +2921,7 @@ int ObMemtable::multi_set_(
|
|||||||
const storage::ObStoreRow *rows,
|
const storage::ObStoreRow *rows,
|
||||||
const int64_t row_count,
|
const int64_t row_count,
|
||||||
const bool check_exist,
|
const bool check_exist,
|
||||||
|
const ObMemtableKeyGenerator &memtable_keys,
|
||||||
storage::ObTableAccessContext &context,
|
storage::ObTableAccessContext &context,
|
||||||
storage::ObRowsInfo &rows_info)
|
storage::ObRowsInfo &rows_info)
|
||||||
{
|
{
|
||||||
@ -2847,7 +2937,7 @@ int ObMemtable::multi_set_(
|
|||||||
// 1. Check write conflict in memtables.
|
// 1. Check write conflict in memtables.
|
||||||
for (int64_t i = 0 ; OB_SUCC(ret) && i < row_count; ++i) {
|
for (int64_t i = 0 ; OB_SUCC(ret) && i < row_count; ++i) {
|
||||||
const uint32_t permutation_idx = rows_info.get_permutation_idx(i);
|
const uint32_t permutation_idx = rows_info.get_permutation_idx(i);
|
||||||
if (OB_FAIL(set_(param, columns, rows[i], nullptr, nullptr, context, &(mvcc_rows[permutation_idx]), check_exist))) {
|
if (OB_FAIL(set_(param, columns, rows[i], nullptr, nullptr, memtable_keys[i], context, &(mvcc_rows[permutation_idx]), check_exist))) {
|
||||||
if (OB_UNLIKELY(OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret)) {
|
if (OB_UNLIKELY(OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret)) {
|
||||||
TRANS_LOG(WARN, "Failed to insert new row", K(ret), K(i), K(permutation_idx), K(rows[i]));
|
TRANS_LOG(WARN, "Failed to insert new row", K(ret), K(i), K(permutation_idx), K(rows[i]));
|
||||||
}
|
}
|
||||||
@ -2869,8 +2959,7 @@ int ObMemtable::multi_set_(
|
|||||||
// 2. Check uniqueness constraint and write conflict in sstables.
|
// 2. Check uniqueness constraint and write conflict in sstables.
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
} else if (rows_info.all_rows_found()) {
|
} else if (rows_info.all_rows_found()) {
|
||||||
} else if (OB_FAIL(lock_rows_on_frozen_stores_(check_exist, param, context,
|
} else if (OB_FAIL(lock_rows_on_frozen_stores_(check_exist, param, memtable_keys, context, mvcc_rows, rows_info))) {
|
||||||
mvcc_rows, rows_info))) {
|
|
||||||
TRANS_LOG(WARN, "Failed to lock rows on frozen stores", K(ret));
|
TRANS_LOG(WARN, "Failed to lock rows on frozen stores", K(ret));
|
||||||
} else if (rows_info.have_conflict()) {
|
} else if (rows_info.have_conflict()) {
|
||||||
conflict_idx = rows_info.get_conflict_idx();
|
conflict_idx = rows_info.get_conflict_idx();
|
||||||
@ -2926,6 +3015,7 @@ int ObMemtable::set_(
|
|||||||
const storage::ObStoreRow &new_row,
|
const storage::ObStoreRow &new_row,
|
||||||
const storage::ObStoreRow *old_row,
|
const storage::ObStoreRow *old_row,
|
||||||
const common::ObIArray<int64_t> *update_idx,
|
const common::ObIArray<int64_t> *update_idx,
|
||||||
|
const ObMemtableKey &mtk,
|
||||||
storage::ObTableAccessContext &context,
|
storage::ObTableAccessContext &context,
|
||||||
ObMvccRowAndWriteResult *mvcc_row,
|
ObMvccRowAndWriteResult *mvcc_row,
|
||||||
bool check_exist)
|
bool check_exist)
|
||||||
@ -2935,20 +3025,12 @@ int ObMemtable::set_(
|
|||||||
char *buf = nullptr;
|
char *buf = nullptr;
|
||||||
int64_t len = 0;
|
int64_t len = 0;
|
||||||
ObRowData old_row_data;
|
ObRowData old_row_data;
|
||||||
ObStoreRowkey tmp_key;
|
|
||||||
ObMemtableKey mtk;
|
|
||||||
ObStoreCtx &ctx = *(context.store_ctx_);
|
ObStoreCtx &ctx = *(context.store_ctx_);
|
||||||
ObMemtableCtx *mem_ctx = ctx.mvcc_acc_ctx_.get_mem_ctx();
|
ObMemtableCtx *mem_ctx = ctx.mvcc_acc_ctx_.get_mem_ctx();
|
||||||
|
|
||||||
//set_begin(ctx.mvcc_acc_ctx_);
|
//set_begin(ctx.mvcc_acc_ctx_);
|
||||||
|
|
||||||
if (OB_FAIL(tmp_key.assign(new_row.row_val_.cells_,
|
if (nullptr != old_row) {
|
||||||
param.get_schema_rowkey_count()))) {
|
|
||||||
TRANS_LOG(WARN, "Failed to assign tmp rowkey", K(ret), K(new_row),
|
|
||||||
K(param.get_schema_rowkey_count()));
|
|
||||||
} else if (OB_FAIL(mtk.encode(columns, &tmp_key))) {
|
|
||||||
TRANS_LOG(WARN, "mtk encode fail", "ret", ret);
|
|
||||||
} else if (nullptr != old_row) {
|
|
||||||
char *new_buf = nullptr;
|
char *new_buf = nullptr;
|
||||||
if(OB_FAIL(row_writer.write(param.get_schema_rowkey_count(), *old_row, nullptr, buf, len))) {
|
if(OB_FAIL(row_writer.write(param.get_schema_rowkey_count(), *old_row, nullptr, buf, len))) {
|
||||||
TRANS_LOG(WARN, "Failed to write old row", K(ret), KPC(old_row));
|
TRANS_LOG(WARN, "Failed to write old row", K(ret), KPC(old_row));
|
||||||
@ -3034,18 +3116,16 @@ int ObMemtable::set_(
|
|||||||
int ObMemtable::lock_(
|
int ObMemtable::lock_(
|
||||||
const storage::ObTableIterParam ¶m,
|
const storage::ObTableIterParam ¶m,
|
||||||
storage::ObTableAccessContext &context,
|
storage::ObTableAccessContext &context,
|
||||||
const common::ObStoreRowkey &rowkey)
|
const common::ObStoreRowkey &rowkey,
|
||||||
|
const ObMemtableKey &mtk)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool is_new_locked = false;
|
bool is_new_locked = false;
|
||||||
blocksstable::ObRowWriter row_writer;
|
blocksstable::ObRowWriter row_writer;
|
||||||
ObMemtableKey mtk;
|
|
||||||
char *buf = NULL;
|
char *buf = NULL;
|
||||||
int64_t len = 0;
|
int64_t len = 0;
|
||||||
|
|
||||||
if (OB_FAIL(mtk.encode(param.get_read_info()->get_columns_desc(), &rowkey))) {
|
if (OB_FAIL(row_writer.write_rowkey(rowkey, buf, len))) {
|
||||||
TRANS_LOG(WARN, "mtk encode fail", "ret", ret);
|
|
||||||
} else if (OB_FAIL(row_writer.write_rowkey(rowkey, buf, len))) {
|
|
||||||
TRANS_LOG(WARN, "Failed to writer rowkey", K(ret), K(rowkey));
|
TRANS_LOG(WARN, "Failed to writer rowkey", K(ret), K(rowkey));
|
||||||
} else {
|
} else {
|
||||||
// for elr optimization
|
// for elr optimization
|
||||||
@ -3188,19 +3268,6 @@ int ObMemtable::mvcc_write_(
|
|||||||
(void)mvcc_engine_.mvcc_undo(value);
|
(void)mvcc_engine_.mvcc_undo(value);
|
||||||
res.is_mvcc_undo_ = true;
|
res.is_mvcc_undo_ = true;
|
||||||
TRANS_LOG(WARN, "register row commit failed", K(ret));
|
TRANS_LOG(WARN, "register row commit failed", K(ret));
|
||||||
} else {
|
|
||||||
is_new_locked = res.is_new_locked_;
|
|
||||||
/*****[for deadlock]*****/
|
|
||||||
if (is_new_locked) {
|
|
||||||
// recored this row is hold by this trans for deadlock detector
|
|
||||||
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
|
||||||
if (OB_ISNULL(p_lock_wait_mgr)) {
|
|
||||||
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
|
||||||
} else {
|
|
||||||
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(), *key, mem_ctx->get_tx_id());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/***********************/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// cannot be serializable when transaction set violation
|
// cannot be serializable when transaction set violation
|
||||||
|
@ -561,6 +561,7 @@ private:
|
|||||||
int lock_rows_on_frozen_stores_(
|
int lock_rows_on_frozen_stores_(
|
||||||
const bool check_exist,
|
const bool check_exist,
|
||||||
const storage::ObTableIterParam ¶m,
|
const storage::ObTableIterParam ¶m,
|
||||||
|
const ObMemtableKeyGenerator &memtable_keys,
|
||||||
storage::ObTableAccessContext &context,
|
storage::ObTableAccessContext &context,
|
||||||
ObMvccRowAndWriteResults &mvcc_rows,
|
ObMvccRowAndWriteResults &mvcc_rows,
|
||||||
ObRowsInfo &rows_info);
|
ObRowsInfo &rows_info);
|
||||||
@ -589,6 +590,7 @@ private:
|
|||||||
const storage::ObStoreRow &new_row,
|
const storage::ObStoreRow &new_row,
|
||||||
const storage::ObStoreRow *old_row,
|
const storage::ObStoreRow *old_row,
|
||||||
const common::ObIArray<int64_t> *update_idx,
|
const common::ObIArray<int64_t> *update_idx,
|
||||||
|
const ObMemtableKey &mtk,
|
||||||
storage::ObTableAccessContext &context,
|
storage::ObTableAccessContext &context,
|
||||||
ObMvccRowAndWriteResult *mvcc_row = nullptr,
|
ObMvccRowAndWriteResult *mvcc_row = nullptr,
|
||||||
bool check_exist = false);
|
bool check_exist = false);
|
||||||
@ -598,12 +600,14 @@ private:
|
|||||||
const storage::ObStoreRow *rows,
|
const storage::ObStoreRow *rows,
|
||||||
const int64_t row_count,
|
const int64_t row_count,
|
||||||
const bool check_exist,
|
const bool check_exist,
|
||||||
|
const ObMemtableKeyGenerator &memtable_keys,
|
||||||
storage::ObTableAccessContext &context,
|
storage::ObTableAccessContext &context,
|
||||||
storage::ObRowsInfo &rows_info);
|
storage::ObRowsInfo &rows_info);
|
||||||
int lock_(
|
int lock_(
|
||||||
const storage::ObTableIterParam ¶m,
|
const storage::ObTableIterParam ¶m,
|
||||||
storage::ObTableAccessContext &context,
|
storage::ObTableAccessContext &context,
|
||||||
const common::ObStoreRowkey &rowkey);
|
const common::ObStoreRowkey &rowkey,
|
||||||
|
const ObMemtableKey &mtk);
|
||||||
|
|
||||||
int post_row_write_conflict_(ObMvccAccessCtx &acc_ctx,
|
int post_row_write_conflict_(ObMvccAccessCtx &acc_ctx,
|
||||||
const ObMemtableKey &row_key,
|
const ObMemtableKey &row_key,
|
||||||
|
120
src/storage/memtable/ob_memtable_key.cpp
Normal file
120
src/storage/memtable/ob_memtable_key.cpp
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2021 OceanBase
|
||||||
|
* OceanBase CE is licensed under Mulan PubL v2.
|
||||||
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||||
|
* You may obtain a copy of Mulan PubL v2 at:
|
||||||
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||||
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||||
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||||
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||||
|
* See the Mulan PubL v2 for more details.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "ob_memtable_key.h"
|
||||||
|
#include "lib/ob_errno.h"
|
||||||
|
#include "rowkey/ob_rowkey.h"
|
||||||
|
#include "share/rc/ob_tenant_base.h"
|
||||||
|
#include "storage/ob_i_store.h"
|
||||||
|
|
||||||
|
namespace oceanbase
|
||||||
|
{
|
||||||
|
namespace memtable
|
||||||
|
{
|
||||||
|
|
||||||
|
constexpr int64_t ObMemtableKeyGenerator::STACK_BUFFER_SIZE;
|
||||||
|
|
||||||
|
int ObMemtableKeyGenerator::init(const storage::ObStoreRow *rows,
|
||||||
|
const int64_t row_count,
|
||||||
|
const int64_t schema_rowkey_count,
|
||||||
|
const common::ObIArray<share::schema::ObColDesc> &columns)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (size_ != 0) {
|
||||||
|
ret = OB_INIT_TWICE;
|
||||||
|
} else {
|
||||||
|
int64_t extra_size = row_count - STACK_BUFFER_SIZE;
|
||||||
|
if (extra_size > 0) {
|
||||||
|
if (FALSE_IT(p_extra_store_row_keys_ =
|
||||||
|
(ObStoreRowkey *)share::mtl_malloc(extra_size * (sizeof(ObStoreRowkey)), "MemTableKey"))) {
|
||||||
|
} else if (OB_ISNULL(p_extra_store_row_keys_)) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
TRANS_LOG(WARN, "Failed to alloc ObStoreRowkey memory", K(ret), K(row_count), K(schema_rowkey_count), K(columns), KP(MTL_CTX()),
|
||||||
|
KP(lib::ObMallocAllocator::get_instance()), KP(p_extra_store_row_keys_));
|
||||||
|
} else if (FALSE_IT(p_extra_memtable_keys_ =
|
||||||
|
(ObMemtableKey *)share::mtl_malloc(extra_size * (sizeof(ObMemtableKey)), "MemTableKey"))) {
|
||||||
|
} else if (OB_ISNULL(p_extra_memtable_keys_)) {
|
||||||
|
share::mtl_free(p_extra_store_row_keys_);
|
||||||
|
p_extra_store_row_keys_ = nullptr;
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
TRANS_LOG(WARN, "Failed to alloc ObMemtableKey memory", K(ret), K(row_count), K(schema_rowkey_count), K(columns), KP(MTL_CTX()),
|
||||||
|
KP(lib::ObMallocAllocator::get_instance()), KP(p_extra_store_row_keys_));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int i = 0; i < row_count && OB_SUCC(ret); ++i) {
|
||||||
|
ObStoreRowkey *p_store_row_key = i < STACK_BUFFER_SIZE ? &store_row_key_buffer_[i] : &p_extra_store_row_keys_[i - STACK_BUFFER_SIZE];
|
||||||
|
ObMemtableKey *p_memtable_key = i < STACK_BUFFER_SIZE ? &memtable_key_buffer_[i] : &p_extra_memtable_keys_[i - STACK_BUFFER_SIZE];
|
||||||
|
new (p_store_row_key) ObStoreRowkey();
|
||||||
|
new (p_memtable_key) ObMemtableKey();
|
||||||
|
if (OB_FAIL(p_store_row_key->assign(rows[i].row_val_.cells_, schema_rowkey_count))) {
|
||||||
|
p_store_row_key->~ObStoreRowkey();
|
||||||
|
TRANS_LOG(WARN, "Failed to assign tmp rowkey", K(ret), K(rows[i]), K(row_count), K(schema_rowkey_count));
|
||||||
|
} else if (OB_FAIL(p_memtable_key->encode(columns, p_store_row_key))) {
|
||||||
|
p_memtable_key->~ObMemtableKey();
|
||||||
|
p_store_row_key->~ObStoreRowkey();
|
||||||
|
TRANS_LOG(WARN, "mtk encode fail", K(ret), K(rows[i]), K(row_count), K(schema_rowkey_count));
|
||||||
|
} else {
|
||||||
|
size_++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
ObMemtableKey &ObMemtableKeyGenerator::operator[](int64_t idx) {
|
||||||
|
ObMemtableKey *element = nullptr;
|
||||||
|
if (OB_UNLIKELY(idx < 0 || idx >= size_)) {
|
||||||
|
ob_abort();
|
||||||
|
}
|
||||||
|
if (idx < STACK_BUFFER_SIZE) {
|
||||||
|
element = &memtable_key_buffer_[idx];
|
||||||
|
} else {
|
||||||
|
element = &p_extra_memtable_keys_[idx - STACK_BUFFER_SIZE];
|
||||||
|
}
|
||||||
|
return *element;
|
||||||
|
}
|
||||||
|
|
||||||
|
const ObMemtableKey &ObMemtableKeyGenerator::operator[](int64_t idx) const
|
||||||
|
{
|
||||||
|
return const_cast<ObMemtableKeyGenerator *>(this)->operator[](idx);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ObMemtableKeyGenerator::reset()
|
||||||
|
{
|
||||||
|
int64_t idx = size_ - 1;
|
||||||
|
for(; idx >= STACK_BUFFER_SIZE; --idx) {
|
||||||
|
p_extra_memtable_keys_[idx - STACK_BUFFER_SIZE].~ObMemtableKey();
|
||||||
|
p_extra_store_row_keys_[idx - STACK_BUFFER_SIZE].~ObStoreRowkey();
|
||||||
|
}
|
||||||
|
for(; idx >= 0; --idx) {
|
||||||
|
memtable_key_buffer_[idx].~ObMemtableKey();
|
||||||
|
store_row_key_buffer_[idx].~ObStoreRowkey();
|
||||||
|
}
|
||||||
|
if (OB_UNLIKELY(nullptr != p_extra_memtable_keys_)) {
|
||||||
|
share::mtl_free(p_extra_memtable_keys_);
|
||||||
|
}
|
||||||
|
if (OB_UNLIKELY(nullptr != p_extra_store_row_keys_)) {
|
||||||
|
share::mtl_free(p_extra_store_row_keys_);
|
||||||
|
}
|
||||||
|
new (this) ObMemtableKeyGenerator();
|
||||||
|
}
|
||||||
|
|
||||||
|
ObMemtableKeyGenerator::~ObMemtableKeyGenerator()
|
||||||
|
{
|
||||||
|
reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -305,6 +305,30 @@ public:
|
|||||||
const common::ObStoreRowkey *rowkey_;
|
const common::ObStoreRowkey *rowkey_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
// this is for multi_set pre alloc memory to generate memtable key
|
||||||
|
class ObMemtableKeyGenerator {// RAII
|
||||||
|
static constexpr int64_t STACK_BUFFER_SIZE = 32;
|
||||||
|
public:
|
||||||
|
ObMemtableKeyGenerator() : p_extra_store_row_keys_(nullptr), p_extra_memtable_keys_(nullptr), size_(0) {}
|
||||||
|
~ObMemtableKeyGenerator();
|
||||||
|
int init(const storage::ObStoreRow *rows,
|
||||||
|
const int64_t row_count,
|
||||||
|
const int64_t schema_rowkey_count,
|
||||||
|
const common::ObIArray<share::schema::ObColDesc> &columns);
|
||||||
|
void reset();
|
||||||
|
int64_t count() const { return size_; }
|
||||||
|
ObMemtableKey &operator[](int64_t idx);
|
||||||
|
const ObMemtableKey &operator[](int64_t idx) const;
|
||||||
|
private:
|
||||||
|
// this is for avoid memory allocation when rows not so much
|
||||||
|
ObStoreRowkey store_row_key_buffer_[STACK_BUFFER_SIZE];
|
||||||
|
ObMemtableKey memtable_key_buffer_[STACK_BUFFER_SIZE];
|
||||||
|
ObStoreRowkey *p_extra_store_row_keys_;
|
||||||
|
ObMemtableKey *p_extra_memtable_keys_;
|
||||||
|
int64_t size_;
|
||||||
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -333,7 +333,13 @@ public:
|
|||||||
|
|
||||||
context.init(query_flag, store_ctx, allocator, trans_version_range);
|
context.init(query_flag, store_ctx, allocator, trans_version_range);
|
||||||
|
|
||||||
return mt.set_(tm_->iter_param_, tm_->columns_, write_row, nullptr, nullptr, context);
|
ObStoreRowkey tmp_key;
|
||||||
|
ObMemtableKey mtk;
|
||||||
|
|
||||||
|
tmp_key.assign(write_row.row_val_.cells_, tm_->iter_param_.get_schema_rowkey_count());
|
||||||
|
mtk.encode(tm_->columns_, &tmp_key);
|
||||||
|
|
||||||
|
return mt.set_(tm_->iter_param_, tm_->columns_, write_row, nullptr, nullptr, mtk, context);
|
||||||
}
|
}
|
||||||
int write(int64_t key, int64_t val, ObMemtable &mt, int64_t snapshot_version = 1000) {
|
int write(int64_t key, int64_t val, ObMemtable &mt, int64_t snapshot_version = 1000) {
|
||||||
ObDatumRowkey row_key;
|
ObDatumRowkey row_key;
|
||||||
|
Reference in New Issue
Block a user