Support group_id transfer of ddl task with serlize_message

This commit is contained in:
renju96
2023-04-03 19:15:07 +00:00
committed by ob-robot
parent e29c97171c
commit db6535d4d2
54 changed files with 307 additions and 92 deletions

View File

@ -459,6 +459,7 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt)
LOG_WARN("get first statement failed", K(ret));
} else {
create_table_arg.is_inner_ = my_session->is_inner();
create_table_arg.consumer_group_id_ = THIS_WORKER.get_group_id();
const_cast<obrpc::ObCreateTableArg&>(create_table_arg).ddl_stmt_str_ = first_stmt;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_NOT_INIT;
@ -779,6 +780,7 @@ int ObAlterTableExecutor::execute(ObExecContext &ctx, ObAlterTableStmt &stmt)
} else if (FALSE_IT(alter_table_arg.sql_mode_ = my_session->get_sql_mode())) {
// do nothing
} else if (FALSE_IT(alter_table_arg.parallelism_ = stmt.get_parallelism())) {
} else if (FALSE_IT(alter_table_arg.consumer_group_id_ = THIS_WORKER.get_group_id())) {
} else if (OB_FAIL(check_alter_partition(ctx, stmt, alter_table_arg))) {
LOG_WARN("check alter partition failed", K(ret));
} else if (OB_FAIL(alter_table_arg.alter_table_schema_.check_if_oracle_compat_mode(is_oracle_mode))) {
@ -1586,6 +1588,7 @@ int ObDropTableExecutor::execute(ObExecContext &ctx, ObDropTableStmt &stmt)
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
obrpc::ObDDLRes res;
const obrpc::ObDropTableArg &drop_table_arg = stmt.get_drop_table_arg();
obrpc::ObDropTableArg &tmp_arg = const_cast<obrpc::ObDropTableArg&>(drop_table_arg);
ObString first_stmt;
ObSQLSessionInfo *my_session = NULL;
int64_t foreign_key_checks = 0;
@ -1593,7 +1596,8 @@ int ObDropTableExecutor::execute(ObExecContext &ctx, ObDropTableStmt &stmt)
LOG_WARN("get first statement failed", K(ret));
} else {
int64_t affected_rows = 0;
const_cast<obrpc::ObDropTableArg&>(drop_table_arg).ddl_stmt_str_ = first_stmt;
tmp_arg.ddl_stmt_str_ = first_stmt;
tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id();
my_session = ctx.get_my_session();
if (NULL == my_session) {
ret = OB_ERR_UNEXPECTED;
@ -1607,12 +1611,11 @@ int ObDropTableExecutor::execute(ObExecContext &ctx, ObDropTableStmt &stmt)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common rpc proxy should not be null", K(ret));
} else if (OB_INVALID_ID == drop_table_arg.session_id_
&& FALSE_IT(const_cast<obrpc::ObDropTableArg&>(drop_table_arg).session_id_ = my_session->get_sessid_for_table())) {
&& FALSE_IT(tmp_arg.session_id_ = my_session->get_sessid_for_table())) {
//impossible
} else if (FALSE_IT(my_session->get_foreign_key_checks(foreign_key_checks))) {
} else if (FALSE_IT(const_cast<obrpc::ObDropTableArg&>(drop_table_arg).foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks))) {
} else if (FALSE_IT(const_cast<obrpc::ObDropTableArg&>(drop_table_arg).compat_mode_ =
ORACLE_MODE == my_session->get_compatibility_mode() ?
} else if (FALSE_IT(tmp_arg.foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks))) {
} else if (FALSE_IT(tmp_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL)) {
} else if (OB_FAIL(common_rpc_proxy->drop_table(drop_table_arg, res))) {
LOG_WARN("rpc proxy drop table failed", K(ret), "dst", common_rpc_proxy->get_server());
@ -1670,13 +1673,15 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st
{
int ret = OB_SUCCESS;
const obrpc::ObTruncateTableArg &truncate_table_arg = stmt.get_truncate_table_arg();
obrpc::ObTruncateTableArg &tmp_arg = const_cast<obrpc::ObTruncateTableArg&>(truncate_table_arg);
ObString first_stmt;
obrpc::ObDDLRes res;
ObSQLSessionInfo *my_session = ctx.get_my_session();
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else {
const_cast<obrpc::ObTruncateTableArg&>(truncate_table_arg).ddl_stmt_str_ = first_stmt;
tmp_arg.ddl_stmt_str_ = first_stmt;
tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id();
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
@ -1692,14 +1697,14 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get my session", K(ret), K(ctx));
} else if (OB_INVALID_ID == truncate_table_arg.session_id_
&& FALSE_IT(const_cast<obrpc::ObTruncateTableArg&>(truncate_table_arg).session_id_ = my_session->get_sessid_for_table())) {
&& FALSE_IT(tmp_arg.session_id_ = my_session->get_sessid_for_table())) {
//impossible
} else if (!stmt.is_truncate_oracle_temp_table()) {
int64_t foreign_key_checks = 0;
share::schema::ObSchemaGetterGuard schema_guard;
my_session->get_foreign_key_checks(foreign_key_checks);
const_cast<obrpc::ObTruncateTableArg&>(truncate_table_arg).foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks);
const_cast<obrpc::ObTruncateTableArg&>(truncate_table_arg).compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode()
tmp_arg.foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks);
tmp_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode()
? lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
int64_t affected_rows = 0;
uint64_t compat_version = 0;
@ -1819,13 +1824,14 @@ int ObCreateTableLikeExecutor::execute(ObExecContext &ctx, ObCreateTableLikeStmt
{
int ret = OB_SUCCESS;
const obrpc::ObCreateTableLikeArg &create_table_like_arg = stmt.get_create_table_like_arg();
obrpc::ObCreateTableLikeArg &tmp_arg = const_cast<obrpc::ObCreateTableLikeArg&>(create_table_like_arg);
ObString first_stmt;
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else {
const_cast<obrpc::ObCreateTableLikeArg&>(create_table_like_arg).ddl_stmt_str_ = first_stmt;
const_cast<obrpc::ObCreateTableLikeArg&>(create_table_like_arg).session_id_ =
ctx.get_my_session()->get_sessid_for_table();
tmp_arg.ddl_stmt_str_ = first_stmt;
tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id();
tmp_arg.session_id_ = ctx.get_my_session()->get_sessid_for_table();
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
@ -1847,11 +1853,13 @@ int ObFlashBackTableFromRecyclebinExecutor::execute(ObExecContext &ctx, ObFlashB
{
int ret = OB_SUCCESS;
const obrpc::ObFlashBackTableFromRecyclebinArg &flashback_table_arg = stmt.get_flashback_table_arg();
obrpc::ObFlashBackTableFromRecyclebinArg &tmp_arg = const_cast<obrpc::ObFlashBackTableFromRecyclebinArg&>(flashback_table_arg);
ObString first_stmt;
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else {
const_cast<obrpc::ObFlashBackTableFromRecyclebinArg&>(flashback_table_arg).ddl_stmt_str_ = first_stmt;
tmp_arg.ddl_stmt_str_ = first_stmt;
tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id();
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
@ -1872,6 +1880,7 @@ int ObFlashBackTableFromRecyclebinExecutor::execute(ObExecContext &ctx, ObFlashB
int ObFlashBackTableToScnExecutor::execute(ObExecContext &ctx, ObFlashBackTableToScnStmt &stmt) {
int ret = OB_SUCCESS;
obrpc::ObFlashBackTableToScnArg &arg = stmt.flashback_table_to_scn_arg_;
arg.consumer_group_id_ = THIS_WORKER.get_group_id();
RowDesc row_desc;
ObTempExpr *temp_expr = NULL;
CK(OB_NOT_NULL(ctx.get_sql_ctx()));
@ -1935,11 +1944,13 @@ int ObPurgeTableExecutor::execute(ObExecContext &ctx, ObPurgeTableStmt &stmt)
{
int ret = OB_SUCCESS;
const obrpc::ObPurgeTableArg &purge_table_arg = stmt.get_purge_table_arg();
obrpc::ObPurgeTableArg &tmp_arg = const_cast<obrpc::ObPurgeTableArg&>(purge_table_arg);
ObString first_stmt;
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else {
const_cast<obrpc::ObPurgeTableArg &>(purge_table_arg).ddl_stmt_str_ = first_stmt;
tmp_arg.ddl_stmt_str_ = first_stmt;
tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id();
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
@ -1967,6 +1978,7 @@ int ObOptimizeTableExecutor::execute(ObExecContext &ctx, ObOptimizeTableStmt &st
LOG_WARN("fail to get first stmt", K(ret));
} else {
arg.ddl_stmt_str_ = first_stmt;
arg.consumer_group_id_ = THIS_WORKER.get_group_id();
ObTaskExecutorCtx *task_exec_ctx = nullptr;
obrpc::ObCommonRpcProxy *common_rpc_proxy = nullptr;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
@ -1997,6 +2009,7 @@ int ObOptimizeTenantExecutor::execute(ObExecContext &ctx, ObOptimizeTenantStmt &
LOG_WARN("error unexpected, my session must not be NULL", K(ret));
} else {
arg.ddl_stmt_str_ = first_stmt;
arg.consumer_group_id_ = THIS_WORKER.get_group_id();
ObTaskExecutorCtx *task_exec_ctx = nullptr;
obrpc::ObCommonRpcProxy *common_rpc_proxy = nullptr;
const observer::ObGlobalContext &gctx = observer::ObServer::get_instance().get_gctx();
@ -2082,6 +2095,7 @@ int ObOptimizeAllExecutor::execute(ObExecContext &ctx, ObOptimizeAllStmt &stmt)
LOG_WARN("fail to get first stmt", K(ret));
} else {
arg.ddl_stmt_str_ = first_stmt;
arg.consumer_group_id_ = THIS_WORKER.get_group_id();
ObTaskExecutorCtx *task_exec_ctx = nullptr;
obrpc::ObCommonRpcProxy *common_rpc_proxy = nullptr;
ObSchemaGetterGuard schema_guard;