[FEAT MERGE] merge recover table

Co-authored-by: hamstersox <673144759@qq.com>
Co-authored-by: skylhd <dickylhd@gmail.com>
Co-authored-by: HaHaJeff <jeffzhouhhh@gmail.com>
This commit is contained in:
wxhwang
2023-09-05 06:47:00 +00:00
committed by ob-robot
parent bed8398a6b
commit f4bf5f41c9
147 changed files with 31068 additions and 1341 deletions

View File

@ -2574,6 +2574,27 @@ int ObCheckpointSlogExecutor::execute(ObExecContext &ctx, ObCheckpointSlogStmt &
return ret;
}
int ObRecoverTableExecutor::execute(ObExecContext &ctx, ObRecoverTableStmt &stmt)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx *task_exec_ctx = nullptr;
ObCommonRpcProxy *common_proxy = nullptr;
ObAddr server;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor failed");
} else if (OB_ISNULL(common_proxy = task_exec_ctx->get_common_rpc())) {
ret = OB_NOT_INIT;
LOG_WARN("get common rpc proxy failed");
} else if (OB_FAIL(common_proxy->recover_table(stmt.get_rpc_arg()))) {
LOG_WARN("failed to send recover table rpc", K(ret));
} else {
const obrpc::ObRecoverTableArg &recover_table_rpc_arg = stmt.get_rpc_arg();
LOG_INFO("send recover table rpc finish", K(recover_table_rpc_arg));
}
return ret;
}
int ObCancelRestoreExecutor::execute(ObExecContext &ctx, ObCancelRestoreStmt &stmt)
{
int ret = OB_SUCCESS;

View File

@ -129,6 +129,7 @@ DEF_SIMPLE_EXECUTOR(ObBackupSetEncryption);
DEF_SIMPLE_EXECUTOR(ObBackupSetDecryption);
DEF_SIMPLE_EXECUTOR(ObAddRestoreSource);
DEF_SIMPLE_EXECUTOR(ObClearRestoreSource);
DEF_SIMPLE_EXECUTOR(ObRecoverTable);
DEF_SIMPLE_EXECUTOR(ObTableTTL);
DEF_SIMPLE_EXECUTOR(ObSetRegionBandwidth);

View File

@ -47,7 +47,7 @@ int ObDDLExecutorUtil::handle_session_exception(ObSQLSessionInfo &session)
int ObDDLExecutorUtil::wait_ddl_finish(
const uint64_t tenant_id,
const int64_t task_id,
ObSQLSessionInfo &session,
ObSQLSessionInfo *session,
obrpc::ObCommonRpcProxy *common_rpc_proxy,
const bool is_support_cancel)
{
@ -93,7 +93,7 @@ int ObDDLExecutorUtil::wait_ddl_finish(
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(handle_session_exception(session))) {
} else if (nullptr != session && OB_FAIL(handle_session_exception(*session))) {
LOG_WARN("session exeception happened", K(ret), K(is_support_cancel));
if (is_support_cancel && OB_TMP_FAIL(cancel_ddl_task(tenant_id, common_rpc_proxy))) {
LOG_WARN("cancel ddl task failed", K(tmp_ret));

View File

@ -55,7 +55,7 @@ public:
static int wait_ddl_finish(
const uint64_t tenant_id,
const int64_t task_id,
ObSQLSessionInfo &session,
ObSQLSessionInfo *session,
obrpc::ObCommonRpcProxy *common_rpc_proxy,
const bool is_support_cancel = true);
static int wait_ddl_retry_task_finish(

View File

@ -98,7 +98,7 @@ int ObCreateIndexExecutor::execute(ObExecContext &ctx, ObCreateIndexStmt &stmt)
ret = OB_ERR_ADD_INDEX;
LOG_WARN("index table id is invalid", KR(ret));
}
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(create_index_arg.tenant_id_, res.task_id_, *my_session, common_rpc_proxy))) {
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(create_index_arg.tenant_id_, res.task_id_, my_session, common_rpc_proxy))) {
LOG_WARN("failed to wait ddl finish", K(ret));
}
}

View File

