[CP] fix data split not wait clog replay.

This commit is contained in:
obdev 2024-10-31 09:17:18 +00:00 committed by ob-robot
parent a9d86c2315
commit 60874e5110
14 changed files with 132 additions and 56 deletions

View File

@ -1351,13 +1351,14 @@ int ObService::check_memtable_cnt(
result.memtable_cnt_ = memtable_handles.count();
freeze_finished = result.memtable_cnt_ == 0 ? true : false;
if (freeze_finished) {
share::SCN unused_scn;
ObTabletFreezeLog freeze_log;
freeze_log.tablet_id_ = tablet_id;
if (OB_FAIL(storage::ObDDLRedoLogWriter::
write_auto_split_log(ls_id,
ObDDLClogType::DDL_TABLET_FREEZE_LOG,
logservice::ObReplayBarrierType::STRICT_BARRIER,
freeze_log))) {
freeze_log, unused_scn))) {
LOG_WARN("write tablet freeze log failed", K(ret), K(freeze_log));
}
}
@ -2843,9 +2844,17 @@ int ObService::build_split_tablet_data_start_request(const obrpc::ObTabletSplitS
LOG_WARN("invalid arg", K(ret), K(arg));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < arg.split_info_array_.count(); i++) {
share::SCN start_scn;
const ObTabletSplitArg &each_arg = arg.split_info_array_.at(i);
if (OB_FAIL(ObTabletLobSplitUtil::process_write_split_start_log_request(each_arg))) {
if (OB_FAIL(ObTabletLobSplitUtil::process_write_split_start_log_request(each_arg, start_scn))) {
LOG_WARN("process write split start log failed", K(ret), K(tmp_ret), K(arg));
} else if (0 == i) {
if (!start_scn.is_valid_and_not_min()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected start scn", K(ret), K(start_scn));
} else {
res.min_split_start_scn_ = start_scn;
}
}
if (OB_TMP_FAIL(res.ret_codes_.push_back(ret))) {
LOG_WARN("push back result failed", K(ret), K(tmp_ret));

View File

@ -162,6 +162,7 @@ int ObDDLReplicaBuildExecutor::build(const ObDDLReplicaBuildExecutorParam &param
execution_id_ = param.execution_id_;
data_format_version_ = param.data_format_version_;
consumer_group_id_ = param.consumer_group_id_;
min_split_start_scn_ = param.min_split_start_scn_;
ObArray<ObSingleReplicaBuildCtx> replica_build_ctxs;
if (OB_FAIL(construct_replica_build_ctxs(param, replica_build_ctxs))) {
LOG_WARN("failed to construct replica build ctxs", K(ret));
@ -467,6 +468,7 @@ int ObDDLReplicaBuildExecutor::construct_rpc_arg(
arg.consumer_group_id_ = consumer_group_id_;
arg.compaction_scn_ = replica_build_ctx.compaction_scn_;
arg.can_reuse_macro_block_ = replica_build_ctx.can_reuse_macro_block_;
arg.min_split_start_scn_ = min_split_start_scn_;
if (OB_FAIL(arg.lob_col_idxs_.assign(lob_col_idxs_))) {
LOG_WARN("failed to assign to lob col idxs", K(ret));
} else if (OB_FAIL(arg.parallel_datum_rowkey_list_.assign(replica_build_ctx.parallel_datum_rowkey_list_))) {

View File

@ -45,7 +45,8 @@ public:
compaction_scns_(),
lob_col_idxs_(),
can_reuse_macro_blocks_(),
parallel_datum_rowkey_list_()
parallel_datum_rowkey_list_(),
min_split_start_scn_()
{}
~ObDDLReplicaBuildExecutorParam () = default;
bool is_valid() const {
@ -65,7 +66,8 @@ public:
consumer_group_id_ >= 0;
if (is_tablet_split(ddl_type_)) {
is_valid = is_valid && compaction_scns_.count() == source_tablet_ids_.count()
&& can_reuse_macro_blocks_.count() == source_tablet_ids_.count();
&& can_reuse_macro_blocks_.count() == source_tablet_ids_.count()
&& min_split_start_scn_.is_valid_and_not_min();
} else {
is_valid = (is_valid && compaction_scns_.count() == 0);
}
@ -76,7 +78,7 @@ public:
K_(source_schema_versions), K_(dest_schema_versions), K_(snapshot_version),
K_(task_id), K_(parallelism), K_(execution_id),
K_(data_format_version), K_(consumer_group_id), K_(can_reuse_macro_blocks),
K_(parallel_datum_rowkey_list));
K_(parallel_datum_rowkey_list), K(min_split_start_scn_));
public:
uint64_t tenant_id_;
uint64_t dest_tenant_id_;
@ -97,6 +99,7 @@ public:
ObSArray<uint64_t> lob_col_idxs_;
ObSArray<bool> can_reuse_macro_blocks_;
common::ObSEArray<common::ObSEArray<blocksstable::ObDatumRowkey, 8>, 8> parallel_datum_rowkey_list_;
share::SCN min_split_start_scn_;
};
enum class ObReplicaBuildStat
@ -197,6 +200,7 @@ public:
src_tablet_ids_(),
dest_tablet_ids_(),
replica_build_ctxs_(),
min_split_start_scn_(),
lock_()
{}
~ObDDLReplicaBuildExecutor() = default;
@ -214,7 +218,7 @@ public:
K(ddl_task_id_), K(snapshot_version_), K(parallelism_),
K(execution_id_), K(data_format_version_), K(consumer_group_id_),
K(lob_col_idxs_), K(src_tablet_ids_), K(dest_tablet_ids_),
K(replica_build_ctxs_));
K(replica_build_ctxs_), K(min_split_start_scn_));
private:
int schedule_task();
int process_rpc_results(
@ -268,6 +272,7 @@ private:
ObArray<ObTabletID> src_tablet_ids_;
ObSArray<ObTabletID> dest_tablet_ids_;
ObArray<ObSingleReplicaBuildCtx> replica_build_ctxs_; // NOTE hold lock before access
share::SCN min_split_start_scn_;
ObSpinLock lock_; // NOTE keep rpc send out of lock scope
};

View File

@ -93,8 +93,8 @@ ObPartitionSplitTask::ObPartitionSplitTask()
wait_trans_ctx_(),
tablet_size_(0),
data_tablet_parallel_rowkey_list_(),
index_tablet_parallel_rowkey_list_()
index_tablet_parallel_rowkey_list_(),
min_split_start_scn_()
{
ObMemAttr attr(OB_SERVER_TENANT_ID, "RSSplitRange", ObCtxIds::DEFAULT_CTX_ID);
data_tablet_parallel_rowkey_list_.set_attr(attr);
@ -980,6 +980,7 @@ int ObPartitionSplitTask::send_split_request(
param.execution_id_ = execution_id_;
param.data_format_version_ = data_format_version_;
param.consumer_group_id_ = partition_split_arg_.consumer_group_id_;
param.min_split_start_scn_ = min_split_start_scn_;
if (OB_ISNULL(root_service_)) {
ret = OB_ERR_SYS;
LOG_WARN("error sys", K(ret));
@ -1525,7 +1526,17 @@ int ObPartitionSplitTask::send_split_rpc(
if (OB_UNLIKELY(resp_ret_codes.count() > target_split_info_array.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err", K(ret), K(target_split_info_array), K(resp_ret_codes));
} else {
} else if (is_split_start && !min_split_start_scn_.is_valid_and_not_min()) {
if (OB_UNLIKELY(!start_result.min_split_start_scn_.is_valid_and_not_min())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected split start scn", K(ret), K(start_result));
} else if (FALSE_IT(min_split_start_scn_ = start_result.min_split_start_scn_)) {
} else if (OB_FAIL(update_task_message())) {
LOG_WARN("update split start scn failed", K(ret));
}
}
if (OB_SUCC(ret)) {
TCWLockGuard guard(lock_);
for (int64_t i = 0; OB_SUCC(ret) && i < resp_ret_codes.count(); i++) {
const ObTabletID &tablet_id = target_split_info_array.at(i).source_tablet_id_;
@ -1861,7 +1872,7 @@ int ObPartitionSplitTask::serialize_params_to_message(
} else {
LST_DO_CODE(OB_UNIS_ENCODE, all_src_tablet_ids_, data_tablet_compaction_scn_,
index_tablet_compaction_scns_, lob_tablet_compaction_scns_, partition_split_arg_,
tablet_size_, data_tablet_parallel_rowkey_list_, index_tablet_parallel_rowkey_list_);
tablet_size_, data_tablet_parallel_rowkey_list_, index_tablet_parallel_rowkey_list_, min_split_start_scn_);
}
return ret;
}
@ -1901,6 +1912,9 @@ int ObPartitionSplitTask::deserialize_params_from_message(
}
LOG_TRACE("parallel datum rowkey info", K(ret), K(data_tablet_parallel_rowkey_list_), K(index_tablet_parallel_rowkey_list_));
}
if (OB_SUCC(ret)) {
LST_DO_CODE(OB_UNIS_DECODE, min_split_start_scn_);
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
@ -1921,7 +1935,7 @@ int64_t ObPartitionSplitTask::get_serialize_param_size() const
int len = ObDDLTask::get_serialize_param_size();
LST_DO_CODE(OB_UNIS_ADD_LEN, all_src_tablet_ids_, data_tablet_compaction_scn_,
index_tablet_compaction_scns_, lob_tablet_compaction_scns_, partition_split_arg_,
tablet_size_, data_tablet_parallel_rowkey_list_, index_tablet_parallel_rowkey_list_);
tablet_size_, data_tablet_parallel_rowkey_list_, index_tablet_parallel_rowkey_list_, min_split_start_scn_);
return len;
}
@ -2572,6 +2586,7 @@ int ObPartitionSplitTask::prepare_tablet_split_infos(
split_info.consumer_group_id_ = partition_split_arg_.consumer_group_id_;
split_info.can_reuse_macro_block_ = can_reuse_macro_blocks.at(i);
split_info.split_sstable_type_ = share::ObSplitSSTableType::SPLIT_BOTH;
split_info.min_split_start_scn_ = min_split_start_scn_;
const ObIArray<blocksstable::ObDatumRowkey> &parallel_datum_rowkey_list = i > 0 && i < lob_tablet_start_idx ?
index_tablet_parallel_rowkey_list_.at(i - 1) : data_tablet_parallel_rowkey_list_;
int64_t index = 0;

View File

@ -128,7 +128,8 @@ public:
K(data_tablet_compaction_scn_), K(index_tablet_compaction_scns_),
K(lob_tablet_compaction_scns_), K(freeze_progress_status_inited_),
K(compact_progress_status_inited_), K(write_split_log_status_inited_),
K_(data_tablet_parallel_rowkey_list), K_(index_tablet_parallel_rowkey_list));
K_(data_tablet_parallel_rowkey_list), K_(index_tablet_parallel_rowkey_list),
K_(min_split_start_scn));
protected:
virtual void clear_old_status_context() override;
private:
@ -250,6 +251,7 @@ private:
int64_t tablet_size_;
common::ObSEArray<blocksstable::ObDatumRowkey, 8> data_tablet_parallel_rowkey_list_; // data table
common::ObSEArray<common::ObSEArray<blocksstable::ObDatumRowkey, 8>, 8> index_tablet_parallel_rowkey_list_; // index table.
share::SCN min_split_start_scn_;
};
} // end namespace rootserver

View File

@ -10551,7 +10551,6 @@ public:
compaction_scn_(0), can_reuse_macro_block_(false), split_sstable_type_(share::ObSplitSSTableType::SPLIT_BOTH),
lob_col_idxs_(), parallel_datum_rowkey_list_(), is_no_logging_(false),
min_split_start_scn_()
{}
bool is_valid() const;
int assign(const ObDDLBuildSingleReplicaRequestArg &other);

View File

@ -1000,9 +1000,11 @@ int ObDDLRedoLogWriter::write_auto_split_log(
const share::ObLSID &ls_id,
const ObDDLClogType &clog_type,
const ObReplayBarrierType &replay_barrier_type,
const T &log)
const T &log,
SCN &scn)
{
int ret = OB_SUCCESS;
scn = SCN::min_scn();
ObArenaAllocator tmp_arena("SplitLogBuf", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
logservice::ObLogBaseHeader base_header(logservice::ObLogBaseType::DDL_LOG_BASE_TYPE,
replay_barrier_type);
@ -1016,7 +1018,6 @@ int ObDDLRedoLogWriter::write_auto_split_log(
palf::LSN lsn;
const bool need_nonblock= false;
SCN scn = SCN::min_scn();
ObLSHandle ls_handle;
ObLS *ls = nullptr;
logservice::ObLogHandler *log_handler = nullptr;
@ -1105,15 +1106,18 @@ int ObDDLRedoLogWriter::write_auto_split_log(
template int ObDDLRedoLogWriter::write_auto_split_log(const share::ObLSID &ls_id,
const ObDDLClogType &clog_type,
const ObReplayBarrierType &replay_barrier_type,
const ObTabletSplitStartLog &log);
const ObTabletSplitStartLog &log,
SCN &scn);
template int ObDDLRedoLogWriter::write_auto_split_log(const share::ObLSID &ls_id,
const ObDDLClogType &clog_type,
const ObReplayBarrierType &replay_barrier_type,
const ObTabletSplitFinishLog &log);
const ObTabletSplitFinishLog &log,
SCN &scn);
template int ObDDLRedoLogWriter::write_auto_split_log(const share::ObLSID &ls_id,
const ObDDLClogType &clog_type,
const ObReplayBarrierType &replay_barrier_type,
const ObTabletFreezeLog &log);
const ObTabletFreezeLog &log,
SCN &scn);
bool ObDDLRedoLogWriter::need_retry(int ret_code)
{

View File

@ -321,7 +321,8 @@ public:
static int write_auto_split_log(const share::ObLSID &ls_id,
const ObDDLClogType &clog_type,
const logservice::ObReplayBarrierType &replay_barrier_type,
const T &log);
const T &log,
SCN &scn);
int write_commit_log_with_retry(
const bool allow_remote_write,
const ObITable::TableKey &table_key,

View File

@ -805,13 +805,15 @@ int ObSplitStartReplayExecutor::init(
int ObSplitStartReplayExecutor::prepare_param_from_log(
const share::ObLSID &ls_id,
const ObTabletHandle &handle,
const ObTabletSplitInfo &info,
const share::SCN &scn,
ObTabletSplitParam &param)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!ls_id.is_valid() || !info.is_valid())) {
if (OB_UNLIKELY(!ls_id.is_valid() || !handle.is_valid() || !info.is_valid() || !scn.is_valid_and_not_min())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(ls_id), K(info));
LOG_WARN("invalid arg", K(ret), K(ls_id), K(handle), K(info), K(scn));
} else {
param.tenant_id_ = MTL_ID();
param.ls_id_ = ls_id;
@ -825,10 +827,10 @@ int ObSplitStartReplayExecutor::prepare_param_from_log(
param.can_reuse_macro_block_ = info.can_reuse_macro_block_;
param.split_sstable_type_ = info.split_sstable_type_;
param.user_parallelism_ = info.parallel_datum_rowkey_list_.count() - 1;
param.compat_mode_ = handle.get_obj()->get_tablet_meta().compat_mode_;
param.min_split_start_scn_ = scn;
// skip lob_col_idxs.
if (OB_FAIL(ObCompatModeGetter::get_table_compat_mode(param.tenant_id_, param.table_id_, param.compat_mode_))) {
LOG_WARN("failed to get compat mode", K(ret));
} else if (OB_FAIL(param.parallel_datum_rowkey_list_.assign(info.parallel_datum_rowkey_list_))) {
if (OB_FAIL(param.parallel_datum_rowkey_list_.assign(info.parallel_datum_rowkey_list_))) {
LOG_WARN("assign failed", K(ret));
} else if (OB_FAIL(param.dest_tablets_id_.assign(info.dest_tablets_id_))) {
LOG_WARN("assign failed", K(ret), K(info));
@ -839,13 +841,15 @@ int ObSplitStartReplayExecutor::prepare_param_from_log(
int ObSplitStartReplayExecutor::prepare_param_from_log(
const share::ObLSID &ls_id,
const ObTabletHandle &handle,
const ObTabletSplitInfo &info,
const share::SCN &scn,
ObLobSplitParam &param)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!ls_id.is_valid() || !info.is_valid())) {
if (OB_UNLIKELY(!ls_id.is_valid() || !handle.is_valid() || !info.is_valid() || !scn.is_valid_and_not_min())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(ls_id), K(info));
LOG_WARN("invalid arg", K(ret), K(ls_id), K(handle), K(info), K(scn));
} else {
param.tenant_id_ = MTL_ID();
param.ls_id_ = ls_id;
@ -860,9 +864,9 @@ int ObSplitStartReplayExecutor::prepare_param_from_log(
// skip can_reuse_macro_block
param.split_sstable_type_ = info.split_sstable_type_;
param.parallelism_ = info.parallel_datum_rowkey_list_.count() - 1;
if (OB_FAIL(ObCompatModeGetter::get_table_compat_mode(param.tenant_id_, param.source_table_id_, param.compat_mode_))) {
LOG_WARN("failed to get compat mode", K(ret));
} else if (OB_FAIL(param.lob_col_idxs_.assign(info.lob_col_idxs_))) {
param.compat_mode_ = handle.get_obj()->get_tablet_meta().compat_mode_;
param.min_split_start_scn_ = scn;
if (OB_FAIL(param.lob_col_idxs_.assign(info.lob_col_idxs_))) {
LOG_WARN("assign failed", K(ret));
} else if (OB_FAIL(param.parallel_datum_rowkey_list_.assign(info.parallel_datum_rowkey_list_))) {
LOG_WARN("assign failed", K(ret));
@ -877,7 +881,6 @@ int ObSplitStartReplayExecutor::prepare_param_from_log(
int ObSplitStartReplayExecutor::do_replay_(ObTabletHandle &handle)
{
int ret = OB_SUCCESS;
UNUSED(handle);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLRedoLogReplayer has not been inited", K(ret));
@ -887,7 +890,7 @@ int ObSplitStartReplayExecutor::do_replay_(ObTabletHandle &handle)
} else if (log_->basic_info_.lob_col_idxs_.count() > 0) {
// lob tablet.
ObLobSplitParam param;
if (OB_FAIL(ObSplitStartReplayExecutor::prepare_param_from_log(ls_->get_ls_id(), log_->basic_info_, param))) {
if (OB_FAIL(ObSplitStartReplayExecutor::prepare_param_from_log(ls_->get_ls_id(), handle, log_->basic_info_, scn_, param))) {
LOG_WARN("prepare lob split param failed", K(ret), KPC(log_));
} else if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_lob_tablet_split_dag(param))) {
LOG_WARN("schedule tablet split dag failed, but ignore to wait", K(ret), K(param));
@ -897,7 +900,7 @@ int ObSplitStartReplayExecutor::do_replay_(ObTabletHandle &handle)
}
} else {
ObTabletSplitParam param;
if (OB_FAIL(ObSplitStartReplayExecutor::prepare_param_from_log(ls_->get_ls_id(), log_->basic_info_, param))) {
if (OB_FAIL(ObSplitStartReplayExecutor::prepare_param_from_log(ls_->get_ls_id(), handle, log_->basic_info_, scn_, param))) {
LOG_WARN("prepare tablet split param failed", K(ret), KPC(log_));
} else if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tablet_split_dag(param))) {
LOG_WARN("schedule tablet split dag failed, but ignore to wait", K(ret), K(param));
@ -943,7 +946,6 @@ int ObSplitFinishReplayExecutor::init(
int ObSplitFinishReplayExecutor::do_replay_(ObTabletHandle &handle)
{
UNUSED(handle);
int ret = OB_SUCCESS;
ObLSID ls_id;
ObLobSplitParam lob_split_param;
@ -964,10 +966,10 @@ int ObSplitFinishReplayExecutor::do_replay_(ObTabletHandle &handle)
} else if (OB_FALSE_IT(is_lob_tablet = log_->basic_info_.lob_col_idxs_.count() > 0)) {
} else if (OB_FALSE_IT(ls_id = ls_->get_ls_id())) {
} else if (is_lob_tablet &&
OB_FAIL(ObSplitStartReplayExecutor::prepare_param_from_log(ls_id, log_->basic_info_, lob_split_param))) {
OB_FAIL(ObSplitStartReplayExecutor::prepare_param_from_log(ls_id, handle, log_->basic_info_, scn_, lob_split_param))) {
LOG_WARN("prepare lob split param failed", K(ret), K(ls_id), KPC(log_));
} else if (!is_lob_tablet &&
OB_FAIL(ObSplitStartReplayExecutor::prepare_param_from_log(ls_id, log_->basic_info_, data_split_param))) {
OB_FAIL(ObSplitStartReplayExecutor::prepare_param_from_log(ls_id, handle, log_->basic_info_, scn_, data_split_param))) {
LOG_WARN("prepare tablet split param failed", K(ret), K(ls_id), KPC(log_));
} else {
if (is_lob_tablet && OB_FAIL(compaction::ObScheduleDagFunc::schedule_lob_tablet_split_dag(lob_split_param))) {

View File

@ -248,11 +248,15 @@ public:
const share::SCN &scn);
static int prepare_param_from_log(
const share::ObLSID &ls_id,
const ObTabletHandle &handle,
const ObTabletSplitInfo &info,
const share::SCN &scn,
ObTabletSplitParam &param);
static int prepare_param_from_log(
const share::ObLSID &ls_id,
const ObTabletHandle &handle,
const ObTabletSplitInfo &info,
const share::SCN &scn,
ObLobSplitParam &param);
protected:

View File

@ -108,6 +108,7 @@ int ObLobSplitParam::assign(const ObLobSplitParam &other)
dest_schema_id_ = other.dest_schema_id_;
consumer_group_id_ = other.consumer_group_id_;
split_sstable_type_ = other.split_sstable_type_;
min_split_start_scn_ = other.min_split_start_scn_;
if (OB_FAIL(new_lob_tablet_ids_.assign(other.new_lob_tablet_ids_))) {
LOG_WARN("failed to assign new_lob_tablet_ids_", K(ret));
} else if (OB_FAIL(lob_col_idxs_.assign(other.lob_col_idxs_))) {
@ -154,6 +155,7 @@ int ObLobSplitParam::init(const obrpc::ObDDLBuildSingleReplicaRequestArg &arg)
split_sstable_type_ = arg.split_sstable_type_;
parallelism_ = arg.parallel_datum_rowkey_list_.count() - 1;
compaction_scn_ = arg.compaction_scn_;
min_split_start_scn_ = arg.min_split_start_scn_;
if (OB_FAIL(parallel_datum_rowkey_list_.assign(arg.parallel_datum_rowkey_list_))) { // shallow cpy.
LOG_WARN("assign failed", K(ret), "parall_info", arg.parallel_datum_rowkey_list_);
} else if (OB_FAIL(ObTabletSplitUtil::get_split_dest_tablets_info(ls_id_, ori_lob_meta_tablet_id_, new_lob_tablet_ids_, compat_mode_))) {
@ -184,6 +186,7 @@ int ObLobSplitParam::init(const obrpc::ObTabletSplitArg &arg)
split_sstable_type_ = arg.split_sstable_type_;
parallelism_ = arg.parallel_datum_rowkey_list_.count() - 1;
compaction_scn_ = arg.compaction_scn_;
min_split_start_scn_ = arg.min_split_start_scn_;
ObArray<ObTabletID> unused_tablet_ids;
if (OB_FAIL(ObTabletSplitUtil::get_split_dest_tablets_info(ls_id_, ori_lob_meta_tablet_id_, unused_tablet_ids, compat_mode_))) {
LOG_WARN("get split dest tablets failed", K(ret), K(arg));
@ -214,7 +217,8 @@ int ObLobSplitContext::init(const ObLobSplitParam& param)
lob_meta_tablet_handle_,
ObMDSGetTabletMode::READ_WITHOUT_CHECK))) {
LOG_WARN("get tablet handle failed", K(ret), K(param));
} else if (OB_FAIL(ObTabletSplitUtil::check_satisfy_split_condition(param.new_lob_tablet_ids_, param.compaction_scn_, lob_meta_tablet_handle_, ls_handle_))) {
} else if (OB_FAIL(ObTabletSplitUtil::check_satisfy_split_condition(
ls_handle_, lob_meta_tablet_handle_, param.new_lob_tablet_ids_, param.compaction_scn_, param.min_split_start_scn_))) {
if (OB_NEED_RETRY == ret) {
if (REACH_COUNT_INTERVAL(1000L)) {
LOG_WARN("wait to satisfy the data split condition", K(ret), K(param));
@ -1976,7 +1980,8 @@ int ObTabletLobSplitUtil::generate_col_param(const ObMergeSchema *schema,
}
int ObTabletLobSplitUtil::process_write_split_start_log_request(
const ObTabletSplitArg &arg)
const ObTabletSplitArg &arg,
share::SCN &scn)
{
int ret = OB_SUCCESS;
ObLobSplitParam lob_split_param;
@ -1990,13 +1995,13 @@ int ObTabletLobSplitUtil::process_write_split_start_log_request(
} else if (is_lob_tablet) {
if (OB_FAIL(lob_split_param.init(arg))) {
LOG_WARN("init param failed", K(ret));
} else if (OB_FAIL(ObTabletLobSplitUtil::write_split_log(is_lob_tablet, is_start_request, ls_id, &lob_split_param))) {
} else if (OB_FAIL(ObTabletLobSplitUtil::write_split_log(is_lob_tablet, is_start_request, ls_id, &lob_split_param, scn))) {
LOG_WARN("write split log failed", K(ret));
}
} else {
if (OB_FAIL(data_split_param.init(arg))) {
LOG_WARN("init param failed", K(ret));
} else if (OB_FAIL(ObTabletLobSplitUtil::write_split_log(is_lob_tablet, is_start_request, ls_id, &data_split_param))) {
} else if (OB_FAIL(ObTabletLobSplitUtil::write_split_log(is_lob_tablet, is_start_request, ls_id, &data_split_param, scn))) {
LOG_WARN("write split log failed", K(ret));
}
}
@ -2049,7 +2054,8 @@ int ObTabletLobSplitUtil::process_tablet_split_request(
}
if (OB_SUCC(ret) && !is_start_request) {
if (OB_FAIL(ObTabletLobSplitUtil::write_split_log(is_lob_tablet, is_start_request, ls_id, dag_param))) {
share::SCN unused_finish_scn = SCN::min_scn();
if (OB_FAIL(ObTabletLobSplitUtil::write_split_log(is_lob_tablet, is_start_request, ls_id, dag_param, unused_finish_scn))) {
LOG_WARN("write split log failed", K(ret));
}
}
@ -2104,7 +2110,8 @@ int ObTabletLobSplitUtil::write_split_log(
const bool is_lob_tablet,
const bool is_start_request,
const share::ObLSID &ls_id,
const share::ObIDagInitParam *input_param)
const share::ObIDagInitParam *input_param,
SCN &scn)
{
int ret = OB_SUCCESS;
ObTabletSplitStartLog split_start_log;
@ -2166,13 +2173,13 @@ int ObTabletLobSplitUtil::write_split_log(
if (is_start_request) {
if (OB_FAIL(ObDDLRedoLogWriter::write_auto_split_log(
ls_id, ObDDLClogType::DDL_TABLET_SPLIT_START_LOG,
logservice::ObReplayBarrierType::PRE_BARRIER, split_start_log))) {
logservice::ObReplayBarrierType::PRE_BARRIER, split_start_log, scn))) {
LOG_WARN("write tablet split start log failed", K(ret), K(split_start_log));
}
} else {
if (OB_FAIL(ObDDLRedoLogWriter::write_auto_split_log(
ls_id, ObDDLClogType::DDL_TABLET_SPLIT_FINISH_LOG,
logservice::ObReplayBarrierType::STRICT_BARRIER, split_finish_log))) {
logservice::ObReplayBarrierType::STRICT_BARRIER, split_finish_log, scn))) {
LOG_WARN("write tablet split finish log failed", K(ret), K(split_finish_log));
}
}

View File

@ -92,7 +92,8 @@ public:
data_format_version_(0), parallelism_(0), compaction_scn_(),
compat_mode_(lib::Worker::CompatMode::INVALID), task_id_(0),
source_table_id_(OB_INVALID_ID), dest_schema_id_(OB_INVALID_ID), consumer_group_id_(0),
lob_col_idxs_(), split_sstable_type_(share::ObSplitSSTableType::SPLIT_BOTH)
lob_col_idxs_(), split_sstable_type_(share::ObSplitSSTableType::SPLIT_BOTH), parallel_datum_rowkey_list_(),
min_split_start_scn_()
{}
virtual ~ObLobSplitParam();
int init(const ObLobSplitParam &other);
@ -111,7 +112,8 @@ public:
K_(new_lob_tablet_ids), K_(schema_version), K_(data_format_version),
K_(parallelism), K_(compaction_scn), K_(compat_mode), K_(task_id),
K_(source_table_id), K_(dest_schema_id), K_(lob_col_idxs), K_(consumer_group_id),
K_(lob_col_idxs), K_(split_sstable_type), K_(parallel_datum_rowkey_list));
K_(lob_col_idxs), K_(split_sstable_type), K_(parallel_datum_rowkey_list),
K_(min_split_start_scn));
private:
common::ObArenaAllocator rowkey_allocator_; // for DatumRowkey.
public:
@ -131,6 +133,7 @@ public:
ObSArray<uint64_t> lob_col_idxs_;
share::ObSplitSSTableType split_sstable_type_;
common::ObSArray<blocksstable::ObDatumRowkey> parallel_datum_rowkey_list_; // calc by main table.
share::SCN min_split_start_scn_;
};
struct ObLobSplitContext final
@ -359,7 +362,8 @@ public:
int64_t& rk_cnt);
static int process_write_split_start_log_request(
const ObTabletSplitArg &arg);
const ObTabletSplitArg &arg,
share::SCN &scn);
static int process_tablet_split_request(
const bool is_lob_tablet,
const bool is_start_request,
@ -369,7 +373,8 @@ public:
const bool is_lob_tablet,
const bool is_start_request,
const share::ObLSID &ls_id,
const share::ObIDagInitParam *input_param);
const share::ObIDagInitParam *input_param,
share::SCN &scn);
};
class ObSingleRowIterWrapper: public ObIStoreRowIterator

View File

@ -44,7 +44,7 @@ ObTabletSplitParam::ObTabletSplitParam()
dest_tablets_id_(), compaction_scn_(0), user_parallelism_(0),
compat_mode_(lib::Worker::CompatMode::INVALID), data_format_version_(0), consumer_group_id_(0),
can_reuse_macro_block_(false), split_sstable_type_(share::ObSplitSSTableType::SPLIT_BOTH),
parallel_datum_rowkey_list_()
parallel_datum_rowkey_list_(), min_split_start_scn_()
{
}
@ -99,6 +99,7 @@ int ObTabletSplitParam::init(
consumer_group_id_ = param.consumer_group_id_;
split_sstable_type_ = param.split_sstable_type_;
can_reuse_macro_block_ = param.can_reuse_macro_block_;
min_split_start_scn_ = param.min_split_start_scn_;
lib::ob_sort(dest_tablets_id_.begin(), dest_tablets_id_.end());
is_inited_ = true;
}
@ -124,6 +125,7 @@ int ObTabletSplitParam::init(const obrpc::ObDDLBuildSingleReplicaRequestArg &arg
consumer_group_id_ = arg.consumer_group_id_;
split_sstable_type_ = arg.split_sstable_type_;
can_reuse_macro_block_ = arg.can_reuse_macro_block_;
min_split_start_scn_ = arg.min_split_start_scn_;
if (OB_FAIL(parallel_datum_rowkey_list_.assign(arg.parallel_datum_rowkey_list_))) { // shallow cpy.
LOG_WARN("convert to range failed", K(ret), "parall_info", arg.parallel_datum_rowkey_list_);
} else if (OB_FAIL(ObTabletSplitUtil::get_split_dest_tablets_info(ls_id_, source_tablet_id_, dest_tablets_id_, compat_mode_))) {
@ -152,6 +154,7 @@ int ObTabletSplitParam::init(const obrpc::ObTabletSplitArg &arg)
consumer_group_id_ = arg.consumer_group_id_;
split_sstable_type_ = arg.split_sstable_type_;
can_reuse_macro_block_ = arg.can_reuse_macro_block_;
min_split_start_scn_ = arg.min_split_start_scn_;
ObArray<ObTabletID> unused_tablet_ids;
if (OB_FAIL(ObTabletSplitUtil::get_split_dest_tablets_info(ls_id_, source_tablet_id_, unused_tablet_ids, compat_mode_))) {
LOG_WARN("get split dest tablets failed", K(ret), K(arg));
@ -228,7 +231,7 @@ int ObTabletSplitCtx::init(const ObTabletSplitParam &param)
} else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle_,
param.source_tablet_id_, tablet_handle_, ObMDSGetTabletMode::READ_ALL_COMMITED))) {
LOG_WARN("get tablet failed", K(ret));
} else if (OB_FAIL(ObTabletSplitUtil::check_satisfy_split_condition(param.dest_tablets_id_, param.compaction_scn_, tablet_handle_, ls_handle_))) {
} else if (OB_FAIL(ObTabletSplitUtil::check_satisfy_split_condition(ls_handle_, tablet_handle_, param.dest_tablets_id_, param.compaction_scn_, param.min_split_start_scn_))) {
if (OB_NEED_RETRY == ret) {
if (REACH_COUNT_INTERVAL(1000L)) {
LOG_WARN("wait to satisfy the data split condition", K(ret), K(param));
@ -2488,10 +2491,11 @@ int ObTabletSplitUtil::check_major_sstables_exist(
}
int ObTabletSplitUtil::check_satisfy_split_condition(
const ObLSHandle &ls_handle,
const ObTabletHandle &source_tablet_handle,
const ObArray<ObTabletID> &dest_tablets_id,
const int64_t compaction_scn,
const ObTabletHandle &source_tablet_handle,
const ObLSHandle &ls_handle)
const share::SCN &min_split_start_scn)
{
int ret = OB_SUCCESS;
UNUSED(compaction_scn);
@ -2500,9 +2504,11 @@ int ObTabletSplitUtil::check_satisfy_split_condition(
ObTablet *tablet = nullptr;
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
bool is_tablet_status_need_to_split = false;
if (OB_UNLIKELY(!source_tablet_handle.is_valid())) {
share::SCN max_decided_scn;
if (OB_UNLIKELY(!ls_handle.is_valid() || !source_tablet_handle.is_valid()
|| dest_tablets_id.empty() || !min_split_start_scn.is_valid_and_not_min())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(source_tablet_handle));
LOG_WARN("invalid arg", K(ret), K(ls_handle), K(source_tablet_handle), K(dest_tablets_id), K(min_split_start_scn));
} else if (OB_ISNULL(tablet = source_tablet_handle.get_obj())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err", K(ret), K(source_tablet_handle));
@ -2524,6 +2530,18 @@ int ObTabletSplitUtil::check_satisfy_split_condition(
LOG_INFO("should wait data complete", K(ret), "tablet_id", tablet->get_tablet_meta().tablet_id_,
"tablet_meta", source_tablet_handle.get_obj()->get_tablet_meta());
}
} else if (OB_FAIL(ls_handle.get_ls()->get_max_decided_scn(max_decided_scn))) {
LOG_WARN("get max decided log ts failed", K(ret), "ls_id", ls_handle.get_ls()->get_ls_id(),
"source_tablet_id", tablet->get_tablet_meta().tablet_id_);
if (OB_STATE_NOT_MATCH == ret) {
ret = OB_NEED_RETRY;
}
} else if (SCN::plus(max_decided_scn, 1) < min_split_start_scn) {
ret = OB_NEED_RETRY;
if (REACH_COUNT_INTERVAL(1000L)) {
LOG_INFO("need wait max decided scn reach", K(ret), "ls_id", ls_handle.get_ls()->get_ls_id(),
"source_tablet_id", tablet->get_tablet_meta().tablet_id_, K(max_decided_scn), K(min_split_start_scn));
}
} else if (MTL_TENANT_ROLE_CACHE_IS_RESTORE()) {
LOG_INFO("dont check compaction in restore progress", K(ret), "tablet_id", tablet->get_tablet_meta().tablet_id_);
} else {

View File

@ -62,7 +62,8 @@ public:
TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(ls_id), K_(table_id), K_(schema_version),
K_(task_id), K_(source_tablet_id), K_(dest_tablets_id), K_(compaction_scn), K_(user_parallelism),
K_(compat_mode), K_(data_format_version), K_(consumer_group_id),
K_(can_reuse_macro_block), K_(split_sstable_type), K_(parallel_datum_rowkey_list));
K_(can_reuse_macro_block), K_(split_sstable_type), K_(parallel_datum_rowkey_list),
K_(min_split_start_scn));
private:
common::ObArenaAllocator rowkey_allocator_; // for DatumRowkey.
public:
@ -82,6 +83,7 @@ public:
bool can_reuse_macro_block_;
share::ObSplitSSTableType split_sstable_type_;
common::ObSArray<blocksstable::ObDatumRowkey> parallel_datum_rowkey_list_;
share::SCN min_split_start_scn_;
DISALLOW_COPY_AND_ASSIGN(ObTabletSplitParam);
};
@ -425,10 +427,11 @@ public:
const ObIArray<ObTabletID> &check_tablets_id,
bool &is_all_major_exists);
static int check_satisfy_split_condition(
const ObLSHandle &ls_handle,
const ObTabletHandle &source_tablet_handle,
const ObArray<ObTabletID> &dest_tablets_id,
const int64_t compaction_scn,
const ObTabletHandle &source_tablet_handle,
const ObLSHandle &ls_handle);
const share::SCN &min_split_start_scn);
static int get_split_dest_tablets_info(
const share::ObLSID &ls_id,
const ObTabletID &source_tablet_id,