make root key synchronization more robust

This commit is contained in:
yinyj17 2023-09-14 07:10:30 +00:00 committed by ob-robot
parent 8a13fac989
commit 864eccfc5c
4 changed files with 151 additions and 188 deletions

View File

@ -35,6 +35,9 @@
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
#include "storage/slog_ckpt/ob_server_checkpoint_slog_handler.h"
#ifdef OB_BUILD_TDE_SECURITY
#include "share/ob_master_key_getter.h"
#endif
using namespace oceanbase::obsys;
using namespace oceanbase::lib;
@ -192,6 +195,14 @@ int ObTenantNodeBalancer::notify_create_tenant(const obrpc::TenantServerUnitConf
ret = OB_SUCCESS;
LOG_INFO("succ to create new user tenant", KR(ret), K(unit), K(basic_tenant_unit), K(create_tenant_timeout_ts));
}
#ifdef OB_BUILD_TDE_SECURITY
if (OB_SUCC(ret) && is_user_tenant(tenant_id)) {
ObRootKey root_key;
if (OB_FAIL(ObMasterKeyGetter::instance().get_root_key(tenant_id, root_key, true))) {
LOG_WARN("failed to get root key", K(ret));
}
}
#endif
// create meta tenant
if (OB_SUCC(ret) && is_user_tenant(tenant_id)) {
if (OB_FAIL(check_new_tenant(meta_tenant_unit, create_tenant_timeout_ts))) {

View File

@ -22124,9 +22124,32 @@ int ObDDLService::create_tenant_schema(
OB_DDL_ADD_TENANT_START, trans, &arg.ddl_stmt_str_))) {
LOG_WARN("create tenant failed", KR(ret), K(user_tenant_schema));
}
LOG_INFO("[CREATE_TENANT] STEP 1.2. finish create tenant schema", KR(ret), K(arg),
LOG_INFO("[CREATE_TENANT] STEP 1.1. finish create tenant schema", KR(ret), K(arg),
"cost", ObTimeUtility::fast_current_time() - tmp_start_time);
}
#ifdef OB_BUILD_TDE_SECURITY
if (OB_SUCC(ret)) {
LOG_INFO("[CREATE_TENANT] STEP 1.1.1. start create root key", K(user_tenant_id));
const int64_t tmp_start_time = ObTimeUtility::fast_current_time();
ObArray<ObAddr> addrs;
bool need_create = false;
if (OB_FAIL(check_need_create_root_key(arg, need_create))) {
LOG_WARN("fail to check need create root key", K(ret));
} else if (!need_create) {
// do nothing
} else if (arg.is_creating_standby_) {
if (OB_FAIL(standby_create_root_key(user_tenant_id, arg, addrs))) {
LOG_WARN("failed to create root key", KR(ret), K(user_tenant_id), K(arg));
}
} else if (OB_FAIL(create_root_key(*rpc_proxy_, user_tenant_id, addrs))) {
LOG_WARN("fail to create root key", KR(ret), K(addrs));
}
LOG_INFO("[CREATE_TENANT] STEP 1.1.1. finish create root key",
KR(ret), K(user_tenant_id), "cost", ObTimeUtility::fast_current_time() - tmp_start_time);
}
#endif
// 2. grant pool
if (OB_SUCC(ret)) {
LOG_INFO("[CREATE_TENANT] STEP 1.2. start grant pools", K(user_tenant_id));
@ -22181,30 +22204,6 @@ int ObDDLService::create_tenant_schema(
"cost", ObTimeUtility::fast_current_time() - tmp_start_time);
}
#ifdef OB_BUILD_TDE_SECURITY
if (OB_SUCC(ret)) {
LOG_INFO("[CREATE_TENANT] STEP 1.5. start create root key", K(user_tenant_id));
const int64_t tmp_start_time = ObTimeUtility::fast_current_time();
ObArray<ObAddr> addrs;
bool need_create = false;
if (OB_FAIL(check_need_create_root_key(arg, need_create))) {
LOG_WARN("fail to check need create root key", K(ret));
} else if (!need_create) {
// do nothing
} else if (OB_FAIL(unit_mgr_->get_servers_by_pools(pools, addrs))) {
LOG_WARN("fail to get tenant's servers", KR(ret), K(user_tenant_id));
} else if (arg.is_creating_standby_) {
if (OB_FAIL(standby_create_root_key(user_tenant_id, arg, addrs))) {
LOG_WARN("failed to create root key", KR(ret), K(user_tenant_id), K(arg));
}
} else if (OB_FAIL(create_root_key(*rpc_proxy_, user_tenant_id, addrs))) {
LOG_WARN("fail to create root key", KR(ret), K(addrs));
}
LOG_INFO("[CREATE_TENANT] STEP 1.5. finish create root key",
KR(ret), K(user_tenant_id), "cost", ObTimeUtility::fast_current_time() - tmp_start_time);
}
#endif
if (OB_SUCC(ret)) {
ObArray<ObAddr> addrs;
ObZone zone; // empty means get all zone's servers
@ -22374,6 +22373,7 @@ int ObDDLService::get_root_key_from_primary(const obrpc::ObCreateTenantArg &arg,
ObLogRestoreSourceServiceConfigParser log_restore_source(ObBackupConfigType::LOG_RESTORE_SOURCE, tenant_id);
common::ObSqlString value;
obrpc::ObRootKeyArg root_key_arg;
obrpc::ObRootKeyResult result;
if (OB_FAIL(value.assign(arg.log_restore_source_))) {
LOG_WARN("fail to assign value", KR(ret), K(log_restore_source));
} else if (OB_FAIL(log_restore_source.get_primary_server_addr(
@ -22382,10 +22382,13 @@ int ObDDLService::get_root_key_from_primary(const obrpc::ObCreateTenantArg &arg,
} else if (OB_FAIL(root_key_arg.init_for_get(primary_tenant_id))) {
LOG_WARN("failed to init for get", KR(ret), K(primary_tenant_id));
}
if (FAILEDx(get_root_key_from_obs(cluster_id, *rpc_proxy_, root_key_arg,
addr_list, key_type, key_value, allocator))) {
if (FAILEDx(notify_root_key(*rpc_proxy_, root_key_arg,
addr_list, result, true, cluster_id, &allocator))) {
LOG_WARN("failed to get root key from obs", KR(ret), K(cluster_id),
K(root_key_arg), K(addr_list));
} else {
key_type = result.key_type_;
key_value = result.root_key_;
}
if (OB_INVALID_ROOT_KEY == ret) {
LOG_USER_ERROR(OB_INVALID_ROOT_KEY, "Can not get root key from primary tenant");
@ -22402,7 +22405,7 @@ int ObDDLService::create_root_key(
const common::ObIArray<common::ObAddr> &addrs)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || addrs.count() <= 0)) {
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant_id is invalid", KR(ret), K(tenant_id), K(addrs));
} else {
@ -22422,55 +22425,71 @@ int ObDDLService::create_root_key(
return ret;
}
int ObDDLService::get_root_key_from_obs(
const uint64_t &cluster_id,
obrpc::ObSrvRpcProxy &rpc_proxy,
const obrpc::ObRootKeyArg &arg,
const common::ObIArray<common::ObAddr> &addrs,
obrpc::RootKeyType &key_type,
common::ObString &key_value,
common::ObIAllocator &allocator)
int ObDDLService::notify_root_key(
obrpc::ObSrvRpcProxy &rpc_proxy,
const obrpc::ObRootKeyArg &arg,
const common::ObIArray<common::ObAddr> &addrs,
obrpc::ObRootKeyResult &result,
const bool enable_default /*=true*/,
const uint64_t &cluster_id /*=OB_INVALID_CLUSTER_ID*/,
common::ObIAllocator *allocator /*=NULL*/)
{
int ret = OB_SUCCESS;
key_type = obrpc::RootKeyType::INVALID;
key_value.reset();
if (OB_UNLIKELY(OB_INVALID_CLUSTER_ID == cluster_id
|| !arg.is_valid() || arg.is_set_)) {
ObTimeoutCtx ctx;
bool has_failed = false;
const int64_t DEFAULT_TIMEOUT = 10 * 1000 * 1000L; // 10s
if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(arg));
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, DEFAULT_TIMEOUT))) {
LOG_WARN("fail to set default timeout", KR(ret));
} else {
ObTimeoutCtx ctx;
bool has_failed = false;
const int64_t DEFAULT_TIMEOUT = 10 * 1000 * 1000L; // 10s
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, DEFAULT_TIMEOUT))) {
LOG_WARN("fail to set default timeout", KR(ret));
} else {
// 1. send rpc
rootserver::ObSetRootKeyProxy proxy(
// 1. send rpc
rootserver::ObSetRootKeyProxy proxy(
rpc_proxy, &obrpc::ObSrvRpcProxy::set_root_key);
int tmp_ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < addrs.count(); i++) {
int64_t timeout = ctx.get_timeout();
const ObAddr &addr = addrs.at(i);
if (OB_TMP_FAIL(proxy.call(addr, timeout, cluster_id, OB_SYS_TENANT_ID, arg))) {
has_failed = true;
LOG_WARN("send rpc failed", KR(ret), KR(tmp_ret), K(addr), K(timeout), K(arg), K(cluster_id));
}
} // end for
// 2. check result
ObArray<int> return_ret_array;
if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) {
LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
int tmp_ret = OB_SUCCESS;
int return_ret = OB_SUCCESS;
bool need_call_rs = OB_INVALID_CLUSTER_ID == cluster_id;
ObAddr rs_addr = GCONF.self_addr_;
int64_t timeout = ctx.get_timeout();
for (int64_t i = 0; OB_SUCC(ret) && i < addrs.count(); i++) {
const ObAddr &addr = addrs.at(i);
if (OB_TMP_FAIL(proxy.call(addr, timeout, cluster_id, OB_SYS_TENANT_ID, arg))) {
has_failed = true;
return_ret= tmp_ret;
LOG_WARN("send rpc failed", KR(ret), KR(tmp_ret), K(addr), K(timeout), K(cluster_id));
} else if (rs_addr == addr) {
need_call_rs = false;
}
for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count() && !has_failed; ++i) {
tmp_ret = return_ret_array.at(i);
if (OB_TMP_FAIL(tmp_ret)) {
has_failed = true;
LOG_WARN("failed to get root key from observer", KR(tmp_ret), K(i));
}
} // end for
if (OB_FAIL(ret) || !need_call_rs) {
} else if (OB_TMP_FAIL(proxy.call(rs_addr, timeout, cluster_id, OB_SYS_TENANT_ID, arg))) {
has_failed = true;
return_ret= tmp_ret;
LOG_WARN("fail to call rs", KR(ret), KR(tmp_ret), K(rs_addr), K(timeout), K(cluster_id));
}
// 2. check result
ObArray<int> return_ret_array;
if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) { // ignore ret
LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count() && !has_failed; ++i) {
if (OB_TMP_FAIL(return_ret_array.at(i))) {
has_failed = true;
return_ret= tmp_ret;
LOG_WARN("rpc return error", KR(tmp_ret), K(i));
}
for (int64_t i = 0; OB_SUCC(ret) && i < proxy.get_results().count(); i++) {
}
if (OB_SUCC(ret) && arg.is_set_) {
if (OB_UNLIKELY(has_failed)) {
ret = return_ret;
LOG_WARN("failed to set root key", KR(ret));
}
} else {
obrpc::RootKeyType key_type = obrpc::RootKeyType::INVALID;
ObString root_key;
for (int64_t i = 0; OB_SUCC(ret) && i < proxy.get_results().count(); ++i) {
const ObRootKeyResult *rpc_result = proxy.get_results().at(i);
if (OB_ISNULL(rpc_result)) {
ret = OB_ERR_UNEXPECTED;
@ -22481,132 +22500,54 @@ int ObDDLService::get_root_key_from_obs(
if (OB_UNLIKELY(obrpc::RootKeyType::INVALID != key_type)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("root key type is conflict", KR(ret), K(key_type), KPC(rpc_result));
} else if (OB_FAIL(deep_copy_ob_string(allocator, rpc_result->root_key_, key_value))) {
LOG_WARN("failed to assign result", KR(ret), KPC(rpc_result));
}
if (OB_SUCC(ret)) {
key_type = rpc_result->key_type_;
root_key = rpc_result->root_key_;
}
} else if (OB_UNLIKELY(0 != key_value.compare(rpc_result->root_key_))) {
} else if (OB_UNLIKELY(0 != root_key.compare(rpc_result->root_key_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("root key is conflict", KR(ret), K(key_value), KPC(rpc_result));
LOG_WARN("root key is conflict", KR(ret), K(root_key), KPC(rpc_result));
}
} // end for
if (OB_SUCC(ret) && obrpc::RootKeyType::INVALID == key_type) {
if (has_failed) {
ret = OB_INVALID_ROOT_KEY;
LOG_WARN("failed to get root key from obs", KR(ret), K(cluster_id),
K(addrs), K(key_type), K(key_value));
} else {
K(addrs), K(key_type), K(root_key));
} else if (enable_default) {
//If the root_key cannot be obtained from all current observers,
//set default. This tenant may be an upgraded tenant.
//The scope of this observer is obtained from the __all_virtual_log_stat,
//The addrs are obtained from the __all_virtual_log_stat in standby cluster,
//it may not be all observers, ignore this situation
key_type = obrpc::RootKeyType::DEFAULT;
LOG_INFO("can not get root key from all observer, set default", K(cluster_id),
K(addrs), K(key_type), K(key_value));
K(addrs), K(key_type), K(root_key));
}
}
}
}
return ret;
}
int ObDDLService::notify_root_key(
obrpc::ObSrvRpcProxy &rpc_proxy,
const obrpc::ObRootKeyArg &arg,
const common::ObIArray<common::ObAddr> &addrs,
obrpc::ObRootKeyResult &result)
{
int ret = OB_SUCCESS;
ObTimeoutCtx ctx;
const int64_t DEFAULT_TIMEOUT = 10 * 1000 * 1000L; // 10s
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, DEFAULT_TIMEOUT))) {
LOG_WARN("fail to set default timeout", KR(ret));
} else {
int64_t server_cnt = addrs.count();
// 1. send rpc
rootserver::ObSetRootKeyProxy proxy(
rpc_proxy, &obrpc::ObSrvRpcProxy::set_root_key);
bool call_rs = false;
ObAddr rs_addr = GCONF.self_addr_;
int64_t timeout = ctx.get_timeout();
for (int64_t i = 0; OB_SUCC(ret) && i < addrs.count(); i++) {
const ObAddr &addr = addrs.at(i);
if (OB_FAIL(proxy.call(addr, timeout, arg))) {
LOG_WARN("send rpc failed", KR(ret), K(addr), K(timeout), K(arg));
} else if (rs_addr == addr) {
call_rs = true;
}
} // end for
if (OB_FAIL(ret) || call_rs) {
} else if (OB_FAIL(proxy.call(rs_addr, timeout, arg))) {
LOG_WARN("fail to call rs", KR(ret), K(rs_addr), K(timeout), K(arg));
} else {
server_cnt++;
}
// 2. check result
ObArray<int> return_ret_array;
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) { // ignore ret
LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
} else if (return_ret_array.count() != server_cnt ||
proxy.get_results().count() != server_cnt) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result cnt not match", KR(ret), K(server_cnt), "ret_cnt", return_ret_array.count(),
"result_cnt", proxy.get_results().count());
}
for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count(); i++) {
int return_ret = return_ret_array.at(i);
const ObAddr &addr = proxy.get_dests().at(i);
const ObRootKeyResult *result = proxy.get_results().at(i);
if (OB_SUCCESS != return_ret) {
ret = return_ret;
LOG_WARN("rpc return error", KR(ret), K(addr), K(timeout));
} else if (OB_ISNULL(result)) {
if (OB_FAIL(ret)) {
} else if (obrpc::RootKeyType::INVALID == key_type) {
result.key_type_ = key_type;
result.root_key_.reset();
} else if (OB_INVALID_CLUSTER_ID != cluster_id) {
if (OB_ISNULL(allocator)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("allocator is null", K(ret));
} else if (OB_FAIL(deep_copy_ob_string(*allocator, root_key, result.root_key_))) {
LOG_WARN("failed to deep copy string", KR(ret));
} else {
result.key_type_ = key_type;
}
} else if (OB_FAIL(ObMasterKeyGetter::instance().set_root_key(arg.tenant_id_,
key_type, root_key))) {
LOG_WARN("failed to set root key", K(ret));
} else if (OB_FAIL(ObMasterKeyGetter::instance().get_root_key(arg.tenant_id_,
result.key_type_, result.root_key_))) {
LOG_WARN("failed to get root key", K(ret));
} else if (OB_UNLIKELY(key_type != result.key_type_ ||
0 != root_key.compare(result.root_key_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get empty result", KR(ret), K(addr), K(timeout));
}
} // end for
if (OB_SUCC(ret) && !arg.is_set_) {
obrpc::RootKeyType key_type = obrpc::RootKeyType::INVALID;
ObString root_key;
for (int64_t i = 0; OB_SUCC(ret) && i < proxy.get_results().count(); ++i) {
const ObRootKeyResult *rpc_result = proxy.get_results().at(i);
if (OB_ISNULL(rpc_result)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get empty result", KR(ret), "addr", proxy.get_dests().at(i), K(timeout));
} else if (obrpc::RootKeyType::INVALID == rpc_result->key_type_) {
// do nothing
} else if (rpc_result->key_type_ != key_type) {
if (OB_UNLIKELY(obrpc::RootKeyType::INVALID != key_type)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("root key type is conflict", K(key_type), K(rpc_result->key_type_), K(ret));
} else {
key_type = rpc_result->key_type_;
root_key = rpc_result->root_key_;
}
} else if (OB_UNLIKELY(0 != root_key.compare(rpc_result->root_key_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("root key is conflict", K(root_key), K(rpc_result->root_key_), K(ret));
}
}
if (OB_SUCC(ret)) {
if (obrpc::RootKeyType::INVALID == key_type) {
key_type = obrpc::RootKeyType::DEFAULT;
}
if (OB_FAIL(ObMasterKeyGetter::instance().set_root_key(arg.tenant_id_,
key_type, root_key))) {
LOG_WARN("failed to set root key", K(ret));
} else if (OB_FAIL(ObMasterKeyGetter::instance().get_root_key(arg.tenant_id_,
result.key_type_, result.root_key_))) {
LOG_WARN("failed to get root key", K(ret));
} else if (OB_UNLIKELY(key_type != result.key_type_ ||
0 != root_key.compare(result.root_key_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpect root key", K(ret));
}
LOG_WARN("get unexpect root key", K(ret));
}
}
}

View File

@ -1926,14 +1926,6 @@ public:
const uint64_t tenant_id, obrpc::RootKeyType &key_type,
common::ObString &key_value,
common::ObIAllocator &allocator);
static int get_root_key_from_obs(
const uint64_t &cluster_id,
obrpc::ObSrvRpcProxy &rpc_proxy,
const obrpc::ObRootKeyArg &arg,
const common::ObIArray<common::ObAddr> &addrs,
obrpc::RootKeyType &key_type,
common::ObString &key_value,
common::ObIAllocator &allocator);
int standby_create_root_key(
const uint64_t tenant_id,
const obrpc::ObCreateTenantArg &arg,
@ -1946,7 +1938,10 @@ public:
obrpc::ObSrvRpcProxy &rpc_proxy,
const obrpc::ObRootKeyArg &arg,
const common::ObIArray<common::ObAddr> &addrs,
obrpc::ObRootKeyResult &result);
obrpc::ObRootKeyResult &result,
const bool enable_default = true,
const uint64_t &cluster_id = OB_INVALID_CLUSTER_ID,
common::ObIAllocator *allocator = NULL);
#endif
private:
int handle_security_audit_for_stmt(const obrpc::ObSecurityAuditArg &arg,

View File

@ -10573,10 +10573,26 @@ int ObRootService::get_root_key_from_obs(const obrpc::ObRootKeyArg &arg,
ObZone empty_zone;
ObArray<ObAddr> active_server_list;
ObArray<ObAddr> inactive_server_list;
if (OB_FAIL(SVR_TRACER.get_servers_by_status(empty_zone, active_server_list,
inactive_server_list))) {
const ObSimpleTenantSchema *simple_tenant = NULL;
ObSchemaGetterGuard guard;
const uint64_t tenant_id = arg.tenant_id_;
bool enable_default = false;
if (OB_ISNULL(schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema_serviece_ is null", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
LOG_WARN("fail to get sys schema guard", KR(ret));
} else if (OB_FAIL(guard.get_tenant_info(tenant_id, simple_tenant))) {
LOG_WARN("fail to get simple tenant schema", KR(ret), K(tenant_id));
} else if (OB_NOT_NULL(simple_tenant) && simple_tenant->is_normal()) {
enable_default = true;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(SVR_TRACER.get_servers_by_status(empty_zone, active_server_list,
inactive_server_list))) {
LOG_WARN("get alive servers failed", K(ret));
} else if (OB_FAIL(ObDDLService::notify_root_key(rpc_proxy_, arg, active_server_list, result))) {
} else if (OB_FAIL(ObDDLService::notify_root_key(rpc_proxy_, arg, active_server_list, result,
enable_default))) {
LOG_WARN("failed to notify root key");
}
return ret;