[FEAT MERGE] Support exchanging partition between partitioned table A and non-partitioned table B.

Co-authored-by: fkuner <784819644@qq.com>
This commit is contained in:
791065426@qq.com
2024-04-16 07:19:48 +00:00
committed by ob-robot
parent 2c510c6538
commit 1ee64365ea
46 changed files with 4105 additions and 102 deletions

View File

@ -900,6 +900,24 @@ int ObAlterTableExecutor::alter_table_rpc_v2(
return ret;
}
int ObAlterTableExecutor::alter_table_exchange_partition_rpc(obrpc::ObExchangePartitionArg &exchange_partition_arg,
obrpc::ObAlterTableRes &res,
obrpc::ObCommonRpcProxy *common_rpc_proxy,
ObSQLSessionInfo *my_session)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(my_session) || OB_ISNULL(common_rpc_proxy) || OB_UNLIKELY(!exchange_partition_arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(exchange_partition_arg.is_valid()));
} else if (OB_FAIL(common_rpc_proxy->exchange_partition(exchange_partition_arg, res))) {
LOG_WARN("rpc proxy alter table failed", K(ret), "dst", common_rpc_proxy->get_server(), K(exchange_partition_arg));
} else {
// 在回滚时不会重试,也不检查 schema version
exchange_partition_arg.based_schema_object_infos_.reset();
}
return ret;
}
int ObAlterTableExecutor::sort_external_files(ObIArray<ObString> &file_urls,
ObIArray<int64_t> &file_sizes) {
int ret = OB_SUCCESS;
@ -1188,6 +1206,7 @@ int ObAlterTableExecutor::execute(ObExecContext &ctx, ObAlterTableStmt &stmt)
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
obrpc::ObAlterTableArg &alter_table_arg = stmt.get_alter_table_arg();
obrpc::ObExchangePartitionArg &exchange_partition_arg = stmt.get_exchange_partition_arg();
LOG_DEBUG("start of alter table execute", K(alter_table_arg));
ObString first_stmt;
OZ (stmt.get_first_stmt(first_stmt));
@ -1219,6 +1238,7 @@ int ObAlterTableExecutor::execute(ObExecContext &ctx, ObAlterTableStmt &stmt)
LOG_WARN("get first statement failed", K(ret));
} else {
alter_table_arg.ddl_stmt_str_ = first_stmt;
exchange_partition_arg.ddl_stmt_str_ = first_stmt;
my_session = ctx.get_my_session();
if (NULL == my_session) {
ret = OB_ERR_UNEXPECTED;
@ -1262,13 +1282,19 @@ int ObAlterTableExecutor::execute(ObExecContext &ctx, ObAlterTableStmt &stmt)
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(alter_table_rpc_v2(
alter_table_arg,
res,
allocator,
common_rpc_proxy,
my_session,
is_sync_ddl_user))) {
if (obrpc::ObAlterTableArg::EXCHANGE_PARTITION == alter_table_arg.alter_part_type_) {
if (OB_FAIL(alter_table_exchange_partition_rpc(exchange_partition_arg,
res,
common_rpc_proxy,
my_session))) {
LOG_WARN("Failed to alter table exchange partition rpc", K(ret), K(exchange_partition_arg));
}
} else if (OB_FAIL(alter_table_rpc_v2(alter_table_arg,
res,
allocator,
common_rpc_proxy,
my_session,
is_sync_ddl_user))) {
LOG_WARN("Failed to alter table rpc v2", K(ret));
}
}
@ -1977,6 +2003,8 @@ int ObAlterTableExecutor::check_alter_partition(ObExecContext &ctx,
|| obrpc::ObAlterTableArg::TRUNCATE_PARTITION == arg.alter_part_type_
|| obrpc::ObAlterTableArg::TRUNCATE_SUB_PARTITION == arg.alter_part_type_) {
// do-nothing
} else if (obrpc::ObAlterTableArg::EXCHANGE_PARTITION == arg.alter_part_type_) {
// do-nothing
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("no operation", K(arg.alter_part_type_), K(ret));