@ -83,19 +83,12 @@ int ObPhysicalRestoreTenantExecutor::execute(
LOG_WARN("failed to remove user variable", KR(tmp_ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(sync_wait_tenant_created_(ctx, restore_tenant_arg.tenant_name_, job_id))) {
LOG_WARN("failed to sync wait tenant created", K(ret));
}
} else {
// TODO(chongrong.th): fix restore preview in 4.3
ret = OB_NOT_SUPPORTED;
LOG_WARN("restore preview is not support now", K(ret));
// if (OB_FAIL(physical_restore_preview(ctx, stmt))) {
// LOG_WARN("failed to do physical restore preview", K(ret));
// }
} else if (OB_FAIL(physical_restore_preview(ctx, stmt))) {
LOG_WARN("failed to do physical restore preview", K(ret));
}
}
return ret;
@ -189,6 +182,7 @@ int ObPhysicalRestoreTenantExecutor::physical_restore_preview(
{
int ret = OB_SUCCESS;
ObSqlString set_backup_dest_sql;
ObSqlString set_scn_sql;
ObSqlString set_timestamp_sql;
sqlclient::ObISQLConnection *conn = NULL;
observer::ObInnerSQLConnectionPool *pool = NULL;
@ -196,7 +190,6 @@ int ObPhysicalRestoreTenantExecutor::physical_restore_preview(
ObSQLSessionInfo *session_info = ctx.get_my_session();
const obrpc::ObPhysicalRestoreTenantArg &restore_tenant_arg = stmt.get_rpc_arg();
int64_t affected_rows = 0;
if (OB_ISNULL(session_info)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), KP(session_info));
@ -212,18 +205,21 @@ int ObPhysicalRestoreTenantExecutor::physical_restore_preview(
} else if (OB_FAIL(set_backup_dest_sql.assign_fmt("set @%s = '%.*s'",
OB_RESTORE_PREVIEW_BACKUP_DEST_SESSION_STR, restore_tenant_arg.uri_.length(), restore_tenant_arg.uri_.ptr()))) {
LOG_WARN("failed to set backup dest", KR(ret), K(set_backup_dest_sql));
} else if (OB_FAIL(set_timestamp_sql.assign_fmt("set @%s = '%lu'",
OB_RESTORE_PREVIEW_TIMESTAMP_SESSION_STR, restore_tenant_arg.restore_scn_.get_val_for_inner_table_field()))) {
LOG_WARN("failed to set timestamp", KR(ret), K(set_timestamp_sql));
} else if (OB_FAIL(set_scn_sql.assign_fmt("set @%s = '%ld'",
OB_RESTORE_PREVIEW_SCN_SESSION_STR, restore_tenant_arg.with_restore_scn_ ? restore_tenant_arg.restore_scn_.get_val_for_inner_table_field() : 0))) {
LOG_WARN("failed to set timestamp", KR(ret), K(set_scn_sql));
} else if (OB_FAIL(set_scn_sql.assign_fmt("set @%s = '%.*s'",
OB_RESTORE_PREVIEW_TIMESTAMP_SESSION_STR, restore_tenant_arg.restore_timestamp_.length(), restore_tenant_arg.uri_.ptr()))) {
LOG_WARN("failed to set timestamp", KR(ret), K(set_scn_sql));
} else if (OB_FAIL(pool->acquire(session_info, conn))) {
LOG_WARN("failed to get conn", K(ret));
} else if (OB_FAIL(conn->execute_write(session_info->get_effective_tenant_id(),
set_backup_dest_sql.ptr(), affected_rows))) {
LOG_WARN("failed to set backup dest", K(ret), K(set_backup_dest_sql));
} else if (OB_FAIL(conn->execute_write(session_info->get_effective_tenant_id(),
set_timestamp_sql.ptr(), affected_rows))) {
LOG_WARN("failed to set restore timestamp", K(ret), K(set_timestamp_sql));
}
set_scn_sql.ptr(), affected_rows))) {
LOG_WARN("failed to set restore timestamp", K(ret), K(set_scn_sql));
}
return ret;
}

View File

@ -752,7 +752,7 @@ int ObAlterTableExecutor::alter_table_rpc_v2(
ObIArray<obrpc::ObDDLRes> &ddl_ress = res.ddl_res_array_;
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_ress.count(); ++i) {
ObDDLRes &ddl_res = ddl_ress.at(i);
if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(ddl_res.tenant_id_, ddl_res.task_id_, *my_session, common_rpc_proxy, is_support_cancel))) {
if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(ddl_res.tenant_id_, ddl_res.task_id_, my_session, common_rpc_proxy, is_support_cancel))) {
LOG_WARN("wait drop index finish", K(ret));
}
}
@ -1255,7 +1255,7 @@ int ObAlterTableExecutor::execute(ObExecContext &ctx, ObAlterTableStmt &stmt)
// do nothing, don't check if data is valid
} else if (OB_FAIL(refresh_schema_for_table(tenant_id))) {
LOG_WARN("refresh_schema_for_table failed", K(ret));
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_, *my_session, common_rpc_proxy))) {
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_, my_session, common_rpc_proxy))) {
LOG_WARN("wait check constraint finish", K(ret));
}
}
@ -1266,7 +1266,7 @@ int ObAlterTableExecutor::execute(ObExecContext &ctx, ObAlterTableStmt &stmt)
} else {
if (OB_FAIL(refresh_schema_for_table(tenant_id))) {
LOG_WARN("refresh_schema_for_table failed", K(ret));
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_, *my_session, common_rpc_proxy))) {
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_, my_session, common_rpc_proxy))) {
LOG_WARN("wait fk constraint finish", K(ret));
}
}
@ -1279,8 +1279,7 @@ int ObAlterTableExecutor::execute(ObExecContext &ctx, ObAlterTableStmt &stmt)
int64_t affected_rows = 0;
if (OB_FAIL(refresh_schema_for_table(alter_table_arg.exec_tenant_id_))) {
LOG_WARN("refresh_schema_for_table failed", K(ret));
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_,
*my_session, common_rpc_proxy))) {
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_, my_session, common_rpc_proxy))) {
LOG_WARN("fail to wait ddl finish", K(ret), K(tenant_id), K(res));
}
}

