fix schedule major error for schema recycled
This commit is contained in:
2
deps/oblib/src/lib/utility/ob_tracepoint.h
vendored
2
deps/oblib/src/lib/utility/ob_tracepoint.h
vendored
@ -616,6 +616,8 @@ class EventTable
|
||||
EN_MEDIUM_VERIFY_GROUP_SKIP_SET_VERIFY = 707,
|
||||
EN_MEDIUM_VERIFY_GROUP_SKIP_COLUMN_CHECKSUM = 708,
|
||||
EN_SCHEDULE_MEDIUM_COMPACTION = 709,
|
||||
EN_SCHEDULE_MAJOR_GET_TABLE_SCHEMA = 710,
|
||||
EN_SKIP_INDEX_MAJOR = 711,
|
||||
|
||||
// please add new trace point after 750
|
||||
EN_SESSION_LEAK_COUNT_THRESHOLD = 751,
|
||||
|
||||
@ -931,5 +931,7 @@ PCODE_DEF(OB_GAIS_CLEAR_AUTO_INC_CACHE, 0x1554)
|
||||
|
||||
PCODE_DEF(OB_SYNC_REWRITE_RULES, 0x1556)
|
||||
PCODE_DEF(OB_ADMIN_SYNC_REWRITE_RULES, 0x1557)
|
||||
PCODE_DEF(OB_TABLET_MAJOR_FREEZE, 0x1558)
|
||||
|
||||
// 1561-1570 for lob access
|
||||
PCODE_DEF(OB_LOB_QUERY, 0x1561)
|
||||
|
||||
@ -35,6 +35,7 @@
|
||||
#include "share/ob_column_checksum_error_operator.h"
|
||||
#include "share/ob_tablet_meta_table_compaction_operator.h"
|
||||
#include "share/ob_server_table_operator.h"
|
||||
#include "lib/utility/ob_tracepoint.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -562,7 +563,17 @@ int ObMajorMergeScheduler::handle_all_zone_merge(
|
||||
LOG_INFO("check updating merge status", K_(tenant_id), K(zone), K(merged), K(cur_all_merged_scn),
|
||||
K(cur_merged_scn), K(info),"smallest_snapshot_scn", progress->smallest_snapshot_scn_,
|
||||
"merged_cnt", progress->merged_tablet_cnt_, "unmerged_cnt", progress->unmerged_tablet_cnt_);
|
||||
|
||||
#ifdef ERRSIM
|
||||
ret = OB_E(EventTable::EN_SKIP_INDEX_MAJOR) ret;
|
||||
if (OB_FAIL(ret)) {
|
||||
ret = OB_SUCCESS;
|
||||
cur_merged_scn = info.broadcast_scn();
|
||||
cur_all_merged_scn = info.broadcast_scn();
|
||||
merged = true;
|
||||
all_merged = true;
|
||||
LOG_INFO("set errsim", K(ret), K(cur_merged_scn), K(cur_all_merged_scn), K(info), K(merged), K(all_merged));
|
||||
}
|
||||
#endif
|
||||
if (OB_SUCC(ret) && merged) {
|
||||
// cur_all_merged_scn >= cur_merged_scn
|
||||
// 1. Equal: snapshot_version of all tablets change to frozen_scn after major compaction
|
||||
@ -614,6 +625,14 @@ int ObMajorMergeScheduler::handle_all_zone_merge(
|
||||
"/unverified tables", K(all_merged), K(exist_uncompacted), K(exist_unverified));
|
||||
}
|
||||
}
|
||||
#ifdef ERRSIM
|
||||
ret = OB_E(EventTable::EN_SKIP_INDEX_MAJOR) ret;
|
||||
if (OB_FAIL(ret)) {
|
||||
ret = OB_SUCCESS;
|
||||
all_merged = true;
|
||||
LOG_INFO("set errsim", K(ret), K(expected_epoch), K(all_merged));
|
||||
}
|
||||
#endif
|
||||
|
||||
// 3. execute final operations after all merged
|
||||
if (OB_SUCC(ret) && all_merged) {
|
||||
|
||||
@ -11769,6 +11769,18 @@ static const _error _error_OB_ERR_INVALID_ARGUMENT_FOR_JSON_CALL = {
|
||||
.oracle_str_error = "PLS-00185: invalid argument for JSON call",
|
||||
.oracle_str_user_error = "PLS-00185: invalid argument for %s call"
|
||||
};
|
||||
static const _error _error_OB_ERR_SCHEMA_HISTORY_EMPTY = {
|
||||
.error_name = "OB_ERR_SCHEMA_HISTORY_EMPTY",
|
||||
.error_cause = "Internal Error",
|
||||
.error_solution = "Contact OceanBase Support",
|
||||
.mysql_errno = -1,
|
||||
.sqlstate = "HY000",
|
||||
.str_error = "Schema history is empty",
|
||||
.str_user_error = "Schema history is empty",
|
||||
.oracle_errno = 600,
|
||||
.oracle_str_error = "ORA-00600: internal error code, arguments: -5489, Schema history is empty",
|
||||
.oracle_str_user_error = "ORA-00600: internal error code, arguments: -5489, Schema history is empty"
|
||||
};
|
||||
static const _error _error_OB_ERR_SP_ALREADY_EXISTS = {
|
||||
.error_name = "OB_ERR_SP_ALREADY_EXISTS",
|
||||
.error_cause = "Internal Error",
|
||||
@ -25091,6 +25103,7 @@ struct ObStrErrorInit
|
||||
_errors[-OB_ERR_INVALID_DEFAULT_VALUE_PROVIDED] = &_error_OB_ERR_INVALID_DEFAULT_VALUE_PROVIDED;
|
||||
_errors[-OB_ERR_PATH_EXPRESSION_NOT_LITERAL] = &_error_OB_ERR_PATH_EXPRESSION_NOT_LITERAL;
|
||||
_errors[-OB_ERR_INVALID_ARGUMENT_FOR_JSON_CALL] = &_error_OB_ERR_INVALID_ARGUMENT_FOR_JSON_CALL;
|
||||
_errors[-OB_ERR_SCHEMA_HISTORY_EMPTY] = &_error_OB_ERR_SCHEMA_HISTORY_EMPTY;
|
||||
_errors[-OB_ERR_SP_ALREADY_EXISTS] = &_error_OB_ERR_SP_ALREADY_EXISTS;
|
||||
_errors[-OB_ERR_SP_DOES_NOT_EXIST] = &_error_OB_ERR_SP_DOES_NOT_EXIST;
|
||||
_errors[-OB_ERR_SP_UNDECLARED_VAR] = &_error_OB_ERR_SP_UNDECLARED_VAR;
|
||||
|
||||
@ -1088,6 +1088,7 @@ DEFINE_ORACLE_ERROR_EXT_DEP(OB_ERR_JSON_KEY_NOT_FOUND, -5485, -1, "42000", "JSON
|
||||
DEFINE_ORACLE_ERROR(OB_ERR_INVALID_DEFAULT_VALUE_PROVIDED, -5486, -1, "42000", "Invalid numeric", 40451, "invalid default value provided");
|
||||
DEFINE_ORACLE_ERROR(OB_ERR_PATH_EXPRESSION_NOT_LITERAL, -5487, -1, "42000", "path expression not a literal", 40454, "path expression not a literal");
|
||||
DEFINE_PLS_ERROR_EXT(OB_ERR_INVALID_ARGUMENT_FOR_JSON_CALL, -5488, -1, "HY000", "invalid argument for JSON call", "invalid argument for %s call", 185, "invalid argument for JSON call", "invalid argument for %s call");
|
||||
DEFINE_ERROR(OB_ERR_SCHEMA_HISTORY_EMPTY, -5489, -1, "HY000", "Schema history is empty");
|
||||
|
||||
|
||||
DEFINE_ERROR_EXT(OB_ERR_SP_ALREADY_EXISTS, -5541, ER_SP_ALREADY_EXISTS, "42000", "procedure/function already exists", "%s %.*s already exists");
|
||||
|
||||
@ -816,6 +816,7 @@ constexpr int OB_ERR_INVALID_VARIABLE_IN_JSON_PATH = -5484;
|
||||
constexpr int OB_ERR_INVALID_DEFAULT_VALUE_PROVIDED = -5486;
|
||||
constexpr int OB_ERR_PATH_EXPRESSION_NOT_LITERAL = -5487;
|
||||
constexpr int OB_ERR_INVALID_ARGUMENT_FOR_JSON_CALL = -5488;
|
||||
constexpr int OB_ERR_SCHEMA_HISTORY_EMPTY = -5489;
|
||||
constexpr int OB_ERR_SP_ALREADY_EXISTS = -5541;
|
||||
constexpr int OB_ERR_SP_DOES_NOT_EXIST = -5542;
|
||||
constexpr int OB_ERR_SP_UNDECLARED_VAR = -5543;
|
||||
@ -2661,6 +2662,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
|
||||
#define OB_ERR_INVALID_DEFAULT_VALUE_PROVIDED__USER_ERROR_MSG "Invalid numeric"
|
||||
#define OB_ERR_PATH_EXPRESSION_NOT_LITERAL__USER_ERROR_MSG "path expression not a literal"
|
||||
#define OB_ERR_INVALID_ARGUMENT_FOR_JSON_CALL__USER_ERROR_MSG "invalid argument for %s call"
|
||||
#define OB_ERR_SCHEMA_HISTORY_EMPTY__USER_ERROR_MSG "Schema history is empty"
|
||||
#define OB_ERR_SP_ALREADY_EXISTS__USER_ERROR_MSG "%s %.*s already exists"
|
||||
#define OB_ERR_SP_DOES_NOT_EXIST__USER_ERROR_MSG "%s %.*s.%.*s does not exist"
|
||||
#define OB_ERR_SP_UNDECLARED_VAR__USER_ERROR_MSG "Undeclared variable: %.*s"
|
||||
@ -4671,6 +4673,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
|
||||
#define OB_ERR_INVALID_DEFAULT_VALUE_PROVIDED__ORA_USER_ERROR_MSG "ORA-40451: invalid default value provided"
|
||||
#define OB_ERR_PATH_EXPRESSION_NOT_LITERAL__ORA_USER_ERROR_MSG "ORA-40454: path expression not a literal"
|
||||
#define OB_ERR_INVALID_ARGUMENT_FOR_JSON_CALL__ORA_USER_ERROR_MSG "PLS-00185: invalid argument for %s call"
|
||||
#define OB_ERR_SCHEMA_HISTORY_EMPTY__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5489, Schema history is empty"
|
||||
#define OB_ERR_SP_ALREADY_EXISTS__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5541, %s %.*s already exists"
|
||||
#define OB_ERR_SP_DOES_NOT_EXIST__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5542, %s %.*s.%.*s does not exist"
|
||||
#define OB_ERR_SP_UNDECLARED_VAR__ORA_USER_ERROR_MSG "PLS-00201: identifier '%.*s' must be declared"
|
||||
|
||||
@ -275,8 +275,8 @@ int ObSchemaRetrieveUtils::retrieve_table_schema(
|
||||
SHARE_SCHEMA_LOG(DEBUG, "retrieve table schema");
|
||||
if (OB_FAIL(result.next())) {
|
||||
if (ret == common::OB_ITER_END) { //no record
|
||||
ret = common::OB_ERR_UNEXPECTED;
|
||||
SHARE_SCHEMA_LOG(WARN, "no row", K(ret));
|
||||
ret = common::OB_ERR_SCHEMA_HISTORY_EMPTY;
|
||||
SHARE_SCHEMA_LOG(WARN, "schema history is empty", KR(ret));
|
||||
} else {
|
||||
SHARE_SCHEMA_LOG(WARN, "get table schema failed, iter quit", K(ret));
|
||||
}
|
||||
@ -314,8 +314,8 @@ int ObSchemaRetrieveUtils::retrieve_tablegroup_schema(
|
||||
SHARE_SCHEMA_LOG(DEBUG, "retrieve tablegroup schema");
|
||||
if (OB_FAIL(result.next())) {
|
||||
if (ret == common::OB_ITER_END) { //no record
|
||||
ret = common::OB_ERR_UNEXPECTED;
|
||||
SHARE_SCHEMA_LOG(WARN, "no row", K(ret));
|
||||
ret = common::OB_ERR_SCHEMA_HISTORY_EMPTY;
|
||||
SHARE_SCHEMA_LOG(WARN, "schema history is empty", KR(ret));
|
||||
} else {
|
||||
SHARE_SCHEMA_LOG(WARN, "get tablegroup schema failed, iter quit", K(ret));
|
||||
}
|
||||
|
||||
@ -45,15 +45,27 @@ ObMediumCompactionScheduleFunc::PrepareTableIter ObMediumCompactionScheduleFunc:
|
||||
ObMediumCompactionScheduleFunc::prepare_iter_for_major,
|
||||
};
|
||||
|
||||
int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
int64_t pos = 0;
|
||||
if (OB_ISNULL(buf) || buf_len <= 0) {
|
||||
} else {
|
||||
J_OBJ_START();
|
||||
J_KV("ls_id", ls_.get_ls_id(), "tablet_id", tablet_.get_tablet_meta().tablet_id_);
|
||||
J_OBJ_END();
|
||||
}
|
||||
return pos;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::choose_medium_snapshot(
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const int64_t schedule_medium_snapshot,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObIAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result)
|
||||
{
|
||||
UNUSED(schedule_medium_snapshot);
|
||||
UNUSED(allocator);
|
||||
int ret = OB_SUCCESS;
|
||||
ObGetMergeTablesParam param;
|
||||
param.merge_type_ = META_MAJOR_MERGE;
|
||||
@ -78,25 +90,64 @@ int ObMediumCompactionScheduleFunc::choose_medium_snapshot(
|
||||
int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const int64_t schedule_medium_snapshot,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObIAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result)
|
||||
{
|
||||
UNUSED(merge_reason);
|
||||
int ret = OB_SUCCESS;
|
||||
const ObLSID &ls_id = ls.get_ls_id();
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
ObTenantFreezeInfoMgr::FreezeInfo freeze_info;
|
||||
if (OB_FAIL(MTL_CALL_FREEZE_INFO_MGR(get_freeze_info_by_snapshot_version, schedule_medium_snapshot, freeze_info))) {
|
||||
int64_t schedule_snapshot = 0;
|
||||
const int64_t scheduler_frozen_version = MTL(ObTenantTabletScheduler*)->get_frozen_version();
|
||||
const ObMediumCompactionInfoList &medium_list = tablet.get_medium_compaction_info_list();
|
||||
ObITable *last_major = tablet.get_table_store().get_major_sstables().get_boundary_table(true/*last*/);
|
||||
bool schedule_with_newer_info = false;
|
||||
|
||||
if (OB_ISNULL(last_major)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("major sstable is unexpected null", K(ret), KPC(last_major));
|
||||
} else {
|
||||
schedule_snapshot = last_major->get_snapshot_version();
|
||||
}
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(MTL_CALL_FREEZE_INFO_MGR(get_freeze_info_behind_snapshot_version, schedule_snapshot, freeze_info))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get freeze info", K(ret), K(schedule_medium_snapshot), "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_);
|
||||
LOG_WARN("failed to get freeze info", K(ret), K(schedule_snapshot), K(ls_id), K(tablet_id));
|
||||
} else {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
}
|
||||
} else if (OB_UNLIKELY(freeze_info.schema_version <= 0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema version is invalid", K(ret), K(ls_id), K(tablet_id), K(freeze_info));
|
||||
} else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_table_schema_to_merge(
|
||||
tablet, freeze_info.schema_version, allocator, medium_info))) {
|
||||
if (OB_TABLE_IS_DELETED == ret) {
|
||||
// do nothing
|
||||
} else if (OB_ERR_SCHEMA_HISTORY_EMPTY == ret) {
|
||||
if (freeze_info.freeze_version <= scheduler_frozen_version) {
|
||||
FLOG_INFO("table schema may recycled, use newer freeze info instead", K(ret), KPC(last_major), K(freeze_info),
|
||||
K(scheduler_frozen_version));
|
||||
schedule_snapshot = freeze_info.freeze_version;
|
||||
schedule_with_newer_info = true;
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("failed to get table schema", K(ret), K(ls_id), K(tablet_id), K(medium_info));
|
||||
}
|
||||
} else { // success to get table schema
|
||||
if (schedule_with_newer_info) {
|
||||
LOG_INFO("schedule with newer freeze info", K(ret), K(ls_id), K(tablet_id), K(freeze_info));
|
||||
}
|
||||
break;
|
||||
}
|
||||
} // end of while
|
||||
if (OB_SUCC(ret)) {
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MAJOR_COMPACTION;
|
||||
medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE;
|
||||
medium_info.medium_snapshot_ = schedule_medium_snapshot;
|
||||
medium_info.medium_snapshot_ = freeze_info.freeze_version;
|
||||
result.schema_version_ = freeze_info.schema_version;
|
||||
LOG_TRACE("choose_major_snapshot", K(ret), "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_, K(medium_info), K(freeze_info));
|
||||
@ -136,6 +187,18 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader(
|
||||
LOG_WARN("failed to get ls role", K(ret), KPC(this));
|
||||
} else if (LEADER == role) {
|
||||
// only log_handler_leader can schedule
|
||||
#ifdef ERRSIM
|
||||
ret = OB_E(EventTable::EN_SKIP_INDEX_MAJOR) ret;
|
||||
// skip schedule major for user index table
|
||||
if (OB_FAIL(ret)) {
|
||||
if (tablet_.get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID
|
||||
&& tablet_.get_tablet_meta().tablet_id_ != tablet_.get_tablet_meta().data_tablet_id_) {
|
||||
return ret;
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
ret = schedule_next_medium_primary_cluster(major_snapshot);
|
||||
}
|
||||
return ret;
|
||||
@ -152,13 +215,14 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
ObITable *last_major = tablet_.get_table_store().get_major_sstables().get_boundary_table(true/*last*/);
|
||||
const ObMediumCompactionInfoList &medium_list = tablet_.get_medium_compaction_info_list();
|
||||
const bool is_major = 0 != schedule_major_snapshot;
|
||||
if (OB_ISNULL(last_major)) {
|
||||
// no major, do nothing
|
||||
} else if (!medium_list.could_schedule_next_round()) { // check serialized list
|
||||
// do nothing
|
||||
} else if (OB_FAIL(tablet_.get_max_sync_medium_scn(max_sync_medium_scn))) { // check info in memory
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), K(max_sync_medium_scn));
|
||||
} else if (0 != schedule_major_snapshot && schedule_major_snapshot > max_sync_medium_scn) {
|
||||
} else if (is_major && schedule_major_snapshot > max_sync_medium_scn) {
|
||||
schedule_medium_flag = true;
|
||||
} else if (nullptr != last_major && last_major->get_snapshot_version() < max_sync_medium_scn) {
|
||||
// do nothing
|
||||
@ -189,10 +253,10 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
if (OB_FAIL(get_status_from_inner_table(ret_info))) {
|
||||
LOG_WARN("failed to get status from inner tablet", K(ret), KPC(this));
|
||||
} else if (ret_info.could_schedule_next_round(last_major->get_snapshot_version())) {
|
||||
ret = decide_medium_snapshot(schedule_major_snapshot);
|
||||
ret = decide_medium_snapshot(is_major);
|
||||
}
|
||||
} else {
|
||||
ret = decide_medium_snapshot(schedule_major_snapshot, adaptive_merge_reason);
|
||||
ret = decide_medium_snapshot(is_major, adaptive_merge_reason);
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -220,25 +284,24 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
const int64_t schedule_medium_snapshot,
|
||||
const bool is_major,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const ObTabletID &tablet_id = tablet_.get_tablet_meta().tablet_id_;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
LOG_DEBUG("decide_medium_snapshot", K(ret), KPC(this), K(schedule_medium_snapshot));
|
||||
LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(tablet_id));
|
||||
if (OB_FAIL(tablet_.get_max_sync_medium_scn(max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(ls_.add_dependent_medium_tablet(tablet_id))) { // add dependent_id in ObLSReservedSnapshotMgr
|
||||
LOG_WARN("failed to add dependent tablet", K(ret), KPC(this));
|
||||
} else {
|
||||
const bool is_major = (0 != schedule_medium_snapshot);
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
ObGetMergeTablesResult result;
|
||||
ObMediumCompactionInfo medium_info;
|
||||
uint64_t compat_version = 0;
|
||||
if (OB_FAIL(choose_medium_scn[is_major](ls_, tablet_, schedule_medium_snapshot, merge_reason, medium_info, result))) {
|
||||
if (OB_FAIL(choose_medium_scn[is_major](ls_, tablet_, merge_reason, allocator_, medium_info, result))) {
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this));
|
||||
}
|
||||
@ -448,17 +511,7 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info(
|
||||
ObTableStoreIterator table_iter;
|
||||
medium_info.cluster_id_ = GCONF.cluster_id; // set cluster id
|
||||
|
||||
if (medium_info.is_major_compaction()) {
|
||||
// get table schema
|
||||
if (OB_UNLIKELY(result.schema_version_ <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("schema version is invalid", K(ret), K(result), KPC(this), K(medium_info));
|
||||
} else if (OB_FAIL(get_table_schema_to_merge(result.schema_version_, medium_info))) {
|
||||
if (OB_TABLE_IS_DELETED != ret) {
|
||||
LOG_WARN("failed to get table schema", K(ret), KPC(this), K(medium_info));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (medium_info.is_medium_compaction()) {
|
||||
ObStorageSchema tmp_storage_schema;
|
||||
bool use_storage_schema_on_tablet = true;
|
||||
if (medium_info.medium_snapshot_ > tablet_.get_snapshot_version()) {
|
||||
@ -519,12 +572,14 @@ int ObMediumCompactionScheduleFunc::get_table_id(
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::get_table_schema_to_merge(
|
||||
const ObTablet &tablet,
|
||||
const int64_t schema_version,
|
||||
ObIAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
const ObTabletID &tablet_id = tablet_.get_tablet_meta().tablet_id_;
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
uint64_t table_id = OB_INVALID_ID;
|
||||
ObMultiVersionSchemaService *schema_service = nullptr;
|
||||
schema::ObSchemaGetterGuard schema_guard;
|
||||
@ -542,30 +597,42 @@ int ObMediumCompactionScheduleFunc::get_table_schema_to_merge(
|
||||
table_id,
|
||||
schema_guard,
|
||||
save_schema_version))) {
|
||||
if (OB_TABLE_IS_DELETED != ret) {
|
||||
LOG_WARN("Fail to get schema", K(ret), K(tenant_id), K(schema_version), K(table_id));
|
||||
} else {
|
||||
if (OB_TABLE_IS_DELETED == ret) {
|
||||
LOG_WARN("table is deleted", K(ret), K(table_id));
|
||||
} else if (OB_ERR_SCHEMA_HISTORY_EMPTY == ret) {
|
||||
LOG_WARN("schema history may recycle", K(ret));
|
||||
} else {
|
||||
LOG_WARN("Fail to get schema", K(ret), K(tenant_id), K(schema_version), K(table_id));
|
||||
}
|
||||
} else if (save_schema_version < schema_version) {
|
||||
ret = OB_SCHEMA_ERROR;
|
||||
LOG_WARN("can not use older schema version", K(ret), K(schema_version), K(save_schema_version), K(table_id));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
|
||||
LOG_WARN("Fail to get table schema", K(ret), K(table_id));
|
||||
} else if (NULL == table_schema) {
|
||||
if (OB_FAIL(schema_service->get_tenant_full_schema_guard(tenant_id, schema_guard))) {
|
||||
LOG_WARN("Fail to get schema", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
|
||||
LOG_WARN("Fail to get table schema", K(ret), K(table_id));
|
||||
} else if (NULL == table_schema) {
|
||||
ret = OB_TABLE_IS_DELETED;
|
||||
LOG_WARN("table is deleted", K(ret), K(table_id));
|
||||
}
|
||||
#ifdef ERRSIM
|
||||
static bool have_set_errno = false;
|
||||
static ObTabletID errno_tablet_id;
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MAJOR_GET_TABLE_SCHEMA) ret;
|
||||
if (OB_FAIL(ret)) {
|
||||
if (tablet_id.id() > ObTabletID::MIN_USER_TABLET_ID
|
||||
&& tablet_id != tablet.get_tablet_meta().data_tablet_id_
|
||||
&& ATOMIC_BCAS(&have_set_errno, false, true)) {
|
||||
LOG_INFO("set get table schema failed with errsim", K(ret), K(table_id), K(tablet_id));
|
||||
errno_tablet_id = tablet_id;
|
||||
return ret;
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if (FAILEDx(medium_info.storage_schema_.init(
|
||||
allocator_,
|
||||
allocator,
|
||||
*table_schema,
|
||||
tablet_.get_tablet_meta().compat_mode_))) {
|
||||
tablet.get_tablet_meta().compat_mode_))) {
|
||||
LOG_WARN("failed to init storage schema", K(ret), K(schema_version));
|
||||
} else {
|
||||
FLOG_INFO("get schema to merge", K(table_id), K(schema_version), K(save_schema_version),
|
||||
|
||||
@ -46,16 +46,22 @@ public:
|
||||
const ObIArray<ObITable *> &memtables,
|
||||
ObMediumCompactionInfoList &medium_list);
|
||||
static int get_palf_role(const share::ObLSID &ls_id, ObRole &role);
|
||||
static int get_table_schema_to_merge(
|
||||
const ObTablet &tablet,
|
||||
const int64_t schema_version,
|
||||
ObIAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info);
|
||||
|
||||
int schedule_next_medium_for_leader(const int64_t major_snapshot);
|
||||
|
||||
int decide_medium_snapshot(
|
||||
const int64_t schedule_medium_snapshot,
|
||||
const bool is_major,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE);
|
||||
|
||||
int check_medium_finish();
|
||||
|
||||
TO_STRING_KV("ls_id", ls_.get_ls_id(), "tablet_id", tablet_.get_tablet_meta().tablet_id_);
|
||||
int64_t to_string(char* buf, const int64_t buf_len) const;
|
||||
|
||||
protected:
|
||||
int get_status_from_inner_table(share::ObTabletCompactionScnInfo &ret_info);
|
||||
int prepare_medium_info(const ObGetMergeTablesResult &result, ObMediumCompactionInfo &medium_info);
|
||||
@ -86,15 +92,15 @@ protected:
|
||||
static int choose_medium_snapshot(
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const int64_t schedule_medium_snapshot,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObIAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result);
|
||||
static int choose_major_snapshot(
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const int64_t schedule_medium_snapshot,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObIAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result);
|
||||
static int check_need_merge_and_schedule(
|
||||
@ -104,8 +110,6 @@ protected:
|
||||
const ObMediumCompactionInfo::ObCompactionType compaction_type);
|
||||
int schedule_next_medium_primary_cluster(const int64_t major_snapshot);
|
||||
|
||||
int get_table_schema_to_merge(const int64_t schema_version, ObMediumCompactionInfo &medium_info);
|
||||
|
||||
int get_max_reserved_snapshot(int64_t &max_reserved_snapshot);
|
||||
static int get_schedule_medium_from_memtable(
|
||||
ObTablet &tablet,
|
||||
@ -125,8 +129,8 @@ protected:
|
||||
typedef int (*ChooseMediumScn)(
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const int64_t schedule_medium_snapshot,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObIAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result);
|
||||
static ChooseMediumScn choose_medium_scn[MEDIUM_FUNC_CNT];
|
||||
|
||||
@ -88,8 +88,12 @@ int64_t ObParalleMergeInfo::to_paral_info_string(char *buf, const int64_t buf_le
|
||||
J_OBJ_START();
|
||||
for (int i = 0; i < ARRAY_IDX_MAX; ++i) {
|
||||
J_OBJ_START();
|
||||
if (0 == info_[i].count_ && 0 == info_[i].sum_value_) {
|
||||
J_KV("type", get_para_info_str(i), "info", "EMPTY");
|
||||
} else {
|
||||
J_KV("type", get_para_info_str(i), "min", info_[i].min_value_, "max", info_[i].max_value_,
|
||||
"avg", info_[i].count_ > 0 ? info_[i].sum_value_ / info_[i].count_ : 0);
|
||||
}
|
||||
J_OBJ_END();
|
||||
J_COMMA();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user