[FEAT MERGE] column store ddl
Co-authored-by: AnimationFan <30674773338@qq.com> Co-authored-by: simonjoylet <simonjoylet@gmail.com> Co-authored-by: Monk-Liu <1152761042@qq.com>
This commit is contained in:
@ -39,7 +39,7 @@ ObTableRedefinitionTask::ObTableRedefinitionTask()
|
||||
has_rebuild_index_(false), has_rebuild_constraint_(false), has_rebuild_foreign_key_(false),
|
||||
allocator_(lib::ObLabel("RedefTask")),
|
||||
is_copy_indexes_(true), is_copy_triggers_(true), is_copy_constraints_(true), is_copy_foreign_keys_(true),
|
||||
is_ignore_errors_(false), is_do_finish_(false)
|
||||
is_ignore_errors_(false), is_do_finish_(false), target_cg_cnt_(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -47,61 +47,69 @@ ObTableRedefinitionTask::~ObTableRedefinitionTask()
|
||||
{
|
||||
}
|
||||
|
||||
int ObTableRedefinitionTask::init(const uint64_t src_tenant_id, const uint64_t dst_tenant_id, const int64_t task_id,
|
||||
const share::ObDDLType &ddl_type, const int64_t data_table_id, const int64_t dest_table_id, const int64_t src_schema_version,
|
||||
const int64_t dst_schema_version, const int64_t parallelism, const int64_t consumer_group_id, const int32_t sub_task_trace_id,
|
||||
const ObAlterTableArg &alter_table_arg, const int64_t task_status, const int64_t snapshot_version)
|
||||
int ObTableRedefinitionTask::init(const ObTableSchema* src_table_schema, const ObTableSchema* dst_table_schema, const int64_t task_id,
|
||||
const share::ObDDLType &ddl_type, const int64_t parallelism, const int64_t consumer_group_id, const int32_t sub_task_trace_id,
|
||||
const ObAlterTableArg &alter_table_arg, const uint64_t tenant_data_version, const int64_t task_status, const int64_t snapshot_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t tenant_data_format_version = 0;
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("ObTableRedefinitionTask has already been inited", K(ret));
|
||||
} else if (OB_UNLIKELY(OB_INVALID_ID == src_tenant_id || OB_INVALID_ID == dst_tenant_id
|
||||
|| task_id <= 0 || OB_INVALID_ID == data_table_id || OB_INVALID_ID == dest_table_id
|
||||
|| src_schema_version <= 0 || dst_schema_version <= 0
|
||||
|| task_status < ObDDLTaskStatus::PREPARE || task_status > ObDDLTaskStatus::SUCCESS || snapshot_version < 0
|
||||
|| (snapshot_version > 0 && task_status < ObDDLTaskStatus::WAIT_TRANS_END))) {
|
||||
} else if (OB_ISNULL(src_table_schema) || OB_ISNULL(dst_table_schema)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(src_tenant_id), K(dst_tenant_id), K(task_id),
|
||||
K(data_table_id), K(dest_table_id), K(src_schema_version), K(dst_schema_version),
|
||||
LOG_WARN("invalid argument", K(ret), KP(src_table_schema), KP(dst_table_schema));
|
||||
} else if (OB_UNLIKELY( !src_table_schema->is_valid()
|
||||
|| !dst_table_schema->is_valid()
|
||||
|| task_id <= 0 || snapshot_version < 0 || tenant_data_version <= 0
|
||||
|| task_status < ObDDLTaskStatus::PREPARE || task_status > ObDDLTaskStatus::SUCCESS
|
||||
|| (snapshot_version > 0 && task_status < ObDDLTaskStatus::WAIT_TRANS_END))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), KPC(src_table_schema), KPC(dst_table_schema), K(task_id),
|
||||
K(task_status), K(snapshot_version));
|
||||
} else if (OB_FAIL(deep_copy_table_arg(allocator_, alter_table_arg, alter_table_arg_))) {
|
||||
LOG_WARN("deep copy alter table arg failed", K(ret));
|
||||
} else if (OB_FAIL(set_ddl_stmt_str(alter_table_arg_.ddl_stmt_str_))) {
|
||||
LOG_WARN("set ddl stmt str failed", K(ret));
|
||||
} else if (OB_FAIL(ObShareUtil::fetch_current_data_version(*GCTX.sql_proxy_, src_tenant_id, tenant_data_format_version))) {
|
||||
LOG_WARN("get min data version failed", K(ret), K(src_tenant_id));
|
||||
} else {
|
||||
set_gmt_create(ObTimeUtility::current_time());
|
||||
consumer_group_id_ = consumer_group_id;
|
||||
sub_task_trace_id_ = sub_task_trace_id;
|
||||
task_type_ = ddl_type;
|
||||
object_id_ = data_table_id;
|
||||
target_object_id_ = dest_table_id;
|
||||
schema_version_ = src_schema_version;
|
||||
object_id_ = src_table_schema->get_table_id();
|
||||
target_object_id_ = dst_table_schema->get_table_id();
|
||||
|
||||
/* only table restore set schema_serson = src, other use dst*/
|
||||
if (ObDDLType::DDL_TABLE_RESTORE == ddl_type) {
|
||||
schema_version_ = src_table_schema->get_schema_version();
|
||||
} else {
|
||||
schema_version_ = dst_table_schema->get_schema_version();
|
||||
}
|
||||
|
||||
task_status_ = static_cast<ObDDLTaskStatus>(task_status);
|
||||
snapshot_version_ = snapshot_version;
|
||||
tenant_id_ = src_tenant_id;
|
||||
tenant_id_ = src_table_schema->get_tenant_id();
|
||||
task_version_ = OB_TABLE_REDEFINITION_TASK_VERSION;
|
||||
task_id_ = task_id;
|
||||
parallelism_ = parallelism;
|
||||
data_format_version_ = tenant_data_format_version;
|
||||
data_format_version_ = tenant_data_version;
|
||||
start_time_ = ObTimeUtility::current_time();
|
||||
// For common offline ddl, dest_tenant_id is also the tenant_id_, i.e., tenant id of the data table.
|
||||
// But for DDL_RESTORE_TABLE, dst_tenant_id_ is different to the tenant_id_.
|
||||
dst_tenant_id_ = dst_tenant_id;
|
||||
dst_schema_version_ = dst_schema_version;
|
||||
alter_table_arg_.alter_table_schema_.set_tenant_id(src_tenant_id);
|
||||
alter_table_arg_.alter_table_schema_.set_schema_version(src_schema_version);
|
||||
dst_tenant_id_ = dst_table_schema->get_tenant_id();
|
||||
dst_schema_version_ = dst_table_schema->get_schema_version();
|
||||
alter_table_arg_.alter_table_schema_.set_tenant_id(tenant_id_);
|
||||
alter_table_arg_.alter_table_schema_.set_schema_version(schema_version_);
|
||||
alter_table_arg_.exec_tenant_id_ = dst_tenant_id_;
|
||||
if (OB_FAIL(init_ddl_task_monitor_info(target_object_id_))) {
|
||||
if (OB_FAIL(dst_table_schema->get_store_column_group_count(target_cg_cnt_))) {
|
||||
LOG_WARN("fail to get target cg cnt", K(ret), KPC(dst_table_schema));
|
||||
} else if (OB_FAIL(init_ddl_task_monitor_info(target_object_id_))) {
|
||||
LOG_WARN("init ddl task monitor info failed", K(ret));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
ddl_tracing_.open();
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO("init table redefinition task finished", K(ret), KPC(this));
|
||||
return ret;
|
||||
}
|
||||
@ -167,6 +175,7 @@ int ObTableRedefinitionTask::init(const ObDDLTaskRecord &task_record)
|
||||
ddl_tracing_.open_for_recovery();
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO("init table redefinition task finished", K(ret), KPC(this));
|
||||
return ret;
|
||||
}
|
||||
@ -283,7 +292,8 @@ int ObTableRedefinitionTask::send_build_replica_request_by_sql()
|
||||
alter_table_arg_.mview_refresh_info_.is_mview_complete_refresh_,
|
||||
alter_table_arg_.mview_refresh_info_.mview_table_id_,
|
||||
GCTX.root_service_,
|
||||
alter_table_arg_.inner_sql_exec_addr_);
|
||||
alter_table_arg_.inner_sql_exec_addr_,
|
||||
data_format_version_);
|
||||
if (OB_FAIL(root_service->get_ddl_service().get_tenant_schema_guard_with_version_in_inner_table(tenant_id_, schema_guard))) {
|
||||
LOG_WARN("get schema guard failed", K(ret));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, object_id_, orig_table_schema))) {
|
||||
@ -531,6 +541,7 @@ int ObTableRedefinitionTask::copy_table_indexes()
|
||||
&create_index_arg,
|
||||
task_id_);
|
||||
param.sub_task_trace_id_ = sub_task_trace_id_;
|
||||
param.tenant_data_version_ = data_format_version_;
|
||||
if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().create_ddl_task(param, *GCTX.sql_proxy_, task_record))) {
|
||||
if (OB_ENTRY_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
@ -1037,7 +1048,8 @@ int64_t ObTableRedefinitionTask::get_serialize_param_size() const
|
||||
return alter_table_arg_.get_serialize_size() + ObDDLTask::get_serialize_param_size()
|
||||
+ serialization::encoded_length_i8(copy_indexes) + serialization::encoded_length_i8(copy_triggers)
|
||||
+ serialization::encoded_length_i8(copy_constraints) + serialization::encoded_length_i8(copy_foreign_keys)
|
||||
+ serialization::encoded_length_i8(ignore_errors) + serialization::encoded_length_i8(do_finish);
|
||||
+ serialization::encoded_length_i8(ignore_errors) + serialization::encoded_length_i8(do_finish)
|
||||
+ serialization::encoded_length_i64(target_cg_cnt_);
|
||||
}
|
||||
|
||||
int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_t buf_len, int64_t &pos) const
|
||||
@ -1068,6 +1080,8 @@ int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_
|
||||
LOG_WARN("fail to serialize is_ignore_errors", K(ret));
|
||||
} else if (OB_FAIL(serialization::encode_i8(buf, buf_len, pos, do_finish))) {
|
||||
LOG_WARN("fail to serialize is_do_finish", K(ret));
|
||||
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, target_cg_cnt_))) {
|
||||
LOG_WARN("fail to serialize target_cg_cnt", K(ret));
|
||||
}
|
||||
FLOG_INFO("serialize message for table redefinition", K(ret),
|
||||
K(copy_indexes), K(copy_triggers), K(copy_constraints), K(copy_foreign_keys), K(ignore_errors), K(do_finish), K(*this));
|
||||
@ -1108,7 +1122,10 @@ int ObTableRedefinitionTask::deserlize_params_from_message(const uint64_t tenant
|
||||
LOG_WARN("fail to deserialize is_ignore_errors_", K(ret));
|
||||
} else if (OB_FAIL(serialization::decode_i8(buf, data_len, pos, &do_finish))) {
|
||||
LOG_WARN("fail to deserialize is_do_finish_", K(ret));
|
||||
} else {
|
||||
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &target_cg_cnt_))) {
|
||||
LOG_WARN("fail to deserialize target_cg_ctn_", K(ret));
|
||||
}
|
||||
else {
|
||||
is_copy_indexes_ = static_cast<bool>(copy_indexes);
|
||||
is_copy_triggers_ = static_cast<bool>(copy_triggers);
|
||||
is_copy_constraints_ = static_cast<bool>(copy_constraints);
|
||||
@ -1197,17 +1214,37 @@ int ObTableRedefinitionTask::collect_longops_stat(ObLongopsValue &value)
|
||||
case ObDDLTaskStatus::REDEFINITION: {
|
||||
int64_t row_scanned = 0;
|
||||
int64_t row_sorted = 0;
|
||||
int64_t row_inserted = 0;
|
||||
if (OB_FAIL(gather_redefinition_stats(dst_tenant_id_, task_id_, *GCTX.sql_proxy_, row_scanned, row_sorted, row_inserted))) {
|
||||
int64_t row_inserted_cg = 0;
|
||||
int64_t row_inserted_file = 0;
|
||||
|
||||
if (OB_FAIL(gather_redefinition_stats(dst_tenant_id_, task_id_, *GCTX.sql_proxy_, row_scanned, row_sorted, row_inserted_cg, row_inserted_file))) {
|
||||
LOG_WARN("failed to gather redefinition stats", K(ret));
|
||||
} else if (OB_FAIL(databuff_printf(stat_info_.message_,
|
||||
MAX_LONG_OPS_MESSAGE_LENGTH,
|
||||
pos,
|
||||
"STATUS: REPLICA BUILD, ROW_SCANNED: %ld, ROW_SORTED: %ld, ROW_INSERTED: %ld",
|
||||
row_scanned,
|
||||
row_sorted,
|
||||
row_inserted))) {
|
||||
LOG_WARN("failed to print", K(ret));
|
||||
}
|
||||
|
||||
|
||||
if (OB_FAIL(ret)){
|
||||
} else if (target_cg_cnt_> 1) {
|
||||
if (OB_FAIL(databuff_printf(stat_info_.message_,
|
||||
MAX_LONG_OPS_MESSAGE_LENGTH,
|
||||
pos,
|
||||
"STATUS: REPLICA BUILD, ROW_SCANNED: %ld, ROW_SORTED: %ld, ROW_INSERTED_TMP_FILE: %ld, ROW_INSERTED: %ld out of %ld column group rows",
|
||||
row_scanned,
|
||||
row_sorted,
|
||||
row_inserted_file,
|
||||
row_inserted_cg,
|
||||
row_scanned * target_cg_cnt_))) {
|
||||
LOG_WARN("failed to print", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(databuff_printf(stat_info_.message_,
|
||||
MAX_LONG_OPS_MESSAGE_LENGTH,
|
||||
pos,
|
||||
"STATUS: REPLICA BUILD, ROW_SCANNED: %ld, ROW_SORTED: %ld, ROW_INSERTED: %ld",
|
||||
row_scanned,
|
||||
row_sorted,
|
||||
row_inserted_file))) {
|
||||
LOG_WARN("failed to print", K(ret));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user