swap tablet in compaction and destroy memtabls array
This commit is contained in:
parent
678650edc8
commit
44745724fc
@ -24,6 +24,7 @@
|
||||
#include "share/scheduler/ob_dag_warning_history_mgr.h"
|
||||
#include "storage/compaction/ob_medium_compaction_mgr.h"
|
||||
#include "storage/compaction/ob_medium_compaction_func.h"
|
||||
#include "src/storage/meta_mem/ob_tenant_meta_mem_mgr.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -1171,6 +1172,42 @@ int ObTabletMergeCtx::prepare_merge_progress()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletMergeCtx::try_swap_tablet_handle()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// check need swap tablet when compaction
|
||||
if (OB_UNLIKELY(is_mini_merge(param_.merge_type_))) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("mini merge not support swap tablet", K(ret), K_(param));
|
||||
} else {
|
||||
int64_t row_count = 0;
|
||||
int64_t macro_count = 0;
|
||||
const ObSSTable *table = nullptr;
|
||||
for (int64_t i = 0; i < tables_handle_.get_count(); ++i) {
|
||||
table = static_cast<const ObSSTable*>(tables_handle_.get_table(i));
|
||||
row_count += table->get_meta().get_row_count();
|
||||
macro_count += table->get_meta().get_basic_meta().get_data_macro_block_count();
|
||||
}
|
||||
if (row_count >= LARGE_VOLUME_DATA_ROW_COUNT_THREASHOLD
|
||||
|| macro_count >= LARGE_VOLUME_DATA_MACRO_COUNT_THREASHOLD) {
|
||||
ObTabletHandle alloc_handle;
|
||||
const ObTabletMapKey key(param_.ls_id_, param_.tablet_id_);
|
||||
if (OB_FAIL(MTL(ObTenantMetaMemMgr*)->get_tablet_with_allocator(
|
||||
WashTabletPriority::WTP_HIGH, key, allocator_, alloc_handle, true/*force_alloc_new*/))) {
|
||||
LOG_WARN("failed to get alloc tablet handle", K(ret), K(key));
|
||||
} else {
|
||||
tablet_handle_ = alloc_handle;
|
||||
if (OB_FAIL(alloc_handle.get_obj()->clear_memtables_on_table_store())) {
|
||||
LOG_WARN("failed to clear memtables on table_store", K(ret), K(param_));
|
||||
} else {
|
||||
LOG_INFO("success to swap tablet handle", K(ret), K(macro_count), K(row_count), K(tablet_handle_.get_obj()->get_table_store()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletMergeCtx::generate_participant_table_info(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -171,6 +171,8 @@ struct ObTabletMergeCtx
|
||||
int get_schema_and_gene_from_result(const ObGetMergeTablesResult &get_merge_table_result);
|
||||
int get_storage_schema_and_gene_from_result(const ObGetMergeTablesResult &get_merge_table_result);
|
||||
int get_storage_schema_to_merge(const ObTablesHandleArray &merge_tables_handle, const bool get_schema_on_memtable = true);
|
||||
|
||||
int try_swap_tablet_handle();
|
||||
public:
|
||||
int get_medium_compaction_info_to_store();
|
||||
|
||||
@ -197,7 +199,8 @@ public:
|
||||
}
|
||||
|
||||
typedef common::ObSEArray<ObGetMergeTablesResult, ObPartitionMergePolicy::OB_MINOR_PARALLEL_INFO_ARRAY_SIZE> MinorParallelResultArray;
|
||||
|
||||
static const int64_t LARGE_VOLUME_DATA_ROW_COUNT_THREASHOLD = 1000L * 1000L; // 100w
|
||||
static const int64_t LARGE_VOLUME_DATA_MACRO_COUNT_THREASHOLD = 300L;
|
||||
// 1. init in dag
|
||||
ObTabletMergeDagParam ¶m_;
|
||||
common::ObIAllocator &allocator_;
|
||||
|
@ -648,6 +648,8 @@ int ObTabletMergeExecutePrepareTask::process()
|
||||
LOG_WARN("fail to init merge info", K(ret), K_(result), KPC(ctx_));
|
||||
} else if (OB_FAIL(ctx_->prepare_index_tree())) {
|
||||
LOG_WARN("fail to prepare sstable index tree", K(ret), KPC(ctx_));
|
||||
} else if (OB_FAIL(ctx_->try_swap_tablet_handle())) {
|
||||
LOG_WARN("failed to try swap tablet handle", K(ret));
|
||||
} else if (OB_FAIL(ObBasicTabletMergeDag::generate_merge_task(
|
||||
*static_cast<ObTabletMergeExecuteDag *>(get_dag()), *ctx_, this))) {
|
||||
LOG_WARN("Failed to generate_merge_sstable_task", K(ret));
|
||||
@ -817,6 +819,9 @@ int ObTabletMergePrepareTask::process()
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret) || skip_rest_operation) {
|
||||
} else if (!is_mini_merge(ctx->param_.merge_type_)
|
||||
&& OB_FAIL(ctx->try_swap_tablet_handle())) {
|
||||
LOG_WARN("failed to try swap tablet handle", K(ret));
|
||||
} else if (OB_FAIL(ObBasicTabletMergeDag::generate_merge_task(
|
||||
*merge_dag_, *ctx, this))) {
|
||||
LOG_WARN("Failed to generate_merge_sstable_task", K(ret));
|
||||
|
@ -36,7 +36,8 @@ public:
|
||||
int get_meta_obj_with_external_memory(
|
||||
const Key &key,
|
||||
common::ObIAllocator &allocator,
|
||||
ObMetaObjGuard<T> &guard);
|
||||
ObMetaObjGuard<T> &guard,
|
||||
const bool force_alloc_new = false);
|
||||
int try_get_in_memory_meta_obj(const Key &key, bool &success, ObMetaObjGuard<T> &guard);
|
||||
int try_get_in_memory_meta_obj_and_addr(
|
||||
const Key &key,
|
||||
@ -517,7 +518,8 @@ template <typename Key, typename T>
|
||||
int ObMetaPointerMap<Key, T>::get_meta_obj_with_external_memory(
|
||||
const Key &key,
|
||||
common::ObIAllocator &allocator,
|
||||
ObMetaObjGuard<T> &guard)
|
||||
ObMetaObjGuard<T> &guard,
|
||||
const bool force_alloc_new)
|
||||
{
|
||||
int ret = common::OB_SUCCESS;
|
||||
ObMetaPointerHandle<Key, T> ptr_hdl(*this);
|
||||
@ -527,13 +529,21 @@ int ObMetaPointerMap<Key, T>::get_meta_obj_with_external_memory(
|
||||
if (OB_UNLIKELY(!key.is_valid())) {
|
||||
ret = common::OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "invalid argument", K(ret), K(key));
|
||||
} else if (force_alloc_new) {
|
||||
common::ObBucketHashRLockGuard lock_guard(ResourceMap::bucket_lock_, ResourceMap::hash_func_(key));
|
||||
if (OB_FAIL(ResourceMap::get_without_lock(key, ptr_hdl))) {
|
||||
if (common::OB_ENTRY_NOT_EXIST != ret) {
|
||||
STORAGE_LOG(WARN, "fail to get pointer handle", K(ret));
|
||||
}
|
||||
}
|
||||
} else if (OB_FAIL(try_get_in_memory_meta_obj(key, ptr_hdl, guard, is_in_memory))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
STORAGE_LOG(DEBUG, "meta obj does not exist", K(ret), K(key));
|
||||
} else {
|
||||
STORAGE_LOG(WARN, "fail to try get in memory meta obj", K(ret), K(key));
|
||||
}
|
||||
} else if (!is_in_memory) {
|
||||
}
|
||||
if (OB_SUCC(ret) && !is_in_memory) {
|
||||
t_ptr = ptr_hdl.get_resource_ptr();
|
||||
ObMetaDiskAddr disk_addr;
|
||||
void *buf = allocator.alloc(sizeof(T));
|
||||
|
@ -945,7 +945,8 @@ int ObTenantMetaMemMgr::get_tablet_with_allocator(
|
||||
const WashTabletPriority &priority,
|
||||
const ObTabletMapKey &key,
|
||||
common::ObIAllocator &allocator,
|
||||
ObTabletHandle &handle)
|
||||
ObTabletHandle &handle,
|
||||
const bool force_alloc_new)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
handle.reset();
|
||||
@ -955,7 +956,7 @@ int ObTenantMetaMemMgr::get_tablet_with_allocator(
|
||||
} else if (OB_UNLIKELY(!key.is_valid() || is_used_obj_pool(&allocator))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(key), KP(&allocator), KP(&allocator_));
|
||||
} else if (OB_FAIL(tablet_map_.get_meta_obj_with_external_memory(key, allocator, handle))) {
|
||||
} else if (OB_FAIL(tablet_map_.get_meta_obj_with_external_memory(key, allocator, handle, force_alloc_new))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
LOG_WARN("fail to get tablet", K(ret), K(key));
|
||||
}
|
||||
|
@ -161,7 +161,8 @@ public:
|
||||
const WashTabletPriority &priority,
|
||||
const ObTabletMapKey &key,
|
||||
common::ObIAllocator &allocator,
|
||||
ObTabletHandle &handle);
|
||||
ObTabletHandle &handle,
|
||||
const bool force_alloc_new = false);
|
||||
int get_tablet_addr(const ObTabletMapKey &key, ObMetaDiskAddr &addr);
|
||||
int has_tablet(const ObTabletMapKey &key, bool &is_exist);
|
||||
int del_tablet(const ObTabletMapKey &key);
|
||||
|
@ -3313,5 +3313,18 @@ int ObTablet::set_memtable_clog_checkpoint_scn(
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTablet::clear_memtables_on_table_store() // be careful to call this func
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited", K(ret), K_(is_inited));
|
||||
} else {
|
||||
table_store_.clear_memtables();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
} // namespace oceanbase
|
||||
|
@ -388,6 +388,7 @@ public:
|
||||
int set_redefined_schema_version_in_tablet_pointer(const int64_t schema_version);
|
||||
int set_memtable_clog_checkpoint_scn(
|
||||
const ObMigrationTabletParam *tablet_meta);
|
||||
int clear_memtables_on_table_store(); // be careful to call this func, will destroy memtables array on table_store
|
||||
TO_STRING_KV(KP(this), K_(wash_score), K_(ref_cnt), K_(tablet_meta), K_(table_store), K_(storage_schema),
|
||||
K_(medium_info_list));
|
||||
private:
|
||||
|
@ -431,6 +431,19 @@ int ObTabletTableStore::update_memtables()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletTableStore::clear_memtables()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table store is unexpected invalid", K(ret), KPC(this));
|
||||
} else {
|
||||
memtables_.destroy();
|
||||
read_cache_.reset();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletTableStore::init_read_cache()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1200,7 +1213,7 @@ int64_t ObTabletTableStore::to_string(char *buf, const int64_t buf_len) const
|
||||
J_OBJ_START();
|
||||
J_NAME("ObTabletTableStore");
|
||||
J_COLON();
|
||||
J_KV(KP(this), KP_(tablet_ptr), K_(major_tables), K_(minor_tables), K_(is_ready_for_read));
|
||||
J_KV(KP(this), KP_(tablet_ptr), K_(major_tables), K_(minor_tables), K_(memtables), K_(is_ready_for_read));
|
||||
J_COMMA();
|
||||
J_ARRAY_START();
|
||||
for (int64_t i = 0; i < major_tables_.count_; ++i) {
|
||||
|
@ -98,6 +98,7 @@ public:
|
||||
int get_memtables(common::ObIArray<storage::ObITable *> &memtables, const bool need_active = false) const;
|
||||
int prepare_memtables();
|
||||
int update_memtables();
|
||||
int clear_memtables();
|
||||
int get_first_frozen_memtable(ObITable *&table);
|
||||
|
||||
int get_ddl_sstable_handles(ObTablesHandleArray &ddl_sstable_handles) const;
|
||||
|
Loading…
x
Reference in New Issue
Block a user