get medium info to schedule merge dag in ss and freezer
This commit is contained in:
@ -1175,38 +1175,18 @@ int ObBasicTabletMergeCtx::get_medium_compaction_info()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTablet *tablet = get_tablet();
|
||||
const share::ObLSID &ls_id = tablet->get_ls_id();
|
||||
const common::ObTabletID &tablet_id = tablet->get_tablet_id();
|
||||
ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info
|
||||
ObMediumCompactionInfoKey medium_info_key(get_merge_version());
|
||||
ObMediumCompactionInfo *medium_info = nullptr;
|
||||
|
||||
if (OB_UNLIKELY(tablet->get_multi_version_start() > get_merge_version())) {
|
||||
ret = OB_SNAPSHOT_DISCARDED;
|
||||
LOG_ERROR("multi version data is discarded, should not execute compaction now", K(ret),
|
||||
"param", get_dag_param(), KPC(this));
|
||||
} else if (OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(temp_allocator, medium_info))) {
|
||||
LOG_WARN("fail to alloc and new", K(ret));
|
||||
} else {
|
||||
SMART_VARS_2((ObTableScanParam, scan_param), (storage::ObTabletMediumInfoReader, medium_info_reader)) {
|
||||
if (OB_FAIL(ObMdsScanParamHelper::build_medium_info_scan_param(
|
||||
temp_allocator,
|
||||
ls_id,
|
||||
tablet_id,
|
||||
scan_param))) {
|
||||
LOG_WARN("fail to build scan param", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_FAIL(medium_info_reader.init(*tablet, scan_param))) {
|
||||
LOG_WARN("failed to init medium info reader", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(medium_info_reader.get_specified_medium_info(temp_allocator, medium_info_key, *medium_info))) {
|
||||
LOG_WARN("failed to get specified scn info", K(ret), K(medium_info_key));
|
||||
} else if (OB_UNLIKELY(!medium_info->is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("medium info is invalid", KR(ret), K(medium_info));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ObTabletMediumInfoReader::get_medium_info_with_merge_version(get_merge_version(),
|
||||
*tablet,
|
||||
temp_allocator,
|
||||
medium_info))) {
|
||||
LOG_WARN("fail to get medium info with merge version", K(ret), K(get_merge_version()), KPC(tablet));
|
||||
} else if (medium_info->contain_parallel_range_
|
||||
&& !parallel_merge_ctx_.is_valid()
|
||||
&& OB_FAIL(parallel_merge_ctx_.init(*medium_info))) {
|
||||
|
||||
@ -23,6 +23,7 @@
|
||||
#include "storage/ob_gc_upper_trans_helper.h"
|
||||
#include "share/schema/ob_tenant_schema_service.h"
|
||||
#include "storage/compaction/ob_schedule_tablet_func.h"
|
||||
#include "storage/tablet/ob_tablet_medium_info_reader.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -1068,6 +1069,29 @@ int ObTenantTabletScheduler::schedule_merge_dag(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTabletScheduler::get_co_merge_type_for_compaction(
|
||||
const int64_t merge_version,
|
||||
const storage::ObTablet &tablet,
|
||||
ObCOMajorMergePolicy::ObCOMajorMergeType &co_major_merge_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
co_major_merge_type = ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE;
|
||||
ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info
|
||||
ObMediumCompactionInfo *medium_info = nullptr;
|
||||
if (OB_UNLIKELY(tablet.get_multi_version_start() > merge_version)) {
|
||||
ret = OB_SNAPSHOT_DISCARDED;
|
||||
LOG_ERROR("multi version data is discarded, should not execute compaction now", K(ret), K(tablet), K(merge_version));
|
||||
} else if (OB_FAIL(ObTabletMediumInfoReader::get_medium_info_with_merge_version(merge_version,
|
||||
tablet,
|
||||
temp_allocator,
|
||||
medium_info))) {
|
||||
LOG_WARN("fail to get medium info with merge version", K(ret), K(merge_version), K(tablet));
|
||||
} else {
|
||||
co_major_merge_type = static_cast<ObCOMajorMergePolicy::ObCOMajorMergeType>(medium_info->co_major_merge_type_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTabletScheduler::schedule_convert_co_merge_dag_net(
|
||||
const ObLSID &ls_id,
|
||||
const ObTablet &tablet,
|
||||
@ -1078,8 +1102,15 @@ int ObTenantTabletScheduler::schedule_convert_co_merge_dag_net(
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
schedule_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(
|
||||
ls_id, tablet, compaction::ObMergeType::CONVERT_CO_MAJOR_MERGE, tablet.get_last_major_snapshot_version(), EXEC_MODE_LOCAL, &curr_dag_net_id))) {
|
||||
// do not reply on co_major_merge_type in cs replica
|
||||
ObCOMajorMergePolicy::ObCOMajorMergeType co_major_merge_type = ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE;
|
||||
if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(ls_id,
|
||||
tablet,
|
||||
compaction::ObMergeType::CONVERT_CO_MAJOR_MERGE,
|
||||
tablet.get_last_major_snapshot_version(),
|
||||
EXEC_MODE_LOCAL,
|
||||
&curr_dag_net_id,
|
||||
co_major_merge_type))) {
|
||||
if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) {
|
||||
ret = tmp_ret;
|
||||
LOG_WARN("failed to schedule co merge dag net for cs replica", K(ret), K(ls_id), "tablet_id", tablet.get_tablet_id());
|
||||
|
||||
@ -223,6 +223,10 @@ public:
|
||||
const storage::ObTablet &tablet,
|
||||
const ObMergeType merge_type,
|
||||
ObCSReplicaTabletStatus &cs_replica_status);
|
||||
static int get_co_merge_type_for_compaction(
|
||||
const int64_t merge_version,
|
||||
const storage::ObTablet &tablet,
|
||||
ObCOMajorMergePolicy::ObCOMajorMergeType &co_major_merge_type);
|
||||
static int schedule_merge_dag(
|
||||
const share::ObLSID &ls_id,
|
||||
const storage::ObTablet &tablet,
|
||||
|
||||
@ -1167,8 +1167,16 @@ int ObFreezer::handle_no_active_memtable_(const ObTabletID &tablet_id,
|
||||
} else if (protected_handle->get_max_saved_version_from_medium_info_recorder() >=
|
||||
freeze_snapshot_version.get_val_for_tx() && !GCTX.is_shared_storage_mode()) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(
|
||||
ls_id, *tablet, MEDIUM_MERGE, freeze_snapshot_version.get_val_for_tx(), EXEC_MODE_LOCAL))) {
|
||||
ObCOMajorMergePolicy::ObCOMajorMergeType co_major_merge_type;
|
||||
if (OB_FAIL(ObTenantTabletScheduler::get_co_merge_type_for_compaction(freeze_snapshot_version.get_val_for_tx(), *tablet, co_major_merge_type))) {
|
||||
LOG_WARN("fail to get co merge type from medium info", K(ret), K(freeze_snapshot_version), KPC(tablet));
|
||||
} else if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(ls_id,
|
||||
*tablet,
|
||||
MEDIUM_MERGE,
|
||||
freeze_snapshot_version.get_val_for_tx(),
|
||||
EXEC_MODE_LOCAL,
|
||||
nullptr, /*dag_net_id*/
|
||||
co_major_merge_type))) {
|
||||
if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) {
|
||||
ret = tmp_ret;
|
||||
LOG_WARN("failed to schedule medium merge dag", K(ret), K(ls_id), K(tablet_id));
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
|
||||
#include "storage/tablet/ob_tablet_medium_info_reader.h"
|
||||
#include "storage/tablet/ob_tablet.h"
|
||||
#include "storage/tablet/ob_mds_scan_param_helper.h"
|
||||
|
||||
#define USING_LOG_PREFIX STORAGE
|
||||
|
||||
@ -142,6 +143,40 @@ int ObTabletMediumInfoReader::get_specified_medium_info(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletMediumInfoReader::get_medium_info_with_merge_version(
|
||||
const int64_t merge_version,
|
||||
const ObTablet &tablet,
|
||||
common::ObIAllocator &allocator,
|
||||
compaction::ObMediumCompactionInfo *&medium_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
medium_info = nullptr;
|
||||
const share::ObLSID &ls_id = tablet.get_ls_id();
|
||||
const common::ObTabletID &tablet_id = tablet.get_tablet_id();
|
||||
compaction::ObMediumCompactionInfoKey medium_info_key(merge_version);
|
||||
if (OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(allocator, medium_info))) {
|
||||
LOG_WARN("fail to alloc and new", K(ret));
|
||||
} else {
|
||||
SMART_VARS_2((ObTableScanParam, scan_param), (ObTabletMediumInfoReader, medium_info_reader)) {
|
||||
if (OB_FAIL(ObMdsScanParamHelper::build_medium_info_scan_param(
|
||||
allocator,
|
||||
ls_id,
|
||||
tablet_id,
|
||||
scan_param))) {
|
||||
LOG_WARN("fail to build scan param", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_FAIL(medium_info_reader.init(tablet, scan_param))) {
|
||||
LOG_WARN("fail to init medium info reader", K(ret));
|
||||
} else if (OB_FAIL(medium_info_reader.get_specified_medium_info(allocator, medium_info_key, *medium_info))) {
|
||||
LOG_WARN("fail to get specified scn info", K(ret), K(medium_info_key));
|
||||
} else if (OB_ISNULL(medium_info) || OB_UNLIKELY(!medium_info->is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("medium info is invalid", K(ret), K(medium_info));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletMediumInfoReader::get_min_medium_snapshot(
|
||||
const int64_t last_major_snapshot_version,
|
||||
int64_t &min_medium_snapshot)
|
||||
|
||||
@ -27,6 +27,11 @@ class ObTabletMediumInfoReader
|
||||
public:
|
||||
ObTabletMediumInfoReader();
|
||||
~ObTabletMediumInfoReader();
|
||||
static int get_medium_info_with_merge_version(
|
||||
const int64_t merge_version,
|
||||
const ObTablet &tablet,
|
||||
common::ObIAllocator &allocator,
|
||||
compaction::ObMediumCompactionInfo *&medium_info);
|
||||
public:
|
||||
int init(
|
||||
const ObTablet &tablet,
|
||||
@ -39,7 +44,6 @@ public:
|
||||
common::ObIAllocator &allocator,
|
||||
const compaction::ObMediumCompactionInfoKey &key,
|
||||
compaction::ObMediumCompactionInfo &medium_info);
|
||||
|
||||
int get_min_medium_snapshot(
|
||||
const int64_t last_major_snapshot_version,
|
||||
int64_t &min_medium_snapshot);
|
||||
|
||||
Reference in New Issue
Block a user