[FEAT MERGE] ddl parallel truncate table

Co-authored-by: obdev <obdev@oceanbase.com>
Co-authored-by: obdev <obdev@oceanbase.com>
Co-authored-by: obdev <obdev@oceanbase.com>
Co-authored-by: obdev <obdev@oceanbase.com>
Co-authored-by: obdev <obdev@oceanbase.com>
This commit is contained in:
obdev
2023-01-28 13:37:56 +08:00
committed by ob-robot
parent a13f63478a
commit b2607a0ccf
77 changed files with 5485 additions and 783 deletions

View File

@ -10,6 +10,7 @@
* See the Mulan PubL v2 for more details.
*/
#include "share/ob_cluster_version.h"
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/cmd/ob_table_executor.h"
#include "sql/engine/cmd/ob_index_executor.h"
@ -381,7 +382,7 @@ int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx,
//4, 刷新schema, 将table的sess id重置为0
if (OB_SUCC(ret)) {
obrpc::ObAlterTableRes res;
alter_table_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ?
alter_table_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
if (OB_FAIL(common_rpc_proxy->alter_table(alter_table_arg, res))) {
LOG_WARN("failed to update table session", K(ret), K(alter_table_arg));
@ -396,7 +397,7 @@ int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx,
if (OB_LIKELY(need_clean)) {
int tmp_ret = OB_SUCCESS;
obrpc::ObDDLRes res;
drop_table_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ?
drop_table_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
if (OB_SUCCESS != (tmp_ret = common_rpc_proxy->drop_table(drop_table_arg, res))) {
LOG_WARN("failed to drop table", K(drop_table_arg), K(ret));
@ -458,7 +459,7 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt)
LOG_WARN("execute create table as select failed", K(ret));
}
// only CTAS or create temperary table will make session_id != 0. If such table detected, set
// only CTAS or create temperary table will make session_id != 0. If such table detected, set
// need ctas cleanup task anyway to do some cleanup jobs
if (0 != table_schema.get_session_id()) {
LOG_TRACE("CTAS or temporary table create detected", K(table_schema));
@ -556,12 +557,12 @@ int ObAlterTableExecutor::alter_table_rpc_v2(
ObSArray<obrpc::ObIndexArg *> add_index_arg_list;
ObSArray<obrpc::ObIndexArg *> drop_index_args;
alter_table_arg.index_arg_list_.reset();
if (OB_ISNULL(my_session)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else {
alter_table_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ?
alter_table_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
}
for (int64_t i = 0; OB_SUCC(ret) && i < index_arg_list.size(); ++i) {
@ -833,7 +834,7 @@ int ObAlterTableExecutor::execute(ObExecContext &ctx, ObAlterTableStmt &stmt)
int64_t affected_rows = 0;
if (OB_FAIL(refresh_schema_for_table(alter_table_arg.exec_tenant_id_))) {
LOG_WARN("refresh_schema_for_table failed", K(ret));
} else if (!is_ddl_retry_task && OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_,
} else if (!is_ddl_retry_task && OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_,
*my_session, common_rpc_proxy))) {
LOG_WARN("fail to wait ddl finish", K(ret), K(tenant_id), K(res));
} else if (is_ddl_retry_task && OB_FAIL(ObDDLExecutorUtil::wait_ddl_retry_task_finish(tenant_id, res.task_id_,
@ -1492,12 +1493,12 @@ int ObAlterTableExecutor::check_alter_partition(ObExecContext &ctx,
CK (NULL != stmt.get_transition_expr());
OZ (ObPartitionExecutorUtils::check_transition_interval_valid(
stmt::T_CREATE_TABLE,
ctx,
ctx,
stmt.get_transition_expr(),
stmt.get_interval_expr()));
OZ (ObPartitionExecutorUtils::set_interval_value(ctx,
stmt::T_CREATE_TABLE,
table_schema,
OZ (ObPartitionExecutorUtils::set_interval_value(ctx,
stmt::T_CREATE_TABLE,
table_schema,
stmt.get_interval_expr()));
}
@ -1587,8 +1588,8 @@ int ObDropTableExecutor::execute(ObExecContext &ctx, ObDropTableStmt &stmt)
//impossible
} else if (FALSE_IT(my_session->get_foreign_key_checks(foreign_key_checks))) {
} else if (FALSE_IT(const_cast<obrpc::ObDropTableArg&>(drop_table_arg).foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks))) {
} else if (FALSE_IT(const_cast<obrpc::ObDropTableArg&>(drop_table_arg).compat_mode_ =
ORACLE_MODE == my_session->get_compatibility_mode() ?
} else if (FALSE_IT(const_cast<obrpc::ObDropTableArg&>(drop_table_arg).compat_mode_ =
ORACLE_MODE == my_session->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL)) {
} else if (OB_FAIL(common_rpc_proxy->drop_table(drop_table_arg, res))) {
LOG_WARN("rpc proxy drop table failed", K(ret), "dst", common_rpc_proxy->get_server());
@ -1677,12 +1678,41 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st
const_cast<obrpc::ObTruncateTableArg&>(truncate_table_arg).compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode()
? lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
int64_t affected_rows = 0;
if (OB_FAIL(common_rpc_proxy->truncate_table(truncate_table_arg, res))) {
LOG_WARN("rpc proxy alter table failed", K(ret));
} else if (res.is_valid()
&& OB_FAIL(ObDDLExecutorUtil::wait_ddl_retry_task_finish(res.tenant_id_, res.task_id_, *my_session, common_rpc_proxy, affected_rows))) {
LOG_WARN("wait ddl finish failed", K(ret));
uint64_t compat_version = 0;
if (OB_FAIL(GET_MIN_DATA_VERSION(truncate_table_arg.tenant_id_, compat_version))) {
LOG_WARN("get min data_version failed", K(ret), K(truncate_table_arg.tenant_id_));
} else if (compat_version < DATA_VERSION_4_1_0_0) {
if (OB_FAIL(common_rpc_proxy->truncate_table(truncate_table_arg, res))) {
LOG_WARN("rpc proxy alter table failed", K(ret));
} else if (res.is_valid()
&& OB_FAIL(ObDDLExecutorUtil::wait_ddl_retry_task_finish(res.tenant_id_, res.task_id_, *my_session, common_rpc_proxy, affected_rows))) {
LOG_WARN("wait ddl finish failed", K(ret));
}
} else {
// new parallel truncate
ObTimeoutCtx ctx;
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF._ob_ddl_timeout))) {
LOG_WARN("fail to set timeout ctx", K(ret));
} else {
int64_t start_time = ObTimeUtility::current_time();
while (OB_SUCC(ret)) {
if (OB_FAIL(common_rpc_proxy->truncate_table_v2(truncate_table_arg, res))) {
LOG_WARN("rpc proxy truncate table failed", K(ret));
if ((OB_TRY_LOCK_ROW_CONFLICT == ret || OB_TIMEOUT == ret) && ctx.get_timeout() > 0) {
ob_usleep(1 * 1000 * 1000);
// retry
ret = OB_SUCCESS;
}
} else {
// success
break;
}
}
int64_t end_time = ObTimeUtility::current_time();
LOG_INFO("truncate_table_v2", K(ret), "cost", end_time-start_time, "table_name", truncate_table_arg.table_name_);
}
}
} else {
ObSqlString sql;
int64_t affect_rows = 0;