[FEAT MERGE] enhance_ddl_quality

Co-authored-by: renju96 <fcbrenju@163.com>
This commit is contained in:
simonjoylet
2023-11-23 04:11:38 +00:00
committed by ob-robot
parent fd1115b382
commit 16f37c94b6
74 changed files with 3169 additions and 184 deletions

View File

@ -22,6 +22,7 @@
#include "share/ob_freeze_info_proxy.h"
#include "share/ob_get_compat_mode.h"
#include "share/schema/ob_table_dml_param.h"
#include "share/ob_ddl_sim_point.h"
#include "share/schema/ob_part_mgr_util.h"
#include "sql/engine/px/ob_granule_util.h"
#include "sql/ob_sql_utils.h"
@ -40,6 +41,7 @@
#include "storage/lob/ob_lob_util.h"
#include "logservice/ob_log_service.h"
#include "storage/ddl/ob_tablet_ddl_kv_mgr.h"
#include "observer/ob_server_event_history_table_operator.h"
namespace oceanbase
{
@ -57,6 +59,25 @@ using namespace blocksstable;
namespace storage
{
void add_ddl_event(const ObComplementDataParam *param, const ObString &stmt)
{
if (OB_NOT_NULL(param)) {
char table_id_buffer[256];
char tablet_id_buffer[256];
snprintf(table_id_buffer, sizeof(table_id_buffer), "source_table_id:%ld, dest_table_id:%ld", param->orig_table_id_, param->dest_table_id_);
snprintf(tablet_id_buffer, sizeof(tablet_id_buffer), "source_id:%lu, dest_id:%lu", param->orig_tablet_id_.id(), param->dest_tablet_id_.id());
SERVER_EVENT_ADD("ddl", stmt.ptr(),
"tenant_id", param->dest_tenant_id_,
"ret", ret,
"trace_id", *ObCurTraceId::get_trace_id(),
"task_id", param->task_id_,
"table_id", table_id_buffer,
"schema_version", param->dest_schema_version_,
tablet_id_buffer);
}
LOG_INFO("complement data task.", K(ret), "ddl_event_info", ObDDLEventInfo(), K(stmt), KPC(param));
}
int ObComplementDataParam::init(const ObDDLBuildSingleReplicaRequestArg &arg)
{
@ -193,6 +214,8 @@ int ObComplementDataParam::split_task_ranges(
} else if (OB_ISNULL(tablet_service = ls_handle.get_ls()->get_tablet_svr())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet service is nullptr", K(ret));
} else if (OB_FAIL(DDL_SIM(MTL_ID(), task_id_, COMPLEMENT_DATA_TASK_SPLIT_RANGE_FAILED))) {
LOG_WARN("ddl sim failure", K(ret), K(MTL_ID()), K(task_id_));
} else {
int64_t total_size = 0;
int64_t expected_task_count = 0;
@ -329,7 +352,7 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam &param)
} else if (OB_UNLIKELY(!hidden_table_key.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table key", K(ret), K(hidden_table_key));
} else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(hidden_table_key,
} else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(hidden_table_key, param.task_id_,
param.execution_id_, param.data_format_version_, ddl_kv_mgr_handle_))) {
LOG_WARN("fail write start log", K(ret), K(hidden_table_key), K(param));
} else {
@ -675,8 +698,6 @@ int ObComplementPrepareTask::process()
int ret = OB_SUCCESS;
ObIDag *tmp_dag = get_dag();
ObComplementDataDag *dag = nullptr;
ObComplementWriteTask *write_task = nullptr;
ObComplementMergeTask *merge_task = nullptr;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObComplementPrepareTask has not been inited", K(ret));
@ -699,13 +720,15 @@ int ObComplementPrepareTask::process()
param_->tablet_task_id_))) {
LOG_WARN("failed to delete checksum", K(ret), KPC(param_));
} else {
LOG_INFO("finish the complement prepare task", K(ret), KPC(param_));
LOG_INFO("finish the complement prepare task", K(ret), KPC(param_), "ddl_event_info", ObDDLEventInfo());
}
if (OB_FAIL(ret)) {
context_->complement_data_ret_ = ret;
ret = OB_SUCCESS;
}
add_ddl_event(param_, "complement prepare task");
return ret;
}
@ -773,6 +796,9 @@ int ObComplementWriteTask::process()
} else if (param_->dest_tenant_id_ == param_->orig_tenant_id_) {
if (OB_FAIL(local_scan_by_range())) {
LOG_WARN("local scan and append row for column redefinition failed", K(ret), K(task_id_));
} else {
ObDDLEventInfo event_info;
LOG_INFO("finish the complement write task", K(ret), "ddl_event_info", ObDDLEventInfo());
}
} else if (OB_FAIL(remote_scan())) {
LOG_WARN("remote scan for recover restore table ddl failed", K(ret));
@ -781,6 +807,8 @@ int ObComplementWriteTask::process()
context_->complement_data_ret_ = ret;
ret = OB_SUCCESS;
}
add_ddl_event(param_, "complement write task");
return ret;
}
@ -965,6 +993,8 @@ int ObComplementWriteTask::do_local_scan()
} else if (OB_UNLIKELY(nullptr == ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls is null", K(ret), K(ls_handle));
} else if (OB_FAIL(DDL_SIM(tenant_id, param_->task_id_, COMPLEMENT_DATA_TASK_LOCAL_SCAN_FAILED))) {
LOG_WARN("ddl sim failure", K(ret), KPC(param_));
} else if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->get_read_tables(param_->orig_tablet_id_,
ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US,
param_->snapshot_version_, iterator, allow_not_ready))) {
@ -1214,6 +1244,8 @@ int ObComplementWriteTask::append_row(ObScan *scan)
t1 = ObTimeUtility::current_time();
if (OB_FAIL(dag_yield())) {
LOG_WARN("fail to yield dag", KR(ret));
} else if (OB_FAIL(DDL_SIM(param_->dest_tenant_id_, param_->task_id_, DDL_INSERT_SSTABLE_GET_NEXT_ROW_FAILED))) {
LOG_WARN("ddl sim failure", K(ret), KPC(param_));
} else if (OB_FAIL(scan->get_next_row(tmp_row, reshape_row_only_for_remote_scan))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to get next row", K(ret));
@ -1428,6 +1460,8 @@ int ObComplementMergeTask::process()
ret = OB_SUCCESS == ret ? tmp_ret : ret;
LOG_WARN("fail to report replica build status", K(ret), K(tmp_ret));
}
add_ddl_event(param_, "complement merge task");
return ret;
}