From 6fc4bc4a2679d1f48dc105d29a8dfd8aa83a82fd Mon Sep 17 00:00:00 2001 From: YoungYang0820 Date: Thu, 17 Nov 2022 06:35:42 +0000 Subject: [PATCH] persist execution id and protect ddl start using execution id --- src/observer/ob_rpc_processor_simple.cpp | 1 - src/observer/ob_server.cpp | 3 + .../ddl_task/ob_column_redefinition_task.cpp | 11 +- .../ddl_task/ob_ddl_redefinition_task.h | 1 - src/rootserver/ddl_task/ob_ddl_retry_task.cpp | 3 +- src/rootserver/ddl_task/ob_ddl_scheduler.cpp | 6 +- src/rootserver/ddl_task/ob_ddl_task.cpp | 106 ++++++++++++++++-- src/rootserver/ddl_task/ob_ddl_task.h | 20 +++- .../ddl_task/ob_index_build_task.cpp | 25 +++-- .../ddl_task/ob_table_redefinition_task.cpp | 17 ++- src/storage/ddl/ob_complement_data_task.cpp | 12 +- src/storage/ddl/ob_ddl_clog.cpp | 11 +- src/storage/ddl/ob_ddl_clog.h | 8 +- src/storage/ddl/ob_ddl_redo_log_replayer.cpp | 6 +- src/storage/ddl/ob_ddl_redo_log_writer.cpp | 99 +++++++++------- src/storage/ddl/ob_ddl_redo_log_writer.h | 13 ++- .../ddl/ob_direct_insert_sstable_ctx.cpp | 12 +- src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp | 80 ++++++++----- src/storage/ddl/ob_tablet_ddl_kv_mgr.h | 6 +- src/storage/ob_storage_struct.cpp | 6 + src/storage/ob_storage_struct.h | 4 +- src/storage/tablet/ob_tablet.cpp | 10 +- src/storage/tablet/ob_tablet_meta.cpp | 46 +++++++- src/storage/tablet/ob_tablet_meta.h | 12 +- 24 files changed, 377 insertions(+), 141 deletions(-) diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 3fd1f8dec2..2c88421548 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -1934,7 +1934,6 @@ int ObRpcRemoteWriteDDLPrepareLogP::process() } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(arg_.start_log_ts_, prepare_log_ts, arg_.table_id_, - arg_.execution_id_, arg_.ddl_task_id_))) { LOG_WARN("failed to do ddl kv prepare", K(ret), K(arg_)); } else { diff --git a/src/observer/ob_server.cpp 
b/src/observer/ob_server.cpp index f0646b3d9c..36e4f07f4a 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -98,6 +98,7 @@ #include "share/ob_server_blacklist.h" #include "share/ob_primary_standby_service.h" // ObPrimaryStandbyService #include "logservice/palf/election/interface/election.h" +#include "storage/ddl/ob_ddl_redo_log_writer.h" using namespace oceanbase::lib; using namespace oceanbase::common; @@ -364,6 +365,8 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg) LOG_ERROR("init ASH failed", KR(ret)); } else if (OB_FAIL(ObServerBlacklist::get_instance().init(self_addr_, net_frame_.get_req_transport()))) { LOG_ERROR("init server blacklist failed", KR(ret)); + } else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().init())) { + LOG_WARN("init DDL redo log writer failed", KR(ret)); } else { GDS.set_rpc_proxy(&rs_rpc_proxy_); } diff --git a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp index dfcb0eb87f..dae8335179 100644 --- a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp @@ -96,6 +96,7 @@ int ObColumnRedefinitionTask::init(const ObDDLTaskRecord &task_record) schema_version_ = schema_version; task_status_ = static_cast(task_record.task_status_); snapshot_version_ = task_record.snapshot_version_; + execution_id_ = task_record.execution_id_; tenant_id_ = task_record.tenant_id_; ret_code_ = task_record.ret_code_; is_inited_ = true; @@ -140,7 +141,6 @@ int ObColumnRedefinitionTask::send_build_single_replica_request() ret = OB_NOT_INIT; LOG_WARN("ObColumnRedefinitionTask has not been inited", K(ret)); } else { - redefinition_execution_id_ = ObTimeUtility::fast_current_time(); ObDDLSingleReplicaExecutorParam param; param.tenant_id_ = tenant_id_; param.type_ = task_type_; @@ -150,7 +150,7 @@ int ObColumnRedefinitionTask::send_build_single_replica_request() 
param.snapshot_version_ = snapshot_version_; param.task_id_ = task_id_; param.parallelism_ = alter_table_arg_.parallelism_; - param.execution_id_ = redefinition_execution_id_; + param.execution_id_ = execution_id_; if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, object_id_, param.source_tablet_ids_))) { LOG_WARN("fail to get tablets", K(ret), K(tenant_id_), K(object_id_)); } else if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, target_object_id_, param.dest_tablet_ids_))) { @@ -192,6 +192,7 @@ int ObColumnRedefinitionTask::update_complete_sstable_job_status(const common::O const int ret_code) { int ret = OB_SUCCESS; + bool is_latest_execution_id = false; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObColumnRedefinitionTask has not been inited", K(ret)); @@ -200,8 +201,10 @@ int ObColumnRedefinitionTask::update_complete_sstable_job_status(const common::O } else if (snapshot_version != snapshot_version_) { ret = OB_ERR_UNEXPECTED; LOG_WARN("snapshot version not match", K(ret), K(snapshot_version), K(snapshot_version_)); - } else if (execution_id != redefinition_execution_id_) { - LOG_INFO("receive a mismatch execution result, ignore", K(execution_id), K(redefinition_execution_id_)); + } else if (OB_FAIL(check_is_latest_execution_id(execution_id, is_latest_execution_id))) { + LOG_WARN("failed to check latest execution id", K(ret), K(execution_id)); + } else if (!is_latest_execution_id) { + LOG_INFO("receive a mismatch execution result, ignore", K(execution_id), K(execution_id_)); } else if (OB_FAIL(replica_builder_.set_partition_task_status(tablet_id, ret_code))) { LOG_WARN("fail to set partition task status", K(ret)); } diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h index 1b1a26ae2c..ebb6434466 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h @@ -217,7 +217,6 @@ protected: int64_t update_autoinc_job_time_; int64_t 
check_table_empty_job_ret_code_; int64_t check_table_empty_job_time_; - int64_t redefinition_execution_id_; }; } // end namespace rootserver diff --git a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp index 310f27d9b5..f5685e0c4a 100644 --- a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp @@ -581,11 +581,12 @@ int ObDDLRetryTask::update_task_status_succ( int64_t affected_rows = 0; ObSqlString sql_string; int64_t curr_task_status = 0; + int64_t execution_id = 0; /*unused*/ const int64_t new_task_status = ObDDLTaskStatus::SUCCESS; if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || task_id <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id)); - } else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id, task_id, curr_task_status))) { + } else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id, task_id, curr_task_status, execution_id))) { LOG_WARN("select for update failed", K(ret), K(tenant_id), K(task_id)); } else if (OB_FAIL(ObDDLTaskRecordOperator::update_task_status(trans, tenant_id, task_id, new_task_status))) { LOG_WARN("update task status failed", K(ret)); diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index ea1009891a..1e812d90e0 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -902,6 +902,7 @@ int ObDDLScheduler::recover_task() const ObDDLTaskRecord &cur_record = task_records.at(i); int64_t tenant_schema_version = 0; int64_t table_task_status = 0; + int64_t execution_id = 0; ObMySQLTransaction trans; if (OB_FAIL(schema_service.get_tenant_schema_version(cur_record.tenant_id_, tenant_schema_version))) { LOG_WARN("failed to get tenant schema version", K(ret), K(cur_record)); @@ -912,7 +913,8 @@ int ObDDLScheduler::recover_task() } else if 
(OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, cur_record.tenant_id_, cur_record.task_id_, - table_task_status))) { + table_task_status, + execution_id))) { LOG_WARN("select for update failed", K(ret), K(cur_record)); } else if (OB_FAIL(schedule_ddl_task(cur_record))) { LOG_WARN("failed to schedule ddl task", K(ret), K(cur_record)); @@ -1111,6 +1113,8 @@ int ObDDLScheduler::schedule_column_redefinition_task(const ObDDLTaskRecord &tas if (OB_ENTRY_EXIST != ret) { LOG_WARN("inner schedule task failed", K(ret), K(*redefinition_task)); } + } else if (OB_FAIL(redefinition_task->push_execution_id())) { + LOG_WARN("failed to push execution id", K(ret)); } if (OB_FAIL(ret) && nullptr != redefinition_task) { redefinition_task->~ObColumnRedefinitionTask(); diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index 0b621a1315..6f66f27d84 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -259,6 +259,7 @@ int ObDDLTask::convert_to_record( task_record.task_id_ = get_task_id(); task_record.parent_task_id_ = get_parent_task_id(); task_record.task_version_ = get_task_version(); + task_record.execution_id_ = get_execution_id(); task_record.ret_code_ = get_ret_code(); const ObString &ddl_stmt_str = get_ddl_stmt_str(); if (serialize_param_size > 0) { @@ -313,7 +314,8 @@ int ObDDLTask::switch_status(ObDDLTaskStatus new_status, const int ret_code) LOG_WARN("start transaction failed", K(ret)); } else { int64_t table_task_status = 0; - if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status))) { + int64_t execution_id = 0; + if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status, execution_id))) { LOG_WARN("select for update failed", K(ret), K(task_id_)); } else if (old_status != task_status_) { ret = OB_EAGAIN; @@ -551,6 +553,60 @@ int ObDDLTask::batch_release_snapshot( return ret; } +int 
ObDDLTask::check_is_latest_execution_id(const int64_t execution_id, bool &is_latest) +{ + int ret = OB_SUCCESS; + is_latest = true; + ObMySQLTransaction trans; + ObRootService *root_service = nullptr; + int64_t table_task_status = 0; + int64_t table_execution_id = 0; + if (OB_ISNULL(root_service = GCTX.root_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("error unexpected, root service must not be nullptr", K(ret)); + } else if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), tenant_id_))) { + LOG_WARN("start transaction failed", K(ret)); + } else { + if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status, table_execution_id))) { + LOG_WARN("select for update failed", K(ret), K(task_id_)); + } else if (table_execution_id > execution_id) { + is_latest = false; + } + trans.end(false);// abort + } + return ret; +} + +int ObDDLTask::push_execution_id() +{ + int ret = OB_SUCCESS; + ObMySQLTransaction trans; + ObRootService *root_service = nullptr; + int64_t table_task_status = 0; + int64_t table_execution_id = 0; + if (OB_ISNULL(root_service = GCTX.root_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("error unexpected, root service must not be nullptr", K(ret)); + } else if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), tenant_id_))) { + LOG_WARN("start transaction failed", K(ret)); + } else { + if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status, table_execution_id))) { + LOG_WARN("select for update failed", K(ret), K(task_id_)); + } else if (OB_FAIL(ObDDLTaskRecordOperator::update_execution_id(trans, tenant_id_, task_id_, table_execution_id + 1))) { + LOG_WARN("update task status failed", K(ret)); + } else { + execution_id_ = table_execution_id + 1; + } + bool commit = (OB_SUCCESS == ret); + int tmp_ret = trans.end(commit); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("fail to end trans", K(tmp_ret)); + ret = (OB_SUCCESS == ret) ? 
tmp_ret : ret; + } + } + return ret; +} + void ObDDLTask::calc_next_schedule_ts(int ret_code) { if (OB_TIMEOUT == ret_code) { @@ -1382,7 +1438,8 @@ bool ObDDLTaskRecord::is_valid() const && task_version_ > 0 && OB_INVALID_ID != object_id_ && schema_version_ > 0 - && ret_code_ >= 0; + && ret_code_ >= 0 + && execution_id_ >= 0; return is_valid; } @@ -1401,6 +1458,7 @@ void ObDDLTaskRecord::reset() message_.reset(); task_version_ = 0; ret_code_ = OB_SUCCESS; + execution_id_ = 0; } @@ -1477,6 +1535,30 @@ int ObDDLTaskRecordOperator::update_ret_code( return ret; } +int ObDDLTaskRecordOperator::update_execution_id( + common::ObISQLClient &sql_client, + const uint64_t tenant_id, + const int64_t task_id, + const int64_t execution_id) +{ + int ret = OB_SUCCESS; + ObSqlString sql_string; + int64_t affected_rows = 0; + if (OB_ISNULL(sql_client.get_pool()) || OB_UNLIKELY(task_id <= 0 || tenant_id <= 0 || execution_id <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(ret), K(tenant_id), K(task_id)); + } else if (OB_FAIL(sql_string.assign_fmt(" UPDATE %s SET execution_id=%lu WHERE task_id=%lu ", + OB_ALL_DDL_TASK_STATUS_TNAME, execution_id, task_id))) { + LOG_WARN("assign sql string failed", K(ret), K(execution_id), K(task_id)); + } else if (OB_FAIL(sql_client.write(tenant_id, sql_string.ptr(), affected_rows))) { + LOG_WARN("update snapshot_version of ddl task record failed", K(ret), K(sql_string)); + } else if (OB_UNLIKELY(affected_rows < 0)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected affected_rows", K(ret), K(affected_rows)); + } + return ret; +} + int ObDDLTaskRecordOperator::update_message( common::ObISQLClient &proxy, const uint64_t tenant_id, @@ -1541,7 +1623,7 @@ int ObDDLTaskRecordOperator::check_is_adding_constraint( SMART_VAR(ObMySQLProxy::MySQLResult, res) { sqlclient::ObMySQLResult *result = NULL; if (OB_FAIL(sql_string.assign_fmt(" SELECT tenant_id, task_id, object_id, target_object_id, ddl_type, " - "schema_version, parent_task_id, 
trace_id, status, snapshot_version, task_version," + "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, " "UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s " "WHERE object_id = %" PRIu64 " && ddl_type IN (%d, %d, %d)", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME, object_id, DDL_CHECK_CONSTRAINT, DDL_FOREIGN_KEY_CONSTRAINT, DDL_ADD_NOT_NULL_COLUMN))) { @@ -1633,7 +1715,7 @@ int ObDDLTaskRecordOperator::check_has_conflict_ddl( SMART_VAR(ObMySQLProxy::MySQLResult, res) { sqlclient::ObMySQLResult *result = nullptr; if (OB_FAIL(sql_string.assign_fmt("SELECT tenant_id, task_id, object_id, target_object_id, ddl_type," - "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version," + "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id," "UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s " "WHERE tenant_id = %lu AND object_id = %lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME, tenant_id, table_id))) { @@ -1689,7 +1771,7 @@ int ObDDLTaskRecordOperator::get_all_record( SMART_VAR(ObMySQLProxy::MySQLResult, res) { sqlclient::ObMySQLResult *result = NULL; if (OB_FAIL(sql_string.assign_fmt(" SELECT tenant_id, task_id, object_id, target_object_id, ddl_type, " - "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version," + "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, " "UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s ", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME))) { LOG_WARN("assign sql string failed", K(ret)); } else if (OB_FAIL(proxy.read(res, sql_string.ptr()))) { @@ -1758,11 +1840,11 @@ int ObDDLTaskRecordOperator::insert_record( } else if (OB_FAIL(to_hex_str(record.message_, message_string))) { LOG_WARN("append hex escaped string failed", K(ret)); } else if 
(OB_FAIL(sql_string.assign_fmt( - " INSERT INTO %s (task_id, parent_task_id, tenant_id, object_id, schema_version, target_object_id, ddl_type, trace_id, status, task_version, ret_code, ddl_stmt_str, message) " - " VALUES (%lu, %lu, %lu, %lu, %lu, %lu, %d, '%s', %ld, %lu, %lu, '%.*s', \"%.*s\") ", + " INSERT INTO %s (task_id, parent_task_id, tenant_id, object_id, schema_version, target_object_id, ddl_type, trace_id, status, task_version, execution_id, ret_code, ddl_stmt_str, message) " + " VALUES (%lu, %lu, %lu, %lu, %lu, %lu, %d, '%s', %ld, %lu, %lu, %lu, '%.*s', \"%.*s\") ", OB_ALL_DDL_TASK_STATUS_TNAME, record.task_id_, record.parent_task_id_, ObSchemaUtils::get_extract_tenant_id(record.tenant_id_, record.tenant_id_), record.object_id_, record.schema_version_, - get_record_id(record.ddl_type_, record.target_object_id_), record.ddl_type_, trace_id_str, record.task_status_, record.task_version_, record.ret_code_, + get_record_id(record.ddl_type_, record.target_object_id_), record.ddl_type_, trace_id_str, record.task_status_, record.task_version_, record.execution_id_, record.ret_code_, static_cast(ddl_stmt_string.length()), ddl_stmt_string.ptr(), static_cast(message_string.length()), message_string.ptr()))) { LOG_WARN("assign sql string failed", K(ret), K(record)); } else if (OB_FAIL(proxy.write(record.tenant_id_, sql_string.ptr(), affected_rows))) { @@ -1803,6 +1885,7 @@ int ObDDLTaskRecordOperator::fill_task_record( EXTRACT_UINT_FIELD_MYSQL(*result_row, "snapshot_version", task_record.snapshot_version_, uint64_t); EXTRACT_INT_FIELD_MYSQL(*result_row, "task_version", task_record.task_version_, int64_t); EXTRACT_INT_FIELD_MYSQL(*result_row, "ret_code", task_record.ret_code_, int64_t); + EXTRACT_INT_FIELD_MYSQL(*result_row, "execution_id", task_record.execution_id_, int64_t); EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "message_unhex", task_message); EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "ddl_stmt_str_unhex", ddl_stmt_str); if (OB_SUCC(ret)) { @@ -1849,18 +1932,20 
@@ int ObDDLTaskRecordOperator::select_for_update( common::ObMySQLTransaction &trans, const uint64_t tenant_id, const int64_t task_id, - int64_t &task_status) + int64_t &task_status, + int64_t &execution_id) { int ret = OB_SUCCESS; ObSqlString sql_string; task_status = 0; + execution_id = 0; if (OB_UNLIKELY(task_id <= 0 || tenant_id <= 0)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret), K(tenant_id), K(task_id)); } else { SMART_VAR(ObMySQLProxy::MySQLResult, res) { sqlclient::ObMySQLResult *result = NULL; - if (OB_FAIL(sql_string.assign_fmt("SELECT status FROM %s WHERE task_id = %lu FOR UPDATE", + if (OB_FAIL(sql_string.assign_fmt("SELECT status, execution_id FROM %s WHERE task_id = %lu FOR UPDATE", OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) { LOG_WARN("assign sql string failed", K(ret), K(task_id), K(tenant_id)); } else if (OB_FAIL(trans.read(res, tenant_id, sql_string.ptr()))) { @@ -1872,6 +1957,7 @@ int ObDDLTaskRecordOperator::select_for_update( LOG_WARN("fail to get next row", K(ret)); } else { EXTRACT_INT_FIELD_MYSQL(*result, "status", task_status, int64_t); + EXTRACT_INT_FIELD_MYSQL(*result, "execution_id", execution_id, int64_t); } } } diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index c210faa131..601d6a26a7 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -52,7 +52,7 @@ public: bool is_valid() const; void reset(); TO_STRING_KV(K_(task_id), K_(parent_task_id), K_(ddl_type), K_(trace_id), K_(task_status), K_(tenant_id), K_(object_id), - K_(schema_version), K_(target_object_id), K_(snapshot_version), K_(message), K_(task_version), K_(ret_code)); + K_(schema_version), K_(target_object_id), K_(snapshot_version), K_(message), K_(task_version), K_(ret_code), K_(execution_id)); public: static const int64_t MAX_MESSAGE_LENGTH = 4096; typedef common::ObFixedLengthString TaskMessage; @@ -70,6 +70,7 @@ public: ObString message_; int64_t task_version_; int64_t ret_code_; + 
int64_t execution_id_; ObString ddl_stmt_str_; }; @@ -126,6 +127,12 @@ public: const int64_t task_id, const int64_t ret_code); + static int update_execution_id( + common::ObISQLClient &sql_client, + const uint64_t tenant_id, + const int64_t task_id, + const int64_t execution_id); + static int update_message( common::ObISQLClient &proxy, const uint64_t tenant_id, @@ -141,7 +148,8 @@ public: common::ObMySQLTransaction &trans, const uint64_t tenant_id, const int64_t task_id, - int64_t &task_status); + int64_t &task_status, + int64_t &execution_id); static int get_all_record( common::ObMySQLProxy &proxy, @@ -255,7 +263,7 @@ public: target_object_id_(0), task_status_(share::ObDDLTaskStatus::PREPARE), snapshot_version_(0), ret_code_(OB_SUCCESS), task_id_(0), parent_task_id_(0), parent_task_key_(), task_version_(0), parallelism_(0), allocator_(lib::ObLabel("DdlTask")), compat_mode_(lib::Worker::CompatMode::INVALID), err_code_occurence_cnt_(0), - delay_schedule_time_(0), next_schedule_ts_(0) + delay_schedule_time_(0), next_schedule_ts_(0), execution_id_(0) {} virtual ~ObDDLTask() {} virtual int process() = 0; @@ -279,6 +287,7 @@ public: ObDDLTaskKey get_task_key() const { return ObDDLTaskKey(target_object_id_, schema_version_); } int64_t get_parent_task_id() const { return parent_task_id_; } int64_t get_task_version() const { return task_version_; } + int64_t get_execution_id() const { return execution_id_; } int64_t get_parallelism() const { return parallelism_; } static int deep_copy_table_arg(common::ObIAllocator &allocator, const obrpc::ObDDLArg &source_arg, obrpc::ObDDLArg &dest_arg); static int fetch_new_task_id(ObMySQLProxy &sql_proxy, int64_t &new_task_id); @@ -303,6 +312,7 @@ public: const TraceId &get_sys_task_id() const { return sys_task_id_; } void calc_next_schedule_ts(int ret_code); bool need_schedule() { return next_schedule_ts_ <= ObTimeUtility::current_time(); } + int push_execution_id(); #ifdef ERRSIM int check_errsim_error(); #endif @@ -312,8 +322,9 @@ 
public: K(target_object_id_), K(task_status_), K(snapshot_version_), K_(ret_code), K_(task_id), K_(parent_task_id), K_(parent_task_key), K_(task_version), K_(parallelism), K_(ddl_stmt_str), K_(compat_mode), - K_(sys_task_id), K_(err_code_occurence_cnt), K_(next_schedule_ts), K_(delay_schedule_time)); + K_(sys_task_id), K_(err_code_occurence_cnt), K_(next_schedule_ts), K_(delay_schedule_time), K(execution_id_)); protected: + int check_is_latest_execution_id(const int64_t execution_id, bool &is_latest); virtual bool is_error_need_retry(const int ret_code) { return !share::ObIDDLTask::in_ddl_retry_black_list(ret_code) && (share::ObIDDLTask::in_ddl_retry_white_list(ret_code) @@ -345,6 +356,7 @@ protected: int64_t err_code_occurence_cnt_; // occurence count for all error return codes not in white list. int64_t delay_schedule_time_; int64_t next_schedule_ts_; + int64_t execution_id_; }; enum ColChecksumStat diff --git a/src/rootserver/ddl_task/ob_index_build_task.cpp b/src/rootserver/ddl_task/ob_index_build_task.cpp index f6d30dfac9..9774b807c5 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.cpp +++ b/src/rootserver/ddl_task/ob_index_build_task.cpp @@ -162,8 +162,7 @@ ObIndexBuildTask::ObIndexBuildTask() : ObDDLTask(ObDDLType::DDL_CREATE_INDEX), index_table_id_(target_object_id_), is_unique_index_(false), is_global_index_(false), root_service_(nullptr), snapshot_held_(false), is_sstable_complete_task_submitted_(false), sstable_complete_request_time_(0), sstable_complete_ts_(0), - check_unique_snapshot_(0), complete_sstable_job_ret_code_(INT64_MAX), - redefinition_execution_id_(0), create_index_arg_() + check_unique_snapshot_(0), complete_sstable_job_ret_code_(INT64_MAX), create_index_arg_() { } @@ -343,6 +342,7 @@ int ObIndexBuildTask::init(const ObDDLTaskRecord &task_record) index_table_id_ = index_table_id; schema_version_ = schema_version; snapshot_version_ = task_record.snapshot_version_; + execution_id_ = task_record.execution_id_; task_status_ = 
static_cast(task_record.task_status_); if (ObDDLTaskStatus::VALIDATE_CHECKSUM == task_status_) { sstable_complete_ts_ = ObTimeUtility::current_time(); @@ -641,7 +641,7 @@ int ObIndexBuildTask::send_build_single_replica_request() target_object_id_, schema_version_, snapshot_version_, - redefinition_execution_id_, + execution_id_, trace_id_, parallelism_, root_service_); @@ -706,8 +706,9 @@ int ObIndexBuildTask::wait_data_complement() // submit a job to complete sstable for the index table on snapshot_version if (OB_SUCC(ret) && !state_finished && !is_sstable_complete_task_submitted_) { - redefinition_execution_id_ = ObTimeUtility::current_time(); - if (OB_FAIL(send_build_single_replica_request())) { + if (OB_FAIL(push_execution_id())) { + LOG_WARN("failed to push execution id", K(ret)); + } else if (OB_FAIL(send_build_single_replica_request())) { LOG_WARN("fail to send build single replica request", K(ret)); } } @@ -725,7 +726,7 @@ int ObIndexBuildTask::wait_data_complement() if (OB_SUCC(ret) && state_finished) { bool dummy_equal = false; if (OB_FAIL(ObDDLChecksumOperator::check_column_checksum( - tenant_id_, redefinition_execution_id_, object_id_, index_table_id_, task_id_, dummy_equal, root_service_->get_sql_proxy()))) { + tenant_id_, execution_id_, object_id_, index_table_id_, task_id_, dummy_equal, root_service_->get_sql_proxy()))) { if (OB_ITER_END != ret) { LOG_WARN("fail to check column checksum", K(ret), K(index_table_id_), K(object_id_), K(task_id_)); state_finished = true; @@ -866,7 +867,7 @@ int ObIndexBuildTask::verify_checksum() bool is_column_checksum_ready = false; bool dummy_equal = false; if (!wait_column_checksum_ctx_.is_inited() && OB_FAIL(wait_column_checksum_ctx_.init( - task_id_, tenant_id_, object_id_, index_table_id_, schema_version_, check_unique_snapshot_, redefinition_execution_id_, checksum_wait_timeout))) { + task_id_, tenant_id_, object_id_, index_table_id_, schema_version_, check_unique_snapshot_, execution_id_, 
checksum_wait_timeout))) { LOG_WARN("init context of wait column checksum failed", K(ret), K(object_id_), K(index_table_id_)); } else { if (OB_FAIL(wait_column_checksum_ctx_.try_wait(is_column_checksum_ready))) { @@ -879,7 +880,7 @@ int ObIndexBuildTask::verify_checksum() // do nothing } else { if (OB_FAIL(ObDDLChecksumOperator::check_column_checksum( - tenant_id_, redefinition_execution_id_, object_id_, index_table_id_, task_id_, dummy_equal, root_service_->get_sql_proxy()))) { + tenant_id_, execution_id_, object_id_, index_table_id_, task_id_, dummy_equal, root_service_->get_sql_proxy()))) { if (OB_CHECKSUM_ERROR == ret && is_unique_index_) { ret = OB_ERR_DUPLICATED_UNIQUE_KEY; } @@ -901,6 +902,7 @@ int ObIndexBuildTask::update_column_checksum_calc_status( const int ret_code) { int ret = OB_SUCCESS; + bool is_latest_execution_id = false; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); @@ -925,6 +927,7 @@ int ObIndexBuildTask::update_complete_sstable_job_status( const int ret_code) { int ret = OB_SUCCESS; + bool is_latest_execution_id = false; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); @@ -937,8 +940,10 @@ int ObIndexBuildTask::update_complete_sstable_job_status( } else if (snapshot_version != snapshot_version_) { ret = OB_ERR_UNEXPECTED; LOG_WARN("snapshot version not match", K(ret), K(snapshot_version), K(snapshot_version_)); - } else if (execution_id != redefinition_execution_id_) { - LOG_INFO("receive a mismatch execution result, ignore", K(execution_id), K(redefinition_execution_id_)); + } else if (OB_FAIL(check_is_latest_execution_id(execution_id, is_latest_execution_id))) { + LOG_WARN("failed to check latest execution id", K(ret), K(execution_id)); + } else if (!is_latest_execution_id) { + LOG_INFO("receive a mismatch execution result, ignore", K(execution_id), K(execution_id_)); } else { complete_sstable_job_ret_code_ = ret_code; sstable_complete_ts_ = ObTimeUtility::current_time(); 
diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index f018f5d6f2..abe7fda827 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -97,6 +97,7 @@ int ObTableRedefinitionTask::init(const ObDDLTaskRecord &task_record) schema_version_ = schema_version; task_status_ = static_cast(task_record.task_status_); snapshot_version_ = task_record.snapshot_version_; + execution_id_ = task_record.execution_id_; tenant_id_ = task_record.tenant_id_; ret_code_ = task_record.ret_code_; alter_table_arg_.exec_tenant_id_ = tenant_id_; @@ -113,6 +114,7 @@ int ObTableRedefinitionTask::update_complete_sstable_job_status(const common::Ob int ret = OB_SUCCESS; TCWLockGuard guard(lock_); UNUSED(tablet_id); + bool is_latest_execution_id = false; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObTableRedefinitionTask has not been inited", K(ret)); @@ -121,8 +123,10 @@ int ObTableRedefinitionTask::update_complete_sstable_job_status(const common::Ob } else if (OB_UNLIKELY(snapshot_version_ != snapshot_version)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, snapshot version is not equal", K(ret), K(snapshot_version_), K(snapshot_version)); - } else if (execution_id != redefinition_execution_id_) { - LOG_INFO("receive a mismatch execution result, ignore", K(execution_id), K(redefinition_execution_id_)); + } else if (OB_FAIL(check_is_latest_execution_id(execution_id, is_latest_execution_id))) { + LOG_WARN("failed to check latest execution id", K(ret), K(execution_id)); + } else if (!is_latest_execution_id) { + LOG_INFO("receive a mismatch execution result, ignore", K(execution_id), K(execution_id_)); } else { complete_sstable_job_ret_code_ = ret_code; LOG_INFO("table redefinition task callback", K(complete_sstable_job_ret_code_)); @@ -157,7 +161,7 @@ int ObTableRedefinitionTask::send_build_replica_request() target_object_id_, 
schema_version_, snapshot_version_, - redefinition_execution_id_, + execution_id_, sql_mode, trace_id_, parallelism_, @@ -250,8 +254,9 @@ int ObTableRedefinitionTask::table_redefinition(const ObDDLTaskStatus next_task_ } if (OB_SUCC(ret) && !is_build_replica_end && 0 == build_replica_request_time_) { - redefinition_execution_id_ = ObTimeUtility::current_time(); - if (OB_FAIL(send_build_replica_request())) { + if (OB_FAIL(push_execution_id())) { + LOG_WARN("failed to push execution id", K(ret)); + } else if (OB_FAIL(send_build_replica_request())) { LOG_WARN("fail to send build replica request", K(ret)); } else { build_replica_request_time_ = ObTimeUtility::current_time(); @@ -273,7 +278,7 @@ int ObTableRedefinitionTask::table_redefinition(const ObDDLTaskStatus next_task_ if (is_build_replica_end) { ret = complete_sstable_job_ret_code_; if (OB_SUCC(ret)) { - if (OB_FAIL(check_data_dest_tables_columns_checksum(redefinition_execution_id_))) { + if (OB_FAIL(check_data_dest_tables_columns_checksum(execution_id_))) { LOG_WARN("fail to check the columns checksum of data table and destination table", K(ret)); } } diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index 1e6685b8fc..8051b8c9e0 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -328,7 +328,7 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam &param) } else if (OB_UNLIKELY(!hidden_table_key.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid table key", K(ret), K(hidden_table_key)); - } else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(hidden_table_key, ddl_kv_mgr_handle_))) { + } else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(hidden_table_key, param.execution_id_, ddl_kv_mgr_handle_))) { LOG_WARN("fail write start log", K(ret), K(hidden_table_key), K(param)); } else { LOG_INFO("complement task start ddl redo success", K(hidden_table_key)); @@ -1213,11
+1213,14 @@ int ObComplementMergeTask::add_build_hidden_table_sstable() if (OB_FAIL(ret)) { } else if (OB_FAIL(context_->data_sstable_redo_writer_.write_prepare_log(hidden_table_key, param_->hidden_table_schema_->get_table_id(), - 1/*execution_id*/, + param_->execution_id_, param_->task_id_, prepare_log_ts))) { - saved_ret = ret; - LOG_WARN("fail write ddl prepare log", K(ret), K(hidden_table_key), K(prepare_log_ts), KPC(param_)); + if (OB_TASK_EXPIRED == ret) { + LOG_INFO("ddl task expired", K(ret), K(hidden_table_key), KPC(param_)); + } else { + LOG_WARN("fail write ddl prepare log", K(ret), K(hidden_table_key)); + } } else { ObTabletHandle new_tablet_handle; // no use here ObDDLKvMgrHandle ddl_kv_mgr_handle; @@ -1229,7 +1232,6 @@ int ObComplementMergeTask::add_build_hidden_table_sstable() } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(ddl_start_log_ts, prepare_log_ts, param_->hidden_table_schema_->get_table_id(), - 1/*execution_id*/, param_->task_id_))) { saved_ret = ret; LOG_WARN("commit ddl log failed", K(ret), K(ls_id), K(tablet_id), K(prepare_log_ts), K(hidden_table_key), diff --git a/src/storage/ddl/ob_ddl_clog.cpp b/src/storage/ddl/ob_ddl_clog.cpp index 9e049525f7..f50e698dd1 100644 --- a/src/storage/ddl/ob_ddl_clog.cpp +++ b/src/storage/ddl/ob_ddl_clog.cpp @@ -196,24 +196,25 @@ DEFINE_GET_SERIALIZE_SIZE(ObDDLClogHeader) } ObDDLStartLog::ObDDLStartLog() - : table_key_(), cluster_version_(0) + : table_key_(), cluster_version_(0), execution_id_(0) { } -int ObDDLStartLog::init(const ObITable::TableKey &table_key, const int64_t cluster_version) +int ObDDLStartLog::init(const ObITable::TableKey &table_key, const int64_t cluster_version, const int64_t execution_id) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!table_key.is_valid() || cluster_version_ < 0)) { + if (OB_UNLIKELY(!table_key.is_valid() || execution_id < 0 || cluster_version_ < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(table_key), K(cluster_version)); + 
LOG_WARN("invalid argument", K(ret), K(table_key), K(execution_id), K(cluster_version)); } else { table_key_ = table_key; cluster_version_ = cluster_version; + execution_id_ = execution_id; } return ret; } -OB_SERIALIZE_MEMBER(ObDDLStartLog, table_key_, cluster_version_); +OB_SERIALIZE_MEMBER(ObDDLStartLog, table_key_, cluster_version_, execution_id_); ObDDLRedoLog::ObDDLRedoLog() : redo_info_() diff --git a/src/storage/ddl/ob_ddl_clog.h b/src/storage/ddl/ob_ddl_clog.h index 28924ef189..63bf747b02 100644 --- a/src/storage/ddl/ob_ddl_clog.h +++ b/src/storage/ddl/ob_ddl_clog.h @@ -120,14 +120,16 @@ class ObDDLStartLog final public: ObDDLStartLog(); ~ObDDLStartLog() = default; - int init(const ObITable::TableKey &table_key, const int64_t cluster_version); - bool is_valid() const { return table_key_.is_valid() && cluster_version_ >= 0; } + int init(const ObITable::TableKey &table_key, const int64_t cluster_version, const int64_t execution_id); + bool is_valid() const { return table_key_.is_valid() && cluster_version_ >= 0 && execution_id_ >= 0; } ObITable::TableKey get_table_key() const { return table_key_; } int64_t get_cluster_version() const { return cluster_version_; } - TO_STRING_KV(K_(table_key), K_(cluster_version)); + int64_t get_execution_id() const { return execution_id_; } + TO_STRING_KV(K_(table_key), K_(cluster_version), K_(execution_id)); private: ObITable::TableKey table_key_; int64_t cluster_version_; // used for compatibility + int64_t execution_id_; }; class ObDDLRedoLog final diff --git a/src/storage/ddl/ob_ddl_redo_log_replayer.cpp b/src/storage/ddl/ob_ddl_redo_log_replayer.cpp index 892b85deec..442d3ad856 100644 --- a/src/storage/ddl/ob_ddl_redo_log_replayer.cpp +++ b/src/storage/ddl/ob_ddl_redo_log_replayer.cpp @@ -75,7 +75,11 @@ int ObDDLRedoLogReplayer::replay_start(const ObDDLStartLog &log, const int64_t l LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle)); } else if 
(OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle, true/*try_create*/))) { LOG_WARN("create ddl kv mgr failed", K(ret), K(table_key)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(table_key, log_ts, log.get_cluster_version()))) { + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(table_key, + log_ts, + log.get_cluster_version(), + log.get_execution_id(), + 0/*checkpoint_log_ts*/))) { LOG_WARN("start ddl log failed", K(ret), K(log), K(log_ts)); } else { LOG_INFO("succeed to replay ddl start log", K(ret), K(log)); diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.cpp b/src/storage/ddl/ob_ddl_redo_log_writer.cpp index 7df9124551..c45871fc47 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.cpp +++ b/src/storage/ddl/ob_ddl_redo_log_writer.cpp @@ -518,7 +518,7 @@ void ObDDLCtrlSpeedHandle::RefreshSpeedHandleTask::runTimerTask() } } -ObDDLRedoLogWriter::ObDDLRedoLogWriter() +ObDDLRedoLogWriter::ObDDLRedoLogWriter() : is_inited_(false), bucket_lock_() { } @@ -532,6 +532,19 @@ ObDDLRedoLogWriter &ObDDLRedoLogWriter::get_instance() return instance; } +int ObDDLRedoLogWriter::init() +{ + int ret = OB_SUCCESS; + const int64_t bucket_num = 10243L; + if (is_inited_) { + } else if (OB_FAIL(bucket_lock_.init(bucket_num))) { + LOG_WARN("init bucket lock failed", K(ret), K(bucket_num)); + } else { + is_inited_ = true; + } + return ret; +} + int ObDDLRedoLogWriter::write( const ObDDLRedoLog &log, const uint64_t tenant_id, @@ -571,12 +584,12 @@ int ObDDLRedoLogWriter::write( } else if (OB_FAIL(log.serialize(buffer, buffer_size, pos))) { LOG_WARN("fail to seriaize ddl redo log", K(ret)); } else if (OB_FAIL(log_handler->append(buffer, - buffer_size, - base_log_ts, - need_nonblock, - cb, - lsn, - log_ts))) { + buffer_size, + base_log_ts, + need_nonblock, + cb, + lsn, + log_ts))) { LOG_WARN("fail to submit ddl redo log", K(ret), K(buffer), K(buffer_size)); } else { handle.cb_ = cb; @@ -599,7 +612,10 @@ int ObDDLRedoLogWriter::write( return 
ret; } -int ObDDLRedoLogWriter::write_ddl_start_log(const ObDDLStartLog &log, ObLogHandler *log_handler, int64_t &start_log_ts) +int ObDDLRedoLogWriter::write_ddl_start_log(ObDDLKvMgrHandle &ddl_kv_mgr_handle, + const ObDDLStartLog &log, + ObLogHandler *log_handler, + int64_t &start_log_ts) { int ret = OB_SUCCESS; start_log_ts = 0; @@ -619,7 +635,11 @@ int ObDDLRedoLogWriter::write_ddl_start_log(const ObDDLStartLog &log, ObLogHandl int64_t base_log_ts = 0; int64_t log_ts = 0; bool is_external_consistent = false; - if (OB_ISNULL(cb = op_alloc(ObDDLClogCb))) { + ObBucketHashWLockGuard guard(bucket_lock_, log.get_table_key().get_tablet_id().hash()); + if (ddl_kv_mgr_handle.get_obj()->is_execution_id_older(log.get_execution_id())) { + ret = OB_TASK_EXPIRED; + LOG_INFO("receive a old execution id, don't do ddl start", K(ret), K(log)); + } else if (OB_ISNULL(cb = op_alloc(ObDDLClogCb))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", K(ret)); } else if (OB_FAIL(base_header.serialize(buffer, buffer_size, pos))) { @@ -629,12 +649,12 @@ int ObDDLRedoLogWriter::write_ddl_start_log(const ObDDLStartLog &log, ObLogHandl } else if (OB_FAIL(log.serialize(buffer, buffer_size, pos))) { LOG_WARN("fail to seriaize ddl start log", K(ret)); } else if (OB_FAIL(log_handler->append(buffer, - buffer_size, - base_log_ts, - need_nonblock, - cb, - lsn, - log_ts))) { + buffer_size, + base_log_ts, + need_nonblock, + cb, + lsn, + log_ts))) { LOG_WARN("fail to submit ddl start log", K(ret), K(buffer_size)); } else { ObDDLClogCb *tmp_cb = cb; @@ -659,6 +679,13 @@ int ObDDLRedoLogWriter::write_ddl_start_log(const ObDDLStartLog &log, ObLogHandl } if (OB_SUCC(ret)) { start_log_ts = log_ts; + if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(log.get_table_key(), + start_log_ts, + log.get_cluster_version(), + log.get_execution_id(), + 0/*checkpoint_log_ts*/))) { + LOG_WARN("start ddl log failed", K(ret), K(start_log_ts), K(log)); + } } tmp_cb->try_release(); // release the memory 
no matter succ or not } @@ -707,12 +734,12 @@ int ObDDLRedoLogWriter::write_ddl_finish_log(const T &log, const ObDDLClogType c } else if (OB_FAIL(OB_TS_MGR.get_ts_sync(MTL_ID(), ObDDLRedoLogHandle::DDL_REDO_LOG_TIMEOUT, base_log_ts, is_external_consistent))) { LOG_WARN("fail to get gts sync", K(ret), K(log)); } else if (OB_FAIL(log_handler->append(buffer, - buffer_size, - base_log_ts, - need_nonblock, - cb, - lsn, - log_ts))) { + buffer_size, + base_log_ts, + need_nonblock, + cb, + lsn, + log_ts))) { LOG_WARN("fail to submit ddl commit log", K(ret), K(buffer), K(buffer_size)); } else { ObDDLClogCb *tmp_cb = cb; @@ -910,7 +937,9 @@ int ObDDLSSTableRedoWriter::init(const ObLSID &ls_id, const ObTabletID &tablet_i return ret; } -int ObDDLSSTableRedoWriter::start_ddl_redo(const ObITable::TableKey &table_key, ObDDLKvMgrHandle &ddl_kv_mgr_handle) +int ObDDLSSTableRedoWriter::start_ddl_redo(const ObITable::TableKey &table_key, + const int64_t execution_id, + ObDDLKvMgrHandle &ddl_kv_mgr_handle) { int ret = OB_SUCCESS; ObLSHandle ls_handle; @@ -922,11 +951,11 @@ int ObDDLSSTableRedoWriter::start_ddl_redo(const ObITable::TableKey &table_key, if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObDDLSSTableRedoWriter has not been inited", K(ret)); - } else if (OB_UNLIKELY(!table_key.is_valid())) { + } else if (OB_UNLIKELY(!table_key.is_valid() || execution_id <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), K(table_key)); - } else if (OB_FAIL(log.init(table_key, GET_MIN_CLUSTER_VERSION()))) { - LOG_WARN("fail to init DDLStartLog", K(ret), K(table_key), "cluster_version", GET_MIN_CLUSTER_VERSION()); + LOG_WARN("invalid arguments", K(ret), K(table_key), K(execution_id)); + } else if (OB_FAIL(log.init(table_key, GET_MIN_CLUSTER_VERSION(), execution_id))) { + LOG_WARN("fail to init DDLStartLog", K(ret), K(table_key), K(execution_id), "cluster_version", GET_MIN_CLUSTER_VERSION()); } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, 
ObLSGetMod::DDL_MOD))) { LOG_WARN("get ls failed", K(ret), K(ls_id_)); } else if (OB_ISNULL(ls = ls_handle.get_ls())) { @@ -934,13 +963,11 @@ int ObDDLSSTableRedoWriter::start_ddl_redo(const ObITable::TableKey &table_key, LOG_ERROR("ls should not be null", K(ret), K(table_key)); } else if (OB_FAIL(ls->get_tablet(tablet_id_, tablet_handle, ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { LOG_WARN("get tablet handle failed", K(ret), K(ls_id_), K(tablet_id_)); - } else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write_ddl_start_log(log, ls->get_log_handler(), tmp_log_ts))) { - LOG_WARN("fail to write ddl start log", K(ret), K(table_key)); - } else if (FALSE_IT(set_start_log_ts(tmp_log_ts))) { } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle, true/*try_create*/))) { LOG_WARN("create ddl kv mgr failed", K(ret)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(table_key, get_start_log_ts(), log.get_cluster_version()))) { - LOG_WARN("start ddl log failed", K(ret), K(table_key), K(start_log_ts_), K(log)); + } else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write_ddl_start_log(ddl_kv_mgr_handle, log, ls->get_log_handler(), tmp_log_ts))) { + LOG_WARN("fail to write ddl start log", K(ret), K(table_key)); + } else if (FALSE_IT(set_start_log_ts(tmp_log_ts))) { } return ret; } @@ -1067,9 +1094,7 @@ int ObDDLSSTableRedoWriter::write_prepare_log(const ObITable::TableKey &table_ke ret = OB_ERR_SYS; LOG_WARN("srv rpc proxy or location service is null", K(ret), KP(srv_rpc_proxy)); } else if (OB_FAIL(srv_rpc_proxy->to(leader_addr_).remote_write_ddl_prepare_log(arg, log_ts))) { - if (OB_TASK_EXPIRED != ret) { - LOG_WARN("fail to remote write ddl redo log", K(ret), K(arg)); - } + LOG_WARN("fail to remote write ddl redo log", K(ret), K(arg)); } else { prepare_log_ts = log_ts; } @@ -1121,11 +1146,7 @@ int ObDDLSSTableRedoWriter::write_commit_log(const ObITable::TableKey &table_key ret = OB_ERR_SYS; LOG_WARN("srv rpc proxy or location 
service is null", K(ret), KP(srv_rpc_proxy)); } else if (OB_FAIL(srv_rpc_proxy->to(leader_addr_).remote_write_ddl_commit_log(arg, log_ts))) { - if (OB_TASK_EXPIRED == ret) { - ret = OB_SUCCESS; - } else { - LOG_WARN("fail to remote write ddl redo log", K(ret), K(arg)); - } + LOG_WARN("fail to remote write ddl redo log", K(ret), K(arg)); } } return ret; diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.h b/src/storage/ddl/ob_ddl_redo_log_writer.h index beda3a032e..f1781e0eb1 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.h +++ b/src/storage/ddl/ob_ddl_redo_log_writer.h @@ -184,6 +184,7 @@ class ObDDLRedoLogWriter final { public: static ObDDLRedoLogWriter &get_instance(); + int init(); int write(const ObDDLRedoLog &log, const uint64_t tenant_id, const share::ObLSID &ls_id, @@ -191,7 +192,10 @@ public: const blocksstable::MacroBlockId &macro_block_id, char *buffer, ObDDLRedoLogHandle &handle); - int write_ddl_start_log(const ObDDLStartLog &log, logservice::ObLogHandler *log_handler, int64_t &start_log_ts); + int write_ddl_start_log(ObDDLKvMgrHandle &ddl_kv_mgr_handle, + const ObDDLStartLog &log, + logservice::ObLogHandler *log_handler, + int64_t &start_log_ts); template <typename T> int write_ddl_finish_log(const T &log, const ObDDLClogType clog_type, @@ -208,6 +212,9 @@ private: public: }; // TODO: traffic control +private: + bool is_inited_; + common::ObBucketLock bucket_lock_; }; @@ -235,7 +242,9 @@ public: ObDDLSSTableRedoWriter(); ~ObDDLSSTableRedoWriter(); int init(const share::ObLSID &ls_id, const ObTabletID &tablet_id); - int start_ddl_redo(const ObITable::TableKey &table_key, ObDDLKvMgrHandle &ddl_kv_mgr_handle); + int start_ddl_redo(const ObITable::TableKey &table_key, + const int64_t execution_id, + ObDDLKvMgrHandle &ddl_kv_mgr_handle); int write_redo_log(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, const blocksstable::MacroBlockId &macro_block_id); int wait_redo_log_finish(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, diff --git
a/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp b/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp index f887eba047..56b982e32d 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp @@ -261,7 +261,7 @@ int ObSSTableInsertTabletContext::update(const int64_t snapshot_version) LOG_WARN("invalid argument", K(ret), K(table_key)); } else if (data_sstable_redo_writer_.get_start_log_ts() > 0) { // ddl start log is already written, do nothing - } else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(table_key, ddl_kv_mgr_handle_))) { + } else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(table_key, build_param_.execution_id_, ddl_kv_mgr_handle_))) { LOG_WARN("fail write start log", K(ret), K(table_key), K(build_param_)); } } @@ -679,8 +679,7 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog( build_param_.ddl_task_id_, prepare_log_ts))) { if (OB_TASK_EXPIRED == ret) { - LOG_INFO("ddl task expired, but return success", K(ret), K(table_key), K(build_param_)); - ret = OB_SUCCESS; + LOG_INFO("ddl task expired", K(ret), K(table_key), K(build_param_)); } else { LOG_WARN("fail write ddl prepare log", K(ret), K(table_key)); } @@ -699,20 +698,17 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog( } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(ddl_start_log_ts, prepare_log_ts, table_schema->get_table_id(), - build_param_.execution_id_, build_param_.ddl_task_id_))) { if (OB_TASK_EXPIRED == ret) { - LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id), + LOG_INFO("ddl task expired", K(ret), K(ls_id), K(tablet_id), K(ddl_start_log_ts), "new_ddl_start_log_ts", ddl_kv_mgr_handle.get_obj()->get_start_log_ts()); - ret = OB_SUCCESS; } else { LOG_WARN("failed to do ddl kv prepare", K(ret), K(ddl_start_log_ts), K(prepare_log_ts), K(build_param_)); } } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(ddl_start_log_ts, prepare_log_ts))) { if 
(OB_TASK_EXPIRED == ret) { - LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id), + LOG_INFO("ddl task expired", K(ret), K(ls_id), K(tablet_id), K(ddl_start_log_ts), "new_ddl_start_log_ts", ddl_kv_mgr_handle.get_obj()->get_start_log_ts()); - ret = OB_SUCCESS; } else { LOG_WARN("failed to wait ddl kv commit", K(ret), K(ddl_start_log_ts), K(build_param_)); } diff --git a/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp b/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp index 3d61ab6c6b..6592310fd3 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp +++ b/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp @@ -87,30 +87,33 @@ int ObTabletDDLKvMgr::init(const share::ObLSID &ls_id, const common::ObTabletID // ddl start from checkpoint // keep ddl sstable table -int ObTabletDDLKvMgr::ddl_start(const ObITable::TableKey &table_key, const int64_t start_log_ts, const int64_t cluster_version, const int64_t checkpoint_log_ts) +int ObTabletDDLKvMgr::ddl_start(const ObITable::TableKey &table_key, + const int64_t start_log_ts, + const int64_t cluster_version, + const int64_t execution_id, + const int64_t checkpoint_log_ts) { int ret = OB_SUCCESS; bool is_brand_new = false; + TCWLockGuard guard(lock_); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret), K(is_inited_)); - } else if (OB_UNLIKELY(!table_key.is_valid() || start_log_ts <= 0 || cluster_version < 0 + } else if (OB_UNLIKELY(!table_key.is_valid() || start_log_ts <= 0 || execution_id < 0 || cluster_version < 0 || (checkpoint_log_ts > 0 && checkpoint_log_ts < start_log_ts))) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(table_key), K(start_log_ts), K(cluster_version), K(checkpoint_log_ts)); + LOG_WARN("invalid argument", K(ret), K(table_key), K(start_log_ts), K(execution_id), K(cluster_version), K(checkpoint_log_ts)); } else if (table_key.get_tablet_id() != tablet_id_) { ret = OB_ERR_SYS; LOG_WARN("tablet id not same", K(ret), K(table_key), K(tablet_id_)); } else if (0 
!= start_log_ts_) { - if (start_log_ts > start_log_ts_) { - LOG_INFO("start log ts changed, need cleanup", K(ls_id_), K(tablet_id_), K(start_log_ts_), K(start_log_ts)); - if (OB_FAIL(cleanup())) { - LOG_WARN("clean up start log failed", K(ret)); - } else { - is_brand_new = true; - } + if (execution_id >= execution_id_ && start_log_ts > start_log_ts_) { + LOG_INFO("execution id changed, need cleanup", K(ls_id_), K(tablet_id_), K(execution_id_), K(execution_id), K(start_log_ts_), K(start_log_ts)); + cleanup_unlock(); + is_brand_new = true; } else { - LOG_INFO("ddl start ignored", K(ls_id_), K(tablet_id_), K(start_log_ts_), K(start_log_ts)); + ret = OB_TASK_EXPIRED; + LOG_INFO("ddl start ignored", K(ls_id_), K(tablet_id_), K(execution_id_), K(execution_id), K(start_log_ts_), K(start_log_ts)); } } else { is_brand_new = true; @@ -118,6 +121,7 @@ int ObTabletDDLKvMgr::ddl_start(const ObITable::TableKey &table_key, const int64 if (OB_SUCC(ret) && is_brand_new) { table_key_ = table_key; cluster_version_ = cluster_version; + execution_id_ = execution_id; start_log_ts_ = start_log_ts; max_freeze_log_ts_ = max(start_log_ts, checkpoint_log_ts); } @@ -127,14 +131,13 @@ int ObTabletDDLKvMgr::ddl_start(const ObITable::TableKey &table_key, const int64 LOG_WARN("clean up ddl sstable failed", K(ret), K(ls_id_), K(tablet_id_)); } } - FLOG_INFO("start ddl kv mgr finished", K(ret), K(is_brand_new), K(start_log_ts), K(checkpoint_log_ts), K(*this)); + FLOG_INFO("start ddl kv mgr finished", K(ret), K(is_brand_new), K(start_log_ts), K(execution_id), K(checkpoint_log_ts), K(*this)); return ret; } int ObTabletDDLKvMgr::ddl_prepare(const int64_t start_log_ts, const int64_t prepare_log_ts, const uint64_t table_id, - const int64_t execution_id, const int64_t ddl_task_id) { int ret = OB_SUCCESS; @@ -152,7 +155,6 @@ int ObTabletDDLKvMgr::ddl_prepare(const int64_t start_log_ts, LOG_WARN("freeze ddl kv failed", K(ret), K(prepare_log_ts)); } else { table_id_ = table_id; - execution_id_ = 
execution_id; ddl_task_id_ = ddl_task_id; ObDDLTableMergeDagParam param; @@ -291,24 +293,35 @@ int ObTabletDDLKvMgr::cleanup() LOG_WARN("not init", K(ret)); } else { TCWLockGuard guard(lock_); - for (int64_t pos = head_; pos < tail_; ++pos) { - const int64_t idx = get_idx(pos); - free_ddl_kv(idx); - } - head_ = 0; - tail_ = 0; - MEMSET(ddl_kvs_, 0, sizeof(ddl_kvs_)); - table_key_.reset(); - cluster_version_ = 0; - start_log_ts_ = 0; - max_freeze_log_ts_ = 0; - table_id_ = 0; - execution_id_ = 0; - is_commit_success_ = false; + cleanup_unlock(); } return ret; } +void ObTabletDDLKvMgr::cleanup_unlock() +{ + for (int64_t pos = head_; pos < tail_; ++pos) { + const int64_t idx = get_idx(pos); + free_ddl_kv(idx); + } + head_ = 0; + tail_ = 0; + MEMSET(ddl_kvs_, 0, sizeof(ddl_kvs_)); + table_key_.reset(); + cluster_version_ = 0; + start_log_ts_ = 0; + max_freeze_log_ts_ = 0; + table_id_ = 0; + execution_id_ = 0; + is_commit_success_ = false; +} + +bool ObTabletDDLKvMgr::is_execution_id_older(const int64_t execution_id) +{ + TCRLockGuard guard(lock_); + return execution_id < execution_id_; +} + int ObTabletDDLKvMgr::online() { int ret = OB_SUCCESS; @@ -333,9 +346,14 @@ int ObTabletDDLKvMgr::online() const int64_t start_log_ts = tablet_meta.ddl_start_log_ts_; if (OB_FAIL(ddl_start(table_key, start_log_ts, - GET_MIN_CLUSTER_VERSION(), + tablet_meta.ddl_cluster_version_, + tablet_meta.ddl_execution_id_, tablet_meta.ddl_checkpoint_ts_))) { - LOG_WARN("start ddl kv manager failed", K(ret), K(tablet_meta)); + if (OB_TASK_EXPIRED == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("start ddl kv manager failed", K(ret), K(tablet_meta)); + } } } return ret; @@ -371,6 +389,8 @@ int ObTabletDDLKvMgr::update_tablet(const int64_t start_log_ts, const int64_t sn param.ddl_start_log_ts_ = start_log_ts; param.ddl_snapshot_version_ = snapshot_version; param.ddl_checkpoint_ts_ = ddl_checkpoint_ts; + param.ddl_execution_id_ = execution_id_; + param.ddl_cluster_version_ = cluster_version_; if 
(OB_FAIL(ls_handle.get_ls()->update_tablet_table_store(tablet_id_, param, new_tablet_handle))) { LOG_WARN("failed to update tablet table store", K(ret), K(ls_id_), K(tablet_id_), K(param)); } else { diff --git a/src/storage/ddl/ob_tablet_ddl_kv_mgr.h b/src/storage/ddl/ob_tablet_ddl_kv_mgr.h index 3621a39ba7..f1b8c2a3b1 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv_mgr.h +++ b/src/storage/ddl/ob_tablet_ddl_kv_mgr.h @@ -36,8 +36,8 @@ public: ObTabletDDLKvMgr(); ~ObTabletDDLKvMgr(); int init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id); // init before memtable mgr - int ddl_start(const ObITable::TableKey &table_key, const int64_t start_log_ts, const int64_t cluster_version, const int64_t checkpoint_log_ts = 0); - int ddl_prepare(const int64_t start_log_ts, const int64_t commit_log_ts, const uint64_t table_id = 0, const int64_t execution_id = 0, const int64_t ddl_task_id = 0); // schedule build a major sstable + int ddl_start(const ObITable::TableKey &table_key, const int64_t start_log_ts, const int64_t cluster_version, const int64_t execution_id, const int64_t checkpoint_log_ts); + int ddl_prepare(const int64_t start_log_ts, const int64_t commit_log_ts, const uint64_t table_id = 0, const int64_t ddl_task_id = 0); // schedule build a major sstable int ddl_commit(const int64_t start_log_ts, const int64_t prepare_log_ts, const bool is_replay); // try wait build major sstable int wait_ddl_commit(const int64_t start_log_ts, const int64_t prepare_log_ts); int get_ddl_param(ObTabletDDLParam &ddl_param); @@ -55,6 +55,7 @@ public: common::ObTabletID get_tablet_id() const { return tablet_id_; } int cleanup(); int online(); + bool is_execution_id_older(const int64_t execution_id); OB_INLINE void inc_ref() { ATOMIC_INC(&ref_cnt_); } OB_INLINE int64_t dec_ref() { return ATOMIC_SAF(&ref_cnt_, 1 /* just sub 1 */); } OB_INLINE int64_t get_ref() const { return ATOMIC_LOAD(&ref_cnt_); } @@ -71,6 +72,7 @@ private: int get_active_ddl_kv_impl(ObDDLKVHandle &kv_handle); 
void try_get_ddl_kv_unlock(const int64_t log_ts, ObDDLKV *&kv); int update_tablet(const int64_t start_log_ts, const int64_t snapshot_version, const int64_t ddl_checkpoint_ts); + void cleanup_unlock(); void destroy(); private: static const int64_t MAX_DDL_KV_CNT_IN_STORAGE = 64; diff --git a/src/storage/ob_storage_struct.cpp b/src/storage/ob_storage_struct.cpp index 839d749d0d..4896fd97a0 100644 --- a/src/storage/ob_storage_struct.cpp +++ b/src/storage/ob_storage_struct.cpp @@ -222,6 +222,8 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam( ddl_checkpoint_ts_(0), ddl_start_log_ts_(0), ddl_snapshot_version_(0), + ddl_execution_id_(0), + ddl_cluster_version_(0), tx_data_(), binding_info_(), auto_inc_seq_() @@ -250,6 +252,8 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam( ddl_checkpoint_ts_(0), ddl_start_log_ts_(0), ddl_snapshot_version_(0), + ddl_execution_id_(0), + ddl_cluster_version_(0), tx_data_(), binding_info_(), auto_inc_seq_() @@ -277,6 +281,8 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam( ddl_checkpoint_ts_(0), ddl_start_log_ts_(0), ddl_snapshot_version_(0), + ddl_execution_id_(0), + ddl_cluster_version_(0), tx_data_(), binding_info_(), auto_inc_seq_() diff --git a/src/storage/ob_storage_struct.h b/src/storage/ob_storage_struct.h index 0289e6963e..9475bda530 100644 --- a/src/storage/ob_storage_struct.h +++ b/src/storage/ob_storage_struct.h @@ -319,7 +319,7 @@ struct ObUpdateTableStoreParam TO_STRING_KV(K_(table_handle), K_(snapshot_version), K_(clog_checkpoint_ts), K_(multi_version_start), K_(keep_old_ddl_sstable), K_(need_report), KPC_(storage_schema), K_(rebuild_seq), K_(update_with_major_flag), K_(need_check_sstable), K_(ddl_checkpoint_ts), K_(ddl_start_log_ts), K_(ddl_snapshot_version), - K_(tx_data), K_(binding_info), K_(auto_inc_seq)); + K_(ddl_execution_id), K_(ddl_cluster_version), K_(tx_data), K_(binding_info), K_(auto_inc_seq)); ObTableHandleV2 table_handle_; int64_t snapshot_version_; @@ -334,6 +334,8 @@ struct ObUpdateTableStoreParam 
int64_t ddl_checkpoint_ts_; int64_t ddl_start_log_ts_; int64_t ddl_snapshot_version_; + int64_t ddl_execution_id_; + int64_t ddl_cluster_version_; // msd ObTabletTxMultiSourceDataUnit tx_data_; diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index 945b292f9b..27efd5e87f 100644 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -212,7 +212,8 @@ int ObTablet::init( } else if (OB_FAIL(tablet_meta_.init(*allocator_, old_tablet.tablet_meta_, param.snapshot_version_, param.multi_version_start_, tx_data, ddl_data, autoinc_seq, input_max_sync_schema_version, - param.clog_checkpoint_ts_, param.ddl_checkpoint_ts_, param.ddl_start_log_ts_, param.ddl_snapshot_version_))) { + param.clog_checkpoint_ts_, param.ddl_checkpoint_ts_, param.ddl_start_log_ts_, param.ddl_snapshot_version_, + param.ddl_execution_id_, param.ddl_cluster_version_))) { LOG_WARN("failed to init tablet meta", K(ret), K(old_tablet), K(param), K(tx_data), K(ddl_data), K(autoinc_seq), K(input_max_sync_schema_version)); } else if (OB_FAIL(table_store_.init(*allocator_, this, param, old_tablet.table_store_))) { @@ -1829,6 +1830,8 @@ int ObTablet::build_migration_tablet_param(ObMigrationTabletParam &mig_tablet_pa mig_tablet_param.ddl_start_log_ts_ = tablet_meta_.ddl_start_log_ts_; mig_tablet_param.ddl_snapshot_version_ = tablet_meta_.ddl_snapshot_version_; mig_tablet_param.max_sync_storage_schema_version_ = tablet_meta_.max_sync_storage_schema_version_; + mig_tablet_param.ddl_execution_id_ = tablet_meta_.ddl_execution_id_; + mig_tablet_param.ddl_cluster_version_ = tablet_meta_.ddl_cluster_version_; mig_tablet_param.report_status_.reset(); if (OB_FAIL(mig_tablet_param.storage_schema_.init(mig_tablet_param.allocator_, storage_schema_))) { @@ -2140,9 +2143,10 @@ int ObTablet::start_ddl_if_need() const int64_t start_log_ts = tablet_meta_.ddl_start_log_ts_; if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(table_key, start_log_ts, - 
GET_MIN_CLUSTER_VERSION(), + tablet_meta_.ddl_cluster_version_, + tablet_meta_.ddl_execution_id_, tablet_meta_.ddl_checkpoint_ts_))) { - LOG_WARN("start ddl kv manager failed", K(ret), K(table_key), K(start_log_ts)); + LOG_WARN("start ddl kv manager failed", K(ret), K(table_key), K(tablet_meta_)); } } return ret; diff --git a/src/storage/tablet/ob_tablet_meta.cpp b/src/storage/tablet/ob_tablet_meta.cpp index 32f288c7cb..ebf03e759a 100644 --- a/src/storage/tablet/ob_tablet_meta.cpp +++ b/src/storage/tablet/ob_tablet_meta.cpp @@ -54,6 +54,8 @@ ObTabletMeta::ObTabletMeta() ddl_start_log_ts_(OB_INVALID_TIMESTAMP), ddl_snapshot_version_(OB_INVALID_TIMESTAMP), max_sync_storage_schema_version_(0), + ddl_execution_id_(0), + ddl_cluster_version_(0), is_inited_(false) { } @@ -108,6 +110,8 @@ int ObTabletMeta::init( ddl_start_log_ts_ = 0; ddl_snapshot_version_ = 0; max_sync_storage_schema_version_ = max_sync_storage_schema_version; + ddl_execution_id_ = 0; + ddl_cluster_version_ = 0; report_status_.merge_snapshot_version_ = snapshot_version; report_status_.cur_report_version_ = snapshot_version; @@ -144,7 +148,9 @@ int ObTabletMeta::init( const int64_t clog_checkpoint_ts, const int64_t ddl_checkpoint_ts, const int64_t ddl_start_log_ts, - const int64_t ddl_snapshot_version) + const int64_t ddl_snapshot_version, + const int64_t ddl_execution_id, + const int64_t ddl_cluster_version) { int ret = OB_SUCCESS; @@ -180,6 +186,8 @@ int ObTabletMeta::init( ddl_checkpoint_ts_ = MAX(old_tablet_meta.ddl_checkpoint_ts_, ddl_checkpoint_ts); ddl_start_log_ts_ = MAX(old_tablet_meta.ddl_start_log_ts_, ddl_start_log_ts); ddl_snapshot_version_ = MAX(old_tablet_meta.ddl_snapshot_version_, ddl_snapshot_version); + ddl_execution_id_ = MAX(old_tablet_meta.ddl_execution_id_, ddl_execution_id); + ddl_cluster_version_ = MAX(old_tablet_meta.ddl_cluster_version_, ddl_cluster_version); is_inited_ = true; } @@ -226,6 +234,8 @@ int ObTabletMeta::init( ddl_start_log_ts_ = param.ddl_start_log_ts_; 
ddl_snapshot_version_ = param.ddl_snapshot_version_; max_sync_storage_schema_version_ = param.max_sync_storage_schema_version_; + ddl_execution_id_ = param.ddl_execution_id_; + ddl_cluster_version_ = param.ddl_cluster_version_; is_inited_ = true; } @@ -305,6 +315,8 @@ int ObTabletMeta::init( ddl_start_log_ts_ = old_tablet_meta.ddl_start_log_ts_; ddl_snapshot_version_ = old_tablet_meta.ddl_snapshot_version_; max_sync_storage_schema_version_ = max_sync_storage_schema_version; + ddl_execution_id_ = old_tablet_meta.ddl_execution_id_; + ddl_cluster_version_ = old_tablet_meta.ddl_cluster_version_; if (OB_SUCC(ret)) { is_inited_ = true; @@ -341,6 +353,8 @@ void ObTabletMeta::reset() ddl_start_log_ts_ = OB_INVALID_TIMESTAMP; ddl_snapshot_version_ = OB_INVALID_TIMESTAMP; max_sync_storage_schema_version_ = 0; + ddl_execution_id_ = 0; + ddl_cluster_version_ = 0; is_inited_ = false; } @@ -433,6 +447,10 @@ int ObTabletMeta::serialize(char *buf, const int64_t len, int64_t &pos) LOG_WARN("failed to serialize ddl snapshot version", K(ret), K(len), K(new_pos), K_(ddl_snapshot_version)); } else if (new_pos - pos < length_ && OB_FAIL(serialization::encode_i64(buf, len, new_pos, max_sync_storage_schema_version_))) { LOG_WARN("failed to serialize max_sync_storage_schema_version", K(ret), K(len), K(new_pos), K_(max_sync_storage_schema_version)); + } else if (new_pos - pos < length_ && OB_FAIL(serialization::encode_i64(buf, len, new_pos, ddl_execution_id_))) { + LOG_WARN("failed to serialize ddl execution id", K(ret), K(len), K(new_pos), K_(ddl_execution_id)); + } else if (new_pos - pos < length_ && OB_FAIL(serialization::encode_i64(buf, len, new_pos, ddl_cluster_version_))) { + LOG_WARN("failed to serialize ddl cluster version", K(ret), K(len), K(new_pos), K_(ddl_cluster_version)); } else if (OB_UNLIKELY(length_ != new_pos - pos)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet meta's length doesn't match standard length", K(ret), K(new_pos), K(pos), K_(length)); @@ -512,6 +530,10 @@ int
ObTabletMeta::deserialize( LOG_WARN("failed to deserialize ddl snapshot version", K(ret), K(len), K(new_pos)); } else if (new_pos - pos < length_ && OB_FAIL(serialization::decode_i64(buf, len, new_pos, &max_sync_storage_schema_version_))) { LOG_WARN("failed to deserialize max_sync_storage_schema_version_", K(ret), K(len), K(new_pos)); + } else if (new_pos - pos < length_ && OB_FAIL(serialization::decode_i64(buf, len, new_pos, &ddl_execution_id_))) { + LOG_WARN("failed to deserialize ddl execution id", K(ret), K(len), K(new_pos)); + } else if (new_pos - pos < length_ && OB_FAIL(serialization::decode_i64(buf, len, new_pos, &ddl_cluster_version_))) { + LOG_WARN("failed to deserialize ddl cluster version", K(ret), K(len), K(new_pos)); } else if (OB_UNLIKELY(length_ != new_pos - pos)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet's length doesn't match standard length", K(ret), K(new_pos), K(pos), K_(length)); @@ -554,6 +576,8 @@ int64_t ObTabletMeta::get_serialize_size() const size += serialization::encoded_length_i64(ddl_start_log_ts_); size += serialization::encoded_length_i64(ddl_snapshot_version_); size += serialization::encoded_length_i64(max_sync_storage_schema_version_); + size += serialization::encoded_length_i64(ddl_execution_id_); + size += serialization::encoded_length_i64(ddl_cluster_version_); return size; } @@ -647,6 +671,8 @@ int ObTabletMeta::update(const ObMigrationTabletParam &param) ddl_start_log_ts_ = param.ddl_start_log_ts_; ddl_snapshot_version_ = param.ddl_snapshot_version_; max_sync_storage_schema_version_ = param.max_sync_storage_schema_version_; + ddl_execution_id_ = param.ddl_execution_id_; + ddl_cluster_version_ = param.ddl_cluster_version_; } return ret; @@ -715,7 +741,9 @@ ObMigrationTabletParam::ObMigrationTabletParam() table_store_flag_(), ddl_start_log_ts_(OB_INVALID_TIMESTAMP), ddl_snapshot_version_(OB_INVALID_TIMESTAMP), - max_sync_storage_schema_version_(0) + max_sync_storage_schema_version_(0), + ddl_execution_id_(0), +
ddl_cluster_version_(0) { } @@ -795,6 +823,10 @@ int ObMigrationTabletParam::serialize(char *buf, const int64_t len, int64_t &pos LOG_WARN("failed to serialize ddl snapshot version", K(ret), K(len), K(new_pos), K_(ddl_snapshot_version)); } else if (OB_FAIL(serialization::encode_i64(buf, len, new_pos, max_sync_storage_schema_version_))) { LOG_WARN("failed to serialize max_sync_storage_schema_version", K(ret), K(len), K(new_pos), K_(max_sync_storage_schema_version)); + } else if (OB_FAIL(serialization::encode_i64(buf, len, new_pos, ddl_execution_id_))) { + LOG_WARN("failed to serialize ddl execution id", K(ret), K(len), K(new_pos), K_(ddl_execution_id)); + } else if (OB_FAIL(serialization::encode_i64(buf, len, new_pos, ddl_cluster_version_))) { + LOG_WARN("failed to serialize ddl cluster version", K(ret), K(len), K(new_pos), K_(ddl_cluster_version)); } else { pos = new_pos; } @@ -858,6 +890,10 @@ int ObMigrationTabletParam::deserialize(const char *buf, const int64_t len, int6 LOG_WARN("failed to deserialize ddl snapshot version", K(ret), K(len), K(new_pos)); } else if (OB_FAIL(serialization::decode_i64(buf, len, new_pos, &max_sync_storage_schema_version_))) { LOG_WARN("failed to deserialize max sync storage schema version", K(ret), K(len), K(new_pos)); + } else if (OB_FAIL(serialization::decode_i64(buf, len, new_pos, &ddl_execution_id_))) { + LOG_WARN("failed to deserialize ddl execution id", K(ret), K(len), K(new_pos)); + } else if (OB_FAIL(serialization::decode_i64(buf, len, new_pos, &ddl_cluster_version_))) { + LOG_WARN("failed to deserialize ddl cluster version", K(ret), K(len), K(new_pos)); } else { compat_mode_ = static_cast(compat_mode); pos = new_pos; @@ -891,6 +927,8 @@ int64_t ObMigrationTabletParam::get_serialize_size() const size += serialization::encoded_length_i64(ddl_start_log_ts_); size += serialization::encoded_length_i64(ddl_snapshot_version_); size += serialization::encoded_length_i64(max_sync_storage_schema_version_); + size += 
serialization::encoded_length_i64(ddl_execution_id_); + size += serialization::encoded_length_i64(ddl_cluster_version_); return size; } @@ -917,6 +955,8 @@ void ObMigrationTabletParam::reset() ddl_start_log_ts_ = OB_INVALID_TIMESTAMP; ddl_snapshot_version_ = OB_INVALID_TIMESTAMP; max_sync_storage_schema_version_ = 0; + ddl_execution_id_ = 0; + ddl_cluster_version_ = 0; } int ObMigrationTabletParam::assign(const ObMigrationTabletParam &param) @@ -946,6 +986,8 @@ int ObMigrationTabletParam::assign(const ObMigrationTabletParam &param) ddl_start_log_ts_ = param.ddl_start_log_ts_; ddl_snapshot_version_ = param.ddl_snapshot_version_; max_sync_storage_schema_version_ = param.max_sync_storage_schema_version_; + ddl_execution_id_ = param.ddl_execution_id_; + ddl_cluster_version_ = param.ddl_cluster_version_; if (OB_FAIL(ddl_data_.assign(param.ddl_data_))) { LOG_WARN("failed to assign ddl data", K(ret), K(param), K(*this)); diff --git a/src/storage/tablet/ob_tablet_meta.h b/src/storage/tablet/ob_tablet_meta.h index 6896ff251d..2fd192eb20 100644 --- a/src/storage/tablet/ob_tablet_meta.h +++ b/src/storage/tablet/ob_tablet_meta.h @@ -76,7 +76,9 @@ public: const int64_t clog_checkpoint_ts = 0, const int64_t ddl_checkpoint_ts = 0, const int64_t ddl_start_log_ts = 0, - const int64_t ddl_snapshot_version = 0); + const int64_t ddl_snapshot_version = 0, + const int64_t ddl_execution_id = 0, + const int64_t ddl_cluster_version = 0); int init( common::ObIAllocator &allocator, const ObMigrationTabletParam &param); @@ -134,7 +136,9 @@ public: K_(table_store_flag), K_(ddl_start_log_ts), K_(ddl_snapshot_version), - K_(max_sync_storage_schema_version)); + K_(max_sync_storage_schema_version), + K_(ddl_execution_id), + K_(ddl_cluster_version)); public: int32_t version_; @@ -161,6 +165,8 @@ public: int64_t ddl_start_log_ts_; int64_t ddl_snapshot_version_; int64_t max_sync_storage_schema_version_; + int64_t ddl_execution_id_; + int64_t ddl_cluster_version_; //ATTENTION : Add a new variable need consider
ObMigrationTabletParam // and tablet meta init interface for migration. // yuque : https://yuque.antfin.com/ob/ob-backup/zzwpuh @@ -242,6 +248,8 @@ public: int64_t ddl_snapshot_version_; // max_sync_version may less than storage_schema.schema_version_ when major update schema int64_t max_sync_storage_schema_version_; + int64_t ddl_execution_id_; + int64_t ddl_cluster_version_; }; } // namespace storage