fix direct load memtable implement

This commit is contained in:
suz-yang 2024-05-10 10:35:49 +00:00 committed by ob-robot
parent db7e8aedd6
commit 980e5ce9d1
26 changed files with 572 additions and 350 deletions

View File

@ -87,6 +87,7 @@
#include "share/ob_rpc_struct.h"
#include "rootserver/ob_recovery_ls_service.h"
#include "logservice/ob_server_log_block_mgr.h"
#include "storage/ddl/ob_tablet_ddl_kv.h"
namespace oceanbase
{
@ -974,14 +975,14 @@ int ObDumpMemtableP::process()
} else if (OB_FAIL(tablet_handle.get_obj()->get_all_memtables(tables_handle))) {
LOG_WARN("failed to get all memtable", K(ret), KPC(tablet_handle.get_obj()));
} else {
memtable::ObMemtable *mt;
ObITabletMemtable *tablet_memtable = nullptr;
mkdir("/tmp/dump_memtable/", S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
for (int64_t i = 0; OB_SUCC(ret) && i < tables_handle.count(); i++) {
if (OB_FAIL(tables_handle.at(i).get_data_memtable(mt))) {
SERVER_LOG(WARN, "fail to get data memtables", K(ret));
if (OB_FAIL(tables_handle.at(i).get_tablet_memtable(tablet_memtable))) {
SERVER_LOG(WARN, "fail to get tablet memtables", K(ret));
} else {
TRANS_LOG(INFO, "start dump memtable", K(*mt), K(arg_));
mt->dump2text("/tmp/dump_memtable/memtable.txt");
TRANS_LOG(INFO, "start dump memtable", K(*tablet_memtable), K(arg_));
tablet_memtable->dump2text("/tmp/dump_memtable/memtable.txt");
}
}
}

View File

@ -189,27 +189,18 @@ int ObTableLoadInstance::start_stmt(const ObTableLoadParam &param)
stmt_ctx_.is_incremental_ = ObDirectLoadMethod::is_incremental(param.method_);
stmt_ctx_.use_insert_into_select_tx_ = param.px_mode_;
if (stmt_ctx_.is_incremental_) { // incremental direct-load
bool end_sql_tx_if_fail = false;
bool rollback_savepoint_if_fail = false;
if (OB_FAIL(build_tx_param())) {
LOG_WARN("fail to build tx param", KR(ret), K(stmt_ctx_));
} else if (OB_FAIL(start_sql_tx())) {
LOG_WARN("fail to start sql tx", KR(ret), K(stmt_ctx_));
} else if (FALSE_IT(end_sql_tx_if_fail = true)) {
} else if (OB_FAIL(create_implicit_savepoint())) {
LOG_WARN("fail to create implicit savepoint", KR(ret), K(stmt_ctx_));
} else if (OB_FAIL(lock_table_in_tx())) {
LOG_WARN("fail to lock table in tx", KR(ret), K(stmt_ctx_));
} else if (FALSE_IT(rollback_savepoint_if_fail = true)) {
} else if (OB_FAIL(init_ddl_param_for_inc_direct_load())) {
LOG_WARN("fail to init ddl param for inc direct load", KR(ret), K(stmt_ctx_));
}
if (OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
if (rollback_savepoint_if_fail && OB_TMP_FAIL(rollback_to_implicit_savepoint())) {
LOG_WARN("fail to rollback to implicit savepoint", KR(tmp_ret));
}
if (end_sql_tx_if_fail && OB_TMP_FAIL(end_sql_tx(false /*commit*/))) {
if (OB_TMP_FAIL(end_sql_tx(false /*commit*/))) {
LOG_WARN("fail to end sql tx", KR(tmp_ret));
}
}
@ -230,10 +221,6 @@ int ObTableLoadInstance::end_stmt(const bool commit)
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (stmt_ctx_.is_incremental_) {
// rollback in fail only, ignore ret
if (!commit && OB_TMP_FAIL(rollback_to_implicit_savepoint())) {
LOG_WARN("fail to rollback to implicit savepoint", KR(tmp_ret));
}
if (OB_FAIL(end_sql_tx(commit))) {
LOG_WARN("fail to end sql tx", KR(ret));
}
@ -367,50 +354,6 @@ int ObTableLoadInstance::end_sql_tx(const bool commit)
return ret;
}
int ObTableLoadInstance::create_implicit_savepoint()
{
int ret = OB_SUCCESS;
ObTransService *txs = MTL(ObTransService *);
ObTxDesc *tx_desc = stmt_ctx_.tx_desc_;
if (stmt_ctx_.use_insert_into_select_tx_) {
// do nothing
} else {
ObTxSEQ savepoint;
if (OB_FAIL(txs->create_implicit_savepoint(*tx_desc, stmt_ctx_.tx_param_, savepoint))) {
LOG_WARN("fail to create implicit savepoint", KR(ret));
} else {
stmt_ctx_.savepoint_ = savepoint;
LOG_INFO("create implicit savepoint succeed", KPC(tx_desc), K(savepoint));
}
}
return ret;
}
int ObTableLoadInstance::rollback_to_implicit_savepoint()
{
int ret = OB_SUCCESS;
ObTransService *txs = MTL(ObTransService *);
ObSQLSessionInfo *session_info = stmt_ctx_.session_info_;
ObTxDesc *tx_desc = stmt_ctx_.tx_desc_;
if (stmt_ctx_.use_insert_into_select_tx_) {
// do nothing
} else {
if (!stmt_ctx_.savepoint_.is_valid()) {
// do nothing
} else {
const int64_t stmt_timeout_ts = get_stmt_expire_ts(session_info);
const ObTxSEQ savepoint = stmt_ctx_.savepoint_;
if (OB_FAIL(txs->rollback_to_implicit_savepoint(*tx_desc, savepoint, stmt_timeout_ts, nullptr))) {
LOG_WARN("failed to rollback to implicit savepoint", KR(ret), KPC(tx_desc));
} else {
stmt_ctx_.savepoint_.reset();
LOG_INFO("rollback to implicit savepoint succeed", KPC(tx_desc), K(savepoint));
}
}
}
return ret;
}
int ObTableLoadInstance::lock_table_in_tx()
{
int ret = OB_SUCCESS;

View File

@ -54,9 +54,6 @@ private:
int build_tx_param();
int start_sql_tx();
int end_sql_tx(const bool commit);
// abort tx is async, use rollback savepoint to sync release table lock
int create_implicit_savepoint();
int rollback_to_implicit_savepoint();
int lock_table_in_tx();
int init_ddl_param_for_inc_direct_load();
// full
@ -94,7 +91,6 @@ private:
session_info_ = nullptr;
tx_desc_ = nullptr;
// tx_param_.reset();
savepoint_.reset();
is_incremental_ = false;
use_insert_into_select_tx_ = false;
is_started_ = false;
@ -107,7 +103,6 @@ private:
KP_(session_info),
KPC_(tx_desc),
K_(tx_param),
K_(savepoint),
KP_(is_incremental),
KP_(use_insert_into_select_tx),
KP_(is_started),
@ -119,7 +114,6 @@ private:
sql::ObSQLSessionInfo *session_info_;
transaction::ObTxDesc *tx_desc_;
transaction::ObTxParam tx_param_;
transaction::ObTxSEQ savepoint_;
bool is_incremental_;
bool use_insert_into_select_tx_; // whether use the transaction of insert into select
bool is_started_;

View File

@ -139,7 +139,7 @@ int ObAllVirtualMemstoreInfo::get_next_tablet(ObTabletHandle &tablet_handle)
return ret;
}
int ObAllVirtualMemstoreInfo::get_next_memtable(memtable::ObMemtable *&mt)
int ObAllVirtualMemstoreInfo::get_next_memtable(ObITabletMemtable *&mt)
{
int ret = OB_SUCCESS;
@ -164,7 +164,7 @@ int ObAllVirtualMemstoreInfo::get_next_memtable(memtable::ObMemtable *&mt)
} else if (OB_FAIL(tablet_handle.get_obj()->get_all_memtables(tables_handle_))) {
SERVER_LOG(WARN, "failed to get_memtable_mgr for get all memtable", K(ret), KPC(tablet_handle.get_obj()));
}
} else if (OB_FAIL(tables_handle_.at(memtable_array_pos_++).get_data_memtable(mt))) {
} else if (OB_FAIL(tables_handle_.at(memtable_array_pos_++).get_tablet_memtable(mt))) {
// get next memtable
ret = OB_SUCCESS;
} else if (OB_ISNULL(mt)) {
@ -197,7 +197,7 @@ void ObAllVirtualMemstoreInfo::get_freeze_time_dist(const ObMtStat& mt_stat)
int ObAllVirtualMemstoreInfo::process_curr_tenant(ObNewRow *&row)
{
int ret = OB_SUCCESS;
ObMemtable *mt = NULL;
ObITabletMemtable *mt = NULL;
if (NULL == allocator_) {
ret = OB_NOT_INIT;
SERVER_LOG(WARN, "allocator_ shouldn't be NULL", K(allocator_), K(ret));
@ -214,6 +214,10 @@ int ObAllVirtualMemstoreInfo::process_curr_tenant(ObNewRow *&row)
} else {
ObMtStat& mt_stat = mt->get_mt_stat();
const int64_t col_count = output_column_ids_.count();
memtable::ObMemtable *data_memtable = NULL;
if (mt->is_data_memtable()) {
data_memtable = static_cast<memtable::ObMemtable *>(mt);
}
for (int64_t i = 0; OB_SUCC(ret) && i < col_count; ++i) {
uint64_t col_id = output_column_ids_.at(i);
switch (col_id) {
@ -284,19 +288,35 @@ int ObAllVirtualMemstoreInfo::process_curr_tenant(ObNewRow *&row)
break;
case OB_APP_MIN_COLUMN_ID + 14:
// hash_item_count
cur_row_.cells_[i].set_int(mt->get_hash_item_count());
if (nullptr != data_memtable) {
cur_row_.cells_[i].set_int(data_memtable->get_hash_item_count());
} else {
cur_row_.cells_[i].set_int(0);
}
break;
case OB_APP_MIN_COLUMN_ID + 15:
// hash_mem_used
cur_row_.cells_[i].set_int(mt->get_hash_alloc_memory());
if (nullptr != data_memtable) {
cur_row_.cells_[i].set_int(data_memtable->get_hash_alloc_memory());
} else {
cur_row_.cells_[i].set_int(0);
}
break;
case OB_APP_MIN_COLUMN_ID + 16:
// btree_item_count
cur_row_.cells_[i].set_int(mt->get_btree_item_count());
if (nullptr != data_memtable) {
cur_row_.cells_[i].set_int(data_memtable->get_btree_item_count());
} else {
cur_row_.cells_[i].set_int(0);
}
break;
case OB_APP_MIN_COLUMN_ID + 17:
// btree_mem_used
cur_row_.cells_[i].set_int(mt->get_btree_alloc_memory());
if (nullptr != data_memtable) {
cur_row_.cells_[i].set_int(data_memtable->get_btree_alloc_memory());
} else {
cur_row_.cells_[i].set_int(0);
}
break;
case OB_APP_MIN_COLUMN_ID + 18:
// insert_row_count

View File

@ -49,7 +49,7 @@ private:
virtual void release_last_tenant() override;
int get_next_ls(ObLS *&ls);
int get_next_tablet(storage::ObTabletHandle &tablet_handle);
int get_next_memtable(memtable::ObMemtable *&mt);
int get_next_memtable(storage::ObITabletMemtable *&mt);
void get_freeze_time_dist(const ObMtStat& mt_stat);
private:
common::ObAddr addr_;

View File

@ -13,6 +13,7 @@
#include "storage/access/ob_sample_iter_helper.h"
#include "storage/memtable/ob_memtable.h"
#include "storage/access/ob_multiple_multi_scan_merge.h"
#include "storage/ddl/ob_tablet_ddl_kv.h"
namespace oceanbase {
namespace storage {
@ -86,7 +87,8 @@ int ObGetSampleIterHelper::can_retire_to_memtable_row_sample_(bool &retire, ObIA
memtable_row_count += memtable->get_physical_row_cnt();
}
} else if (table->is_direct_load_memtable()) {
// FIXEM : @suzhi.yt support direct load memtable get_row_count();
ObDDLKV *ddl_kv = static_cast<ObDDLKV *>(table);
sstable_row_count += ddl_kv->get_row_count();
} else if (table->is_sstable()) {
sstable_row_count += static_cast<ObSSTable *>(table)->get_row_count();
}

View File

@ -18,7 +18,7 @@
#include "storage/memtable/mvcc/ob_mvcc_engine.h"
#include "storage/memtable/mvcc/ob_mvcc_iterator.h"
#include "storage/column_store/ob_column_oriented_sstable.h"
#include "storage/ddl/ob_tablet_ddl_kv.h"
namespace oceanbase
{
@ -26,6 +26,35 @@ using namespace blocksstable;
namespace storage
{
class ObDirectLoadMemtableScanRowCountEstimator
{
public:
ObDirectLoadMemtableScanRowCountEstimator(const ObTableEstimateBaseInput &base_input,
const ObDatumRange &range,
ObPartitionEst &tmp_cost)
: base_input_(base_input), range_(range), tmp_cost_(tmp_cost)
{
}
int operator()(ObDDLMemtable *ddl_memtable)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ddl_memtable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected ddl memtable is null", K(ret));
} else if (OB_FAIL(ObTableEstimator::estimate_sstable_scan_row_count(base_input_,
ddl_memtable,
range_,
tmp_cost_))) {
LOG_WARN("failed to estimate sstable row count", K(ret), KPC(ddl_memtable));
}
return ret;
}
private:
const ObTableEstimateBaseInput &base_input_;
const ObDatumRange &range_;
ObPartitionEst &tmp_cost_;
};
int ObTableEstimator::estimate_row_count_for_get(
ObTableEstimateBaseInput &base_input,
const ObExtSRowkeyArray &rowkeys,
@ -154,9 +183,15 @@ int ObTableEstimator::estimate_multi_scan_row_count(
}
}
} else if (current_table->is_direct_load_memtable()) {
// FIXME : @suzhi.yt
ret = OB_NOT_SUPPORTED;
LOG_WARN("not supported memtable", KR(ret), KPC(current_table));
ObDDLKV *ddl_kv = static_cast<ObDDLKV *>(current_table);
ObDirectLoadMemtableScanRowCountEstimator estimator(base_input, range, tmp_cost);
if (OB_FAIL(ddl_kv->access_first_ddl_memtable(estimator))) {
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
STORAGE_LOG(WARN, "fail to access first ddl memtable", K(ret), KPC(current_table));
} else {
ret = OB_SUCCESS;
}
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected table type", K(ret), K(*current_table));

View File

@ -300,9 +300,6 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
} else if (OB_UNLIKELY(memtable->is_active_memtable())) {
LOG_DEBUG("skip active memtable", K(i), KPC(memtable), K(memtable_handles));
break;
} else if (OB_UNLIKELY(memtable->is_direct_load_memtable())) {
LOG_DEBUG("skip direct load memtable", K(i), KPC(memtable), K(memtable_handles));
break;
} else if (!memtable->can_be_minor_merged()) {
FLOG_INFO("memtable cannot mini merge now", K(ret), K(i), KPC(memtable), K(max_snapshot_version), K(memtable_handles), K(param));
break;
@ -1022,7 +1019,7 @@ int ObPartitionMergePolicy::check_need_medium_merge(
can_merge = tablet.get_snapshot_version() >= medium_snapshot;
if (!can_merge) {
ObTableHandleV2 memtable_handle;
memtable::ObMemtable *last_frozen_memtable = nullptr;
ObITabletMemtable *last_frozen_memtable = nullptr;
if (OB_FAIL(tablet.get_protected_memtable_mgr_handle(protected_handle))) {
LOG_WARN("failed to get_protected_memtable_mgr_handle", K(ret), K(tablet));
} else if (OB_FAIL(protected_handle->get_last_frozen_memtable(memtable_handle))) {
@ -1032,7 +1029,7 @@ int ObPartitionMergePolicy::check_need_medium_merge(
} else {
LOG_WARN("failed to get last frozen memtable", K(ret), K(tablet));
}
} else if (OB_FAIL(memtable_handle.get_data_memtable(last_frozen_memtable))) {
} else if (OB_FAIL(memtable_handle.get_tablet_memtable(last_frozen_memtable))) {
LOG_WARN("failed to get last frozen memtable", K(ret));
} else {
need_force_freeze = last_frozen_memtable->get_snapshot_version() < medium_snapshot;

View File

@ -91,7 +91,7 @@ int ObFastFreezeChecker::check_need_fast_freeze(
int ret = OB_SUCCESS;
need_fast_freeze = false;
ObTableHandleV2 table_handle;
memtable::ObMemtable *memtable = nullptr;
ObITabletMemtable *memtable = nullptr;
const share::ObLSID &ls_id = tablet.get_tablet_meta().ls_id_;
const common::ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
@ -104,23 +104,26 @@ int ObFastFreezeChecker::check_need_fast_freeze(
} else {
LOG_WARN("[FastFreeze] failed to get active memtable", K(ret));
}
} else if (OB_FAIL(table_handle.get_data_memtable(memtable))) {
} else if (OB_FAIL(table_handle.get_tablet_memtable(memtable))) {
LOG_WARN("[FastFreeze] failed to get memtalbe", K(ret), K(table_handle));
} else if (OB_ISNULL(memtable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("[FastFreeze] get unexpected null memtable", K(ret), KPC(memtable));
} else if (!memtable->is_active_memtable()) {
// do nothing
} else if (!memtable->is_data_memtable()) {
// do nothing
} else if (ObTimeUtility::current_time() < memtable->get_timestamp() + FAST_FREEZE_INTERVAL_US) {
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
LOG_INFO("[FastFreeze] no need to check fast freeze now", K(tablet));
}
} else {
check_hotspot_need_fast_freeze(*memtable, need_fast_freeze);
memtable::ObMemtable *mt = static_cast<memtable::ObMemtable *>(memtable);
check_hotspot_need_fast_freeze(*mt, need_fast_freeze);
if (need_fast_freeze) {
FLOG_INFO("[FastFreeze] tablet detects hotspot row, need fast freeze", K(ls_id), K(tablet_id));
} else {
check_tombstone_need_fast_freeze(tablet, *memtable, need_fast_freeze);
check_tombstone_need_fast_freeze(tablet, *mt, need_fast_freeze);
if (need_fast_freeze) {
FLOG_INFO("[FastFreeze] tablet detects tombstone, need fast freeze", K(ls_id), K(tablet_id));
}

View File

@ -886,10 +886,17 @@ int ObTabletDDLUtil::update_ddl_table_store(
} else {
const bool is_major_sstable = ddl_param.table_key_.is_major_sstable();
const int64_t rebuild_seq = ls.get_rebuild_seq();
const int64_t snapshot_version = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_snapshot_version())
: tablet.get_snapshot_version();
const int64_t multi_version_start = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_multi_version_start())
: 0;
int64_t snapshot_version = 0;
int64_t multi_version_start = 0;
if (is_full_direct_load(ddl_param.direct_load_type_)) {
snapshot_version = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_snapshot_version())
: tablet.get_snapshot_version();
multi_version_start = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_multi_version_start())
: 0;
} else {
snapshot_version = max(ddl_param.snapshot_version_, tablet.get_snapshot_version());
multi_version_start = tablet.get_multi_version_start();
}
ObTabletHandle new_tablet_handle;
ObUpdateTableStoreParam table_store_param(sstable,
snapshot_version,

View File

@ -874,6 +874,7 @@ ObDDLKV::ObDDLKV()
: is_inited_(false), is_closed_(false), is_inc_ddl_kv_(false), is_independent_freezed_(false), lock_(),
arena_allocator_("DDL_CONTAINER", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
tablet_id_(), ddl_start_scn_(SCN::min_scn()), ddl_snapshot_version_(0), data_format_version_(0), trans_id_(),
data_schema_version_(0), column_count_(0),
min_scn_(SCN::max_scn()), max_scn_(SCN::min_scn()), pending_cnt_(0),
macro_block_count_(0)
{
@ -935,6 +936,8 @@ void ObDDLKV::reset()
ddl_snapshot_version_ = 0;
data_format_version_ = 0;
trans_id_.reset();
data_schema_version_ = 0;
column_count_ = 0;
min_scn_ = SCN::max_scn();
max_scn_ = SCN::min_scn();
@ -1013,11 +1016,6 @@ int ObDDLKV::get_ddl_memtable(const int64_t cg_idx, ObDDLMemtable *&ddl_memtable
return ret;
}
int ObDDLKV::get_first_ddl_memtable(ObDDLMemtable *&ddl_memtable)
{
return get_ddl_memtable(0, ddl_memtable);
}
int ObDDLKV::set_macro_block(
ObTablet &tablet,
const ObDDLMacroBlock &macro_block,
@ -1145,6 +1143,12 @@ int ObDDLKV::set_macro_block(
min_scn_ = SCN::min(min_scn_, macro_block.scn_);
max_scn_ = SCN::max(max_scn_, macro_block.scn_);
++macro_block_count_;
if (0 == data_schema_version_) {
data_schema_version_ = data_macro_meta->val_.schema_version_;
}
if (0 == column_count_) {
column_count_ = data_macro_meta->val_.column_count_ - ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
}
LOG_INFO("succeed to set macro block into ddl kv", K(macro_block), K(macro_block_count_), KPC(data_macro_meta));
}
}
@ -1584,19 +1588,33 @@ void ObDDLKV::set_allow_freeze(const bool allow_freeze)
}
int ObDDLKV::get_frozen_schema_version(int64_t &schema_version) const
{
UNUSED(schema_version);
return OB_NOT_SUPPORTED;
}
bool ObDDLKV::can_be_minor_merged()
{
return ready_for_flush() && ObITabletMemtable::can_be_minor_merged();
}
int ObDDLKV::get_schema_info(
const int64_t input_column_cnt,
int64_t &max_schema_version_on_memtable,
int64_t &max_column_cnt_on_memtable) const
{
int ret = OB_SUCCESS;
TCRLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
} else if (ddl_memtables_.count() == 0) {
schema_version = 0;
} else if (OB_UNLIKELY(ddl_memtables_.count() != 1)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("inc direct load do not support column store yet", K(ret));
} else if (OB_FAIL(ddl_memtables_.at(0)->get_frozen_schema_version(schema_version))) {
LOG_WARN("fail to get row", K(ret));
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else {
TCRLockGuard guard(lock_);
if (column_count_ >= input_column_cnt) {
LOG_INFO("column cnt or schema version is updated by ddl kv", KPC(this),
K(max_column_cnt_on_memtable), K(max_schema_version_on_memtable));
max_column_cnt_on_memtable = MAX(max_column_cnt_on_memtable, column_count_);
max_schema_version_on_memtable = MAX(max_schema_version_on_memtable, data_schema_version_);
}
}
return ret;
}
@ -1625,13 +1643,18 @@ int ObDDLKV::get_frozen_schema_version(int64_t &schema_version) const
} \
}
int ObDDLKV::exist(const ObTableIterParam &param, ObTableAccessContext &context,
const blocksstable::ObDatumRowkey &rowkey, bool &is_exist, bool &has_found)
int ObDDLKV::exist(
const ObTableIterParam &param,
ObTableAccessContext &context,
const ObDatumRowkey &rowkey,
bool &is_exist,
bool &has_found)
{
int ret = OB_SUCCESS;
TCRLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
@ -1653,6 +1676,7 @@ int ObDDLKV::exist(ObRowsInfo &rowsInfo, bool &is_exist, bool &has_found)
TCRLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
@ -1668,13 +1692,16 @@ int ObDDLKV::exist(ObRowsInfo &rowsInfo, bool &is_exist, bool &has_found)
return ret;
}
int ObDDLKV::scan(const ObTableIterParam &param, ObTableAccessContext &context,
const blocksstable::ObDatumRange &key_range, ObStoreRowIterator *&row_iter)
int ObDDLKV::scan(
const ObTableIterParam &param,
ObTableAccessContext &context,
const ObDatumRange &key_range,
ObStoreRowIterator *&row_iter)
{
int ret = OB_SUCCESS;
TCRLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
if (OB_UNLIKELY(IS_NOT_INIT)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(
!param.is_valid()
|| !context.is_valid()
@ -1684,26 +1711,22 @@ int ObDDLKV::scan(const ObTableIterParam &param, ObTableAccessContext &context,
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
} else if (OB_UNLIKELY(ddl_memtables_.count() == 0)) {
if (OB_FAIL(get_empty_iter(param, context, &key_range, row_iter))) {
LOG_WARN("fail to get empty iter", K(ret));
}
} else if (ddl_memtables_.count() != 1) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("inc direct load do not support column store yet", K(ret));
} else {
ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowScanner, &key_range, row_iter);
}
return ret;
}
int ObDDLKV::get(const storage::ObTableIterParam &param, storage::ObTableAccessContext &context,
const blocksstable::ObDatumRowkey &rowkey, ObStoreRowIterator *&row_iter)
int ObDDLKV::get(
const ObTableIterParam &param,
ObTableAccessContext &context,
const ObDatumRowkey &rowkey,
ObStoreRowIterator *&row_iter)
{
int ret = OB_SUCCESS;
TCRLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
if (OB_UNLIKELY(IS_NOT_INIT)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(
!param.is_valid()
|| !context.is_valid()
@ -1713,92 +1736,33 @@ int ObDDLKV::get(const storage::ObTableIterParam &param, storage::ObTableAccessC
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
} else if (OB_UNLIKELY(ddl_memtables_.count() == 0)) {
if (OB_FAIL(get_empty_iter(param, context, &rowkey, row_iter))) {
LOG_WARN("fail to get empty iter", K(ret));
}
} else if (ddl_memtables_.count() != 1) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("inc direct load do not support column store yet", K(ret));
} else {
ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowGetter, &rowkey, row_iter);
}
return ret;
}
int ObDDLKV::multi_get(const ObTableIterParam &param, ObTableAccessContext &context,
const common::ObIArray<blocksstable::ObDatumRowkey> &rowkeys, ObStoreRowIterator *&row_iter)
{
int ret = OB_SUCCESS;
TCRLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(
!param.is_valid()
|| !context.is_valid()
|| 0 >= rowkeys.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(ret), K(param), K(context), K(rowkeys));
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
} else if (OB_UNLIKELY(ddl_memtables_.count() == 0)) {
if (OB_FAIL(get_empty_iter(param, context, &rowkeys, row_iter))) {
LOG_WARN("fail to get empty iter", K(ret));
}
} else if (ddl_memtables_.count() != 1) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("inc direct load do not support column store yet", K(ret));
} else {
ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowMultiGetter, &rowkeys, row_iter);
}
return ret;
}
int ObDDLKV::multi_scan(const ObTableIterParam &param, ObTableAccessContext &context,
const common::ObIArray<blocksstable::ObDatumRange> &ranges, ObStoreRowIterator *&row_iter)
{
int ret = OB_SUCCESS;
TCRLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(
!param.is_valid()
|| !context.is_valid()
|| 0 >= ranges.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(ret), K(param), K(context), K(ranges));
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
} else if (OB_UNLIKELY(ddl_memtables_.count() == 0)) {
if (OB_FAIL(get_empty_iter(param, context, &ranges, row_iter))) {
LOG_WARN("fail to get empty iter", K(ret));
}
} else if (ddl_memtables_.count() != 1) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("inc direct load do not support column store yet", K(ret));
} else {
ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowMultiScanner, &ranges, row_iter);
}
return ret;
}
int ObDDLKV::get(const storage::ObTableIterParam &param, storage::ObTableAccessContext &context,
const blocksstable::ObDatumRowkey &rowkey, blocksstable::ObDatumRow &row)
int ObDDLKV::get(
const ObTableIterParam &param,
ObTableAccessContext &context,
const ObDatumRowkey &rowkey,
ObDatumRow &row)
{
int ret = OB_SUCCESS;
ObStoreRowIterator *row_iter = nullptr;
const ObDatumRow *row_ptr= nullptr;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
} else if (OB_FAIL(get(param, context, rowkey, row_iter))) {
LOG_WARN("fail to get row", K(ret));
} else if (OB_FAIL(row_iter->get_next_row(row_ptr))) {
LOG_WARN("fail to get row", K(ret));
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to get row", K(ret));
}
} else if (OB_ISNULL(row_ptr) || row_ptr->row_flag_.is_not_exist()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected row", K(ret));
@ -1812,55 +1776,76 @@ int ObDDLKV::get(const storage::ObTableIterParam &param, storage::ObTableAccessC
return ret;
}
int ObDDLKV::get_empty_iter(const ObTableIterParam &param, ObTableAccessContext &context,
const void *ranges, ObStoreRowIterator *&row_iter)
int ObDDLKV::multi_get(
const ObTableIterParam &param,
ObTableAccessContext &context,
const ObIArray<ObDatumRowkey> &rowkeys,
ObStoreRowIterator *&row_iter)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!context.is_valid() || OB_ISNULL(ranges) || !param.is_valid())) {
if (OB_UNLIKELY(IS_NOT_INIT)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(
!param.is_valid()
|| !context.is_valid()
|| 0 >= rowkeys.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(ret), K(context));
LOG_WARN("Invalid argument", K(ret), K(param), K(context), K(rowkeys));
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
} else {
void *buf = nullptr;
ObStoreRowIterator *empty_row_iter = nullptr;
ALLOCATE_TABLE_STORE_ROW_IETRATOR(context,
ObDDLKVEmptyIterator,
empty_row_iter);
if (OB_SUCC(ret)) {
if (OB_ISNULL(empty_row_iter)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected error, row_scanner is nullptr", K(ret), KP(empty_row_iter));
} else if (OB_FAIL(empty_row_iter->init(param, context, this, ranges))) {
LOG_WARN("Fail to open row scanner", K(ret), K(param), K(context), KP(ranges), K(*this));
}
}
if (OB_FAIL(ret)) {
if (nullptr != empty_row_iter) {
empty_row_iter->~ObStoreRowIterator();
FREE_TABLE_STORE_ROW_IETRATOR(context, empty_row_iter);
empty_row_iter = nullptr;
}
} else {
row_iter = empty_row_iter;
}
ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowMultiGetter, &rowkeys, row_iter);
}
return ret;
}
int ObDDLKV::check_row_locked(const ObTableIterParam &param, const blocksstable::ObDatumRowkey &rowkey,
ObTableAccessContext &context, ObStoreRowLockState &lock_state, ObRowState &row_state, bool check_exist)
int ObDDLKV::multi_scan(
const ObTableIterParam &param,
ObTableAccessContext &context,
const ObIArray<ObDatumRange> &ranges,
ObStoreRowIterator *&row_iter)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(IS_NOT_INIT)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(
!param.is_valid()
|| !context.is_valid()
|| 0 >= ranges.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(ret), K(param), K(context), K(ranges));
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
} else {
ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowMultiScanner, &ranges, row_iter);
}
return ret;
}
int ObDDLKV::check_row_locked(
const ObTableIterParam &param,
const ObDatumRowkey &rowkey,
ObTableAccessContext &context,
ObStoreRowLockState &lock_state,
ObRowState &row_state,
bool check_exist)
{
int ret = OB_SUCCESS;
lock_state.trans_version_ = SCN::min_scn();
lock_state.is_locked_ = false;
TCRLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
} else if (ddl_memtables_.count() == 0) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support to check lock when memtable count is 0", K(ret));
// do nothing
} else if (ddl_memtables_.count() != 1) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("inc direct load do not support column store yet", K(ret));
@ -1870,24 +1855,95 @@ int ObDDLKV::check_row_locked(const ObTableIterParam &param, const blocksstable:
return ret;
}
int ObDDLKV::check_rows_locked(const bool check_exist, ObTableAccessContext &context,
SCN &max_trans_version, ObRowsInfo &rows_info)
int ObDDLKV::check_rows_locked(
const bool check_exist,
ObTableAccessContext &context,
SCN &max_trans_version,
ObRowsInfo &rows_info)
{
int ret = OB_SUCCESS;
TCRLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support get for full direct load", K(ret));
} else if (ddl_memtables_.count() == 0) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support to check lock when memtable count is 0", K(ret));
// do nothing
} else if (ddl_memtables_.count() != 1) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("inc direct load do not support column store yet", K(ret));
} else if (OB_FAIL(ddl_memtables_.at(0)->check_rows_locked(check_exist, context, max_trans_version, rows_info))) {
LOG_WARN("fail to get row", K(ret));
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("check_rows_locked in ddl memtable is not supported", K(ret));
// } else if (OB_FAIL(ddl_memtables_.at(0)->check_rows_locked(check_exist, context, max_trans_version, rows_info))) {
// LOG_WARN("fail to get row", K(ret));
}
return ret;
}
int64_t ObDDLKV::get_occupied_size() const
{
int ret = OB_SUCCESS;
int64_t occupied_size = 0;
if (OB_UNLIKELY(IS_NOT_INIT)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else {
TCRLockGuard guard(lock_);
ObSSTableMetaHandle sst_meta_hdl;
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_memtables_.count(); ++i) {
ObDDLMemtable *ddl_memtable = ddl_memtables_.at(i);
occupied_size += ddl_memtable->get_occupy_size();
}
}
return occupied_size;
}
int64_t ObDDLKV::get_row_count() const
{
int ret = OB_SUCCESS;
int64_t row_count = 0;
if (OB_UNLIKELY(IS_NOT_INIT)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else {
TCRLockGuard guard(lock_);
ObSSTableMetaHandle sst_meta_hdl;
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_memtables_.count(); ++i) {
ObDDLMemtable *ddl_memtable = ddl_memtables_.at(i);
row_count += ddl_memtable->get_row_count();
}
}
return row_count;
}
int ObDDLKV::get_block_count_and_row_count(
int64_t &macro_block_count,
int64_t &micro_block_count,
int64_t &row_count) const
{
int ret = OB_SUCCESS;
macro_block_count = 0;
micro_block_count = 0;
row_count = 0;
if (OB_UNLIKELY(IS_NOT_INIT)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else {
TCRLockGuard guard(lock_);
ObSSTableMetaHandle sst_meta_hdl;
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_memtables_.count(); ++i) {
ObDDLMemtable *ddl_memtable = ddl_memtables_.at(i);
if (OB_FAIL(ddl_memtable->get_meta(sst_meta_hdl))) {
LOG_WARN("fail to get meta", K(ret));
} else {
macro_block_count += sst_meta_hdl.get_sstable_meta().get_data_macro_block_count();
micro_block_count += sst_meta_hdl.get_sstable_meta().get_data_micro_block_count();
row_count += sst_meta_hdl.get_sstable_meta().get_row_count();
}
}
}
return ret;
}

