[DeadLock] fix memory leak when call multi_set()
This commit is contained in:
@ -625,6 +625,7 @@ ob_set_subtarget(ob_storage concurrency_control
|
||||
ob_set_subtarget(ob_storage memtable
|
||||
memtable/ob_lock_wait_mgr.cpp
|
||||
memtable/ob_memtable.cpp
|
||||
memtable/ob_memtable_key.cpp
|
||||
memtable/ob_memtable_compact_writer.cpp
|
||||
memtable/ob_memtable_context.cpp
|
||||
memtable/ob_memtable_interface.cpp
|
||||
|
@ -23,6 +23,7 @@
|
||||
|
||||
#include "storage/memtable/mvcc/ob_mvcc_engine.h"
|
||||
#include "storage/memtable/mvcc/ob_mvcc_iterator.h"
|
||||
#include "storage/memtable/mvcc/ob_mvcc_row.h"
|
||||
|
||||
#include "storage/memtable/ob_memtable_compact_writer.h"
|
||||
#include "storage/memtable/ob_memtable_iterator.h"
|
||||
@ -333,6 +334,7 @@ int ObMemtable::multi_set(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMvccWriteGuard guard;
|
||||
ObMemtableKeyGenerator mtk_generator;
|
||||
if (IS_NOT_INIT) {
|
||||
TRANS_LOG(WARN, "Not inited", K(*this));
|
||||
ret = OB_NOT_INIT;
|
||||
@ -351,6 +353,8 @@ int ObMemtable::multi_set(
|
||||
} else if (need_for_save(encrypt_meta) && OB_FAIL(save_encrypt_meta(param.table_id_, encrypt_meta))) {
|
||||
TRANS_LOG(WARN, "store encrypt meta to memtable failed", KPC(encrypt_meta), KR(ret));
|
||||
#endif
|
||||
} else if (OB_FAIL(mtk_generator.init(rows, row_count, param.get_schema_rowkey_count(), columns))) {
|
||||
TRANS_LOG(WARN, "fail to generate memtable keys", KPC(encrypt_meta), K(*context.store_ctx_), KR(ret));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -359,13 +363,29 @@ int ObMemtable::multi_set(
|
||||
} else {
|
||||
lib::CompatModeGuard compat_guard(mode_);
|
||||
if (row_count > 1) {
|
||||
ret = multi_set_(param, columns, rows, row_count, check_exist, context, rows_info);
|
||||
ret = multi_set_(param, columns, rows, row_count, check_exist, mtk_generator, context, rows_info);
|
||||
} else {
|
||||
ret = set_(param, columns, rows[0], nullptr, nullptr, context, nullptr,
|
||||
ret = set_(param, columns, rows[0], nullptr, nullptr, mtk_generator[0], context, nullptr,
|
||||
check_exist && !rows_info.is_row_exist_checked(0));
|
||||
}
|
||||
guard.set_memtable(this);
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
/*****[for deadlock]*****/
|
||||
// recored this row is hold by this trans for deadlock detector
|
||||
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
||||
if (OB_ISNULL(p_lock_wait_mgr)) {
|
||||
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
||||
} else {
|
||||
for (int64_t idx = 0; idx < mtk_generator.count(); ++idx) {
|
||||
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
|
||||
mtk_generator[idx],
|
||||
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
|
||||
}
|
||||
}
|
||||
/***********************/
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -414,6 +434,7 @@ int ObMemtable::set(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMvccWriteGuard guard;
|
||||
ObMemtableKeyGenerator mtk_generator;
|
||||
if (IS_NOT_INIT) {
|
||||
TRANS_LOG(WARN, "not init", K(*this));
|
||||
ret = OB_NOT_INIT;
|
||||
@ -434,6 +455,8 @@ int ObMemtable::set(
|
||||
} else if (need_for_save(encrypt_meta) && OB_FAIL(save_encrypt_meta(param.table_id_, encrypt_meta))) {
|
||||
TRANS_LOG(WARN, "store encrypt meta to memtable failed", KPC(encrypt_meta), KR(ret));
|
||||
#endif
|
||||
} else if (OB_FAIL(mtk_generator.init(&row, 1, param.get_schema_rowkey_count(), columns))) {
|
||||
TRANS_LOG(WARN, "fail to generate memtable keys", KPC(encrypt_meta), K(*context.store_ctx_), KR(ret));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -442,9 +465,23 @@ int ObMemtable::set(
|
||||
} else {
|
||||
lib::CompatModeGuard compat_guard(mode_);
|
||||
|
||||
ret = set_(param, columns, row, NULL, NULL, context);
|
||||
ret = set_(param, columns, row, NULL, NULL, mtk_generator[0], context);
|
||||
guard.set_memtable(this);
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
/*****[for deadlock]*****/
|
||||
// recored this row is hold by this trans for deadlock detector
|
||||
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
||||
if (OB_ISNULL(p_lock_wait_mgr)) {
|
||||
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
||||
} else {
|
||||
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
|
||||
mtk_generator[0],
|
||||
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
|
||||
}
|
||||
/***********************/
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -459,6 +496,7 @@ int ObMemtable::set(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMvccWriteGuard guard;
|
||||
ObMemtableKeyGenerator mtk_generator;
|
||||
if (IS_NOT_INIT) {
|
||||
TRANS_LOG(WARN, "not init", K(*this));
|
||||
ret = OB_NOT_INIT;
|
||||
@ -477,6 +515,8 @@ int ObMemtable::set(
|
||||
} else if (need_for_save(encrypt_meta) && OB_FAIL(save_encrypt_meta(param.table_id_, encrypt_meta))) {
|
||||
TRANS_LOG(WARN, "store encrypt meta to memtable failed", KPC(encrypt_meta), KR(ret));
|
||||
#endif
|
||||
} else if (OB_FAIL(mtk_generator.init(&new_row, 1, param.get_schema_rowkey_count(), columns))) {
|
||||
TRANS_LOG(WARN, "fail to generate memtable keys", KPC(encrypt_meta), K(*context.store_ctx_), KR(ret));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)){
|
||||
@ -485,9 +525,23 @@ int ObMemtable::set(
|
||||
} else {
|
||||
lib::CompatModeGuard compat_guard(mode_);
|
||||
|
||||
ret = set_(param, columns, new_row, &old_row, &update_idx, context);
|
||||
ret = set_(param, columns, new_row, &old_row, &update_idx, mtk_generator[0], context);
|
||||
guard.set_memtable(this);
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
/*****[for deadlock]*****/
|
||||
// recored this row is hold by this trans for deadlock detector
|
||||
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
||||
if (OB_ISNULL(p_lock_wait_mgr)) {
|
||||
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
||||
} else {
|
||||
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
|
||||
mtk_generator[0],
|
||||
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
|
||||
}
|
||||
/***********************/
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -512,7 +566,9 @@ int ObMemtable::lock(
|
||||
TRANS_LOG(WARN, "not allow to write", K(*context.store_ctx_));
|
||||
} else if (OB_FAIL(tmp_key.assign(row.cells_, param.get_schema_rowkey_count()))) {
|
||||
TRANS_LOG(WARN, "Failed to assign rowkey", K(row), K(param));
|
||||
} else if (OB_FAIL(lock_(param, context, tmp_key))) {
|
||||
} else if (OB_FAIL(mtk.encode(param.get_read_info()->get_columns_desc(), &tmp_key))) {
|
||||
TRANS_LOG(WARN, "encode mtk failed", K(ret), K(param));
|
||||
} else if (OB_FAIL(lock_(param, context, tmp_key, mtk))) {
|
||||
TRANS_LOG(WARN, "lock_ failed", K(ret), K(param));
|
||||
}
|
||||
|
||||
@ -520,6 +576,20 @@ int ObMemtable::lock(
|
||||
if (OB_FAIL(ret) && (OB_TRY_LOCK_ROW_CONFLICT != ret)) {
|
||||
TRANS_LOG(WARN, "lock fail", K(ret), K(row), K(mtk));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
/*****[for deadlock]*****/
|
||||
// recored this row is hold by this trans for deadlock detector
|
||||
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
||||
if (OB_ISNULL(p_lock_wait_mgr)) {
|
||||
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
||||
} else {
|
||||
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
|
||||
mtk,
|
||||
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
|
||||
}
|
||||
/***********************/
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -530,6 +600,7 @@ int ObMemtable::lock(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMvccWriteGuard guard;
|
||||
ObMemtableKey mtk;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
TRANS_LOG(WARN, "not init", K(*this));
|
||||
@ -539,12 +610,29 @@ int ObMemtable::lock(
|
||||
TRANS_LOG(WARN, "invalid param", K(ret), K(rowkey));
|
||||
} else if (OB_FAIL(guard.write_auth(*context.store_ctx_))) {
|
||||
TRANS_LOG(WARN, "not allow to write", K(*context.store_ctx_));
|
||||
} else if (OB_FAIL(lock_(param, context, rowkey.get_store_rowkey()))) {
|
||||
} else if (OB_FAIL(mtk.encode(param.get_read_info()->get_columns_desc(), &rowkey.get_store_rowkey()))) {
|
||||
TRANS_LOG(WARN, "encode mtk failed", K(ret), K(param));
|
||||
} else if (OB_FAIL(lock_(param, context, rowkey.get_store_rowkey(), mtk))) {
|
||||
TRANS_LOG(WARN, "lock_ failed", K(ret), K(param));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret) && (OB_TRY_LOCK_ROW_CONFLICT != ret) && (OB_TRANSACTION_SET_VIOLATION != ret)) {
|
||||
TRANS_LOG(WARN, "lock fail", K(ret), K(rowkey));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
/*****[for deadlock]*****/
|
||||
// recored this row is hold by this trans for deadlock detector
|
||||
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
||||
if (OB_ISNULL(p_lock_wait_mgr)) {
|
||||
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
||||
} else {
|
||||
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(),
|
||||
mtk,
|
||||
context.store_ctx_->mvcc_acc_ctx_.get_mem_ctx()->get_tx_id());
|
||||
}
|
||||
/***********************/
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1357,6 +1445,7 @@ void ObMemtable::lock_row_on_frozen_stores_on_failure(
|
||||
int ObMemtable::lock_rows_on_frozen_stores_(
|
||||
const bool check_exist,
|
||||
const storage::ObTableIterParam ¶m,
|
||||
const ObMemtableKeyGenerator &memtable_keys,
|
||||
storage::ObTableAccessContext &context,
|
||||
ObMvccRowAndWriteResults &mvcc_rows,
|
||||
ObRowsInfo &rows_info)
|
||||
@ -2832,6 +2921,7 @@ int ObMemtable::multi_set_(
|
||||
const storage::ObStoreRow *rows,
|
||||
const int64_t row_count,
|
||||
const bool check_exist,
|
||||
const ObMemtableKeyGenerator &memtable_keys,
|
||||
storage::ObTableAccessContext &context,
|
||||
storage::ObRowsInfo &rows_info)
|
||||
{
|
||||
@ -2847,7 +2937,7 @@ int ObMemtable::multi_set_(
|
||||
// 1. Check write conflict in memtables.
|
||||
for (int64_t i = 0 ; OB_SUCC(ret) && i < row_count; ++i) {
|
||||
const uint32_t permutation_idx = rows_info.get_permutation_idx(i);
|
||||
if (OB_FAIL(set_(param, columns, rows[i], nullptr, nullptr, context, &(mvcc_rows[permutation_idx]), check_exist))) {
|
||||
if (OB_FAIL(set_(param, columns, rows[i], nullptr, nullptr, memtable_keys[i], context, &(mvcc_rows[permutation_idx]), check_exist))) {
|
||||
if (OB_UNLIKELY(OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret)) {
|
||||
TRANS_LOG(WARN, "Failed to insert new row", K(ret), K(i), K(permutation_idx), K(rows[i]));
|
||||
}
|
||||
@ -2869,8 +2959,7 @@ int ObMemtable::multi_set_(
|
||||
// 2. Check uniqueness constraint and write conflict in sstables.
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (rows_info.all_rows_found()) {
|
||||
} else if (OB_FAIL(lock_rows_on_frozen_stores_(check_exist, param, context,
|
||||
mvcc_rows, rows_info))) {
|
||||
} else if (OB_FAIL(lock_rows_on_frozen_stores_(check_exist, param, memtable_keys, context, mvcc_rows, rows_info))) {
|
||||
TRANS_LOG(WARN, "Failed to lock rows on frozen stores", K(ret));
|
||||
} else if (rows_info.have_conflict()) {
|
||||
conflict_idx = rows_info.get_conflict_idx();
|
||||
@ -2926,6 +3015,7 @@ int ObMemtable::set_(
|
||||
const storage::ObStoreRow &new_row,
|
||||
const storage::ObStoreRow *old_row,
|
||||
const common::ObIArray<int64_t> *update_idx,
|
||||
const ObMemtableKey &mtk,
|
||||
storage::ObTableAccessContext &context,
|
||||
ObMvccRowAndWriteResult *mvcc_row,
|
||||
bool check_exist)
|
||||
@ -2935,20 +3025,12 @@ int ObMemtable::set_(
|
||||
char *buf = nullptr;
|
||||
int64_t len = 0;
|
||||
ObRowData old_row_data;
|
||||
ObStoreRowkey tmp_key;
|
||||
ObMemtableKey mtk;
|
||||
ObStoreCtx &ctx = *(context.store_ctx_);
|
||||
ObMemtableCtx *mem_ctx = ctx.mvcc_acc_ctx_.get_mem_ctx();
|
||||
|
||||
//set_begin(ctx.mvcc_acc_ctx_);
|
||||
|
||||
if (OB_FAIL(tmp_key.assign(new_row.row_val_.cells_,
|
||||
param.get_schema_rowkey_count()))) {
|
||||
TRANS_LOG(WARN, "Failed to assign tmp rowkey", K(ret), K(new_row),
|
||||
K(param.get_schema_rowkey_count()));
|
||||
} else if (OB_FAIL(mtk.encode(columns, &tmp_key))) {
|
||||
TRANS_LOG(WARN, "mtk encode fail", "ret", ret);
|
||||
} else if (nullptr != old_row) {
|
||||
if (nullptr != old_row) {
|
||||
char *new_buf = nullptr;
|
||||
if(OB_FAIL(row_writer.write(param.get_schema_rowkey_count(), *old_row, nullptr, buf, len))) {
|
||||
TRANS_LOG(WARN, "Failed to write old row", K(ret), KPC(old_row));
|
||||
@ -3034,18 +3116,16 @@ int ObMemtable::set_(
|
||||
int ObMemtable::lock_(
|
||||
const storage::ObTableIterParam ¶m,
|
||||
storage::ObTableAccessContext &context,
|
||||
const common::ObStoreRowkey &rowkey)
|
||||
const common::ObStoreRowkey &rowkey,
|
||||
const ObMemtableKey &mtk)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_new_locked = false;
|
||||
blocksstable::ObRowWriter row_writer;
|
||||
ObMemtableKey mtk;
|
||||
char *buf = NULL;
|
||||
int64_t len = 0;
|
||||
|
||||
if (OB_FAIL(mtk.encode(param.get_read_info()->get_columns_desc(), &rowkey))) {
|
||||
TRANS_LOG(WARN, "mtk encode fail", "ret", ret);
|
||||
} else if (OB_FAIL(row_writer.write_rowkey(rowkey, buf, len))) {
|
||||
if (OB_FAIL(row_writer.write_rowkey(rowkey, buf, len))) {
|
||||
TRANS_LOG(WARN, "Failed to writer rowkey", K(ret), K(rowkey));
|
||||
} else {
|
||||
// for elr optimization
|
||||
@ -3188,19 +3268,6 @@ int ObMemtable::mvcc_write_(
|
||||
(void)mvcc_engine_.mvcc_undo(value);
|
||||
res.is_mvcc_undo_ = true;
|
||||
TRANS_LOG(WARN, "register row commit failed", K(ret));
|
||||
} else {
|
||||
is_new_locked = res.is_new_locked_;
|
||||
/*****[for deadlock]*****/
|
||||
if (is_new_locked) {
|
||||
// recored this row is hold by this trans for deadlock detector
|
||||
ObLockWaitMgr* p_lock_wait_mgr = MTL(ObLockWaitMgr*);
|
||||
if (OB_ISNULL(p_lock_wait_mgr)) {
|
||||
TRANS_LOG(WARN, "lock wait mgr is null", K(ret));
|
||||
} else {
|
||||
p_lock_wait_mgr->set_hash_holder(key_.get_tablet_id(), *key, mem_ctx->get_tx_id());
|
||||
}
|
||||
}
|
||||
/***********************/
|
||||
}
|
||||
|
||||
// cannot be serializable when transaction set violation
|
||||
|
@ -561,6 +561,7 @@ private:
|
||||
int lock_rows_on_frozen_stores_(
|
||||
const bool check_exist,
|
||||
const storage::ObTableIterParam ¶m,
|
||||
const ObMemtableKeyGenerator &memtable_keys,
|
||||
storage::ObTableAccessContext &context,
|
||||
ObMvccRowAndWriteResults &mvcc_rows,
|
||||
ObRowsInfo &rows_info);
|
||||
@ -589,6 +590,7 @@ private:
|
||||
const storage::ObStoreRow &new_row,
|
||||
const storage::ObStoreRow *old_row,
|
||||
const common::ObIArray<int64_t> *update_idx,
|
||||
const ObMemtableKey &mtk,
|
||||
storage::ObTableAccessContext &context,
|
||||
ObMvccRowAndWriteResult *mvcc_row = nullptr,
|
||||
bool check_exist = false);
|
||||
@ -598,12 +600,14 @@ private:
|
||||
const storage::ObStoreRow *rows,
|
||||
const int64_t row_count,
|
||||
const bool check_exist,
|
||||
const ObMemtableKeyGenerator &memtable_keys,
|
||||
storage::ObTableAccessContext &context,
|
||||
storage::ObRowsInfo &rows_info);
|
||||
int lock_(
|
||||
const storage::ObTableIterParam ¶m,
|
||||
storage::ObTableAccessContext &context,
|
||||
const common::ObStoreRowkey &rowkey);
|
||||
const common::ObStoreRowkey &rowkey,
|
||||
const ObMemtableKey &mtk);
|
||||
|
||||
int post_row_write_conflict_(ObMvccAccessCtx &acc_ctx,
|
||||
const ObMemtableKey &row_key,
|
||||
|
120
src/storage/memtable/ob_memtable_key.cpp
Normal file
120
src/storage/memtable/ob_memtable_key.cpp
Normal file
@ -0,0 +1,120 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include "ob_memtable_key.h"
|
||||
#include "lib/ob_errno.h"
|
||||
#include "rowkey/ob_rowkey.h"
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "storage/ob_i_store.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace memtable
|
||||
{
|
||||
|
||||
constexpr int64_t ObMemtableKeyGenerator::STACK_BUFFER_SIZE;
|
||||
|
||||
int ObMemtableKeyGenerator::init(const storage::ObStoreRow *rows,
|
||||
const int64_t row_count,
|
||||
const int64_t schema_rowkey_count,
|
||||
const common::ObIArray<share::schema::ObColDesc> &columns)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (size_ != 0) {
|
||||
ret = OB_INIT_TWICE;
|
||||
} else {
|
||||
int64_t extra_size = row_count - STACK_BUFFER_SIZE;
|
||||
if (extra_size > 0) {
|
||||
if (FALSE_IT(p_extra_store_row_keys_ =
|
||||
(ObStoreRowkey *)share::mtl_malloc(extra_size * (sizeof(ObStoreRowkey)), "MemTableKey"))) {
|
||||
} else if (OB_ISNULL(p_extra_store_row_keys_)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
TRANS_LOG(WARN, "Failed to alloc ObStoreRowkey memory", K(ret), K(row_count), K(schema_rowkey_count), K(columns), KP(MTL_CTX()),
|
||||
KP(lib::ObMallocAllocator::get_instance()), KP(p_extra_store_row_keys_));
|
||||
} else if (FALSE_IT(p_extra_memtable_keys_ =
|
||||
(ObMemtableKey *)share::mtl_malloc(extra_size * (sizeof(ObMemtableKey)), "MemTableKey"))) {
|
||||
} else if (OB_ISNULL(p_extra_memtable_keys_)) {
|
||||
share::mtl_free(p_extra_store_row_keys_);
|
||||
p_extra_store_row_keys_ = nullptr;
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
TRANS_LOG(WARN, "Failed to alloc ObMemtableKey memory", K(ret), K(row_count), K(schema_rowkey_count), K(columns), KP(MTL_CTX()),
|
||||
KP(lib::ObMallocAllocator::get_instance()), KP(p_extra_store_row_keys_));
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < row_count && OB_SUCC(ret); ++i) {
|
||||
ObStoreRowkey *p_store_row_key = i < STACK_BUFFER_SIZE ? &store_row_key_buffer_[i] : &p_extra_store_row_keys_[i - STACK_BUFFER_SIZE];
|
||||
ObMemtableKey *p_memtable_key = i < STACK_BUFFER_SIZE ? &memtable_key_buffer_[i] : &p_extra_memtable_keys_[i - STACK_BUFFER_SIZE];
|
||||
new (p_store_row_key) ObStoreRowkey();
|
||||
new (p_memtable_key) ObMemtableKey();
|
||||
if (OB_FAIL(p_store_row_key->assign(rows[i].row_val_.cells_, schema_rowkey_count))) {
|
||||
p_store_row_key->~ObStoreRowkey();
|
||||
TRANS_LOG(WARN, "Failed to assign tmp rowkey", K(ret), K(rows[i]), K(row_count), K(schema_rowkey_count));
|
||||
} else if (OB_FAIL(p_memtable_key->encode(columns, p_store_row_key))) {
|
||||
p_memtable_key->~ObMemtableKey();
|
||||
p_store_row_key->~ObStoreRowkey();
|
||||
TRANS_LOG(WARN, "mtk encode fail", K(ret), K(rows[i]), K(row_count), K(schema_rowkey_count));
|
||||
} else {
|
||||
size_++;
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
reset();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObMemtableKey &ObMemtableKeyGenerator::operator[](int64_t idx) {
|
||||
ObMemtableKey *element = nullptr;
|
||||
if (OB_UNLIKELY(idx < 0 || idx >= size_)) {
|
||||
ob_abort();
|
||||
}
|
||||
if (idx < STACK_BUFFER_SIZE) {
|
||||
element = &memtable_key_buffer_[idx];
|
||||
} else {
|
||||
element = &p_extra_memtable_keys_[idx - STACK_BUFFER_SIZE];
|
||||
}
|
||||
return *element;
|
||||
}
|
||||
|
||||
const ObMemtableKey &ObMemtableKeyGenerator::operator[](int64_t idx) const
|
||||
{
|
||||
return const_cast<ObMemtableKeyGenerator *>(this)->operator[](idx);
|
||||
}
|
||||
|
||||
void ObMemtableKeyGenerator::reset()
|
||||
{
|
||||
int64_t idx = size_ - 1;
|
||||
for(; idx >= STACK_BUFFER_SIZE; --idx) {
|
||||
p_extra_memtable_keys_[idx - STACK_BUFFER_SIZE].~ObMemtableKey();
|
||||
p_extra_store_row_keys_[idx - STACK_BUFFER_SIZE].~ObStoreRowkey();
|
||||
}
|
||||
for(; idx >= 0; --idx) {
|
||||
memtable_key_buffer_[idx].~ObMemtableKey();
|
||||
store_row_key_buffer_[idx].~ObStoreRowkey();
|
||||
}
|
||||
if (OB_UNLIKELY(nullptr != p_extra_memtable_keys_)) {
|
||||
share::mtl_free(p_extra_memtable_keys_);
|
||||
}
|
||||
if (OB_UNLIKELY(nullptr != p_extra_store_row_keys_)) {
|
||||
share::mtl_free(p_extra_store_row_keys_);
|
||||
}
|
||||
new (this) ObMemtableKeyGenerator();
|
||||
}
|
||||
|
||||
ObMemtableKeyGenerator::~ObMemtableKeyGenerator()
|
||||
{
|
||||
reset();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -305,6 +305,30 @@ public:
|
||||
const common::ObStoreRowkey *rowkey_;
|
||||
};
|
||||
|
||||
|
||||
// this is for multi_set pre alloc memory to generate memtable key
|
||||
class ObMemtableKeyGenerator {// RAII
|
||||
static constexpr int64_t STACK_BUFFER_SIZE = 32;
|
||||
public:
|
||||
ObMemtableKeyGenerator() : p_extra_store_row_keys_(nullptr), p_extra_memtable_keys_(nullptr), size_(0) {}
|
||||
~ObMemtableKeyGenerator();
|
||||
int init(const storage::ObStoreRow *rows,
|
||||
const int64_t row_count,
|
||||
const int64_t schema_rowkey_count,
|
||||
const common::ObIArray<share::schema::ObColDesc> &columns);
|
||||
void reset();
|
||||
int64_t count() const { return size_; }
|
||||
ObMemtableKey &operator[](int64_t idx);
|
||||
const ObMemtableKey &operator[](int64_t idx) const;
|
||||
private:
|
||||
// this is for avoid memory allocation when rows not so much
|
||||
ObStoreRowkey store_row_key_buffer_[STACK_BUFFER_SIZE];
|
||||
ObMemtableKey memtable_key_buffer_[STACK_BUFFER_SIZE];
|
||||
ObStoreRowkey *p_extra_store_row_keys_;
|
||||
ObMemtableKey *p_extra_memtable_keys_;
|
||||
int64_t size_;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -333,7 +333,13 @@ public:
|
||||
|
||||
context.init(query_flag, store_ctx, allocator, trans_version_range);
|
||||
|
||||
return mt.set_(tm_->iter_param_, tm_->columns_, write_row, nullptr, nullptr, context);
|
||||
ObStoreRowkey tmp_key;
|
||||
ObMemtableKey mtk;
|
||||
|
||||
tmp_key.assign(write_row.row_val_.cells_, tm_->iter_param_.get_schema_rowkey_count());
|
||||
mtk.encode(tm_->columns_, &tmp_key);
|
||||
|
||||
return mt.set_(tm_->iter_param_, tm_->columns_, write_row, nullptr, nullptr, mtk, context);
|
||||
}
|
||||
int write(int64_t key, int64_t val, ObMemtable &mt, int64_t snapshot_version = 1000) {
|
||||
ObDatumRowkey row_key;
|
||||
|
Reference in New Issue
Block a user