[CP] get publish schema return value and broadcast consensus version

This commit is contained in:
obdev
2023-05-29 13:46:48 +00:00
committed by ob-robot
parent 253a958f51
commit 17c75534d0
6 changed files with 69 additions and 24 deletions

View File

@ -24517,7 +24517,7 @@ int ObDDLService::alter_tablegroup(const ObAlterTablegroupArg &arg)
return ret; return ret;
} }
int ObDDLService::refresh_schema(uint64_t tenant_id) int ObDDLService::refresh_schema(uint64_t tenant_id, int64_t *publish_schema_version /*NULL*/)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t refresh_count = 0; int64_t refresh_count = 0;
@ -24573,6 +24573,8 @@ int ObDDLService::refresh_schema(uint64_t tenant_id)
LOG_WARN("increase sequence_id failed", K(ret)); LOG_WARN("increase sequence_id failed", K(ret));
} else if (OB_FAIL(schema_service->set_refresh_schema_info(schema_info))) { } else if (OB_FAIL(schema_service->set_refresh_schema_info(schema_info))) {
LOG_WARN("fail to set refresh schema info", K(ret), K(schema_info)); LOG_WARN("fail to set refresh schema info", K(ret), K(schema_info));
} else if (OB_NOT_NULL(publish_schema_version)) {
*publish_schema_version = schema_version;
} }
} }
} }
@ -24642,14 +24644,16 @@ int ObDDLService::notify_refresh_schema(const ObAddrIArray &addrs)
} }
if (found) { if (found) {
// refresh schema sync and report error // refresh schema sync and report error
timeout = THIS_WORKER.get_timeout_remain(); if (OB_FAIL(ObShareUtil::get_ctx_timeout(rpc_timeout, timeout))) {
LOG_WARN("fail to get timeout", KR(ret));
}
} else { } else {
// refresh schema async and ignore error // refresh schema async and ignore error
timeout = std::min(THIS_WORKER.get_timeout_remain(), rpc_timeout); timeout = std::min(THIS_WORKER.get_timeout_remain(), rpc_timeout);
} }
arg.force_refresh_ = found; arg.force_refresh_ = found;
// overwrite ret // overwrite ret
if (OB_FAIL(proxy.call(*s, timeout, arg))) { if (FAILEDx(proxy.call(*s, timeout, arg))) {
LOG_WARN("send switch schema rpc failed", KR(ret), LOG_WARN("send switch schema rpc failed", KR(ret),
K(timeout), K(schema_version), K(schema_info), K(arg), "server", *s); K(timeout), K(schema_version), K(schema_info), K(arg), "server", *s);
if (!found) { // ignore servers that are not in addrs if (!found) { // ignore servers that are not in addrs
@ -24710,6 +24714,26 @@ int ObDDLService::publish_schema(uint64_t tenant_id,
return ret; return ret;
} }
int ObDDLService::publish_schema_and_get_schema_version(uint64_t tenant_id,
const ObAddrIArray &addrs,
int64_t *schema_version /*NULL*/)
{
int ret = OB_SUCCESS;
ObTimeoutCtx ctx;
const int64_t TIMEOUT = 10L * 1000L * 1000L; // 10s
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("variable is not init", KR(ret));
} else if (OB_FAIL(refresh_schema(tenant_id, schema_version))) {
LOG_WARN("refresh schema failed", KR(ret));
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, TIMEOUT))) {// 10s for notify_refresh_schema
LOG_WARN("fail to set default timeout ctx", KR(ret));
} else if (OB_FAIL(notify_refresh_schema(addrs))) {
LOG_WARN("notify refresh schema failed", KR(ret));
}
return ret;
}
int ObDDLService::check_create_tenant_schema( int ObDDLService::check_create_tenant_schema(
const ObIArray<ObString> &pool_list, const ObIArray<ObString> &pool_list,
ObTenantSchema &tenant_schema, ObTenantSchema &tenant_schema,

View File

@ -927,7 +927,7 @@ int check_table_udt_id_is_exist(share::schema::ObSchemaGetterGuard &schema_guard
//----End of functions for managing row level security---- //----End of functions for managing row level security----
// refresh local schema busy wait // refresh local schema busy wait
virtual int refresh_schema(const uint64_t tenant_id); virtual int refresh_schema(const uint64_t tenant_id, int64_t *schema_version = NULL);
// notify other servers to refresh schema (call switch_schema rpc) // notify other servers to refresh schema (call switch_schema rpc)
virtual int notify_refresh_schema(const common::ObAddrIArray &addrs); virtual int notify_refresh_schema(const common::ObAddrIArray &addrs);
@ -991,6 +991,10 @@ int check_table_udt_id_is_exist(share::schema::ObSchemaGetterGuard &schema_guard
virtual int publish_schema(const uint64_t tenant_id); virtual int publish_schema(const uint64_t tenant_id);
virtual int publish_schema_and_get_schema_version(const uint64_t tenant_id,
const common::ObAddrIArray &addrs,
int64_t *schema_version = NULL);
int force_set_locality( int force_set_locality(
share::schema::ObSchemaGetterGuard &schema_guard, share::schema::ObSchemaGetterGuard &schema_guard,
share::schema::ObTenantSchema &new_tenant); share::schema::ObTenantSchema &new_tenant);

View File

@ -69,6 +69,18 @@ int ObShareUtil::get_abs_timeout(const int64_t default_timeout, int64_t &abs_tim
return ret; return ret;
} }
int ObShareUtil::get_ctx_timeout(const int64_t default_timeout, int64_t &timeout)
{
int ret = OB_SUCCESS;
ObTimeoutCtx ctx;
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, default_timeout))) {
LOG_WARN("fail to set default timeout ctx", KR(ret), K(default_timeout));
} else {
timeout = ctx.get_timeout();
}
return ret;
}
int ObShareUtil::check_compat_version_for_arbitration_service( int ObShareUtil::check_compat_version_for_arbitration_service(
const uint64_t tenant_id, const uint64_t tenant_id,
bool &is_compatible) bool &is_compatible)