View File

@ -42,24 +42,6 @@ class ObDataMacroBlockMeta;
namespace storage
{
class ObDDLKVEmptyIterator : public ObStoreRowIterator
{
public:
ObDDLKVEmptyIterator() {};
virtual ~ObDDLKVEmptyIterator() {}
void reset() {}
void reuse() {}
int get_next_row(const blocksstable::ObDatumRow *&store_row) { return OB_ITER_END; }
protected:
int inner_open(
const ObTableIterParam &iter_param,
ObTableAccessContext &access_ctx,
ObITable *table,
const void *query_range) { return OB_SUCCESS; }
virtual int inner_get_next_row(const blocksstable::ObDatumRow *&store_row) { return OB_ITER_END; }
};
class ObBlockMetaTreeValue final
{
public:
@ -230,7 +212,13 @@ public:
const share::SCN &last_freezed_scn,
const uint64_t data_format_version);
public: // derived from ObIMemtable
// for read_barrier, it needs to be always false
virtual bool is_empty() const override { return false; }
virtual int64_t get_occupied_size() const override;
public : // derived from ObITabletMemtable
virtual bool is_inited() const override { return is_inited_; }
virtual int init(const ObITable::TableKey &table_key,
ObLSHandle &ls_handle,
ObFreezer *freezer,
@ -238,10 +226,19 @@ public : // derived from ObITabletMemtable
const int64_t schema_version,
const uint32_t freeze_clock) override;
virtual void print_ready_for_flush() override;
virtual bool is_inited() const override { return is_inited_; }
virtual bool is_frozen_memtable() override;
virtual int set_frozen() override { ATOMIC_SET(&is_independent_freezed_, true); return OB_SUCCESS; }
virtual bool can_be_minor_merged() override;
virtual int get_schema_info(
const int64_t input_column_cnt,
int64_t &max_schema_version_on_memtable,
int64_t &max_column_cnt_on_memtable) const override;
// TODO : @suzhi.yt implement ddlkv dump2text
virtual int dump2text(const char *fname) override { return OB_SUCCESS; }
public: // derived from ObITable
virtual bool is_frozen_memtable() override;
virtual int get_frozen_schema_version(int64_t &schema_version) const;
public: // derived from ObSSTable
virtual int exist(
const ObTableIterParam &param,
ObTableAccessContext &context,
@ -280,7 +277,7 @@ public: // derived from ObSSTable
ObTableAccessContext &context,
const common::ObIArray<blocksstable::ObDatumRange> &ranges,
ObStoreRowIterator *&row_iter) override;
virtual int get_frozen_schema_version(int64_t &schema_version) const;
int check_row_locked(
const ObTableIterParam &param,
const blocksstable::ObDatumRowkey &rowkey,
@ -288,6 +285,8 @@ public: // derived from ObSSTable
ObStoreRowLockState &lock_state,
ObRowState &row_state,
bool check_exist = false);
// TODO : @jianyun.sjy ObDDLMemtable adapts check_rows_locked
int check_rows_locked(
const bool check_exist,
storage::ObTableAccessContext &context,
@ -320,19 +319,26 @@ public:
share::SCN get_freeze_scn() const { return freeze_scn_; }
share::SCN get_ddl_start_scn() const { return ddl_start_scn_; }
int64_t get_macro_block_cnt() const { return macro_block_count_; }
int create_ddl_memtable(ObTablet &tablet, const ObITable::TableKey &table_key, ObDDLMemtable *&ddl_memtable);
// not thread safe, external call are limited to ddl merge task
int get_ddl_memtable(const int64_t cg_idx, ObDDLMemtable *&ddl_memtable);
int get_first_ddl_memtable(ObDDLMemtable *&ddl_memtable);
ObIArray<ObDDLMemtable *> &get_ddl_memtables() { return ddl_memtables_; }
void inc_pending_cnt(); // used by ddl kv pending guard
void dec_pending_cnt();
// const common::ObTabletID &get_tablet_id() const { return tablet_id_; }
int64_t get_snapshot_version() const { return ddl_snapshot_version_; }
uint64_t get_data_format_version() const { return data_format_version_; }
const transaction::ObTransID &get_trans_id() const { return trans_id_; }
int64_t get_memory_used() const;
OB_INLINE bool is_inc_ddl_kv() const { return is_inc_ddl_kv_; }
virtual int set_frozen() override { ATOMIC_SET(&is_independent_freezed_, true);; return OB_SUCCESS; }
int64_t get_row_count() const;
int get_block_count_and_row_count(
int64_t &macro_block_count,
int64_t &micro_block_count,
int64_t &row_count) const;
// for inc_ddl_kv only
template<class _callback>
int access_first_ddl_memtable(_callback &callback) const;
INHERIT_TO_STRING_KV("ObITabletMemtable",
ObITabletMemtable,
@ -346,6 +352,8 @@ public:
K_(snapshot_version),
K_(data_format_version),
K_(trans_id),
K_(data_schema_version),
K_(column_count),
K_(min_scn),
K_(max_scn),
K_(freeze_scn),
@ -360,8 +368,8 @@ private:
int full_load_freeze_(const share::SCN &freeze_scn);
int inc_load_freeze_();
int get_empty_iter(const ObTableIterParam &param, ObTableAccessContext &context,
const void *anges, ObStoreRowIterator *&row_iter);
int create_ddl_memtable(ObTablet &tablet, const ObITable::TableKey &table_key, ObDDLMemtable *&ddl_memtable);
private:
static const int64_t TOTAL_LIMIT = 10 * 1024 * 1024 * 1024L;
static const int64_t HOLD_LIMIT = 10 * 1024 * 1024 * 1024L;
@ -376,6 +384,8 @@ private:
int64_t ddl_snapshot_version_; // the snapshot version for major sstable which is completed by ddl
uint64_t data_format_version_;
transaction::ObTransID trans_id_; // for incremental direct load only
int64_t data_schema_version_;
int64_t column_count_;
// freeze related
share::SCN min_scn_; // the min log ts of macro blocks
@ -386,6 +396,28 @@ private:
ObArray<ObDDLMemtable *> ddl_memtables_;
};
template<class _callback>
int ObDDLKV::access_first_ddl_memtable(_callback &callback) const
{
int ret = OB_SUCCESS;
TCRLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "not inited", K(ret));
} else if (OB_UNLIKELY(!is_inc_ddl_kv())) {
ret = OB_NOT_SUPPORTED;
STORAGE_LOG(WARN, "not support get for full direct load", K(ret));
} else if (ddl_memtables_.count() == 0) {
ret = OB_ENTRY_NOT_EXIST;
} else if (ddl_memtables_.count() != 1) {
ret = OB_NOT_SUPPORTED;
STORAGE_LOG(WARN, "inc direct load do not support column store yet", K(ret));
} else {
ObDDLMemtable *ddl_memtable = ddl_memtables_.at(0);
ret = callback(ddl_memtable);
}
return ret;
}
} // end namespace storage
} // end namespace oceanbase

