[CP] add consensus version for truncate

This commit is contained in:
obdev 2023-05-16 07:16:51 +00:00 committed by ob-robot
parent c5d6ca2c5d
commit 5980811329
18 changed files with 263 additions and 9 deletions

View File

@ -670,6 +670,7 @@ PCODE_DEF(OB_DUMP_SINGLE_TX_DATA, 0x742)
PCODE_DEF(OB_GET_LS_REPLAYED_SCN, 0x743)
PCODE_DEF(OB_REFRESH_TENANT_INFO, 0x744)
PCODE_DEF(OB_UPDATE_TENANT_INFO_CACHE, 0x745)
PCODE_DEF(OB_BROADCAST_CONSENSUS_VERSION, 0x746)
// BatchRpc
PCODE_DEF(OB_BATCH, 0x750)

View File

@ -2477,5 +2477,17 @@ int ObRpcGetServerResourceInfoP::process()
return ret;
}
int ObBroadcastConsensusVersionP::process()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(gctx_.ob_service_)) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", K(gctx_.ob_service_), K(ret));
} else {
ret = gctx_.ob_service_->broadcast_consensus_version(arg_, result_);
}
return ret;
}
} // end of namespace observer
} // end of namespace oceanbase

View File

@ -222,6 +222,7 @@ OB_DEFINE_PROCESSOR_SM(Srv, OB_SESS_INFO_VERIFICATION, ObSessInfoVerificationP);
OB_DEFINE_PROCESSOR_S(Srv, OB_SEND_HEARTBEAT, ObRpcSendHeartbeatP);
OB_DEFINE_PROCESSOR_S(Srv, OB_GET_SERVER_RESOURCE_INFO, ObRpcGetServerResourceInfoP);
OB_DEFINE_PROCESSOR_S(Srv, OB_UPDATE_TENANT_INFO_CACHE, ObUpdateTenantInfoCacheP);
OB_DEFINE_PROCESSOR_S(Srv, OB_BROADCAST_CONSENSUS_VERSION, ObBroadcastConsensusVersionP);
} // end of namespace observer
} // end of namespace oceanbase

View File

@ -1376,6 +1376,35 @@ int ObService::switch_schema(
return ret;
}
int ObService::broadcast_consensus_version(
const obrpc::ObBroadcastConsensusVersionArg &arg,
obrpc::ObBroadcastConsensusVersionRes &result)
{
int ret = OB_SUCCESS;
int64_t local_consensus_version = OB_INVALID_VERSION;
const uint64_t tenant_id = arg.get_tenant_id();
const int64_t consensus_version = arg.get_consensus_version();
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_UNLIKELY(consensus_version <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(consensus_version));
} else if (OB_ISNULL(gctx_.schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema_service is null", KR(ret));
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_broadcast_consensus_version(tenant_id, local_consensus_version))) {
LOG_WARN("fail to get local tenant consensus_version", KR(ret), K(tenant_id));
} else if (OB_UNLIKELY(consensus_version < local_consensus_version)) {
ret = OB_EAGAIN;
LOG_WARN("consensus version is less than local consensus version", KR(ret), K(consensus_version), K(local_consensus_version));
} else if (OB_FAIL(gctx_.schema_service_->set_tenant_broadcast_consensus_version(tenant_id, consensus_version))) {
LOG_WARN("failt to update received schema version", KR(ret), K(tenant_id), K(consensus_version));
}
result.set_ret(ret);
return OB_SUCCESS;
}
int ObService::bootstrap(const obrpc::ObBootstrapArg &arg)
{
int ret = OB_SUCCESS;

View File

@ -131,6 +131,9 @@ public:
int stop_partition_write(const obrpc::Int64 &switchover_timestamp, obrpc::Int64 &result);
int check_partition_log(const obrpc::Int64 &switchover_timestamp, obrpc::Int64 &result);
int get_wrs_info(const obrpc::ObGetWRSArg &arg, obrpc::ObGetWRSResult &result);
int broadcast_consensus_version(
const obrpc::ObBroadcastConsensusVersionArg &arg,
obrpc::ObBroadcastConsensusVersionRes &result);
////////////////////////////////////////////////////////////////
// ObRpcFetchSysLSP @RS load balance
int fetch_sys_ls(share::ObLSReplica &replica);

View File

@ -290,4 +290,5 @@ void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) {
// session info verification
RPC_PROCESSOR(ObSessInfoVerificationP, gctx_);
RPC_PROCESSOR(ObBroadcastConsensusVersionP, gctx_);
}