View File

@ -29,6 +29,7 @@ public:
static int set_default_timeout_ctx(common::ObTimeoutCtx &ctx, const int64_t default_timeout); static int set_default_timeout_ctx(common::ObTimeoutCtx &ctx, const int64_t default_timeout);
// priority to get timeout: ctx > worker > default_timeout // priority to get timeout: ctx > worker > default_timeout
static int get_abs_timeout(const int64_t default_timeout, int64_t &abs_timeout); static int get_abs_timeout(const int64_t default_timeout, int64_t &abs_timeout);
static int get_ctx_timeout(const int64_t default_timeout, int64_t &timeout);
// data version must up to 4.1 with arbitration service // data version must up to 4.1 with arbitration service
// params[in] tenant_id, which tenant to check // params[in] tenant_id, which tenant to check
// params[out] is_compatible, whether it is up to 4.1 // params[out] is_compatible, whether it is up to 4.1

View File

@ -84,15 +84,22 @@ void ObDDLTransController::run1()
} else { } else {
// ignore ret continue // ignore ret continue
for (int64_t i = 0; i < tenant_ids.count(); i++) { for (int64_t i = 0; i < tenant_ids.count(); i++) {
ObZone zone;
ObArray<ObAddr> server_list;
uint64_t tenant_id = tenant_ids.at(i);
int64_t schema_version = OB_INVALID_VERSION;
int64_t start_time = ObTimeUtility::current_time(); int64_t start_time = ObTimeUtility::current_time();
ObCurTraceId::init(GCONF.self_addr_); ObCurTraceId::init(GCONF.self_addr_);
if (OB_FAIL(GCTX.root_service_->get_ddl_service().publish_schema(tenant_ids.at(i)))) {
LOG_WARN("refresh_schema fail", KR(ret), K(tenant_ids.at(i))); if (OB_FAIL(GCTX.root_service_->get_ddl_service().get_unit_manager().get_tenant_unit_servers(tenant_id, zone, server_list))) {
} else if (OB_FAIL(broadcast_consensus_version(tenant_ids.at(i)))) { LOG_WARN("get alive server failed", KR(ret));
LOG_WARN("fail to broadcast consensus version", KR(ret), K(tenant_ids.at(i))); } else if (OB_FAIL(GCTX.root_service_->get_ddl_service().publish_schema_and_get_schema_version(tenant_id, server_list, &schema_version))) {
LOG_WARN("fail to publish_schema", KR(ret), K(tenant_id));
} else if (OB_FAIL(broadcast_consensus_version(tenant_id, schema_version, server_list))) {
LOG_WARN("fail to broadcast consensus version", KR(ret), K(tenant_id), K(schema_version));
} else { } else {
int64_t end_time = ObTimeUtility::current_time(); int64_t end_time = ObTimeUtility::current_time();
LOG_INFO("refresh_schema", KR(ret), K(tenant_ids.at(i)), K(end_time - start_time)); LOG_INFO("refresh_schema", KR(ret), K(tenant_id), K(end_time - start_time));
} }
} }
} }
@ -103,13 +110,11 @@ void ObDDLTransController::run1()
} }
} }
int ObDDLTransController::broadcast_consensus_version(const int64_t tenant_id) int ObDDLTransController::broadcast_consensus_version(const int64_t tenant_id,
const int64_t schema_version,
const ObArray<ObAddr> &server_list)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObZone zone;
ObTimeoutCtx ctx;
ObArray<ObAddr> server_list;
int64_t schema_version = OB_INVALID_VERSION;
obrpc::ObBroadcastConsensusVersionArg arg; obrpc::ObBroadcastConsensusVersionArg arg;
rootserver::ObBroadcstConsensusVersionProxy proxy(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::broadcast_consensus_version); rootserver::ObBroadcstConsensusVersionProxy proxy(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::broadcast_consensus_version);
if (!inited_) { if (!inited_) {
@ -124,25 +129,22 @@ int ObDDLTransController::broadcast_consensus_version(const int64_t tenant_id)
} else if (OB_ISNULL(GCTX.root_service_)) { } else if (OB_ISNULL(GCTX.root_service_)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("rootservice is null", KR(ret)); LOG_WARN("rootservice is null", KR(ret));
} else if (OB_FAIL(GCTX.root_service_->get_ddl_service().get_unit_manager().get_tenant_unit_servers(tenant_id, zone, server_list))) { } else if (OB_INVALID_VERSION == schema_version) {
LOG_WARN("get alive server failed", KR(ret)); ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(schema_service_->get_tenant_refreshed_schema_version( LOG_WARN("invalid schema_version", KR(ret), K(schema_version));
tenant_id, schema_version))) {
LOG_WARN("fail to get tenant refreshed schema version", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
LOG_WARN("fail to set default timeout ctx", KR(ret));
} else { } else {
arg.set_tenant_id(tenant_id); arg.set_tenant_id(tenant_id);
arg.set_consensus_version(schema_version); arg.set_consensus_version(schema_version);
const int64_t rpc_timeout = GCONF.rpc_timeout;
FOREACH_X(s, server_list, OB_SUCC(ret)) { FOREACH_X(s, server_list, OB_SUCC(ret)) {
if (OB_ISNULL(s)) { if (OB_ISNULL(s)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("s is null", KR(ret)); LOG_WARN("s is null", KR(ret));
} else { } else {
// overwrite ret // overwrite ret
if (OB_FAIL(proxy.call(*s, ctx.get_timeout(), arg))) { if (OB_FAIL(proxy.call(*s, rpc_timeout, arg))) {
LOG_WARN("send broadcast consensus version rpc failed", KR(ret), LOG_WARN("send broadcast consensus version rpc failed", KR(ret),
K(ctx.get_timeout()), K(schema_version), K(arg), "server", *s); K(rpc_timeout), K(schema_version), K(arg), "server", *s);
ret = OB_SUCCESS; ret = OB_SUCCESS;
} }
} }

View File

@ -52,7 +52,9 @@ public:
int remove_task(int64_t task_id); int remove_task(int64_t task_id);
int check_enable_ddl_trans_new_lock(int64_t tenant_id, bool &res); int check_enable_ddl_trans_new_lock(int64_t tenant_id, bool &res);
int set_enable_ddl_trans_new_lock(int64_t tenant_id); int set_enable_ddl_trans_new_lock(int64_t tenant_id);
int broadcast_consensus_version(const int64_t tenant_id); int broadcast_consensus_version(const int64_t tenant_id,
const int64_t schema_version,
const ObArray<ObAddr> &server_list);
private: private:
virtual void run1() override; virtual void run1() override;
int check_task_ready(int64_t task_id, bool &ready); int check_task_ready(int64_t task_id, bool &ready);