Merge branch 'master' into fix_new_05

This commit is contained in:
csch
2023-09-06 11:23:56 +08:00
committed by GitHub
422 changed files with 48915 additions and 4032 deletions

View File

@ -45,6 +45,7 @@
#include "rpc/obmysql/ob_sql_sock_session.h"
#include "sql/plan_cache/ob_plan_cache.h"
#include "sql/plan_cache/ob_ps_cache.h"
#include "share/table/ob_ttl_util.h"
namespace oceanbase
{
using namespace common;
@ -158,7 +159,7 @@ int ObFreezeExecutor::execute(ObExecContext &ctx, ObFreezeStmt &stmt)
param.freeze_all_user_ = stmt.is_freeze_all_user();
param.freeze_all_meta_ = stmt.is_freeze_all_meta();
param.transport_ = GCTX.net_frame_->get_req_transport();
for (int64_t i = 0; (i < stmt.get_tenant_ids().count()) && OB_SUCC(ret); ++i) {
for (int64_t i = 0; i < stmt.get_tenant_ids().count() && OB_SUCC(ret); ++i) {
uint64_t tenant_id = stmt.get_tenant_ids().at(i);
if (OB_FAIL(param.add_freeze_info(tenant_id))) {
LOG_WARN("fail to assign", KR(ret), K(tenant_id));
@ -2573,6 +2574,27 @@ int ObCheckpointSlogExecutor::execute(ObExecContext &ctx, ObCheckpointSlogStmt &
return ret;
}
int ObRecoverTableExecutor::execute(ObExecContext &ctx, ObRecoverTableStmt &stmt)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx *task_exec_ctx = nullptr;
ObCommonRpcProxy *common_proxy = nullptr;
ObAddr server;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor failed");
} else if (OB_ISNULL(common_proxy = task_exec_ctx->get_common_rpc())) {
ret = OB_NOT_INIT;
LOG_WARN("get common rpc proxy failed");
} else if (OB_FAIL(common_proxy->recover_table(stmt.get_rpc_arg()))) {
LOG_WARN("failed to send recover table rpc", K(ret));
} else {
const obrpc::ObRecoverTableArg &recover_table_rpc_arg = stmt.get_rpc_arg();
LOG_INFO("send recover table rpc finish", K(recover_table_rpc_arg));
}
return ret;
}
int ObCancelRestoreExecutor::execute(ObExecContext &ctx, ObCancelRestoreStmt &stmt)
{
int ret = OB_SUCCESS;
@ -2606,6 +2628,44 @@ int ObCancelRestoreExecutor::execute(ObExecContext &ctx, ObCancelRestoreStmt &st
return ret;
}
int ObTableTTLExecutor::execute(ObExecContext& ctx, ObTableTTLStmt& stmt)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx* task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx);
obrpc::ObCommonRpcProxy* common_rpc_proxy = NULL;
if (OB_ISNULL(task_exec_ctx)) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor context failed");
} else if (OB_FAIL(task_exec_ctx->get_common_rpc(common_rpc_proxy))) {
LOG_WARN("get common rpc proxy failed", K(ret));
} else if (OB_ISNULL(common_rpc_proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common_rpc_proxy is null", K(ret));
} else {
FLOG_INFO("ObTableTTLExecutor::execute", K(stmt), K(ctx));
common::ObTTLParam param;
ObSEArray<common::ObSimpleTTLInfo, 32> ttl_info_array;
param.ttl_all_ = stmt.is_ttl_all();
param.transport_ = GCTX.net_frame_->get_req_transport();
param.type_ = stmt.get_type();
for (int64_t i = 0; (i < stmt.get_tenant_ids().count()) && OB_SUCC(ret); i++) {
uint64_t tenant_id = stmt.get_tenant_ids().at(i);
if (OB_FAIL(param.add_ttl_info(tenant_id))) {
LOG_WARN("fail to assign ttl info", KR(ret), K(tenant_id));
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_UNLIKELY(!param.ttl_all_ && param.ttl_info_array_.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(param), KR(ret));
} else if (OB_FAIL(ObTTLUtil::dispatch_ttl_cmd(param))) {
LOG_WARN("fail to dispatch ttl cmd", K(ret), K(param));
}
}
return ret;
}
} // end namespace sql
} // end namespace oceanbase