View File

@ -140,6 +140,7 @@ public:
// these functions should be called after ddl_service has been inited
share::schema::ObMultiVersionSchemaService &get_schema_service() { return *schema_service_; }
common::ObMySQLProxy &get_sql_proxy() { return *sql_proxy_; }
ObUnitManager &get_unit_manager() { return *unit_mgr_; }
ObZoneManager &get_zone_mgr() { return *zone_mgr_; }
ObSnapshotInfoManager &get_snapshot_mgr() { return *snapshot_mgr_; }
share::ObLSTableOperator &get_lst_operator() { return *lst_operator_; }

View File

@ -75,7 +75,7 @@ RPC_F(obrpc::OB_GET_SERVER_RESOURCE_INFO, obrpc::ObGetServerResourceInfoArg, obr
RPC_F(obrpc::OB_NOTIFY_SWITCH_LEADER, obrpc::ObNotifySwitchLeaderArg,
obrpc::ObSrvRpcProxy::ObRpc<obrpc::OB_NOTIFY_SWITCH_LEADER>::Response, ObNotifySwitchLeaderProxy);
RPC_F(obrpc::OB_UPDATE_TENANT_INFO_CACHE, obrpc::ObUpdateTenantInfoCacheArg, obrpc::ObUpdateTenantInfoCacheRes, ObUpdateTenantInfoCacheProxy);
RPC_F(obrpc::OB_BROADCAST_CONSENSUS_VERSION, obrpc::ObBroadcastConsensusVersionArg, obrpc::ObBroadcastConsensusVersionRes, ObBroadcstConsensusVersionProxy);
}//end namespace rootserver
}//end namespace oceanbase

View File

@ -8368,5 +8368,37 @@ void ObGetServerResourceInfoResult::reset()
server_.reset();
resource_info_.reset();
}
OB_SERIALIZE_MEMBER(ObBroadcastConsensusVersionArg, tenant_id_, consensus_version_);
bool ObBroadcastConsensusVersionArg::is_valid() const
{
return OB_INVALID_TENANT_ID != tenant_id_ && OB_INVALID_VERSION != consensus_version_;
}
int ObBroadcastConsensusVersionArg::init(const uint64_t tenant_id, const int64_t consensus_version)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
|| OB_INVALID_VERSION == consensus_version)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(consensus_version));
} else {
tenant_id_ = tenant_id;
consensus_version_ = consensus_version;
}
return ret;
}
int ObBroadcastConsensusVersionArg::assign(const ObBroadcastConsensusVersionArg &other)
{
int ret = OB_SUCCESS;
if (this != &other) {
tenant_id_ = other.tenant_id_;
consensus_version_ = other.consensus_version_;
}
return ret;
}
OB_SERIALIZE_MEMBER(ObBroadcastConsensusVersionRes, ret_);
}//end namespace obrpc
}//end namepsace oceanbase

View File