View File

@ -24,25 +24,50 @@ namespace storage
template <class T>
class ObDDLKVMultiVersionRowIterator : public ObStoreRowIterator
{
private:
class IteratorInitializer
{
public:
IteratorInitializer(T &iterator,
const ObTableIterParam &param,
ObTableAccessContext &context,
const void *query_range)
: iterator_(iterator), param_(param), context_(context), query_range_(query_range)
{
}
int operator()(ObDDLMemtable *ddl_memtable)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ddl_memtable)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected ddl memtable is null", K(ret));
} else if (OB_FAIL(iterator_.init(param_, context_, ddl_memtable, query_range_))) {
STORAGE_LOG(WARN, "fail to init", K(ret));
}
return ret;
}
private:
T &iterator_;
const ObTableIterParam &param_;
ObTableAccessContext &context_;
const void *query_range_;
};
public:
ObDDLKVMultiVersionRowIterator() {}
ObDDLKVMultiVersionRowIterator() : is_empty_(false) { type_ = iterator_.get_iter_type(); }
virtual ~ObDDLKVMultiVersionRowIterator() {}
virtual void reuse()
{
iterator_.reuse();
is_empty_ = false;
ObStoreRowIterator::reuse();
}
virtual void reset()
{
iterator_.reset();
is_empty_ = false;
ObStoreRowIterator::reset();
}
// used for global query iterator pool, prepare for returning to pool
virtual void reclaim()
{
iterator_.reclaim();
ObStoreRowIterator::reclaim();
}
virtual int init(
const ObTableIterParam &param,
ObTableAccessContext &context,
@ -50,6 +75,7 @@ public:
const void *query_range)
{
int ret = common::OB_SUCCESS;
is_reclaimed_ = false;
if (OB_ISNULL(query_range) || OB_ISNULL(table)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), KP(query_range), KP(table));
@ -57,29 +83,33 @@ public:
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), KPC(table));
} else {
ObDDLMemtable *ddl_memtable = nullptr;
if (OB_FAIL((static_cast<ObDDLKV *>(table)->get_first_ddl_memtable(ddl_memtable)))) {
STORAGE_LOG(WARN, "fail to get ddl memtable", K(ret));
} else if (OB_FAIL(iterator_.init(param, context, ddl_memtable, query_range))) {
STORAGE_LOG(WARN, "fail to init", K(ret));
} else {
type_ = iterator_.get_iter_type();
is_sstable_iter_ = false;
is_reclaimed_ = false;
ObDDLKV *ddl_kv = static_cast<ObDDLKV *>(table);
IteratorInitializer initializer(iterator_, param, context, query_range);
if (OB_FAIL(ddl_kv->access_first_ddl_memtable(initializer))) {
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
STORAGE_LOG(WARN, "fail to access first ddl memtable", K(ret));
} else {
ret = OB_SUCCESS;
is_empty_ = true;
}
}
}
return ret;
}
virtual int set_ignore_shadow_row() { return iterator_.set_ignore_shadow_row(); }
virtual bool can_blockscan() const { return iterator_.can_blockscan(); }
virtual bool can_batch_scan() const { return iterator_.can_batch_scan(); }
virtual int get_next_row(const blocksstable::ObDatumRow *&row)
{
return iterator_.get_next_row(row);
int ret = OB_SUCCESS;
if (is_empty_) {
ret = OB_ITER_END;
} else {
ret = iterator_.get_next_row(row);
}
return ret;
}
private:
T iterator_;
bool is_empty_;
};
} // namespace storage

