fix parallel range split problem

chaser-ch 2023-06-13 03:12:25 +00:00 committed by ob-robot
parent eaff13368a
commit 0c27bf4af7
7 changed files with 97 additions and 50 deletions

View File

@@ -233,54 +233,6 @@ int ObParallelMergeCtx::init_parallel_major_merge(compaction::ObTabletMergeCtx &
parallel_type_ = PARALLEL_MAJOR;
}
}
/*
else {
bool is_incremental = false;
ObPartitionIncrementalRangeSpliter incremental_range_spliter;
if (OB_FAIL(incremental_range_spliter.init(merge_ctx, allocator_))) {
STORAGE_LOG(WARN, "Failed to init incremental range spliter", KR(ret), K(merge_ctx));
} else if (OB_FAIL(incremental_range_spliter.check_is_incremental(is_incremental))) {
STORAGE_LOG(WARN, "Failed to check is incremental", KR(ret));
} else if (is_incremental) {
// split ranges by incremental data
if (OB_FAIL(incremental_range_spliter.split_ranges(range_array_))) {
STORAGE_LOG(WARN, "Failed to split ranges", KR(ret));
}
} else {
// split ranges by major sstable
ObSEArray<ObStoreRange, 64> store_ranges;
ObPartitionMajorSSTableRangeSpliter major_sstable_range_spliter;
if (OB_ISNULL(merge_ctx.tables_handle_.get_table(0))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected null first table", K(ret), K(merge_ctx.tables_handle_));
} else if (OB_UNLIKELY(!merge_ctx.tables_handle_.get_table(0)->is_major_sstable())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "first table must be major sstable", K(ret), K(merge_ctx.tables_handle_));
} else if (OB_FAIL(major_sstable_range_spliter.init(
merge_ctx.tablet_handle_.get_obj()->get_index_read_info(),
static_cast<ObSSTable *>(merge_ctx.tables_handle_.get_table(0)),
merge_ctx.schema_ctx_.merge_schema_->get_tablet_size(),
allocator_))) {
STORAGE_LOG(WARN, "Failed to init major sstable range spliter", KR(ret), K(merge_ctx));
} else if (OB_FAIL(major_sstable_range_spliter.split_ranges(store_ranges))) {
STORAGE_LOG(WARN, "Failed to split ranges", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < store_ranges.count(); i++) {
ObDatumRange datum_range;
if (OB_FAIL(datum_range.from_range(store_ranges.at(i), allocator_))) {
STORAGE_LOG(WARN, "Failed to transfer store range to datum range", K(ret), K(i), K(store_ranges.at(i)));
} else if (OB_FAIL(range_array_.push_back(datum_range))) {
STORAGE_LOG(WARN, "Failed to push back merge range to array", K(ret), K(datum_range));
}
}
}
if (OB_SUCC(ret)) {
concurrent_cnt_ = range_array_.count();
parallel_type_ = PARALLEL_MAJOR;
STORAGE_LOG(INFO, "Succ to get parallel major merge ranges", K_(concurrent_cnt), K_(range_array));
}
}
*/
return ret;
}
@@ -473,7 +425,9 @@ int ObParallelMergeCtx::get_major_parallel_ranges(
const int64_t macro_block_cnt = first_major_sstable->get_meta().get_macro_info().get_data_block_ids().count();
const int64_t macro_block_cnt_per_range = (macro_block_cnt + concurrent_cnt_ - 1) / concurrent_cnt_;
ObDatumRowkeyHelper rowkey_helper;
ObDatumRowkey macro_endkey;
ObDatumRowkey multi_version_endkey;
ObDatumRange range;
range.end_key_.set_min_rowkey();
range.set_left_open();
@@ -482,10 +436,15 @@ int ObParallelMergeCtx::get_major_parallel_ranges(
blocksstable::ObDataMacroBlockMeta blk_meta;
blocksstable::ObSSTableSecMetaIterator *meta_iter = nullptr;
ObDatumRange query_range;
int64_t schema_rowkey_cnt = first_major_sstable->get_meta().get_schema_rowkey_column_count();
query_range.set_whole_range();
if (OB_FAIL(first_major_sstable->scan_secondary_meta(allocator_, query_range,
index_read_info, DATA_BLOCK_META, meta_iter))) {
STORAGE_LOG(WARN, "Failed to scan secondary meta", KR(ret), KPC(this));
} else if (OB_FAIL(rowkey_helper.reserve(schema_rowkey_cnt + 1))) {
STORAGE_LOG(WARN, "Failed to ", K(ret), K(schema_rowkey_cnt));
} else if (OB_FAIL(multi_version_endkey.assign(rowkey_helper.get_datums(), schema_rowkey_cnt + 1))) {
STORAGE_LOG(WARN, "Failed to assign datums", K(ret), K(schema_rowkey_cnt));
}
// generate ranges
for (int64_t i = 0; OB_SUCC(ret) && i < macro_block_cnt;) {
@@ -500,10 +459,23 @@ int ObParallelMergeCtx::get_major_parallel_ranges(
STORAGE_LOG(WARN, "Unexpected invalid macro block meta", KR(ret), K(i - 1));
} else if (OB_FAIL(blk_meta.get_rowkey(macro_endkey))) {
STORAGE_LOG(WARN, "Failed to get rowkey", KR(ret), K(blk_meta));
} else if (OB_UNLIKELY(macro_endkey.datum_cnt_ < schema_rowkey_cnt)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected macro endkey", K(ret), K(macro_endkey));
} else {
ObStorageDatum *datums = const_cast<ObStorageDatum*>(multi_version_endkey.datums_);
for (int64_t i = 0; OB_SUCC(ret) && i < schema_rowkey_cnt; i++) {
datums[i] = macro_endkey.datums_[i];
}
datums[schema_rowkey_cnt].set_max();
multi_version_endkey.datum_cnt_ = schema_rowkey_cnt + 1;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(range.end_key_.deep_copy(range.start_key_, allocator_))) {
STORAGE_LOG(WARN, "Failed to deep copy rowkey", KR(ret), K(range.get_end_key()), K(range.get_start_key()));
} else if (OB_FAIL(macro_endkey.deep_copy(range.end_key_, allocator_))) {
STORAGE_LOG(WARN, "Failed to deep copy rowkey", KR(ret), K(macro_endkey), K(range.get_end_key()), K(range.get_start_key()));
} else if (OB_FAIL(multi_version_endkey.deep_copy(range.end_key_, allocator_))) {
STORAGE_LOG(WARN, "Failed to deep copy rowkey", KR(ret), K(multi_version_endkey), K(range.get_end_key()), K(range.get_start_key()));
} else if (OB_FAIL(range_array_.push_back(range))) {
STORAGE_LOG(WARN, "Failed to push range", KR(ret), K(range_array_), K(range));
}

View File

@@ -111,6 +111,12 @@ def set_parameter(cur, parameter, value, timeout = 0):
cur.execute(sql)
wait_parameter_sync(cur, False, parameter, value, timeout)
def set_tenant_parameter(cur, parameter, value, timeout = 0):
sql = """alter system set {0} = '{1}' tenant = 'all'""".format(parameter, value)
logging.info(sql)
cur.execute(sql)
wait_parameter_sync(cur, True, parameter, value, timeout)
def get_ori_enable_ddl(cur, timeout):
ori_value_str = fetch_ori_enable_ddl(cur)
wait_parameter_sync(cur, False, 'enable_ddl', ori_value_str, timeout)
@@ -282,6 +288,17 @@ def do_end_upgrade(cur, timeout):
wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout)
def do_suspend_merge(cur, timeout):
action_sql = "alter system suspend merge tenant = all"
rollback_sql = "alter system resume merge tenant = all"
logging.info(action_sql)
cur.execute(action_sql)
def do_resume_merge(cur, timeout):
action_sql = "alter system resume merge tenant = all"
rollback_sql = "alter system suspend merge tenant = all"
logging.info(action_sql)
cur.execute(action_sql)
class Cursor:
__cursor = None
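
The new helpers added above complement each other: set_tenant_parameter issues a tenant-scoped ALTER SYSTEM and waits for the parameter to sync, while do_suspend_merge and do_resume_merge log and execute the suspend/resume statements for all tenants (the timeout argument is accepted but not waited on). A minimal sketch of how they are combined around a cross-version upgrade window, mirroring the calls this commit adds to do_special_upgrade and enable_major_freeze; the wrapper function names are hypothetical, and cursor/timeout setup is assumed.

# Sketch only: wrapper names are hypothetical; cur and timeout come from the
# upgrade driver. The calls mirror do_special_upgrade / enable_major_freeze.
import actions

def before_cross_version_upgrade(cur, timeout):
    # freeze DDL and major compaction while server versions are mixed
    actions.set_parameter(cur, 'enable_ddl', 'False', timeout)
    actions.set_parameter(cur, 'enable_major_freeze', 'False', timeout)
    actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'False', timeout)
    actions.do_suspend_merge(cur, timeout)

def after_cross_version_upgrade(cur, timeout):
    # re-enable compaction once every server runs the target version
    actions.set_parameter(cur, 'enable_major_freeze', 'True', timeout)
    actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout)
    actions.do_resume_merge(cur, timeout)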

View File

@@ -27,6 +27,9 @@ def do_special_upgrade(conn, cur, timeout, user, passwd):
# when upgrade across version, disable enable_ddl/major_freeze
if current_version != target_version:
actions.set_parameter(cur, 'enable_ddl', 'False', timeout)
actions.set_parameter(cur, 'enable_major_freeze', 'False', timeout)
actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'False', timeout)
actions.do_suspend_merge(cur, timeout)
####========******####======== actions begin ========####******========####
return
####========******####========= actions end =========####******========####

View File

@@ -401,6 +401,9 @@ def check_cluster_status(query_cur):
(desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""")
if results[0][0] > 0 :
fail_list.append('{0} tenant is merging, please check'.format(results[0][0]))
(desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn != finished_scn and max_received_scn > 0""")
if results[0][0] > 0 :
fail_list.append('{0} tablet is merging, please check'.format(results[0][0]))
logging.info('check cluster status success')
# 5. Check for abnormal tenants (creating, delayed deletion, restoring)

View File

@@ -119,6 +119,12 @@
# cur.execute(sql)
# wait_parameter_sync(cur, False, parameter, value, timeout)
#
#def set_tenant_parameter(cur, parameter, value, timeout = 0):
# sql = """alter system set {0} = '{1}' tenant = 'all'""".format(parameter, value)
# logging.info(sql)
# cur.execute(sql)
# wait_parameter_sync(cur, True, parameter, value, timeout)
#
#def get_ori_enable_ddl(cur, timeout):
# ori_value_str = fetch_ori_enable_ddl(cur)
# wait_parameter_sync(cur, False, 'enable_ddl', ori_value_str, timeout)
@@ -290,6 +296,17 @@
#
# wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout)
#
#def do_suspend_merge(cur, timeout):
# action_sql = "alter system suspend merge tenant = all"
# rollback_sql = "alter system resume merge tenant = all"
# logging.info(action_sql)
# cur.execute(action_sql)
#
#def do_resume_merge(cur, timeout):
# action_sql = "alter system resume merge tenant = all"
# rollback_sql = "alter system suspend merge tenant = all"
# logging.info(action_sql)
# cur.execute(action_sql)
#
#class Cursor:
# __cursor = None
@@ -1224,6 +1241,9 @@
# # when upgrade across version, disable enable_ddl/major_freeze
# if current_version != target_version:
# actions.set_parameter(cur, 'enable_ddl', 'False', timeout)
# actions.set_parameter(cur, 'enable_major_freeze', 'False', timeout)
# actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'False', timeout)
# actions.do_suspend_merge(cur, timeout)
#####========******####======== actions begin ========####******========####
# return
#####========******####========= actions end =========####******========####
@@ -1905,6 +1925,9 @@
# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""")
# if results[0][0] > 0 :
# fail_list.append('{0} tenant is merging, please check'.format(results[0][0]))
# (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn != finished_scn and max_received_scn > 0""")
# if results[0][0] > 0 :
# fail_list.append('{0} tablet is merging, please check'.format(results[0][0]))
# logging.info('check cluster status success')
#
## 5. Check for abnormal tenants (creating, delayed deletion, restoring)
@@ -2680,6 +2703,8 @@
## 7 Enable major freeze
#def enable_major_freeze(cur, timeout):
# actions.set_parameter(cur, 'enable_major_freeze', 'True', timeout)
# actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout)
# actions.do_resume_merge(cur, timeout)
#
## Checks after the upgrade starts
#def do_check(conn, cur, query_cur, timeout):

View File

@@ -105,6 +105,8 @@ def enable_rereplication(cur, timeout):
# 7 Enable major freeze
def enable_major_freeze(cur, timeout):
actions.set_parameter(cur, 'enable_major_freeze', 'True', timeout)
actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout)
actions.do_resume_merge(cur, timeout)
# Checks after the upgrade starts
def do_check(conn, cur, query_cur, timeout):

View File

@@ -119,6 +119,12 @@
# cur.execute(sql)
# wait_parameter_sync(cur, False, parameter, value, timeout)
#
#def set_tenant_parameter(cur, parameter, value, timeout = 0):
# sql = """alter system set {0} = '{1}' tenant = 'all'""".format(parameter, value)
# logging.info(sql)
# cur.execute(sql)
# wait_parameter_sync(cur, True, parameter, value, timeout)
#
#def get_ori_enable_ddl(cur, timeout):
# ori_value_str = fetch_ori_enable_ddl(cur)
# wait_parameter_sync(cur, False, 'enable_ddl', ori_value_str, timeout)
@@ -290,6 +296,17 @@
#
# wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout)
#
#def do_suspend_merge(cur, timeout):
# action_sql = "alter system suspend merge tenant = all"
# rollback_sql = "alter system resume merge tenant = all"
# logging.info(action_sql)
# cur.execute(action_sql)
#
#def do_resume_merge(cur, timeout):
# action_sql = "alter system resume merge tenant = all"
# rollback_sql = "alter system suspend merge tenant = all"
# logging.info(action_sql)
# cur.execute(action_sql)
#
#class Cursor:
# __cursor = None
@@ -1224,6 +1241,9 @@
# # when upgrade across version, disable enable_ddl/major_freeze
# if current_version != target_version:
# actions.set_parameter(cur, 'enable_ddl', 'False', timeout)
# actions.set_parameter(cur, 'enable_major_freeze', 'False', timeout)
# actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'False', timeout)
# actions.do_suspend_merge(cur, timeout)
#####========******####======== actions begin ========####******========####
# return
#####========******####========= actions end =========####******========####
@@ -1905,6 +1925,9 @@
# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""")
# if results[0][0] > 0 :
# fail_list.append('{0} tenant is merging, please check'.format(results[0][0]))
# (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn != finished_scn and max_received_scn > 0""")
# if results[0][0] > 0 :
# fail_list.append('{0} tablet is merging, please check'.format(results[0][0]))
# logging.info('check cluster status success')
#
## 5. Check for abnormal tenants (creating, delayed deletion, restoring)
@@ -2680,6 +2703,8 @@
## 7 Enable major freeze
#def enable_major_freeze(cur, timeout):
# actions.set_parameter(cur, 'enable_major_freeze', 'True', timeout)
# actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout)
# actions.do_resume_merge(cur, timeout)
#
## Checks after the upgrade starts
#def do_check(conn, cur, query_cur, timeout):