@ -9153,6 +9153,57 @@ private:
common::ObAddr server_;
share::ObServerResourceInfo resource_info_;
};
struct ObBroadcastConsensusVersionArg
{
OB_UNIS_VERSION(1);
public:
ObBroadcastConsensusVersionArg()
: tenant_id_(OB_INVALID_ID),
consensus_version_(OB_INVALID_VERSION) {}
~ObBroadcastConsensusVersionArg() {}
int init(const uint64_t tenant_id, const int64_t consensus_version);
bool is_valid() const;
int assign(const ObBroadcastConsensusVersionArg &other);
uint64_t get_tenant_id() const
{
return tenant_id_;
}
int64_t get_consensus_version() const
{
return consensus_version_;
}
void set_tenant_id(const uint64_t tenant_id)
{
tenant_id_ = tenant_id;
}
void set_consensus_version(const int64_t consensus_version)
{
consensus_version_ = consensus_version;
}
TO_STRING_KV(K_(tenant_id), K_(consensus_version));
private:
DISALLOW_COPY_AND_ASSIGN(ObBroadcastConsensusVersionArg);
private:
uint64_t tenant_id_;
int64_t consensus_version_;
};
struct ObBroadcastConsensusVersionRes
{
OB_UNIS_VERSION(1);
public:
ObBroadcastConsensusVersionRes()
: ret_(OB_SUCCESS) {}
~ObBroadcastConsensusVersionRes() {}
void set_ret(int ret) { ret_ = ret; }
TO_STRING_KV(K_(ret));
private:
DISALLOW_COPY_AND_ASSIGN(ObBroadcastConsensusVersionRes);
private:
int ret_;
};
}//end namespace obrpc
}//end namespace oceanbase
#endif

View File

@ -195,6 +195,7 @@ public:
RPC_AP(PR5 get_server_resource_info, OB_GET_SERVER_RESOURCE_INFO, (obrpc::ObGetServerResourceInfoArg), obrpc::ObGetServerResourceInfoResult);
RPC_AP(PR5 notify_switch_leader, OB_NOTIFY_SWITCH_LEADER, (obrpc::ObNotifySwitchLeaderArg));
RPC_AP(PR5 update_tenant_info_cache, OB_UPDATE_TENANT_INFO_CACHE, (obrpc::ObUpdateTenantInfoCacheArg), obrpc::ObUpdateTenantInfoCacheRes);
RPC_AP(PR5 broadcast_consensus_version, OB_BROADCAST_CONSENSUS_VERSION, (obrpc::ObBroadcastConsensusVersionArg), obrpc::ObBroadcastConsensusVersionRes);
}; // end of class ObSrvRpcProxy
} // end of namespace rpc

View File

