From 980e5ce9d1704e69d0f3887a40380e177da35725 Mon Sep 17 00:00:00 2001 From: suz-yang Date: Fri, 10 May 2024 10:35:49 +0000 Subject: [PATCH] fix direct load memtable implement --- src/observer/ob_rpc_processor_simple.cpp | 11 +- .../table_load/ob_table_load_instance.cpp | 59 +--- .../table_load/ob_table_load_instance.h | 6 - .../ob_all_virtual_memstore_info.cpp | 34 +- .../ob_all_virtual_memstore_info.h | 2 +- src/storage/access/ob_sample_iter_helper.cpp | 4 +- src/storage/access/ob_table_estimator.cpp | 43 ++- .../compaction/ob_partition_merge_policy.cpp | 7 +- .../compaction/ob_tenant_tablet_scheduler.cpp | 11 +- src/storage/ddl/ob_ddl_merge_task.cpp | 15 +- src/storage/ddl/ob_tablet_ddl_kv.cpp | 334 ++++++++++-------- src/storage/ddl/ob_tablet_ddl_kv.h | 88 +++-- ...tablet_ddl_kv_multi_version_row_iterator.h | 70 ++-- .../ob_tablet_backfill_tx.cpp | 2 +- .../high_availability/ob_transfer_struct.cpp | 18 +- src/storage/ls/ob_freezer.cpp | 1 + src/storage/ls/ob_ls_tablet_service.cpp | 15 +- src/storage/memtable/ob_memtable.cpp | 95 +++-- src/storage/memtable/ob_memtable.h | 11 +- .../memtable/ob_row_conflict_handler.cpp | 12 + src/storage/ob_i_memtable_mgr.cpp | 6 +- src/storage/ob_i_tablet_memtable.cpp | 15 + src/storage/ob_i_tablet_memtable.h | 6 + src/storage/ob_partition_range_spliter.cpp | 23 +- src/storage/tablet/ob_tablet.cpp | 30 +- .../tablet/ob_tablet_table_store_iterator.cpp | 4 +- 26 files changed, 572 insertions(+), 350 deletions(-) diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 906066d54..33dfb8a68 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -87,6 +87,7 @@ #include "share/ob_rpc_struct.h" #include "rootserver/ob_recovery_ls_service.h" #include "logservice/ob_server_log_block_mgr.h" +#include "storage/ddl/ob_tablet_ddl_kv.h" namespace oceanbase { @@ -974,14 +975,14 @@ int ObDumpMemtableP::process() } else if (OB_FAIL(tablet_handle.get_obj()->get_all_memtables(tables_handle))) { LOG_WARN("failed to get all memtable", K(ret), KPC(tablet_handle.get_obj())); } else { - memtable::ObMemtable *mt; + ObITabletMemtable *tablet_memtable = nullptr; mkdir("/tmp/dump_memtable/", S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); for (int64_t i = 0; OB_SUCC(ret) && i < tables_handle.count(); i++) { - if (OB_FAIL(tables_handle.at(i).get_data_memtable(mt))) { - SERVER_LOG(WARN, "fail to get data memtables", K(ret)); + if (OB_FAIL(tables_handle.at(i).get_tablet_memtable(tablet_memtable))) { + SERVER_LOG(WARN, "fail to get tablet memtables", K(ret)); } else { - TRANS_LOG(INFO, "start dump memtable", K(*mt), K(arg_)); - mt->dump2text("/tmp/dump_memtable/memtable.txt"); + TRANS_LOG(INFO, "start dump memtable", K(*tablet_memtable), K(arg_)); + tablet_memtable->dump2text("/tmp/dump_memtable/memtable.txt"); } } } diff --git a/src/observer/table_load/ob_table_load_instance.cpp b/src/observer/table_load/ob_table_load_instance.cpp index 58b1a860d..94661d1c7 100644 --- a/src/observer/table_load/ob_table_load_instance.cpp +++ b/src/observer/table_load/ob_table_load_instance.cpp @@ -189,27 +189,18 @@ int ObTableLoadInstance::start_stmt(const ObTableLoadParam ¶m) stmt_ctx_.is_incremental_ = ObDirectLoadMethod::is_incremental(param.method_); stmt_ctx_.use_insert_into_select_tx_ = param.px_mode_; if (stmt_ctx_.is_incremental_) { // incremental direct-load - bool end_sql_tx_if_fail = false; - bool rollback_savepoint_if_fail = false; if (OB_FAIL(build_tx_param())) { LOG_WARN("fail to build tx param", KR(ret), K(stmt_ctx_)); } else if (OB_FAIL(start_sql_tx())) { LOG_WARN("fail to start sql tx", KR(ret), K(stmt_ctx_)); - } else if (FALSE_IT(end_sql_tx_if_fail = true)) { - } else if (OB_FAIL(create_implicit_savepoint())) { - LOG_WARN("fail to create implicit savepoint", KR(ret), K(stmt_ctx_)); } else if (OB_FAIL(lock_table_in_tx())) { LOG_WARN("fail to lock table in tx", KR(ret), K(stmt_ctx_)); - } else if (FALSE_IT(rollback_savepoint_if_fail = true)) { } else if (OB_FAIL(init_ddl_param_for_inc_direct_load())) { LOG_WARN("fail to init ddl param for inc direct load", KR(ret), K(stmt_ctx_)); } if (OB_FAIL(ret)) { int tmp_ret = OB_SUCCESS; - if (rollback_savepoint_if_fail && OB_TMP_FAIL(rollback_to_implicit_savepoint())) { - LOG_WARN("fail to rollback to implicit savepoint", KR(tmp_ret)); - } - if (end_sql_tx_if_fail && OB_TMP_FAIL(end_sql_tx(false /*commit*/))) { + if (OB_TMP_FAIL(end_sql_tx(false /*commit*/))) { LOG_WARN("fail to end sql tx", KR(tmp_ret)); } } @@ -230,10 +221,6 @@ int ObTableLoadInstance::end_stmt(const bool commit) int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; if (stmt_ctx_.is_incremental_) { - // rollback in fail only, ignore ret - if (!commit && OB_TMP_FAIL(rollback_to_implicit_savepoint())) { - LOG_WARN("fail to rollback to implicit savepoint", KR(tmp_ret)); - } if (OB_FAIL(end_sql_tx(commit))) { LOG_WARN("fail to end sql tx", KR(ret)); } @@ -367,50 +354,6 @@ int ObTableLoadInstance::end_sql_tx(const bool commit) return ret; } -int ObTableLoadInstance::create_implicit_savepoint() -{ - int ret = OB_SUCCESS; - ObTransService *txs = MTL(ObTransService *); - ObTxDesc *tx_desc = stmt_ctx_.tx_desc_; - if (stmt_ctx_.use_insert_into_select_tx_) { - // do nothing - } else { - ObTxSEQ savepoint; - if (OB_FAIL(txs->create_implicit_savepoint(*tx_desc, stmt_ctx_.tx_param_, savepoint))) { - LOG_WARN("fail to create implicit savepoint", KR(ret)); - } else { - stmt_ctx_.savepoint_ = savepoint; - LOG_INFO("create implicit savepoint succeed", KPC(tx_desc), K(savepoint)); - } - } - return ret; -} - -int ObTableLoadInstance::rollback_to_implicit_savepoint() -{ - int ret = OB_SUCCESS; - ObTransService *txs = MTL(ObTransService *); - ObSQLSessionInfo *session_info = stmt_ctx_.session_info_; - ObTxDesc *tx_desc = stmt_ctx_.tx_desc_; - if (stmt_ctx_.use_insert_into_select_tx_) { - // do nothing - } else { - if (!stmt_ctx_.savepoint_.is_valid()) { - // do nothing - } else { - const int64_t stmt_timeout_ts = get_stmt_expire_ts(session_info); - const ObTxSEQ savepoint = stmt_ctx_.savepoint_; - if (OB_FAIL(txs->rollback_to_implicit_savepoint(*tx_desc, savepoint, stmt_timeout_ts, nullptr))) { - LOG_WARN("failed to rollback to implicit savepoint", KR(ret), KPC(tx_desc)); - } else { - stmt_ctx_.savepoint_.reset(); - LOG_INFO("rollback to implicit savepoint succeed", KPC(tx_desc), K(savepoint)); - } - } - } - return ret; -} - int ObTableLoadInstance::lock_table_in_tx() { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_instance.h b/src/observer/table_load/ob_table_load_instance.h index 4df3aa16f..299b2edb0 100644 --- a/src/observer/table_load/ob_table_load_instance.h +++ b/src/observer/table_load/ob_table_load_instance.h @@ -54,9 +54,6 @@ private: int build_tx_param(); int start_sql_tx(); int end_sql_tx(const bool commit); - // abort tx is async, use rollback savepoint to sync release table lock - int create_implicit_savepoint(); - int rollback_to_implicit_savepoint(); int lock_table_in_tx(); int init_ddl_param_for_inc_direct_load(); // full @@ -94,7 +91,6 @@ private: session_info_ = nullptr; tx_desc_ = nullptr; // tx_param_.reset(); - savepoint_.reset(); is_incremental_ = false; use_insert_into_select_tx_ = false; is_started_ = false; @@ -107,7 +103,6 @@ private: KP_(session_info), KPC_(tx_desc), K_(tx_param), - K_(savepoint), KP_(is_incremental), KP_(use_insert_into_select_tx), KP_(is_started), @@ -119,7 +114,6 @@ private: sql::ObSQLSessionInfo *session_info_; transaction::ObTxDesc *tx_desc_; transaction::ObTxParam tx_param_; - transaction::ObTxSEQ savepoint_; bool is_incremental_; bool use_insert_into_select_tx_; // whether use the transaction of insert into select bool is_started_; diff --git a/src/observer/virtual_table/ob_all_virtual_memstore_info.cpp b/src/observer/virtual_table/ob_all_virtual_memstore_info.cpp index d91917640..9f3252770 100644 --- a/src/observer/virtual_table/ob_all_virtual_memstore_info.cpp +++ b/src/observer/virtual_table/ob_all_virtual_memstore_info.cpp @@ -139,7 +139,7 @@ int ObAllVirtualMemstoreInfo::get_next_tablet(ObTabletHandle &tablet_handle) return ret; } -int ObAllVirtualMemstoreInfo::get_next_memtable(memtable::ObMemtable *&mt) +int ObAllVirtualMemstoreInfo::get_next_memtable(ObITabletMemtable *&mt) { int ret = OB_SUCCESS; @@ -164,7 +164,7 @@ int ObAllVirtualMemstoreInfo::get_next_memtable(memtable::ObMemtable *&mt) } else if (OB_FAIL(tablet_handle.get_obj()->get_all_memtables(tables_handle_))) { SERVER_LOG(WARN, "failed to get_memtable_mgr for get all memtable", K(ret), KPC(tablet_handle.get_obj())); } - } else if (OB_FAIL(tables_handle_.at(memtable_array_pos_++).get_data_memtable(mt))) { + } else if (OB_FAIL(tables_handle_.at(memtable_array_pos_++).get_tablet_memtable(mt))) { // get next memtable ret = OB_SUCCESS; } else if (OB_ISNULL(mt)) { @@ -197,7 +197,7 @@ void ObAllVirtualMemstoreInfo::get_freeze_time_dist(const ObMtStat& mt_stat) int ObAllVirtualMemstoreInfo::process_curr_tenant(ObNewRow *&row) { int ret = OB_SUCCESS; - ObMemtable *mt = NULL; + ObITabletMemtable *mt = NULL; if (NULL == allocator_) { ret = OB_NOT_INIT; SERVER_LOG(WARN, "allocator_ shouldn't be NULL", K(allocator_), K(ret)); @@ -214,6 +214,10 @@ int ObAllVirtualMemstoreInfo::process_curr_tenant(ObNewRow *&row) } else { ObMtStat& mt_stat = mt->get_mt_stat(); const int64_t col_count = output_column_ids_.count(); + memtable::ObMemtable *data_memtable = NULL; + if (mt->is_data_memtable()) { + data_memtable = static_cast(mt); + } for (int64_t i = 0; OB_SUCC(ret) && i < col_count; ++i) { uint64_t col_id = output_column_ids_.at(i); switch (col_id) { @@ -284,19 +288,35 @@ int ObAllVirtualMemstoreInfo::process_curr_tenant(ObNewRow *&row) break; case OB_APP_MIN_COLUMN_ID + 14: // hash_item_count - cur_row_.cells_[i].set_int(mt->get_hash_item_count()); + if (nullptr != data_memtable) { + cur_row_.cells_[i].set_int(data_memtable->get_hash_item_count()); + } else { + cur_row_.cells_[i].set_int(0); + } break; case OB_APP_MIN_COLUMN_ID + 15: // hash_mem_used - cur_row_.cells_[i].set_int(mt->get_hash_alloc_memory()); + if (nullptr != data_memtable) { + cur_row_.cells_[i].set_int(data_memtable->get_hash_alloc_memory()); + } else { + cur_row_.cells_[i].set_int(0); + } break; case OB_APP_MIN_COLUMN_ID + 16: // btree_item_count - cur_row_.cells_[i].set_int(mt->get_btree_item_count()); + if (nullptr != data_memtable) { + cur_row_.cells_[i].set_int(data_memtable->get_btree_item_count()); + } else { + cur_row_.cells_[i].set_int(0); + } break; case OB_APP_MIN_COLUMN_ID + 17: // btree_mem_used - cur_row_.cells_[i].set_int(mt->get_btree_alloc_memory()); + if (nullptr != data_memtable) { + cur_row_.cells_[i].set_int(data_memtable->get_btree_alloc_memory()); + } else { + cur_row_.cells_[i].set_int(0); + } break; case OB_APP_MIN_COLUMN_ID + 18: // insert_row_count diff --git a/src/observer/virtual_table/ob_all_virtual_memstore_info.h b/src/observer/virtual_table/ob_all_virtual_memstore_info.h index d291bea1f..acec59a1d 100644 --- a/src/observer/virtual_table/ob_all_virtual_memstore_info.h +++ b/src/observer/virtual_table/ob_all_virtual_memstore_info.h @@ -49,7 +49,7 @@ private: virtual void release_last_tenant() override; int get_next_ls(ObLS *&ls); int get_next_tablet(storage::ObTabletHandle &tablet_handle); - int get_next_memtable(memtable::ObMemtable *&mt); + int get_next_memtable(storage::ObITabletMemtable *&mt); void get_freeze_time_dist(const ObMtStat& mt_stat); private: common::ObAddr addr_; diff --git a/src/storage/access/ob_sample_iter_helper.cpp b/src/storage/access/ob_sample_iter_helper.cpp index 1d76d69d9..fb29b60cd 100644 --- a/src/storage/access/ob_sample_iter_helper.cpp +++ b/src/storage/access/ob_sample_iter_helper.cpp @@ -13,6 +13,7 @@ #include "storage/access/ob_sample_iter_helper.h" #include "storage/memtable/ob_memtable.h" #include "storage/access/ob_multiple_multi_scan_merge.h" +#include "storage/ddl/ob_tablet_ddl_kv.h" namespace oceanbase { namespace storage { @@ -86,7 +87,8 @@ int ObGetSampleIterHelper::can_retire_to_memtable_row_sample_(bool &retire, ObIA memtable_row_count += memtable->get_physical_row_cnt(); } } else if (table->is_direct_load_memtable()) { - // FIXEM : @suzhi.yt support direct load memtable get_row_count(); + ObDDLKV *ddl_kv = static_cast(table); + sstable_row_count += ddl_kv->get_row_count(); } else if (table->is_sstable()) { sstable_row_count += static_cast(table)->get_row_count(); } diff --git a/src/storage/access/ob_table_estimator.cpp b/src/storage/access/ob_table_estimator.cpp index aa18e22b8..640eaf9b2 100644 --- a/src/storage/access/ob_table_estimator.cpp +++ b/src/storage/access/ob_table_estimator.cpp @@ -18,7 +18,7 @@ #include "storage/memtable/mvcc/ob_mvcc_engine.h" #include "storage/memtable/mvcc/ob_mvcc_iterator.h" #include "storage/column_store/ob_column_oriented_sstable.h" - +#include "storage/ddl/ob_tablet_ddl_kv.h" namespace oceanbase { @@ -26,6 +26,35 @@ using namespace blocksstable; namespace storage { +class ObDirectLoadMemtableScanRowCountEstimator +{ +public: + ObDirectLoadMemtableScanRowCountEstimator(const ObTableEstimateBaseInput &base_input, + const ObDatumRange &range, + ObPartitionEst &tmp_cost) + : base_input_(base_input), range_(range), tmp_cost_(tmp_cost) + { + } + int operator()(ObDDLMemtable *ddl_memtable) + { + int ret = OB_SUCCESS; + if (OB_ISNULL(ddl_memtable)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected ddl memtable is null", K(ret)); + } else if (OB_FAIL(ObTableEstimator::estimate_sstable_scan_row_count(base_input_, + ddl_memtable, + range_, + tmp_cost_))) { + LOG_WARN("failed to estimate sstable row count", K(ret), KPC(ddl_memtable)); + } + return ret; + } +private: + const ObTableEstimateBaseInput &base_input_; + const ObDatumRange &range_; + ObPartitionEst &tmp_cost_; +}; + int ObTableEstimator::estimate_row_count_for_get( ObTableEstimateBaseInput &base_input, const ObExtSRowkeyArray &rowkeys, @@ -154,9 +183,15 @@ int ObTableEstimator::estimate_multi_scan_row_count( } } } else if (current_table->is_direct_load_memtable()) { - // FIXME : @suzhi.yt - ret = OB_NOT_SUPPORTED; - LOG_WARN("not supported memtable", KR(ret), KPC(current_table)); + ObDDLKV *ddl_kv = static_cast(current_table); + ObDirectLoadMemtableScanRowCountEstimator estimator(base_input, range, tmp_cost); + if (OB_FAIL(ddl_kv->access_first_ddl_memtable(estimator))) { + if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { + STORAGE_LOG(WARN, "fail to access first ddl memtable", K(ret), KPC(current_table)); + } else { + ret = OB_SUCCESS; + } + } } else { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected table type", K(ret), K(*current_table)); diff --git a/src/storage/compaction/ob_partition_merge_policy.cpp b/src/storage/compaction/ob_partition_merge_policy.cpp index e35c19e24..dd71a01d3 100644 --- a/src/storage/compaction/ob_partition_merge_policy.cpp +++ b/src/storage/compaction/ob_partition_merge_policy.cpp @@ -300,9 +300,6 @@ int ObPartitionMergePolicy::find_mini_merge_tables( } else if (OB_UNLIKELY(memtable->is_active_memtable())) { LOG_DEBUG("skip active memtable", K(i), KPC(memtable), K(memtable_handles)); break; - } else if (OB_UNLIKELY(memtable->is_direct_load_memtable())) { - LOG_DEBUG("skip direct load memtable", K(i), KPC(memtable), K(memtable_handles)); - break; } else if (!memtable->can_be_minor_merged()) { FLOG_INFO("memtable cannot mini merge now", K(ret), K(i), KPC(memtable), K(max_snapshot_version), K(memtable_handles), K(param)); break; @@ -1022,7 +1019,7 @@ int ObPartitionMergePolicy::check_need_medium_merge( can_merge = tablet.get_snapshot_version() >= medium_snapshot; if (!can_merge) { ObTableHandleV2 memtable_handle; - memtable::ObMemtable *last_frozen_memtable = nullptr; + ObITabletMemtable *last_frozen_memtable = nullptr; if (OB_FAIL(tablet.get_protected_memtable_mgr_handle(protected_handle))) { LOG_WARN("failed to get_protected_memtable_mgr_handle", K(ret), K(tablet)); } else if (OB_FAIL(protected_handle->get_last_frozen_memtable(memtable_handle))) { @@ -1032,7 +1029,7 @@ int ObPartitionMergePolicy::check_need_medium_merge( } else { LOG_WARN("failed to get last frozen memtable", K(ret), K(tablet)); } - } else if (OB_FAIL(memtable_handle.get_data_memtable(last_frozen_memtable))) { + } else if (OB_FAIL(memtable_handle.get_tablet_memtable(last_frozen_memtable))) { LOG_WARN("failed to get last frozen memtable", K(ret)); } else { need_force_freeze = last_frozen_memtable->get_snapshot_version() < medium_snapshot; diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index 4079d2287..5bc46cb9a 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -91,7 +91,7 @@ int ObFastFreezeChecker::check_need_fast_freeze( int ret = OB_SUCCESS; need_fast_freeze = false; ObTableHandleV2 table_handle; - memtable::ObMemtable *memtable = nullptr; + ObITabletMemtable *memtable = nullptr; const share::ObLSID &ls_id = tablet.get_tablet_meta().ls_id_; const common::ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; @@ -104,23 +104,26 @@ int ObFastFreezeChecker::check_need_fast_freeze( } else { LOG_WARN("[FastFreeze] failed to get active memtable", K(ret)); } - } else if (OB_FAIL(table_handle.get_data_memtable(memtable))) { + } else if (OB_FAIL(table_handle.get_tablet_memtable(memtable))) { LOG_WARN("[FastFreeze] failed to get memtalbe", K(ret), K(table_handle)); } else if (OB_ISNULL(memtable)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("[FastFreeze] get unexpected null memtable", K(ret), KPC(memtable)); } else if (!memtable->is_active_memtable()) { // do nothing + } else if (!memtable->is_data_memtable()) { + // do nothing } else if (ObTimeUtility::current_time() < memtable->get_timestamp() + FAST_FREEZE_INTERVAL_US) { if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) { LOG_INFO("[FastFreeze] no need to check fast freeze now", K(tablet)); } } else { - check_hotspot_need_fast_freeze(*memtable, need_fast_freeze); + memtable::ObMemtable *mt = static_cast(memtable); + check_hotspot_need_fast_freeze(*mt, need_fast_freeze); if (need_fast_freeze) { FLOG_INFO("[FastFreeze] tablet detects hotspot row, need fast freeze", K(ls_id), K(tablet_id)); } else { - check_tombstone_need_fast_freeze(tablet, *memtable, need_fast_freeze); + check_tombstone_need_fast_freeze(tablet, *mt, need_fast_freeze); if (need_fast_freeze) { FLOG_INFO("[FastFreeze] tablet detects tombstone, need fast freeze", K(ls_id), K(tablet_id)); } diff --git a/src/storage/ddl/ob_ddl_merge_task.cpp b/src/storage/ddl/ob_ddl_merge_task.cpp index 5d9f8a785..7deeee68b 100644 --- a/src/storage/ddl/ob_ddl_merge_task.cpp +++ b/src/storage/ddl/ob_ddl_merge_task.cpp @@ -886,10 +886,17 @@ int ObTabletDDLUtil::update_ddl_table_store( } else { const bool is_major_sstable = ddl_param.table_key_.is_major_sstable(); const int64_t rebuild_seq = ls.get_rebuild_seq(); - const int64_t snapshot_version = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_snapshot_version()) - : tablet.get_snapshot_version(); - const int64_t multi_version_start = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_multi_version_start()) - : 0; + int64_t snapshot_version = 0; + int64_t multi_version_start = 0; + if (is_full_direct_load(ddl_param.direct_load_type_)) { + snapshot_version = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_snapshot_version()) + : tablet.get_snapshot_version(); + multi_version_start = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_multi_version_start()) + : 0; + } else { + snapshot_version = max(ddl_param.snapshot_version_, tablet.get_snapshot_version()); + multi_version_start = tablet.get_multi_version_start(); + } ObTabletHandle new_tablet_handle; ObUpdateTableStoreParam table_store_param(sstable, snapshot_version, diff --git a/src/storage/ddl/ob_tablet_ddl_kv.cpp b/src/storage/ddl/ob_tablet_ddl_kv.cpp index 5a31fdad4..204631423 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv.cpp +++ b/src/storage/ddl/ob_tablet_ddl_kv.cpp @@ -874,6 +874,7 @@ ObDDLKV::ObDDLKV() : is_inited_(false), is_closed_(false), is_inc_ddl_kv_(false), is_independent_freezed_(false), lock_(), arena_allocator_("DDL_CONTAINER", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), tablet_id_(), ddl_start_scn_(SCN::min_scn()), ddl_snapshot_version_(0), data_format_version_(0), trans_id_(), + data_schema_version_(0), column_count_(0), min_scn_(SCN::max_scn()), max_scn_(SCN::min_scn()), pending_cnt_(0), macro_block_count_(0) { @@ -935,6 +936,8 @@ void ObDDLKV::reset() ddl_snapshot_version_ = 0; data_format_version_ = 0; trans_id_.reset(); + data_schema_version_ = 0; + column_count_ = 0; min_scn_ = SCN::max_scn(); max_scn_ = SCN::min_scn(); @@ -1013,11 +1016,6 @@ int ObDDLKV::get_ddl_memtable(const int64_t cg_idx, ObDDLMemtable *&ddl_memtable return ret; } -int ObDDLKV::get_first_ddl_memtable(ObDDLMemtable *&ddl_memtable) -{ - return get_ddl_memtable(0, ddl_memtable); -} - int ObDDLKV::set_macro_block( ObTablet &tablet, const ObDDLMacroBlock ¯o_block, @@ -1145,6 +1143,12 @@ int ObDDLKV::set_macro_block( min_scn_ = SCN::min(min_scn_, macro_block.scn_); max_scn_ = SCN::max(max_scn_, macro_block.scn_); ++macro_block_count_; + if (0 == data_schema_version_) { + data_schema_version_ = data_macro_meta->val_.schema_version_; + } + if (0 == column_count_) { + column_count_ = data_macro_meta->val_.column_count_ - ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); + } LOG_INFO("succeed to set macro block into ddl kv", K(macro_block), K(macro_block_count_), KPC(data_macro_meta)); } } @@ -1584,19 +1588,33 @@ void ObDDLKV::set_allow_freeze(const bool allow_freeze) } int ObDDLKV::get_frozen_schema_version(int64_t &schema_version) const +{ + UNUSED(schema_version); + return OB_NOT_SUPPORTED; +} + +bool ObDDLKV::can_be_minor_merged() +{ + return ready_for_flush() && ObITabletMemtable::can_be_minor_merged(); +} + +int ObDDLKV::get_schema_info( + const int64_t input_column_cnt, + int64_t &max_schema_version_on_memtable, + int64_t &max_column_cnt_on_memtable) const { int ret = OB_SUCCESS; - TCRLockGuard guard(lock_); - if (OB_UNLIKELY(!is_inc_ddl_kv())) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("not support get for full direct load", K(ret)); - } else if (ddl_memtables_.count() == 0) { - schema_version = 0; - } else if (OB_UNLIKELY(ddl_memtables_.count() != 1)) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("inc direct load do not support column store yet", K(ret)); - } else if (OB_FAIL(ddl_memtables_.at(0)->get_frozen_schema_version(schema_version))) { - LOG_WARN("fail to get row", K(ret)); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); + } else { + TCRLockGuard guard(lock_); + if (column_count_ >= input_column_cnt) { + LOG_INFO("column cnt or schema version is updated by ddl kv", KPC(this), + K(max_column_cnt_on_memtable), K(max_schema_version_on_memtable)); + max_column_cnt_on_memtable = MAX(max_column_cnt_on_memtable, column_count_); + max_schema_version_on_memtable = MAX(max_schema_version_on_memtable, data_schema_version_); + } } return ret; } @@ -1625,13 +1643,18 @@ int ObDDLKV::get_frozen_schema_version(int64_t &schema_version) const } \ } -int ObDDLKV::exist(const ObTableIterParam ¶m, ObTableAccessContext &context, - const blocksstable::ObDatumRowkey &rowkey, bool &is_exist, bool &has_found) +int ObDDLKV::exist( + const ObTableIterParam ¶m, + ObTableAccessContext &context, + const ObDatumRowkey &rowkey, + bool &is_exist, + bool &has_found) { int ret = OB_SUCCESS; TCRLockGuard guard(lock_); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { ret = OB_NOT_SUPPORTED; LOG_WARN("not support get for full direct load", K(ret)); @@ -1653,6 +1676,7 @@ int ObDDLKV::exist(ObRowsInfo &rowsInfo, bool &is_exist, bool &has_found) TCRLockGuard guard(lock_); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { ret = OB_NOT_SUPPORTED; LOG_WARN("not support get for full direct load", K(ret)); @@ -1668,13 +1692,16 @@ int ObDDLKV::exist(ObRowsInfo &rowsInfo, bool &is_exist, bool &has_found) return ret; } -int ObDDLKV::scan(const ObTableIterParam ¶m, ObTableAccessContext &context, - const blocksstable::ObDatumRange &key_range, ObStoreRowIterator *&row_iter) +int ObDDLKV::scan( + const ObTableIterParam ¶m, + ObTableAccessContext &context, + const ObDatumRange &key_range, + ObStoreRowIterator *&row_iter) { int ret = OB_SUCCESS; - TCRLockGuard guard(lock_); - if (OB_UNLIKELY(!is_inited_)) { + if (OB_UNLIKELY(IS_NOT_INIT)) { ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); } else if (OB_UNLIKELY( !param.is_valid() || !context.is_valid() @@ -1684,26 +1711,22 @@ int ObDDLKV::scan(const ObTableIterParam ¶m, ObTableAccessContext &context, } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { ret = OB_NOT_SUPPORTED; LOG_WARN("not support get for full direct load", K(ret)); - } else if (OB_UNLIKELY(ddl_memtables_.count() == 0)) { - if (OB_FAIL(get_empty_iter(param, context, &key_range, row_iter))) { - LOG_WARN("fail to get empty iter", K(ret)); - } - } else if (ddl_memtables_.count() != 1) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("inc direct load do not support column store yet", K(ret)); } else { ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowScanner, &key_range, row_iter); } return ret; } -int ObDDLKV::get(const storage::ObTableIterParam ¶m, storage::ObTableAccessContext &context, - const blocksstable::ObDatumRowkey &rowkey, ObStoreRowIterator *&row_iter) +int ObDDLKV::get( + const ObTableIterParam ¶m, + ObTableAccessContext &context, + const ObDatumRowkey &rowkey, + ObStoreRowIterator *&row_iter) { int ret = OB_SUCCESS; - TCRLockGuard guard(lock_); - if (OB_UNLIKELY(!is_inited_)) { + if (OB_UNLIKELY(IS_NOT_INIT)) { ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); } else if (OB_UNLIKELY( !param.is_valid() || !context.is_valid() @@ -1713,92 +1736,33 @@ int ObDDLKV::get(const storage::ObTableIterParam ¶m, storage::ObTableAccessC } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { ret = OB_NOT_SUPPORTED; LOG_WARN("not support get for full direct load", K(ret)); - } else if (OB_UNLIKELY(ddl_memtables_.count() == 0)) { - if (OB_FAIL(get_empty_iter(param, context, &rowkey, row_iter))) { - LOG_WARN("fail to get empty iter", K(ret)); - } - } else if (ddl_memtables_.count() != 1) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("inc direct load do not support column store yet", K(ret)); } else { ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowGetter, &rowkey, row_iter); } return ret; } -int ObDDLKV::multi_get(const ObTableIterParam ¶m, ObTableAccessContext &context, - const common::ObIArray &rowkeys, ObStoreRowIterator *&row_iter) -{ - int ret = OB_SUCCESS; - TCRLockGuard guard(lock_); - if (OB_UNLIKELY(!is_inited_)) { - ret = OB_NOT_INIT; - } else if (OB_UNLIKELY( - !param.is_valid() - || !context.is_valid() - || 0 >= rowkeys.count())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("Invalid argument", K(ret), K(param), K(context), K(rowkeys)); - } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("not support get for full direct load", K(ret)); - } else if (OB_UNLIKELY(ddl_memtables_.count() == 0)) { - if (OB_FAIL(get_empty_iter(param, context, &rowkeys, row_iter))) { - LOG_WARN("fail to get empty iter", K(ret)); - } - } else if (ddl_memtables_.count() != 1) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("inc direct load do not support column store yet", K(ret)); - } else { - ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowMultiGetter, &rowkeys, row_iter); - } - return ret; -} - -int ObDDLKV::multi_scan(const ObTableIterParam ¶m, ObTableAccessContext &context, - const common::ObIArray &ranges, ObStoreRowIterator *&row_iter) -{ - int ret = OB_SUCCESS; - TCRLockGuard guard(lock_); - if (OB_UNLIKELY(!is_inited_)) { - ret = OB_NOT_INIT; - } else if (OB_UNLIKELY( - !param.is_valid() - || !context.is_valid() - || 0 >= ranges.count())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("Invalid argument", K(ret), K(param), K(context), K(ranges)); - } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("not support get for full direct load", K(ret)); - } else if (OB_UNLIKELY(ddl_memtables_.count() == 0)) { - if (OB_FAIL(get_empty_iter(param, context, &ranges, row_iter))) { - LOG_WARN("fail to get empty iter", K(ret)); - } - } else if (ddl_memtables_.count() != 1) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("inc direct load do not support column store yet", K(ret)); - } else { - ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowMultiScanner, &ranges, row_iter); - } - return ret; -} - -int ObDDLKV::get(const storage::ObTableIterParam ¶m, storage::ObTableAccessContext &context, - const blocksstable::ObDatumRowkey &rowkey, blocksstable::ObDatumRow &row) +int ObDDLKV::get( + const ObTableIterParam ¶m, + ObTableAccessContext &context, + const ObDatumRowkey &rowkey, + ObDatumRow &row) { int ret = OB_SUCCESS; ObStoreRowIterator *row_iter = nullptr; const ObDatumRow *row_ptr= nullptr; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { ret = OB_NOT_SUPPORTED; LOG_WARN("not support get for full direct load", K(ret)); } else if (OB_FAIL(get(param, context, rowkey, row_iter))) { LOG_WARN("fail to get row", K(ret)); } else if (OB_FAIL(row_iter->get_next_row(row_ptr))) { - LOG_WARN("fail to get row", K(ret)); + if (OB_UNLIKELY(OB_ITER_END != ret)) { + LOG_WARN("fail to get row", K(ret)); + } } else if (OB_ISNULL(row_ptr) || row_ptr->row_flag_.is_not_exist()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected row", K(ret)); @@ -1812,55 +1776,76 @@ int ObDDLKV::get(const storage::ObTableIterParam ¶m, storage::ObTableAccessC return ret; } -int ObDDLKV::get_empty_iter(const ObTableIterParam ¶m, ObTableAccessContext &context, - const void *ranges, ObStoreRowIterator *&row_iter) +int ObDDLKV::multi_get( + const ObTableIterParam ¶m, + ObTableAccessContext &context, + const ObIArray &rowkeys, + ObStoreRowIterator *&row_iter) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!context.is_valid() || OB_ISNULL(ranges) || !param.is_valid())) { + if (OB_UNLIKELY(IS_NOT_INIT)) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); + } else if (OB_UNLIKELY( + !param.is_valid() + || !context.is_valid() + || 0 >= rowkeys.count())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("Invalid argument", K(ret), K(context)); + LOG_WARN("Invalid argument", K(ret), K(param), K(context), K(rowkeys)); + } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("not support get for full direct load", K(ret)); } else { - void *buf = nullptr; - ObStoreRowIterator *empty_row_iter = nullptr; - ALLOCATE_TABLE_STORE_ROW_IETRATOR(context, - ObDDLKVEmptyIterator, - empty_row_iter); - - if (OB_SUCC(ret)) { - if (OB_ISNULL(empty_row_iter)) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "unexpected error, row_scanner is nullptr", K(ret), KP(empty_row_iter)); - } else if (OB_FAIL(empty_row_iter->init(param, context, this, ranges))) { - LOG_WARN("Fail to open row scanner", K(ret), K(param), K(context), KP(ranges), K(*this)); - } - } - - if (OB_FAIL(ret)) { - if (nullptr != empty_row_iter) { - empty_row_iter->~ObStoreRowIterator(); - FREE_TABLE_STORE_ROW_IETRATOR(context, empty_row_iter); - empty_row_iter = nullptr; - } - } else { - row_iter = empty_row_iter; - } + ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowMultiGetter, &rowkeys, row_iter); } return ret; } -int ObDDLKV::check_row_locked(const ObTableIterParam ¶m, const blocksstable::ObDatumRowkey &rowkey, - ObTableAccessContext &context, ObStoreRowLockState &lock_state, ObRowState &row_state, bool check_exist) +int ObDDLKV::multi_scan( + const ObTableIterParam ¶m, + ObTableAccessContext &context, + const ObIArray &ranges, + ObStoreRowIterator *&row_iter) { int ret = OB_SUCCESS; + if (OB_UNLIKELY(IS_NOT_INIT)) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); + } else if (OB_UNLIKELY( + !param.is_valid() + || !context.is_valid() + || 0 >= ranges.count())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("Invalid argument", K(ret), K(param), K(context), K(ranges)); + } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("not support get for full direct load", K(ret)); + } else { + ALLOCATE_DDL_KV_MULTI_VERSION_ROW_IETRATOR(ObSSTableMultiVersionRowMultiScanner, &ranges, row_iter); + } + return ret; +} + +int ObDDLKV::check_row_locked( + const ObTableIterParam ¶m, + const ObDatumRowkey &rowkey, + ObTableAccessContext &context, + ObStoreRowLockState &lock_state, + ObRowState &row_state, + bool check_exist) +{ + int ret = OB_SUCCESS; + lock_state.trans_version_ = SCN::min_scn(); + lock_state.is_locked_ = false; TCRLockGuard guard(lock_); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { ret = OB_NOT_SUPPORTED; LOG_WARN("not support get for full direct load", K(ret)); } else if (ddl_memtables_.count() == 0) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("not support to check lock when memtable count is 0", K(ret)); + // do nothing } else if (ddl_memtables_.count() != 1) { ret = OB_NOT_SUPPORTED; LOG_WARN("inc direct load do not support column store yet", K(ret)); @@ -1870,24 +1855,95 @@ int ObDDLKV::check_row_locked(const ObTableIterParam ¶m, const blocksstable: return ret; } -int ObDDLKV::check_rows_locked(const bool check_exist, ObTableAccessContext &context, - SCN &max_trans_version, ObRowsInfo &rows_info) +int ObDDLKV::check_rows_locked( + const bool check_exist, + ObTableAccessContext &context, + SCN &max_trans_version, + ObRowsInfo &rows_info) { int ret = OB_SUCCESS; TCRLockGuard guard(lock_); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { ret = OB_NOT_SUPPORTED; LOG_WARN("not support get for full direct load", K(ret)); } else if (ddl_memtables_.count() == 0) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("not support to check lock when memtable count is 0", K(ret)); + // do nothing } else if (ddl_memtables_.count() != 1) { ret = OB_NOT_SUPPORTED; LOG_WARN("inc direct load do not support column store yet", K(ret)); - } else if (OB_FAIL(ddl_memtables_.at(0)->check_rows_locked(check_exist, context, max_trans_version, rows_info))) { - LOG_WARN("fail to get row", K(ret)); + } else { + ret = OB_NOT_SUPPORTED; + LOG_WARN("check_rows_locked in ddl memtable is not supported", K(ret)); + // } else if (OB_FAIL(ddl_memtables_.at(0)->check_rows_locked(check_exist, context, max_trans_version, rows_info))) { + // LOG_WARN("fail to get row", K(ret)); + } + return ret; +} + +int64_t ObDDLKV::get_occupied_size() const +{ + int ret = OB_SUCCESS; + int64_t occupied_size = 0; + if (OB_UNLIKELY(IS_NOT_INIT)) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); + } else { + TCRLockGuard guard(lock_); + ObSSTableMetaHandle sst_meta_hdl; + for (int64_t i = 0; OB_SUCC(ret) && i < ddl_memtables_.count(); ++i) { + ObDDLMemtable *ddl_memtable = ddl_memtables_.at(i); + occupied_size += ddl_memtable->get_occupy_size(); + } + } + return occupied_size; +} + +int64_t ObDDLKV::get_row_count() const +{ + int ret = OB_SUCCESS; + int64_t row_count = 0; + if (OB_UNLIKELY(IS_NOT_INIT)) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); + } else { + TCRLockGuard guard(lock_); + ObSSTableMetaHandle sst_meta_hdl; + for (int64_t i = 0; OB_SUCC(ret) && i < ddl_memtables_.count(); ++i) { + ObDDLMemtable *ddl_memtable = ddl_memtables_.at(i); + row_count += ddl_memtable->get_row_count(); + } + } + return row_count; +} + +int ObDDLKV::get_block_count_and_row_count( + int64_t ¯o_block_count, + int64_t µ_block_count, + int64_t &row_count) const +{ + int ret = OB_SUCCESS; + macro_block_count = 0; + micro_block_count = 0; + row_count = 0; + if (OB_UNLIKELY(IS_NOT_INIT)) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); + } else { + TCRLockGuard guard(lock_); + ObSSTableMetaHandle sst_meta_hdl; + for (int64_t i = 0; OB_SUCC(ret) && i < ddl_memtables_.count(); ++i) { + ObDDLMemtable *ddl_memtable = ddl_memtables_.at(i); + if (OB_FAIL(ddl_memtable->get_meta(sst_meta_hdl))) { + LOG_WARN("fail to get meta", K(ret)); + } else { + macro_block_count += sst_meta_hdl.get_sstable_meta().get_data_macro_block_count(); + micro_block_count += sst_meta_hdl.get_sstable_meta().get_data_micro_block_count(); + row_count += sst_meta_hdl.get_sstable_meta().get_row_count(); + } + } } return ret; } diff --git a/src/storage/ddl/ob_tablet_ddl_kv.h b/src/storage/ddl/ob_tablet_ddl_kv.h index 21d025a53..7fd0f2f07 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv.h +++ b/src/storage/ddl/ob_tablet_ddl_kv.h @@ -42,24 +42,6 @@ class ObDataMacroBlockMeta; namespace storage { -class ObDDLKVEmptyIterator : public ObStoreRowIterator -{ -public: - ObDDLKVEmptyIterator() {}; - virtual ~ObDDLKVEmptyIterator() {} - void reset() {} - void reuse() {} - int get_next_row(const blocksstable::ObDatumRow *&store_row) { return OB_ITER_END; } -protected: - int inner_open( - const ObTableIterParam &iter_param, - ObTableAccessContext &access_ctx, - ObITable *table, - const void *query_range) { return OB_SUCCESS; } - virtual int inner_get_next_row(const blocksstable::ObDatumRow *&store_row) { return OB_ITER_END; } - -}; - class ObBlockMetaTreeValue final { public: @@ -230,7 +212,13 @@ public: const share::SCN &last_freezed_scn, const uint64_t data_format_version); +public: // derived from ObIMemtable + // for read_barrier, it needs to be always false + virtual bool is_empty() const override { return false; } + virtual int64_t get_occupied_size() const override; + public : // derived from ObITabletMemtable + virtual bool is_inited() const override { return is_inited_; } virtual int init(const ObITable::TableKey &table_key, ObLSHandle &ls_handle, ObFreezer *freezer, @@ -238,10 +226,19 @@ public : // derived from ObITabletMemtable const int64_t schema_version, const uint32_t freeze_clock) override; virtual void print_ready_for_flush() override; - virtual bool is_inited() const override { return is_inited_; } - virtual bool is_frozen_memtable() override; + virtual int set_frozen() override { ATOMIC_SET(&is_independent_freezed_, true); return OB_SUCCESS; } + virtual bool can_be_minor_merged() override; + virtual int get_schema_info( + const int64_t input_column_cnt, + int64_t &max_schema_version_on_memtable, + int64_t &max_column_cnt_on_memtable) const override; + // TODO : @suzhi.yt implement ddlkv dump2text + virtual int dump2text(const char *fname) override { return OB_SUCCESS; } + +public: // derived from ObITable + virtual bool is_frozen_memtable() override; + virtual int get_frozen_schema_version(int64_t &schema_version) const; -public: // derived from ObSSTable virtual int exist( const ObTableIterParam ¶m, ObTableAccessContext &context, @@ -280,7 +277,7 @@ public: // derived from ObSSTable ObTableAccessContext &context, const common::ObIArray &ranges, ObStoreRowIterator *&row_iter) override; - virtual int get_frozen_schema_version(int64_t &schema_version) const; + int check_row_locked( const ObTableIterParam ¶m, const blocksstable::ObDatumRowkey &rowkey, @@ -288,6 +285,8 @@ public: // derived from ObSSTable ObStoreRowLockState &lock_state, ObRowState &row_state, bool check_exist = false); + + // TODO : @jianyun.sjy ObDDLMemtable adapts check_rows_locked int check_rows_locked( const bool check_exist, storage::ObTableAccessContext &context, @@ -320,19 +319,26 @@ public: share::SCN get_freeze_scn() const { return freeze_scn_; } share::SCN get_ddl_start_scn() const { return ddl_start_scn_; } int64_t get_macro_block_cnt() const { return macro_block_count_; } - int create_ddl_memtable(ObTablet &tablet, const ObITable::TableKey &table_key, ObDDLMemtable *&ddl_memtable); + // not thread safe, external call are limited to ddl merge task int get_ddl_memtable(const int64_t cg_idx, ObDDLMemtable *&ddl_memtable); - int get_first_ddl_memtable(ObDDLMemtable *&ddl_memtable); ObIArray &get_ddl_memtables() { return ddl_memtables_; } void inc_pending_cnt(); // used by ddl kv pending guard void dec_pending_cnt(); // const common::ObTabletID &get_tablet_id() const { return tablet_id_; } - int64_t get_snapshot_version() const { return ddl_snapshot_version_; } uint64_t get_data_format_version() const { return data_format_version_; } const transaction::ObTransID &get_trans_id() const { return trans_id_; } int64_t get_memory_used() const; OB_INLINE bool is_inc_ddl_kv() const { return is_inc_ddl_kv_; } - virtual int set_frozen() override { ATOMIC_SET(&is_independent_freezed_, true);; return OB_SUCCESS; } + + int64_t get_row_count() const; + int get_block_count_and_row_count( + int64_t ¯o_block_count, + int64_t µ_block_count, + int64_t &row_count) const; + + // for inc_ddl_kv only + template + int access_first_ddl_memtable(_callback &callback) const; INHERIT_TO_STRING_KV("ObITabletMemtable", ObITabletMemtable, @@ -346,6 +352,8 @@ public: K_(snapshot_version), K_(data_format_version), K_(trans_id), + K_(data_schema_version), + K_(column_count), K_(min_scn), K_(max_scn), K_(freeze_scn), @@ -360,8 +368,8 @@ private: int full_load_freeze_(const share::SCN &freeze_scn); int inc_load_freeze_(); - int get_empty_iter(const ObTableIterParam ¶m, ObTableAccessContext &context, - const void *anges, ObStoreRowIterator *&row_iter); + int create_ddl_memtable(ObTablet &tablet, const ObITable::TableKey &table_key, ObDDLMemtable *&ddl_memtable); + private: static const int64_t TOTAL_LIMIT = 10 * 1024 * 1024 * 1024L; static const int64_t HOLD_LIMIT = 10 * 1024 * 1024 * 1024L; @@ -376,6 +384,8 @@ private: int64_t ddl_snapshot_version_; // the snapshot version for major sstable which is completed by ddl uint64_t data_format_version_; transaction::ObTransID trans_id_; // for incremental direct load only + int64_t data_schema_version_; + int64_t column_count_; // freeze related share::SCN min_scn_; // the min log ts of macro blocks @@ -386,6 +396,28 @@ private: ObArray ddl_memtables_; }; +template +int ObDDLKV::access_first_ddl_memtable(_callback &callback) const +{ + int ret = OB_SUCCESS; + TCRLockGuard guard(lock_); + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + STORAGE_LOG(WARN, "not inited", K(ret)); + } else if (OB_UNLIKELY(!is_inc_ddl_kv())) { + ret = OB_NOT_SUPPORTED; + STORAGE_LOG(WARN, "not support get for full direct load", K(ret)); + } else if (ddl_memtables_.count() == 0) { + ret = OB_ENTRY_NOT_EXIST; + } else if (ddl_memtables_.count() != 1) { + ret = OB_NOT_SUPPORTED; + STORAGE_LOG(WARN, "inc direct load do not support column store yet", K(ret)); + } else { + ObDDLMemtable *ddl_memtable = ddl_memtables_.at(0); + ret = callback(ddl_memtable); + } + return ret; +} } // end namespace storage } // end namespace oceanbase diff --git a/src/storage/ddl/ob_tablet_ddl_kv_multi_version_row_iterator.h b/src/storage/ddl/ob_tablet_ddl_kv_multi_version_row_iterator.h index a3d2203ae..9b20344f2 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv_multi_version_row_iterator.h +++ b/src/storage/ddl/ob_tablet_ddl_kv_multi_version_row_iterator.h @@ -24,25 +24,50 @@ namespace storage template class ObDDLKVMultiVersionRowIterator : public ObStoreRowIterator { +private: + class IteratorInitializer + { + public: + IteratorInitializer(T &iterator, + const ObTableIterParam ¶m, + ObTableAccessContext &context, + const void *query_range) + : iterator_(iterator), param_(param), context_(context), query_range_(query_range) + { + } + int operator()(ObDDLMemtable *ddl_memtable) + { + int ret = OB_SUCCESS; + if (OB_ISNULL(ddl_memtable)) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "unexpected ddl memtable is null", K(ret)); + } else if (OB_FAIL(iterator_.init(param_, context_, ddl_memtable, query_range_))) { + STORAGE_LOG(WARN, "fail to init", K(ret)); + } + return ret; + } + private: + T &iterator_; + const ObTableIterParam ¶m_; + ObTableAccessContext &context_; + const void *query_range_; + }; + public: - ObDDLKVMultiVersionRowIterator() {} + ObDDLKVMultiVersionRowIterator() : is_empty_(false) { type_ = iterator_.get_iter_type(); } virtual ~ObDDLKVMultiVersionRowIterator() {} virtual void reuse() { iterator_.reuse(); + is_empty_ = false; ObStoreRowIterator::reuse(); } virtual void reset() { iterator_.reset(); + is_empty_ = false; ObStoreRowIterator::reset(); } - // used for global query iterator pool, prepare for returning to pool - virtual void reclaim() - { - iterator_.reclaim(); - ObStoreRowIterator::reclaim(); - } virtual int init( const ObTableIterParam ¶m, ObTableAccessContext &context, @@ -50,6 +75,7 @@ public: const void *query_range) { int ret = common::OB_SUCCESS; + is_reclaimed_ = false; if (OB_ISNULL(query_range) || OB_ISNULL(table)) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid argument", K(ret), KP(query_range), KP(table)); @@ -57,29 +83,33 @@ public: ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid argument", K(ret), KPC(table)); } else { - ObDDLMemtable *ddl_memtable = nullptr; - if (OB_FAIL((static_cast(table)->get_first_ddl_memtable(ddl_memtable)))) { - STORAGE_LOG(WARN, "fail to get ddl memtable", K(ret)); - } else if (OB_FAIL(iterator_.init(param, context, ddl_memtable, query_range))) { - STORAGE_LOG(WARN, "fail to init", K(ret)); - } else { - type_ = iterator_.get_iter_type(); - is_sstable_iter_ = false; - is_reclaimed_ = false; + ObDDLKV *ddl_kv = static_cast(table); + IteratorInitializer initializer(iterator_, param, context, query_range); + if (OB_FAIL(ddl_kv->access_first_ddl_memtable(initializer))) { + if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { + STORAGE_LOG(WARN, "fail to access first ddl memtable", K(ret)); + } else { + ret = OB_SUCCESS; + is_empty_ = true; + } } } return ret; } - virtual int set_ignore_shadow_row() { return iterator_.set_ignore_shadow_row(); } - virtual bool can_blockscan() const { return iterator_.can_blockscan(); } - virtual bool can_batch_scan() const { return iterator_.can_batch_scan(); } virtual int get_next_row(const blocksstable::ObDatumRow *&row) { - return iterator_.get_next_row(row); + int ret = OB_SUCCESS; + if (is_empty_) { + ret = OB_ITER_END; + } else { + ret = iterator_.get_next_row(row); + } + return ret; } private: T iterator_; + bool is_empty_; }; } // namespace storage diff --git a/src/storage/high_availability/ob_tablet_backfill_tx.cpp b/src/storage/high_availability/ob_tablet_backfill_tx.cpp index 3908f8ea5..52cf57fe8 100644 --- a/src/storage/high_availability/ob_tablet_backfill_tx.cpp +++ b/src/storage/high_availability/ob_tablet_backfill_tx.cpp @@ -586,7 +586,7 @@ int ObTabletBackfillTXTask::get_backfill_tx_memtables_( ret = OB_ERR_UNEXPECTED; LOG_WARN("table should not be NULL or table type is unexpected", K(ret), KP(table)); } else if (table->is_direct_load_memtable()) { - ret = OB_NOT_SUPPORTED; + ret = OB_TRANSFER_SYS_ERROR; LOG_WARN("find a direct load memtable", KR(ret), K(tablet_info_.tablet_id_), KPC(table)); } else if (FALSE_IT(memtable = static_cast(table))) { } else if (table->get_start_scn() >= backfill_tx_ctx_->log_sync_scn_ diff --git a/src/storage/high_availability/ob_transfer_struct.cpp b/src/storage/high_availability/ob_transfer_struct.cpp index a40d0087d..a967601ad 100644 --- a/src/storage/high_availability/ob_transfer_struct.cpp +++ b/src/storage/high_availability/ob_transfer_struct.cpp @@ -385,16 +385,20 @@ int ObTXTransferUtils::set_tablet_freeze_flag(storage::ObLS &ls, ObTablet *table LOG_WARN("failed to get_memtable_mgr for get all memtable", K(ret), KPC(tablet)); } else { CLICK(); + ObITabletMemtable *mt = nullptr; for (int64_t i = 0; OB_SUCC(ret) && i < memtables.count(); ++i) { - ObITable *table = memtables.at(i).get_table(); - if (OB_ISNULL(table)) { + if (OB_UNLIKELY(memtables.at(i).get_tablet_memtable(mt))) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("table in tables_handle is invalid", K(ret), KP(table)); + LOG_WARN("table in tables_handle is not memtable", K(ret), K(memtables.at(i))); + } else if (!mt->is_active_memtable()) { + // skip + } else if (OB_UNLIKELY(!mt->is_data_memtable())) { + // incremental direct load hold table lock will block transfer scheduling, so there will be no active direct load memtable + ret = OB_TRANSFER_SYS_ERROR; + LOG_WARN("memtable is not data memtable", K(ret), KPC(mt)); } else { - memtable::ObMemtable *memtable = static_cast(table); - if (memtable->is_active_memtable()) { - memtable->set_transfer_freeze(weak_read_scn); - } + memtable::ObMemtable *memtable = static_cast(mt); + memtable->set_transfer_freeze(weak_read_scn); } } if (OB_SUCC(ret)) { diff --git a/src/storage/ls/ob_freezer.cpp b/src/storage/ls/ob_freezer.cpp index e7a124608..3806afb3f 100644 --- a/src/storage/ls/ob_freezer.cpp +++ b/src/storage/ls/ob_freezer.cpp @@ -1061,6 +1061,7 @@ int ObFreezer::do_direct_load_memtable_tablet_freeze_(ObITabletMemtable *tablet_ ObDDLKV *direct_load_memtable = static_cast(tablet_memtable); if (OB_FAIL(direct_load_memtable->decide_right_boundary())) { STORAGE_LOG(WARN, "freeze direct load memtable failed", KR(ret), K(ls_id), KPC(tablet_memtable)); + } else if (FALSE_IT(direct_load_memtable->set_snapshot_version(get_freeze_snapshot_version()))) { } else { int64_t read_lock = LSLOCKALL; int64_t write_lock = 0; diff --git a/src/storage/ls/ob_ls_tablet_service.cpp b/src/storage/ls/ob_ls_tablet_service.cpp index 4910df7ec..18c090c82 100644 --- a/src/storage/ls/ob_ls_tablet_service.cpp +++ b/src/storage/ls/ob_ls_tablet_service.cpp @@ -77,6 +77,7 @@ #include "storage/high_availability/ob_storage_ha_utils.h" #include "storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.h" #include "storage/concurrency_control/ob_data_validation_service.h" +#include "storage/ddl/ob_tablet_ddl_kv.h" using namespace oceanbase::share; using namespace oceanbase::common; @@ -6067,7 +6068,19 @@ int ObLSTabletService::estimate_block_count_and_row_count( ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null table", K(ret), K(tablet_iter.table_iter())); } else if (table->is_direct_load_memtable()) { - // FIXME : @suzhi.yt + ObDDLKV *ddl_kv = static_cast(table); + int64_t macro_block_count_in_ddl_kv = 0; + int64_t micro_block_count_in_ddl_kv = 0; + int64_t row_count_in_ddl_kv = 0; + if (OB_FAIL(ddl_kv->get_block_count_and_row_count(macro_block_count_in_ddl_kv, + micro_block_count_in_ddl_kv, + row_count_in_ddl_kv))) { + LOG_WARN("fail to get block count and row count", K(ret)); + } else { + macro_block_count += macro_block_count_in_ddl_kv; + micro_block_count += micro_block_count_in_ddl_kv; + sstable_row_count += row_count_in_ddl_kv; + } } else if (table->is_data_memtable()) { memtable_row_count += static_cast(table)->get_physical_row_cnt(); } else if (table->is_sstable()) { diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 459fa17d5..8cf216a2a 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -52,6 +52,7 @@ #include "storage/column_store/ob_column_oriented_sstable.h" #include "storage/access/ob_row_sample_iterator.h" #include "storage/concurrency_control/ob_trans_stat_row.h" +#include "storage/ddl/ob_tablet_ddl_kv.h" #include "logservice/ob_log_service.h" @@ -99,6 +100,41 @@ ObGlobalMtAlloc &get_global_mt_alloc() return s_alloc; } +class ObDirectLoadMemtableRowsLockedChecker +{ +public: + ObDirectLoadMemtableRowsLockedChecker(ObMemtable &memtable, + const bool check_exist, + const storage::ObTableIterParam ¶m, + storage::ObTableAccessContext &context, + ObRowsInfo &rows_info) + : memtable_(memtable), + check_exist_(check_exist), + param_(param), + context_(context), + rows_info_(rows_info) + { + } + int operator()(ObDDLMemtable *ddl_memtable) + { + int ret = OB_SUCCESS; + if (OB_ISNULL(ddl_memtable)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected ddl memtable is null", K(ret)); + } else if (OB_FAIL(memtable_.check_rows_locked_on_ddl_merge_sstable( + ddl_memtable, check_exist_, param_, context_, rows_info_))) { + TRANS_LOG(WARN, "Failed to check rows locked for sstable", K(ret), KPC(ddl_memtable)); + } + return ret; + } +private: + ObMemtable &memtable_; + const bool check_exist_; + const storage::ObTableIterParam ¶m_; + storage::ObTableAccessContext &context_; + ObRowsInfo &rows_info_; +}; + //////////////////////////////////////////////////////////////////////////////////////////////////// // Public Functions @@ -1209,21 +1245,6 @@ int ObMemtable::multi_scan( return ret; } -int ObMemtable::replay_schema_version_change_log(const int64_t schema_version) -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - TRANS_LOG(WARN, "not init", K(*this)); - ret = OB_NOT_INIT; - } else if (schema_version < 0) { - ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid argument", K(ret), K(schema_version)); - } else { - set_max_schema_version(schema_version); - } - return ret; -} - int ObMemtable::replay_row(ObStoreCtx &ctx, const share::SCN &scn, ObMemtableMutatorIterator *mmi) @@ -1432,7 +1453,33 @@ int ObMemtable::internal_lock_row_on_frozen_stores_(const bool check_exist, K(tmp_lock_state), K(row_state)); } else if (iter_tables.at(i)->is_direct_load_memtable()) { - TRANS_LOG(DEBUG, "skip check direct load memtable", KPC(iter_tables.at(i))); + ObDDLKV *ddl_kv = static_cast(iter_tables.at(i)); + blocksstable::ObDatumRowkeyHelper rowkey_converter; + blocksstable::ObDatumRowkey datum_rowkey; + if (OB_FAIL(rowkey_converter.convert_datum_rowkey(key->get_rowkey()->get_rowkey(), datum_rowkey))) { + STORAGE_LOG(WARN, "Failed to convert datum rowkey", K(ret), KPC(key)); + } else if (OB_FAIL(ddl_kv->check_row_locked( + param, datum_rowkey, context, tmp_lock_state, row_state, check_exist))) { + TRANS_LOG(WARN, + "direct load memtable check row lock fail", + K(ret), + KPC(key), + K(check_exist), + K(datum_rowkey), + K(lock_state), + K(tmp_lock_state), + K(row_state)); + } + TRANS_LOG(DEBUG, + "direct load memtable check row lock debug", + K(ret), + KPC(key), + KPC(ddl_kv), + K(check_exist), + K(datum_rowkey), + K(lock_state), + K(tmp_lock_state), + K(row_state)); } else { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "unknown store type", K(ret), K(iter_tables), K(i)); @@ -1673,8 +1720,16 @@ int ObMemtable::internal_lock_rows_on_frozen_stores_( TRANS_LOG(WARN, "Failed to check rows locked for sstable", K(ret), K(i), K(iter_tables)); } } - } else if (iter_tables.at(i)->is_direct_load_memtable()) { - TRANS_LOG(DEBUG, "skip check direct load memtable", KPC(iter_tables.at(i))); + } else if (i_table->is_direct_load_memtable()) { + ObDDLKV *ddl_kv = static_cast(i_table); + ObDirectLoadMemtableRowsLockedChecker checker(*this, check_exist, param, context, rows_info); + if (OB_FAIL(ddl_kv->access_first_ddl_memtable(checker))) { + if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { + STORAGE_LOG(WARN, "fail to access first ddl memtable", K(ret), K(i), K(iter_tables)); + } else { + ret = OB_SUCCESS; + } + } } else { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "Unknown store type", K(ret), K(iter_tables), K(i)); @@ -1868,12 +1923,12 @@ bool ObMemtable::ready_for_flush_() } else if (is_frozen && get_logging_blocked()) { // ensure unset all frozen memtables'logging_block ObTableHandleV2 handle; - ObMemtable *first_frozen_memtable = nullptr; + ObITabletMemtable *first_frozen_memtable = nullptr; ObTabletMemtableMgr *memtable_mgr = get_memtable_mgr(); if (OB_ISNULL(memtable_mgr)) { } else if (OB_FAIL(memtable_mgr->get_first_frozen_memtable(handle))) { TRANS_LOG(WARN, "fail to get first_frozen_memtable", K(ret)); - } else if (OB_FAIL(handle.get_data_memtable(first_frozen_memtable))) { + } else if (OB_FAIL(handle.get_tablet_memtable(first_frozen_memtable))) { TRANS_LOG(WARN, "fail to get memtable", K(ret)); } else if (first_frozen_memtable == this) { (void)clear_logging_blocked(); diff --git a/src/storage/memtable/ob_memtable.h b/src/storage/memtable/ob_memtable.h index 1805260f1..b7845c974 100644 --- a/src/storage/memtable/ob_memtable.h +++ b/src/storage/memtable/ob_memtable.h @@ -225,6 +225,10 @@ public: // derived from ObITabletMemtable virtual bool is_inited() const override { return is_inited_; } virtual int64_t dec_write_ref() override; virtual bool is_frozen_memtable() override; + virtual int get_schema_info( + const int64_t input_column_cnt, + int64_t &max_schema_version_on_memtable, + int64_t &max_column_cnt_on_memtable) const override; public: // derived from ObITable // ==================== Memtable Operation Interface ================== @@ -357,7 +361,6 @@ public: // derived from ObITable storage::ObStoreCtx &ctx, const share::SCN &scn, ObMemtableMutatorIterator *mmi); - virtual int replay_schema_version_change_log(const int64_t schema_version); virtual int safe_to_destroy(bool &is_safe); public: @@ -369,10 +372,6 @@ public: int64_t get_max_data_schema_version() const; void set_max_column_cnt(const int64_t column_cnt); int64_t get_max_column_cnt() const; - int get_schema_info( - const int64_t input_column_cnt, - int64_t &max_schema_version_on_memtable, - int64_t &max_column_cnt_on_memtable) const; int row_compact(ObMvccRow *value, const share::SCN snapshot_version, const int64_t flag); @@ -450,7 +449,7 @@ public: int check_cleanout(bool &is_all_cleanout, bool &is_all_delay_cleanout, int64_t &count); - int dump2text(const char *fname); + virtual int dump2text(const char *fname) override; // TODO(handora.qc) ready_for_flush interface adjustment virtual int finish_freeze(); diff --git a/src/storage/memtable/ob_row_conflict_handler.cpp b/src/storage/memtable/ob_row_conflict_handler.cpp index 72862161b..8f3040ea4 100644 --- a/src/storage/memtable/ob_row_conflict_handler.cpp +++ b/src/storage/memtable/ob_row_conflict_handler.cpp @@ -16,6 +16,8 @@ #include "storage/memtable/mvcc/ob_mvcc_iterator.h" #include "storage/memtable/ob_lock_wait_mgr.h" #include "storage/tx_table/ob_tx_table_guards.h" +#include "storage/access/ob_rows_info.h" +#include "storage/ddl/ob_tablet_ddl_kv.h" namespace oceanbase { using namespace common; @@ -122,6 +124,16 @@ int ObRowConflictHandler::check_row_locked(const storage::ObTableIterParam ¶ } else if (max_trans_version < lock_state.trans_version_) { max_trans_version = lock_state.trans_version_; } + } else if (stores->at(i)->is_direct_load_memtable()) { + ObDDLKV *ddl_kv = static_cast(stores->at(i)); + if (OB_FAIL(ddl_kv->check_row_locked(param, rowkey, context, lock_state, row_state))) { + TRANS_LOG(WARN, "sstable check row lock fail", K(ret), K(rowkey)); + } else if (lock_state.is_locked_) { + break; + } else if (max_trans_version < row_state.max_trans_version_) { + max_trans_version = row_state.max_trans_version_; + } + TRANS_LOG(DEBUG, "check_row_locked meet direct load memtable", K(ret), K(rowkey), K(row_state), K(*ddl_kv)); } else if (stores->at(i)->is_sstable()) { blocksstable::ObSSTable *sstable = static_cast(stores->at(i)); if (OB_FAIL(sstable->check_row_locked(param, rowkey, context, lock_state, row_state))) { diff --git a/src/storage/ob_i_memtable_mgr.cpp b/src/storage/ob_i_memtable_mgr.cpp index f5d7f0d5a..930ab0fef 100644 --- a/src/storage/ob_i_memtable_mgr.cpp +++ b/src/storage/ob_i_memtable_mgr.cpp @@ -66,14 +66,14 @@ int ObIMemtableMgr::get_first_nonempty_memtable(ObTableHandleV2 &handle) const for (int64_t i = memtable_head_; OB_SUCC(ret) && i < memtable_tail_; ++i) { ObTableHandleV2 tmp_handle; - memtable::ObMemtable *mt = NULL; + ObITabletMemtable *mt = NULL; if (OB_FAIL(get_ith_memtable(i, tmp_handle))) { STORAGE_LOG(WARN, "fail to get ith memtable", KR(ret), K(i)); } else if (OB_UNLIKELY(!tmp_handle.is_valid())) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "get invalid tmp table handle", KR(ret), K(i), K(tmp_handle)); - } else if (OB_FAIL(tmp_handle.get_data_memtable(mt))) { - STORAGE_LOG(WARN, "failed to get_data_memtable", KR(ret), K(i), K(tmp_handle)); + } else if (OB_FAIL(tmp_handle.get_tablet_memtable(mt))) { + STORAGE_LOG(WARN, "failed to get_tablet_memtable", KR(ret), K(i), K(tmp_handle)); } else if (OB_ISNULL(mt)) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "mt is NULL", KR(ret), K(i), K(tmp_handle)); diff --git a/src/storage/ob_i_tablet_memtable.cpp b/src/storage/ob_i_tablet_memtable.cpp index 3bd17ebe1..bdaf2eda5 100644 --- a/src/storage/ob_i_tablet_memtable.cpp +++ b/src/storage/ob_i_tablet_memtable.cpp @@ -346,6 +346,21 @@ int64_t ObITabletMemtable::get_max_schema_version() const return ATOMIC_LOAD(&max_schema_version_); } +int ObITabletMemtable::replay_schema_version_change_log(const int64_t schema_version) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited())) { + TRANS_LOG(WARN, "not init", K(*this)); + ret = OB_NOT_INIT; + } else if (schema_version < 0) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(schema_version)); + } else { + set_max_schema_version(schema_version); + } + return ret; +} + int64_t ObITabletMemtable::inc_unsubmitted_cnt_() { return ATOMIC_AAF(&unsubmitted_cnt_, 1); } int64_t ObITabletMemtable::dec_unsubmitted_cnt_() { return ATOMIC_SAF(&unsubmitted_cnt_, 1); } int64_t ObITabletMemtable::inc_write_ref_() { return ATOMIC_AAF(&write_ref_cnt_, 1); } diff --git a/src/storage/ob_i_tablet_memtable.h b/src/storage/ob_i_tablet_memtable.h index b6adf836d..7d277b643 100644 --- a/src/storage/ob_i_tablet_memtable.h +++ b/src/storage/ob_i_tablet_memtable.h @@ -250,6 +250,7 @@ public: int set_migration_clog_checkpoint_scn(const share::SCN &clog_checkpoint_scn); int resolve_left_boundary(share::SCN end_scn) { return set_start_scn(end_scn); } int resolve_right_boundary(); + int replay_schema_version_change_log(const int64_t schema_version); int set_start_scn(const share::SCN start_scn); int set_end_scn(const share::SCN end_scn); int set_max_end_scn(const share::SCN scn, bool allow_backoff = false); @@ -289,6 +290,11 @@ public: virtual void print_ready_for_flush() = 0; virtual void set_allow_freeze(const bool allow_freeze) = 0; virtual int set_frozen() = 0; + virtual int get_schema_info( + const int64_t input_column_cnt, + int64_t &max_schema_version_on_memtable, + int64_t &max_column_cnt_on_memtable) const = 0; + virtual int dump2text(const char *fname) = 0; // *************** pure virtual functions ***************** public: diff --git a/src/storage/ob_partition_range_spliter.cpp b/src/storage/ob_partition_range_spliter.cpp index c334ae40a..dd80aaf31 100644 --- a/src/storage/ob_partition_range_spliter.cpp +++ b/src/storage/ob_partition_range_spliter.cpp @@ -931,10 +931,6 @@ int ObPartitionRangeSpliter::get_single_range_info(const ObStoreRange &store_ran if (OB_ISNULL(table)) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "Invalid table pointer", K(ret), KP(table)); - } else if (table->is_direct_load_memtable()) { - // FIXME : @suzhi.yt - ret = OB_NOT_SUPPORTED; - STORAGE_LOG(WARN, "get single range from direct load memtable not supported", KR(ret), KPC(table)); } else if (table->is_data_memtable()) { memtable::ObMemtable *memtable = static_cast(table); int64_t row_count = 0; @@ -1010,6 +1006,11 @@ int ObPartitionRangeSpliter::get_single_range_info(const ObStoreRange &store_ran } } } + } else if (table->is_direct_load_memtable()) { + // TODO : @suzhi.yt 可能会导致划分range不均衡, 后续实现 + total_size = 0; + macro_block_cnt = 0; + estimate_micro_block_cnt = 0; } return ret; } @@ -1092,10 +1093,6 @@ int ObPartitionRangeSpliter::split_ranges_memtable(ObRangeSplitInfo &range_info, } else if (OB_ISNULL(table)) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "Unexpected null table", K(ret), KP(table), K(range_info)); - } else if (table->is_direct_load_memtable()) { - // FIXME : @suzhi.yt support direct load memtable? - ret = OB_NOT_SUPPORTED; - STORAGE_LOG(WARN, "not supported memtable", KR(ret), KPC(table)); } else if (table->is_data_memtable()) { ObSEArray store_ranges; memtable::ObMemtable *memtable = static_cast(table); @@ -1135,9 +1132,15 @@ int ObPartitionRangeSpliter::split_ranges_memtable(ObRangeSplitInfo &range_info, } } STORAGE_LOG(DEBUG, "splite ranges with memtable", K(range_info), K(range_array)); + } else if (table->is_direct_load_memtable()) { + // TODO : @suzhi.yt 可能会导致划分range不均衡, 后续实现 + if (OB_FAIL(build_single_range(false/*for compaction*/, range_info, allocator, range_array))) { + STORAGE_LOG(WARN, "Failed to build single range", K(ret)); + } else { + STORAGE_LOG(DEBUG, "try to make single split range for memtable", K(range_info), K(range_array)); + } } - return ret; } @@ -1198,6 +1201,8 @@ int ObPartitionMultiRangeSpliter::get_split_tables(ObTableStoreIterator &table_i memtable_size = MAX(mem_size, memtable_size); max_memtable = table; } + } else if (table->is_direct_load_memtable()) { + // TODO : @suzhi.yt 可能会导致划分range不均衡, 后续实现 } } diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index 9200c2d60..8a9556515 100644 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -3046,13 +3046,13 @@ int ObTablet::get_max_schema_version(int64_t &schema_version) if (OB_FAIL(get_all_memtables(table_handle_array))) { LOG_WARN("failed to get all memtable", K(ret), KPC(this)); } else { - const memtable::ObMemtable *memtable = nullptr; + const ObITabletMemtable *memtable = nullptr; for (int64_t i = 0; OB_SUCC(ret) && i < table_handle_array.count(); ++i) { const ObTableHandleV2 &handle = table_handle_array[i]; if (OB_UNLIKELY(!handle.is_valid())) { ret = OB_ERR_SYS; LOG_WARN("invalid memtable", K(ret), K(handle)); - } else if (OB_FAIL(handle.get_data_memtable(memtable))) { + } else if (OB_FAIL(handle.get_tablet_memtable(memtable))) { LOG_WARN("fail to get memtable", K(ret), K(handle)); } else if (OB_ISNULL(memtable)) { ret = OB_ERR_SYS; @@ -4220,16 +4220,12 @@ int ObTablet::get_newest_schema_version(int64_t &schema_version) const int64_t unused_max_column_cnt_on_memtable = 0; for (int64_t idx = 0; OB_SUCC(ret) && idx < memtables.count(); ++idx) { ObITable *table = memtables.at(idx); - if (table->is_data_memtable()) { - ObMemtable *memtable = static_cast(table); + if (table->is_memtable()) { + ObITabletMemtable *memtable = static_cast(table); if (OB_FAIL(memtable->get_schema_info( store_column_cnt_in_schema, max_schema_version_on_memtable, unused_max_column_cnt_on_memtable))) { LOG_WARN("failed to get schema info from memtable", KR(ret), KPC(table)); } - } else if (table->is_direct_load_memtable()) { - // FIXME : @suzhi.yt - // ret = OB_NOT_SUPPORTED; - LOG_INFO("find a direct load memtable", KR(ret)); } } if (OB_SUCC(ret)) { @@ -5245,14 +5241,14 @@ int ObTablet::replay_schema_version_change_log(const int64_t schema_version) } else if (OB_FAIL(get_all_memtables(table_handle_array))) { LOG_WARN("failed to get all memtable", K(ret), KPC(this)); } else { - memtable::ObMemtable *memtable = nullptr; + ObITabletMemtable *memtable = nullptr; const int64_t table_num = table_handle_array.count(); if (0 == table_num) { // no memtable, no need to replay schema version change } else if (!table_handle_array[table_num - 1].is_valid()) { ret = OB_ERR_SYS; LOG_WARN("latest memtable is invalid", K(ret)); - } else if (OB_FAIL(table_handle_array[table_num - 1].get_data_memtable(memtable))) { + } else if (OB_FAIL(table_handle_array[table_num - 1].get_tablet_memtable(memtable))) { LOG_WARN("fail to get memtable", K(ret)); } else if (OB_ISNULL(memtable)) { ret = OB_ERR_SYS; @@ -5459,7 +5455,7 @@ int ObTablet::get_rec_log_scn(SCN &rec_scn) int ret = OB_SUCCESS; rec_scn = SCN::max_scn(); ObTableHandleV2 handle; - memtable::ObMemtable *mt = NULL; + ObITabletMemtable *mt = NULL; ObProtectedMemtableMgrHandle *protected_handle = NULL; if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -5472,8 +5468,8 @@ int ObTablet::get_rec_log_scn(SCN &rec_scn) } else { LOG_WARN("fail to get first memtable", KR(ret), K(handle)); } - } else if (OB_FAIL(handle.get_data_memtable(mt))) { - LOG_WARN("fail to get data memtables", KR(ret), K(handle)); + } else if (OB_FAIL(handle.get_tablet_memtable(mt))) { + LOG_WARN("fail to get tablet memtable", KR(ret), K(handle)); } else if (OB_ISNULL(mt)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("mt is NULL", KR(ret), K(handle)); @@ -5564,16 +5560,12 @@ int ObTablet::get_storage_schema_for_transfer_in( if (OB_ISNULL(table)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("table in tables_handle is invalid", K(ret), KP(table)); - } else if (table->is_data_memtable()) { - ObMemtable *memtable = static_cast(table); + } else if (table->is_memtable()) { + ObITabletMemtable *memtable = static_cast(table); if (OB_FAIL(memtable->get_schema_info( store_column_cnt_in_schema, max_schema_version_in_memtable, max_column_cnt_in_memtable))) { LOG_WARN("failed to get schema info from memtable", KR(ret), KPC(table)); } - } else if (table->is_direct_load_memtable()) { - // FIXME : @suzhi.yt - // ret = OB_NOT_SUPPORTED; - LOG_INFO("find a direct load memtable", KR(ret)); } } diff --git a/src/storage/tablet/ob_tablet_table_store_iterator.cpp b/src/storage/tablet/ob_tablet_table_store_iterator.cpp index 23823af4e..fa64bcff8 100644 --- a/src/storage/tablet/ob_tablet_table_store_iterator.cpp +++ b/src/storage/tablet/ob_tablet_table_store_iterator.cpp @@ -414,7 +414,7 @@ int ObTableStoreIterator::set_retire_check() if (OB_UNLIKELY(!table_ptr.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected invalid table handle", K(ret), K(table_ptr), K(*this)); - } else if (table_ptr.table_->is_data_memtable()) { + } else if (table_ptr.table_->is_memtable()) { first_memtable = table_ptr.table_; } else { break; @@ -422,7 +422,7 @@ int ObTableStoreIterator::set_retire_check() } if (OB_SUCC(ret) && OB_NOT_NULL(first_memtable)) { - memtable::ObMemtable *memtable = static_cast(first_memtable); + ObITabletMemtable *memtable = static_cast(first_memtable); memstore_retired_ = &memtable->get_read_barrier(); } return ret;