Files
oceanbase/src/sql/engine/cmd/ob_table_executor.cpp

2179 lines
104 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/cmd/ob_table_executor.h"
#include "sql/engine/cmd/ob_index_executor.h"
#include "sql/engine/cmd/ob_ddl_executor_util.h"
#include "share/object/ob_obj_cast.h"
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "lib/utility/ob_tracepoint.h"
#include "share/ob_common_rpc_proxy.h"
#include "share/ob_ddl_error_message_table_operator.h"
#include "sql/resolver/ddl/ob_create_table_stmt.h"
#include "sql/resolver/ddl/ob_alter_table_stmt.h"
#include "sql/resolver/ddl/ob_drop_table_stmt.h"
#include "sql/resolver/ddl/ob_rename_table_stmt.h"
#include "sql/resolver/ddl/ob_truncate_table_stmt.h"
#include "sql/resolver/ddl/ob_create_table_like_stmt.h"
#include "sql/resolver/ddl/ob_flashback_stmt.h"
#include "sql/resolver/ddl/ob_purge_stmt.h"
#include "sql/resolver/ddl/ob_optimize_stmt.h"
#include "sql/resolver/dml/ob_delete_stmt.h"
#include "sql/resolver/dml/ob_delete_resolver.h"
#include "sql/resolver/ob_resolver_utils.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/engine/ob_physical_plan.h"
#include "sql/session/ob_sql_session_info.h"
#include "sql/code_generator/ob_expr_generator_impl.h"
#include "sql/engine/cmd/ob_partition_executor_utils.h"
#include "sql/parser/ob_parser.h"
#include "sql/ob_select_stmt_printer.h"
#include "observer/ob_server_struct.h"
#include "observer/ob_server.h"
#include "lib/worker.h"
namespace oceanbase
{
using namespace common;
using namespace share;
using namespace share::schema;
using namespace observer;
namespace sql
{
ObCreateTableExecutor::ObCreateTableExecutor()
{
}
ObCreateTableExecutor::~ObCreateTableExecutor()
{
}
int ObCreateTableExecutor::prepare_stmt(ObCreateTableStmt &stmt,
const ObSQLSessionInfo &my_session,
ObString &create_table_name)
{
int ret = OB_SUCCESS;
ObArenaAllocator allocator("CreateTableExec");
const int64_t buf_len = OB_MAX_SQL_LENGTH;
char *buf = static_cast<char*>(allocator.alloc(buf_len));
int64_t pos = 0;
const int64_t session_id = my_session.get_sessid();
const int64_t timestamp = ObTimeUtility::current_time();
obrpc::ObCreateTableArg &create_table_arg = stmt.get_create_table_arg();
create_table_name = create_table_arg.schema_.get_table_name_str();
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "__ctas_%ld_%ld", session_id, timestamp))) {
LOG_WARN("failed to print tmp table name", K(ret));
} else {
ObString tmp_table_name(pos, buf);
if (OB_FAIL(create_table_arg.schema_.set_table_name(tmp_table_name))) {
LOG_WARN("failed to set tmp table name", K(ret));
}
}
return ret;
}
//准备查询插入的脚本
int ObCreateTableExecutor::prepare_ins_arg(ObCreateTableStmt &stmt,
const ObSQLSessionInfo *my_session,
const ParamStore *param_store,
ObSqlString &ins_sql) //out, 最终的查询插入语句
{
int ret = OB_SUCCESS;
ObArenaAllocator allocator("CreateTableExec");
char *buf = static_cast<char*>(allocator.alloc(OB_MAX_SQL_LENGTH));
int64_t buf_len = OB_MAX_SQL_LENGTH;
int64_t pos1 = 0;
bool is_set_subquery = false;
bool is_oracle_mode = lib::is_oracle_mode();
const ObString &db_name = stmt.get_database_name();
const ObString &tab_name = stmt.get_table_name();
const char sep_char = is_oracle_mode? '"': '`';
ObSelectStmt *select_stmt = stmt.get_sub_select();
ObSelectStmtPrinter select_stmt_printer(buf, buf_len, &pos1, select_stmt,
select_stmt->get_query_ctx()->get_timezone_info(),
param_store,
NULL, // column_list is null here
is_set_subquery);
if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("allocate memory failed");
} else if (OB_ISNULL(select_stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("select stmt should not be null", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos1, "insert into %c%.*s%c.%c%.*s%c",
sep_char,
db_name.length(),
db_name.ptr(),
sep_char,
sep_char,
tab_name.length(),
tab_name.ptr(),
sep_char))) {
LOG_WARN("fail to print insert into string", K(ret), K(db_name), K(tab_name));
} else if (lib::is_oracle_mode()) {
ObTableSchema &table_schema = stmt.get_create_table_arg().schema_;
int64_t used_column_count = 0;
for (int64_t i = 0; OB_SUCC(ret) && i < table_schema.get_column_count(); ++i) {
const ObColumnSchemaV2 *column_schema = table_schema.get_column_schema_by_idx(i);
if (OB_ISNULL(column_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null column schema", K(ret));
} else if (column_schema->get_column_id() < OB_END_RESERVED_COLUMN_ID_NUM) {
// do nothing
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos1, (0 == used_column_count)? "(": ", "))) {
LOG_WARN("failed to print insert into string", K(ret), K(i));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos1, "%c%.*s%c", sep_char, LEN_AND_PTR(column_schema->get_column_name_str()), sep_char))) {
LOG_WARN("failed to print insert into string", K(ret));
} else {
++ used_column_count;
}
}
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < select_stmt->get_select_item_size(); ++i) {
const SelectItem &select_item = select_stmt->get_select_item(i);
if (OB_FAIL(databuff_printf(buf, buf_len, pos1, (0 == i)? "(": ", "))) {
LOG_WARN("failed to print insert into string", K(ret), K(i));
} else { /* do nothing */ }
if (OB_SUCC(ret)) {
if (!select_item.alias_name_.empty()) {
if (OB_FAIL(databuff_printf(buf, buf_len, pos1, "%c%.*s%c", sep_char, LEN_AND_PTR(select_item.alias_name_), sep_char))) {
LOG_WARN("failed to print insert into string", K(ret));
} else { /* do nothing */ }
} else {
if (OB_FAIL(databuff_printf(buf, buf_len, pos1, "%c%.*s%c", sep_char, LEN_AND_PTR(select_item.expr_name_), sep_char))) {
LOG_WARN("failed to print insert into string", K(ret));
} else { /* do nothing */ }
}
} else { /* do nothing */ }
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(databuff_printf(buf, buf_len, pos1, ") "))) {
LOG_WARN("fail to append ')'", K(ret));
} else if (OB_FAIL(select_stmt_printer.do_print())) {
LOG_WARN("fail to print select stmt", K(ret));
} else if (OB_FAIL(ins_sql.append(buf, pos1))){
LOG_WARN("fail to append insert into string", K(ret));
}
}
if (OB_SUCC(ret)) {
ObString converted_sql = ins_sql.string();
if (OB_FAIL(ObSQLUtils::convert_sql_text_from_schema_for_resolve(allocator,
my_session->get_dtc_params(),
converted_sql,
ObCharset::COPY_STRING_ON_SAME_CHARSET))) {
LOG_WARN("fail to convert insert into string to client_cs_type", K(ret));
} else if (OB_FAIL(ins_sql.assign(converted_sql))) {
LOG_WARN("fail to assign converted insert into string", K(ret));
}
}
LOG_DEBUG("ins str preparation complete!", K(ins_sql), K(ret), K(lib::is_oracle_mode()));
return ret;
}
//准备alter table 的参数
int ObCreateTableExecutor::prepare_alter_arg(ObCreateTableStmt &stmt,
const ObSQLSessionInfo *my_session,
const ObString &create_table_name,
obrpc::ObAlterTableArg &alter_table_arg) //out, 最终的alter table arg, set session_id = 0;
{
int ret = OB_SUCCESS;
const obrpc::ObCreateTableArg &create_table_arg = stmt.get_create_table_arg();
ObTableSchema &table_schema = const_cast<obrpc::ObCreateTableArg&>(create_table_arg).schema_;
AlterTableSchema *alter_table_schema = &alter_table_arg.alter_table_schema_;
table_schema.set_session_id(my_session->get_sessid_for_table());
alter_table_arg.session_id_ = my_session->get_sessid_for_table();
alter_table_schema->alter_type_ = OB_DDL_ALTER_TABLE;
//compat for old server
alter_table_arg.tz_info_ = my_session->get_tz_info_wrap().get_tz_info_offset();
alter_table_arg.is_inner_ = my_session->is_inner();
alter_table_arg.exec_tenant_id_ = my_session->get_effective_tenant_id();
if (OB_FAIL(alter_table_arg.tz_info_wrap_.deep_copy(my_session->get_tz_info_wrap()))) {
LOG_WARN("failed to deep_copy tz info wrap", "tz_info_wrap", my_session->get_tz_info_wrap(), K(ret));
} else if (OB_FAIL(alter_table_arg.set_nls_formats(
my_session->get_local_nls_date_format(),
my_session->get_local_nls_timestamp_format(),
my_session->get_local_nls_timestamp_tz_format()))) {
LOG_WARN("failed to set_nls_formats", K(ret));
} else if (OB_FAIL(alter_table_schema->assign(table_schema))) {
LOG_WARN("failed to assign alter table schema", K(ret));
} else if (!table_schema.is_mysql_tmp_table()
&& FALSE_IT(alter_table_schema->set_session_id(0))) {
//impossible
} else if (OB_FAIL(alter_table_schema->set_origin_table_name(stmt.get_table_name()))) {
LOG_WARN("failed to set origin table name", K(ret));
} else if (OB_FAIL(alter_table_schema->set_origin_database_name(stmt.get_database_name()))) {
LOG_WARN("failed to set origin database name", K(ret));
} else if (OB_FAIL(alter_table_schema->set_table_name(create_table_name))) {
LOG_WARN("failed to set table name", K(ret));
} else if (OB_FAIL(alter_table_schema->set_database_name(stmt.get_database_name()))) {
LOG_WARN("failed to set database name", K(ret));
} else if (!table_schema.is_mysql_tmp_table()
&& OB_FAIL(alter_table_schema->alter_option_bitset_.add_member(obrpc::ObAlterTableArg::SESSION_ID))) {
LOG_WARN("failed to add member SESSION_ID for alter table schema", K(ret), K(alter_table_arg));
} else if (OB_FAIL(alter_table_schema->alter_option_bitset_.add_member(obrpc::ObAlterTableArg::TABLE_NAME))) {
LOG_WARN("failed to add member TABLE_NAME for alter table schema", K(ret), K(alter_table_arg));
}
LOG_DEBUG("alter table arg preparation complete!", K(*alter_table_schema), K(ret));
return ret;
}
//准备drop table的参数
int ObCreateTableExecutor::prepare_drop_arg(const ObCreateTableStmt &stmt,
const ObSQLSessionInfo *my_session,
obrpc::ObTableItem &table_item,
obrpc::ObDropTableArg &drop_table_arg) //out, drop table的参数
{
int ret = OB_SUCCESS;
const ObString &db_name = stmt.get_database_name();
const ObString &tab_name = stmt.get_table_name();
drop_table_arg.if_exist_ = true;
drop_table_arg.tenant_id_ = my_session->get_login_tenant_id();
drop_table_arg.to_recyclebin_ = false;
drop_table_arg.table_type_ = USER_TABLE;
drop_table_arg.session_id_ = my_session->get_sessid_for_table();
drop_table_arg.exec_tenant_id_ = my_session->get_effective_tenant_id();
int64_t foreign_key_checks = 0;
my_session->get_foreign_key_checks(foreign_key_checks);
drop_table_arg.foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks);
table_item.database_name_ = db_name;
table_item.table_name_ = tab_name;
if (OB_FAIL(my_session->get_name_case_mode(table_item.mode_))) {
LOG_WARN("failed to get name case mode!", K(ret));
} else if (OB_FAIL(drop_table_arg.tables_.push_back(table_item))) {
LOG_WARN("failed to add table item!", K(table_item), K(ret));
}
LOG_DEBUG("drop table arg preparation complete!", K(drop_table_arg), K(table_item), K(ret));
return ret;
}
//查询建表的处理, 通过内部session执行查询插入代码参考了 ObTableModify::ObTableModifyCtx::open_inner_conn() 实现
int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx,
ObCreateTableStmt &stmt,
obrpc::ObCommonRpcProxy *common_rpc_proxy)
{
int ret = OB_SUCCESS;
int64_t affected_rows = 0;
ObMySQLProxy *sql_proxy = ctx.get_sql_proxy();
common::ObCommonSqlProxy *user_sql_proxy;
common::ObOracleSqlProxy oracle_sql_proxy;
ObSQLSessionInfo *my_session = ctx.get_my_session();
ObPhysicalPlanCtx *plan_ctx = ctx.get_physical_plan_ctx();
HEAP_VAR(obrpc::ObAlterTableArg, alter_table_arg) {
obrpc::ObDropTableArg drop_table_arg;
obrpc::ObTableItem table_item;
ObString create_table_name;
ObSqlString ins_sql;
const observer::ObGlobalContext &gctx = observer::ObServer::get_instance().get_gctx();
obrpc::ObCreateTableRes create_table_res;
obrpc::ObCreateTableArg &create_table_arg = stmt.get_create_table_arg();
create_table_arg.is_inner_ = my_session->is_inner();
bool need_clean = true;
CK(OB_NOT_NULL(sql_proxy),
OB_NOT_NULL(my_session),
OB_NOT_NULL(gctx.schema_service_),
OB_NOT_NULL(plan_ctx),
OB_NOT_NULL(common_rpc_proxy));
if (OB_SUCC(ret)) {
ObInnerSQLConnectionPool *pool = static_cast<observer::ObInnerSQLConnectionPool*>(sql_proxy->get_pool());
if (OB_ISNULL(pool)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool is null", K(ret));
} else if (OB_FAIL(oracle_sql_proxy.init(pool))) {
LOG_WARN("init oracle sql proxy failed", K(ret));
} else if (OB_FAIL(prepare_stmt(stmt, *my_session, create_table_name))) {
LOG_WARN("failed to prepare stmt", K(ret));
} else if (OB_FAIL(prepare_ins_arg(stmt, my_session, &plan_ctx->get_param_store(), ins_sql))) { //1, 参数准备;
LOG_WARN("failed to prepare insert table arg", K(ret));
} else if (OB_FAIL(prepare_alter_arg(stmt, my_session, create_table_name, alter_table_arg))) {
LOG_WARN("failed to prepare alter table arg", K(ret));
} else if (OB_FAIL(prepare_drop_arg(stmt, my_session, table_item, drop_table_arg))) {
LOG_WARN("failed to prepare drop table arg", K(ret));
} else if (OB_FAIL(ctx.get_sql_ctx()->schema_guard_->reset())){
LOG_WARN("schema_guard reset failed", K(ret));
} else if (OB_FAIL(common_rpc_proxy->create_table(create_table_arg, create_table_res))) { //2, 建表;
LOG_WARN("rpc proxy create table failed", K(ret), "dst", common_rpc_proxy->get_server());
} else if (OB_INVALID_ID != create_table_res.table_id_) { //如果表已存在则后续的查询插入不进行
if (OB_INVALID_VERSION == create_table_res.schema_version_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected schema version", K(ret), K(create_table_res));
} else {
uint64_t tenant_id = my_session->get_effective_tenant_id();
if (OB_FAIL(gctx.schema_service_->async_refresh_schema(tenant_id,
create_table_res.schema_version_))) {
LOG_WARN("failed to async refresh schema", K(ret));
}
}
#ifdef ERRSIM
{
int tmp_ret = E(EventTable::EN_CTAS_FAIL_NO_DROP_ERROR) OB_SUCCESS; //错误注入, 使表不能清理
if (OB_FAIL(tmp_ret)) {
ret = tmp_ret;
need_clean = false;
}
}
#else
//do nothing...
#endif
//3, 插入数据
if (OB_SUCC(ret)) {
bool is_mysql_temp_table = stmt.get_create_table_arg().schema_.is_mysql_tmp_table();
bool in_trans = my_session->is_in_transaction();
ObBasicSessionInfo::UserScopeGuard user_scope_guard(my_session->get_sql_scope_flags());
common::sqlclient::ObISQLConnection *conn = NULL;
const uint64_t tenant_id = my_session->get_effective_tenant_id();
if (lib::is_oracle_mode()) {
user_sql_proxy = &oracle_sql_proxy;
} else {
user_sql_proxy = sql_proxy;
}
if (OB_FAIL(pool->acquire(my_session, conn))) {
LOG_WARN("failed to acquire inner connection", K(ret));
} else if (OB_ISNULL(conn)) {
ret = OB_INNER_STAT_ERROR;
LOG_WARN("connection can not be NULL", K(ret));
} else if ((!is_mysql_temp_table || !in_trans)
&& OB_FAIL(conn->start_transaction(tenant_id))) {
LOG_WARN("failed start transaction", K(ret), K(tenant_id));
} else {
if (OB_FAIL(conn->execute_write(tenant_id, ins_sql.ptr(),
affected_rows, true))) {
LOG_WARN("failed to exec sql", K(tenant_id), K(ins_sql), K(ret));
}
// transaction started, must commit or rollback
int tmp_ret = OB_SUCCESS;
if (!is_mysql_temp_table || !in_trans) {
if (OB_LIKELY(OB_SUCCESS == ret)) {
tmp_ret = conn->commit();
} else {
tmp_ret = conn->rollback();
}
if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
LOG_WARN("fail to end transaction", K(ret), K(tmp_ret));
}
}
}
if (OB_NOT_NULL(conn)) {
user_sql_proxy->close(conn, true);
}
}
DEBUG_SYNC(BEFORE_EXECUTE_CTAS_CLEAR_SESSION_ID);
//4, 刷新schema, 将table的sess id重置为0
if (OB_SUCC(ret)) {
obrpc::ObAlterTableRes res;
alter_table_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
if (OB_FAIL(common_rpc_proxy->alter_table(alter_table_arg, res))) {
LOG_WARN("failed to update table session", K(ret), K(alter_table_arg));
if (alter_table_arg.compat_mode_ == lib::Worker::CompatMode::ORACLE && OB_ERR_TABLE_EXIST == ret) {
ret = OB_ERR_EXIST_OBJECT;
}
}
}
if (OB_FAIL(ret)) { //5, 查询建表失败, 需要清理环境即DROP TABLE
my_session->update_last_active_time();
if (OB_LIKELY(need_clean)) {
int tmp_ret = OB_SUCCESS;
obrpc::ObDDLRes res;
drop_table_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
if (OB_SUCCESS != (tmp_ret = common_rpc_proxy->drop_table(drop_table_arg, res))) {
LOG_WARN("failed to drop table", K(drop_table_arg), K(ret));
} else {
LOG_INFO("table is created and dropped due to error ", K(ret));
}
}
} else {
plan_ctx->set_affected_rows(affected_rows);
LOG_DEBUG("CTAS all done", K(ins_sql), K(affected_rows), K(lib::is_oracle_mode()));
}
} else {
LOG_DEBUG("table exists, no need to CTAS", K(create_table_res.table_id_));
}
}
}
return ret;
}
int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
obrpc::ObCreateTableRes res;
obrpc::ObCreateTableArg &create_table_arg = stmt.get_create_table_arg();
ObString first_stmt;
ObSelectStmt *select_stmt = stmt.get_sub_select();
ObTableSchema &table_schema = create_table_arg.schema_;
ObSQLSessionInfo *my_session = ctx.get_my_session();
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", K(ret));
} else if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else {
create_table_arg.is_inner_ = my_session->is_inner();
const_cast<obrpc::ObCreateTableArg&>(create_table_arg).ddl_stmt_str_ = first_stmt;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor context failed", K(ret));
} else if (OB_FAIL(ObPartitionExecutorUtils::calc_values_exprs(ctx, stmt))) {
LOG_WARN("compare range parition expr fail", K(ret));
} else if (OB_FAIL(set_index_arg_list(ctx, stmt))) {
LOG_WARN("fail to set index_arg_list", K(ret));
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("get common rpc proxy failed", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)){
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common rpc proxy should not be null", K(ret));
} else if (OB_ISNULL(select_stmt)) { // 普通建表的处理
if (OB_FAIL(ctx.get_sql_ctx()->schema_guard_->reset())){
LOG_WARN("schema_guard reset failed", K(ret));
} else if (OB_FAIL(common_rpc_proxy->create_table(create_table_arg, res))) {
LOG_WARN("rpc proxy create table failed", K(ret), "dst", common_rpc_proxy->get_server());
} else { /* do nothing */ }
} else if (OB_FAIL(execute_ctas(ctx, stmt, common_rpc_proxy))){ // 查询建表的处理
LOG_WARN("execute create table as select failed", K(ret));
}
// only CTAS or create temperary table will make session_id != 0. If such table detected, set
// need ctas cleanup task anyway to do some cleanup jobs
if (0 != table_schema.get_session_id()) {
LOG_TRACE("CTAS or temporary table create detected", K(table_schema));
ATOMIC_STORE(&OBSERVER.need_ctas_cleanup_, true);
}
}
return ret;
}
int ObCreateTableExecutor::set_index_arg_list(ObExecContext &ctx, ObCreateTableStmt &stmt)
{
int ret = OB_SUCCESS;
obrpc::ObCreateTableArg &create_table_arg = const_cast<obrpc::ObCreateTableArg &>(stmt.get_create_table_arg());
if (stmt.get_index_partition_resolve_results().count()
!= stmt.get_index_arg_list().count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid index resolve result", K(ret), K(stmt));
}
for (int64_t i = 0; OB_SUCC(ret) && i < stmt.get_index_arg_list().count(); i++) {
HEAP_VAR(ObCreateIndexStmt, index_stmt) {
ObPartitionResolveResult &resolve_result = stmt.get_index_partition_resolve_results().at(i);
index_stmt.get_part_fun_exprs() = resolve_result.get_part_fun_exprs();
index_stmt.get_part_values_exprs() = resolve_result.get_part_values_exprs();
index_stmt.get_subpart_fun_exprs() = resolve_result.get_subpart_fun_exprs();
index_stmt.get_template_subpart_values_exprs() = resolve_result.get_template_subpart_values_exprs();
index_stmt.get_individual_subpart_values_exprs() = resolve_result.get_individual_subpart_values_exprs();
if (OB_FAIL(index_stmt.get_create_index_arg().assign(stmt.get_index_arg_list().at(i)))) {
LOG_WARN("fail to assign index arg", K(ret));
} else if (OB_FAIL(ObPartitionExecutorUtils::calc_values_exprs(ctx, index_stmt))) {
LOG_WARN("fail to compare range partition expr", K(ret));
} else if (OB_FAIL(create_table_arg.index_arg_list_.push_back(index_stmt.get_create_index_arg()))) {
LOG_WARN("fail to push back index_arg", K(ret));
}
}
}
return ret;
}
ObAlterTableExecutor::ObAlterTableExecutor()
{
}
ObAlterTableExecutor::~ObAlterTableExecutor()
{
}
int ObAlterTableExecutor::refresh_schema_for_table(
const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
share::schema::ObSchemaGetterGuard schema_guard;
const observer::ObGlobalContext &gctx = observer::ObServer::get_instance().get_gctx();
ObMultiVersionSchemaService *schema_service = gctx.schema_service_;
int64_t local_version = OB_INVALID_VERSION;
int64_t global_version = OB_INVALID_VERSION;
if (OB_ISNULL(schema_service)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, schema service must not be NULL", K(ret));
} else if (OB_FAIL(schema_service->get_tenant_refreshed_schema_version(
tenant_id, local_version))) {
LOG_WARN("fail to get local version", K(ret), "tenant_id", tenant_id);
} else if (OB_FAIL(schema_service->get_tenant_received_broadcast_version(
tenant_id, global_version))) {
LOG_WARN("fail to get global version", K(ret), "tenant_id", tenant_id);
} else if (local_version < global_version) {
LOG_INFO("try to refresh schema", K(local_version), K(global_version));
// force refresh schema最新版本
ObSEArray<uint64_t, 1> tenant_ids;
if (OB_FAIL(tenant_ids.push_back(tenant_id))) {
LOG_WARN("fail to push back tenant_id", K(ret), "tenant_id", tenant_id);
} else if (OB_FAIL(schema_service->refresh_and_add_schema(tenant_ids))) {
LOG_WARN("failed to refresh schema", K(ret));
}
}
return ret;
}
/* 3100 之前的版本 alter table 逻辑
alter table 向 RS 发 RPC 时,如果有 add index 或者 add unqiue 操作,就发送 “两批” RPC:
第一批 RPC 只有一个 RPC,是不包含 add index 或者 add unqiue 操作以外的其他操作的 alter table 的 RPC
第二批 RPC 是 alter table 中每创建一个索引,就向 RS 发一个 rpc,observer 端同步等待索引建成功或者建失败
a)如果成功就继续发送下一个创建索引的 RPC;
b)如果失败,就报错结束,并终止发送后面还没有向 RS 发送 RPC(即还没有创建)的创建索引请求。前面已经发送和创建索引无关的 RPC 在 RS 端均已处理成功,不再回滚。但如果一条 alter table 同时建了多个 index,其中一个失败,就回滚所有已经建立的 index。
*/
int ObAlterTableExecutor::alter_table_rpc_v1(
obrpc::ObAlterTableArg &alter_table_arg,
obrpc::ObAlterTableRes &res,
common::ObIAllocator &allocator,
obrpc::ObCommonRpcProxy *common_rpc_proxy,
ObSQLSessionInfo *my_session,
const bool is_sync_ddl_user)
{
int ret = OB_SUCCESS;
bool alter_table_add_index = false;
const ObSArray<obrpc::ObIndexArg *> index_arg_list = alter_table_arg.index_arg_list_;
if (OB_ISNULL(my_session)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else {
alter_table_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
}
for (int64_t i = 0; OB_SUCC(ret) && i < index_arg_list.size(); ++i) {
obrpc::ObIndexArg *index_arg = index_arg_list.at(i);
if (OB_ISNULL(index_arg)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("index arg should not be null", K(ret));
} else if (obrpc::ObIndexArg::ADD_INDEX == index_arg->index_action_type_) {
alter_table_add_index = true;
break;
}
}
if (!alter_table_add_index) {
// alter table 中没有 add index 的情况
if (OB_FAIL(common_rpc_proxy->alter_table(alter_table_arg, res))) {
LOG_WARN("rpc proxy alter table failed", K(ret), "dst", common_rpc_proxy->get_server(), K(alter_table_arg));
}
} else {
// alter table 中有 add index 的情况
ObSArray<obrpc::ObIndexArg *> add_index_arg_list;
alter_table_arg.index_arg_list_.reset();
for (int64_t i = 0; OB_SUCC(ret) && i < index_arg_list.size(); ++i) {
obrpc::ObIndexArg *index_arg = index_arg_list.at(i);
if (OB_ISNULL(index_arg)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("index arg should not be null", K(ret));
} else if (obrpc::ObIndexArg::ADD_INDEX == index_arg->index_action_type_) {
if (OB_FAIL(add_index_arg_list.push_back(index_arg))) {
LOG_WARN("fail to push back to arg_for_adding_index_list", K(ret));
}
} else { // not for adding index
if (OB_FAIL(alter_table_arg.index_arg_list_.push_back(index_arg))) {
LOG_WARN("fail to push back to arg_for_adding_index_list", K(ret));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(common_rpc_proxy->alter_table(alter_table_arg, res))) {
LOG_WARN("rpc proxy alter table for not adding index failed", K(ret), "dst", common_rpc_proxy->get_server(), K(alter_table_arg));
}
}
if (OB_SUCC(ret)) {
ObString empty_stmt;
alter_table_arg.is_alter_columns_ = false;
alter_table_arg.is_alter_options_ = false;
alter_table_arg.is_alter_partitions_ = false;
alter_table_arg.ddl_stmt_str_ = empty_stmt;
alter_table_arg.ddl_id_str_ = empty_stmt;
alter_table_arg.alter_constraint_type_ = obrpc::ObAlterTableArg::CONSTRAINT_NO_OPERATION;
ObSArray<uint64_t> added_index_table_ids;
ObCreateIndexExecutor create_index_executor;
for (int64_t i = 0; OB_SUCC(ret) && i < add_index_arg_list.size(); ++i) {
alter_table_arg.index_arg_list_.reset();
if (OB_FAIL(alter_table_arg.index_arg_list_.push_back(add_index_arg_list.at(i)))) {
LOG_WARN("fail to push back to arg_for_adding_index_list", K(ret));
} else if (OB_FAIL(common_rpc_proxy->alter_table(alter_table_arg, res))) {
LOG_WARN("rpc proxy alter table for adding index failed", K(ret), "dst", common_rpc_proxy->get_server(), K(alter_table_arg));
} else {
// 同步等索引建成功
obrpc::ObIndexArg *index_arg = alter_table_arg.index_arg_list_.at(0);
obrpc::ObCreateIndexArg *create_index_arg = NULL;
if (OB_ISNULL(index_arg)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("index arg is null", K(ret), K(i));
} else if (obrpc::ObIndexArg::ADD_INDEX != index_arg->index_action_type_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("index action type should be add index", K(ret), K(i));
} else if (OB_ISNULL(create_index_arg = static_cast<obrpc::ObCreateIndexArg *>(index_arg))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("create index arg is null", K(ret), K(i));
} else if (!is_sync_ddl_user) {
// 只考虑非备份恢复时的索引同步检查
create_index_arg->index_schema_.set_table_id(res.index_table_id_);
create_index_arg->index_schema_.set_schema_version(res.schema_version_);
if (OB_FAIL(create_index_executor.sync_check_index_status(*my_session, *common_rpc_proxy, *create_index_arg, res, allocator))) {
LOG_WARN("failed to sync_check_index_status", K(ret), K(*create_index_arg), K(i));
} else {
added_index_table_ids.push_back(res.index_table_id_);
}
}
}
}
// 如果一条 alter table 同时建了多个 index,其中一个失败,就回滚所有已经建立的 index
if (OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCCESS == tmp_ret && i < added_index_table_ids.size(); ++i) {
obrpc::ObDropIndexArg drop_index_arg;
obrpc::ObDropIndexRes drop_index_res;
obrpc::ObCreateIndexArg *create_index_arg = static_cast<obrpc::ObCreateIndexArg *>(add_index_arg_list.at(i));
drop_index_arg.tenant_id_ = create_index_arg->tenant_id_;
drop_index_arg.exec_tenant_id_ = create_index_arg->tenant_id_;
drop_index_arg.index_table_id_ = added_index_table_ids.at(i);
drop_index_arg.session_id_ = create_index_arg->session_id_;
drop_index_arg.index_name_ = create_index_arg->index_name_;
drop_index_arg.table_name_ = create_index_arg->table_name_;
drop_index_arg.database_name_ = create_index_arg->database_name_;
drop_index_arg.index_action_type_ = obrpc::ObIndexArg::DROP_INDEX;
drop_index_arg.is_add_to_scheduler_ = false;
if (OB_SUCCESS != (tmp_ret = create_index_executor.set_drop_index_stmt_str(drop_index_arg, allocator))) {
LOG_WARN("fail to set drop index ddl_stmt_str", K(tmp_ret));
} else if (OB_SUCCESS != (tmp_ret = common_rpc_proxy->drop_index(drop_index_arg, drop_index_res))) {
LOG_WARN("rpc proxy drop index failed", "dst", common_rpc_proxy->get_server(),
K(tmp_ret),
K(drop_index_arg.table_name_),
K(drop_index_arg.index_name_));
}
}
LOG_INFO("added indexes failed, we rolled back all indexes added in this same alter table sql. But we didn't roll back other actions in this same alter table sql");
}
}
}
return ret;
}
/* 从 3100 开始的版本 alter table 逻辑是将建索引和其他操作放到同一个 rpc 里发到 rs,返回后对每个创建的索引进行同步等,如果一个索引创建失败,则回滚全部索引
mysql 模式下支持 alter table 同时做建索引操作和其他操作,需要保证 rs 在处理 drop index 之后再处理 add index
否则前缀索引会有问题:https://code.aone.alibaba-inc.com/oceanbase/oceanbase/codereview/1907077
*/
int ObAlterTableExecutor::alter_table_rpc_v2(
obrpc::ObAlterTableArg &alter_table_arg,
obrpc::ObAlterTableRes &res,
common::ObIAllocator &allocator,
obrpc::ObCommonRpcProxy *common_rpc_proxy,
ObSQLSessionInfo *my_session,
const bool is_sync_ddl_user)
{
int ret = OB_SUCCESS;
const ObSArray<obrpc::ObIndexArg *> index_arg_list = alter_table_arg.index_arg_list_;
ObSArray<obrpc::ObIndexArg *> add_index_arg_list;
ObSArray<obrpc::ObIndexArg *> drop_index_args;
alter_table_arg.index_arg_list_.reset();
if (OB_ISNULL(my_session)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else {
alter_table_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
}
for (int64_t i = 0; OB_SUCC(ret) && i < index_arg_list.size(); ++i) {
obrpc::ObIndexArg *index_arg = index_arg_list.at(i);
if (OB_ISNULL(index_arg)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("index arg should not be null", KR(ret));
} else if (obrpc::ObIndexArg::ADD_INDEX == index_arg->index_action_type_) {
if (OB_FAIL(add_index_arg_list.push_back(index_arg))) {
LOG_WARN("fail to push back to arg_for_adding_index_list", KR(ret));
}
} else if (obrpc::ObIndexArg::DROP_INDEX == index_arg->index_action_type_) {
if (OB_FAIL(drop_index_args.push_back(index_arg))) {
LOG_WARN("push back drop index arg failed", K(ret));
} else if (OB_FAIL(alter_table_arg.index_arg_list_.push_back(index_arg))) {
LOG_WARN("push back index arg failed", K(ret));
} else {
ObDropIndexArg *drop_index_arg = static_cast<ObDropIndexArg *>(index_arg);
drop_index_arg->is_add_to_scheduler_ = true;
}
} else { // for rename/drop index action
if (OB_FAIL(alter_table_arg.index_arg_list_.push_back(index_arg))) {
LOG_WARN("fail to push back to arg_for_adding_index_list", KR(ret));
}
}
}
// for add index action
for (int64_t i = 0; OB_SUCC(ret) && i < add_index_arg_list.size(); ++i) {
if (OB_FAIL(alter_table_arg.index_arg_list_.push_back(add_index_arg_list.at(i)))) {
LOG_WARN("fail to push back to arg_for_adding_index_list", KR(ret));
}
}
if (OB_SUCC(ret)) {
if (obrpc::ObAlterTableArg::SET_INTERVAL == alter_table_arg.alter_part_type_
|| obrpc::ObAlterTableArg::INTERVAL_TO_RANGE == alter_table_arg.alter_part_type_) {
alter_table_arg.is_alter_partitions_ = true;
}
if (OB_FAIL(common_rpc_proxy->alter_table(alter_table_arg, res))) {
LOG_WARN("rpc proxy alter table failed", KR(ret), "dst", common_rpc_proxy->get_server(), K(alter_table_arg));
} else {
// 在回滚时不会重试,也不检查 schema version
alter_table_arg.based_schema_object_infos_.reset();
}
}
if (OB_SUCC(ret)) {
ObIArray<obrpc::ObDDLRes> &ddl_ress = res.ddl_res_array_;
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_ress.count(); ++i) {
ObDDLRes &ddl_res = ddl_ress.at(i);
if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(ddl_res.tenant_id_, ddl_res.task_id_, *my_session, common_rpc_proxy))) {
LOG_WARN("wait drop index finish", K(ret));
}
}
}
if (OB_SUCC(ret)) {
ObCreateIndexExecutor create_index_executor;
uint64_t failed_index_no = OB_INVALID_ID;
// 对drop/truncate分区全局索引的处理
if (!is_sync_ddl_user && alter_table_arg.is_update_global_indexes_
&& (obrpc::ObAlterTableArg::DROP_PARTITION == alter_table_arg.alter_part_type_
|| obrpc::ObAlterTableArg::DROP_SUB_PARTITION == alter_table_arg.alter_part_type_
|| obrpc::ObAlterTableArg::TRUNCATE_PARTITION == alter_table_arg.alter_part_type_
|| obrpc::ObAlterTableArg::TRUNCATE_SUB_PARTITION == alter_table_arg.alter_part_type_)) {
common::ObSArray<ObAlterTableResArg> &res_array = res.res_arg_array_;
for (int64_t i = 0; OB_SUCC(ret) && i < res_array.size(); ++i) {
SMART_VAR(obrpc::ObCreateIndexArg, create_index_arg) {
create_index_arg.index_schema_.set_table_id(res_array.at(i).schema_id_);
create_index_arg.index_schema_.set_schema_version(res_array.at(i).schema_version_);
if (OB_FAIL(create_index_executor.sync_check_index_status(*my_session, *common_rpc_proxy, create_index_arg, res, allocator))) {
LOG_WARN("failed to sync_check_index_status", KR(ret), K(create_index_arg), K(i));
}
}
}
} else if (DDL_CREATE_INDEX == res.ddl_type_ || DDL_NORMAL_TYPE == res.ddl_type_) {
// TODO(shuangcan): alter table create index returns DDL_NORMAL_TYPE now, check if we can fix this later
// 同步等索引建成功
for (int64_t i = 0; OB_SUCC(ret) && i < add_index_arg_list.size(); ++i) {
obrpc::ObIndexArg *index_arg = add_index_arg_list.at(i);
obrpc::ObCreateIndexArg *create_index_arg = NULL;
if (OB_ISNULL(index_arg)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("index arg is null", KR(ret), K(i));
} else if (obrpc::ObIndexArg::ADD_INDEX != index_arg->index_action_type_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("index action type should be add index", KR(ret), K(i), K(*index_arg));
} else if (OB_ISNULL(create_index_arg = static_cast<obrpc::ObCreateIndexArg *>(index_arg))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("create index arg is null", KR(ret), K(i));
} else if (INDEX_TYPE_PRIMARY == create_index_arg->index_type_) {
// do nothing
} else if (!is_sync_ddl_user) {
// 只考虑非备份恢复时的索引同步检查
create_index_arg->index_schema_.set_table_id(res.res_arg_array_.at(i).schema_id_);
create_index_arg->index_schema_.set_schema_version(res.res_arg_array_.at(i).schema_version_);
// 只考虑非备份恢复时的索引同步检查
if (OB_FAIL(create_index_executor.sync_check_index_status(*my_session, *common_rpc_proxy, *create_index_arg, res, allocator))) {
failed_index_no = i;
LOG_WARN("failed to sync_check_index_status", KR(ret), K(*create_index_arg), K(i));
}
}
}
// 回滚所有已经建立的 index
if (OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
uint64_t tenant_id = OB_INVALID_ID;
for (int64_t i = 0; (OB_SUCCESS == tmp_ret) && (i < add_index_arg_list.size()); ++i) {
if (failed_index_no == i) {
// 同步建索引逻辑里已经把这个失败的删掉了
continue;
} else {
obrpc::ObDropIndexArg drop_index_arg;
obrpc::ObDropIndexRes drop_index_res;
obrpc::ObCreateIndexArg *create_index_arg = static_cast<obrpc::ObCreateIndexArg *>(add_index_arg_list.at(i));
drop_index_arg.tenant_id_ = create_index_arg->tenant_id_;
drop_index_arg.exec_tenant_id_ = create_index_arg->tenant_id_;
drop_index_arg.index_table_id_ = res.res_arg_array_.at(i).schema_id_;
drop_index_arg.session_id_ = create_index_arg->session_id_;
drop_index_arg.index_name_ = create_index_arg->index_name_;
drop_index_arg.table_name_ = create_index_arg->table_name_;
drop_index_arg.database_name_ = create_index_arg->database_name_;
drop_index_arg.index_action_type_ = obrpc::ObIndexArg::DROP_INDEX;
drop_index_arg.is_add_to_scheduler_ = false;
tenant_id = drop_index_arg.tenant_id_;
if (OB_SUCCESS != (tmp_ret = create_index_executor.set_drop_index_stmt_str(drop_index_arg, allocator))) {
LOG_WARN("fail to set drop index ddl_stmt_str", K(tmp_ret));
} else if (OB_SUCCESS != (tmp_ret = common_rpc_proxy->drop_index(drop_index_arg, drop_index_res))) {
LOG_WARN("rpc proxy drop index failed", "dst", common_rpc_proxy->get_server(),
K(tmp_ret),
K(drop_index_arg.table_name_),
K(drop_index_arg.index_name_));
}
}
}
if (OB_SUCCESS != tmp_ret && OB_INVALID_ID != failed_index_no) { // rewrite LOG_USER_ERROR message
uint64_t index_table_id = res.res_arg_array_.at(failed_index_no).schema_id_;
int64_t schema_version = res.res_arg_array_.at(failed_index_no).schema_version_;
bool is_finish = false;
if (OB_SUCCESS != (tmp_ret = ObDDLExecutorUtil::wait_build_index_finish(tenant_id, res.task_id_, is_finish))) {
LOG_WARN("wait build index finish failed", K(tmp_ret), K(tenant_id), K(res.task_id_));
}
}
LOG_INFO("added indexes failed, we rolled back all indexes added in this same alter table sql. But we didn't roll back other actions in this same alter table sql");
}
}
}
return ret;
}
int ObAlterTableExecutor::execute(ObExecContext &ctx, ObAlterTableStmt &stmt)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
obrpc::ObAlterTableArg &alter_table_arg = stmt.get_alter_table_arg();
LOG_DEBUG("start of alter table execute", K(alter_table_arg));
ObString first_stmt;
OZ (stmt.get_first_stmt(first_stmt));
OV (OB_NOT_NULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx)), OB_NOT_INIT);
OZ (task_exec_ctx->get_common_rpc(common_rpc_proxy));
OV (OB_NOT_NULL(common_rpc_proxy));
if (OB_FAIL(ret)) {
// do nothing
} else if (stmt.is_alter_triggers()) {
if (stmt.get_tg_arg().trigger_infos_.count() > 0) {
stmt.get_tg_arg().exec_tenant_id_ = alter_table_arg.exec_tenant_id_;
stmt.get_tg_arg().ddl_id_str_ = alter_table_arg.ddl_id_str_;
stmt.get_tg_arg().ddl_stmt_str_ = first_stmt;
OZ (common_rpc_proxy->alter_trigger(stmt.get_tg_arg()), common_rpc_proxy->get_server());
}
} else {
ObSQLSessionInfo *my_session = NULL;
obrpc::ObAlterTableRes res;
bool is_sync_ddl_user = false;
bool need_modify_fk_validate = false;
bool need_check = false;
bool need_modify_notnull_validate = false;
bool is_oracle_mode = false;
const int64_t tenant_id = alter_table_arg.alter_table_schema_.get_tenant_id();
ObArenaAllocator allocator(ObModIds::OB_SQL_EXECUTOR);
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else {
alter_table_arg.ddl_stmt_str_ = first_stmt;
my_session = ctx.get_my_session();
if (NULL == my_session) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get my session", K(ret), K(ctx));
} else if (FALSE_IT(alter_table_arg.sql_mode_ = my_session->get_sql_mode())) {
// do nothing
} else if (FALSE_IT(alter_table_arg.parallelism_ = stmt.get_parallelism())) {
} else if (OB_FAIL(check_alter_partition(ctx, stmt, alter_table_arg))) {
LOG_WARN("check alter partition failed", K(ret));
} else if (OB_FAIL(alter_table_arg.alter_table_schema_.check_if_oracle_compat_mode(is_oracle_mode))) {
LOG_WARN("fail to check if tenant mode is oracle mode", K(ret));
} else if (!is_oracle_mode && OB_FAIL(check_alter_part_key(ctx, alter_table_arg))) {
LOG_WARN("check alter part key failed", K(ret));
} else if (OB_FAIL(set_index_arg_list(ctx, stmt))) {
LOG_WARN("fail to set index_arg_list", K(ret));
} else if (OB_FAIL(ObResolverUtils::check_sync_ddl_user(my_session, is_sync_ddl_user))) {
LOG_WARN("Failed to check sync_dll_user", K(ret));
} else if (OB_INVALID_ID == alter_table_arg.session_id_
&& 0 != my_session->get_sessid_for_table()
&& FALSE_IT(alter_table_arg.session_id_ = my_session->get_sessid_for_table())) {
//impossible
} else {
int64_t foreign_key_checks = 0;
my_session->get_foreign_key_checks(foreign_key_checks);
alter_table_arg.foreign_key_checks_ = is_oracle_mode || (!is_oracle_mode && foreign_key_checks);
if ((obrpc::ObAlterTableArg::ADD_CONSTRAINT == alter_table_arg.alter_constraint_type_
|| (obrpc::ObAlterTableArg::ALTER_CONSTRAINT_STATE == alter_table_arg.alter_constraint_type_))) {
if (OB_FAIL(need_check_constraint_validity(alter_table_arg, need_check))) {
LOG_WARN("check whether need check failed", K(ret));
} else if (need_check && GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_0_0_0) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "Such ddl operation during upgrade");
}
}
// 如果追加 validate 属性的外键或者 modify 外键为 validate 属性时,不立即生效
// 校验已有数据满足外键的 validate 属性要求之后再生效,确保优化器可以正确地根据外键的 validate 属性进行优化
if (OB_SUCC(ret) && alter_table_arg.foreign_key_checks_ && 1 == alter_table_arg.foreign_key_arg_list_.count()) {
if ((!alter_table_arg.foreign_key_arg_list_.at(0).is_modify_fk_state_
&& alter_table_arg.foreign_key_arg_list_.at(0).validate_flag_)
|| (alter_table_arg.foreign_key_arg_list_.at(0).is_modify_validate_flag_
&& alter_table_arg.foreign_key_arg_list_.at(0).validate_flag_)) {
need_modify_fk_validate = true;
}
}
if (OB_SUCC(ret)
&& (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_3100)) {
if (OB_FAIL(alter_table_rpc_v2(
alter_table_arg,
res,
allocator,
common_rpc_proxy,
my_session,
is_sync_ddl_user))) {
LOG_WARN("Failed to alter table rpc v2", K(ret));
}
} else if (OB_SUCC(ret)
&& (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_3100)) {
if (OB_FAIL(alter_table_rpc_v1(
alter_table_arg,
res,
allocator,
common_rpc_proxy,
my_session,
is_sync_ddl_user))) {
LOG_WARN("Failed to alter table rpc v1", K(ret));
}
}
}
}
if (OB_SUCC(ret)) {
if (!need_check) {
// do nothing, don't check if data is valid
} else if (OB_FAIL(refresh_schema_for_table(tenant_id))) {
LOG_WARN("refresh_schema_for_table failed", K(ret));
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_, *my_session, common_rpc_proxy))) {
LOG_WARN("wait check constraint finish", K(ret));
}
}
if (OB_SUCC(ret)
&& 1 == alter_table_arg.foreign_key_arg_list_.count()) {
if (!need_modify_fk_validate) {
// do nothing, don't check if data is valid
} else {
if (OB_FAIL(refresh_schema_for_table(tenant_id))) {
LOG_WARN("refresh_schema_for_table failed", K(ret));
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_, *my_session, common_rpc_proxy))) {
LOG_WARN("wait fk constraint finish", K(ret));
}
}
}
if (OB_SUCC(ret)) {
const bool is_ddl_retry_task = is_drop_schema_block_concurrent_trans(res.ddl_type_);
const bool need_wait_ddl_finish = is_double_table_long_running_ddl(res.ddl_type_)
|| is_simple_table_long_running_ddl(res.ddl_type_)
|| is_ddl_retry_task;
if (OB_SUCC(ret) && need_wait_ddl_finish) {
int64_t affected_rows = 0;
if (OB_FAIL(refresh_schema_for_table(alter_table_arg.exec_tenant_id_))) {
LOG_WARN("refresh_schema_for_table failed", K(ret));
} else if (!is_ddl_retry_task && OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_,
*my_session, common_rpc_proxy))) {
LOG_WARN("fail to wait ddl finish", K(ret), K(tenant_id), K(res));
} else if (is_ddl_retry_task && OB_FAIL(ObDDLExecutorUtil::wait_ddl_retry_task_finish(tenant_id, res.task_id_,
*my_session, common_rpc_proxy,
affected_rows))) {
LOG_WARN("fail to wait ddl retry task finish", K(ret), K(tenant_id), K(res));
}
}
}
}
return ret;
}
int ObAlterTableExecutor::need_check_constraint_validity(obrpc::ObAlterTableArg &alter_table_arg, bool &need_check)
{
int ret = OB_SUCCESS;
need_check = false;
if (obrpc::ObAlterTableArg::ADD_CONSTRAINT != alter_table_arg.alter_constraint_type_
&& obrpc::ObAlterTableArg::ALTER_CONSTRAINT_STATE != alter_table_arg.alter_constraint_type_) {
} else {
ObTableSchema::const_constraint_iterator iter =
alter_table_arg.alter_table_schema_.constraint_begin();
for(; iter != alter_table_arg.alter_table_schema_.constraint_end() && OB_SUCC(ret) && !need_check; iter++) {
if (obrpc::ObAlterTableArg::ALTER_CONSTRAINT_STATE == alter_table_arg.alter_constraint_type_) {
if ((*iter)->get_is_modify_validate_flag() && (*iter)->is_validated()) {
need_check = true;
}
} else if (CONSTRAINT_TYPE_CHECK == (*iter)->get_constraint_type()) {
if ((*iter)->is_validated()) {
need_check = (*iter)->is_validated();
}
} else if (CONSTRAINT_TYPE_NOT_NULL == (*iter)->get_constraint_type()) {
if (1 != (*iter)->get_column_cnt()) {
ret = OB_ERR_UNEXPECTED;
} else if (OB_INVALID_ID == *(*iter)->cst_col_begin()) {
// alter table add column not null.
ObTableSchema::const_column_iterator target_col_iter = NULL;
ObTableSchema::const_column_iterator cst_col_iter =
alter_table_arg.alter_table_schema_.column_begin();
ObString cst_col_name;
if (OB_FAIL((*iter)->get_not_null_column_name(cst_col_name))) {
LOG_WARN("get not null column name failed", K(ret));
} else {
for(; NULL == target_col_iter
&& cst_col_iter != alter_table_arg.alter_table_schema_.column_end();
cst_col_iter++) {
if ((*cst_col_iter)->get_column_name_str().length() == cst_col_name.length()
&& 0 == (*cst_col_iter)->get_column_name_str().compare(cst_col_name)) {
target_col_iter = cst_col_iter;
}
}
if (OB_ISNULL(target_col_iter)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("column schema not found", K(ret),K(alter_table_arg.alter_table_schema_),
K(cst_col_name));
} else {
const ObObj &cur_default_value = (*target_col_iter)->get_cur_default_value();
need_check = cur_default_value.is_null() ||
(cur_default_value.is_string_type()
&& (0 == cur_default_value.get_string().case_compare(N_NULL)
|| 0 == cur_default_value.get_string().case_compare("''")));
}
}
} else {
// alter table modify column not null.
need_check = (*iter)->is_validated();
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected constraint type", K(ret));
}
if (OB_SUCC(ret) && !need_check) {
(*iter)->set_need_validate_data(false);
}
}
}
return ret;
}
int ObAlterTableExecutor::set_alter_col_nullable_ddl_stmt_str(
obrpc::ObAlterTableArg &alter_table_arg,
common::ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
AlterTableSchema &alter_table_schema = alter_table_arg.alter_table_schema_;
ObString column_name;
const ObColumnSchemaV2 *col_schema = NULL;
char *buf = NULL;
int64_t buf_len = OB_MAX_SQL_LENGTH;
int64_t pos = 0;
if (alter_table_schema.get_column_count() != 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("column count != 1", K(ret), K(alter_table_schema.get_column_count()));
} else if (OB_ISNULL(col_schema = alter_table_schema.get_column_schema_by_idx(0))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("col_schema is null", K(ret));
} else {
column_name = col_schema->get_column_name_str();
}
if (OB_FAIL(ret)) {
} else if (column_name.empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cst_name is empty", K(ret), K(column_name));
} else if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(buf_len)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory", K(ret), K(OB_MAX_SQL_LENGTH));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos,
"ALTER TABLE \"%.*s\".\"%.*s\" MODIFY COLUMN %.*s NULL",
alter_table_schema.get_origin_database_name().length(),
alter_table_schema.get_origin_database_name().ptr(),
alter_table_schema.get_origin_table_name().length(),
alter_table_schema.get_origin_table_name().ptr(),
column_name.length(), column_name.ptr()))) {
LOG_WARN("fail to print ddl_stmt_str for rollback", K(ret));
} else {
alter_table_arg.ddl_stmt_str_.assign_ptr(buf, static_cast<int32_t>(pos));
}
return ret;
}
int ObAlterTableExecutor::resolve_alter_column_partition_expr(
const share::schema::ObColumnSchemaV2 &col_schema,
const share::schema::ObTableSchema &table_schema,
ObSchemaGetterGuard &schema_guard,
ObSQLSessionInfo &session_info,
common::ObIAllocator &allocator,
const bool is_sub_part,
ObExprResType &dst_res_type)
{
int ret = OB_SUCCESS;
ObRawExpr *part_expr = NULL;
ObRawExprFactory expr_factory(allocator);
ObSchemaChecker schema_checker;
if (OB_FAIL(schema_checker.init(schema_guard))) {
LOG_WARN("fail to init schema_checker", K(ret));
} else {
ObResolverParams resolver_ctx;
ObStmtFactory stmt_factory(allocator);
TableItem table_item;
resolver_ctx.allocator_ = &allocator;
resolver_ctx.schema_checker_ = &schema_checker;
resolver_ctx.session_info_ = &session_info;
resolver_ctx.disable_privilege_check_ = PRIV_CHECK_FLAG_DISABLE;
resolver_ctx.expr_factory_ = &expr_factory;
resolver_ctx.stmt_factory_ = &stmt_factory;
resolver_ctx.query_ctx_ = stmt_factory.get_query_ctx();
table_item.table_id_ = table_schema.get_table_id();
table_item.ref_id_ = table_schema.get_table_id();
table_item.type_ = TableItem::BASE_TABLE;
// This is just to use the resolver to resolve the partition expr interface.
// The resolver of any statement has this ability. The reason for using the delete
// resolver is that the delete resolver is the simplest
ObPartitionFuncType part_type;
SMART_VAR (ObDeleteResolver, delete_resolver, resolver_ctx) {
ObDeleteStmt *delete_stmt = delete_resolver.create_stmt<ObDeleteStmt>();
CK (OB_NOT_NULL(delete_stmt));
CK (OB_NOT_NULL(resolver_ctx.query_ctx_));
OZ (delete_stmt->get_table_items().push_back(&table_item));
OZ (delete_stmt->set_table_bit_index(table_schema.get_table_id()));
if (!is_sub_part) {
const ObString &part_str = table_schema.get_part_option().get_part_func_expr_str();
part_type = table_schema.get_part_option().get_part_func_type();
OZ (delete_resolver.resolve_partition_expr(table_item, table_schema,
part_type, part_str, part_expr));
} else {
const ObString &part_str = table_schema.get_sub_part_option().get_part_func_expr_str();
part_type = table_schema.get_sub_part_option().get_part_func_type();
OZ (delete_resolver.resolve_partition_expr(table_item, table_schema,
part_type, part_str, part_expr));
}
CK (OB_NOT_NULL(part_expr));
}
}
if (OB_FAIL(ret)) {
} else if (part_expr->is_column_ref_expr()) {
ObColumnRefRawExpr *column_ref = static_cast<ObColumnRefRawExpr*>(part_expr);
if (column_ref->get_column_id() == col_schema.get_column_id()) {
if (OB_FAIL(ObRawExprUtils::init_column_expr(col_schema, *column_ref))) {
LOG_WARN("init column expr failed", K(ret));
} else if (CS_TYPE_INVALID == column_ref->get_collation_type()) {
column_ref->set_collation_type(table_schema.get_collation_type());
}
}
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < part_expr->get_param_count(); ++i) {
ObRawExpr *sub_expr = part_expr->get_param_expr(i);
if (OB_ISNULL(sub_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sub_expr should not be null", K(ret));
} else if (sub_expr->is_column_ref_expr()) {
ObColumnRefRawExpr *column_ref = static_cast<ObColumnRefRawExpr*>(sub_expr);
if (column_ref->get_column_id() == col_schema.get_column_id()) {
if (OB_FAIL(ObRawExprUtils::init_column_expr(col_schema, *column_ref))) {
LOG_WARN("init column expr failed", K(ret));
} else if (CS_TYPE_INVALID == column_ref->get_collation_type()) {
column_ref->set_collation_type(table_schema.get_collation_type());
}
}
}
}
}
OZ (part_expr->formalize(&session_info));
OX (dst_res_type = part_expr->get_result_type());
return ret;
}
template<class T>
int ObAlterTableExecutor::calc_range_part_high_bound(
const ObPartitionFuncType part_func_type,
const ObString &col_name,
const ObExprResType &dst_res_type,
T &part,
ObExecContext &ctx)
{
int ret = OB_SUCCESS;
ObExprCtx expr_ctx;
const ObObjType fun_expr_type = dst_res_type.get_type();
const ObCollationType fun_collation_type = dst_res_type.get_collation_type();
ObObjType expected_obj_type = fun_expr_type;
ObSEArray<ObObj, OB_DEFAULT_ARRAY_SIZE> range_partition_obj;
if (OB_FAIL(ObSQLUtils::wrap_expr_ctx(stmt::T_ALTER_TABLE, ctx, ctx.get_allocator(), expr_ctx))) {
LOG_WARN("Failed to wrap expr ctx", K(ret));
} else {
expr_ctx.cast_mode_ = CM_WARN_ON_FAIL; //always set to WARN_ON_FAIL to allow calculate
EXPR_SET_CAST_CTX_MODE(expr_ctx);
const common::ObRowkey &row_key = part.get_high_bound_val();
int64_t obj_cnt = row_key.get_obj_cnt();
for (int64_t i = 0; OB_SUCC(ret) && i < obj_cnt; ++i) {
const ObObj &src_obj = row_key.get_obj_ptr()[i];
if (src_obj.is_max_value()) {
if (OB_FAIL(range_partition_obj.push_back(src_obj))) {
LOG_WARN("array push back fail", K(ret));
}
} else {
if (ob_is_integer_type(fun_expr_type)) {
// Type promotion to int64 or uint64
expected_obj_type = ob_is_int_tc(fun_expr_type) ? ObIntType : ObUInt64Type;
}
const ObObj *dst_obj = NULL;
EXPR_DEFINE_CAST_CTX(expr_ctx, CM_NONE);
cast_ctx.dest_collation_ = fun_collation_type;
EXPR_CAST_OBJ_V2(expected_obj_type, src_obj, dst_obj);
if (OB_SUCC(ret)) {
if (OB_ISNULL(dst_obj)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("succ to cast obj, but dst_obj is NULL", K(ret),
K(expected_obj_type), K(fun_expr_type), K(src_obj));
} else if ((PARTITION_FUNC_TYPE_RANGE_COLUMNS == part_func_type
|| PARTITION_FUNC_TYPE_LIST_COLUMNS == part_func_type)
&& OB_FAIL(ObResolverUtils::check_partition_range_value_result_type(part_func_type,
dst_res_type,
col_name,
const_cast<ObObj&>(*dst_obj)))) {
LOG_WARN("get partition range value result type failed", K(ret));
} else if (OB_FAIL(range_partition_obj.push_back(*dst_obj))) {
LOG_WARN("array push back fail", K(ret));
}
} else if (OB_ERR_UNEXPECTED != ret) {
ret = OB_ERR_WRONG_TYPE_COLUMN_VALUE_ERROR;
LOG_WARN("failed to cast obj", K(expected_obj_type), K(fun_expr_type), K(src_obj));
}
}
}
if (OB_SUCC(ret)) {
ObRowkey high_rowkey(&range_partition_obj.at(0), range_partition_obj.count());
part.reset_high_bound_val();
if (OB_FAIL(part.set_high_bound_val(high_rowkey))) {
LOG_WARN("deep_copy_str fail", K(ret));
}
}
}
return ret;
}
int ObAlterTableExecutor::calc_range_values_exprs(
const ObColumnSchemaV2 &col_schema,
const share::schema::ObTableSchema &orig_table_schema,
share::schema::ObTableSchema &new_table_schema,
ObSchemaGetterGuard &schema_guard,
ObSQLSessionInfo &session_info,
common::ObIAllocator &allocator,
ObExecContext &ctx,
const bool is_subpart)
{
int ret = OB_SUCCESS;
ObExprResType dst_res_type;
if (OB_FAIL(resolve_alter_column_partition_expr(col_schema, orig_table_schema, schema_guard,
session_info, allocator, is_subpart, dst_res_type))) {
LOG_WARN("failed to resolve alter column partition expr", K(ret));
} else if (is_subpart) {
const int64_t part_num = new_table_schema.get_partition_num();
const ObPartitionFuncType part_func_type = orig_table_schema.get_sub_part_option().get_part_func_type();
ObPartition **part_array = new_table_schema.get_part_array();
CK (OB_NOT_NULL(part_array));
for (int64_t i = 0; i < part_num && OB_SUCC(ret); ++i) {
ObPartition *part = part_array[i];
CK (OB_NOT_NULL(part));
CK (OB_NOT_NULL(part->get_subpart_array()));
for (int64_t j = 0; OB_SUCC(ret) && j < part->get_subpartition_num(); j++) {
ObSubPartition *sub_part = part->get_subpart_array()[j];
CK (OB_NOT_NULL(sub_part));
OZ (calc_range_part_high_bound(part_func_type,
col_schema.get_column_name_str(),
dst_res_type,
*sub_part,
ctx));
}
}
} else {
const int64_t part_num = new_table_schema.get_partition_num();
const ObPartitionFuncType part_func_type = orig_table_schema.get_part_option().get_part_func_type();
ObPartition **part_array = new_table_schema.get_part_array();
CK (OB_NOT_NULL(part_array));
for (int64_t i = 0; i < part_num && OB_SUCC(ret); ++i) {
ObPartition *part = part_array[i];
CK (OB_NOT_NULL(part));
OZ (calc_range_part_high_bound(part_func_type,
col_schema.get_column_name_str(),
dst_res_type,
*part,
ctx));
}
}
return ret;
}
template<class T>
int ObAlterTableExecutor::calc_list_part_rows(
const ObPartitionFuncType part_func_type,
const ObString &col_name,
const ObExprResType &dst_res_type,
const T &orig_part,
T &new_part,
ObExecContext &ctx,
common::ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
ObExprCtx expr_ctx;
const ObObjType fun_expr_type = dst_res_type.get_type();
const ObCollationType fun_collation_type = dst_res_type.get_collation_type();
ObObjType expected_obj_type = fun_expr_type;
ObSEArray<ObObj, OB_DEFAULT_ARRAY_SIZE> range_partition_obj;
if (OB_FAIL(ObSQLUtils::wrap_expr_ctx(stmt::T_ALTER_TABLE, ctx, ctx.get_allocator(), expr_ctx))) {
LOG_WARN("Failed to wrap expr ctx", K(ret));
} else {
new_part.reset_list_row_values();
expr_ctx.cast_mode_ = CM_WARN_ON_FAIL; //always set to WARN_ON_FAIL to allow calculate
EXPR_SET_CAST_CTX_MODE(expr_ctx);
const common::ObIArray<common::ObNewRow>& row_list = orig_part.get_list_row_values();
for (int64_t i = 0; OB_SUCC(ret) && i < row_list.count(); i ++) {
const common::ObNewRow &row = row_list.at(i);
for (int64_t j = 0; OB_SUCC(ret) && j < row.get_count(); ++j) {
const ObObj &src_obj = row.get_cell(j);
if (src_obj.is_max_value()) {
if (OB_FAIL(range_partition_obj.push_back(src_obj))) {
LOG_WARN("array push back fail", K(ret));
}
} else {
if (ob_is_integer_type(fun_expr_type)) {
// Type promotion to int64 or uint64
expected_obj_type = ob_is_int_tc(fun_expr_type) ? ObIntType : ObUInt64Type;
}
const ObObj *dst_obj = NULL;
EXPR_DEFINE_CAST_CTX(expr_ctx, CM_NONE);
cast_ctx.dest_collation_ = fun_collation_type;
EXPR_CAST_OBJ_V2(expected_obj_type, src_obj, dst_obj);
if (OB_SUCC(ret)) {
if (OB_ISNULL(dst_obj)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("succ to cast obj, but dst_obj is NULL", K(ret),
K(expected_obj_type), K(fun_expr_type), K(src_obj));
} else if ((PARTITION_FUNC_TYPE_RANGE_COLUMNS == part_func_type
|| PARTITION_FUNC_TYPE_LIST_COLUMNS == part_func_type)
&& OB_FAIL(ObResolverUtils::check_partition_range_value_result_type(part_func_type,
dst_res_type,
col_name,
const_cast<ObObj&>(*dst_obj)))) {
LOG_WARN("get partition range value result type failed", K(ret));
} else if (OB_FAIL(range_partition_obj.push_back(*dst_obj))) {
LOG_WARN("array push back fail", K(ret));
}
} else if (OB_ERR_UNEXPECTED != ret) {
ret = OB_ERR_WRONG_TYPE_COLUMN_VALUE_ERROR;
LOG_WARN("failed to cast obj", K(expected_obj_type), K(fun_expr_type), K(src_obj));
}
}
}
if (OB_SUCC(ret)) {
ObNewRow row;
ObObj* obj_array = (ObObj *)allocator.alloc(range_partition_obj.count() * sizeof(ObObj));
CK (OB_NOT_NULL(obj_array));
for (int64_t k = 0; OB_SUCC(ret) && k < range_partition_obj.count(); k ++) {
new (obj_array + k) ObObj();
obj_array[k] = (&range_partition_obj.at(0))[k];
}
OX (row.assign(obj_array, range_partition_obj.count()));
OZ (new_part.add_list_row(row));
OX (range_partition_obj.reuse());
}
}
}
return ret;
}
int ObAlterTableExecutor::calc_list_values_exprs(
const ObColumnSchemaV2 &col_schema,
const share::schema::ObTableSchema &orig_table_schema,
share::schema::ObTableSchema &new_table_schema,
ObSchemaGetterGuard &schema_guard,
ObSQLSessionInfo &session_info,
common::ObIAllocator &allocator,
ObExecContext &ctx,
const bool is_subpart)
{
int ret = OB_SUCCESS;
ObExprResType dst_res_type;
if (OB_FAIL(resolve_alter_column_partition_expr(col_schema, orig_table_schema, schema_guard,
session_info, allocator, is_subpart, dst_res_type))) {
LOG_WARN("failed to resolve alter column partition expr", K(ret));
} else if (is_subpart) {
const int64_t part_num = orig_table_schema.get_partition_num();
const ObPartitionFuncType part_func_type = orig_table_schema.get_sub_part_option().get_part_func_type();
ObPartition **orig_part_array = orig_table_schema.get_part_array();
ObPartition **new_part_array = new_table_schema.get_part_array();
CK (OB_NOT_NULL(orig_part_array) && OB_NOT_NULL(new_part_array));
for (int64_t i = 0; i < part_num && OB_SUCC(ret); ++i) {
ObPartition *orig_part = orig_part_array[i];
ObPartition *new_part = new_part_array[i];
CK (OB_NOT_NULL(orig_part) && OB_NOT_NULL(new_part));
CK (OB_NOT_NULL(orig_part->get_subpart_array()));
CK (OB_NOT_NULL(orig_part->get_subpart_array()));
for (int64_t j = 0; OB_SUCC(ret) && j < orig_part->get_subpartition_num(); j++) {
ObSubPartition *orig_sub_part = orig_part->get_subpart_array()[j];
ObSubPartition *new_sub_part = new_part->get_subpart_array()[j];
CK (OB_NOT_NULL(orig_sub_part) && OB_NOT_NULL(new_sub_part));
OZ (calc_list_part_rows(part_func_type,
col_schema.get_column_name_str(),
dst_res_type,
*orig_sub_part,
*new_sub_part,
ctx,
allocator));
}
}
} else {
const int64_t part_num = orig_table_schema.get_partition_num();
const ObPartitionFuncType part_func_type = orig_table_schema.get_part_option().get_part_func_type();
ObPartition **orig_part_array = orig_table_schema.get_part_array();
ObPartition **new_part_array = new_table_schema.get_part_array();
CK (OB_NOT_NULL(orig_part_array) && OB_NOT_NULL(new_part_array));
for (int64_t i = 0; i < part_num && OB_SUCC(ret); ++i) {
ObPartition *orig_part = orig_part_array[i];
ObPartition *new_part = new_part_array[i];
CK (OB_NOT_NULL(orig_part) && OB_NOT_NULL(new_part));
OZ (calc_list_part_rows(part_func_type,
col_schema.get_column_name_str(),
dst_res_type,
*orig_part,
*new_part,
ctx,
allocator));
}
}
return ret;
}
int ObAlterTableExecutor::check_alter_part_key(ObExecContext &ctx,
obrpc::ObAlterTableArg &arg)
{
int ret = OB_SUCCESS;
if (arg.is_alter_columns_) {
bool is_contain_part_key = false;
ObSchemaGetterGuard schema_guard;
common::ObIAllocator &allocator = arg.allocator_;
AlterTableSchema &table_schema = const_cast<AlterTableSchema &>(arg.alter_table_schema_);
share::schema::ObTableSchema::const_column_iterator it_begin = table_schema.column_begin();
share::schema::ObTableSchema::const_column_iterator it_end = table_schema.column_end();
const uint64_t tenant_id = table_schema.get_tenant_id();
schema_guard.set_session_id(arg.session_id_);
const ObString &origin_database_name = table_schema.get_origin_database_name();
const ObString &origin_table_name = table_schema.get_origin_table_name();
ObSQLSessionInfo *my_session = ctx.get_my_session();
const share::schema::ObTableSchema *orig_table_schema = NULL;
AlterColumnSchema *alter_column_schema = NULL;
const ObColumnSchemaV2 *orig_column_schema = NULL;
bool is_oracle_mode = false;
CK (!origin_database_name.empty() && !origin_table_name.empty());
CK (OB_NOT_NULL(my_session));
OZ (ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(
tenant_id, schema_guard));
if (FAILEDx(schema_guard.get_table_schema(tenant_id,
origin_database_name,
origin_table_name,
false/*is_index*/,
orig_table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(origin_database_name), K(origin_table_name));
} else if (OB_ISNULL(orig_table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table is not exist", KR(ret), K(tenant_id), K(origin_database_name), K(origin_table_name));
} else if (OB_FAIL(orig_table_schema->check_if_oracle_compat_mode(is_oracle_mode))) {
LOG_WARN("fail to check oracle mode", KR(ret), KPC(orig_table_schema));
} else if (is_oracle_mode) {
// skip
} else {
OZ (table_schema.assign_partition_schema(*orig_table_schema));
for(;OB_SUCC(ret) && it_begin != it_end; it_begin++) {
if (OB_ISNULL(alter_column_schema = static_cast<AlterColumnSchema *>(*it_begin))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("*it_begin is NULL", K(ret));
} else if (OB_DDL_CHANGE_COLUMN == alter_column_schema->alter_type_
|| OB_DDL_MODIFY_COLUMN == alter_column_schema->alter_type_) {
bool is_same = false;
const ObString &orig_column_name = alter_column_schema->get_origin_column_name();
orig_column_schema = orig_table_schema->get_column_schema(orig_column_name);
if (OB_ISNULL(orig_column_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unknown column", K(orig_column_name), KPC(orig_table_schema));
} else if (OB_FAIL(ObTableSchema::check_is_exactly_same_type(
*orig_column_schema, *alter_column_schema, is_same))) {
LOG_WARN("failed to check is exactly same type", K(ret));
} else if (!is_same && orig_column_schema->is_tbl_part_key_column()) {
const bool is_part = orig_column_schema->is_part_key_column();
const bool is_subpart = orig_column_schema->is_subpart_key_column();
if (is_contain_part_key) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "There are several mutually exclusive "
"DDL in single statement");
}
// The column may be part key column, and may be subpart key column too.
if (OB_SUCC(ret) && is_part) {
if (orig_table_schema->is_list_part()) {
OZ (calc_list_values_exprs(*alter_column_schema, *orig_table_schema, table_schema,
schema_guard, *my_session, allocator, ctx, false));
} else if (orig_table_schema->is_range_part()) {
OZ (calc_range_values_exprs(*alter_column_schema, *orig_table_schema, table_schema,
schema_guard, *my_session, allocator, ctx, false));
}
}
if (OB_SUCC(ret) && is_subpart) {
if (orig_table_schema->is_list_subpart()) {
OZ (calc_list_values_exprs(*alter_column_schema, *orig_table_schema, table_schema,
schema_guard, *my_session, allocator, ctx, true));
} else if (orig_table_schema->is_range_subpart()) {
OZ (calc_range_values_exprs(*alter_column_schema, *orig_table_schema, table_schema,
schema_guard, *my_session, allocator, ctx, true));
}
}
is_contain_part_key = true;
}
}
}
}
}
return ret;
}
int ObAlterTableExecutor::check_alter_partition(ObExecContext &ctx,
ObAlterTableStmt &stmt,
const obrpc::ObAlterTableArg &arg)
{
int ret = OB_SUCCESS;
AlterTableSchema &table_schema = const_cast<AlterTableSchema &>(arg.alter_table_schema_);
if (arg.is_alter_partitions_) {
if (obrpc::ObAlterTableArg::PARTITIONED_TABLE == arg.alter_part_type_
|| obrpc::ObAlterTableArg::REORGANIZE_PARTITION == arg.alter_part_type_
|| obrpc::ObAlterTableArg::SPLIT_PARTITION == arg.alter_part_type_) {
ObPartition **partition_array = table_schema.get_part_array();
int64_t realy_part_num = OB_INVALID_PARTITION_ID;
if (obrpc::ObAlterTableArg::SPLIT_PARTITION == arg.alter_part_type_) {
realy_part_num = table_schema.get_part_option().get_part_num();
} else {
realy_part_num = table_schema.get_partition_num();
}
if (table_schema.is_range_part()) {
if (OB_FAIL(ObPartitionExecutorUtils::set_range_part_high_bound(ctx,
stmt::T_CREATE_TABLE,
table_schema,
stmt,
false /*is_subpart*/))) {
LOG_WARN("partition_array is NULL", K(ret));
}
} else if (table_schema.is_list_part()) {
if (OB_ISNULL(partition_array)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", K(ret));
} else if (OB_FAIL(ObPartitionExecutorUtils::cast_list_expr_to_obj(ctx,
stmt::T_CREATE_TABLE,
false, // is_subpart
realy_part_num,
partition_array,
NULL,
stmt.get_part_fun_exprs(),
stmt.get_part_values_exprs()))) {
LOG_WARN("partition_array is NULL", K(ret));
}
} else if (obrpc::ObAlterTableArg::PARTITIONED_TABLE != arg.alter_part_type_) {
ret = OB_ERR_ONLY_ON_RANGE_LIST_PARTITION;
LOG_WARN("only support range or list part", K(ret), K(arg.alter_part_type_),
"partition type", table_schema.get_part_option().get_part_func_type());
}
if (OB_FAIL(ret)) {
} else if (obrpc::ObAlterTableArg::SPLIT_PARTITION == arg.alter_part_type_) {
const_cast<AlterTableSchema&>(table_schema).get_part_option().set_part_num(table_schema.get_partition_num());
}
} else if (obrpc::ObAlterTableArg::REPARTITION_TABLE == arg.alter_part_type_) {
if (table_schema.is_range_part() || table_schema.is_list_part()
|| table_schema.is_range_subpart() || table_schema.is_list_subpart()) {
if (OB_FAIL(ObPartitionExecutorUtils::calc_values_exprs_for_alter_table(ctx,
table_schema,
stmt))) {
LOG_WARN("failed to calc values exprs for alter partition by", K(ret));
}
}
} else if (obrpc::ObAlterTableArg::ADD_PARTITION == arg.alter_part_type_) {
if (table_schema.is_range_part() || table_schema.is_list_part()) {
if (OB_FAIL(ObPartitionExecutorUtils::calc_values_exprs_for_alter_table(ctx,
table_schema,
stmt))) {
LOG_WARN("failed to calc values exprs for alter table", K(ret));
}
} else {
ret = OB_NOT_SUPPORTED;
LOG_USER_WARN(OB_NOT_SUPPORTED, "add hash partition");
}
} else if (obrpc::ObAlterTableArg::ADD_SUB_PARTITION == arg.alter_part_type_) {
if (table_schema.is_range_subpart()) {
if (OB_FAIL(ObPartitionExecutorUtils::set_individual_range_part_high_bound(
ctx, stmt::T_CREATE_TABLE, table_schema, stmt))) {
LOG_WARN("failed to set individual range part high bound", K(ret));
}
} else if (table_schema.is_list_subpart()) {
if (OB_FAIL(ObPartitionExecutorUtils::set_individual_list_part_rows(
ctx, stmt, stmt::T_CREATE_TABLE, table_schema,
stmt.get_subpart_fun_exprs(),
stmt.get_individual_subpart_values_exprs()))) {
LOG_WARN("failed to set individual list part rows", K(ret));
}
} else {
ret = OB_NOT_SUPPORTED;
LOG_USER_WARN(OB_NOT_SUPPORTED, "add hash subpartition");
}
} else if (obrpc::ObAlterTableArg::DROP_PARTITION == arg.alter_part_type_
|| obrpc::ObAlterTableArg::DROP_SUB_PARTITION == arg.alter_part_type_
|| obrpc::ObAlterTableArg::TRUNCATE_PARTITION == arg.alter_part_type_
|| obrpc::ObAlterTableArg::TRUNCATE_SUB_PARTITION == arg.alter_part_type_) {
// do-nothing
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("no operation", K(arg.alter_part_type_), K(ret));
}
LOG_DEBUG("dump table schema", K(table_schema));
} else if (stmt.get_interval_expr() != NULL) {
CK (NULL != stmt.get_transition_expr());
OZ (ObPartitionExecutorUtils::check_transition_interval_valid(
stmt::T_CREATE_TABLE,
ctx,
stmt.get_transition_expr(),
stmt.get_interval_expr()));
OZ (ObPartitionExecutorUtils::set_interval_value(ctx,
stmt::T_CREATE_TABLE,
table_schema,
stmt.get_interval_expr()));
}
return ret;
}
int ObAlterTableExecutor::set_index_arg_list(ObExecContext &ctx, ObAlterTableStmt &stmt)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo *my_session = ctx.get_my_session();
obrpc::ObAlterTableArg &alter_table_arg = const_cast<obrpc::ObAlterTableArg &>(stmt.get_alter_table_arg());
if (OB_ISNULL(my_session)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("session is null", K(ret));
} else if (stmt.get_index_partition_resolve_results().count()
!= stmt.get_index_arg_list().count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid index resolve result", K(ret), K(stmt));
}
for (int64_t i = 0; OB_SUCC(ret) && i < stmt.get_index_arg_list().count(); i++) {
HEAP_VAR(ObCreateIndexStmt, index_stmt) {
obrpc::ObCreateIndexArg &create_index_arg = *(stmt.get_index_arg_list().at(i));
ObPartitionResolveResult &resolve_result = stmt.get_index_partition_resolve_results().at(i);
index_stmt.get_part_fun_exprs() = resolve_result.get_part_fun_exprs();
index_stmt.get_part_values_exprs() = resolve_result.get_part_values_exprs();
index_stmt.get_subpart_fun_exprs() = resolve_result.get_subpart_fun_exprs();
index_stmt.get_template_subpart_values_exprs() = resolve_result.get_template_subpart_values_exprs();
index_stmt.get_individual_subpart_values_exprs() = resolve_result.get_individual_subpart_values_exprs();
if (OB_FAIL(index_stmt.get_create_index_arg().assign(create_index_arg))) {
LOG_WARN("fail to assign create index arg", K(ret));
} else if (OB_FAIL(ObPartitionExecutorUtils::calc_values_exprs(ctx, index_stmt))) {
LOG_WARN("fail to compare range partition expr", K(ret));
} else {
create_index_arg.is_inner_ = my_session->is_inner();
if (OB_FAIL(create_index_arg.assign(index_stmt.get_create_index_arg()))) {
LOG_WARN("fail to assign create index arg", K(ret));
} else if (OB_FAIL(alter_table_arg.index_arg_list_.push_back(&create_index_arg))) {
LOG_WARN("fail to push back index_arg", K(ret));
}
}
}
}
return ret;
}
/**
*
*/
ObDropTableExecutor::ObDropTableExecutor()
{
}
ObDropTableExecutor::~ObDropTableExecutor()
{
}
int ObDropTableExecutor::execute(ObExecContext &ctx, ObDropTableStmt &stmt)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
obrpc::ObDDLRes res;
const obrpc::ObDropTableArg &drop_table_arg = stmt.get_drop_table_arg();
ObString first_stmt;
ObSQLSessionInfo *my_session = NULL;
int64_t foreign_key_checks = 0;
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else {
int64_t affected_rows = 0;
const_cast<obrpc::ObDropTableArg&>(drop_table_arg).ddl_stmt_str_ = first_stmt;
my_session = ctx.get_my_session();
if (NULL == my_session) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get my session", K(ret), K(ctx));
} else if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor context failed");
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("get common rpc proxy failed", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)){
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common rpc proxy should not be null", K(ret));
} else if (OB_INVALID_ID == drop_table_arg.session_id_
&& FALSE_IT(const_cast<obrpc::ObDropTableArg&>(drop_table_arg).session_id_ = my_session->get_sessid_for_table())) {
//impossible
} else if (FALSE_IT(my_session->get_foreign_key_checks(foreign_key_checks))) {
} else if (FALSE_IT(const_cast<obrpc::ObDropTableArg&>(drop_table_arg).foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks))) {
} else if (FALSE_IT(const_cast<obrpc::ObDropTableArg&>(drop_table_arg).compat_mode_ =
ORACLE_MODE == my_session->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL)) {
} else if (OB_FAIL(common_rpc_proxy->drop_table(drop_table_arg, res))) {
LOG_WARN("rpc proxy drop table failed", K(ret), "dst", common_rpc_proxy->get_server());
} else if (res.is_valid() && OB_FAIL(ObDDLExecutorUtil::wait_ddl_retry_task_finish(res.tenant_id_, res.task_id_, *my_session, common_rpc_proxy, affected_rows))) {
LOG_WARN("wait ddl finish failed", K(ret), K(res.tenant_id_), K(res.task_id_));
}
}
return ret;
}
/**
*
*/
ObRenameTableExecutor::ObRenameTableExecutor()
{
}
ObRenameTableExecutor::~ObRenameTableExecutor()
{
}
int ObRenameTableExecutor::execute(ObExecContext &ctx, ObRenameTableStmt &stmt)
{
int ret = OB_SUCCESS;
const obrpc::ObRenameTableArg &rename_table_arg = stmt.get_rename_table_arg();
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor context failed");
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("get common rpc proxy failed", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)){
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common rpc proxy should not be null", K(ret));
} else if (OB_FAIL(common_rpc_proxy->rename_table(rename_table_arg))) {
LOG_WARN("rpc proxy rename table failed", K(ret));
}
return ret;
}
/**
*
*/
ObTruncateTableExecutor::ObTruncateTableExecutor()
{
}
ObTruncateTableExecutor::~ObTruncateTableExecutor()
{
}
int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &stmt)
{
int ret = OB_SUCCESS;
const obrpc::ObTruncateTableArg &truncate_table_arg = stmt.get_truncate_table_arg();
ObString first_stmt;
obrpc::ObDDLRes res;
ObSQLSessionInfo *my_session = ctx.get_my_session();
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else {
const_cast<obrpc::ObTruncateTableArg&>(truncate_table_arg).ddl_stmt_str_ = first_stmt;
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor context failed");
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("get common rpc proxy failed", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)){
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common rpc proxy should not be null", K(ret));
} else if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get my session", K(ret), K(ctx));
} else if (OB_INVALID_ID == truncate_table_arg.session_id_
&& FALSE_IT(const_cast<obrpc::ObTruncateTableArg&>(truncate_table_arg).session_id_ = my_session->get_sessid_for_table())) {
//impossible
} else if (!stmt.is_truncate_oracle_temp_table()) {
int64_t foreign_key_checks = 0;
my_session->get_foreign_key_checks(foreign_key_checks);
const_cast<obrpc::ObTruncateTableArg&>(truncate_table_arg).foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks);
const_cast<obrpc::ObTruncateTableArg&>(truncate_table_arg).compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode()
? lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
int64_t affected_rows = 0;
if (OB_FAIL(common_rpc_proxy->truncate_table(truncate_table_arg, res))) {
LOG_WARN("rpc proxy alter table failed", K(ret));
} else if (res.is_valid()
&& OB_FAIL(ObDDLExecutorUtil::wait_ddl_retry_task_finish(res.tenant_id_, res.task_id_, *my_session, common_rpc_proxy, affected_rows))) {
LOG_WARN("wait ddl finish failed", K(ret));
}
} else {
ObSqlString sql;
int64_t affect_rows = 0;
common::ObOracleSqlProxy oracle_sql_proxy;
uint64_t tenant_id = stmt.get_tenant_id();
ObString db_name = stmt.get_database_name();
ObString tab_name = stmt.get_table_name();
int64_t session_id = my_session->get_sessid_for_table();
if (OB_FAIL(oracle_sql_proxy.init(GCTX.sql_proxy_->get_pool()))) {
LOG_WARN("init oracle sql proxy failed", K(ret));
} else if (OB_FAIL(sql.assign_fmt("DELETE FROM \"%.*s\".\"%.*s\" WHERE "
"%s = %ld",
db_name.length(), db_name.ptr(),
tab_name.length(), tab_name.ptr(),
OB_HIDDEN_SESSION_ID_COLUMN_NAME, session_id))) {
LOG_WARN("fail to assign sql", K(ret));
} else if (OB_FAIL(oracle_sql_proxy.write(tenant_id, sql.ptr(), affect_rows))) {
LOG_WARN("execute sql failed", K(ret), K(sql), K(affect_rows));
} else {
int64_t query_timeout = 0;
my_session->get_query_timeout(query_timeout);
LOG_INFO("succeed to truncate table using delete", K(sql), K(affect_rows),
K(query_timeout), K(THIS_WORKER.get_timeout_remain()));
}
}
}
return ret;
}
ObCreateTableLikeExecutor::ObCreateTableLikeExecutor()
{
}
ObCreateTableLikeExecutor::~ObCreateTableLikeExecutor()
{
}
int ObCreateTableLikeExecutor::execute(ObExecContext &ctx, ObCreateTableLikeStmt &stmt)
{
int ret = OB_SUCCESS;
const obrpc::ObCreateTableLikeArg &create_table_like_arg = stmt.get_create_table_like_arg();
ObString first_stmt;
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else {
const_cast<obrpc::ObCreateTableLikeArg&>(create_table_like_arg).ddl_stmt_str_ = first_stmt;
const_cast<obrpc::ObCreateTableLikeArg&>(create_table_like_arg).session_id_ =
ctx.get_my_session()->get_sessid_for_table();
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor context failed");
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("get common rpc proxy failed", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)){
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common rpc proxy should not be null", K(ret));
} else if (OB_FAIL(common_rpc_proxy->create_table_like(create_table_like_arg))) {
LOG_WARN("rpc proxy create table like failed", K(ret));
}
}
return ret;
}
int ObFlashBackTableFromRecyclebinExecutor::execute(ObExecContext &ctx, ObFlashBackTableFromRecyclebinStmt &stmt)
{
int ret = OB_SUCCESS;
const obrpc::ObFlashBackTableFromRecyclebinArg &flashback_table_arg = stmt.get_flashback_table_arg();
ObString first_stmt;
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else {
const_cast<obrpc::ObFlashBackTableFromRecyclebinArg&>(flashback_table_arg).ddl_stmt_str_ = first_stmt;
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor context failed");
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("get common rpc proxy failed", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)){
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common rpc proxy should not be null", K(ret));
} else if (OB_FAIL(common_rpc_proxy->flashback_table_from_recyclebin(flashback_table_arg))) {
LOG_WARN("rpc proxy flashback table failed", K(ret));
}
}
return ret;
}
int ObFlashBackTableToScnExecutor::execute(ObExecContext &ctx, ObFlashBackTableToScnStmt &stmt) {
int ret = OB_SUCCESS;
obrpc::ObFlashBackTableToScnArg &arg = stmt.flashback_table_to_scn_arg_;
RowDesc row_desc;
ObTempExpr *temp_expr = NULL;
OZ(ObStaticEngineExprCG::gen_expr_with_row_desc(stmt.get_time_expr(),
row_desc, ctx.get_allocator(), ctx.get_my_session(), temp_expr));
CK(OB_NOT_NULL(temp_expr));
if (OB_SUCC(ret)) {
ObNewRow empty_row;
ObObj tmp_obj;
ObExprCtx expr_ctx;
expr_ctx.calc_buf_ = &ctx.get_allocator();
expr_ctx.phy_plan_ctx_ = ctx.get_physical_plan_ctx();
expr_ctx.my_session_ = ctx.get_my_session();
expr_ctx.exec_ctx_ = &ctx;
const int64_t cur_time = expr_ctx.phy_plan_ctx_->has_cur_time() ?
expr_ctx.phy_plan_ctx_->get_cur_time().get_timestamp() : ObTimeUtility::current_time();
expr_ctx.phy_plan_ctx_->set_cur_time(cur_time, *expr_ctx.my_session_);
ObObj result_obj;
if (OB_FAIL(temp_expr->eval(ctx, empty_row, result_obj))) {
LOG_WARN("failed to calculate", K(ret));
} else if (ObFlashBackTableToScnStmt::TIME_TIMESTAMP == stmt.get_time_type()) {
EXPR_DEFINE_CAST_CTX(expr_ctx, CM_NONE);
if (OB_FAIL(ObObjCaster::to_type(ObTimestampTZType, cast_ctx, tmp_obj, result_obj))) {
LOG_WARN("failed to cast object", K(ret), K(tmp_obj));
} else {
arg.time_point_ = result_obj.v_.datetime_;
LOG_DEBUG("timestamp_val result", K(tmp_obj), K(result_obj), K(arg.time_point_));
}
} else if (ObFlashBackTableToScnStmt::TIME_SCN == stmt.get_time_type()) {
EXPR_DEFINE_CAST_CTX(expr_ctx, CM_NONE);
if (OB_FAIL(ObObjCaster::to_type(ObUInt64Type, cast_ctx, tmp_obj, result_obj))) {
LOG_WARN("failed to cast object", K(ret), K(tmp_obj));
} else {
arg.time_point_ = result_obj.v_.uint64_;
LOG_DEBUG("timestamp_val result", K(tmp_obj), K(result_obj), K(arg.time_point_));
}
}
}
if (OB_SUCC(ret)) {
ObTaskExecutorCtx *task_exec_ctx = nullptr;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get task executor context failed", K(ret));
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("get common rpc proxy failed", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cmmon rpc proxy should not be null", K(ret));
} else if (OB_FAIL(common_rpc_proxy->flashback_table_to_time_point(arg))) {
LOG_WARN("rpc proxy flashback table failed", K(ret));
}
}
return ret;
}
int ObPurgeTableExecutor::execute(ObExecContext &ctx, ObPurgeTableStmt &stmt)
{
int ret = OB_SUCCESS;
const obrpc::ObPurgeTableArg &purge_table_arg = stmt.get_purge_table_arg();
ObString first_stmt;
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else {
const_cast<obrpc::ObPurgeTableArg &>(purge_table_arg).ddl_stmt_str_ = first_stmt;
ObTaskExecutorCtx *task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor context failed");
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("get common rpc proxy failed", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)){
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common rpc proxy should not be null", K(ret));
} else if (OB_FAIL(common_rpc_proxy->purge_table(purge_table_arg))) {
LOG_WARN("rpc proxy purge table failed", K(ret));
}
}
return ret;
}
int ObOptimizeTableExecutor::execute(ObExecContext &ctx, ObOptimizeTableStmt &stmt)
{
int ret = OB_SUCCESS;
obrpc::ObOptimizeTableArg &arg = stmt.get_optimize_table_arg();
ObString first_stmt;
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("fail to get first stmt", K(ret));
} else {
arg.ddl_stmt_str_ = first_stmt;
ObTaskExecutorCtx *task_exec_ctx = nullptr;
obrpc::ObCommonRpcProxy *common_rpc_proxy = nullptr;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, task executor must not be NULL", K(ret));
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("fail to get common rpc", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, common rpc proxy must not be NULL", K(ret));
} else if (OB_FAIL(common_rpc_proxy->optimize_table(arg))) {
LOG_WARN("fail to optimize table", K(ret));
}
}
return ret;
}
int ObOptimizeTenantExecutor::execute(ObExecContext &ctx, ObOptimizeTenantStmt &stmt)
{
int ret = OB_SUCCESS;
obrpc::ObOptimizeTenantArg &arg = stmt.get_optimize_tenant_arg();
ObString first_stmt;
ObSQLSessionInfo *my_session = ctx.get_my_session();
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("fail to get first stmt", K(ret));
} else if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, my session must not be NULL", K(ret));
} else {
arg.ddl_stmt_str_ = first_stmt;
ObTaskExecutorCtx *task_exec_ctx = nullptr;
obrpc::ObCommonRpcProxy *common_rpc_proxy = nullptr;
const observer::ObGlobalContext &gctx = observer::ObServer::get_instance().get_gctx();
const int64_t effective_tenant_id = my_session->get_effective_tenant_id();
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, task executor must not be NULL", K(ret));
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("fail to get common rpc", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, common rpc proxy must not be NULL", K(ret));
} else if (OB_ISNULL(gctx.schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, schema service must not be NULL", K(ret));
} else if (OB_FAIL(optimize_tenant(arg, effective_tenant_id, *gctx.schema_service_, common_rpc_proxy))) {
LOG_WARN("fail to optimize tenant", K(ret));
}
}
return ret;
}
int ObOptimizeTenantExecutor::optimize_tenant(const obrpc::ObOptimizeTenantArg &arg,
const uint64_t effective_tenant_id, ObMultiVersionSchemaService &schema_service, obrpc::ObCommonRpcProxy *common_rpc_proxy)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = OB_INVALID_ID;
ObSchemaGetterGuard schema_guard;
LOG_INFO("receive optimize tenant request", K(arg));
if (!arg.is_valid() || NULL == common_rpc_proxy) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(arg), KP(common_rpc_proxy));
} else if (OB_FAIL(schema_service.get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
LOG_WARN("fail to get schema guard", K(ret));
} else if (OB_FAIL(schema_guard.get_tenant_id(arg.tenant_name_, tenant_id))) {
LOG_WARN("fail to get tenant id", K(ret));
} else if (OB_SYS_TENANT_ID != effective_tenant_id && tenant_id != effective_tenant_id) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("tenant id mismatch", K(tenant_id), K(effective_tenant_id));
} else {
ObArray<const ObTableSchema *> table_schemas;
if (OB_FAIL(schema_service.get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get tenant schema guard", K(ret));
} else if (OB_FAIL(schema_guard.get_table_schemas_in_tenant(tenant_id, table_schemas))) {
LOG_WARN("fail to get table schemas in tenant", K(ret));
} else {
LOG_INFO("optimize tenant, table schema count", K(table_schemas.count()));
for (int64_t i = 0; OB_SUCC(ret) && i < table_schemas.count(); ++i) {
const ObTableSchema *table_schema = table_schemas.at(i);
const ObDatabaseSchema *database_schema = nullptr;
obrpc::ObOptimizeTableArg optimize_table_arg;
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, table schema must not be NULL", K(ret));
} else if (table_schema->is_index_table() || table_schema->is_vir_table() || table_schema->is_view_table()) {
// do nothing
} else if (OB_FAIL(schema_guard.get_database_schema(tenant_id, table_schema->get_database_id(), database_schema))) {
LOG_WARN("fail to get database schema", K(ret), K(tenant_id));
} else {
obrpc::ObTableItem table_item;
optimize_table_arg.tenant_id_ = tenant_id;
optimize_table_arg.exec_tenant_id_ = tenant_id;
table_item.database_name_ = database_schema->get_database_name();
table_item.table_name_ = table_schema->get_table_name();
if (OB_FAIL(optimize_table_arg.tables_.push_back(table_item))) {
LOG_WARN("fail to push back optimize table arg", K(ret));
} else if (OB_FAIL(common_rpc_proxy->optimize_table(optimize_table_arg))) {
LOG_WARN("fail to optimize table", K(ret));
}
}
}
}
}
return ret;
}
int ObOptimizeAllExecutor::execute(ObExecContext &ctx, ObOptimizeAllStmt &stmt)
{
int ret = OB_SUCCESS;
obrpc::ObOptimizeAllArg &arg = stmt.get_optimize_all_arg();
ObString first_stmt;
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("fail to get first stmt", K(ret));
} else {
arg.ddl_stmt_str_ = first_stmt;
ObTaskExecutorCtx *task_exec_ctx = nullptr;
obrpc::ObCommonRpcProxy *common_rpc_proxy = nullptr;
ObSchemaGetterGuard schema_guard;
ObArray<uint64_t> tenant_ids;
const observer::ObGlobalContext &gctx = observer::ObServer::get_instance().get_gctx();
ObSQLSessionInfo *my_session = ctx.get_my_session();
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, task executor must not be NULL", K(ret));
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("fail to get common rpc", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, common rpc proxy must not be NULL", K(ret));
} else if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, my session must not be NULL", K(ret));
} else if (OB_ISNULL(gctx.schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, schema service must not be NULL", K(ret));
} else if (OB_FAIL(gctx.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
LOG_WARN("fail to get tenant schema guard", K(ret));
} else if (OB_FAIL(schema_guard.get_tenant_ids(tenant_ids))) {
LOG_WARN("fail to get tenant ids", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) {
obrpc::ObOptimizeTenantArg tenant_arg;
const ObTenantSchema *tenant_schema = nullptr;
if (OB_FAIL(schema_guard.get_tenant_info(tenant_ids.at(i), tenant_schema))) {
LOG_WARN("fail to get tenant name", K(ret));
} else {
tenant_arg.tenant_name_ = tenant_schema->get_tenant_name();
if (OB_FAIL(ObOptimizeTenantExecutor::optimize_tenant(tenant_arg, my_session->get_effective_tenant_id(), *gctx.schema_service_, common_rpc_proxy))) {
LOG_WARN("fail to optimize tenant", K(ret));
}
}
}
}
}
return ret;
}
} //end namespace sql
} //end namespace oceanbase