fix table recovery split task encountered unexpected table type, such as ddl dump table.

This commit is contained in:
SidewinderAK47 2024-11-21 19:16:31 +00:00 committed by ob-robot
parent 1ce2614422
commit 54b292d266
7 changed files with 66 additions and 26 deletions

View File

@ -1393,7 +1393,7 @@ int ObService::prepare_tablet_split_task_ranges(
} else if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(arg));
} else if (OB_FAIL(ObTabletSplitUtil::split_task_ranges(result.rowkey_allocator_, arg.ls_id_,
} else if (OB_FAIL(ObTabletSplitUtil::split_task_ranges(result.rowkey_allocator_, arg.ddl_type_, arg.ls_id_,
arg.tablet_id_, arg.user_parallelism_, arg.schema_tablet_size_, result.parallel_datum_rowkey_list_))) {
LOG_WARN("split task ranges failed", K(ret));
}

View File

@ -2482,6 +2482,7 @@ int ObPartitionSplitTask::prepare_tablet_split_ranges(
arg.tablet_id_ = src_tablet_id;
arg.user_parallelism_ = parallelism_; // parallelism_;
arg.schema_tablet_size_ = std::max(tablet_size_, 128 * 1024 * 1024L/*128MB*/);
arg.ddl_type_ = task_type_;
if (OB_FAIL(root_service_->get_rpc_proxy().to(leader_addr)
.by(tenant_id_).timeout(rpc_timeout).prepare_tablet_split_task_ranges(arg, result))) {
LOG_WARN("prepare tablet split task ranges failed", K(ret), K(arg));

View File

@ -9160,7 +9160,7 @@ int ObDDLBuildSingleReplicaResponseArg::assign(const ObDDLBuildSingleReplicaResp
// === Functions for tablet split start. ===
OB_SERIALIZE_MEMBER(ObPrepareSplitRangesArg, ls_id_, tablet_id_,
user_parallelism_, schema_tablet_size_);
user_parallelism_, schema_tablet_size_, ddl_type_);
OB_DEF_SERIALIZE(ObPrepareSplitRangesRes)
{
int ret = OB_SUCCESS;

View File

@ -10744,19 +10744,21 @@ public:
: ls_id_(),
tablet_id_(),
user_parallelism_(0),
schema_tablet_size_(0)
schema_tablet_size_(0),
ddl_type_(share::ObDDLType::DDL_INVALID)
{}
~ObPrepareSplitRangesArg() {}
bool is_valid() const
{
return ls_id_.is_valid() && tablet_id_.is_valid();
return ls_id_.is_valid() && tablet_id_.is_valid() && share::ObDDLType::DDL_INVALID != ddl_type_;
}
TO_STRING_KV(K(ls_id_), K(tablet_id_), K_(user_parallelism), K_(schema_tablet_size));
TO_STRING_KV(K(ls_id_), K(tablet_id_), K_(user_parallelism), K_(schema_tablet_size), K_(ddl_type));
public:
share::ObLSID ls_id_;
ObTabletID tablet_id_;
int64_t user_parallelism_;
int64_t schema_tablet_size_;
share::ObDDLType ddl_type_;
DISALLOW_COPY_AND_ASSIGN(ObPrepareSplitRangesArg);
};

View File

@ -373,6 +373,7 @@ int ObComplementDataParam::split_task_ranges_remote(
arg.user_parallelism_ = MIN(MIN(MAX(hint_parallelism, 1), MAX_RPC_STREAM_WAIT_THREAD_COUNT),
ObMacroDataSeq::MAX_PARALLEL_IDX + 1);
arg.schema_tablet_size_ = RECOVER_TABLE_PARALLEL_MIN_TASK_SIZE; /*2M*/
arg.ddl_type_ = ObDDLType::DDL_TABLE_RESTORE;
const int64_t rpc_timeout = ObDDLUtil::get_default_ddl_rpc_timeout();
const int64_t retry_interval_us = 200 * 1000; // 200ms
/* recover table partition data complete: dest leader server send rpc to src leader server */

View File

@ -275,7 +275,7 @@ int ObTabletSplitCtx::prepare_index_builder(
LOG_WARN("init twice", K(ret));
} else if (OB_FAIL(index_builder_map_.create(bucket_num, "SplitSstIdxMap"))) {
LOG_WARN("create sstable record map failed", K(ret));
} else if (OB_FAIL(ObTabletSplitUtil::get_participants(param.split_sstable_type_, table_store_iterator_, sstables))) {
} else if (OB_FAIL(ObTabletSplitUtil::get_participants(param.split_sstable_type_, table_store_iterator_, false, sstables))) {
LOG_WARN("get participant sstables failed", K(ret));
} else if (OB_FAIL(tablet_handle_.get_obj()->get_split_data(split_data, ObTabletCommon::DEFAULT_GET_TABLET_DURATION_10_S))) {
LOG_WARN("failed to get split data", K(ret));
@ -395,7 +395,7 @@ int ObTabletSplitDag::create_first_task()
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_FAIL(ObTabletSplitUtil::get_participants(
param_.split_sstable_type_, context_.table_store_iterator_, source_sstables))) {
param_.split_sstable_type_, context_.table_store_iterator_, false, source_sstables))) {
LOG_WARN("get all sstables failed", K(ret));
} else if (OB_FAIL(alloc_task(prepare_task))) {
LOG_WARN("allocate task failed", K(ret));
@ -1329,7 +1329,8 @@ int ObTabletSplitMergeTask::create_sstable(
&& share::ObSplitSSTableType::SPLIT_MINOR != split_sstable_type)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(split_sstable_type), KPC(param_));
} else if (OB_FAIL(ObTabletSplitUtil::get_participants(split_sstable_type, context_->table_store_iterator_, participants))) {
} else if (OB_FAIL(ObTabletSplitUtil::get_participants(
split_sstable_type, context_->table_store_iterator_, false, participants))) {
LOG_WARN("get participants failed", K(ret));
} else {
ObArenaAllocator tmp_arena("PartSplitSchema");
@ -2352,6 +2353,7 @@ int ObUncommittedRowScan::check_can_skip(const ObDatumRow &row, bool &can_skip)
int ObTabletSplitUtil::get_participants(
const share::ObSplitSSTableType &split_sstable_type,
const ObTableStoreIterator &const_table_store_iter,
const bool is_table_restore,
ObIArray<ObITable *> &participants)
{
int ret = OB_SUCCESS;
@ -2371,26 +2373,40 @@ int ObTabletSplitUtil::get_participants(
} else {
LOG_WARN("get next table failed", K(ret), K(table_store_iter));
}
} else if (OB_UNLIKELY(nullptr == table || !table->is_sstable())) {
} else if (OB_UNLIKELY(nullptr == table)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err", K(ret), KPC(table));
} else if (table->is_minor_sstable()) {
if (share::ObSplitSSTableType::SPLIT_MAJOR == split_sstable_type) {
// Split with major only.
} else if (OB_FAIL(participants.push_back(table))) {
LOG_WARN("push back failed", K(ret));
} else if (is_table_restore) {
if (table->is_minor_sstable() || table->is_major_sstable()
|| ObITable::TableType::DDL_DUMP_SSTABLE == table->get_table_type()) {
if (OB_FAIL(participants.push_back(table))) {
LOG_WARN("push back major failed", K(ret));
}
} else {
LOG_INFO("skip table", K(ret), KPC(table));
}
} else if (table->is_major_sstable()) {
if (share::ObSplitSSTableType::SPLIT_MINOR == split_sstable_type) {
// Split with minor only.
} else if (OB_FAIL(participants.push_back(table))) {
LOG_WARN("push back major failed", K(ret));
}
} else if (table->is_mds_sstable()) {
// skip
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err", K(ret), KPC(table));
if (!table->is_sstable()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err", K(ret), KPC(table));
} else if (table->is_minor_sstable()) {
if (share::ObSplitSSTableType::SPLIT_MAJOR == split_sstable_type) {
// Split with major only.
} else if (OB_FAIL(participants.push_back(table))) {
LOG_WARN("push back failed", K(ret));
}
} else if (table->is_major_sstable()) {
if (share::ObSplitSSTableType::SPLIT_MINOR == split_sstable_type) {
// Split with minor only.
} else if (OB_FAIL(participants.push_back(table))) {
LOG_WARN("push back major failed", K(ret));
}
} else if (table->is_mds_sstable()) {
// skip
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err", K(ret), KPC(table));
}
}
}
}
@ -2399,6 +2415,7 @@ int ObTabletSplitUtil::get_participants(
int ObTabletSplitUtil::split_task_ranges(
ObIAllocator &allocator,
const share::ObDDLType ddl_type,
const share::ObLSID &ls_id,
const ObTabletID &tablet_id,
const int64_t user_parallelism,
@ -2412,6 +2429,7 @@ int ObTabletSplitUtil::split_task_ranges(
ObTableStoreIterator table_store_iterator;
ObSEArray<ObStoreRange, 32> store_ranges;
ObSEArray<ObITable *, MAX_SSTABLE_CNT_IN_STORAGE> tables;
const bool is_table_restore = ObDDLType::DDL_TABLE_RESTORE == ddl_type;
common::ObArenaAllocator tmp_arena("SplitRange", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
@ -2424,8 +2442,20 @@ int ObTabletSplitUtil::split_task_ranges(
} else if (OB_FAIL(tablet_handle.get_obj()->get_all_sstables(table_store_iterator))) {
LOG_WARN("fail to fetch table store", K(ret));
} else if (OB_FAIL(ObTabletSplitUtil::get_participants(
share::ObSplitSSTableType::SPLIT_BOTH, table_store_iterator, tables))) {
share::ObSplitSSTableType::SPLIT_BOTH, table_store_iterator, is_table_restore, tables))) {
LOG_WARN("get participants failed", K(ret));
} else if (is_table_restore && tables.empty()) {
ObDatumRowkey tmp_min_key;
ObDatumRowkey tmp_max_key;
tmp_min_key.set_min_rowkey();
tmp_max_key.set_max_rowkey();
if (OB_FAIL(parallel_datum_rowkey_list.prepare_allocate(2))) { // min key and max key.
LOG_WARN("reserve failed", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(tmp_min_key.deep_copy(parallel_datum_rowkey_list.at(0), allocator))) {
LOG_WARN("failed to push min rowkey", K(ret));
} else if (OB_FAIL(tmp_max_key.deep_copy(parallel_datum_rowkey_list.at(1), allocator))) {
LOG_WARN("failed to push min rowkey", K(ret));
}
} else {
const ObITableReadInfo &rowkey_read_info = tablet_handle.get_obj()->get_rowkey_read_info();
ObRangeSplitInfo range_info;
@ -2548,6 +2578,9 @@ int ObTabletSplitUtil::check_major_sstables_exist(
return ret;
}
int ObTabletSplitUtil::check_satisfy_split_condition(
const ObLSHandle &ls_handle,
const ObTabletHandle &source_tablet_handle,
@ -2684,7 +2717,8 @@ int ObTabletSplitUtil::check_medium_compaction_info_list_cnt(
result.primary_compaction_scn_ = -1;
} else if (OB_FAIL(tablet->get_all_sstables(table_store_iterator))) {
LOG_WARN("fail to fetch table store", K(ret));
} else if (OB_FAIL(ObTabletSplitUtil::get_participants(ObSplitSSTableType::SPLIT_MAJOR, table_store_iterator, major_tables))) {
} else if (OB_FAIL(ObTabletSplitUtil::get_participants(
ObSplitSSTableType::SPLIT_MAJOR, table_store_iterator, false, major_tables))) {
LOG_WARN("get participant sstables failed", K(ret));
} else {
result.info_list_cnt_ = 0;

View File

@ -410,9 +410,11 @@ public:
static int get_participants(
const share::ObSplitSSTableType &split_sstable_type,
const ObTableStoreIterator &table_store_iterator,
const bool is_table_restore,
ObIArray<ObITable *> &participants);
static int split_task_ranges(
ObIAllocator &allocator,
const share::ObDDLType ddl_type,
const share::ObLSID &ls_id,
const ObTabletID &tablet_id,
const int64_t user_parallelism,