@ -12,6 +12,7 @@
#include "share/schema/ob_ddl_trans_controller.h"
#include "share/schema/ob_multi_version_schema_service.h"
#include "rootserver/ob_root_service.h"
#include "share/ob_srv_rpc_proxy.h"
namespace oceanbase
@ -83,13 +84,15 @@ void ObDDLTransController::run1()
} else {
// ignore ret continue
for (int64_t i = 0; i < tenant_ids.count(); i++) {
int64_t start_time= ObTimeUtility::current_time();
int64_t start_time = ObTimeUtility::current_time();
ObCurTraceId::init(GCONF.self_addr_);
if (OB_FAIL(GCTX.root_service_->get_ddl_service().publish_schema(tenant_ids.at(i)))) {
LOG_WARN("refresh_schema fail", KR(ret), K(tenant_ids.at(i)));
} else if (OB_FAIL(broadcast_consensus_version(tenant_ids.at(i)))) {
LOG_WARN("fail to broadcast consensus version", KR(ret), K(tenant_ids.at(i)));
} else {
int64_t end_time= ObTimeUtility::current_time();
LOG_INFO("refresh_schema", K(tenant_ids.at(i)), K(end_time - start_time));
int64_t end_time = ObTimeUtility::current_time();
LOG_INFO("refresh_schema", KR(ret), K(tenant_ids.at(i)), K(end_time - start_time));
}
}
}
@ -100,6 +103,60 @@ void ObDDLTransController::run1()
}
}
int ObDDLTransController::broadcast_consensus_version(const int64_t tenant_id)
{
int ret = OB_SUCCESS;
ObZone zone;
ObTimeoutCtx ctx;
ObArray<ObAddr> server_list;
int64_t schema_version = OB_INVALID_VERSION;
obrpc::ObBroadcastConsensusVersionArg arg;
rootserver::ObBroadcstConsensusVersionProxy proxy(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::broadcast_consensus_version);
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLTransController", KR(ret));
} else if (OB_ISNULL(schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ObDDLTransController", KR(ret));
} else if (tenant_id == OB_INVALID_TENANT_ID) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant id", KR(ret), K(tenant_id));
} else if (OB_ISNULL(GCTX.root_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rootservice is null", KR(ret));
} else if (OB_FAIL(GCTX.root_service_->get_ddl_service().get_unit_manager().get_tenant_unit_servers(tenant_id, zone, server_list))) {
LOG_WARN("get alive server failed", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_refreshed_schema_version(
tenant_id, schema_version))) {
LOG_WARN("fail to get tenant refreshed schema version", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
LOG_WARN("fail to set default timeout ctx", KR(ret));
} else {
arg.set_tenant_id(tenant_id);
arg.set_consensus_version(schema_version);
FOREACH_X(s, server_list, OB_SUCC(ret)) {
if (OB_ISNULL(s)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("s is null", KR(ret));
} else {
// overwrite ret
if (OB_FAIL(proxy.call(*s, ctx.get_timeout(), arg))) {
LOG_WARN("send broadcast consensus version rpc failed", KR(ret),
K(ctx.get_timeout()), K(schema_version), K(arg), "server", *s);
ret = OB_SUCCESS;
}
}
}
int tmp_ret = OB_SUCCESS;
ObArray<int> return_code_array;
if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
LOG_WARN("wait result failed", KR(tmp_ret));
}
}
LOG_INFO("broadcast consensus version finished", KR(ret), K(schema_version), K(arg), K(server_list));
return ret;
}
int ObDDLTransController::create_task_and_assign_schema_version(const uint64_t tenant_id,
const uint64_t schema_version_count,
int64_t &task_id,

View File

@ -52,6 +52,7 @@ public:
int remove_task(int64_t task_id);
int check_enable_ddl_trans_new_lock(int64_t tenant_id, bool &res);
int set_enable_ddl_trans_new_lock(int64_t tenant_id);
int broadcast_consensus_version(const int64_t tenant_id);
private:
virtual void run1() override;
int check_task_ready(int64_t task_id, bool &ready);

View File

@ -3667,6 +3667,47 @@ int ObMultiVersionSchemaService::get_tenant_received_broadcast_version(
return ret;
}
int ObMultiVersionSchemaService::get_tenant_broadcast_consensus_version(
const uint64_t tenant_id,
int64_t &consensus_version)
{
int ret = OB_SUCCESS;
if (OB_INVALID_TENANT_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else {
const ObSchemaStore* schema_store = NULL;
if (NULL == (schema_store = schema_store_map_.get(tenant_id))) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("fail to get schema_store", KR(ret));
} else {
consensus_version = schema_store->get_consensus_version();
}
}
return ret;
}
int ObMultiVersionSchemaService::set_tenant_broadcast_consensus_version(
const uint64_t tenant_id,
const int64_t consensus_version)
{
int ret = OB_SUCCESS;
if (OB_INVALID_TENANT_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id), K(consensus_version));
} else {
ObSchemaStore* schema_store = NULL;
if (NULL == (schema_store = schema_store_map_.get(tenant_id))) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("fail to get schema_store", KR(ret));
} else {
schema_store->update_consensus_version(consensus_version);
LOG_INFO("try to set tenant broadcast consensus version", KR(ret), K(tenant_id), K(consensus_version));
}
}
return ret;
}
int ObMultiVersionSchemaService::set_tenant_received_broadcast_version(
const uint64_t tenant_id,
const int64_t version)

View File

@ -226,6 +226,10 @@ public:
int get_schema_store_tenants(common::ObIArray<uint64_t> &tenant_ids);
bool check_schema_store_tenant_exist(const uint64_t &tenant_id);
int get_tenant_broadcast_consensus_version(const uint64_t tenant_id,
int64_t &consensus_version);
int set_tenant_broadcast_consensus_version(const uint64_t tenant_id,
const int64_t consensus_version);
virtual int set_tenant_received_broadcast_version(const uint64_t tenant_id, const int64_t version);
virtual int set_last_refreshed_schema_info(const ObRefreshSchemaInfo &schema_info);
int update_baseline_schema_version(const uint64_t tenant_id, const int64_t baseline_schema_version);

