fix create_hidden_table not use DEFINE_DDL_RS_RPC_PROCESSOR bug
This commit is contained in:
@ -19,158 +19,6 @@ using namespace obrpc;
|
|||||||
namespace observer
|
namespace observer
|
||||||
{
|
{
|
||||||
|
|
||||||
ObTableLoadRedefTable::ObTableLoadRedefTable()
|
|
||||||
: ctx_(nullptr),
|
|
||||||
session_info_(nullptr),
|
|
||||||
dest_table_id_(OB_INVALID_ID),
|
|
||||||
ddl_task_id_(0),
|
|
||||||
schema_version_(0),
|
|
||||||
is_finish_or_abort_called_(false),
|
|
||||||
is_inited_(false)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
ObTableLoadRedefTable::~ObTableLoadRedefTable()
|
|
||||||
{
|
|
||||||
reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
void ObTableLoadRedefTable::reset()
|
|
||||||
{
|
|
||||||
if (!is_finish_or_abort_called_) {
|
|
||||||
abort(); // 这个必须执行
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTableLoadRedefTable::init(ObTableLoadTableCtx *ctx, ObSQLSessionInfo *session_info)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (IS_INIT) {
|
|
||||||
ret = OB_INIT_TWICE;
|
|
||||||
LOG_WARN("ObTableLoadRedefTable init twice", KR(ret), KP(this));
|
|
||||||
} else if (OB_ISNULL(ctx) || OB_ISNULL(session_info)) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
|
||||||
LOG_WARN("table_ctx or session_info cannot be null", KR(ret));
|
|
||||||
} else {
|
|
||||||
ctx_ = ctx;
|
|
||||||
session_info_ = session_info;
|
|
||||||
is_inited_ = true;
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTableLoadRedefTable::start()
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (IS_NOT_INIT) {
|
|
||||||
ret = OB_NOT_INIT;
|
|
||||||
LOG_WARN("ObTableLoadRedefTable not init", KR(ret));
|
|
||||||
} else {
|
|
||||||
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
|
|
||||||
ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
|
|
||||||
ObCreateHiddenTableArg create_table_arg;
|
|
||||||
ObCreateHiddenTableRes create_table_res;
|
|
||||||
create_table_arg.reset();
|
|
||||||
create_table_arg.tenant_id_ = ctx_->param_.tenant_id_;
|
|
||||||
create_table_arg.table_id_ = ctx_->param_.table_id_;
|
|
||||||
create_table_arg.dest_tenant_id_ = ctx_->param_.tenant_id_;
|
|
||||||
create_table_arg.parallelism_ = ctx_->param_.session_count_;
|
|
||||||
create_table_arg.ddl_type_ = share::DDL_DIRECT_LOAD;
|
|
||||||
create_table_arg.session_id_ = session_info_->get_sessid_for_table();
|
|
||||||
create_table_arg.sql_mode_ = session_info_->get_sql_mode();
|
|
||||||
create_table_arg.tz_info_ = session_info_->get_tz_info_wrap().get_tz_info_offset();
|
|
||||||
create_table_arg.nls_formats_[ObNLSFormatEnum::NLS_DATE] = session_info_->get_local_nls_date_format();
|
|
||||||
create_table_arg.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP] = session_info_->get_local_nls_timestamp_format();
|
|
||||||
create_table_arg.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP_TZ] = session_info_->get_local_nls_timestamp_tz_format();
|
|
||||||
if (OB_FAIL(create_table_arg.tz_info_wrap_.deep_copy(session_info_->get_tz_info_wrap()))) {
|
|
||||||
LOG_WARN("failed to deep copy tz_info_wrap", KR(ret));
|
|
||||||
} else if (OB_FAIL(ObDDLServerClient::create_hidden_table(create_table_arg, create_table_res, *session_info_))) {
|
|
||||||
LOG_WARN("failed to create hidden table", KR(ret), K(create_table_arg));
|
|
||||||
} else {
|
|
||||||
ddl_task_id_ = create_table_res.task_id_;
|
|
||||||
schema_version_ = create_table_res.schema_version_;
|
|
||||||
dest_table_id_ = create_table_res.dest_table_id_;
|
|
||||||
// const_cast<ObTableLoadParam &>(ctx_->param_).target_table_id_ = create_table_res.dest_table_id_;
|
|
||||||
LOG_INFO("succeed to create hidden table", K(create_table_res), K(create_table_res));
|
|
||||||
}
|
|
||||||
THIS_WORKER.set_timeout_ts(origin_timeout_ts);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTableLoadRedefTable::finish()
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (IS_NOT_INIT) {
|
|
||||||
ret = OB_NOT_INIT;
|
|
||||||
LOG_WARN("ObTableLoadRedefTable not init", KR(ret));
|
|
||||||
} else {
|
|
||||||
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
|
|
||||||
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
|
|
||||||
obrpc::ObCopyTableDependentsArg copy_table_dependents_arg;
|
|
||||||
copy_table_dependents_arg.task_id_ = ddl_task_id_;
|
|
||||||
copy_table_dependents_arg.tenant_id_ = ctx_->param_.tenant_id_;
|
|
||||||
copy_table_dependents_arg.copy_indexes_ = true;
|
|
||||||
copy_table_dependents_arg.copy_constraints_ = true;
|
|
||||||
copy_table_dependents_arg.copy_triggers_ = false;
|
|
||||||
copy_table_dependents_arg.ignore_errors_ = false;
|
|
||||||
if (OB_FAIL(ObDDLServerClient::copy_table_dependents(copy_table_dependents_arg))) {
|
|
||||||
LOG_WARN("failed to copy table dependents", KR(ret), K(copy_table_dependents_arg));
|
|
||||||
} else {
|
|
||||||
LOG_INFO("succeed to copy table dependents", K(copy_table_dependents_arg));
|
|
||||||
obrpc::ObFinishRedefTableArg finish_redef_table_arg;
|
|
||||||
finish_redef_table_arg.task_id_ = ddl_task_id_;
|
|
||||||
finish_redef_table_arg.tenant_id_ = ctx_->param_.tenant_id_;
|
|
||||||
|
|
||||||
ObDDLBuildSingleReplicaResponseArg build_single_replica_response_arg;
|
|
||||||
build_single_replica_response_arg.task_id_ = ddl_task_id_;
|
|
||||||
build_single_replica_response_arg.tenant_id_ = ctx_->param_.tenant_id_;
|
|
||||||
build_single_replica_response_arg.ls_id_ = share::ObLSID(1);
|
|
||||||
build_single_replica_response_arg.tablet_id_ = ObTableID(-1);
|
|
||||||
build_single_replica_response_arg.source_table_id_ = ctx_->param_.table_id_;
|
|
||||||
build_single_replica_response_arg.dest_schema_id_ = dest_table_id_;
|
|
||||||
build_single_replica_response_arg.ret_code_ = ret;
|
|
||||||
build_single_replica_response_arg.task_id_ = ddl_task_id_;
|
|
||||||
build_single_replica_response_arg.snapshot_version_ = 1;
|
|
||||||
build_single_replica_response_arg.schema_version_ = schema_version_;
|
|
||||||
build_single_replica_response_arg.execution_id_ = 1;
|
|
||||||
if (OB_FAIL(ObDDLServerClient::finish_redef_table(finish_redef_table_arg, build_single_replica_response_arg, *session_info_))) {
|
|
||||||
LOG_WARN("failed to finish redef table", KR(ret), K(finish_redef_table_arg));
|
|
||||||
} else {
|
|
||||||
is_finish_or_abort_called_ = true;
|
|
||||||
LOG_INFO("succeed to finish redef table", KR(ret), K(finish_redef_table_arg));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
THIS_WORKER.set_timeout_ts(origin_timeout_ts);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTableLoadRedefTable::abort()
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (IS_NOT_INIT) {
|
|
||||||
ret = OB_NOT_INIT;
|
|
||||||
LOG_WARN("ObTableLoadRedefTable not init", KR(ret));
|
|
||||||
} else {
|
|
||||||
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
|
|
||||||
is_finish_or_abort_called_ = true;
|
|
||||||
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
|
|
||||||
obrpc::ObAbortRedefTableArg abort_redef_table_arg;
|
|
||||||
abort_redef_table_arg.task_id_ = ddl_task_id_;
|
|
||||||
abort_redef_table_arg.tenant_id_ = ctx_->param_.tenant_id_;
|
|
||||||
if (OB_FAIL(ObDDLServerClient::abort_redef_table(abort_redef_table_arg, *session_info_))) {
|
|
||||||
LOG_WARN("failed to abort redef table", KR(ret), K(abort_redef_table_arg));
|
|
||||||
} else {
|
|
||||||
LOG_INFO("abort_redef_table success", KR(ret));
|
|
||||||
}
|
|
||||||
THIS_WORKER.set_timeout_ts(origin_timeout_ts);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTableLoadRedefTable::start(const ObTableLoadRedefTableStartArg &arg,
|
int ObTableLoadRedefTable::start(const ObTableLoadRedefTableStartArg &arg,
|
||||||
ObTableLoadRedefTableStartRes &res, ObSQLSessionInfo &session_info)
|
ObTableLoadRedefTableStartRes &res, ObSQLSessionInfo &session_info)
|
||||||
{
|
{
|
||||||
@ -183,6 +31,7 @@ int ObTableLoadRedefTable::start(const ObTableLoadRedefTableStartArg &arg,
|
|||||||
ObCreateHiddenTableArg create_table_arg;
|
ObCreateHiddenTableArg create_table_arg;
|
||||||
ObCreateHiddenTableRes create_table_res;
|
ObCreateHiddenTableRes create_table_res;
|
||||||
create_table_arg.reset();
|
create_table_arg.reset();
|
||||||
|
create_table_arg.exec_tenant_id_ = arg.tenant_id_;
|
||||||
create_table_arg.tenant_id_ = arg.tenant_id_;
|
create_table_arg.tenant_id_ = arg.tenant_id_;
|
||||||
create_table_arg.table_id_ = arg.table_id_;
|
create_table_arg.table_id_ = arg.table_id_;
|
||||||
create_table_arg.dest_tenant_id_ = arg.tenant_id_;
|
create_table_arg.dest_tenant_id_ = arg.tenant_id_;
|
||||||
|
|||||||
@ -117,26 +117,10 @@ public:
|
|||||||
class ObTableLoadRedefTable
|
class ObTableLoadRedefTable
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObTableLoadRedefTable();
|
|
||||||
~ObTableLoadRedefTable();
|
|
||||||
void reset();
|
|
||||||
int init(ObTableLoadTableCtx *ctx, sql::ObSQLSessionInfo *session_info);
|
|
||||||
int start();
|
|
||||||
int finish();
|
|
||||||
int abort();
|
|
||||||
OB_INLINE int64_t get_ddl_task_id() const { return ddl_task_id_; }
|
|
||||||
static int start(const ObTableLoadRedefTableStartArg &arg, ObTableLoadRedefTableStartRes &res,
|
static int start(const ObTableLoadRedefTableStartArg &arg, ObTableLoadRedefTableStartRes &res,
|
||||||
sql::ObSQLSessionInfo &session_info);
|
sql::ObSQLSessionInfo &session_info);
|
||||||
static int finish(const ObTableLoadRedefTableFinishArg &arg, sql::ObSQLSessionInfo &session_info);
|
static int finish(const ObTableLoadRedefTableFinishArg &arg, sql::ObSQLSessionInfo &session_info);
|
||||||
static int abort(const ObTableLoadRedefTableAbortArg &arg, sql::ObSQLSessionInfo &session_info);
|
static int abort(const ObTableLoadRedefTableAbortArg &arg, sql::ObSQLSessionInfo &session_info);
|
||||||
private:
|
|
||||||
ObTableLoadTableCtx *ctx_;
|
|
||||||
sql::ObSQLSessionInfo *session_info_;
|
|
||||||
uint64_t dest_table_id_;
|
|
||||||
int64_t ddl_task_id_;
|
|
||||||
int64_t schema_version_;
|
|
||||||
bool is_finish_or_abort_called_;
|
|
||||||
bool is_inited_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace observer
|
} // namespace observer
|
||||||
|
|||||||
@ -313,10 +313,10 @@ DEFINE_RS_RPC_PROCESSOR(obrpc::OB_COPY_TABLE_DEPENDENTS, ObRpcCopyTableDependent
|
|||||||
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_FINISH_REDEF_TABLE, ObRpcFinishRedefTableP, finish_redef_table(arg_));
|
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_FINISH_REDEF_TABLE, ObRpcFinishRedefTableP, finish_redef_table(arg_));
|
||||||
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_ABORT_REDEF_TABLE, ObRpcAbortRedefTableP, abort_redef_table(arg_));
|
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_ABORT_REDEF_TABLE, ObRpcAbortRedefTableP, abort_redef_table(arg_));
|
||||||
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_UPDATE_DDL_TASK_ACTIVE_TIME, ObRpcUpdateDDLTaskActiveTimeP, update_ddl_task_active_time(arg_));
|
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_UPDATE_DDL_TASK_ACTIVE_TIME, ObRpcUpdateDDLTaskActiveTimeP, update_ddl_task_active_time(arg_));
|
||||||
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_CREATE_HIDDEN_TABLE, ObRpcCreateHiddenTableP, create_hidden_table(arg_, result_));
|
|
||||||
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_START_REDEF_TABLE, ObRpcStartRedefTableP, start_redef_table(arg_, result_));
|
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_START_REDEF_TABLE, ObRpcStartRedefTableP, start_redef_table(arg_, result_));
|
||||||
|
|
||||||
// ddl rpc processors
|
// ddl rpc processors
|
||||||
|
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_HIDDEN_TABLE, ObRpcCreateHiddenTableP, create_hidden_table(arg_, result_));
|
||||||
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_COMMIT_ALTER_TENANT_LOCALITY, ObRpcCommitAlterTenantLocalityP, commit_alter_tenant_locality(arg_));
|
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_COMMIT_ALTER_TENANT_LOCALITY, ObRpcCommitAlterTenantLocalityP, commit_alter_tenant_locality(arg_));
|
||||||
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_TENANT, ObRpcCreateTenantP, create_tenant(arg_, result_));
|
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_TENANT, ObRpcCreateTenantP, create_tenant(arg_, result_));
|
||||||
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_TENANT_END, ObRpcCreateTenantEndP, create_tenant_end(arg_));
|
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_TENANT_END, ObRpcCreateTenantEndP, create_tenant_end(arg_));
|
||||||
|
|||||||
@ -516,6 +516,7 @@ int ObUpdateDDLTaskActiveTimeArg::assign(const ObUpdateDDLTaskActiveTimeArg &arg
|
|||||||
bool ObCreateHiddenTableArg::is_valid() const
|
bool ObCreateHiddenTableArg::is_valid() const
|
||||||
{
|
{
|
||||||
return (OB_INVALID_ID != tenant_id_
|
return (OB_INVALID_ID != tenant_id_
|
||||||
|
&& OB_INVALID_TENANT_ID != exec_tenant_id_
|
||||||
&& OB_INVALID_ID != table_id_
|
&& OB_INVALID_ID != table_id_
|
||||||
&& OB_INVALID_ID != dest_tenant_id_
|
&& OB_INVALID_ID != dest_tenant_id_
|
||||||
&& share::DDL_INVALID != ddl_type_);
|
&& share::DDL_INVALID != ddl_type_);
|
||||||
@ -551,6 +552,8 @@ OB_DEF_SERIALIZE(ObCreateHiddenTableArg)
|
|||||||
if (!is_valid()) {
|
if (!is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", K(ret));
|
LOG_WARN("invalid argument", K(ret));
|
||||||
|
} else if (OB_FAIL(ObDDLArg::serialize(buf, buf_len, pos))) {
|
||||||
|
LOG_WARN("fail to serialize DDLArg", K(ret), K(buf_len), K(pos));
|
||||||
} else {
|
} else {
|
||||||
LST_DO_CODE(OB_UNIS_ENCODE,
|
LST_DO_CODE(OB_UNIS_ENCODE,
|
||||||
tenant_id_,
|
tenant_id_,
|
||||||
@ -576,7 +579,10 @@ OB_DEF_SERIALIZE(ObCreateHiddenTableArg)
|
|||||||
OB_DEF_DESERIALIZE(ObCreateHiddenTableArg)
|
OB_DEF_DESERIALIZE(ObCreateHiddenTableArg)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
LST_DO_CODE(OB_UNIS_DECODE,
|
if (OB_FAIL(ObDDLArg::deserialize(buf, data_len, pos))) {
|
||||||
|
LOG_WARN("fail to deserialize DDLArg", K(ret), K(data_len), K(pos));
|
||||||
|
} else {
|
||||||
|
LST_DO_CODE(OB_UNIS_DECODE,
|
||||||
tenant_id_,
|
tenant_id_,
|
||||||
table_id_,
|
table_id_,
|
||||||
dest_tenant_id_,
|
dest_tenant_id_,
|
||||||
@ -586,7 +592,6 @@ OB_DEF_DESERIALIZE(ObCreateHiddenTableArg)
|
|||||||
sql_mode_,
|
sql_mode_,
|
||||||
tz_info_,
|
tz_info_,
|
||||||
tz_info_wrap_);
|
tz_info_wrap_);
|
||||||
if (OB_SUCC(ret)) {
|
|
||||||
ObString tmp_string;
|
ObString tmp_string;
|
||||||
char *tmp_ptr[ObNLSFormatEnum::NLS_MAX] = {};
|
char *tmp_ptr[ObNLSFormatEnum::NLS_MAX] = {};
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < ObNLSFormatEnum::NLS_MAX; i++) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < ObNLSFormatEnum::NLS_MAX; i++) {
|
||||||
@ -618,6 +623,7 @@ OB_DEF_SERIALIZE_SIZE(ObCreateHiddenTableArg)
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", K(ret));
|
LOG_WARN("invalid argument", K(ret));
|
||||||
} else {
|
} else {
|
||||||
|
len += ObDDLArg::get_serialize_size();
|
||||||
LST_DO_CODE(OB_UNIS_ADD_LEN,
|
LST_DO_CODE(OB_UNIS_ADD_LEN,
|
||||||
tenant_id_,
|
tenant_id_,
|
||||||
table_id_,
|
table_id_,
|
||||||
|
|||||||
@ -1736,11 +1736,12 @@ public:
|
|||||||
uint64_t tenant_id_;
|
uint64_t tenant_id_;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ObCreateHiddenTableArg final
|
struct ObCreateHiddenTableArg : public ObDDLArg
|
||||||
{
|
{
|
||||||
OB_UNIS_VERSION(1);
|
OB_UNIS_VERSION(1);
|
||||||
public:
|
public:
|
||||||
TO_STRING_KV(K_(tenant_id),
|
TO_STRING_KV(K_(exec_tenant_id),
|
||||||
|
K_(tenant_id),
|
||||||
K_(table_id),
|
K_(table_id),
|
||||||
K_(dest_tenant_id),
|
K_(dest_tenant_id),
|
||||||
K_(session_id),
|
K_(session_id),
|
||||||
@ -1751,6 +1752,7 @@ public:
|
|||||||
K_(tz_info_wrap),
|
K_(tz_info_wrap),
|
||||||
"nls_formats", common::ObArrayWrap<common::ObString>(nls_formats_, common::ObNLSFormatEnum::NLS_MAX));
|
"nls_formats", common::ObArrayWrap<common::ObString>(nls_formats_, common::ObNLSFormatEnum::NLS_MAX));
|
||||||
ObCreateHiddenTableArg() :
|
ObCreateHiddenTableArg() :
|
||||||
|
ObDDLArg(),
|
||||||
tenant_id_(common::OB_INVALID_ID),
|
tenant_id_(common::OB_INVALID_ID),
|
||||||
table_id_(common::OB_INVALID_ID),
|
table_id_(common::OB_INVALID_ID),
|
||||||
dest_tenant_id_(common::OB_INVALID_ID),
|
dest_tenant_id_(common::OB_INVALID_ID),
|
||||||
|
|||||||
Reference in New Issue
Block a user