View File

@ -586,7 +586,7 @@ int ObTabletBackfillTXTask::get_backfill_tx_memtables_(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table should not be NULL or table type is unexpected", K(ret), KP(table));
} else if (table->is_direct_load_memtable()) {
ret = OB_NOT_SUPPORTED;
ret = OB_TRANSFER_SYS_ERROR;
LOG_WARN("find a direct load memtable", KR(ret), K(tablet_info_.tablet_id_), KPC(table));
} else if (FALSE_IT(memtable = static_cast<memtable::ObMemtable *>(table))) {
} else if (table->get_start_scn() >= backfill_tx_ctx_->log_sync_scn_

View File

@ -385,16 +385,20 @@ int ObTXTransferUtils::set_tablet_freeze_flag(storage::ObLS &ls, ObTablet *table
LOG_WARN("failed to get_memtable_mgr for get all memtable", K(ret), KPC(tablet));
} else {
CLICK();
ObITabletMemtable *mt = nullptr;
for (int64_t i = 0; OB_SUCC(ret) && i < memtables.count(); ++i) {
ObITable *table = memtables.at(i).get_table();
if (OB_ISNULL(table)) {
if (OB_UNLIKELY(memtables.at(i).get_tablet_memtable(mt))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table in tables_handle is invalid", K(ret), KP(table));
LOG_WARN("table in tables_handle is not memtable", K(ret), K(memtables.at(i)));
} else if (!mt->is_active_memtable()) {
// skip
} else if (OB_UNLIKELY(!mt->is_data_memtable())) {
// incremental direct load hold table lock will block transfer scheduling, so there will be no active direct load memtable
ret = OB_TRANSFER_SYS_ERROR;
LOG_WARN("memtable is not data memtable", K(ret), KPC(mt));
} else {
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(table);
if (memtable->is_active_memtable()) {
memtable->set_transfer_freeze(weak_read_scn);
}
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(mt);
memtable->set_transfer_freeze(weak_read_scn);
}
}
if (OB_SUCC(ret)) {

View File

@ -1061,6 +1061,7 @@ int ObFreezer::do_direct_load_memtable_tablet_freeze_(ObITabletMemtable *tablet_
ObDDLKV *direct_load_memtable = static_cast<ObDDLKV*>(tablet_memtable);
if (OB_FAIL(direct_load_memtable->decide_right_boundary())) {
STORAGE_LOG(WARN, "freeze direct load memtable failed", KR(ret), K(ls_id), KPC(tablet_memtable));
} else if (FALSE_IT(direct_load_memtable->set_snapshot_version(get_freeze_snapshot_version()))) {
} else {
int64_t read_lock = LSLOCKALL;
int64_t write_lock = 0;

View File

@ -77,6 +77,7 @@
#include "storage/high_availability/ob_storage_ha_utils.h"
#include "storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.h"
#include "storage/concurrency_control/ob_data_validation_service.h"
#include "storage/ddl/ob_tablet_ddl_kv.h"
using namespace oceanbase::share;
using namespace oceanbase::common;
@ -6067,7 +6068,19 @@ int ObLSTabletService::estimate_block_count_and_row_count(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null table", K(ret), K(tablet_iter.table_iter()));
} else if (table->is_direct_load_memtable()) {
// FIXME : @suzhi.yt
ObDDLKV *ddl_kv = static_cast<ObDDLKV *>(table);
int64_t macro_block_count_in_ddl_kv = 0;
int64_t micro_block_count_in_ddl_kv = 0;
int64_t row_count_in_ddl_kv = 0;
if (OB_FAIL(ddl_kv->get_block_count_and_row_count(macro_block_count_in_ddl_kv,
micro_block_count_in_ddl_kv,
row_count_in_ddl_kv))) {
LOG_WARN("fail to get block count and row count", K(ret));
} else {
macro_block_count += macro_block_count_in_ddl_kv;
micro_block_count += micro_block_count_in_ddl_kv;
sstable_row_count += row_count_in_ddl_kv;
}
} else if (table->is_data_memtable()) {
memtable_row_count += static_cast<memtable::ObMemtable *>(table)->get_physical_row_cnt();
} else if (table->is_sstable()) {

View File

@ -52,6 +52,7 @@
#include "storage/column_store/ob_column_oriented_sstable.h"
#include "storage/access/ob_row_sample_iterator.h"
#include "storage/concurrency_control/ob_trans_stat_row.h"
#include "storage/ddl/ob_tablet_ddl_kv.h"
#include "logservice/ob_log_service.h"
@ -99,6 +100,41 @@ ObGlobalMtAlloc &get_global_mt_alloc()
return s_alloc;
}
class ObDirectLoadMemtableRowsLockedChecker
{
public:
ObDirectLoadMemtableRowsLockedChecker(ObMemtable &memtable,
const bool check_exist,
const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
ObRowsInfo &rows_info)
: memtable_(memtable),
check_exist_(check_exist),
param_(param),
context_(context),
rows_info_(rows_info)
{
}
int operator()(ObDDLMemtable *ddl_memtable)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ddl_memtable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected ddl memtable is null", K(ret));
} else if (OB_FAIL(memtable_.check_rows_locked_on_ddl_merge_sstable(
ddl_memtable, check_exist_, param_, context_, rows_info_))) {
TRANS_LOG(WARN, "Failed to check rows locked for sstable", K(ret), KPC(ddl_memtable));
}
return ret;
}
private:
ObMemtable &memtable_;
const bool check_exist_;
const storage::ObTableIterParam &param_;
storage::ObTableAccessContext &context_;
ObRowsInfo &rows_info_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////
// Public Functions
@ -1209,21 +1245,6 @@ int ObMemtable::multi_scan(
return ret;
}
int ObMemtable::replay_schema_version_change_log(const int64_t schema_version)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "not init", K(*this));
ret = OB_NOT_INIT;
} else if (schema_version < 0) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(schema_version));
} else {
set_max_schema_version(schema_version);
}
return ret;
}
int ObMemtable::replay_row(ObStoreCtx &ctx,
const share::SCN &scn,
ObMemtableMutatorIterator *mmi)
@ -1432,7 +1453,33 @@ int ObMemtable::internal_lock_row_on_frozen_stores_(const bool check_exist,
K(tmp_lock_state),
K(row_state));
} else if (iter_tables.at(i)->is_direct_load_memtable()) {
TRANS_LOG(DEBUG, "skip check direct load memtable", KPC(iter_tables.at(i)));
ObDDLKV *ddl_kv = static_cast<ObDDLKV *>(iter_tables.at(i));
blocksstable::ObDatumRowkeyHelper rowkey_converter;
blocksstable::ObDatumRowkey datum_rowkey;
if (OB_FAIL(rowkey_converter.convert_datum_rowkey(key->get_rowkey()->get_rowkey(), datum_rowkey))) {
STORAGE_LOG(WARN, "Failed to convert datum rowkey", K(ret), KPC(key));
} else if (OB_FAIL(ddl_kv->check_row_locked(
param, datum_rowkey, context, tmp_lock_state, row_state, check_exist))) {
TRANS_LOG(WARN,
"direct load memtable check row lock fail",
K(ret),
KPC(key),
K(check_exist),
K(datum_rowkey),
K(lock_state),
K(tmp_lock_state),
K(row_state));
}
TRANS_LOG(DEBUG,
"direct load memtable check row lock debug",
K(ret),
KPC(key),
KPC(ddl_kv),
K(check_exist),
K(datum_rowkey),
K(lock_state),
K(tmp_lock_state),
K(row_state));
} else {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unknown store type", K(ret), K(iter_tables), K(i));
@ -1673,8 +1720,16 @@ int ObMemtable::internal_lock_rows_on_frozen_stores_(
TRANS_LOG(WARN, "Failed to check rows locked for sstable", K(ret), K(i), K(iter_tables));
}
}
} else if (iter_tables.at(i)->is_direct_load_memtable()) {
TRANS_LOG(DEBUG, "skip check direct load memtable", KPC(iter_tables.at(i)));
} else if (i_table->is_direct_load_memtable()) {
ObDDLKV *ddl_kv = static_cast<ObDDLKV *>(i_table);
ObDirectLoadMemtableRowsLockedChecker checker(*this, check_exist, param, context, rows_info);
if (OB_FAIL(ddl_kv->access_first_ddl_memtable(checker))) {
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
STORAGE_LOG(WARN, "fail to access first ddl memtable", K(ret), K(i), K(iter_tables));
} else {
ret = OB_SUCCESS;
}
}
} else {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "Unknown store type", K(ret), K(iter_tables), K(i));
@ -1868,12 +1923,12 @@ bool ObMemtable::ready_for_flush_()
} else if (is_frozen && get_logging_blocked()) {
// ensure unset all frozen memtables'logging_block
ObTableHandleV2 handle;
ObMemtable *first_frozen_memtable = nullptr;
ObITabletMemtable *first_frozen_memtable = nullptr;
ObTabletMemtableMgr *memtable_mgr = get_memtable_mgr();
if (OB_ISNULL(memtable_mgr)) {
} else if (OB_FAIL(memtable_mgr->get_first_frozen_memtable(handle))) {
TRANS_LOG(WARN, "fail to get first_frozen_memtable", K(ret));
} else if (OB_FAIL(handle.get_data_memtable(first_frozen_memtable))) {
} else if (OB_FAIL(handle.get_tablet_memtable(first_frozen_memtable))) {
TRANS_LOG(WARN, "fail to get memtable", K(ret));
} else if (first_frozen_memtable == this) {
(void)clear_logging_blocked();

View File

@ -225,6 +225,10 @@ public: // derived from ObITabletMemtable
virtual bool is_inited() const override { return is_inited_; }
virtual int64_t dec_write_ref() override;
virtual bool is_frozen_memtable() override;
virtual int get_schema_info(
const int64_t input_column_cnt,
int64_t &max_schema_version_on_memtable,
int64_t &max_column_cnt_on_memtable) const override;
public: // derived from ObITable
// ==================== Memtable Operation Interface ==================
@ -357,7 +361,6 @@ public: // derived from ObITable
storage::ObStoreCtx &ctx,
const share::SCN &scn,
ObMemtableMutatorIterator *mmi);
virtual int replay_schema_version_change_log(const int64_t schema_version);
virtual int safe_to_destroy(bool &is_safe);
public:
@ -369,10 +372,6 @@ public:
int64_t get_max_data_schema_version() const;
void set_max_column_cnt(const int64_t column_cnt);
int64_t get_max_column_cnt() const;
int get_schema_info(
const int64_t input_column_cnt,
int64_t &max_schema_version_on_memtable,
int64_t &max_column_cnt_on_memtable) const;
int row_compact(ObMvccRow *value,
const share::SCN snapshot_version,
const int64_t flag);
@ -450,7 +449,7 @@ public:
int check_cleanout(bool &is_all_cleanout,
bool &is_all_delay_cleanout,
int64_t &count);
int dump2text(const char *fname);
virtual int dump2text(const char *fname) override;
// TODO(handora.qc) ready_for_flush interface adjustment
virtual int finish_freeze();

View File

@ -16,6 +16,8 @@
#include "storage/memtable/mvcc/ob_mvcc_iterator.h"
#include "storage/memtable/ob_lock_wait_mgr.h"
#include "storage/tx_table/ob_tx_table_guards.h"
#include "storage/access/ob_rows_info.h"
#include "storage/ddl/ob_tablet_ddl_kv.h"
namespace oceanbase {
using namespace common;
@ -122,6 +124,16 @@ int ObRowConflictHandler::check_row_locked(const storage::ObTableIterParam &para
} else if (max_trans_version < lock_state.trans_version_) {
max_trans_version = lock_state.trans_version_;
}
} else if (stores->at(i)->is_direct_load_memtable()) {
ObDDLKV *ddl_kv = static_cast<ObDDLKV *>(stores->at(i));
if (OB_FAIL(ddl_kv->check_row_locked(param, rowkey, context, lock_state, row_state))) {
TRANS_LOG(WARN, "sstable check row lock fail", K(ret), K(rowkey));
} else if (lock_state.is_locked_) {
break;
} else if (max_trans_version < row_state.max_trans_version_) {
max_trans_version = row_state.max_trans_version_;
}
TRANS_LOG(DEBUG, "check_row_locked meet direct load memtable", K(ret), K(rowkey), K(row_state), K(*ddl_kv));
} else if (stores->at(i)->is_sstable()) {
blocksstable::ObSSTable *sstable = static_cast<blocksstable::ObSSTable *>(stores->at(i));
if (OB_FAIL(sstable->check_row_locked(param, rowkey, context, lock_state, row_state))) {

View File

@ -66,14 +66,14 @@ int ObIMemtableMgr::get_first_nonempty_memtable(ObTableHandleV2 &handle) const
for (int64_t i = memtable_head_; OB_SUCC(ret) && i < memtable_tail_; ++i) {
ObTableHandleV2 tmp_handle;
memtable::ObMemtable *mt = NULL;
ObITabletMemtable *mt = NULL;
if (OB_FAIL(get_ith_memtable(i, tmp_handle))) {
STORAGE_LOG(WARN, "fail to get ith memtable", KR(ret), K(i));
} else if (OB_UNLIKELY(!tmp_handle.is_valid())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "get invalid tmp table handle", KR(ret), K(i), K(tmp_handle));
} else if (OB_FAIL(tmp_handle.get_data_memtable(mt))) {
STORAGE_LOG(WARN, "failed to get_data_memtable", KR(ret), K(i), K(tmp_handle));
} else if (OB_FAIL(tmp_handle.get_tablet_memtable(mt))) {
STORAGE_LOG(WARN, "failed to get_tablet_memtable", KR(ret), K(i), K(tmp_handle));
} else if (OB_ISNULL(mt)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "mt is NULL", KR(ret), K(i), K(tmp_handle));

View File

@ -346,6 +346,21 @@ int64_t ObITabletMemtable::get_max_schema_version() const
return ATOMIC_LOAD(&max_schema_version_);
}
int ObITabletMemtable::replay_schema_version_change_log(const int64_t schema_version)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited())) {
TRANS_LOG(WARN, "not init", K(*this));
ret = OB_NOT_INIT;
} else if (schema_version < 0) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(schema_version));
} else {
set_max_schema_version(schema_version);
}
return ret;
}
int64_t ObITabletMemtable::inc_unsubmitted_cnt_() { return ATOMIC_AAF(&unsubmitted_cnt_, 1); }
int64_t ObITabletMemtable::dec_unsubmitted_cnt_() { return ATOMIC_SAF(&unsubmitted_cnt_, 1); }
int64_t ObITabletMemtable::inc_write_ref_() { return ATOMIC_AAF(&write_ref_cnt_, 1); }

