set the initial value of wrs to scn min

This commit is contained in:
obdev
2022-11-28 02:55:06 +00:00
committed by ob-robot
parent 99d4f56fec
commit 87a9357186
406 changed files with 6473 additions and 50308 deletions

View File

@ -132,18 +132,20 @@ int ObComplementDataParam::deep_copy_table_schemas(const ObTableSchema *data_tab
LOG_WARN("invalid arguments", K(ret), KP(data_table_schema), KP(hidden_table_schema));
} else {
ObIAllocator &allocator = allocator_;
const int64_t alloc_size = 2 * sizeof(ObTableSchema);
char *buf = nullptr;
if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(alloc_size)))) {
const int64_t alloc_size = sizeof(ObTableSchema);
char *buf_for_data_schema = nullptr;
char *buf_for_hidden_schema = nullptr;
if (OB_ISNULL(buf_for_data_schema = static_cast<char *>(allocator.alloc(alloc_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory", K(ret));
LOG_WARN("alloc memory failed", K(ret));
} else if (OB_ISNULL(buf_for_hidden_schema = static_cast<char *>(allocator.alloc(alloc_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));
} else {
ObTableSchema *deep_copy_data_table_schema = nullptr;
ObTableSchema *deep_copy_hidden_table_schema = nullptr;
deep_copy_data_table_schema = new (buf) ObTableSchema(&allocator);
buf += sizeof(ObTableSchema);
deep_copy_hidden_table_schema = new (buf) ObTableSchema(&allocator);
buf += sizeof(ObTableSchema);
deep_copy_data_table_schema = new (buf_for_data_schema) ObTableSchema(&allocator);
deep_copy_hidden_table_schema = new (buf_for_hidden_schema) ObTableSchema(&allocator);
if (OB_FAIL(deep_copy_data_table_schema->assign(*data_table_schema))) {
LOG_WARN("fail to assign data table schema", K(ret));
} else if (OB_FAIL(deep_copy_hidden_table_schema->assign(*hidden_table_schema))) {
@ -152,11 +154,23 @@ int ObComplementDataParam::deep_copy_table_schemas(const ObTableSchema *data_tab
data_table_schema_ = deep_copy_data_table_schema;
hidden_table_schema_ = deep_copy_hidden_table_schema;
}
if (OB_FAIL(ret) && OB_NOT_NULL(buf)) {
deep_copy_data_table_schema->~ObTableSchema();
deep_copy_hidden_table_schema->~ObTableSchema();
allocator.free(buf);
buf = nullptr;
if (OB_FAIL(ret)) {
if (nullptr != deep_copy_data_table_schema) {
deep_copy_data_table_schema->~ObTableSchema();
deep_copy_data_table_schema = nullptr;
}
if (nullptr != buf_for_data_schema) {
allocator_.free(buf_for_data_schema);
buf_for_data_schema = nullptr;
}
if (nullptr != deep_copy_hidden_table_schema) {
deep_copy_hidden_table_schema->~ObTableSchema();
deep_copy_hidden_table_schema = nullptr;
}
if (nullptr != buf_for_hidden_schema) {
allocator_.free(buf_for_hidden_schema);
buf_for_hidden_schema = nullptr;
}
}
}
}
@ -259,12 +273,15 @@ int ObComplementDataContext::init(const ObComplementDataParam &param, const ObDa
{
int ret = OB_SUCCESS;
void *builder_buf = nullptr;
const ObSSTable *latest_major_sstable = nullptr;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObComplementDataContext has already been inited", K(ret));
} else if (OB_UNLIKELY(!param.is_valid() || !desc.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(param), K(desc));
} else if (OB_FAIL(ObTabletDDLUtil::check_and_get_major_sstable(param.ls_id_, param.dest_tablet_id_, latest_major_sstable))) {
LOG_WARN("check if major sstable exist failed", K(ret), K(param));
} else if (OB_FAIL(data_sstable_redo_writer_.init(param.ls_id_,
param.dest_tablet_id_))) {
LOG_WARN("fail to init data sstable redo writer", K(ret), K(param));
@ -279,6 +296,7 @@ int ObComplementDataContext::init(const ObComplementDataParam &param, const ObDa
} else if (OB_FAIL(index_builder_->init(desc))) {
LOG_WARN("failed to init index builder", K(ret), K(desc));
} else {
is_major_sstable_exist_ = nullptr != latest_major_sstable ? true : false;
concurrent_cnt_ = param.concurrent_cnt_;
is_inited_ = true;
}
@ -321,6 +339,7 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam &param)
void ObComplementDataContext::destroy()
{
is_inited_ = false;
is_major_sstable_exist_ = false;
complement_data_ret_ = OB_SUCCESS;
concurrent_cnt_ = 0;
if (OB_NOT_NULL(index_builder_)) {
@ -363,6 +382,51 @@ int ObComplementDataDag::init(const ObDDLBuildSingleReplicaRequestArg &arg)
return ret;
}
int ObComplementDataDag::create_first_task()
{
int ret = OB_SUCCESS;
ObComplementPrepareTask *prepare_task = nullptr;
ObComplementWriteTask *write_task = nullptr;
ObComplementMergeTask *merge_task = nullptr;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_FAIL(alloc_task(prepare_task))) {
LOG_WARN("allocate task failed", K(ret));
} else if (OB_ISNULL(prepare_task)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr task", K(ret));
} else if (OB_FAIL(prepare_task->init(param_, context_))) {
LOG_WARN("init prepare task failed", K(ret));
} else if (OB_FAIL(add_task(*prepare_task))) {
LOG_WARN("add task failed", K(ret));
} else if (OB_FAIL(alloc_task(write_task))) {
LOG_WARN("alloc task failed", K(ret));
} else if (OB_ISNULL(write_task)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr task", K(ret));
} else if (OB_FAIL(write_task->init(0, param_, context_))) {
LOG_WARN("init write task failed", K(ret));
} else if (OB_FAIL(prepare_task->add_child(*write_task))) {
LOG_WARN("add child task failed", K(ret));
} else if (OB_FAIL(add_task(*write_task))) {
LOG_WARN("add task failed", K(ret));
} else if (OB_FAIL(alloc_task(merge_task))) {
LOG_WARN("alloc task failed", K(ret));
} else if (OB_ISNULL(merge_task)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr task", K(ret));
} else if (OB_FAIL(merge_task->init(param_, context_))) {
LOG_WARN("init merge task failed", K(ret));
} else if (OB_FAIL(write_task->add_child(*merge_task))) {
LOG_WARN("add child task failed", K(ret));
} else if (OB_FAIL(add_task(*merge_task))) {
LOG_WARN("add task failed");
}
return ret;
}
int ObComplementDataDag::prepare_context()
{
int ret = OB_SUCCESS;
@ -420,8 +484,9 @@ int64_t ObComplementDataDag::hash() const
tmp_ret = OB_ERR_SYS;
LOG_ERROR("table schema must not be NULL", K(tmp_ret), K(is_inited_), K(param_));
} else {
hash_val = param_.ls_id_.hash() + param_.source_tablet_id_.hash() + param_.dest_tablet_id_.hash() +
param_.data_table_schema_->get_table_id() + param_.hidden_table_schema_->get_table_id() + ObDagType::DAG_TYPE_DDL;
hash_val = param_.tenant_id_ + param_.ls_id_.hash()
+ param_.data_table_schema_->get_table_id() + param_.hidden_table_schema_->get_table_id()
+ param_.source_tablet_id_.hash() + param_.dest_tablet_id_.hash() + ObDagType::DAG_TYPE_DDL;
}
return hash_val;
}
@ -438,11 +503,10 @@ bool ObComplementDataDag::operator==(const ObIDag &other) const
tmp_ret = OB_ERR_SYS;
LOG_ERROR("invalid argument", K(tmp_ret), K(param_), K(dag.param_));
} else {
is_equal = (param_.ls_id_ == dag.param_.ls_id_) && (param_.tenant_id_ == dag.param_.tenant_id_) &&
(param_.source_tablet_id_ == dag.param_.source_tablet_id_) && (param_.dest_tablet_id_ == dag.param_.dest_tablet_id_) &&
is_equal = (param_.tenant_id_ == dag.param_.tenant_id_) && (param_.ls_id_ == dag.param_.ls_id_) &&
(param_.data_table_schema_->get_table_id() == dag.param_.data_table_schema_->get_table_id()) &&
(param_.hidden_table_schema_->get_table_id() == dag.param_.hidden_table_schema_->get_table_id()) &&
(param_.compat_mode_ == dag.param_.compat_mode_);
(param_.source_tablet_id_ == dag.param_.source_tablet_id_) && (param_.dest_tablet_id_ == dag.param_.dest_tablet_id_);
}
}
return is_equal;
@ -460,6 +524,12 @@ int ObComplementDataDag::report_replica_build_status()
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid param", K(ret), K(param_));
} else {
#ifdef ERRSIM
if (OB_SUCC(ret)) {
ret = E(EventTable::EN_DDL_REPORT_REPLICA_BUILD_STATUS_FAIL) OB_SUCCESS;
LOG_INFO("report replica build status errsim", K(ret));
}
#endif
obrpc::ObDDLBuildSingleReplicaResponseArg arg;
ObAddr rs_addr;
arg.tenant_id_ = param_.tenant_id_;
@ -473,7 +543,8 @@ int ObComplementDataDag::report_replica_build_status()
arg.task_id_ = param_.task_id_;
arg.execution_id_ = param_.execution_id_;
FLOG_INFO("send replica build status response to RS", K(ret), K(context_.complement_data_ret_), K(arg));
if (OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.rs_mgr_)) {
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.rs_mgr_)) {
ret = OB_ERR_SYS;
LOG_WARN("innner system error, rootserver rpc proxy or rs mgr must not be NULL", K(ret), K(GCTX));
} else if (OB_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_addr))) {
@ -561,12 +632,10 @@ int ObComplementPrepareTask::process()
} else if (FALSE_IT(dag = static_cast<ObComplementDataDag *>(tmp_dag))) {
} else if (OB_FAIL(dag->prepare_context())) {
LOG_WARN("prepare complement context failed", K(ret));
} else if (context_->is_major_sstable_exist_) {
FLOG_INFO("major sstable exists, all task should finish", K(ret), K(*param_));
} else if (OB_FAIL(context_->write_start_log(*param_))) {
LOG_WARN("write start log failed", K(ret), KPC(param_));
} else if (OB_FAIL(generate_complement_write_task(dag, write_task))) {
LOG_WARN("fail to generate complement write task", K(ret));
} else if (OB_FAIL(generate_complement_merge_task(dag, write_task, merge_task))) {
LOG_WARN("fail to generate complement merge task", K(ret));
} else {
LOG_INFO("finish the complement prepare task", K(ret));
}
@ -578,51 +647,6 @@ int ObComplementPrepareTask::process()
return ret;
}
int ObComplementPrepareTask::generate_complement_write_task(ObComplementDataDag *dag,
ObComplementWriteTask *&write_task)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObComplementPrepareTask has not been inited", K(ret));
} else if (OB_ISNULL(dag)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(dag));
} else if (OB_FAIL(dag->alloc_task(write_task))) {
LOG_WARN("fail to alloc write task", K(ret));
} else if (OB_FAIL(write_task->init(0, *param_, *context_))) {
LOG_WARN("fail to init complement write task", K(ret));
} else if (OB_FAIL(add_child(*write_task))) {
LOG_WARN("fail to add child for complement prepare task", K(ret));
} else if (OB_FAIL(dag->add_task(*write_task))) {
LOG_WARN("fail to add complement write task to dag", K(ret));
}
return ret;
}
int ObComplementPrepareTask::generate_complement_merge_task(ObComplementDataDag *dag,
ObComplementWriteTask *write_task, ObComplementMergeTask *&merge_task)
{
int ret = OB_SUCCESS;
merge_task = NULL;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObComplementPrepareTask has not been inited", K(ret));
} else if (OB_ISNULL(dag) || OB_ISNULL(write_task)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(dag), KP(write_task));
} else if (OB_FAIL(dag->alloc_task(merge_task))) {
LOG_WARN("fail to alloc merge task", K(ret));
} else if (OB_FAIL(merge_task->init(*param_, *context_))) {
LOG_WARN("fail to init merge task", K(ret));
} else if (OB_FAIL(write_task->add_child(*merge_task))) {
LOG_WARN("fail to add child for write task", K(ret));
} else if (OB_FAIL(dag->add_task(*merge_task))) {
LOG_WARN("fail to add merge task", K(ret));
}
return ret;
}
ObComplementWriteTask::ObComplementWriteTask()
: ObITask(TASK_TYPE_COMPLEMENT_WRITE), is_inited_(false), task_id_(0), param_(nullptr),
context_(nullptr), write_row_(),
@ -670,6 +694,7 @@ int ObComplementWriteTask::process()
LOG_WARN("dag is invalid", K(ret), KP(tmp_dag));
} else if (OB_SUCCESS != (context_->complement_data_ret_)) {
LOG_WARN("complement data has already failed", "ret", context_->complement_data_ret_);
} else if (context_->is_major_sstable_exist_) {
} else if (OB_FAIL(guard.switch_to(param_->tenant_id_))) {
LOG_WARN("switch to tenant failed", K(ret), K(param_->tenant_id_));
} else if (OB_FAIL(local_scan_by_range())) {
@ -1121,6 +1146,23 @@ int ObComplementMergeTask::process()
LOG_WARN("complement data has already failed", "ret", context_->complement_data_ret_);
} else if (OB_FAIL(guard.switch_to(param_->hidden_table_schema_->get_tenant_id()))) {
LOG_WARN("switch to tenant failed", K(ret), K(param_->hidden_table_schema_->get_tenant_id()));
} else if (context_->is_major_sstable_exist_) {
const ObSSTable *latest_major_sstable = nullptr;
if (OB_FAIL(ObTabletDDLUtil::check_and_get_major_sstable(param_->ls_id_, param_->dest_tablet_id_, latest_major_sstable))) {
LOG_WARN("check if major sstable exist failed", K(ret), K(*param_));
} else if (OB_ISNULL(latest_major_sstable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, major sstable shoud not be null", K(ret), K(*param_));
} else if (OB_FAIL(ObTabletDDLUtil::report_ddl_checksum(param_->ls_id_,
param_->dest_tablet_id_,
param_->hidden_table_schema_->get_table_id(),
1 /* execution_id */,
param_->task_id_,
latest_major_sstable->get_meta().get_col_checksum()))) {
LOG_WARN("report ddl column checksum failed", K(ret), K(*param_));
} else if (OB_FAIL(GCTX.ob_service_->submit_tablet_update_task(param_->tenant_id_, param_->ls_id_, param_->dest_tablet_id_))) {
LOG_WARN("fail to submit tablet update task", K(ret), K(*param_));
}
} else if (OB_FAIL(add_build_hidden_table_sstable())) {
LOG_WARN("fail to build new sstable and write macro redo", K(ret));
}