diff --git a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp index c1ea366ba..702b325a3 100644 --- a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp @@ -304,11 +304,15 @@ int ObColumnRedefinitionTask::copy_table_indexes() } } if (OB_SUCC(ret) && need_rebuild_index) { - ObDDLTaskKey task_key(index_ids.at(i), index_schema->get_schema_version()); + const uint64_t task_key = index_ids.at(i); DependTaskStatus status; status.task_id_ = task_record.task_id_; // child task id is used to judge whether child task finish. - if (OB_FAIL(dependent_task_result_map_.set_refactored(task_key, status, true/*overwrite*/))) { - LOG_WARN("set dependent task map failed", K(ret), K(task_key)); + if (OB_FAIL(dependent_task_result_map_.set_refactored(task_key, status))) { + if (OB_HASH_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("set dependent task map failed", K(ret), K(task_key)); + } } else { LOG_INFO("add build index task", K(task_record)); } @@ -448,10 +452,9 @@ int ObColumnRedefinitionTask::copy_table_dependent_objects(const ObDDLTaskStatus } else { // wait copy dependent objects to be finished ObAddr unused_addr; - for (common::hash::ObHashMap::const_iterator iter = dependent_task_result_map_.begin(); + for (common::hash::ObHashMap::const_iterator iter = dependent_task_result_map_.begin(); iter != dependent_task_result_map_.end(); ++iter) { - const int64_t table_id = iter->first.object_id_; - const int64_t schema_version = iter->first.schema_version_; + const uint64_t task_key = iter->first; const int64_t target_object_id = -1; const int64_t child_task_id = iter->second.task_id_; if (iter->second.ret_code_ == INT64_MAX) { @@ -462,9 +465,9 @@ int ObColumnRedefinitionTask::copy_table_dependent_objects(const ObDDLTaskStatus unused_addr, false /* is_ddl_retry_task */, *GCTX.sql_proxy_, error_message, unused_user_msg_len))) { if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_SUCCESS; - LOG_INFO("ddl task not finish", K(table_id), K(child_task_id), K(schema_version), K(target_object_id)); + LOG_INFO("ddl task not finish", K(task_key), K(child_task_id), K(target_object_id)); } else { - LOG_WARN("fail to get ddl error message", K(ret), K(table_id), K(child_task_id), K(schema_version), K(target_object_id)); + LOG_WARN("fail to get ddl error message", K(ret), K(task_key), K(child_task_id), K(target_object_id)); } } else { finished_task_cnt++; diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp index cb1e54c1b..acf707de2 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp @@ -786,9 +786,8 @@ int ObDDLRedefinitionTask::add_constraint_ddl_task(const int64_t constraint_id, if (OB_SUCC(ret)) { DependTaskStatus status; bool need_set_status = false; - ObDDLTaskKey task_key(constraint_id, table_schema->get_schema_version()); status.task_id_ = task_id; // child task id, which is used to judge child task finish. - if (OB_FAIL(dependent_task_result_map_.get_refactored(task_key, status))) { + if (OB_FAIL(dependent_task_result_map_.get_refactored(constraint_id, status))) { if (OB_HASH_NOT_EXIST != ret) { LOG_WARN("get from dependent task map failed", K(ret)); } else { @@ -798,10 +797,10 @@ int ObDDLRedefinitionTask::add_constraint_ddl_task(const int64_t constraint_id, } if (OB_SUCC(ret) && need_set_status) { status.task_id_ = task_id; // child task id is used to judge whether child task finish. - if (OB_FAIL(dependent_task_result_map_.set_refactored(task_key, status))) { - LOG_WARN("set dependent task map failed", K(ret), K(task_key)); + if (OB_FAIL(dependent_task_result_map_.set_refactored(constraint_id, status))) { + LOG_WARN("set dependent task map failed", K(ret), K(constraint_id)); } else { - LOG_INFO("add constraint task", K(task_key)); + LOG_INFO("add constraint task", K(constraint_id)); } } } @@ -873,7 +872,6 @@ int ObDDLRedefinitionTask::add_fk_ddl_task(const int64_t fk_id, ObSchemaGetterGu LOG_WARN("cannot find foreign key in table", K(ret), K(fk_id), K(fk_info_array)); } else { DependTaskStatus status; - ObDDLTaskKey task_key(fk_id, hidden_table_schema->get_schema_version()); fk_arg.foreign_key_name_ = fk_info.foreign_key_name_; fk_arg.enable_flag_ = fk_info.enable_flag_; fk_arg.is_modify_enable_flag_ = fk_info.enable_flag_; @@ -903,10 +901,10 @@ int ObDDLRedefinitionTask::add_fk_ddl_task(const int64_t fk_id, ObSchemaGetterGu } else if (OB_FAIL(root_service->get_ddl_task_scheduler().schedule_ddl_task(task_record))) { LOG_WARN("fail to schedule ddl task", K(ret), K(task_record)); } else if (FALSE_IT(status.task_id_ = task_record.task_id_)) { // child task id is used to judge whether child task finish. - } else if (OB_FAIL(dependent_task_result_map_.set_refactored(task_key, status))) { + } else if (OB_FAIL(dependent_task_result_map_.set_refactored(fk_id, status))) { LOG_WARN("set dependent task map failed", K(ret)); } else { - LOG_INFO("add foregin key ddl task", K(fk_arg), "ddl_task_key", task_key); + LOG_INFO("add foregin key ddl task", K(fk_arg), K(fk_id)); } } } @@ -916,14 +914,14 @@ int ObDDLRedefinitionTask::add_fk_ddl_task(const int64_t fk_id, ObSchemaGetterGu } int ObDDLRedefinitionTask::on_child_task_finish( - const ObDDLTaskKey &child_task_key, + const uint64_t child_task_key, const int ret_code) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObDDLRedefinitionTask has not been inited", K(ret)); - } else if (OB_UNLIKELY(!child_task_key.is_valid())) { + } else if (OB_UNLIKELY(common::OB_INVALID_ID == child_task_key)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(child_task_key)); } else { diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h index 3fd0c673d..99e8c7024 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h @@ -119,7 +119,7 @@ public: const int64_t execution_id, const int ret_code) = 0; int on_child_task_finish( - const ObDDLTaskKey &child_task_key, + const uint64_t child_task_key, const int ret_code); virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override; virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override; @@ -212,7 +212,7 @@ protected: int64_t build_replica_request_time_; int64_t complete_sstable_job_ret_code_; obrpc::ObAlterTableArg alter_table_arg_; - common::hash::ObHashMap dependent_task_result_map_; + common::hash::ObHashMap dependent_task_result_map_; bool snapshot_held_; bool has_synced_autoincrement_; bool has_synced_stats_info_; diff --git a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp index 2a5277622..f0ae5fbd7 100644 --- a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp @@ -12,6 +12,7 @@ #define USING_LOG_PREFIX RS #include "ob_ddl_retry_task.h" +#include "lib/mysqlclient/ob_mysql_result.h" #include "lib/rc/context.h" #include "share/schema/ob_multi_version_schema_service.h" #include "share/ob_ddl_error_message_table_operator.h" @@ -30,7 +31,7 @@ using namespace oceanbase::obrpc; ObDDLRetryTask::ObDDLRetryTask() : ObDDLTask(share::DDL_INVALID), ddl_arg_(nullptr), root_service_(nullptr), affected_rows_(0), - forward_user_message_(), allocator_(lib::ObLabel("RedefTask")) + forward_user_message_(), allocator_(lib::ObLabel("RedefTask")), is_schema_change_done_(false) { } @@ -184,6 +185,7 @@ int ObDDLRetryTask::init(const uint64_t tenant_id, task_type_ = ddl_type; task_version_ = OB_DDL_RETRY_TASK_VERSION; task_status_ = static_cast(task_status); + is_schema_change_done_ = false; is_inited_ = true; } return ret; @@ -209,6 +211,7 @@ int ObDDLRetryTask::init(const ObDDLTaskRecord &task_record) task_version_ = task_record.task_version_; ret_code_ = task_record.ret_code_; task_status_ = static_cast(task_record.task_status_); + is_schema_change_done_ = false; // do not worry about it, check_schema_change_done will correct it. if (nullptr != task_record.message_) { int64_t pos = 0; if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) { @@ -281,6 +284,46 @@ int ObDDLRetryTask::get_forward_user_message(const obrpc::ObRpcResultCode &rcode return ret; } +int ObDDLRetryTask::check_schema_change_done() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (is_schema_change_done_) { + // do nothing. + } else if (OB_ISNULL(root_service_)) { + ret = OB_ERR_SYS; + LOG_WARN("error sys, root service must not be nullptr", K(ret)); + } else { + common::ObMySQLProxy &proxy = root_service_->get_sql_proxy(); + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ObSqlString query_string; + sqlclient::ObMySQLResult *result = NULL; + if (OB_UNLIKELY(!proxy.is_inited())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected error, proxy is not inited", K(ret)); + } else if (OB_FAIL(query_string.assign_fmt( + " SELECT status FROM %s WHERE task_id = %lu", + OB_ALL_DDL_TASK_STATUS_TNAME, task_id_))) { + LOG_WARN("assign query string failed", K(ret), KPC(this)); + } else if (OB_FAIL(proxy.read(res, tenant_id_, query_string.ptr()))) { + LOG_WARN("read record failed", K(ret), K(query_string)); + } else if (OB_UNLIKELY(nullptr == (result = res.get_result()))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get sql result", K(ret), KP(result)); + } else if (OB_FAIL(result->next())) { + LOG_WARN("get record failed", K(ret), K(query_string)); + } else { + int64_t table_task_status = 0; + EXTRACT_INT_FIELD_MYSQL(*result, "status", table_task_status, int64_t); + is_schema_change_done_ = ObDDLTaskStatus::WAIT_CHILD_TASK_FINISH == table_task_status ? true : is_schema_change_done_; + } + } + } + return ret; +} + int ObDDLRetryTask::drop_schema(const ObDDLTaskStatus next_task_status) { int ret = OB_SUCCESS; @@ -294,6 +337,11 @@ int ObDDLRetryTask::drop_schema(const ObDDLTaskStatus next_task_status) } else if (OB_ISNULL(ddl_arg_) || lib::Worker::CompatMode::INVALID == compat_mode_) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error", K(ret), KP(ddl_arg_), K(compat_mode_)); + } else if (OB_FAIL(check_schema_change_done())) { + LOG_WARN("check task finished failed", K(ret)); + } else if (is_schema_change_done_) { + // schema has already changed. + new_status = next_task_status; } else { lib::Worker::CompatMode save_compat_mode = THIS_WORKER.get_compatibility_mode(); THIS_WORKER.set_compatibility_mode(compat_mode_); @@ -508,7 +556,7 @@ int ObDDLRetryTask::process() } case ObDDLTaskStatus::DROP_SCHEMA: { if (OB_FAIL(drop_schema(ObDDLTaskStatus::WAIT_CHILD_TASK_FINISH))) { - LOG_WARN("fail to write barrier log", K(ret)); + LOG_WARN("fail to drop schema", K(ret)); } break; } diff --git a/src/rootserver/ddl_task/ob_ddl_retry_task.h b/src/rootserver/ddl_task/ob_ddl_retry_task.h index 372131da4..ba063f02f 100644 --- a/src/rootserver/ddl_task/ob_ddl_retry_task.h +++ b/src/rootserver/ddl_task/ob_ddl_retry_task.h @@ -56,6 +56,7 @@ private: int deep_copy_ddl_arg(common::ObIAllocator &allocator, const share::ObDDLType &ddl_type, const obrpc::ObDDLArg *source_arg); int init_compat_mode(const share::ObDDLType &ddl_type, const obrpc::ObDDLArg *source_arg); int get_forward_user_message(const obrpc::ObRpcResultCode &rcode); + int check_schema_change_done(); virtual bool is_error_need_retry(const int ret_code) override { return common::OB_PARTITION_NOT_EXIST != ret_code && ObDDLTask::is_error_need_retry(ret_code); @@ -68,6 +69,7 @@ private: common::ObString forward_user_message_; common::ObArenaAllocator allocator_; obrpc::ObAlterTableRes alter_table_res_; // in memory + bool is_schema_change_done_; }; } // end namespace rootserver diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index c72709e98..f52d27a0c 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -499,7 +499,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, ret = OB_NOT_SUPPORTED; LOG_WARN("error unexpected, ddl type is not supported", K(ret), K(param.type_)); } - LOG_INFO("create ddl task", K(param), K(task_record)); + LOG_INFO("create ddl task", K(ret), K(param), K(task_record)); } return ret; } @@ -1429,7 +1429,7 @@ int ObDDLScheduler::on_ddl_task_finish( ret = OB_ERR_SYS; LOG_WARN("ddl task must not be nullptr", K(ret)); } else if (FALSE_IT(redefinition_task = static_cast(ddl_task))) { - } else if (OB_FAIL(redefinition_task->on_child_task_finish(child_task_key, ret_code))) { + } else if (OB_FAIL(redefinition_task->on_child_task_finish(child_task_key.object_id_, ret_code))) { LOG_WARN("on child task finish failed", K(ret), K(child_task_key)); } } diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index 427732a75..0998da8f2 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -1906,6 +1906,34 @@ int ObDDLTaskRecordOperator::insert_record( if (OB_UNLIKELY(0 > (pos = record.trace_id_.to_string(trace_id_str, sizeof(trace_id_str))))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get task id string failed", K(ret), K(record), K(pos)); + } else if (record.parent_task_id_ > 0) { + // for child task only. + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ObSqlString query_string; + sqlclient::ObMySQLResult *result = NULL; + if (OB_FAIL(query_string.assign_fmt( + " SELECT * FROM %s WHERE object_id = %lu and target_object_id = %lu", + OB_ALL_DDL_TASK_STATUS_TNAME, record.object_id_, record.target_object_id_))) { + LOG_WARN("assign query string failed", K(ret), K(record)); + } else if (OB_FAIL(proxy.read(res, record.tenant_id_, query_string.ptr()))) { + LOG_WARN("read record failed", K(ret), K(query_string)); + } else if (OB_UNLIKELY(nullptr == (result = res.get_result()))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get sql result", K(ret), KP(result)); + } else if (OB_FAIL(result->next())) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("get next row failed", K(ret)); + } + } else { + // do not insert duplicated record. + ret = OB_ENTRY_EXIST; + } + } + } + + if (OB_FAIL(ret)) { } else if (OB_FAIL(to_hex_str(record.ddl_stmt_str_, ddl_stmt_string))) { LOG_WARN("append hex escaped ddl stmt string failed", K(ret)); } else if (OB_FAIL(to_hex_str(record.message_, message_string))) { diff --git a/src/rootserver/ddl_task/ob_index_build_task.cpp b/src/rootserver/ddl_task/ob_index_build_task.cpp index 3e67dac47..01f1b7982 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.cpp +++ b/src/rootserver/ddl_task/ob_index_build_task.cpp @@ -977,7 +977,7 @@ int ObIndexBuildTask::update_complete_sstable_job_status( sstable_complete_ts_ = ObTimeUtility::current_time(); execution_id_ = execution_id; // update ObIndexBuildTask::execution_id_ from ObIndexSSTableBuildTask::execution_id_ } - LOG_INFO("update complete sstable job return code", K(ret), K(tablet_id), K(snapshot_version), K(ret_code), K(execution_id_)); + LOG_INFO("update complete sstable job return code", K(ret), K(target_object_id_), K(tablet_id), K(snapshot_version), K(ret_code), K(execution_id_)); return ret; } diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index 483c68b98..a6a76ca4e 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -383,11 +383,15 @@ int ObTableRedefinitionTask::copy_table_indexes() } } if (OB_SUCC(ret) && need_rebuild_index) { - ObDDLTaskKey task_key(index_ids.at(i), index_schema->get_schema_version()); + const uint64_t task_key = index_ids.at(i); DependTaskStatus status; status.task_id_ = task_record.task_id_; - if (OB_FAIL(dependent_task_result_map_.set_refactored(task_key, status, true/*overwrite*/))) { - LOG_WARN("set dependent task map failed", K(ret), K(task_key)); + if (OB_FAIL(dependent_task_result_map_.set_refactored(task_key, status))) { + if (OB_HASH_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("set dependent task map failed", K(ret), K(task_key)); + } } else { LOG_INFO("add build index task", K(task_key)); } @@ -559,10 +563,9 @@ int ObTableRedefinitionTask::copy_table_dependent_objects(const ObDDLTaskStatus } else { // wait copy dependent objects to be finished ObAddr unused_addr; - for (common::hash::ObHashMap::const_iterator iter = dependent_task_result_map_.begin(); + for (common::hash::ObHashMap::const_iterator iter = dependent_task_result_map_.begin(); iter != dependent_task_result_map_.end(); ++iter) { - const int64_t table_id = iter->first.object_id_; - const int64_t schema_version = iter->first.schema_version_; + const uint64_t task_key = iter->first; const int64_t target_object_id = -1; const int64_t child_task_id = iter->second.task_id_; if (iter->second.ret_code_ == INT64_MAX) { @@ -573,9 +576,9 @@ int ObTableRedefinitionTask::copy_table_dependent_objects(const ObDDLTaskStatus unused_addr, false /* is_ddl_retry_task */, *GCTX.sql_proxy_, error_message, unused_user_msg_len))) { if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_SUCCESS; - LOG_INFO("ddl task not finish", K(table_id), K(child_task_id), K(schema_version), K(target_object_id)); + LOG_INFO("ddl task not finish", K(task_key), K(child_task_id), K(target_object_id)); } else { - LOG_WARN("fail to get ddl error message", K(ret), K(table_id), K(child_task_id), K(schema_version), K(target_object_id)); + LOG_WARN("fail to get ddl error message", K(ret), K(task_key), K(child_task_id), K(target_object_id)); } } else { finished_task_cnt++; diff --git a/src/share/ob_ddl_task_executor.h b/src/share/ob_ddl_task_executor.h index 10e427783..4dc92a761 100644 --- a/src/share/ob_ddl_task_executor.h +++ b/src/share/ob_ddl_task_executor.h @@ -80,7 +80,8 @@ private: || common::OB_ERR_REMOTE_SCHEMA_NOT_FULL == ret_code || common::OB_ERR_EXCLUSIVE_LOCK_CONFLICT == ret_code || common::OB_ERR_EXCLUSIVE_LOCK_CONFLICT == ret_code || common::OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT == ret_code || common::OB_TRANS_STMT_NEED_RETRY == ret_code || common::OB_SCHEMA_NOT_UPTODATE == ret_code || common::OB_TRANSACTION_SET_VIOLATION == ret_code || common::OB_TRANS_CANNOT_SERIALIZE == ret_code || common::OB_GTI_NOT_READY == ret_code - || common::OB_TRANS_WEAK_READ_VERSION_NOT_READY == ret_code || common::OB_REPLICA_NOT_READABLE == ret_code || common::OB_ERR_INSUFFICIENT_PX_WORKER == ret_code; + || common::OB_TRANS_WEAK_READ_VERSION_NOT_READY == ret_code || common::OB_REPLICA_NOT_READABLE == ret_code || common::OB_ERR_INSUFFICIENT_PX_WORKER == ret_code + || common::OB_EXCEED_MEM_LIMIT == ret_code; } static bool is_not_exist(const int ret_code) { return common::OB_LS_NOT_EXIST == ret_code || common::OB_TABLET_NOT_EXIST == ret_code || common::OB_TENANT_NOT_EXIST == ret_code