View File

@ -21,6 +21,7 @@
#include "lib/profile/ob_perf_event.h"
#include "lib/geo/ob_s2adapter.h"
#include "lib/geo/ob_geo_utils.h"
#include "share/ob_ddl_common.h"
#include "share/ob_ddl_checksum.h"
#include "storage/access/ob_table_scan_iterator.h"
#include "observer/ob_server_struct.h"
@ -2596,7 +2597,7 @@ int ObTableScanOp::add_ddl_column_checksum()
// } else if (OB_FAIL(corrupt_obj(store_datum))) {
// LOG_WARN("failed to corrupt obj", K(ret));
#endif
} else if (col_need_reshape_[i] && OB_FAIL(reshape_ddl_column_obj(store_datum, e->obj_meta_))) {
} else if (col_need_reshape_[i] && OB_FAIL(ObDDLUtil::reshape_ddl_column_obj(store_datum, e->obj_meta_))) {
LOG_WARN("reshape ddl column obj failed", K(ret));
} else {
column_checksum_[i] += store_datum.checksum(0);
@ -2642,7 +2643,7 @@ int ObTableScanOp::add_ddl_column_checksum_batch(const int64_t row_count)
// } else if (OB_FAIL(corrupt_obj(store_datum))) {
// LOG_WARN("failed to corrupt obj", K(ret));
#endif
} else if (col_need_reshape_[i] && OB_FAIL(reshape_ddl_column_obj(store_datum, e->obj_meta_))) {
} else if (col_need_reshape_[i] && OB_FAIL(ObDDLUtil::reshape_ddl_column_obj(store_datum, e->obj_meta_))) {
LOG_WARN("reshape ddl column obj failed", K(ret));
} else {
column_checksum_[i] += store_datum.checksum(0);
@ -2661,39 +2662,6 @@ int ObTableScanOp::add_ddl_column_checksum_batch(const int64_t row_count)
return ret;
}
int ObTableScanOp::reshape_ddl_column_obj(ObDatum &datum, const ObObjMeta &obj_meta)
{
int ret = OB_SUCCESS;
if (datum.is_null()) {
// do not need to reshape
} else if (obj_meta.is_lob_storage()) {
ObLobLocatorV2 lob(datum.get_string(), obj_meta.has_lob_header());
ObString disk_loc;
if (!lob.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid lob locator", K(ret));
} else if (!lob.is_lob_disk_locator() && !lob.is_persist_lob()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid lob locator, should be persist lob", K(ret), K(lob));
} else if (OB_FAIL(lob.get_disk_locator(disk_loc))) {
LOG_WARN("get disk locator failed", K(ret), K(lob));
}
if (OB_SUCC(ret)) {
datum.set_string(disk_loc);
}
} else if (OB_UNLIKELY(!obj_meta.is_fixed_len_char_type())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("no need to reshape non-char", K(ret));
} else {
const char *ptr = datum.ptr_;
int32_t len = datum.len_;
int32_t trunc_len_byte = static_cast<int32_t>(ObCharset::strlen_byte_no_sp(
obj_meta.get_collation_type(), ptr, len));
datum.set_string(ObString(trunc_len_byte, ptr));
}
return ret;
}
int ObTableScanOp::report_ddl_column_checksum()
{
int ret = OB_SUCCESS;

View File

@ -466,7 +466,6 @@ protected:
int add_ddl_column_checksum();
int add_ddl_column_checksum_batch(const int64_t row_count);
static int corrupt_obj(ObObj &obj);
int reshape_ddl_column_obj(common::ObDatum &datum, const ObObjMeta &obj_meta);
int report_ddl_column_checksum();
int get_next_batch_with_das(int64_t &count, int64_t capacity);
void replace_bnlj_param(int64_t batch_idx);