diff --git a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp index 702b325a3..7be408092 100644 --- a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp @@ -67,6 +67,7 @@ int ObColumnRedefinitionTask::init(const uint64_t tenant_id, const int64_t task_ task_id_ = task_id; parallelism_ = parallelism; execution_id_ = 1L; + cluster_version_ = GET_MIN_CLUSTER_VERSION(); is_inited_ = true; } return ret; @@ -152,6 +153,7 @@ int ObColumnRedefinitionTask::send_build_single_replica_request() param.task_id_ = task_id_; param.parallelism_ = alter_table_arg_.parallelism_; param.execution_id_ = execution_id_; + param.cluster_version_ = cluster_version_; if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, object_id_, param.source_tablet_ids_))) { LOG_WARN("fail to get tablets", K(ret), K(tenant_id_), K(object_id_)); } else if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, target_object_id_, param.dest_tablet_ids_))) { diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp index 6137a5eae..5fd4d6095 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp @@ -48,8 +48,8 @@ ObDDLRedefinitionSSTableBuildTask::ObDDLRedefinitionSSTableBuildTask( const common::ObAddr &inner_sql_exec_addr) : is_inited_(false), tenant_id_(tenant_id), task_id_(task_id), data_table_id_(data_table_id), dest_table_id_(dest_table_id), schema_version_(schema_version), snapshot_version_(snapshot_version), - execution_id_(execution_id), sql_mode_(sql_mode), trace_id_(trace_id), parallelism_(parallelism), - use_heap_table_ddl_plan_(use_heap_table_ddl_plan), root_service_(root_service), + execution_id_(execution_id), sql_mode_(sql_mode), trace_id_(trace_id), + parallelism_(parallelism), use_heap_table_ddl_plan_(use_heap_table_ddl_plan), root_service_(root_service), inner_sql_exec_addr_(inner_sql_exec_addr) { set_retry_times(0); // do not retry @@ -1310,7 +1310,7 @@ int ObDDLRedefinitionTask::serialize_params_to_message(char *buf, const int64_t } else if (OB_FAIL(alter_table_arg_.serialize(buf, buf_len, pos))) { LOG_WARN("serialize table arg failed", K(ret)); } else { - LST_DO_CODE(OB_UNIS_ENCODE, parallelism_); + LST_DO_CODE(OB_UNIS_ENCODE, parallelism_, cluster_version_); } return ret; } @@ -1329,7 +1329,7 @@ int ObDDLRedefinitionTask::deserlize_params_from_message(const char *buf, const } else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) { LOG_WARN("deep copy table arg failed", K(ret)); } else { - LST_DO_CODE(OB_UNIS_DECODE, parallelism_); + LST_DO_CODE(OB_UNIS_DECODE, parallelism_, cluster_version_); } return ret; } @@ -1337,7 +1337,7 @@ int ObDDLRedefinitionTask::deserlize_params_from_message(const char *buf, const int64_t ObDDLRedefinitionTask::get_serialize_param_size() const { return alter_table_arg_.get_serialize_size() + serialization::encoded_length_i64(task_version_) - + serialization::encoded_length_i64(parallelism_); + + serialization::encoded_length_i64(parallelism_) + serialization::encoded_length_i64(cluster_version_); } int ObDDLRedefinitionTask::check_health() diff --git a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp index 0db3f9803..d9bcdf822 100644 --- a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp +++ b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp @@ -44,6 +44,7 @@ int ObDDLSingleReplicaExecutor::build(const ObDDLSingleReplicaExecutorParam &par task_id_ = param.task_id_; execution_id_ = param.execution_id_; parallelism_ = param.parallelism_; + cluster_version_ = param.cluster_version_; common::ObIArray &build_infos = partition_build_stat_; common::ObIArray &tablet_ids = source_tablet_ids_; @@ -117,6 +118,7 @@ int ObDDLSingleReplicaExecutor::schedule_task() arg.task_id_ = task_id_; arg.parallelism_ = parallelism_; arg.execution_id_ = execution_id_; + arg.cluster_version_ = cluster_version_; arg.tablet_task_id_ = tablet_task_ids_.at(i); if (OB_FAIL(location_service->get(tenant_id_, arg.source_tablet_id_, expire_renew_time, is_cache_hit, ls_id))) { diff --git a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.h b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.h index 0d483be81..66479b06b 100644 --- a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.h +++ b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.h @@ -36,18 +36,20 @@ public: snapshot_version_(0), task_id_(0), parallelism_(0), - execution_id_(-1) + execution_id_(-1), + cluster_version_(0) {} ~ObDDLSingleReplicaExecutorParam() = default; bool is_valid() const { return common::OB_INVALID_TENANT_ID != tenant_id_ && share::DDL_INVALID != type_ && source_tablet_ids_.count() > 0 && dest_tablet_ids_.count() > 0 && common::OB_INVALID_ID != source_table_id_ && common::OB_INVALID_ID != dest_table_id_ - && schema_version_ > 0 && snapshot_version_ > 0 && task_id_ > 0 && execution_id_ >= 0; + && schema_version_ > 0 && snapshot_version_ > 0 && task_id_ > 0 && execution_id_ >= 0 + && cluster_version_ > 0; } TO_STRING_KV(K_(tenant_id), K_(type), K_(source_tablet_ids), K_(dest_tablet_ids), K_(source_table_id), K_(dest_table_id), K_(schema_version), - K_(snapshot_version), K_(task_id), K_(parallelism), K_(execution_id)); + K_(snapshot_version), K_(task_id), K_(parallelism), K_(execution_id), K_(cluster_version)); public: uint64_t tenant_id_; share::ObDDLType type_; @@ -60,6 +62,7 @@ public: int64_t task_id_; int64_t parallelism_; int64_t execution_id_; + int64_t cluster_version_; }; class ObDDLSingleReplicaExecutor @@ -111,6 +114,7 @@ private: int64_t task_id_; int64_t parallelism_; int64_t execution_id_; + int64_t cluster_version_; common::ObArray partition_build_stat_; common::ObSpinLock lock_; }; diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index 571ebaf2c..19e080dbb 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -274,7 +274,7 @@ public: target_object_id_(0), task_status_(share::ObDDLTaskStatus::PREPARE), snapshot_version_(0), ret_code_(OB_SUCCESS), task_id_(0), parent_task_id_(0), parent_task_key_(), task_version_(0), parallelism_(0), allocator_(lib::ObLabel("DdlTask")), compat_mode_(lib::Worker::CompatMode::INVALID), err_code_occurence_cnt_(0), - delay_schedule_time_(0), next_schedule_ts_(0), execution_id_(-1), sql_exec_addr_() + delay_schedule_time_(0), next_schedule_ts_(0), execution_id_(-1), sql_exec_addr_(), cluster_version_(0) {} virtual ~ObDDLTask() {} virtual int process() = 0; @@ -300,6 +300,7 @@ public: int64_t get_task_version() const { return task_version_; } int64_t get_execution_id() const { return execution_id_; } int64_t get_parallelism() const { return parallelism_; } + int64_t get_cluster_version() const { return cluster_version_; } static int deep_copy_table_arg(common::ObIAllocator &allocator, const obrpc::ObDDLArg &source_arg, obrpc::ObDDLArg &dest_arg); static int fetch_new_task_id(ObMySQLProxy &sql_proxy, int64_t &new_task_id); virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const = 0; @@ -336,7 +337,8 @@ public: K(target_object_id_), K(task_status_), K(snapshot_version_), K_(ret_code), K_(task_id), K_(parent_task_id), K_(parent_task_key), K_(task_version), K_(parallelism), K_(ddl_stmt_str), K_(compat_mode), - K_(sys_task_id), K_(err_code_occurence_cnt), K_(next_schedule_ts), K_(delay_schedule_time), K(execution_id_), K(sql_exec_addr_)); + K_(sys_task_id), K_(err_code_occurence_cnt), K_(next_schedule_ts), K_(delay_schedule_time), + K(execution_id_), K(sql_exec_addr_), K_(cluster_version)); protected: virtual bool is_error_need_retry(const int ret_code) { @@ -371,6 +373,7 @@ protected: int64_t next_schedule_ts_; int64_t execution_id_; common::ObAddr sql_exec_addr_; + int64_t cluster_version_; }; enum ColChecksumStat diff --git a/src/rootserver/ddl_task/ob_index_build_task.cpp b/src/rootserver/ddl_task/ob_index_build_task.cpp index ab8085990..6572812e8 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.cpp +++ b/src/rootserver/ddl_task/ob_index_build_task.cpp @@ -321,6 +321,7 @@ int ObIndexBuildTask::init( task_id_ = task_id; parent_task_id_ = parent_task_id; task_version_ = OB_INDEX_BUILD_TASK_VERSION; + cluster_version_ = GET_MIN_CLUSTER_VERSION(); if (OB_SUCC(ret)) { task_status_ = static_cast(task_status); is_inited_ = true; @@ -1238,7 +1239,7 @@ int ObIndexBuildTask::serialize_params_to_message(char *buf, const int64_t buf_l LOG_WARN("serialize create index arg failed", K(ret)); } else { LST_DO_CODE(OB_UNIS_ENCODE, check_unique_snapshot_); - LST_DO_CODE(OB_UNIS_ENCODE, parallelism_); + LST_DO_CODE(OB_UNIS_ENCODE, parallelism_, cluster_version_); } return ret; } @@ -1258,7 +1259,7 @@ int ObIndexBuildTask::deserlize_params_from_message(const char *buf, const int64 LOG_WARN("deep copy create index arg failed", K(ret)); } else { LST_DO_CODE(OB_UNIS_DECODE, check_unique_snapshot_); - LST_DO_CODE(OB_UNIS_DECODE, parallelism_); + LST_DO_CODE(OB_UNIS_DECODE, parallelism_, cluster_version_); } return ret; } @@ -1266,5 +1267,6 @@ int ObIndexBuildTask::deserlize_params_from_message(const char *buf, const int64 int64_t ObIndexBuildTask::get_serialize_param_size() const { return create_index_arg_.get_serialize_size() + serialization::encoded_length_i64(check_unique_snapshot_) - + serialization::encoded_length_i64(task_version_) + serialization::encoded_length_i64(parallelism_); + + serialization::encoded_length_i64(task_version_) + serialization::encoded_length_i64(parallelism_) + + serialization::encoded_length_i64(cluster_version_); } diff --git a/src/rootserver/ddl_task/ob_index_build_task.h b/src/rootserver/ddl_task/ob_index_build_task.h index e3264ea7a..4a095da2c 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.h +++ b/src/rootserver/ddl_task/ob_index_build_task.h @@ -38,8 +38,7 @@ public: : task_id_(task_id), tenant_id_(tenant_id), data_table_id_(data_table_id), dest_table_id_(dest_table_id), schema_version_(schema_version), snapshot_version_(snapshot_version), execution_id_(execution_id), trace_id_(trace_id), parallelism_(parallelism), allocator_("IdxSSTBuildTask"), - root_service_(root_service), - inner_sql_exec_addr_(inner_sql_exec_addr) + root_service_(root_service), inner_sql_exec_addr_(inner_sql_exec_addr) { set_retry_times(0); } diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index bcee91b3d..b283f1b19 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -70,6 +70,7 @@ int ObTableRedefinitionTask::init(const uint64_t tenant_id, const int64_t task_i task_version_ = OB_TABLE_REDEFINITION_TASK_VERSION; task_id_ = task_id; parallelism_ = parallelism; + cluster_version_ = GET_MIN_CLUSTER_VERSION(); alter_table_arg_.exec_tenant_id_ = tenant_id_; is_inited_ = true; } diff --git a/src/share/ob_ddl_common.cpp b/src/share/ob_ddl_common.cpp index 3767cb973..fe62e2b1f 100644 --- a/src/share/ob_ddl_common.cpp +++ b/src/share/ob_ddl_common.cpp @@ -28,6 +28,7 @@ #include "storage/tx_storage/ob_ls_handle.h" #include "storage/tx_storage/ob_ls_map.h" #include "rootserver/ob_root_service.h" +#include "rootserver/ddl_task/ob_ddl_task.h" using namespace oceanbase::share; using namespace oceanbase::common; @@ -959,6 +960,59 @@ int64_t ObDDLUtil::get_ddl_rpc_timeout() return max(GCONF.rpc_timeout, 9 * 1000 * 1000L); } +int ObDDLUtil::get_ddl_cluster_version( + const uint64_t tenant_id, + const uint64_t task_id, + int64_t &ddl_cluster_version) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || task_id <= 0 + || nullptr == GCTX.sql_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(ret), K(tenant_id), K(task_id), KP(GCTX.sql_proxy_)); + } else { + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ObSqlString query_string; + sqlclient::ObMySQLResult *result = NULL; + if (OB_FAIL(query_string.assign_fmt(" SELECT ddl_type, UNHEX(message) as message_unhex FROM %s WHERE task_id = %lu", + OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) { + LOG_WARN("assign sql string failed", K(ret)); + } else if (OB_FAIL(GCTX.sql_proxy_->read(res, tenant_id, query_string.ptr()))) { + LOG_WARN("read record failed", K(ret), K(query_string)); + } else if (OB_UNLIKELY(nullptr == (result = res.get_result()))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get sql result", K(ret), KP(result)); + } else if (OB_FAIL(result->next())) { + LOG_WARN("get next row failed", K(ret)); + } else { + int64_t pos = 0; + ObDDLType ddl_type = ObDDLType::DDL_INVALID; + ObString task_message; + EXTRACT_INT_FIELD_MYSQL(*result, "ddl_type", ddl_type, ObDDLType); + EXTRACT_VARCHAR_FIELD_MYSQL(*result, "message_unhex", task_message); + if (ObDDLType::DDL_CREATE_INDEX == ddl_type) { + SMART_VAR(rootserver::ObIndexBuildTask, task) { + if (OB_FAIL(task.deserlize_params_from_message(task_message.ptr(), task_message.length(), pos))) { + LOG_WARN("deserialize from msg failed", K(ret)); + } else { + ddl_cluster_version = task.get_cluster_version(); + } + } + } else { + SMART_VAR(rootserver::ObTableRedefinitionTask, task) { + if (OB_FAIL(task.deserlize_params_from_message(task_message.ptr(), task_message.length(), pos))) { + LOG_WARN("deserialize from msg failed", K(ret)); + } else { + ddl_cluster_version = task.get_cluster_version(); + } + } + } + } + } + } + return ret; +} + /****************** ObCheckTabletDataComplementOp *************/ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status( diff --git a/src/share/ob_ddl_common.h b/src/share/ob_ddl_common.h index e5b0a9a2c..e9b96cb52 100644 --- a/src/share/ob_ddl_common.h +++ b/src/share/ob_ddl_common.h @@ -313,6 +313,11 @@ public: static int64_t get_ddl_rpc_timeout(); + static int get_ddl_cluster_version( + const uint64_t tenant_id, + const uint64_t task_id, + int64_t &ddl_cluster_version); + private: static int generate_column_name_str( const common::ObIArray &column_names, diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 46f82b0ac..2d38273ec 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -5724,7 +5724,7 @@ OB_SERIALIZE_MEMBER(ObLogReqLoadProxyProgressResponse, err_, progress_); OB_SERIALIZE_MEMBER(ObDDLBuildSingleReplicaRequestArg, tenant_id_, ls_id_, source_tablet_id_, dest_tablet_id_, source_table_id_, dest_schema_id_, schema_version_, snapshot_version_, ddl_type_, task_id_, execution_id_, - parallelism_, tablet_task_id_); + parallelism_, tablet_task_id_, cluster_version_); int ObDDLBuildSingleReplicaRequestArg::assign(const ObDDLBuildSingleReplicaRequestArg &other) { @@ -5742,6 +5742,7 @@ int ObDDLBuildSingleReplicaRequestArg::assign(const ObDDLBuildSingleReplicaReque parallelism_ = other.parallelism_; execution_id_ = other.execution_id_; tablet_task_id_ = other.tablet_task_id_; + cluster_version_ = other.cluster_version_; return ret; } diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 34b7b0337..8fe3f0cb0 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -7213,16 +7213,16 @@ public: ObDDLBuildSingleReplicaRequestArg() : tenant_id_(OB_INVALID_ID), ls_id_(), source_tablet_id_(), dest_tablet_id_(), source_table_id_(OB_INVALID_ID), dest_schema_id_(OB_INVALID_ID), schema_version_(0), snapshot_version_(0), ddl_type_(0), task_id_(0), - parallelism_(0), execution_id_(-1), tablet_task_id_(0) {} + parallelism_(0), execution_id_(-1), tablet_task_id_(0), cluster_version_(0) {} bool is_valid() const { return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && source_tablet_id_.is_valid() && dest_tablet_id_.is_valid() && OB_INVALID_ID != source_table_id_ && OB_INVALID_ID != dest_schema_id_ && schema_version_ > 0 && snapshot_version_ > 0 - && task_id_ > 0 && parallelism_ > 0 && tablet_task_id_ > 0; + && task_id_ > 0 && parallelism_ > 0 && tablet_task_id_ > 0 && cluster_version_ > 0; } int assign(const ObDDLBuildSingleReplicaRequestArg &other); TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(source_tablet_id), K_(dest_tablet_id), K_(source_table_id), K_(dest_schema_id), K_(schema_version), K_(snapshot_version), - K_(task_id), K_(parallelism), K_(execution_id), K_(tablet_task_id)); + K_(task_id), K_(parallelism), K_(execution_id), K_(tablet_task_id), K_(cluster_version)); public: uint64_t tenant_id_; share::ObLSID ls_id_; @@ -7237,6 +7237,7 @@ public: int64_t parallelism_; int64_t execution_id_; int64_t tablet_task_id_; + int64_t cluster_version_; }; struct ObDDLBuildSingleReplicaRequestResult final diff --git a/src/sql/engine/px/ob_px_sub_coord.cpp b/src/sql/engine/px/ob_px_sub_coord.cpp index 4046bfb24..610c3751b 100644 --- a/src/sql/engine/px/ob_px_sub_coord.cpp +++ b/src/sql/engine/px/ob_px_sub_coord.cpp @@ -816,7 +816,9 @@ int ObPxSubCoord::start_ddl() param.exec_ctx_ = exec_ctx; param.execution_id_ = phy_plan->get_ddl_execution_id(); param.ddl_task_id_ = phy_plan->get_ddl_task_id(); - if (OB_FAIL(ObSSTableInsertManager::get_instance().create_table_context(param, ddl_ctrl_.context_id_))) { + if (OB_FAIL(ObDDLUtil::get_ddl_cluster_version(tenant_id, param.ddl_task_id_, param.cluster_version_))) { + LOG_WARN("get ddl cluster version failed", K(ret)); + } else if (OB_FAIL(ObSSTableInsertManager::get_instance().create_table_context(param, ddl_ctrl_.context_id_))) { LOG_WARN("create table context failed", K(ret)); } else { FLOG_INFO("start ddl", "context_id", ddl_ctrl_.context_id_, K(param)); diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index 891daf5d9..4b9666b16 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -115,8 +115,10 @@ int ObComplementDataParam::init(const ObDDLBuildSingleReplicaRequestArg &arg) task_id_ = arg.task_id_; execution_id_ = arg.execution_id_; tablet_task_id_ = arg.tablet_task_id_; + cluster_version_ = arg.cluster_version_; FLOG_INFO("succeed to init ObComplementDataParam", K(ret), K(is_inited_), K(tenant_id_), K(ls_id_), - K(source_tablet_id_), K(dest_tablet_id_), K(schema_version_), K(task_id_), K(arg), K(concurrent_cnt_)); + K(source_tablet_id_), K(dest_tablet_id_), K(schema_version_), K(task_id_), K(arg), K(concurrent_cnt_), + K(cluster_version_)); } return ret; } @@ -334,7 +336,8 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam ¶m) } else if (OB_UNLIKELY(!hidden_table_key.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid table key", K(ret), K(hidden_table_key)); - } else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(hidden_table_key, param.execution_id_, ddl_kv_mgr_handle_))) { + } else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(hidden_table_key, + param.execution_id_, param.cluster_version_, ddl_kv_mgr_handle_))) { LOG_WARN("fail write start log", K(ret), K(hidden_table_key), K(param)); } else { LOG_INFO("complement task start ddl redo success", K(hidden_table_key)); @@ -447,7 +450,8 @@ int ObComplementDataDag::prepare_context() param_.ls_id_, param_.dest_tablet_id_, MAJOR_MERGE, - param_.snapshot_version_))) { + param_.snapshot_version_, + param_.cluster_version_))) { LOG_WARN("fail to init data desc", K(ret)); } else { data_desc.row_column_count_ = data_desc.rowkey_column_count_ + 1; @@ -981,7 +985,8 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan) param_->ls_id_, param_->dest_tablet_id_, MAJOR_MERGE, - param_->snapshot_version_))) { + param_->snapshot_version_, + param_->cluster_version_))) { LOG_WARN("fail to init data store desc", K(ret), K(*param_), K(param_->dest_tablet_id_)); } else if (FALSE_IT(data_desc.sstable_index_builder_ = context_->index_builder_)) { } else if (FALSE_IT(data_desc.is_ddl_ = true)) { diff --git a/src/storage/ddl/ob_complement_data_task.h b/src/storage/ddl/ob_complement_data_task.h index 979e38853..c9e288600 100644 --- a/src/storage/ddl/ob_complement_data_task.h +++ b/src/storage/ddl/ob_complement_data_task.h @@ -43,7 +43,7 @@ public: data_table_schema_(nullptr), hidden_table_schema_(nullptr), allocator_("CompleteDataPar"), row_store_type_(common::ENCODING_ROW_STORE), schema_version_(0), snapshot_version_(0), concurrent_cnt_(0), task_id_(0), execution_id_(-1), tablet_task_id_(0), - compat_mode_(lib::Worker::CompatMode::INVALID) + compat_mode_(lib::Worker::CompatMode::INVALID), cluster_version_(0) {} ~ObComplementDataParam() { destroy(); } int init(const ObDDLBuildSingleReplicaRequestArg &arg); @@ -54,7 +54,7 @@ public: return common::OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid() && source_tablet_id_.is_valid() && dest_tablet_id_.is_valid() && OB_NOT_NULL(data_table_schema_) && OB_NOT_NULL(hidden_table_schema_) && 0 != concurrent_cnt_ && snapshot_version_ > 0 && compat_mode_ != lib::Worker::CompatMode::INVALID - && execution_id_ >= 0 && tablet_task_id_ > 0; + && execution_id_ >= 0 && tablet_task_id_ > 0 && cluster_version_ > 0; } int get_hidden_table_key(ObITable::TableKey &table_key) const; void destroy() @@ -82,10 +82,12 @@ public: execution_id_ = -1; tablet_task_id_ = 0; compat_mode_ = lib::Worker::CompatMode::INVALID; + cluster_version_ = 0; } TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(ls_id), K_(source_tablet_id), K_(dest_tablet_id), KPC_(data_table_schema), KPC_(hidden_table_schema), K_(schema_version), K_(tablet_task_id), - K_(snapshot_version), K_(concurrent_cnt), K_(task_id), K_(execution_id), K_(compat_mode)); + K_(snapshot_version), K_(concurrent_cnt), K_(task_id), K_(execution_id), K_(compat_mode), + K_(cluster_version)); public: bool is_inited_; uint64_t tenant_id_; @@ -103,6 +105,7 @@ public: int64_t execution_id_; int64_t tablet_task_id_; lib::Worker::CompatMode compat_mode_; + int64_t cluster_version_; ObSEArray ranges_; }; diff --git a/src/storage/ddl/ob_ddl_merge_task.cpp b/src/storage/ddl/ob_ddl_merge_task.cpp index 30943e545..f03903dff 100644 --- a/src/storage/ddl/ob_ddl_merge_task.cpp +++ b/src/storage/ddl/ob_ddl_merge_task.cpp @@ -666,6 +666,7 @@ int ObTabletDDLUtil::create_ddl_sstable(ObSSTableIndexBuilder *sstable_index_bui ddl_param.table_key_.is_major_sstable() ? false: true, // keep_old_ddl_sstable &storage_schema, rebuild_seq, + ddl_param.cluster_version_, ddl_param.table_key_.is_major_sstable() ? true : false, // update_with_major_flag ddl_param.table_key_.is_major_sstable() ? true : false); // need report checksum if (OB_FAIL(ls_handle.get_ls()->update_tablet_table_store(ddl_param.table_key_.get_tablet_id(), table_store_param, new_tablet_handle))) { diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.cpp b/src/storage/ddl/ob_ddl_redo_log_writer.cpp index 638865aa3..1942aeb72 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.cpp +++ b/src/storage/ddl/ob_ddl_redo_log_writer.cpp @@ -960,6 +960,7 @@ int ObDDLSSTableRedoWriter::init(const ObLSID &ls_id, const ObTabletID &tablet_i int ObDDLSSTableRedoWriter::start_ddl_redo(const ObITable::TableKey &table_key, const int64_t execution_id, + const int64_t ddl_cluster_version, ObDDLKvMgrHandle &ddl_kv_mgr_handle) { int ret = OB_SUCCESS; @@ -972,11 +973,11 @@ int ObDDLSSTableRedoWriter::start_ddl_redo(const ObITable::TableKey &table_key, if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObDDLSSTableRedoWriter has not been inited", K(ret)); - } else if (OB_UNLIKELY(!table_key.is_valid() || execution_id < 0)) { + } else if (OB_UNLIKELY(!table_key.is_valid() || execution_id < 0 || ddl_cluster_version <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), K(table_key), K(execution_id)); - } else if (OB_FAIL(log.init(table_key, GET_MIN_CLUSTER_VERSION(), execution_id))) { - LOG_WARN("fail to init DDLStartLog", K(ret), K(table_key), K(execution_id), "cluster_version", GET_MIN_CLUSTER_VERSION()); + LOG_WARN("invalid arguments", K(ret), K(table_key), K(execution_id), K(ddl_cluster_version)); + } else if (OB_FAIL(log.init(table_key, ddl_cluster_version, execution_id))) { + LOG_WARN("fail to init DDLStartLog", K(ret), K(table_key), K(execution_id), K(ddl_cluster_version)); } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { LOG_WARN("get ls failed", K(ret), K(ls_id_)); } else if (OB_ISNULL(ls = ls_handle.get_ls())) { diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.h b/src/storage/ddl/ob_ddl_redo_log_writer.h index aafa3f0bc..7476d0da6 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.h +++ b/src/storage/ddl/ob_ddl_redo_log_writer.h @@ -247,6 +247,7 @@ public: int init(const share::ObLSID &ls_id, const ObTabletID &tablet_id); int start_ddl_redo(const ObITable::TableKey &table_key, const int64_t execution_id, + const int64_t ddl_cluster_version, ObDDLKvMgrHandle &ddl_kv_mgr_handle); int write_redo_log(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, const blocksstable::MacroBlockId ¯o_block_id); diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp b/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp index 2ac83d49d..dc0de6690 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp @@ -38,7 +38,8 @@ using namespace oceanbase::sql; /*************** ObSSTableInsertTabletParam *****************/ ObSSTableInsertTabletParam::ObSSTableInsertTabletParam() : context_id_(0), ls_id_(), tablet_id_(), table_id_(0), write_major_(false), - task_cnt_(0), schema_version_(0), snapshot_version_(0), execution_id_(-1), ddl_task_id_(0) + task_cnt_(0), schema_version_(0), snapshot_version_(0), execution_id_(-1), ddl_task_id_(0), + cluster_version_(0) { } @@ -57,7 +58,8 @@ bool ObSSTableInsertTabletParam::is_valid() const && task_cnt_ >= 0 && schema_version_ > 0 && execution_id_ >= 0 - && ddl_task_id_ > 0; + && ddl_task_id_ > 0 + && cluster_version_ > 0; return bret; } @@ -261,7 +263,8 @@ int ObSSTableInsertTabletContext::update(const int64_t snapshot_version) LOG_WARN("invalid argument", K(ret), K(table_key)); } else if (data_sstable_redo_writer_.get_start_scn().is_valid_and_not_min()) { // ddl start log is already written, do nothing - } else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(table_key, build_param_.execution_id_, ddl_kv_mgr_handle_))) { + } else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(table_key, + build_param_.execution_id_, build_param_.cluster_version_, ddl_kv_mgr_handle_))) { LOG_WARN("fail write start log", K(ret), K(table_key), K(build_param_)); } } @@ -364,7 +367,8 @@ int ObSSTableInsertTabletContext::build_sstable_slice( ls_id, tablet_id, // TODO(shuangcan): confirm this build_param.write_major_ ? storage::MAJOR_MERGE : storage::MINOR_MERGE, - frozen_status.frozen_scn_.get_val_for_tx()))) { + frozen_status.frozen_scn_.get_val_for_tx(), + build_param_.cluster_version_))) { LOG_WARN("init data store desc failed", K(ret), K(tablet_id)); } else { // index builder is need for write macro meta block. @@ -511,7 +515,7 @@ int ObSSTableInsertTabletContext::prepare_index_builder_if_need(const ObTableSch build_param_.tablet_id_, // TODO(shuangcan): confirm this build_param_.write_major_ ? storage::MAJOR_MERGE : storage::MINOR_MERGE, 1L /*snapshot_version*/, - GET_MIN_CLUSTER_VERSION()))) { + build_param_.cluster_version_))) { LOG_WARN("fail to init data desc", K(ret)); } else { data_desc.row_column_count_ = data_desc.rowkey_column_count_ + 1; @@ -738,7 +742,7 @@ int ObSSTableInsertTabletContext::get_table_key(ObITable::TableKey &table_key) ObSSTableInsertTableParam::ObSSTableInsertTableParam() : exec_ctx_(nullptr), context_id_(0), dest_table_id_(OB_INVALID_ID), write_major_(false), schema_version_(0), - snapshot_version_(0), task_cnt_(0), execution_id_(-1), ddl_task_id_(0), ls_tablet_ids_() + snapshot_version_(0), task_cnt_(0), execution_id_(-1), ddl_task_id_(0), cluster_version_(0), ls_tablet_ids_() { } @@ -756,6 +760,7 @@ int ObSSTableInsertTableParam::assign(const ObSSTableInsertTableParam &other) task_cnt_ = other.task_cnt_; execution_id_ = other.execution_id_; ddl_task_id_ = other.ddl_task_id_; + cluster_version_ = other.cluster_version_; exec_ctx_ = other.exec_ctx_; } return ret; @@ -826,6 +831,7 @@ int ObSSTableInsertTableContext::create_all_tablet_contexts( param.task_cnt_ = param_.task_cnt_; param.execution_id_ = param_.execution_id_; param.ddl_task_id_ = param_.ddl_task_id_; + param.cluster_version_ = param_.cluster_version_; if (OB_FAIL(tablet_ctx->init(param))) { LOG_WARN("init tablet insert sstable context", K(ret)); } else if (OB_FAIL(tablet_ctx_map_.set_refactored(tablet_id, tablet_ctx))) { diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx.h b/src/storage/ddl/ob_direct_insert_sstable_ctx.h index 91858a52a..ae4e8742a 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx.h +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx.h @@ -55,7 +55,8 @@ public: ~ObSSTableInsertTabletParam(); bool is_valid() const; TO_STRING_KV(K(context_id_), K(ls_id_), K(tablet_id_), K(table_id_), K(write_major_), - K(task_cnt_), K(schema_version_), K(snapshot_version_), K_(execution_id), K_(ddl_task_id)); + K(task_cnt_), K(schema_version_), K(snapshot_version_), K_(execution_id), K_(ddl_task_id), + K_(cluster_version)); public: int64_t context_id_; share::ObLSID ls_id_; @@ -67,6 +68,7 @@ public: int64_t snapshot_version_; int64_t execution_id_; int64_t ddl_task_id_; + int64_t cluster_version_; }; typedef std::pair LSTabletIDPair; @@ -148,9 +150,9 @@ public: int assign(const ObSSTableInsertTableParam &other); bool is_valid() const { return exec_ctx_ != nullptr && OB_INVALID_ID != dest_table_id_ && schema_version_ >= 0 && snapshot_version_ >= 0 && task_cnt_ >= 0 - && execution_id_ >= 0 && ddl_task_id_ > 0 && ls_tablet_ids_.count() > 0; } + && execution_id_ >= 0 && ddl_task_id_ > 0 && cluster_version_ > 0 && ls_tablet_ids_.count() > 0; } TO_STRING_KV(K_(context_id), K_(dest_table_id), K_(write_major), K_(schema_version), K_(snapshot_version), - K_(task_cnt), K_(execution_id), K_(ddl_task_id), K_(ls_tablet_ids)); + K_(task_cnt), K_(execution_id), K_(ddl_task_id), K_(cluster_version), K_(ls_tablet_ids)); public: sql::ObExecContext *exec_ctx_; int64_t context_id_; @@ -161,6 +163,7 @@ public: int64_t task_cnt_; int64_t execution_id_; int64_t ddl_task_id_; + int64_t cluster_version_; common::ObArray ls_tablet_ids_; }; diff --git a/src/storage/ob_storage_struct.cpp b/src/storage/ob_storage_struct.cpp index d18ab060c..add3ab862 100644 --- a/src/storage/ob_storage_struct.cpp +++ b/src/storage/ob_storage_struct.cpp @@ -279,6 +279,7 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam( const bool keep_old_ddl_sstable, const ObStorageSchema *storage_schema, const int64_t rebuild_seq, + const int64_t ddl_cluster_version, const bool update_with_major_flag, const bool need_report) : table_handle_(table_handle), @@ -295,7 +296,7 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam( ddl_start_scn_(SCN::min_scn()), ddl_snapshot_version_(0), ddl_execution_id_(-1), - ddl_cluster_version_(0), + ddl_cluster_version_(ddl_cluster_version), allow_duplicate_sstable_(false), tx_data_(), binding_info_(), diff --git a/src/storage/ob_storage_struct.h b/src/storage/ob_storage_struct.h index b9f88cd24..3653f5d7f 100644 --- a/src/storage/ob_storage_struct.h +++ b/src/storage/ob_storage_struct.h @@ -318,6 +318,7 @@ struct ObUpdateTableStoreParam const bool keep_old_ddl_sstable, const ObStorageSchema *storage_schema, const int64_t rebuild_seq, + const int64_t ddl_cluster_version, const bool update_with_major_flag, const bool need_report = false);