View File

@ -88,6 +88,15 @@ void ObSchemaStore::update_baseline_schema_version(int64_t version)
}
}
void ObSchemaStore::update_consensus_version(int64_t version)
{
if (version > consensus_version_) {
inc_update(&consensus_version_, version);
LOG_INFO("[SCHEMA_STORE] schema store update version",
K_(tenant_id), K(version), K_(consensus_version));
}
}
int ObSchemaStoreMap::init(int64_t bucket_num)
{
UNUSED(bucket_num);

View File

@ -32,7 +32,8 @@ public:
refreshed_version_(0),
received_version_(0),
checked_sys_version_(0),
baseline_schema_version_(common::OB_INVALID_VERSION) {}
baseline_schema_version_(common::OB_INVALID_VERSION),
consensus_version_(0) {}
~ObSchemaStore() {}
int init(const uint64_t tenant_id,
const int64_t init_version_count,
@ -42,16 +43,19 @@ public:
void update_received_version(int64_t version);
void update_checked_sys_version(int64_t version);
void update_baseline_schema_version(int64_t version);
void update_consensus_version(int64_t version);
int64_t get_refreshed_version() const { return ATOMIC_LOAD(&refreshed_version_); }
int64_t get_received_version() const { return ATOMIC_LOAD(&received_version_); }
int64_t get_checked_sys_version() const { return ATOMIC_LOAD(&checked_sys_version_); }
int64_t get_baseline_schema_version() const { return ATOMIC_LOAD(&baseline_schema_version_); }
int64_t get_consensus_version() const { return ATOMIC_LOAD(&consensus_version_); }
int64_t tenant_id_;
int64_t refreshed_version_;
int64_t received_version_;
int64_t checked_sys_version_;
int64_t baseline_schema_version_;
int64_t consensus_version_;
ObSchemaMgrCache schema_mgr_cache_;
ObSchemaMgrCache schema_mgr_cache_for_liboblog_;
};
@ -79,4 +83,3 @@ private:
}; // end namespace oceanbase
#endif /* OCEANBASE_SCHEMA_OB_TENANT_SCHEMA_STORE_H_ */

View File

@ -2124,9 +2124,16 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st
// wait schema_version refreshed on this server
while (OB_SUCC(ret) && ctx.get_timeout() > 0) {
int64_t refreshed_schema_version = OB_INVALID_VERSION;
int64_t consensus_schema_version = OB_INVALID_VERSION;
if (OB_FAIL(GCTX.schema_service_->get_tenant_refreshed_schema_version(res.tenant_id_, refreshed_schema_version))) {
LOG_WARN("get schema_version fail", KR(ret), K(res.tenant_id_));
} else if (refreshed_schema_version >= res.task_id_) {
LOG_WARN("get refreshed schema_version fail", KR(ret), K(res.tenant_id_));
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_broadcast_consensus_version(res.tenant_id_, consensus_schema_version))) {
LOG_WARN("get consensus schema_version fail", KR(ret), K(res.tenant_id_));
} else if (refreshed_schema_version >= res.task_id_
&& consensus_schema_version >= res.task_id_) {
break;
} else if (refreshed_schema_version >= res.task_id_
&& ObTimeUtility::current_time() - step_time >= 10 * 1000 * 1000) { //10s
break;
} else {
ob_usleep(10 * 1000);
@ -2134,7 +2141,7 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st
}
}
int64_t end_time = ObTimeUtility::current_time();
LOG_INFO("truncate_table_v2", K(ret), "cost", end_time-start_time,
LOG_INFO("truncate_table_v2", KR(ret), "cost", end_time-start_time,
"trans_cost", step_time - start_time,
"wait_refresh", end_time - step_time,
"table_name", truncate_table_arg.table_name_,