View File

@ -250,6 +250,7 @@ public:
int set_migration_clog_checkpoint_scn(const share::SCN &clog_checkpoint_scn);
int resolve_left_boundary(share::SCN end_scn) { return set_start_scn(end_scn); }
int resolve_right_boundary();
int replay_schema_version_change_log(const int64_t schema_version);
int set_start_scn(const share::SCN start_scn);
int set_end_scn(const share::SCN end_scn);
int set_max_end_scn(const share::SCN scn, bool allow_backoff = false);
@ -289,6 +290,11 @@ public:
virtual void print_ready_for_flush() = 0;
virtual void set_allow_freeze(const bool allow_freeze) = 0;
virtual int set_frozen() = 0;
virtual int get_schema_info(
const int64_t input_column_cnt,
int64_t &max_schema_version_on_memtable,
int64_t &max_column_cnt_on_memtable) const = 0;
virtual int dump2text(const char *fname) = 0;
// *************** pure virtual functions *****************
public:

View File

@ -931,10 +931,6 @@ int ObPartitionRangeSpliter::get_single_range_info(const ObStoreRange &store_ran
if (OB_ISNULL(table)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "Invalid table pointer", K(ret), KP(table));
} else if (table->is_direct_load_memtable()) {
// FIXME : @suzhi.yt
ret = OB_NOT_SUPPORTED;
STORAGE_LOG(WARN, "get single range from direct load memtable not supported", KR(ret), KPC(table));
} else if (table->is_data_memtable()) {
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(table);
int64_t row_count = 0;
@ -1010,6 +1006,11 @@ int ObPartitionRangeSpliter::get_single_range_info(const ObStoreRange &store_ran
}
}
}
} else if (table->is_direct_load_memtable()) {
// TODO : @suzhi.yt 可能会导致划分range不均衡, 后续实现
total_size = 0;
macro_block_cnt = 0;
estimate_micro_block_cnt = 0;
}
return ret;
}
@ -1092,10 +1093,6 @@ int ObPartitionRangeSpliter::split_ranges_memtable(ObRangeSplitInfo &range_info,
} else if (OB_ISNULL(table)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected null table", K(ret), KP(table), K(range_info));
} else if (table->is_direct_load_memtable()) {
// FIXME : @suzhi.yt support direct load memtable?
ret = OB_NOT_SUPPORTED;
STORAGE_LOG(WARN, "not supported memtable", KR(ret), KPC(table));
} else if (table->is_data_memtable()) {
ObSEArray<ObStoreRange, 16> store_ranges;
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(table);
@ -1135,9 +1132,15 @@ int ObPartitionRangeSpliter::split_ranges_memtable(ObRangeSplitInfo &range_info,
}
}
STORAGE_LOG(DEBUG, "splite ranges with memtable", K(range_info), K(range_array));
} else if (table->is_direct_load_memtable()) {
// TODO : @suzhi.yt 可能会导致划分range不均衡, 后续实现
if (OB_FAIL(build_single_range(false/*for compaction*/, range_info, allocator, range_array))) {
STORAGE_LOG(WARN, "Failed to build single range", K(ret));
} else {
STORAGE_LOG(DEBUG, "try to make single split range for memtable", K(range_info), K(range_array));
}
}
return ret;
}
@ -1198,6 +1201,8 @@ int ObPartitionMultiRangeSpliter::get_split_tables(ObTableStoreIterator &table_i
memtable_size = MAX(mem_size, memtable_size);
max_memtable = table;
}
} else if (table->is_direct_load_memtable()) {
// TODO : @suzhi.yt 可能会导致划分range不均衡, 后续实现
}
}

View File

@ -3046,13 +3046,13 @@ int ObTablet::get_max_schema_version(int64_t &schema_version)
if (OB_FAIL(get_all_memtables(table_handle_array))) {
LOG_WARN("failed to get all memtable", K(ret), KPC(this));
} else {
const memtable::ObMemtable *memtable = nullptr;
const ObITabletMemtable *memtable = nullptr;
for (int64_t i = 0; OB_SUCC(ret) && i < table_handle_array.count(); ++i) {
const ObTableHandleV2 &handle = table_handle_array[i];
if (OB_UNLIKELY(!handle.is_valid())) {
ret = OB_ERR_SYS;
LOG_WARN("invalid memtable", K(ret), K(handle));
} else if (OB_FAIL(handle.get_data_memtable(memtable))) {
} else if (OB_FAIL(handle.get_tablet_memtable(memtable))) {
LOG_WARN("fail to get memtable", K(ret), K(handle));
} else if (OB_ISNULL(memtable)) {
ret = OB_ERR_SYS;
@ -4220,16 +4220,12 @@ int ObTablet::get_newest_schema_version(int64_t &schema_version) const
int64_t unused_max_column_cnt_on_memtable = 0;
for (int64_t idx = 0; OB_SUCC(ret) && idx < memtables.count(); ++idx) {
ObITable *table = memtables.at(idx);
if (table->is_data_memtable()) {
ObMemtable *memtable = static_cast<memtable::ObMemtable *>(table);
if (table->is_memtable()) {
ObITabletMemtable *memtable = static_cast<ObITabletMemtable *>(table);
if (OB_FAIL(memtable->get_schema_info(
store_column_cnt_in_schema, max_schema_version_on_memtable, unused_max_column_cnt_on_memtable))) {
LOG_WARN("failed to get schema info from memtable", KR(ret), KPC(table));
}
} else if (table->is_direct_load_memtable()) {
// FIXME : @suzhi.yt
// ret = OB_NOT_SUPPORTED;
LOG_INFO("find a direct load memtable", KR(ret));
}
}
if (OB_SUCC(ret)) {
@ -5245,14 +5241,14 @@ int ObTablet::replay_schema_version_change_log(const int64_t schema_version)
} else if (OB_FAIL(get_all_memtables(table_handle_array))) {
LOG_WARN("failed to get all memtable", K(ret), KPC(this));
} else {
memtable::ObMemtable *memtable = nullptr;
ObITabletMemtable *memtable = nullptr;
const int64_t table_num = table_handle_array.count();
if (0 == table_num) {
// no memtable, no need to replay schema version change
} else if (!table_handle_array[table_num - 1].is_valid()) {
ret = OB_ERR_SYS;
LOG_WARN("latest memtable is invalid", K(ret));
} else if (OB_FAIL(table_handle_array[table_num - 1].get_data_memtable(memtable))) {
} else if (OB_FAIL(table_handle_array[table_num - 1].get_tablet_memtable(memtable))) {
LOG_WARN("fail to get memtable", K(ret));
} else if (OB_ISNULL(memtable)) {
ret = OB_ERR_SYS;
@ -5459,7 +5455,7 @@ int ObTablet::get_rec_log_scn(SCN &rec_scn)
int ret = OB_SUCCESS;
rec_scn = SCN::max_scn();
ObTableHandleV2 handle;
memtable::ObMemtable *mt = NULL;
ObITabletMemtable *mt = NULL;
ObProtectedMemtableMgrHandle *protected_handle = NULL;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -5472,8 +5468,8 @@ int ObTablet::get_rec_log_scn(SCN &rec_scn)
} else {
LOG_WARN("fail to get first memtable", KR(ret), K(handle));
}
} else if (OB_FAIL(handle.get_data_memtable(mt))) {
LOG_WARN("fail to get data memtables", KR(ret), K(handle));
} else if (OB_FAIL(handle.get_tablet_memtable(mt))) {
LOG_WARN("fail to get tablet memtable", KR(ret), K(handle));
} else if (OB_ISNULL(mt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mt is NULL", KR(ret), K(handle));
@ -5564,16 +5560,12 @@ int ObTablet::get_storage_schema_for_transfer_in(
if (OB_ISNULL(table)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table in tables_handle is invalid", K(ret), KP(table));
} else if (table->is_data_memtable()) {
ObMemtable *memtable = static_cast<memtable::ObMemtable *>(table);
} else if (table->is_memtable()) {
ObITabletMemtable *memtable = static_cast<ObITabletMemtable *>(table);
if (OB_FAIL(memtable->get_schema_info(
store_column_cnt_in_schema, max_schema_version_in_memtable, max_column_cnt_in_memtable))) {
LOG_WARN("failed to get schema info from memtable", KR(ret), KPC(table));
}
} else if (table->is_direct_load_memtable()) {
// FIXME : @suzhi.yt
// ret = OB_NOT_SUPPORTED;
LOG_INFO("find a direct load memtable", KR(ret));
}
}

View File

@ -414,7 +414,7 @@ int ObTableStoreIterator::set_retire_check()
if (OB_UNLIKELY(!table_ptr.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected invalid table handle", K(ret), K(table_ptr), K(*this));
} else if (table_ptr.table_->is_data_memtable()) {
} else if (table_ptr.table_->is_memtable()) {
first_memtable = table_ptr.table_;
} else {
break;
@ -422,7 +422,7 @@ int ObTableStoreIterator::set_retire_check()
}
if (OB_SUCC(ret) && OB_NOT_NULL(first_memtable)) {
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(first_memtable);
ObITabletMemtable *memtable = static_cast<ObITabletMemtable *>(first_memtable);
memstore_retired_ = &memtable->get_read_barrier();
}
return ret;