[FEAT MERGE] Patch Assume Role from 4.2.5 to 4.3.5
This commit is contained in:
parent
6c53a590ee
commit
bcdbabf580
1
deps/init/oceanbase.el7.aarch64.deps
vendored
1
deps/init/oceanbase.el7.aarch64.deps
vendored
@ -29,6 +29,7 @@ devdeps-boost-1.74.0-22022110914.el7.aarch64.rpm
|
||||
devdeps-s2geometry-0.9.0-12023092021.el7.aarch64.rpm
|
||||
devdeps-icu-69.1-72022112416.el7.aarch64.rpm
|
||||
devdeps-cos-c-sdk-5.0.21-202024080916.el7.aarch64.rpm
|
||||
devdeps-protobuf-c-1.4.1-100000072023102410.el7.aarch64.rpm
|
||||
devdeps-s3-cpp-sdk-1.11.156-102023122011.el7.aarch64.rpm
|
||||
devdeps-protobuf-c-1.4.1-100000072023102410.el7.aarch64.rpm
|
||||
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el7.aarch64.rpm
|
||||
|
1
deps/init/oceanbase.el7.x86_64.deps
vendored
1
deps/init/oceanbase.el7.x86_64.deps
vendored
@ -32,6 +32,7 @@ devdeps-s2geometry-0.9.0-12023092021.el7.x86_64.rpm
|
||||
devdeps-icu-69.1-72022112416.el7.x86_64.rpm
|
||||
devdeps-cloud-qpl-1.1.0-272023061419.el7.x86_64.rpm
|
||||
devdeps-cos-c-sdk-5.0.21-202024080916.el7.x86_64.rpm
|
||||
devdeps-protobuf-c-1.4.1-100000062023102016.el7.x86_64.rpm
|
||||
devdeps-s3-cpp-sdk-1.11.156-102023122011.el7.x86_64.rpm
|
||||
devdeps-protobuf-c-1.4.1-100000062023102016.el7.x86_64.rpm
|
||||
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el7.x86_64.rpm
|
||||
|
1
deps/init/oceanbase.el8.aarch64.deps
vendored
1
deps/init/oceanbase.el8.aarch64.deps
vendored
@ -29,6 +29,7 @@ devdeps-boost-1.74.0-22022110914.el8.aarch64.rpm
|
||||
devdeps-s2geometry-0.9.0-12023092021.el8.aarch64.rpm
|
||||
devdeps-icu-69.1-72022112416.el8.aarch64.rpm
|
||||
devdeps-cos-c-sdk-5.0.21-202024080916.el8.aarch64.rpm
|
||||
devdeps-protobuf-c-1.4.1-100000072023102410.el8.aarch64.rpm
|
||||
devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.aarch64.rpm
|
||||
devdeps-protobuf-c-1.4.1-100000072023102410.el8.aarch64.rpm
|
||||
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.aarch64.rpm
|
||||
|
1
deps/init/oceanbase.el8.x86_64.deps
vendored
1
deps/init/oceanbase.el8.x86_64.deps
vendored
@ -30,6 +30,7 @@ devdeps-boost-1.74.0-22022110914.el8.x86_64.rpm
|
||||
devdeps-s2geometry-0.9.0-12023092021.el8.x86_64.rpm
|
||||
devdeps-icu-69.1-72022112416.el8.x86_64.rpm
|
||||
devdeps-cos-c-sdk-5.0.21-202024080916.el8.x86_64.rpm
|
||||
devdeps-protobuf-c-1.4.1-100000062023102016.el8.x86_64.rpm
|
||||
devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.x86_64.rpm
|
||||
devdeps-protobuf-c-1.4.1-100000062023102016.el8.x86_64.rpm
|
||||
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.x86_64.rpm
|
||||
|
1
deps/oblib/src/lib/ob_errno.h
vendored
1
deps/oblib/src/lib/ob_errno.h
vendored
@ -119,6 +119,7 @@ constexpr int OB_HASH_PLACEMENT_RETRY = -4205;
|
||||
constexpr int OB_HASH_FULL = -4206;
|
||||
constexpr int OB_WAIT_NEXT_TIMEOUT = -4208;
|
||||
constexpr int OB_MAJOR_FREEZE_NOT_FINISHED = -4213;
|
||||
constexpr int OB_CURL_ERROR = -4216;
|
||||
constexpr int OB_INVALID_DATE_VALUE = -4219;
|
||||
constexpr int OB_INACTIVE_SQL_CLIENT = -4220;
|
||||
constexpr int OB_INACTIVE_RPC_PROXY = -4221;
|
||||
|
1
deps/oblib/src/lib/restore/CMakeLists.txt
vendored
1
deps/oblib/src/lib/restore/CMakeLists.txt
vendored
@ -17,6 +17,7 @@ oblib_add_library(restore OBJECT
|
||||
cos/ob_cos_wrapper_handle.h
|
||||
cos/ob_singleton.h
|
||||
ob_object_device.cpp
|
||||
hmac_signature.cpp
|
||||
ob_object_device.h
|
||||
ob_object_storage_base.cpp)
|
||||
|
||||
|
@ -618,6 +618,7 @@ int ObCosWrapper::create_cos_handle(
|
||||
OB_COS_customMem &custom_mem,
|
||||
const struct ObCosAccount &account,
|
||||
const bool check_md5,
|
||||
const char *cos_sts_token,
|
||||
ObCosWrapper::Handle **h)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -666,6 +667,9 @@ int ObCosWrapper::create_cos_handle(
|
||||
cos_str_set(&ctx->options->config->access_key_id, account.access_id_);
|
||||
cos_str_set(&ctx->options->config->access_key_secret, account.access_key_);
|
||||
cos_str_set(&ctx->options->config->appid, account.appid_);
|
||||
if (nullptr != cos_sts_token) {
|
||||
cos_str_set(&ctx->options->config->sts_token, cos_sts_token);
|
||||
}
|
||||
ctx->options->config->is_cname = 0;
|
||||
// connection timeout, default 60s
|
||||
ctx->options->ctl->options->connect_timeout = 60;
|
||||
|
@ -230,6 +230,7 @@ public:
|
||||
OB_COS_customMem &custom_mem,
|
||||
const struct ObCosAccount &account,
|
||||
const bool check_md5,
|
||||
const char *cos_sts_token,
|
||||
Handle **h);
|
||||
|
||||
// You can not use handle any more after destroy_cos_handle is called.
|
||||
|
@ -12,6 +12,7 @@
|
||||
|
||||
#include "lib/utility/ob_print_utils.h"
|
||||
#include "ob_cos_wrapper_handle.h"
|
||||
#include "lib/restore/ob_i_storage.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -46,6 +47,10 @@ void ObCosMemAllocator::reuse()
|
||||
allocator_.reuse();
|
||||
}
|
||||
|
||||
void ObCosMemAllocator::reset()
|
||||
{
|
||||
allocator_.clear();
|
||||
}
|
||||
|
||||
// Memory function used for cos context
|
||||
static void *ob_cos_malloc(void *opaque, size_t size)
|
||||
@ -68,7 +73,7 @@ static void ob_cos_free(void *opaque, void *address)
|
||||
|
||||
/*--------------------------------ObCosWrapperHandle-----------------------------------*/
|
||||
ObCosWrapperHandle::ObCosWrapperHandle()
|
||||
: is_inited_(false), handle_(nullptr), cos_account_(), allocator_(),
|
||||
: is_inited_(false), handle_(nullptr), cos_account_(), sts_token_(), allocator_(),
|
||||
delete_mode_(ObStorageDeleteMode::STORAGE_DELETE_MODE)
|
||||
{}
|
||||
|
||||
@ -83,11 +88,13 @@ int ObCosWrapperHandle::init(const ObObjectStorageInfo *storage_info)
|
||||
} else if (OB_ISNULL(storage_info)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "storage info is null", K(ret));
|
||||
} else if (OB_FAIL(storage_info->get_storage_info_str(storage_info_str, sizeof(storage_info_str)))) {
|
||||
} else if (OB_FAIL(storage_info->get_authorization_str(
|
||||
storage_info_str, sizeof(storage_info_str), sts_token_))) {
|
||||
OB_LOG(WARN, "fail to get cos storage info str", K(ret), K(storage_info));
|
||||
} else if (OB_FAIL(cos_account_.parse_from(storage_info_str, strlen(storage_info_str)))) {
|
||||
OB_LOG(WARN, "fail to parse cos account from storage info str", K(ret));
|
||||
} else if (strlen(cos_account_.delete_mode_) > 0 && OB_FAIL(set_delete_mode(cos_account_.delete_mode_))) {
|
||||
OB_LOG(WARN, "fail to parse cos account from authorization str", K(ret));
|
||||
} else if (strlen(cos_account_.delete_mode_) > 0
|
||||
&& OB_FAIL(set_delete_mode(cos_account_.delete_mode_))) {
|
||||
OB_LOG(WARN, "fail to set cos delete mode", K(cos_account_.delete_mode_), K(ret));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
@ -100,25 +107,28 @@ void ObCosWrapperHandle::reset()
|
||||
destroy_cos_handle();
|
||||
is_inited_ = false;
|
||||
delete_mode_ = ObStorageDeleteMode::STORAGE_DELETE_MODE;
|
||||
// clear memory used by cos account sts token
|
||||
cos_account_.clear();
|
||||
sts_token_.reset();
|
||||
allocator_.reset();
|
||||
}
|
||||
|
||||
int create_cos_handle(
|
||||
int ObCosWrapperHandle::create_cos_handle(
|
||||
ObCosMemAllocator &allocator,
|
||||
const qcloud_cos::ObCosAccount &cos_account,
|
||||
const bool check_md5,
|
||||
qcloud_cos::ObCosWrapper::Handle *&handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
handle = nullptr;
|
||||
qcloud_cos::OB_COS_customMem cos_mem = {ob_cos_malloc, ob_cos_free, &allocator};
|
||||
if (OB_FAIL(qcloud_cos::ObCosWrapper::create_cos_handle(cos_mem, cos_account,
|
||||
check_md5, &handle))) {
|
||||
const char *sts_data = sts_token_.get_data();
|
||||
if (OB_FAIL(qcloud_cos::ObCosWrapper::create_cos_handle(
|
||||
cos_mem, cos_account_, check_md5, sts_data, &handle))) {
|
||||
OB_LOG(WARN, "failed to create tmp cos handle", K(ret));
|
||||
} else if (OB_ISNULL(handle)) {
|
||||
ret = OB_OBJECT_STORAGE_IO_ERROR;
|
||||
OB_LOG(WARN, "create tmp handle succeed, but returned handle is null", K(ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -131,7 +141,7 @@ int ObCosWrapperHandle::create_cos_handle(const bool check_md5)
|
||||
} else if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
OB_LOG(WARN, "handle is not inited", K(ret));
|
||||
} else if (OB_FAIL(common::create_cos_handle(allocator_, cos_account_, check_md5, handle_))) {
|
||||
} else if (OB_FAIL(create_cos_handle(allocator_, check_md5, handle_))) {
|
||||
OB_LOG(WARN, "failed to create cos handle", K(ret));
|
||||
}
|
||||
return ret;
|
||||
@ -149,7 +159,7 @@ int ObCosWrapperHandle::create_tmp_cos_handle(
|
||||
ret = OB_NOT_INIT;
|
||||
OB_LOG(WARN, "handle is not inited", K(ret));
|
||||
} else if (OB_FAIL(qcloud_cos::ObCosWrapper::create_cos_handle(cos_mem, cos_account_,
|
||||
check_md5, &handle))) {
|
||||
check_md5, sts_token_.data_, &handle))) {
|
||||
OB_LOG(WARN, "failed to create tmp cos handle", K(ret));
|
||||
} else if (OB_ISNULL(handle)) {
|
||||
ret = OB_OBJECT_STORAGE_IO_ERROR;
|
||||
|
@ -35,18 +35,11 @@ public:
|
||||
void *alloc(size_t size);
|
||||
void free(void *addr);
|
||||
void reuse();
|
||||
void reset();
|
||||
private:
|
||||
ObArenaAllocator allocator_;
|
||||
};
|
||||
|
||||
// Create a temporary cos_handle object,
|
||||
// utilizing the 'allocator' to allocate the necessary memory.
|
||||
int create_cos_handle(
|
||||
ObCosMemAllocator &allocator,
|
||||
const qcloud_cos::ObCosAccount &cos_account,
|
||||
const bool check_md5,
|
||||
qcloud_cos::ObCosWrapper::Handle *&handle);
|
||||
|
||||
class ObCosWrapperHandle
|
||||
{
|
||||
public:
|
||||
@ -55,6 +48,12 @@ public:
|
||||
|
||||
int init(const common::ObObjectStorageInfo *storage_info);
|
||||
void reset();
|
||||
// Create a temporary cos_handle object,
|
||||
// utilizing the 'allocator' to allocate the necessary memory.
|
||||
int create_cos_handle(
|
||||
ObCosMemAllocator &allocator,
|
||||
const bool check_md5,
|
||||
qcloud_cos::ObCosWrapper::Handle *&handle);
|
||||
int create_cos_handle(const bool check_md5);
|
||||
void destroy_cos_handle();
|
||||
qcloud_cos::ObCosWrapper::Handle *get_ptr() { return handle_; }
|
||||
@ -86,6 +85,7 @@ private:
|
||||
bool is_inited_;
|
||||
qcloud_cos::ObCosWrapper::Handle *handle_;
|
||||
qcloud_cos::ObCosAccount cos_account_;
|
||||
ObSTSToken sts_token_;
|
||||
ObCosMemAllocator allocator_;
|
||||
ObString bucket_name_;
|
||||
ObString object_name_;
|
||||
|
311
deps/oblib/src/lib/restore/hmac_signature.cpp
vendored
Normal file
311
deps/oblib/src/lib/restore/hmac_signature.cpp
vendored
Normal file
@ -0,0 +1,311 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include "hmac_signature.h"
|
||||
#include "lib/encode/ob_base64_encode.h"
|
||||
#include "lib/allocator/page_arena.h"
|
||||
#include "lib/container/ob_array_iterator.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace common
|
||||
{
|
||||
|
||||
// Function to generate a signature nonce
|
||||
int generate_signature_nonce(char *nonce, const int64_t buf_size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// Get current timestamp in milliseconds
|
||||
const int64_t curr_time_us = ObTimeUtility::current_time();
|
||||
if (OB_ISNULL(nonce) || OB_UNLIKELY(buf_size <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid args to generate signature nonce", K(ret), KP(nonce), K(buf_size));
|
||||
} else {
|
||||
int64_t random_num = ObRandom::rand(0, OB_MAX_STS_SIGNATURE_RAND_NUM);
|
||||
if (OB_FAIL(databuff_printf(nonce, buf_size, "%ld%ld", curr_time_us, random_num))) {
|
||||
OB_LOG(WARN, "failed to gen signature nonce", K(ret), K(nonce), K(buf_size), K(curr_time_us), K(random_num));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// request_id is used as the identifier for connecting to ocp.
|
||||
// observer needs to ensure that request_id is unique and there are no other requirements.
|
||||
int generate_request_id(char *request_id, const int64_t buf_size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
char signature_nonce[OB_MAX_STS_SIGNATURE_NONCE_LENTH] = {0};
|
||||
// Get current time
|
||||
if (OB_FAIL(generate_signature_nonce(signature_nonce, sizeof(signature_nonce)))) {
|
||||
OB_LOG(WARN, "generage signature nonce failed", K(ret), K(signature_nonce), K(sizeof(signature_nonce)));
|
||||
} else if (OB_FAIL(databuff_printf(request_id, buf_size, "observer-%s", signature_nonce))) {
|
||||
OB_LOG(WARN, "failed to generate request id", K(ret), K(request_id), K(buf_size), K(signature_nonce));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int base64_encoded(const char *input, const int64_t input_len, char *encoded_result, const int64_t encoded_result_buf_len)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t encoded_buf_len = ObBase64Encoder::needed_encoded_length(input_len);
|
||||
int64_t encoded_pos = 0;
|
||||
|
||||
if (OB_ISNULL(input) || OB_UNLIKELY(input_len <= 0) || OB_ISNULL(encoded_result) || OB_UNLIKELY(encoded_result_buf_len <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid args to encode base64", K(ret), KP(input), K(input_len), KP(encoded_result), K(encoded_result_buf_len));
|
||||
} else if (OB_UNLIKELY(encoded_buf_len >= encoded_result_buf_len)) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
OB_LOG(WARN, "size overflow", K(ret), K(encoded_buf_len), K(encoded_result_buf_len));
|
||||
} else if (OB_FAIL(ObBase64Encoder::encode((const uint8_t *)input, input_len, encoded_result, encoded_result_buf_len, encoded_pos))) {
|
||||
OB_LOG(WARN, "encode base64 fails", K(ret), KP(input), K(input_len), KP(encoded_result), K(encoded_result_buf_len),
|
||||
K(encoded_buf_len), K(encoded_pos));
|
||||
} else if (OB_UNLIKELY(encoded_pos >= encoded_result_buf_len)) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
OB_LOG(WARN, "size overflow", K(ret), K(encoded_pos), K(encoded_result_buf_len));
|
||||
} else {
|
||||
encoded_result[encoded_pos] = '\0';
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int percent_encode(const char *content, char *buf, const int64_t buf_len)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(content) || OB_ISNULL(buf) || OB_UNLIKELY(buf_len <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid argument", K(ret), KP(content), KP(buf), K(buf_len));
|
||||
} else {
|
||||
char *p = buf;
|
||||
const int64_t content_len = strlen(content);
|
||||
int64_t num = 0;
|
||||
const char *hex = "0123456789ABCDEF";
|
||||
int64_t triple_num = 0;
|
||||
if (OB_UNLIKELY(content_len <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid content", K(ret), K(content_len));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < content_len; ++i) {
|
||||
char c = content[i];
|
||||
if (std::isalnum(c) || c == '-' || c == '_' || c == '.' || c == '~') {
|
||||
num++;
|
||||
}
|
||||
}
|
||||
triple_num = content_len - num;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_UNLIKELY(content_len + 2 * triple_num >= buf_len)) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
OB_LOG(WARN, "content is too long", K(ret), KP(content), K(content_len), K(triple_num), K(buf_len));
|
||||
} else {
|
||||
for (uint64_t i = 0; OB_SUCC(ret) && i < content_len; ++i) {
|
||||
char c = content[i];
|
||||
if (std::isalnum(c) || c == '-' || c == '_' || c == '.' || c == '~') {
|
||||
*p++ = c;
|
||||
} else if (c == ' ') {
|
||||
*p++ = '%';
|
||||
*p++ = '2';
|
||||
*p++ = '0';
|
||||
} else {
|
||||
*p++ = '%';
|
||||
*p++ = hex[(c >> 4) & 0xF];
|
||||
*p++ = hex[c & 0xF];
|
||||
}
|
||||
}
|
||||
*p = '\0';
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// The signature generated by hmac_sha1 may contain \0,
|
||||
// result_len is used to record the actual length of the signature after hmac_sha1
|
||||
int hmac_sha1(const char *key, const char *data, char *encoded_result, const int64_t encode_buf_len,
|
||||
uint32_t &result_len)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
unsigned char *result = nullptr;
|
||||
ObArenaAllocator allocator("hmac_sha1");
|
||||
// The buffer should have at least EVP_MAX_MD_SIZE bytes of space to ensure that it can store the
|
||||
// output of any hashing algorithm.
|
||||
if (OB_ISNULL(key) || OB_ISNULL(data) || OB_ISNULL(encoded_result)
|
||||
|| OB_UNLIKELY(encode_buf_len < EVP_MAX_MD_SIZE)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid argument", K(ret), KP(key), KP(data), KP(encoded_result),
|
||||
K(encode_buf_len), K(result_len));
|
||||
}
|
||||
// The max length of hmac_sha1 is 20
|
||||
else if (OB_ISNULL(result = static_cast<unsigned char *>(allocator.alloc(sizeof(char) * 20)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
OB_LOG(WARN, "failed to malloc", K(ret));
|
||||
} else {
|
||||
HMAC_CTX *hmac = HMAC_CTX_new();
|
||||
if (OB_ISNULL(hmac)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
OB_LOG(WARN, "failed to allocate memory", K(ret));
|
||||
} else if (OB_UNLIKELY(1 != HMAC_Init_ex(hmac, key, strlen(key), EVP_sha1(), nullptr))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "failed to init hmac", K(ret), KP(key), K(strlen(key)));
|
||||
} else {
|
||||
if (OB_UNLIKELY(1 != HMAC_Update(hmac, reinterpret_cast<const unsigned char *>(data), strlen(data)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "failed to update hmac", K(ret), KP(data), K(strlen(data)));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_UNLIKELY(1 != HMAC_Final(hmac, result, &result_len))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "failed to final hmac", K(ret), KP(result), K(result_len));
|
||||
} else if (OB_UNLIKELY(result_len >= encode_buf_len)) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
OB_LOG(WARN, "encoded result is too long", K(ret), K(result_len), K(encode_buf_len));
|
||||
} else {
|
||||
MEMCPY(encoded_result, result, result_len);
|
||||
encoded_result[result_len] = '\0';
|
||||
}
|
||||
}
|
||||
if (OB_NOT_NULL(hmac)) {
|
||||
HMAC_CTX_free(hmac);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
struct Compare
|
||||
{
|
||||
// Sort lexicographically by key
|
||||
bool operator()(const std::pair<const char *, const char *> &a, const std::pair<const char *, const char *> &b) const
|
||||
{
|
||||
// Check if either key is NULL
|
||||
bool flag = false;
|
||||
if (OB_ISNULL(a.first) && OB_ISNULL(b.first)) {
|
||||
flag = false; // Both are NULL, considered equal
|
||||
} else if (OB_ISNULL(a.first)) {
|
||||
flag = true; // NULL is considered less than any non-NULL string
|
||||
} else if (OB_ISNULL(b.first)) {
|
||||
flag = false; // Any non-NULL string is considered greater than NULL
|
||||
} else {
|
||||
int key_cmp = strcmp(a.first, b.first);
|
||||
if (key_cmp < 0) {
|
||||
flag = true;
|
||||
} else if (key_cmp > 0) {
|
||||
flag = false;
|
||||
} else {
|
||||
// If keys are equal, compare values
|
||||
// Check if either value is NULL
|
||||
if (OB_ISNULL(a.second) && OB_ISNULL(b.second)) {
|
||||
flag = false; // Both are NULL, considered equal
|
||||
} else if (OB_ISNULL(a.second)) {
|
||||
flag = true; // NULL is considered less than any non-NULL string
|
||||
} else if (OB_ISNULL(b.second)) {
|
||||
flag = false; // Any non-NULL string is considered greater than NULL
|
||||
} else {
|
||||
flag = strcmp(a.second, b.second) < 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
return flag;
|
||||
}
|
||||
};
|
||||
|
||||
int sign_request(
|
||||
ObArray<std::pair<const char *, const char *>> params,
|
||||
const char *method, char *signature,
|
||||
const int64_t signature_buf_len)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(params.count() <= 0) || OB_ISNULL(method) || OB_ISNULL(signature)
|
||||
|| OB_UNLIKELY(signature_buf_len <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid argument", K(ret), K(params.count()), KP(method), KP(signature),
|
||||
K(signature_buf_len));
|
||||
} else {
|
||||
// Sort the parameters by key
|
||||
lib::ob_sort(params.begin(), params.end(), Compare());
|
||||
// Make the string to sign
|
||||
const char *sk = nullptr;
|
||||
// Currently, the longest key used for signature will not exceed 64
|
||||
const int64_t OB_MAX_HMAC_ENCODED_KEY = 64;
|
||||
constexpr int64_t OB_MAX_HMAC_ENCODED_VALUE = MAX(OB_MAX_STS_SIGNATURE_NONCE_LENTH,
|
||||
MAX(OB_MAX_STS_REQUEST_ID_LENTH, MAX(OB_MAX_ROLE_ARN_LENGTH, MAX(OB_MAX_STS_AK_LENGTH, OB_MAX_STS_SK_LENGTH))));
|
||||
char encoded_key[OB_MAX_HMAC_ENCODED_KEY] = {0};
|
||||
char encoded_value[OB_MAX_HMAC_ENCODED_VALUE] = {0};
|
||||
char delimiter[4] = {0};
|
||||
char params_to_sign[OB_MAX_STS_CONCAT_LENGTH] = {0};
|
||||
char concat_params[OB_MAX_STS_CONCAT_LENGTH] = {0};
|
||||
char sign_result[OB_MAX_STS_CONCAT_LENGTH] = {0};
|
||||
char encoded_result[OB_MAX_STS_SIGNATURE_LENGTH] = {0};
|
||||
uint32_t sign_result_len = 0;
|
||||
ObArenaAllocator allocator("hmac_signature");
|
||||
ObStringBuffer encoded_params(&allocator);
|
||||
|
||||
const char *first = nullptr;
|
||||
const char *second = nullptr;
|
||||
// e.g.Action=GetResourceSTSCredential&CloudProvider=aliyun&RequestId=obrequest&RequestSource=OBSERVER
|
||||
// &ResourceAccount=oceanbase&ResourceType=OSS&SignatureNonce=xxxxx
|
||||
for (uint64_t i = 0; OB_SUCC(ret) && i < params.count(); ++i) {
|
||||
first = params.at(i).first;
|
||||
second = params.at(i).second;
|
||||
if (OB_ISNULL(first) || OB_ISNULL(second)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "params is not allowed to be nullptr", K(ret), K(first), KP(second), K(i));
|
||||
} else if (MEMCMP(first, "ObSignature", strlen(first)) == 0
|
||||
|| MEMCMP(first, "ObSignatureKey", strlen(first)) == 0
|
||||
|| MEMCMP(first, "ObSignatureSecret", strlen(first)) == 0
|
||||
|| MEMCMP(first, "UID", strlen(first)) == 0
|
||||
|| MEMCMP(first, "OpSource", strlen(first)) == 0) {
|
||||
if (MEMCMP(first, "ObSignatureSecret", strlen(first)) == 0) {
|
||||
sk = second;
|
||||
}
|
||||
continue;
|
||||
} else if (OB_FAIL(percent_encode(first, encoded_key, sizeof(encoded_key)))) {
|
||||
OB_LOG(WARN, "failed to percent encode", K(ret), K(i), K(first));
|
||||
} else if (OB_FAIL(percent_encode(second, encoded_value, sizeof(encoded_value)))) {
|
||||
OB_LOG(WARN, "failed to percent encode", K(ret), K(i), KP(second));
|
||||
} else if (i != 0 && OB_FAIL(encoded_params.append("&"))) {
|
||||
OB_LOG(WARN, "failed to append", K(ret), K(i));
|
||||
} else if (OB_FAIL(encoded_params.append(encoded_key))) {
|
||||
OB_LOG(WARN, "failed to append", K(ret), KP(encoded_key));
|
||||
} else if (OB_FAIL(encoded_params.append("="))) {
|
||||
OB_LOG(WARN, "failed to append", K(ret));
|
||||
} else if (OB_FAIL(encoded_params.append(encoded_value))) {
|
||||
OB_LOG(WARN, "failed to append", K(ret), KP(encoded_value));
|
||||
}
|
||||
}
|
||||
// e.g. params_to_sign="POST&%2F&Action%3DGetResourceSTSCredential%26CloudProvider%3Daliyun%26
|
||||
// RequestId%3Dobrequest%26RequestSource%3DOBSERVER%26ResourceAccount%3Doceanbase%26ResourceType%3DOSS%26SignatureNonce%3Dxxxxxx"
|
||||
if (FAILEDx(percent_encode("/", delimiter, sizeof(delimiter)))) {
|
||||
OB_LOG(WARN, "failed to percent encode", K(ret), K(delimiter), K(sizeof(delimiter)));
|
||||
} else if (OB_FAIL(percent_encode(encoded_params.ptr(), concat_params, sizeof(concat_params)))) {
|
||||
OB_LOG(WARN, "failed to percent encode", K(ret), KP(encoded_params.ptr()), K(encoded_params.length()), KP(concat_params));
|
||||
} else if (OB_FAIL(databuff_printf(
|
||||
params_to_sign, sizeof(params_to_sign), "%s&%s&%s", method, delimiter, concat_params))) {
|
||||
OB_LOG(WARN, "failed to percent encode", K(ret), K(method), K(delimiter), KP(concat_params), KP(params_to_sign));
|
||||
}
|
||||
// signature with HMAC-SHA1
|
||||
else if (OB_FAIL(hmac_sha1(sk, params_to_sign, sign_result, sizeof(sign_result), sign_result_len))) {
|
||||
OB_LOG(WARN, "failed to hmac sha1", K(ret), KP(params_to_sign), K(sign_result), K(sign_result_len));
|
||||
} else if (OB_FAIL(base64_encoded(sign_result, sign_result_len, encoded_result, sizeof(encoded_result)))) {
|
||||
OB_LOG(WARN, "failed to base64 encode", K(ret), K(sign_result));
|
||||
}
|
||||
const int64_t len = strlen(encoded_result);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (len >= signature_buf_len) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
OB_LOG(WARN, "signature is too long", K(ret), K(signature_buf_len), K(len));
|
||||
} else {
|
||||
MEMCPY(signature, encoded_result, len);
|
||||
signature[len] = '\0';
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
} // namespace common
|
||||
} // namespace oceanbase
|
47
deps/oblib/src/lib/restore/hmac_signature.h
vendored
Normal file
47
deps/oblib/src/lib/restore/hmac_signature.h
vendored
Normal file
@ -0,0 +1,47 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
#ifndef OB_HMAC_SIGNATURE_H_
|
||||
#define OB_HMAC_SIGNATURE_H_
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <time.h>
|
||||
#include <openssl/hmac.h>
|
||||
#include <openssl/sha.h>
|
||||
#include <openssl/evp.h>
|
||||
#include <openssl/bio.h>
|
||||
#include "lib/container/ob_array.h"
|
||||
#include "lib/allocator/ob_malloc.h"
|
||||
#include "lib/string/ob_string_buffer.h"
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "lib/restore/ob_storage_info.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace common
|
||||
{
|
||||
int generate_signature_nonce(char *nonce, const int64_t buf_size);
|
||||
|
||||
int generate_request_id(char *request_id, const int64_t buf_size);
|
||||
|
||||
int base64_encoded(const char *input, const int64_t input_len, char *encoded_result,
|
||||
const int64_t encoded_result_buf_len);
|
||||
|
||||
int percent_encode(const char *content, char *buf, const int64_t buf_len);
|
||||
|
||||
int hmac_sha1(const char *key, const char *data, char *encoded_result, const int64_t encode_buf_len,
|
||||
uint32_t &result_len);
|
||||
|
||||
int sign_request(ObArray<std::pair<const char *, const char *>> params, const char *method,
|
||||
char *signature, const int64_t signature_buf_len);
|
||||
|
||||
} // namespace common
|
||||
} // namespace oceanbase
|
||||
#endif // OB_HMAC_SIGNATURE_H_
|
41
deps/oblib/src/lib/restore/ob_i_storage.cpp
vendored
41
deps/oblib/src/lib/restore/ob_i_storage.cpp
vendored
@ -339,6 +339,25 @@ int record_failed_files_idx(const hash::ObHashMap<ObString, int64_t> &files_to_d
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ob_set_field(const char *value, char *field, const uint32_t field_length)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(value) || OB_ISNULL(field)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid arguments", K(ret), KP(value), KP(field));
|
||||
} else {
|
||||
const int64_t value_len = strlen(value);
|
||||
if (value_len >= field_length) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
OB_LOG(WARN, "value is too long", K(ret), KP(value), K(value_len), K(field_length));
|
||||
} else {
|
||||
MEMCPY(field, value, value_len);
|
||||
field[value_len] = '\0';
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ob_apr_abort_fn(int retcode)
|
||||
{
|
||||
int ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
@ -846,6 +865,27 @@ ObObjectStorageGuard::ObObjectStorageGuard(
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// when accessing the object storage, if the error code returned is OB_BACKUP_PERMISSION_DENIED,
|
||||
// it may be due to expired temporary ak/sk
|
||||
// attempt to refresh the temporary ak/sk, and if the refresh fails,
|
||||
// only log the error message to avoid overriding the original error code.
|
||||
static void try_refresh_device_credential(
|
||||
const int ob_errcode, const ObObjectStorageInfo *storage_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (ob_errcode != OB_OBJECT_STORAGE_PERMISSION_DENIED) {
|
||||
// do nothing
|
||||
} else if (OB_ISNULL(storage_info) || OB_UNLIKELY(!storage_info->is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid argument", K(ret), K(ob_errcode), KPC(storage_info));
|
||||
} else if (storage_info->is_assume_role_mode()
|
||||
&& OB_FAIL(ObDeviceCredentialMgr::get_instance().curl_credential(
|
||||
*storage_info, true /*update_access_time*/))) {
|
||||
OB_LOG(WARN, "failed to refresh credential", K(ret), K(ob_errcode), KPC(storage_info));
|
||||
}
|
||||
}
|
||||
|
||||
void ObObjectStorageGuard::print_access_storage_log_() const
|
||||
{
|
||||
const int64_t cost_time_us = ObTimeUtility::current_time() - start_time_us_;
|
||||
@ -880,6 +920,7 @@ bool ObObjectStorageGuard::is_slow_io_(const int64_t cost_time_us) const
|
||||
ObObjectStorageGuard::~ObObjectStorageGuard()
|
||||
{
|
||||
print_access_storage_log_();
|
||||
try_refresh_device_credential(ob_errcode_, storage_info_);
|
||||
lib::ObMallocHookAttrGuard::~ObMallocHookAttrGuard();
|
||||
}
|
||||
|
||||
|
1
deps/oblib/src/lib/restore/ob_i_storage.h
vendored
1
deps/oblib/src/lib/restore/ob_i_storage.h
vendored
@ -72,6 +72,7 @@ int check_files_map_validity(const hash::ObHashMap<ObString, int64_t> &files_to_
|
||||
// record all files's idx remained in files_to_delete
|
||||
int record_failed_files_idx(const hash::ObHashMap<ObString, int64_t> &files_to_delete,
|
||||
ObIArray<int64_t> &failed_files_idx);
|
||||
int ob_set_field(const char *value, char *field, const uint32_t field_length);
|
||||
int ob_apr_abort_fn(int retcode);
|
||||
|
||||
struct ObStorageObjectMetaBase
|
||||
|
101
deps/oblib/src/lib/restore/ob_storage.cpp
vendored
101
deps/oblib/src/lib/restore/ob_storage.cpp
vendored
@ -669,43 +669,53 @@ int ObStorageUtil::read_seal_meta_if_needed(
|
||||
seal_meta_uri, OB_MAX_URI_LENGTH))) {
|
||||
OB_LOG(WARN, "fail to construct s3 seal_meta name", K(ret), K(uri));
|
||||
} else {
|
||||
ObStorageReader reader;
|
||||
int64_t seal_meta_len = 0;
|
||||
if (OB_FAIL(reader.open(seal_meta_uri, storage_info_))) {
|
||||
if (OB_OBJECT_NOT_EXIST == ret) {
|
||||
obj_meta.is_exist_ = false;
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
ObStorageReader *reader = nullptr;
|
||||
if (OB_ISNULL(reader = static_cast<ObStorageReader *>(allocator.alloc(sizeof(ObStorageReader))))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
OB_LOG(WARN, "fail to alloc buf for reader", K(ret));
|
||||
} else if (FALSE_IT(new (reader) ObStorageReader())) {
|
||||
} else {
|
||||
// If exist seal meta, directly read it content.
|
||||
seal_meta_len = reader.get_length();
|
||||
if (seal_meta_len > 0) {
|
||||
int64_t read_size = 0;
|
||||
char *buf = NULL;
|
||||
pos = 0;
|
||||
if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(seal_meta_len + 1)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
OB_LOG(WARN, "fail to alloc buf for reading seal meta file", K(ret), K(seal_meta_uri), K(seal_meta_len));
|
||||
} else if (OB_FAIL(reader.pread(buf, seal_meta_len, 0/*offset*/, read_size))) {
|
||||
OB_LOG(WARN, "failed to read seal meta file content", K(ret), K(seal_meta_uri), K(seal_meta_len));
|
||||
} else if (OB_UNLIKELY(seal_meta_len != read_size)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "fail to read seal meta file entire content",
|
||||
K(ret), K(seal_meta_uri), K(seal_meta_len), K(read_size));
|
||||
} else if (OB_FAIL(obj_meta.deserialize(buf, read_size, pos))) {
|
||||
OB_LOG(WARN, "fail to deserialize storage object meta", K(ret), K(seal_meta_uri), K(read_size), KP(buf));
|
||||
} else {
|
||||
obj_meta.is_exist_ = true;
|
||||
int64_t seal_meta_len = 0;
|
||||
if (OB_FAIL(reader->open(seal_meta_uri, storage_info_))) {
|
||||
if (OB_OBJECT_NOT_EXIST == ret) {
|
||||
obj_meta.is_exist_ = false;
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "the seal meta file is empty", K(ret), K(seal_meta_uri));
|
||||
}
|
||||
// If exist seal meta, directly read it content.
|
||||
seal_meta_len = reader->get_length();
|
||||
if (seal_meta_len > 0) {
|
||||
int64_t read_size = 0;
|
||||
char *buf = NULL;
|
||||
pos = 0;
|
||||
if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(seal_meta_len + 1)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
OB_LOG(WARN, "fail to alloc buf for reading seal meta file", K(ret), K(seal_meta_uri), K(seal_meta_len));
|
||||
} else if (OB_FAIL(reader->pread(buf, seal_meta_len, 0/*offset*/, read_size))) {
|
||||
OB_LOG(WARN, "failed to read seal meta file content", K(ret), K(seal_meta_uri), K(seal_meta_len));
|
||||
} else if (OB_UNLIKELY(seal_meta_len != read_size)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "fail to read seal meta file entire content",
|
||||
K(ret), K(seal_meta_uri), K(seal_meta_len), K(read_size));
|
||||
} else if (OB_FAIL(obj_meta.deserialize(buf, read_size, pos))) {
|
||||
OB_LOG(WARN, "fail to deserialize storage object meta", K(ret), K(seal_meta_uri), K(read_size), KP(buf));
|
||||
} else {
|
||||
obj_meta.is_exist_ = true;
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "the seal meta file is empty", K(ret), K(seal_meta_uri));
|
||||
}
|
||||
|
||||
if (OB_TMP_FAIL(reader.close())) {
|
||||
OB_LOG(WARN, "fail to close reader", K(ret), K(tmp_ret), K(seal_meta_uri));
|
||||
if (OB_TMP_FAIL(reader->close())) {
|
||||
OB_LOG(WARN, "fail to close reader", K(ret), K(tmp_ret), K(seal_meta_uri));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_NOT_NULL(reader)) {
|
||||
reader->~ObStorageReader();
|
||||
reader = nullptr;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -2361,6 +2371,8 @@ int ObStorageWriter::open(const common::ObString &uri, common::ObObjectStorageIn
|
||||
STORAGE_LOG(WARN, "writer_ is null", K(ret), K(uri));
|
||||
} else if (OB_FAIL(writer_->open(uri, storage_info))) {
|
||||
STORAGE_LOG(WARN, "failed to open writer", K(ret), K(uri));
|
||||
} else {
|
||||
storage_info_ = storage_info;
|
||||
}
|
||||
}
|
||||
|
||||
@ -2534,31 +2546,35 @@ int ObStorageAppender::repeatable_pwrite_(const char *buf, const int64_t size, c
|
||||
int64_t read_buf_size = 0;
|
||||
int64_t actual_write_offset = 0;
|
||||
char *read_buffer = nullptr;
|
||||
ObStorageReader reader;
|
||||
ObStorageReader *reader = nullptr;
|
||||
ObArenaAllocator allocator;
|
||||
|
||||
if (OB_ISNULL(appender_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "not opened", K(ret));
|
||||
} else if (OB_FAIL(reader.open(uri_, storage_info_))) {
|
||||
} else if (OB_ISNULL(reader = static_cast<ObStorageReader *>(allocator.alloc(sizeof(ObStorageReader))))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
OB_LOG(WARN, "fail to alloc buf for reader", K(ret));
|
||||
} else if(FALSE_IT(new (reader) ObStorageReader())) {
|
||||
} else if (OB_FAIL(reader->open(uri_, storage_info_))) {
|
||||
STORAGE_LOG(WARN, "failed to open reader", K(ret));
|
||||
} else if (reader.get_length() <= offset) {
|
||||
} else if (reader->get_length() <= offset) {
|
||||
// This situation also has concurrency issues.
|
||||
// The length read by the reader may be old, so offset not match needs to be returned for retry.
|
||||
ret = OB_OBJECT_STORAGE_PWRITE_OFFSET_NOT_MATCH;
|
||||
STORAGE_LOG(WARN, "offset is invalid", K(offset), "length", reader.get_length(), K(ret));
|
||||
} else if (OB_FALSE_IT(actual_write_offset = reader.get_length() - offset)) {
|
||||
STORAGE_LOG(WARN, "offset is invalid", K(offset), "length", reader->get_length(), K(ret));
|
||||
} else if (OB_FALSE_IT(actual_write_offset = reader->get_length() - offset)) {
|
||||
} else if (OB_FALSE_IT(read_buf_size = std::min(actual_write_offset, size))) {
|
||||
} else if (OB_ISNULL(read_buffer = static_cast<char *>(allocator.alloc(read_buf_size)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
OB_LOG(WARN, "failed to allocate memory", K(ret), K(size));
|
||||
} else if (OB_FAIL(reader.pread(read_buffer, read_buf_size, offset, read_size))) {
|
||||
} else if (OB_FAIL(reader->pread(read_buffer, read_buf_size, offset, read_size))) {
|
||||
STORAGE_LOG(WARN, "failed to pread", K(ret));
|
||||
} else if (0 != MEMCMP(buf, read_buffer, read_buf_size)) {
|
||||
ret = OB_OBJECT_STORAGE_PWRITE_CONTENT_NOT_MATCH;
|
||||
STORAGE_LOG(WARN, "data inconsistent", K(ret));
|
||||
} else if (offset + size > reader.get_length()) {
|
||||
if (OB_FAIL(appender_->pwrite(buf + actual_write_offset, size - actual_write_offset, reader.get_length()))) {
|
||||
} else if (offset + size > reader->get_length()) {
|
||||
if (OB_FAIL(appender_->pwrite(buf + actual_write_offset, size - actual_write_offset, reader->get_length()))) {
|
||||
if (OB_OBJECT_STORAGE_PWRITE_OFFSET_NOT_MATCH == ret) {
|
||||
ret = OB_IO_ERROR;
|
||||
STORAGE_LOG(WARN, "There may be concurrency problems that require the caller to retry", K(ret));
|
||||
@ -2566,10 +2582,13 @@ int ObStorageAppender::repeatable_pwrite_(const char *buf, const int64_t size, c
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCCESS != (tmp_ret = reader.close())) {
|
||||
if (OB_SUCCESS != (tmp_ret = reader->close())) {
|
||||
STORAGE_LOG(WARN, "failed to close reader", K(tmp_ret));
|
||||
}
|
||||
|
||||
if (OB_NOT_NULL(reader)) {
|
||||
reader->~ObStorageReader();
|
||||
reader = nullptr;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -935,9 +935,8 @@ int ObStorageCosReader::pread(
|
||||
// To maintain thread safety, a new temporary cos_handle should be created for each individual
|
||||
// pread operation rather than reusing the same handle. This approach ensures that memory
|
||||
// allocation is safely performed without conflicts across concurrent operations.
|
||||
} else if (OB_FAIL(create_cos_handle(
|
||||
allocator, handle_.get_cos_account(),
|
||||
checksum_type_ == ObStorageChecksumType::OB_MD5_ALGO, tmp_cos_handle))) {
|
||||
} else if (OB_FAIL(handle_.create_cos_handle(
|
||||
allocator, checksum_type_ == ObStorageChecksumType::OB_MD5_ALGO, tmp_cos_handle))) {
|
||||
OB_LOG(WARN, "fail to create tmp cos handle", K(ret), K_(checksum_type));
|
||||
} else {
|
||||
// When is_range_read is true, it indicates that only a part of the data is read.
|
||||
|
1265
deps/oblib/src/lib/restore/ob_storage_info.cpp
vendored
1265
deps/oblib/src/lib/restore/ob_storage_info.cpp
vendored
File diff suppressed because it is too large
Load Diff
288
deps/oblib/src/lib/restore/ob_storage_info.h
vendored
288
deps/oblib/src/lib/restore/ob_storage_info.h
vendored
@ -18,6 +18,10 @@
|
||||
#include "common/storage/ob_device_common.h"
|
||||
#include "lib/utility/ob_print_utils.h"
|
||||
#include "lib/utility/ob_unify_serialize.h"
|
||||
#include <curl/curl.h>
|
||||
#include "lib/hash/ob_hashmap.h"
|
||||
#include "lib/allocator/page_arena.h"
|
||||
#include "lib/string/ob_string.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -29,10 +33,36 @@ const int64_t OB_MAX_BACKUP_EXTENSION_LENGTH = 512;
|
||||
const int64_t OB_MAX_BACKUP_ENDPOINT_LENGTH = 256;
|
||||
const int64_t OB_MAX_BACKUP_ACCESSID_LENGTH = 256;
|
||||
const int64_t OB_MAX_BACKUP_ACCESSKEY_LENGTH = 256;
|
||||
const int64_t OB_MAX_BACKUP_STORAGE_INFO_LENGTH = 1536;
|
||||
const int64_t OB_MAX_BACKUP_STORAGE_INFO_LENGTH = 1600;
|
||||
// OB_MAX_DEVICE_KEY_LENGTH = OB_MAX_BACKUP_STORAGE_INFO_LENGTH + strlen("&storage_type=x")
|
||||
const int64_t OB_MAX_DEVICE_KEY_LENGTH = OB_MAX_BACKUP_STORAGE_INFO_LENGTH + 15;
|
||||
const int64_t OB_MAX_BACKUP_ENCRYPTKEY_LENGTH = OB_MAX_BACKUP_ACCESSKEY_LENGTH + 32;
|
||||
const int64_t OB_MAX_BACKUP_SERIALIZEKEY_LENGTH = OB_MAX_BACKUP_ENCRYPTKEY_LENGTH * 2;
|
||||
// We have agreed with OCP that the maximum role_arn length shall not exceed 256
|
||||
static constexpr int64_t OB_MAX_ROLE_ARN_LENGTH = 256;
|
||||
// The limit on the maximum length of external_id in obs/cos/oss/s3 is 128
|
||||
static constexpr int64_t OB_MAX_EXTERNAL_ID_LENGTH = 128;
|
||||
static constexpr int64_t OB_MAX_ASSUME_ROLE_JSON_DATA_LENGTH = 1024;
|
||||
// STS_AK and STS_SK are used to connect to STS service of OCP.
|
||||
// We have agreed with ocp that the maximum length of sts_sk/sts_ak is 32.
|
||||
// And the maximum length of sts_url is 512.
|
||||
static constexpr int64_t OB_MAX_STS_AK_LENGTH = 64;
|
||||
static constexpr int64_t OB_MAX_STS_SK_LENGTH = 64;
|
||||
static constexpr int64_t OB_MAX_STS_URL_LENGTH = 512;
|
||||
static constexpr int64_t OB_PREDEFINED_STS_TOKEN_LENGTH = 1024;
|
||||
static constexpr int64_t OB_MAX_STS_CREDENTIAL_LENGTH = OB_MAX_STS_AK_LENGTH + OB_MAX_STS_SK_LENGTH + OB_MAX_STS_URL_LENGTH;
|
||||
|
||||
static constexpr int64_t OB_MAX_STS_SIGNATURE_LENGTH = 64;
|
||||
static constexpr int64_t OB_MAX_STS_CONCAT_LENGTH = 512;
|
||||
static constexpr int64_t OB_MAX_STS_SIGNATURE_NONCE_LENTH = 64;
|
||||
static constexpr int64_t OB_MAX_STS_SIGNATURE_RAND_NUM = 10000;
|
||||
static constexpr int64_t OB_MAX_STS_REQUEST_ID_LENTH = 64;
|
||||
static constexpr int64_t OB_MAX_STS_CURL_CONNECTTIMEOUT_MS = 10000; // 10s
|
||||
static constexpr int64_t OB_MAX_STS_CURL_TIMEOUT_SECONDS = 10; // 10s
|
||||
|
||||
// To ensure that the temporary credentials in the credential map are always valid,
|
||||
// the credentials are refreshed every 20 minutes.
|
||||
static constexpr int64_t CREDENTIAL_TASK_SCHEDULE_INTERVAL_US = 1200LL * 1000LL * 1000LL; // 20min
|
||||
const char *const ACCESS_ID = "access_id=";
|
||||
const char *const ACCESS_KEY = "access_key=";
|
||||
const char *const HOST = "host=";
|
||||
@ -51,6 +81,21 @@ const char *const CHECKSUM_TYPE_NO_CHECKSUM = "no_checksum";
|
||||
const char *const CHECKSUM_TYPE_MD5 = "md5";
|
||||
const char *const CHECKSUM_TYPE_CRC32 = "crc32";
|
||||
|
||||
const char *const ROLE_ARN = "role_arn=";
|
||||
const char *const EXTERNAL_ID = "external_id=";
|
||||
const char *const STS_AK = "sts_ak=";
|
||||
const char *const STS_SK = "sts_sk=";
|
||||
const char *const STS_URL = "sts_url=";
|
||||
const char *const OSS_ROLE_ARN_PREFIX = "acs";
|
||||
const char *const OBS_ROLE_ARN_PREFIX = "iam";
|
||||
const char *const S3_ROLE_ARN_PREFIX = "arn";
|
||||
const char *const COS_ROLE_ARN_PREFIX = "qcs";
|
||||
const char *const STS_ACTION = "GetResourceSTSCredential";
|
||||
const char *const STS_RESOURCE_SOURCE = "OBSERVER";
|
||||
|
||||
const char *const OB_DEVICE_CREDENTIAL_ALLOCATOR = "ObjDeviceCredentialAlloc";
|
||||
static constexpr int64_t MAX_CREDENTIAL_IDLE_DURATION_US = 24 * 3600 * 1000 * 1000L; // 24h
|
||||
|
||||
enum ObStorageAddressingModel
|
||||
{
|
||||
OB_VIRTUAL_HOSTED_STYLE = 0,
|
||||
@ -68,13 +113,80 @@ bool is_oss_supported_checksum(const ObStorageChecksumType checksum_type);
|
||||
bool is_cos_supported_checksum(const ObStorageChecksumType checksum_type);
|
||||
bool is_s3_supported_checksum(const ObStorageChecksumType checksum_type);
|
||||
const char *get_storage_checksum_type_str(const ObStorageChecksumType &type);
|
||||
|
||||
// [Extensions]
|
||||
// load_data_* : sql/engine/cmd/ob_load_data_storage_info.h
|
||||
|
||||
struct ObSTSToken
|
||||
{
|
||||
ObSTSToken();
|
||||
virtual ~ObSTSToken();
|
||||
TO_STRING_KV(KP_(data), KP_(data_arr), K_(is_below_predefined_length));
|
||||
void reset();
|
||||
int set(const ObString &token);
|
||||
int assign(const ObSTSToken &token);
|
||||
const char *get_data() const;
|
||||
// "阿里云STS服务返回的安全令牌(STS Token)的长度不固定,强烈建议您不要假设安全令牌的最大长度。"
|
||||
// therefore, use allocator to alloc mem for sts_token dynamically
|
||||
int64_t length() const {return data_len_;};
|
||||
bool is_valid() const;
|
||||
bool is_below_predefined_length() const {return is_below_predefined_length_;};
|
||||
char *data_;
|
||||
ObArenaAllocator allocator_;
|
||||
// Since cloud vendors all claim that the length of sts_token is variable, we previously used
|
||||
// allocator for memory allocation. But later we found that allocating memory every time would
|
||||
// increase the cpu consumption, so we optimize it by using a fixed-length char array. When the
|
||||
// length of sts_token is less than 1024, allocation is no longer performed, but data_arr is used
|
||||
// directly for storage. Please refer to the documentation for details:
|
||||
//
|
||||
char data_arr_[OB_PREDEFINED_STS_TOKEN_LENGTH];
|
||||
int64_t data_len_;
|
||||
bool is_below_predefined_length_;
|
||||
bool is_inited_;
|
||||
};
|
||||
|
||||
struct ObObjectStorageCredential
|
||||
{
|
||||
ObObjectStorageCredential();
|
||||
virtual ~ObObjectStorageCredential()
|
||||
{
|
||||
reset();
|
||||
}
|
||||
TO_STRING_KV(K_(expiration_s), K_(access_time_us), K_(born_time_us),
|
||||
K_(access_id), KP_(access_key), K_(sts_token));
|
||||
void reset();
|
||||
int assign(const ObObjectStorageCredential &credential);
|
||||
|
||||
// Temporary ak and sk to access bucket
|
||||
char access_id_[OB_MAX_BACKUP_ACCESSID_LENGTH];
|
||||
char access_key_[OB_MAX_BACKUP_ACCESSKEY_LENGTH];
|
||||
ObSTSToken sts_token_;
|
||||
// Expiration time of current ak/sk returned from STS Service
|
||||
int64_t expiration_s_;
|
||||
// Latest access time of the credential
|
||||
int64_t access_time_us_;
|
||||
int64_t born_time_us_;
|
||||
};
|
||||
|
||||
class ObClusterVersionBaseMgr
|
||||
{
|
||||
public:
|
||||
ObClusterVersionBaseMgr() {}
|
||||
virtual ~ObClusterVersionBaseMgr() {}
|
||||
virtual int is_supported_assume_version() const
|
||||
{
|
||||
return OB_SUCCESS;
|
||||
};
|
||||
static ObClusterVersionBaseMgr &get_instance()
|
||||
{
|
||||
static ObClusterVersionBaseMgr mgr;
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
|
||||
class ObObjectStorageInfo
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
|
||||
public:
|
||||
ObObjectStorageInfo();
|
||||
virtual ~ObObjectStorageInfo();
|
||||
@ -87,6 +199,14 @@ public:
|
||||
ObStorageChecksumType get_checksum_type() const;
|
||||
const char *get_checksum_type_str() const;
|
||||
virtual int get_storage_info_str(char *storage_info, const int64_t info_len) const;
|
||||
|
||||
// the following two functions are designed for Assume Role.
|
||||
int validate_arguments() const;
|
||||
bool is_assume_role_mode() const;
|
||||
virtual int get_authorization_str(char *authorization_str,
|
||||
const int64_t authorization_str_len,
|
||||
ObSTSToken &sts_token) const;
|
||||
|
||||
// the following two functions are designed for ObDeviceManager, which manages all devices by a device_map_
|
||||
int get_device_map_key_str(char *key_str, const int64_t len) const;
|
||||
int64_t get_device_map_key_len() const;
|
||||
@ -101,7 +221,8 @@ public:
|
||||
int reset_access_id_and_access_key(
|
||||
const char *access_id, const char *access_key);
|
||||
TO_STRING_KV(K_(endpoint), K_(access_id), K_(extension), "type", get_type_str(),
|
||||
K_(checksum_type), K_(max_iops), K_(max_bandwidth));
|
||||
K_(checksum_type), K_(max_iops), K_(max_bandwidth), KP_(role_arn), KP_(external_id));
|
||||
static int register_cluster_version_mgr(ObClusterVersionBaseMgr *cluster_version_mgr);
|
||||
|
||||
protected:
|
||||
virtual int get_access_key_(char *key_buf, const int64_t key_buf_len) const;
|
||||
@ -110,13 +231,16 @@ protected:
|
||||
int check_addressing_model_(const char *addressing_model) const;
|
||||
int set_checksum_type_(const char *checksum_type_str);
|
||||
int set_storage_info_field_(const char *info, char *field, const int64_t length);
|
||||
int get_info_str_(char *storage_info, const int64_t info_len) const;
|
||||
int append_extension_str_(char *storage_info, const int64_t info_len) const;
|
||||
|
||||
|
||||
public:
|
||||
int delete_mode_;
|
||||
// TODO: Rename device_type_ to storage_protocol_type_ for better clarity
|
||||
// Prefix in the storage_info string, such as 's3://', indicates the protocol used to access the target
|
||||
// Currently, both OBS and GCS are accessed via the s3 protocol,
|
||||
// hence s3_region is updated to be an optional parameter
|
||||
// Prefix in the storage_info string, such as 's3://', indicates the protocol used to access the
|
||||
// target. Currently, both OBS and GCS are accessed via the s3 protocol, hence s3_region is updated
|
||||
// to be an optional parameter
|
||||
common::ObStorageType device_type_;
|
||||
// Optional parameter. If not provided, the default value OB_MD5_ALGO will be used.
|
||||
// For OSS/COS, OB_NO_CHECKSUM_ALGO indicates that no checksum algorithm will be used.
|
||||
@ -129,8 +253,160 @@ public:
|
||||
char extension_[OB_MAX_BACKUP_EXTENSION_LENGTH];
|
||||
int64_t max_iops_;
|
||||
int64_t max_bandwidth_;
|
||||
|
||||
// Support access object storage by assume role
|
||||
char role_arn_[OB_MAX_ROLE_ARN_LENGTH];
|
||||
char external_id_[OB_MAX_EXTERNAL_ID_LENGTH];
|
||||
bool is_assume_role_mode_;
|
||||
static ObClusterVersionBaseMgr *cluster_version_mgr_;
|
||||
};
|
||||
|
||||
class ObTenantStsCredentialBaseMgr
|
||||
{
|
||||
public:
|
||||
ObTenantStsCredentialBaseMgr() {}
|
||||
~ObTenantStsCredentialBaseMgr() {}
|
||||
virtual int get_sts_credential(char *sts_credential, const int64_t sts_credential_buf_len) = 0;
|
||||
};
|
||||
|
||||
class ObStsCredential
|
||||
{
|
||||
public:
|
||||
ObStsCredential();
|
||||
virtual ~ObStsCredential();
|
||||
int init(const uint64_t tenant_id);
|
||||
void reset();
|
||||
int get_sts_credential();
|
||||
static int register_sts_credential_mgr(ObTenantStsCredentialBaseMgr *sts_credential_mgr);
|
||||
TO_STRING_KV(K_(tenant_id), K_(sts_ak), KP_(sts_sk), K_(sts_url), KP_(sts_credential_mgr));
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
char sts_ak_[OB_MAX_STS_AK_LENGTH];
|
||||
char sts_sk_[OB_MAX_STS_SK_LENGTH];
|
||||
char sts_url_[OB_MAX_STS_URL_LENGTH];
|
||||
bool is_inited_;
|
||||
private:
|
||||
static ObTenantStsCredentialBaseMgr *sts_credential_mgr_;
|
||||
};
|
||||
|
||||
class ObDeviceCredentialKey
|
||||
{
|
||||
public:
|
||||
ObDeviceCredentialKey();
|
||||
virtual ~ObDeviceCredentialKey();
|
||||
int init(const ObObjectStorageInfo &storage_info);
|
||||
void reset();
|
||||
uint64_t hash() const;
|
||||
int hash(uint64_t &hash_val) const
|
||||
{
|
||||
hash_val = hash();
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
int assign(const ObDeviceCredentialKey &other);
|
||||
bool operator==(const ObDeviceCredentialKey &other) const;
|
||||
bool operator!=(const ObDeviceCredentialKey &other) const;
|
||||
bool is_valid() const;
|
||||
|
||||
int construct_signed_url(char *url_buf, const int64_t url_buf_len) const;
|
||||
TO_STRING_KV(K_(is_inited), KP_(role_arn), KP_(external_id), K_(tenant_id));
|
||||
|
||||
public:
|
||||
char role_arn_[OB_MAX_ROLE_ARN_LENGTH];
|
||||
char external_id_[OB_MAX_EXTERNAL_ID_LENGTH];
|
||||
// tenant_id is used to distinguish different tenants
|
||||
uint64_t tenant_id_;
|
||||
bool is_inited_;
|
||||
|
||||
private:
|
||||
int init_(const char *role_arn, const char *external_id);
|
||||
};
|
||||
|
||||
int check_sts_credential_format(const char *sts_credential, ObStsCredential &credential_key);
|
||||
|
||||
class ObDeviceCredentialMgr
|
||||
{
|
||||
public:
|
||||
enum ResponseItem
|
||||
{
|
||||
AccessKeyId = 0,
|
||||
AccessKeySecret = 1,
|
||||
SecurityToken = 2,
|
||||
DurationSeconds = 3
|
||||
};
|
||||
// It is used to receive the response from STS
|
||||
class ResponseAndAllocator
|
||||
{
|
||||
public:
|
||||
ResponseAndAllocator(char *&response, common::ObArenaAllocator &allocator)
|
||||
: response_(response), allocator_(allocator)
|
||||
{}
|
||||
~ResponseAndAllocator()
|
||||
{}
|
||||
char *&response_;
|
||||
ObArenaAllocator &allocator_;
|
||||
};
|
||||
|
||||
static ObDeviceCredentialMgr &get_instance();
|
||||
virtual ~ObDeviceCredentialMgr();
|
||||
int init();
|
||||
// curl STS service to perform as assume role is used, and then update @credential_map_
|
||||
void destroy();
|
||||
int connect_to_sts(
|
||||
const ObDeviceCredentialKey &credential_key, ResponseAndAllocator &res_and_allocator);
|
||||
int curl_credential(
|
||||
const ObObjectStorageInfo &storage_info, const bool update_access_time = true);
|
||||
int curl_credential(
|
||||
const ObDeviceCredentialKey &credential_key, const bool update_access_time = true);
|
||||
int get_credential(
|
||||
const ObObjectStorageInfo &storage_info, ObObjectStorageCredential &device_credential);
|
||||
int get_credential(
|
||||
const ObDeviceCredentialKey &credential_key, ObObjectStorageCredential &device_credential);
|
||||
bool operator=(const ObDeviceCredentialMgr &) = delete;
|
||||
// refresh all managed credentials
|
||||
int refresh();
|
||||
void set_credential_duration_us(const int64_t duration_us)
|
||||
{
|
||||
credential_duration_us_ = duration_us;
|
||||
}
|
||||
|
||||
private:
|
||||
ObDeviceCredentialMgr();
|
||||
static int64_t on_write_data_(
|
||||
const void *ptr, const int64_t size, const int64_t nmemb, void *user_data);
|
||||
static int64_t debug_callback(
|
||||
CURL *handle, curl_infotype type, char *data, size_t size, void *userp);
|
||||
int get_credential_from_map_(
|
||||
const ObDeviceCredentialKey &credential_key, ObObjectStorageCredential &device_credential);
|
||||
int parse_device_credential_(const char *res_ptr, ObObjectStorageCredential &credential);
|
||||
|
||||
private:
|
||||
static const int64_t RESPONSE_ITEM_CNT = 4;
|
||||
static const char *response_items_[RESPONSE_ITEM_CNT];
|
||||
typedef hash::ObHashMap<ObDeviceCredentialKey, ObObjectStorageCredential> CredentialMap;
|
||||
bool is_inited_;
|
||||
CredentialMap credential_map_;
|
||||
common::SpinRWLock credential_lock_;
|
||||
// The time when the credential expires from the cache
|
||||
int64_t credential_duration_us_;
|
||||
};
|
||||
|
||||
class CredentialAccessTimeCallBack
|
||||
{
|
||||
public:
|
||||
explicit CredentialAccessTimeCallBack(const bool update_access_time_us)
|
||||
: update_access_time_us_(update_access_time_us), original_access_time_us_(0)
|
||||
{}
|
||||
void operator()(hash::HashMapPair<ObDeviceCredentialKey, ObObjectStorageCredential> &v)
|
||||
{
|
||||
original_access_time_us_ = v.second.access_time_us_;
|
||||
if (update_access_time_us_) {
|
||||
v.second.access_time_us_ = ObTimeUtility::current_time();
|
||||
}
|
||||
};
|
||||
|
||||
bool update_access_time_us_;
|
||||
int64_t original_access_time_us_;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -555,8 +555,8 @@ void ObOssAccount::reset_account()
|
||||
memset(oss_domain_, 0, MAX_OSS_ENDPOINT_LENGTH);
|
||||
memset(oss_id_, 0, MAX_OSS_ID_LENGTH);
|
||||
memset(oss_key_, 0, MAX_OSS_KEY_LENGTH);
|
||||
oss_sts_token_ = nullptr;
|
||||
delete_mode_ = ObStorageDeleteMode::STORAGE_DELETE_MODE;
|
||||
sts_token_.reset();
|
||||
is_inited_ = false;
|
||||
}
|
||||
|
||||
@ -610,10 +610,10 @@ int ObStorageOssBase::init_with_storage_info(common::ObObjectStorageInfo *storag
|
||||
} else if (OB_ISNULL(storage_info) || OB_UNLIKELY(!storage_info->is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "oss account is invalid, fail to init oss base!", K(ret), KPC(storage_info));
|
||||
} else if (OB_FAIL(storage_info->get_storage_info_str(info_str, sizeof(info_str)))) {
|
||||
OB_LOG(WARN, "fail to get storage info str", K(ret), KPC(storage_info));
|
||||
} else if (OB_FAIL(storage_info->get_authorization_str(info_str, sizeof(info_str), oss_account_.sts_token_))) {
|
||||
OB_LOG(WARN, "fail to get authorization str", K(ret), KPC(storage_info));
|
||||
} else if (OB_FAIL(oss_account_.parse_oss_arg(info_str))) {
|
||||
OB_LOG(WARN, "fail to build oss account", K(ret));
|
||||
OB_LOG(WARN, "fail to build oss account", K(ret), KP(info_str), KPC(storage_info));
|
||||
} else if (OB_FAIL(init_oss_options(aos_pool_, oss_option_))) {
|
||||
OB_LOG(WARN, "fail to init oss options", K(aos_pool_), K(oss_option_), K(ret));
|
||||
} else if (OB_ISNULL(aos_pool_) || OB_ISNULL(oss_option_)) {
|
||||
@ -682,15 +682,15 @@ int ObOssAccount::parse_oss_arg(const common::ObString &storage_info)
|
||||
if (NULL == token) {
|
||||
break;
|
||||
} else if (0 == strncmp(HOST, token, strlen(HOST))) {
|
||||
if (OB_FAIL(set_oss_field(token + strlen(HOST), oss_domain_, sizeof(oss_domain_)))) {
|
||||
if (OB_FAIL(ob_set_field(token + strlen(HOST), oss_domain_, sizeof(oss_domain_)))) {
|
||||
OB_LOG(WARN, "failed to set oss_domain", K(ret), KCSTRING(token));
|
||||
}
|
||||
} else if (0 == strncmp(ACCESS_ID, token, strlen(ACCESS_ID))) {
|
||||
if (OB_FAIL(set_oss_field(token + strlen(ACCESS_ID), oss_id_, sizeof(oss_id_)))) {
|
||||
if (OB_FAIL(ob_set_field(token + strlen(ACCESS_ID), oss_id_, sizeof(oss_id_)))) {
|
||||
OB_LOG(WARN, "failed to set oss_id_", K(ret), KCSTRING(token));
|
||||
}
|
||||
} else if (0 == strncmp(ACCESS_KEY, token, strlen(ACCESS_KEY))) {
|
||||
if (OB_FAIL(set_oss_field(token + strlen(ACCESS_KEY), oss_key_, sizeof(oss_key_)))) {
|
||||
if (OB_FAIL(ob_set_field(token + strlen(ACCESS_KEY), oss_key_, sizeof(oss_key_)))) {
|
||||
OB_LOG(WARN, "failed to set oss_key_", K(ret));
|
||||
}
|
||||
} else if (0 == strncmp(STS_TOKEN_KEY, token, strlen(STS_TOKEN_KEY))) {
|
||||
@ -700,7 +700,7 @@ int ObOssAccount::parse_oss_arg(const common::ObString &storage_info)
|
||||
if (OB_ISNULL(oss_sts_token_ = reinterpret_cast<char*>(allocator_.alloc(sts_token_len + 1)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
OB_LOG(WARN, "fail to alloc memory", K(ret), K(sts_token_len));
|
||||
} else if (OB_FAIL(set_oss_field(token + strlen(STS_TOKEN_KEY), oss_sts_token_, sts_token_len + 1))) {
|
||||
} else if (OB_FAIL(ob_set_field(token + strlen(STS_TOKEN_KEY), oss_sts_token_, sts_token_len + 1))) {
|
||||
OB_LOG(WARN, "failed to set oss_sts_token_", K(ret), KCSTRING(token));
|
||||
}
|
||||
} else if (0 == strncmp(DELETE_MODE, token, strlen(DELETE_MODE))) {
|
||||
@ -724,25 +724,6 @@ int ObOssAccount::parse_oss_arg(const common::ObString &storage_info)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObOssAccount::set_oss_field(const char *info, char *field, const int64_t length)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (NULL == info || NULL == field) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid args", K(ret), KP(info), KP(field));
|
||||
} else {
|
||||
const int64_t info_len = strlen(info);
|
||||
if (info_len >= length) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "info is too long ", K(ret), K(info_len), K(length));
|
||||
} else {
|
||||
MEMCPY(field, info, info_len);
|
||||
field[info_len] = '\0';
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
/* only used by pread and init. Initialize one aos_pol and oss_option. Other methods shouldn't call this method.
|
||||
* pread need alloc memory on aos_pol and release after read finish */
|
||||
int ObStorageOssBase::init_oss_options(aos_pool_t *&aos_pool, oss_request_options_t *&oss_option)
|
||||
@ -770,12 +751,14 @@ int ObStorageOssBase::init_oss_options(aos_pool_t *&aos_pool, oss_request_option
|
||||
ret = OB_OBJECT_STORAGE_IO_ERROR;
|
||||
OB_LOG(WARN, "fail to create aos http request options", K(ret));
|
||||
} else {
|
||||
const char *sts_data = oss_account_.sts_token_.get_data();
|
||||
aos_str_set(&oss_option->config->endpoint, oss_endpoint_);
|
||||
aos_str_set(&oss_option->config->access_key_id, oss_account_.oss_id_);
|
||||
aos_str_set(&oss_option->config->access_key_secret, oss_account_.oss_key_);
|
||||
if (OB_NOT_NULL(oss_account_.oss_sts_token_)) {
|
||||
aos_str_set(&oss_option->config->sts_token, oss_account_.oss_sts_token_);
|
||||
if (OB_NOT_NULL(sts_data)) {
|
||||
aos_str_set(&oss_option->config->sts_token, sts_data);
|
||||
}
|
||||
|
||||
oss_option->config->is_cname = 0;
|
||||
|
||||
// Set connection timeout, the default value is 10s
|
||||
|
@ -129,9 +129,10 @@ public:
|
||||
ObOssAccount();
|
||||
virtual ~ObOssAccount();
|
||||
int parse_oss_arg(const common::ObString &storage_info);
|
||||
static int set_oss_field(const char *info, char *field, const int64_t length);
|
||||
void reset_account();
|
||||
int set_delete_mode(const char *parameter);
|
||||
TO_STRING_KV(K_(oss_domain), K_(delete_mode), K_(oss_id), KP_(oss_key), K_(is_inited), K_(sts_token));
|
||||
|
||||
char oss_domain_[MAX_OSS_ENDPOINT_LENGTH];
|
||||
char oss_id_[MAX_OSS_ID_LENGTH];
|
||||
char oss_key_[MAX_OSS_KEY_LENGTH];
|
||||
@ -141,8 +142,7 @@ public:
|
||||
common::ObArenaAllocator allocator_;
|
||||
ObStorageDeleteMode delete_mode_;
|
||||
bool is_inited_;
|
||||
|
||||
TO_STRING_KV(K_(is_inited), K_(delete_mode), K_(oss_domain), K_(oss_id));
|
||||
ObSTSToken sts_token_;
|
||||
};
|
||||
|
||||
class ObStorageOssBase
|
||||
|
@ -230,7 +230,15 @@ int ObS3Client::init(const ObS3Account &account)
|
||||
S3ClientConfiguration config(init_values);
|
||||
// Re-enables IMDS access for subsequent operations if needed
|
||||
config.disableIMDS = false;
|
||||
Aws::Auth::AWSCredentials credentials(account.access_id_, account.secret_key_);
|
||||
Aws::Auth::AWSCredentials credentials;
|
||||
const char *sts_data = account.sts_token_.get_data();
|
||||
if (OB_NOT_NULL(sts_data)) {
|
||||
credentials = Aws::Auth::AWSCredentials(account.access_id_, account.secret_key_, sts_data);
|
||||
} else {
|
||||
credentials = Aws::Auth::AWSCredentials(account.access_id_, account.secret_key_);
|
||||
}
|
||||
|
||||
|
||||
SpinWLockGuard guard(lock_);
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
@ -1025,6 +1033,7 @@ void ObS3Account::reset()
|
||||
MEMSET(endpoint_, 0, sizeof(endpoint_));
|
||||
MEMSET(access_id_, 0, sizeof(access_id_));
|
||||
MEMSET(secret_key_, 0, sizeof(secret_key_));
|
||||
sts_token_.reset();
|
||||
addressing_model_ = ObStorageAddressingModel::OB_VIRTUAL_HOSTED_STYLE;
|
||||
}
|
||||
|
||||
@ -1059,23 +1068,23 @@ int ObS3Account::parse_from(const char *storage_info_str, const int64_t size)
|
||||
if (OB_ISNULL(token)) {
|
||||
break;
|
||||
} else if (0 == strncmp(REGION, token, strlen(REGION))) {
|
||||
if (OB_FAIL(set_field(token + strlen(REGION), region_, sizeof(region_)))) {
|
||||
if (OB_FAIL(ob_set_field(token + strlen(REGION), region_, sizeof(region_)))) {
|
||||
OB_LOG(WARN, "failed to set s3 region", K(ret), KCSTRING(token));
|
||||
}
|
||||
} else if (0 == strncmp(HOST, token, strlen(HOST))) {
|
||||
if (OB_FAIL(set_field(token + strlen(HOST), endpoint_, sizeof(endpoint_)))) {
|
||||
if (OB_FAIL(ob_set_field(token + strlen(HOST), endpoint_, sizeof(endpoint_)))) {
|
||||
OB_LOG(WARN, "failed to set s3 endpoint", K(ret), KCSTRING(token));
|
||||
} else {
|
||||
bitmap |= 1;
|
||||
}
|
||||
} else if (0 == strncmp(ACCESS_ID, token, strlen(ACCESS_ID))) {
|
||||
if (OB_FAIL(set_field(token + strlen(ACCESS_ID), access_id_, sizeof(access_id_)))) {
|
||||
if (OB_FAIL(ob_set_field(token + strlen(ACCESS_ID), access_id_, sizeof(access_id_)))) {
|
||||
OB_LOG(WARN, "failed to set s3 access id", K(ret), KCSTRING(token));
|
||||
} else {
|
||||
bitmap |= (1 << 1);
|
||||
}
|
||||
} else if (0 == strncmp(ACCESS_KEY, token, strlen(ACCESS_KEY))) {
|
||||
if (OB_FAIL(set_field(token + strlen(ACCESS_KEY), secret_key_, sizeof(secret_key_)))) {
|
||||
if (OB_FAIL(ob_set_field(token + strlen(ACCESS_KEY), secret_key_, sizeof(secret_key_)))) {
|
||||
OB_LOG(WARN, "failed to set s3 secret key", K(ret), KP(token));
|
||||
} else {
|
||||
bitmap |= (1 << 2);
|
||||
@ -1120,25 +1129,6 @@ int ObS3Account::parse_from(const char *storage_info_str, const int64_t size)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObS3Account::set_field(const char *value, char *field, const uint32_t field_length)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(value) || OB_ISNULL(field)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invliad arguments", K(ret), KP(value), KP(field));
|
||||
} else {
|
||||
const int64_t value_len = strlen(value);
|
||||
if (value_len >= field_length) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
OB_LOG(WARN, "value is too long", K(ret), KP(value), K(value_len), K(field_length));
|
||||
} else {
|
||||
MEMCPY(field, value, value_len);
|
||||
field[value_len] = '\0';
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*--------------------------------ObStorageS3Base--------------------------------*/
|
||||
ObStorageS3Base::ObStorageS3Base()
|
||||
: allocator_(OB_STORAGE_S3_ALLOCATOR),
|
||||
@ -1199,8 +1189,8 @@ int ObStorageS3Base::inner_open(const ObString &uri, ObObjectStorageInfo *storag
|
||||
OB_LOG(WARN, "failed to init s3 base, invalid arguments", K(ret), K(uri), KPC(storage_info));
|
||||
} else if (OB_FAIL(build_bucket_and_object_name(allocator_, uri, bucket_, object_))) {
|
||||
OB_LOG(WARN, "failed to parse uri", K(ret), K(uri));
|
||||
} else if (OB_FAIL(storage_info->get_storage_info_str(info_str, sizeof(info_str)))) {
|
||||
OB_LOG(WARN, "failed to get storage info str", K(ret), KPC(storage_info));
|
||||
} else if (OB_FAIL(storage_info->get_authorization_str(info_str, sizeof(info_str), s3_account_.sts_token_))) {
|
||||
OB_LOG(WARN, "failed to get authorization str", K(ret), KPC(storage_info));
|
||||
} else if (OB_FAIL(s3_account_.parse_from(info_str, strlen(info_str)))) {
|
||||
OB_LOG(WARN, "failed to build s3 account", K(ret));
|
||||
} else if (OB_FAIL(ObS3Env::get_instance().get_or_create_s3_client(s3_account_, s3_client_))) {
|
||||
|
@ -136,10 +136,9 @@ struct ObS3Account
|
||||
void reset();
|
||||
bool is_valid() const { return is_valid_; }
|
||||
int64_t hash() const;
|
||||
TO_STRING_KV(K_(is_valid), K_(delete_mode), K_(region), K_(endpoint), K_(access_id), K_(addressing_model));
|
||||
TO_STRING_KV(K_(is_valid), K_(delete_mode), K_(region), K_(endpoint), K_(access_id), KP_(secret_key), K_(sts_token), K_(addressing_model));
|
||||
|
||||
int parse_from(const char *storage_info_str, const int64_t size);
|
||||
int set_field(const char *value, char *field, const uint32_t field_length);
|
||||
|
||||
bool is_valid_;
|
||||
int64_t delete_mode_;
|
||||
@ -147,6 +146,7 @@ struct ObS3Account
|
||||
char endpoint_[MAX_S3_ENDPOINT_LENGTH];
|
||||
char access_id_[MAX_S3_ACCESS_ID_LENGTH]; // ak
|
||||
char secret_key_[MAX_S3_SECRET_KEY_LENGTH]; // sk
|
||||
ObSTSToken sts_token_;
|
||||
ObStorageAddressingModel addressing_model_;
|
||||
};
|
||||
|
||||
@ -392,7 +392,6 @@ protected:
|
||||
private:
|
||||
bool is_inited_;
|
||||
ObS3Account s3_account_;
|
||||
|
||||
friend class ObStorageS3Util;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObStorageS3Base);
|
||||
};
|
||||
|
6
deps/oblib/unittest/lib/CMakeLists.txt
vendored
6
deps/oblib/unittest/lib/CMakeLists.txt
vendored
@ -92,11 +92,11 @@ oblib_addtest(rc/test_context.cpp)
|
||||
oblib_addtest(resource/test_resource_mgr.cpp)
|
||||
#oblib_addtest(restore/test_storage_file.cpp)
|
||||
#oblib_addtest(restore/test_storage_oss.cpp)
|
||||
oblib_addtest(restore/test_storage_cos.cpp)
|
||||
oblib_addtest(restore/test_storage_s3.cpp)
|
||||
#oblib_addtest(restore/test_storage_cos.cpp)
|
||||
#oblib_addtest(restore/test_storage_s3.cpp)
|
||||
# oblib_addtest(restore/test_object_storage.cpp)
|
||||
# oblib_addtest(restore/test_common_storage.cpp)
|
||||
oblib_addtest(restore/test_storage_info.cpp)
|
||||
#oblib_addtest(restore/test_storage_info.cpp)
|
||||
#oblib_addtest(restore/test_storage.cpp)
|
||||
oblib_addtest(stat/test_di_cache.cpp)
|
||||
oblib_addtest(stat/test_diagnose_info.cpp)
|
||||
|
@ -125,6 +125,7 @@
|
||||
#include "observer/table/ttl/ob_table_ttl_task.h"
|
||||
#include "storage/high_availability/ob_storage_ha_diagnose_service.h"
|
||||
#include "logservice/palf/log_cache.h"
|
||||
#include "share/ob_device_credential_task.h"
|
||||
#ifdef OB_BUILD_ARBITRATION
|
||||
#include "logservice/arbserver/palf_env_lite_mgr.h"
|
||||
#include "logservice/arbserver/ob_arb_srv_network_frame.h"
|
||||
@ -403,6 +404,8 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg)
|
||||
#endif
|
||||
} else if (OB_FAIL(schema_status_proxy_.init())) {
|
||||
LOG_ERROR("fail to init schema status proxy", KR(ret));
|
||||
} else if (OB_FAIL(device_credential_task_.init(CREDENTIAL_TASK_SCHEDULE_INTERVAL_US))) {
|
||||
LOG_ERROR("fail to init device_credential_task", KR(ret), K(CREDENTIAL_TASK_SCHEDULE_INTERVAL_US));
|
||||
} else if (OB_FAIL(init_schema())) {
|
||||
LOG_ERROR("init schema failed", KR(ret));
|
||||
} else if (OB_FAIL(init_network())) {
|
||||
|
@ -83,6 +83,7 @@
|
||||
#ifdef OB_BUILD_SHARED_STORAGE
|
||||
#include "close_modules/shared_storage/storage/shared_storage/ob_tenant_gc_task.h"
|
||||
#endif
|
||||
#include "share/ob_device_credential_task.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -477,6 +478,7 @@ private:
|
||||
ObRefreshIOCalibrationTimeTask refresh_io_calibration_task_; // retry to success & no repeat
|
||||
blocksstable::ObStorageEnv storage_env_;
|
||||
share::ObSchemaStatusProxy schema_status_proxy_;
|
||||
ObDeviceCredentialTask device_credential_task_;
|
||||
|
||||
// for locality
|
||||
ObLocalityManager locality_manager_;
|
||||
|
@ -13,6 +13,7 @@
|
||||
#define USING_LOG_PREFIX SERVER
|
||||
#include "ob_table_connection_mgr.h"
|
||||
#include "rpc/ob_rpc_request_operator.h"
|
||||
#include "lib/allocator/ob_sql_mem_leak_checker.h"
|
||||
|
||||
using namespace oceanbase::table;
|
||||
using namespace oceanbase::common;
|
||||
@ -55,6 +56,7 @@ ObTableConnectionMgr::ObTableConnectionMgr()
|
||||
ObTableConnectionMgr &ObTableConnectionMgr::get_instance()
|
||||
{
|
||||
ObTableConnectionMgr *instance = NULL;
|
||||
DISABLE_SQL_MEMLEAK_GUARD;
|
||||
while (OB_UNLIKELY(once_ < 2)) {
|
||||
if (ATOMIC_BCAS(&once_, 0, 1)) {
|
||||
instance = OB_NEW(ObTableConnectionMgr, ObModIds::TABLE_PROC);
|
||||
|
@ -254,6 +254,9 @@ int ObRestoreScheduler::restore_tenant(const ObPhysicalRestoreJob &job_info)
|
||||
LOG_WARN("update restore option", K(ret), K(new_tenant_id), K(job_id), K(tenant_id_));
|
||||
} else if (OB_FAIL(may_update_restore_concurrency_(new_tenant_id, job_info))) {
|
||||
LOG_WARN("failed to update restore concurrency", K(ret), K(new_tenant_id), K(job_info));
|
||||
} else if (!job_info.get_sts_credential().empty()
|
||||
&& OB_FAIL(set_tenant_sts_crendential_config_(*sql_proxy_, new_tenant_id, job_info))) {
|
||||
LOG_WARN("fail to set tenant sts credential config", K(ret), K(new_tenant_id));
|
||||
} else {
|
||||
restore_service_->wakeup();
|
||||
}
|
||||
@ -452,6 +455,26 @@ int ObRestoreScheduler::restore_pre(const ObPhysicalRestoreJob &job_info)
|
||||
|
||||
return ret;
|
||||
}
|
||||
int ObRestoreScheduler::set_tenant_sts_crendential_config_(
|
||||
common::ObISQLClient &proxy, const uint64_t tenant_id, const share::ObPhysicalRestoreJob &job_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t affected_row = 0;
|
||||
ObSqlString sql;
|
||||
const ObString &sts_credential = job_info.get_sts_credential();
|
||||
bool is_exist = false;
|
||||
if (!is_user_tenant(tenant_id) || sts_credential.empty()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("not user tenant or sts credential is empty", K(job_info));
|
||||
} else if (OB_FAIL(sql.assign_fmt("alter system set sts_credential='%.*s'", sts_credential.length(), sts_credential.ptr()))) {
|
||||
LOG_WARN("failed to assign fmt", K(ret));
|
||||
} else if (OB_FAIL(sql_proxy_->write(tenant_id, sql.ptr(), affected_row))) {
|
||||
LOG_WARN("failed to set sts credential", K(ret), K(tenant_id));
|
||||
} else {
|
||||
LOG_INFO("update restore tenant sts credential", K(tenant_id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRestoreScheduler::wait_sys_job_ready_(const ObPhysicalRestoreJob &job, bool &is_ready) {
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -122,6 +122,8 @@ private:
|
||||
int wait_restore_safe_mview_merge_info_();
|
||||
int try_collect_ls_mv_merge_scn_(const share::SCN &tenant_mv_merge_scn);
|
||||
int update_restore_progress_by_bytes_(const ObPhysicalRestoreJob &job, const int64_t total_bytes, const int64_t finish_bytes);
|
||||
int set_tenant_sts_crendential_config_(common::ObISQLClient &proxy,
|
||||
const uint64_t tenant_id, const share::ObPhysicalRestoreJob &job_info);
|
||||
private:
|
||||
bool inited_;
|
||||
share::schema::ObMultiVersionSchemaService *schema_service_;
|
||||
|
@ -98,6 +98,10 @@ int ObRestoreUtil::fill_physical_restore_job(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (FAILEDx(fill_sts_credential_(arg, job))) {
|
||||
LOG_WARN("fail to fill sts credential", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO("finish fill_physical_restore_job", K(job_id), K(arg), K(job));
|
||||
@ -599,6 +603,20 @@ int ObRestoreUtil::fill_encrypt_info_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRestoreUtil::fill_sts_credential_(
|
||||
const obrpc::ObPhysicalRestoreTenantArg &arg,
|
||||
share::ObPhysicalRestoreJob &job)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_valid = false;
|
||||
if (arg.sts_credential_.empty()) {
|
||||
//TODO(mingqiao): add format check function from shifangdan
|
||||
} else if (OB_FAIL(job.set_sts_credential(arg.sts_credential_))) {
|
||||
LOG_WARN("fail to set sts crendential");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRestoreUtil::get_restore_source(
|
||||
const bool restore_using_compl_log,
|
||||
const ObIArray<ObString>& tenant_path_array,
|
||||
|
@ -238,6 +238,9 @@ private:
|
||||
static int fill_encrypt_info_(
|
||||
const obrpc::ObPhysicalRestoreTenantArg &arg,
|
||||
share::ObPhysicalRestoreJob &job);
|
||||
static int fill_sts_credential_(
|
||||
const obrpc::ObPhysicalRestoreTenantArg &arg,
|
||||
share::ObPhysicalRestoreJob &job);
|
||||
DISALLOW_COPY_AND_ASSIGN(ObRestoreUtil);
|
||||
};
|
||||
|
||||
|
@ -235,6 +235,7 @@ ob_set_subtarget(ob_share common
|
||||
ob_domain_index_builder_util.cpp
|
||||
ob_service_name_proxy.cpp
|
||||
ob_compatibility_control.cpp
|
||||
ob_device_credential_task.cpp
|
||||
)
|
||||
|
||||
ob_set_subtarget(ob_share common_mixed
|
||||
|
@ -1151,10 +1151,23 @@ int ObBackupStorageInfo::get_authorization_info(char *authorization, const int64
|
||||
LOG_WARN("invalid args", K(ret), KP(authorization), K(length));
|
||||
} else if (OB_STORAGE_FILE == device_type_) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(get_access_key_(access_key_buf, sizeof(access_key_buf)))) {
|
||||
LOG_WARN("failed to get access key", K(ret));
|
||||
} else if (OB_FAIL(databuff_printf(authorization, length, "%s&%s", access_id_, access_key_buf))) {
|
||||
LOG_WARN("failed to set authorization", K(ret), K(length), K_(access_id), K(strlen(access_key_buf)));
|
||||
} else if (!is_assume_role_mode_) {
|
||||
// access by ak/sk mode
|
||||
if (OB_FAIL(get_access_key_(access_key_buf, sizeof(access_key_buf)))) {
|
||||
LOG_WARN("failed to get access key", K(ret));
|
||||
} else if (OB_FAIL(databuff_printf(authorization, length, "%s&%s", access_id_, access_key_buf))) {
|
||||
LOG_WARN("failed to set authorization", K(ret), K(length), K_(access_id), K(strlen(access_key_buf)));
|
||||
}
|
||||
} else {
|
||||
// access by assume role mode
|
||||
int64_t pos = 0;
|
||||
if (OB_FAIL(databuff_printf(authorization, length, pos, "%s", role_arn_))) {
|
||||
LOG_WARN("failed to set authorization", K(ret), K(length), KP_(role_arn));
|
||||
} else if (external_id_[0] != '\0') {
|
||||
if (OB_FAIL(databuff_printf(authorization, length, pos, "&%s", external_id_))) {
|
||||
LOG_WARN("failed to set authorization", K(ret), K(length), KP_(external_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -1400,6 +1413,8 @@ int ObBackupDest::parse_backup_dest_str_(const char *backup_dest)
|
||||
LOG_WARN("failed to get storage type", K(ret));
|
||||
} else {
|
||||
// oss://backup_dir/?host=xxx.com&access_id=111&access_key=222
|
||||
// oss://backup_dir/?host=xxx.com&role_arn=xxx&external_id=xxx
|
||||
// oss://backup_dir/?host=xxx.com&role_arn=xxx (external_id is optional)
|
||||
// file:///root_backup_dir"
|
||||
while (backup_dest[pos] != '\0') {
|
||||
if ('?' == backup_dest[pos]) {
|
||||
|
@ -890,6 +890,23 @@ bool ObConfigPlanCacheGCChecker::check(const ObConfigItem &t) const
|
||||
return is_valid;
|
||||
}
|
||||
|
||||
bool ObConfigSTScredentialChecker::check(const ObConfigItem &t) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool flag = true;
|
||||
const char *tmp_credential = t.str();
|
||||
ObStsCredential key;
|
||||
if (OB_ISNULL(tmp_credential) || OB_UNLIKELY(strlen(tmp_credential) <= 0
|
||||
|| strlen(tmp_credential) > OB_MAX_STS_CREDENTIAL_LENGTH)) {
|
||||
flag = false;
|
||||
OB_LOG(WARN, "invalid sts credential", KP(tmp_credential));
|
||||
} else if (OB_FAIL(check_sts_credential_format(tmp_credential, key))) {
|
||||
flag = false;
|
||||
OB_LOG(WARN, "fail to check sts credential format", K(ret), K(key), KP(tmp_credential));
|
||||
}
|
||||
return flag;
|
||||
}
|
||||
|
||||
bool ObConfigUseLargePagesChecker::check(const ObConfigItem &t) const
|
||||
{
|
||||
bool is_valid = false;
|
||||
|
@ -301,6 +301,18 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObConfigSyslogFileUncompressedCountChecker);
|
||||
};
|
||||
|
||||
// Used to check the format of STS credential
|
||||
class ObConfigSTScredentialChecker
|
||||
: public ObConfigChecker
|
||||
{
|
||||
public:
|
||||
ObConfigSTScredentialChecker() {}
|
||||
virtual ~ObConfigSTScredentialChecker() {}
|
||||
bool check(const ObConfigItem &t) const;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObConfigSTScredentialChecker);
|
||||
};
|
||||
|
||||
class ObConfigUseLargePagesChecker
|
||||
: public ObConfigChecker
|
||||
{
|
||||
|
87
src/share/ob_device_credential_task.cpp
Normal file
87
src/share/ob_device_credential_task.cpp
Normal file
@ -0,0 +1,87 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan
|
||||
* PubL v2. You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY
|
||||
* KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
|
||||
* NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the
|
||||
* Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#define USING_LOG_PREFIX SHARE
|
||||
|
||||
#include "share/ob_device_credential_task.h"
|
||||
#include "lib/time/ob_time_utility.h"
|
||||
#include "share/ob_thread_mgr.h"
|
||||
#include "share/config/ob_server_config.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace share
|
||||
{
|
||||
using namespace oceanbase::common;
|
||||
|
||||
ObDeviceCredentialTask::ObDeviceCredentialTask() : is_inited_(false), schedule_interval_us_(0)
|
||||
{}
|
||||
|
||||
ObDeviceCredentialTask::~ObDeviceCredentialTask()
|
||||
{}
|
||||
|
||||
int ObDeviceCredentialTask::init(const int64_t interval_us)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_ERROR("ObDeviceCredentialTask has already been inited", K(ret));
|
||||
} else if (interval_us <= 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_ERROR("invalid argument", K(ret), K(interval_us));
|
||||
} else {
|
||||
schedule_interval_us_ = interval_us;
|
||||
if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::ServerGTimer, *this, schedule_interval_us_, true /*schedule repeatly*/))) {
|
||||
LOG_ERROR("fail to schedule task ObDeviceCredentialTask", K(ret), K(interval_us), KPC(this));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
reset();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObDeviceCredentialTask::reset()
|
||||
{
|
||||
is_inited_ = false;
|
||||
schedule_interval_us_ = 0;
|
||||
}
|
||||
|
||||
void ObDeviceCredentialTask::runTimerTask()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t start_us = common::ObTimeUtility::fast_current_time();
|
||||
LOG_INFO("device credential task start", K(start_us));
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("device credential task not init", K(ret));
|
||||
} else if (OB_FAIL(do_work_())) {
|
||||
LOG_WARN("fail to do work", K(ret));
|
||||
}
|
||||
const int64_t cost_us = common::ObTimeUtility::fast_current_time() - start_us;
|
||||
LOG_INFO("device credential task finish", K(cost_us));
|
||||
}
|
||||
|
||||
int ObDeviceCredentialTask::do_work_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObCurTraceId::init(GCONF.self_addr_);
|
||||
if (OB_FAIL(ObDeviceCredentialMgr::get_instance().refresh())) {
|
||||
OB_LOG(WARN, "failed to refresh device credentials", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace share
|
||||
} // namespace oceanbase
|
47
src/share/ob_device_credential_task.h
Normal file
47
src/share/ob_device_credential_task.h
Normal file
@ -0,0 +1,47 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan
|
||||
* PubL v2. You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY
|
||||
* KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
|
||||
* NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the
|
||||
* Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef OCEANBASE_SHARE_DEVICE_OB_DEVICE_CREDENTIAL_TASK_H_
|
||||
#define OCEANBASE_SHARE_DEVICE_OB_DEVICE_CREDENTIAL_TASK_H_
|
||||
|
||||
#include "lib/restore/ob_storage_info.h"
|
||||
#include "lib/task/ob_timer.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace share
|
||||
{
|
||||
// To ensure that the temporary credentials in the credential map are always valid,
|
||||
// the credentials are refreshed every 20 minutes.
|
||||
|
||||
class ObDeviceCredentialTask : public common::ObTimerTask
|
||||
{
|
||||
public:
|
||||
ObDeviceCredentialTask();
|
||||
virtual ~ObDeviceCredentialTask();
|
||||
int init(const int64_t interval_us);
|
||||
void reset();
|
||||
virtual void runTimerTask() override;
|
||||
TO_STRING_KV(K_(is_inited), K_(schedule_interval_us));
|
||||
|
||||
private:
|
||||
int do_work_();
|
||||
|
||||
private:
|
||||
bool is_inited_;
|
||||
int64_t schedule_interval_us_;
|
||||
};
|
||||
|
||||
} // namespace share
|
||||
} // namespace oceanbase
|
||||
|
||||
#endif // OCEANBASE_SHARE_DEVICE_OB_DEVICE_CREDENTIAL_TASK_H_
|
@ -26,6 +26,84 @@ namespace oceanbase
|
||||
{
|
||||
namespace common
|
||||
{
|
||||
int ObTenantStsCredentialMgr::get_sts_credential(
|
||||
char *sts_credential, const int64_t sts_credential_buf_len)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t tenant_id = ObObjectStorageTenantGuard::get_tenant_id();
|
||||
if (OB_ISNULL(sts_credential) || OB_UNLIKELY(sts_credential_buf_len <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid args", K(ret),
|
||||
K(tenant_id), KP(sts_credential), K(sts_credential_buf_len));
|
||||
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
// If the tenant is invalid or illegal, the sts_credential of the system tenant will be used as
|
||||
// a backup. Please refer to the following document for specific reasons.
|
||||
//
|
||||
tenant_id = OB_SYS_TENANT_ID;
|
||||
OB_LOG(WARN, "invalid tenant ctx, use sys tenant", K(ret), K(tenant_id));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (is_meta_tenant(tenant_id)) {
|
||||
tenant_id = gen_user_tenant_id(tenant_id);
|
||||
}
|
||||
const char *tmp_credential = nullptr;
|
||||
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
// If the tenant does not have sts_credential, return OB_EAGAIN to wait for the next try.
|
||||
if (OB_TMP_FAIL(check_sts_credential(tenant_config))) {
|
||||
ret = OB_EAGAIN;
|
||||
OB_LOG(WARN, "fail to check sts credential, should try again", K(ret), K(tmp_ret), K(tenant_id));
|
||||
} else {
|
||||
tmp_credential = tenant_config->sts_credential;
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(databuff_printf(sts_credential, sts_credential_buf_len,
|
||||
"%s", tmp_credential))) {
|
||||
OB_LOG(WARN, "fail to deep copy sts_credential", K(ret), K(tenant_id), KP(tmp_credential));
|
||||
} else if (OB_UNLIKELY(sts_credential[0] == '\0')) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "sts_credential is null", K(ret), K(tenant_id), KP(tmp_credential));
|
||||
}
|
||||
OB_LOG(INFO, "get sts credential successfully", K(tenant_id));
|
||||
}
|
||||
if (OB_FAIL(ret) && REACH_TIME_INTERVAL(LOG_INTERVAL_US)) {
|
||||
OB_LOG(WARN, "try to get sts credential", K(ret), K(tenant_id), KP(tmp_credential));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantStsCredentialMgr::check_sts_credential(omt::ObTenantConfigGuard &tenant_config) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const char *sts_credential = nullptr;
|
||||
if (OB_UNLIKELY(!tenant_config.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "tenant config is invalid", K(ret));
|
||||
} else if (OB_ISNULL(sts_credential = tenant_config->sts_credential)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "tenant config is invalid", K(ret), KP(sts_credential));
|
||||
} else if (OB_UNLIKELY(sts_credential[0] == '\0')) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "sts_credential is null", K(ret), KP(sts_credential));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObClusterVersionMgr::is_supported_assume_version() const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64 min_cluster_version = GET_MIN_CLUSTER_VERSION();
|
||||
if (min_cluster_version < MOCK_CLUSTER_VERSION_4_2_5_0
|
||||
|| (min_cluster_version >= CLUSTER_VERSION_4_3_0_0
|
||||
&& min_cluster_version < CLUSTER_VERSION_4_3_5_0)) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
OB_LOG(WARN, "cluster version is too low for assume role", K(ret), K(GET_MIN_CLUSTER_VERSION()));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
const int ObDeviceManager::MAX_DEVICE_INSTANCE;
|
||||
ObDeviceManager::ObDeviceManager() : allocator_(), device_count_(0), is_init_(false)
|
||||
@ -59,6 +137,11 @@ int ObDeviceManager::init_devices_env()
|
||||
OB_LOG(WARN, "fail to init cos storage", K(ret));
|
||||
} else if (OB_FAIL(init_s3_env())) {
|
||||
OB_LOG(WARN, "fail to init s3 storage", K(ret));
|
||||
} else if (OB_FAIL(ObStsCredential::register_sts_credential_mgr(
|
||||
&ObTenantStsCredentialMgr::get_instance()))) {
|
||||
OB_LOG(WARN, "fail to register sts crendential", K(ret));
|
||||
} else if (OB_FAIL(ObDeviceCredentialMgr::get_instance().init())) {
|
||||
OB_LOG(WARN, "fail to init device credential mgr", K(ret));
|
||||
} else {
|
||||
// When compliantRfc3986Encoding is set to true:
|
||||
// - Adhere to RFC 3986 by supporting the encoding of reserved characters
|
||||
@ -117,6 +200,7 @@ void ObDeviceManager::destroy()
|
||||
fin_cos_env();
|
||||
fin_s3_env();
|
||||
lock_.destroy();
|
||||
ObDeviceCredentialMgr::get_instance().destroy();
|
||||
is_init_ = false;
|
||||
device_count_ = 0;
|
||||
OB_LOG_RET(WARN, ret_dev, "release the init resource", K(ret_dev), K(ret_handle));
|
||||
|
@ -17,6 +17,8 @@
|
||||
#include "lib/allocator/ob_fifo_allocator.h"
|
||||
#include "lib/hash/ob_hashmap.h"
|
||||
#include "lib/lock/ob_qsync_lock.h"
|
||||
#include "lib/restore/ob_storage_info.h"
|
||||
#include "observer/omt/ob_tenant_config_mgr.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -24,6 +26,33 @@ namespace common
|
||||
{
|
||||
|
||||
class ObObjectStorageInfo;
|
||||
class ObTenantStsCredentialMgr : public ObTenantStsCredentialBaseMgr
|
||||
{
|
||||
public:
|
||||
ObTenantStsCredentialMgr() {}
|
||||
virtual ~ObTenantStsCredentialMgr() {}
|
||||
virtual int get_sts_credential(char *sts_credential, const int64_t sts_credential_buf_len) override;
|
||||
virtual int check_sts_credential(omt::ObTenantConfigGuard &tenant_config) const;
|
||||
static ObTenantStsCredentialBaseMgr &get_instance()
|
||||
{
|
||||
static ObTenantStsCredentialMgr mgr;
|
||||
return mgr;
|
||||
}
|
||||
const static int64_t LOG_INTERVAL_US = 5 * 1000 * 1000; // 5s
|
||||
};
|
||||
|
||||
class ObClusterVersionMgr: public ObClusterVersionBaseMgr
|
||||
{
|
||||
public:
|
||||
ObClusterVersionMgr() {}
|
||||
virtual ~ObClusterVersionMgr() {}
|
||||
virtual int is_supported_assume_version() const override;
|
||||
static ObClusterVersionMgr &get_instance()
|
||||
{
|
||||
static ObClusterVersionMgr mgr;
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
|
||||
class ObDeviceManager
|
||||
{
|
||||
@ -33,6 +62,10 @@ public:
|
||||
void destroy();
|
||||
static ObDeviceManager &get_instance();
|
||||
|
||||
int get_device_key(const common::ObString &storage_info,
|
||||
const common::ObString &storage_type_prefix,
|
||||
char *device_key,
|
||||
const int64_t device_key_len) const;
|
||||
/*for object device, will return a new object to caller*/
|
||||
/*ofs/local will share in upper logical*/
|
||||
// 1. ObObjectStorageInfo is a member of ObObjectDevice, which is used for accessing object storage.
|
||||
|
@ -286,7 +286,7 @@ DEFINE_ERROR(OB_ABORT_MAJOR_FREEZE_FAILED, -4212, -1, "HY000", "abort major free
|
||||
DEFINE_ERROR_EXT_DEP(OB_MAJOR_FREEZE_NOT_FINISHED, -4213, -1, "HY000", "last major freeze not finish", "%s");
|
||||
DEFINE_ERROR(OB_PARTITION_NOT_LEADER, -4214, -1, "HY000", "partition is not leader partition");
|
||||
DEFINE_ERROR(OB_WAIT_MAJOR_FREEZE_RESPONSE_TIMEOUT, -4215, -1, "HY000", "wait major freeze response timeout");
|
||||
DEFINE_ERROR(OB_CURL_ERROR, -4216, -1, "HY000", "curl error");
|
||||
DEFINE_ERROR_DEP(OB_CURL_ERROR, -4216, -1, "HY000", "curl error");
|
||||
DEFINE_ERROR_EXT(OB_MAJOR_FREEZE_NOT_ALLOW, -4217, -1, "HY000", "Major freeze not allowed now", "%s");
|
||||
DEFINE_ERROR(OB_PREPARE_FREEZE_FAILED, -4218, -1, "HY000", "prepare freeze failed");
|
||||
DEFINE_ORACLE_ERROR_EXT_DEP(OB_INVALID_DATE_VALUE, -4219, ER_TRUNCATED_WRONG_VALUE, "22007", "Incorrect value", "Incorrect datetime value: '%.*s' for column '%s'", 1861, "literal does not match format string", "literal does not match format string: '%.*s' for column '%s'");
|
||||
|
@ -106,7 +106,6 @@ constexpr int OB_COMMIT_MAJOR_FREEZE_FAILED = -4211;
|
||||
constexpr int OB_ABORT_MAJOR_FREEZE_FAILED = -4212;
|
||||
constexpr int OB_PARTITION_NOT_LEADER = -4214;
|
||||
constexpr int OB_WAIT_MAJOR_FREEZE_RESPONSE_TIMEOUT = -4215;
|
||||
constexpr int OB_CURL_ERROR = -4216;
|
||||
constexpr int OB_MAJOR_FREEZE_NOT_ALLOW = -4217;
|
||||
constexpr int OB_PREPARE_FREEZE_FAILED = -4218;
|
||||
constexpr int OB_PARTITION_NOT_EXIST = -4225;
|
||||
|
@ -1467,6 +1467,11 @@ DEF_BOOL(plsql_debug, OB_TENANT_PARAMETER, "False",
|
||||
DEF_BOOL(plsql_v2_compatibility, OB_TENANT_PARAMETER, "False",
|
||||
"allows to control store routine compile action at DDL stage",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_STR_WITH_CHECKER(sts_credential, OB_TENANT_PARAMETER, "",
|
||||
common::ObConfigSTScredentialChecker,
|
||||
"STS credential for object storage, "
|
||||
"values: sts_url=xxx&sts_ak=xxx&sts_sk=xxx",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
// for bloom filter
|
||||
DEF_BOOL(_bloom_filter_enabled, OB_TENANT_PARAMETER, "True",
|
||||
"enable join bloom filter",
|
||||
|
@ -331,6 +331,8 @@ int ObPhysicalRestoreJob::assign(const ObPhysicalRestoreJob &other)
|
||||
LOG_WARN("failed to assign path list", KR(ret), K(other));
|
||||
} else if (OB_FAIL(white_list_.assign(other.white_list_))) {
|
||||
LOG_WARN("failed to assign white list", KR(ret), K(other));
|
||||
} else if (OB_FAIL(deep_copy_ob_string(allocator_, other.sts_credential_, sts_credential_))) {
|
||||
LOG_WARN("failed to copy string", KR(ret), K(other));
|
||||
}
|
||||
|
||||
}
|
||||
@ -374,6 +376,7 @@ void ObPhysicalRestoreJob::reset()
|
||||
recover_table_ = false;
|
||||
using_complement_log_ = false;
|
||||
|
||||
sts_credential_.reset();
|
||||
|
||||
passwd_array_.reset();
|
||||
multi_restore_path_list_.reset();
|
||||
|
@ -191,6 +191,8 @@ public:
|
||||
Property_declare_int(bool, using_complement_log)
|
||||
Property_declare_int(int64_t, backup_compatible)
|
||||
|
||||
//for sts
|
||||
Property_declare_ObString(sts_credential)
|
||||
private:
|
||||
//job_id and tenant_id in __all_restore_job primary_key
|
||||
ObRestoreJobPersistKey restore_key_;
|
||||
|
@ -268,6 +268,7 @@ int ObPhysicalRestoreTableOperator::fill_dml_splicer(
|
||||
ADD_COLUMN_MACRO_IN_TABLE_OPERATOR(job_info, recover_table);
|
||||
ADD_COLUMN_MACRO_IN_TABLE_OPERATOR(job_info, using_complement_log);
|
||||
ADD_COLUMN_MACRO_IN_TABLE_OPERATOR(job_info, backup_compatible);
|
||||
ADD_COLUMN_MACRO_IN_TABLE_OPERATOR(job_info, sts_credential);
|
||||
|
||||
// source_cluster_version
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -525,6 +526,7 @@ int ObPhysicalRestoreTableOperator::retrieve_restore_option(
|
||||
RETRIEVE_STR_VALUE(kms_encrypt_key, job);
|
||||
RETRIEVE_INT_VALUE(concurrency, job);
|
||||
RETRIEVE_INT_VALUE(backup_compatible, job);
|
||||
RETRIEVE_STR_VALUE(sts_credential, job);
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (name == "backup_dest") {
|
||||
|
@ -505,7 +505,6 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa
|
||||
file_read_param.file_location_ = data_access_param.file_location_;
|
||||
file_read_param.filename_ = data_desc.filename_;
|
||||
file_read_param.compression_format_ = data_access_param.compression_format_;
|
||||
file_read_param.access_info_ = data_access_param.access_info_;
|
||||
file_read_param.packet_handle_ = nullptr;
|
||||
if (OB_NOT_NULL(execute_ctx.exec_ctx_.get_session_info())
|
||||
&& OB_NOT_NULL(execute_ctx.exec_ctx_.get_session_info()->get_pl_query_sender())) {
|
||||
@ -514,7 +513,9 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa
|
||||
file_read_param.session_ = execute_ctx.exec_ctx_.get_session_info();
|
||||
file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts();
|
||||
|
||||
if (OB_FAIL(ObFileReader::open(file_read_param, allocator_, file_reader_))) {
|
||||
if (OB_FAIL(file_read_param.access_info_.assign(data_access_param.access_info_))) {
|
||||
LOG_WARN("fail to assign access info", KR(ret), K_(data_access_param.access_info));
|
||||
} else if (OB_FAIL(ObFileReader::open(file_read_param, allocator_, file_reader_))) {
|
||||
LOG_WARN("failed to open file", KR(ret), K(data_desc));
|
||||
} else if (file_reader_->seekable()) {
|
||||
|
||||
@ -833,14 +834,16 @@ int ObLoadDataDirectImpl::SimpleDataSplitUtils::split(const DataAccessParam &dat
|
||||
ObFileReadParam file_read_param;
|
||||
file_read_param.file_location_ = data_access_param.file_location_;
|
||||
file_read_param.filename_ = data_desc.filename_;
|
||||
file_read_param.access_info_ = data_access_param.access_info_;
|
||||
file_read_param.compression_format_ = data_access_param.compression_format_;
|
||||
file_read_param.packet_handle_ = NULL;
|
||||
file_read_param.session_ = NULL;
|
||||
file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts();
|
||||
|
||||
ObFileReader *file_reader = NULL;
|
||||
if (OB_FAIL(ObFileReader::open(file_read_param, allocator, file_reader))) {
|
||||
|
||||
if (OB_FAIL(file_read_param.access_info_.assign(data_access_param.access_info_))) {
|
||||
LOG_WARN("fail to assign access info", KR(ret), K_(data_access_param.access_info));
|
||||
} else if (OB_FAIL(ObFileReader::open(file_read_param, allocator, file_reader))) {
|
||||
LOG_WARN("failed to open file.", KR(ret), K(data_desc));
|
||||
} else if (!file_reader->seekable()) {
|
||||
if (OB_FAIL(data_desc_iter.add_data_desc(data_desc))) {
|
||||
@ -2397,7 +2400,9 @@ int ObLoadDataDirectImpl::init_execute_param()
|
||||
data_access_param.file_column_num_ = field_or_var_list.count();
|
||||
data_access_param.file_format_ = load_stmt_->get_data_struct_in_file();
|
||||
data_access_param.file_cs_type_ = load_args.file_cs_type_;
|
||||
data_access_param.access_info_ = load_args.access_info_;
|
||||
if (OB_FAIL(data_access_param.access_info_.assign(load_args.access_info_))) {
|
||||
LOG_WARN("fail to set access info", KR(ret));
|
||||
}
|
||||
data_access_param.compression_format_ = load_args.compression_format_;
|
||||
}
|
||||
// column_ids_
|
||||
|
@ -2804,13 +2804,15 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
|
||||
file_read_param.file_location_ = load_file_storage;
|
||||
// file_read_param.filename_ = load_args.file_name_;
|
||||
file_read_param.compression_format_ = load_args.compression_format_;
|
||||
file_read_param.access_info_ = load_args.access_info_;
|
||||
file_read_param.packet_handle_ = nullptr;
|
||||
if (OB_NOT_NULL(ctx.get_my_session()) && OB_NOT_NULL(ctx.get_my_session()->get_pl_query_sender())) {
|
||||
file_read_param.packet_handle_ = &ctx.get_my_session()->get_pl_query_sender()->get_packet_sender();
|
||||
}
|
||||
file_read_param.session_ = ctx.get_my_session();
|
||||
file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts();
|
||||
if (OB_FAIL(file_read_param.access_info_.assign(load_args.access_info_))) {
|
||||
LOG_WARN("fail to assign access info", K(ret), K(load_args.access_info_));
|
||||
}
|
||||
}
|
||||
|
||||
OZ (init_file_size(ctx));
|
||||
|
@ -1094,7 +1094,8 @@ static const NonReservedKeyword Mysql_none_reserved_keywords[] =
|
||||
{"rb_and_agg", RB_AND_AGG},
|
||||
{"rb_iterate", RB_ITERATE},
|
||||
{"optimizer_costs", OPTIMIZER_COSTS},
|
||||
{"micro_index_clustered", MICRO_INDEX_CLUSTERED}
|
||||
{"micro_index_clustered", MICRO_INDEX_CLUSTERED},
|
||||
{"tenant_sts_credential", TENANT_STS_CREDENTIAL}
|
||||
};
|
||||
|
||||
/** https://dev.mysql.com/doc/refman/5.7/en/sql-syntax-prepared-statements.html
|
||||
|
@ -365,7 +365,7 @@ END_P SET_VAR DELIMITER
|
||||
TEMPLATE TEMPORARY TEMPTABLE TENANT TEXT THAN TIME TIMESTAMP TIMESTAMPADD TIMESTAMPDIFF TP_NO
|
||||
TP_NAME TRACE TRADITIONAL TRANSACTION TRIGGERS TRIM TRUNCATE TYPE TYPES TASK TABLET_SIZE
|
||||
TABLEGROUP_ID TENANT_ID THROTTLE TIME_ZONE_INFO TOP_K_FRE_HIST TIMES TRIM_SPACE TTL
|
||||
TRANSFER
|
||||
TRANSFER TENANT_STS_CREDENTIAL
|
||||
|
||||
UNCOMMITTED UNCONDITIONAL UNDEFINED UNDO_BUFFER_SIZE UNDOFILE UNICODE UNINSTALL UNIT UNIT_GROUP UNIT_NUM UNLOCKED UNTIL
|
||||
UNUSUAL UPGRADE USE_BLOOM_FILTER UNKNOWN USE_FRM USER USER_RESOURCES UNBOUNDED UP UNLIMITED USER_SPECIFIED
|
||||
@ -531,8 +531,9 @@ END_P SET_VAR DELIMITER
|
||||
%type <node> permanent_tablespace permanent_tablespace_options permanent_tablespace_option alter_tablespace_actions alter_tablespace_action alter_tablespace_options opt_force_purge
|
||||
%type <node> opt_tablespace_option opt_tablespace_options opt_tablespace_engine opt_alter_tablespace_option opt_alter_tablespace_options
|
||||
%type <node> opt_sql_throttle_for_priority opt_sql_throttle_using_cond sql_throttle_one_or_more_metrics sql_throttle_metric
|
||||
%type <node> opt_copy_id opt_backup_dest opt_backup_backup_dest opt_tenant_info opt_with_active_piece get_format_unit opt_backup_tenant_list opt_backup_to opt_description policy_name opt_recovery_window opt_redundancy opt_backup_copies opt_restore_until opt_backup_key_info opt_encrypt_key
|
||||
%type <node> opt_copy_id opt_backup_dest opt_backup_backup_dest opt_tenant_info opt_with_active_piece get_format_unit opt_backup_tenant_list opt_backup_to opt_description policy_name opt_recovery_window opt_redundancy opt_backup_copies opt_restore_until opt_encrypt_key
|
||||
%type <node> opt_recover_tenant recover_table_list recover_table_relation_name restore_remap_list remap_relation_name table_relation_name opt_recover_remap_item_list restore_remap_item_list restore_remap_item remap_item remap_table_val opt_tenant
|
||||
%type <node> opt_restore_with_config_list restore_with_config_list restore_with_config restore_with_item
|
||||
%type <node> new_or_old new_or_old_column_ref diagnostics_info_ref
|
||||
%type <node> on_empty on_error json_on_response opt_returning_type opt_on_empty_or_error json_value_expr opt_ascii opt_truncate_clause
|
||||
%type <node> json_extract_unquote_expr json_extract_expr json_query_expr opt_multivalue opt_asis opt_array opt_pretty opt_wrapper opt_scalars opt_query_on_error_or_empty_or_mismatch on_empty_query on_error_query on_mismatch_query opt_response_query
|
||||
@ -18996,7 +18997,7 @@ alter_with_opt_hint SYSTEM CLEAR RESTORE SOURCE
|
||||
malloc_terminal_node($$, result->malloc_pool_, T_CLEAR_RESTORE_SOURCE);
|
||||
}
|
||||
|
|
||||
alter_with_opt_hint SYSTEM RECOVER TABLE { result->is_for_remap_ = 1; } recover_table_list opt_recover_tenant opt_backup_dest opt_restore_until WITH STRING_VALUE opt_encrypt_key opt_backup_key_info opt_recover_remap_item_list opt_description
|
||||
alter_with_opt_hint SYSTEM RECOVER TABLE { result->is_for_remap_ = 1; } recover_table_list opt_recover_tenant opt_backup_dest opt_restore_until WITH STRING_VALUE opt_encrypt_key opt_restore_with_config_list opt_recover_remap_item_list opt_description
|
||||
{
|
||||
(void)($1);
|
||||
ParseNode *tables = NULL;
|
||||
@ -19010,7 +19011,7 @@ alter_with_opt_hint SYSTEM RESTORE FROM STRING_VALUE opt_restore_until PREVIEW
|
||||
malloc_non_terminal_node($$, result->malloc_pool_, T_PHYSICAL_RESTORE_TENANT, 2, $5, $6);
|
||||
}
|
||||
|
|
||||
alter_with_opt_hint SYSTEM RESTORE relation_name opt_backup_dest opt_restore_until WITH STRING_VALUE opt_encrypt_key opt_backup_key_info opt_description
|
||||
alter_with_opt_hint SYSTEM RESTORE relation_name opt_backup_dest opt_restore_until WITH STRING_VALUE opt_encrypt_key opt_restore_with_config_list opt_description
|
||||
{
|
||||
(void)($1);
|
||||
malloc_non_terminal_node($$, result->malloc_pool_, T_PHYSICAL_RESTORE_TENANT, 7, $4, $5, $6, $8, $9, $10, $11);
|
||||
@ -22252,12 +22253,45 @@ opt_restore_until:
|
||||
}
|
||||
;
|
||||
|
||||
opt_backup_key_info:
|
||||
/*EMPTY*/ { $$ = NULL; }
|
||||
| WITH KEY FROM STRING_VALUE opt_encrypt_key
|
||||
opt_restore_with_config_list:
|
||||
/*empty*/
|
||||
{
|
||||
result->contain_sensitive_data_ = true;
|
||||
malloc_non_terminal_node($$, result->malloc_pool_, T_BACKUP_KEY, 2, $4, $5);
|
||||
$$ = NULL;
|
||||
}
|
||||
| restore_with_config_list
|
||||
{
|
||||
ParseNode *with_items = NULL;
|
||||
merge_nodes(with_items, result, T_RESTORE_WITH_CONFIG_LIST, $1);
|
||||
$$ = with_items;
|
||||
}
|
||||
;
|
||||
|
||||
restore_with_config_list:
|
||||
restore_with_config
|
||||
{
|
||||
$$ = $1;
|
||||
}
|
||||
| restore_with_config_list restore_with_config
|
||||
{
|
||||
malloc_non_terminal_node($$, result->malloc_pool_, T_LINK_NODE, 2, $1, $2);
|
||||
}
|
||||
;
|
||||
|
||||
restore_with_config:
|
||||
WITH restore_with_item
|
||||
{
|
||||
$$ = $2;
|
||||
}
|
||||
;
|
||||
|
||||
restore_with_item:
|
||||
KEY FROM STRING_VALUE opt_encrypt_key
|
||||
{
|
||||
malloc_non_terminal_node($$, result->malloc_pool_, T_BACKUP_KEY, 2, $3, $4);
|
||||
}
|
||||
| TENANT_STS_CREDENTIAL STRING_VALUE
|
||||
{
|
||||
malloc_non_terminal_node($$, result->malloc_pool_, T_STS_CREDENTIAL, 1, $2);
|
||||
}
|
||||
;
|
||||
|
||||
@ -24478,6 +24512,7 @@ ACCESS_INFO
|
||||
| OVERWRITE
|
||||
| OPTIMIZER_COSTS
|
||||
| MICRO_INDEX_CLUSTERED
|
||||
| TENANT_STS_CREDENTIAL
|
||||
;
|
||||
|
||||
unreserved_keyword_special:
|
||||
|
@ -3913,22 +3913,9 @@ int ObPhysicalRestoreTenantResolver::resolve(const ParseNode &parse_tree)
|
||||
&& OB_FAIL(Util::resolve_string(parse_tree.children_[4],
|
||||
stmt->get_rpc_arg().encrypt_key_))) {
|
||||
LOG_WARN("failed to resolve encrypt key", K(ret));
|
||||
} else if (OB_NOT_NULL(parse_tree.children_[5])) {
|
||||
ParseNode *kms_node = parse_tree.children_[5];
|
||||
if (2 != kms_node->num_child_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("num of children not match", K(ret), "child_num", kms_node->num_child_);
|
||||
} else if (OB_ISNULL(kms_node->children_[0])) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("kms uri should not be NULL", K(ret));
|
||||
} else if (OB_FAIL(Util::resolve_string(kms_node->children_[0],
|
||||
stmt->get_rpc_arg().kms_uri_))) {
|
||||
LOG_WARN("failed to resolve kms uri", K(ret));
|
||||
} else if (OB_NOT_NULL(kms_node->children_[1])
|
||||
&& OB_FAIL(Util::resolve_string(kms_node->children_[1],
|
||||
stmt->get_rpc_arg().kms_encrypt_key_))) {
|
||||
LOG_WARN("failed to resolve kms encrypt key", K(ret));
|
||||
}
|
||||
} else if (OB_NOT_NULL(parse_tree.children_[5])
|
||||
&& OB_FAIL(resolve_restore_with_config_item(parse_tree.children_[5], stmt->get_rpc_arg()))) {
|
||||
LOG_WARN("fail to resolve config item", K(ret));
|
||||
}
|
||||
|
||||
ParseNode *description_node = parse_tree.children_[6];
|
||||
@ -3972,6 +3959,50 @@ int ObPhysicalRestoreTenantResolver::resolve(const ParseNode &parse_tree)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPhysicalRestoreTenantResolver::resolve_restore_with_config_item(const ParseNode *node, obrpc::ObPhysicalRestoreTenantArg &arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(node)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("node must not be null", K(ret));
|
||||
} else if (T_RESTORE_WITH_CONFIG_LIST != node->type_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid nde type", K(ret), "node_type", node->type_);
|
||||
} else if (node->num_child_ > 2) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("children num not match", K(ret), "num_child", node->num_child_);
|
||||
} else {
|
||||
for (int64_t i = 0; i < node->num_child_ && OB_SUCC(ret); i++) {
|
||||
const ParseNode *child_node = node->children_[i];
|
||||
if (OB_ISNULL(child_node)) {
|
||||
} else if (T_BACKUP_KEY == child_node->type_) {
|
||||
if (2 != child_node->num_child_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("num of children not match", K(ret), "child_num", child_node->num_child_);
|
||||
} else if (OB_ISNULL(child_node->children_[0])) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("kms uri should not be NULL", K(ret));
|
||||
} else if (OB_FAIL(Util::resolve_string(child_node->children_[0],
|
||||
arg.kms_uri_))) {
|
||||
LOG_WARN("failed to resolve kms uri", K(ret));
|
||||
} else if (OB_NOT_NULL(child_node->children_[1])
|
||||
&& OB_FAIL(Util::resolve_string(child_node->children_[1],
|
||||
arg.kms_encrypt_key_))) {
|
||||
LOG_WARN("failed to resolve kms encrypt key", K(ret));
|
||||
}
|
||||
} else if (T_STS_CREDENTIAL == child_node->type_) {
|
||||
if (1 != child_node->num_child_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("num of children not match", K(ret), "child_num", child_node->num_child_);
|
||||
} else if (OB_FAIL(Util::resolve_string(child_node->children_[0], arg.sts_credential_))) {
|
||||
LOG_WARN("fail to resolve string", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
#ifdef OB_BUILD_TDE_SECURITY
|
||||
int ObPhysicalRestoreTenantResolver::resolve_kms_encrypt_info(common::ObString store_option)
|
||||
{
|
||||
@ -6052,22 +6083,9 @@ int ObRecoverTableResolver::resolve(const ParseNode &parse_tree)
|
||||
} else if (OB_NOT_NULL(parse_tree.children_[5])
|
||||
&& OB_FAIL(Util::resolve_string(parse_tree.children_[5], stmt->get_rpc_arg().restore_tenant_arg_.encrypt_key_))) {
|
||||
LOG_WARN("failed to resolve encrypt key", K(ret));
|
||||
} else if (OB_NOT_NULL(parse_tree.children_[6])) {
|
||||
ParseNode *kms_node = parse_tree.children_[6];
|
||||
if (2 != kms_node->num_child_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("num of children not match", K(ret), "child_num", kms_node->num_child_);
|
||||
} else if (OB_ISNULL(kms_node->children_[0])) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("kms uri should not be NULL", K(ret));
|
||||
} else if (OB_FAIL(Util::resolve_string(kms_node->children_[0],
|
||||
stmt->get_rpc_arg().restore_tenant_arg_.kms_uri_))) {
|
||||
LOG_WARN("failed to resolve kms uri", K(ret));
|
||||
} else if (OB_NOT_NULL(kms_node->children_[1])
|
||||
&& OB_FAIL(Util::resolve_string(kms_node->children_[1],
|
||||
stmt->get_rpc_arg().restore_tenant_arg_.kms_encrypt_key_))) {
|
||||
LOG_WARN("failed to resolve kms encrypt key", K(ret));
|
||||
}
|
||||
} else if (OB_NOT_NULL(parse_tree.children_[6])
|
||||
&& OB_FAIL(resolve_restore_with_config_item_(parse_tree.children_[6], stmt->get_rpc_arg()))){
|
||||
LOG_WARN("fail to resolve restore_with config item", K(ret));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -6623,6 +6641,50 @@ int ObRecoverTableResolver::resolve_tenant_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRecoverTableResolver::resolve_restore_with_config_item_(const ParseNode *node, obrpc::ObRecoverTableArg &arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(node)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("node must not be null", K(ret));
|
||||
} else if (T_RESTORE_WITH_CONFIG_LIST != node->type_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid nde type", K(ret), "node_type", node->type_);
|
||||
} else if (node->num_child_ > 2) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("children num not match", K(ret), "num_child", node->num_child_);
|
||||
} else {
|
||||
for (int64_t i = 0; i < node->num_child_ && OB_SUCC(ret); i++) {
|
||||
const ParseNode *child_node = node->children_[i];
|
||||
if (OB_ISNULL(child_node)) {
|
||||
} else if (T_BACKUP_KEY == child_node->type_) {
|
||||
if (2 != child_node->num_child_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("num of children not match", K(ret), "child_num", child_node->num_child_);
|
||||
} else if (OB_ISNULL(child_node->children_[0])) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("kms uri should not be NULL", K(ret));
|
||||
} else if (OB_FAIL(Util::resolve_string(child_node->children_[0],
|
||||
arg.restore_tenant_arg_.kms_uri_))) {
|
||||
LOG_WARN("failed to resolve kms uri", K(ret));
|
||||
} else if (OB_NOT_NULL(child_node->children_[1])
|
||||
&& OB_FAIL(Util::resolve_string(child_node->children_[1],
|
||||
arg.restore_tenant_arg_.kms_encrypt_key_))) {
|
||||
LOG_WARN("failed to resolve kms encrypt key", K(ret));
|
||||
}
|
||||
} else if (T_STS_CREDENTIAL == child_node->type_) {
|
||||
if (1 != child_node->num_child_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("num of children not match", K(ret), "child_num", child_node->num_child_);
|
||||
} else if (OB_FAIL(Util::resolve_string(child_node->children_[0], arg.restore_tenant_arg_.sts_credential_))) {
|
||||
LOG_WARN("fail to resolve string", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRecoverTenantResolver::resolve(const ParseNode &parse_tree)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -208,6 +208,7 @@ class ObPhysicalRestoreTenantResolver : public ObSystemCmdResolver
|
||||
#endif
|
||||
int resolve_decryption_passwd(obrpc::ObPhysicalRestoreTenantArg &arg);
|
||||
int resolve_restore_source_array(obrpc::ObPhysicalRestoreTenantArg &arg);
|
||||
int resolve_restore_with_config_item(const ParseNode *node, obrpc::ObPhysicalRestoreTenantArg &arg);
|
||||
};
|
||||
|
||||
class ObRecoverTenantResolver : public ObSystemCmdResolver
|
||||
@ -341,6 +342,7 @@ private:
|
||||
#endif
|
||||
int resolve_backup_set_pwd_(common::ObString &pwd);
|
||||
int resolve_restore_source_(common::ObString &restore_source);
|
||||
int resolve_restore_with_config_item_(const ParseNode *node, obrpc::ObRecoverTableArg &arg);
|
||||
};
|
||||
|
||||
DEF_SIMPLE_CMD_RESOLVER(ObTableTTLResolver);
|
||||
|
@ -88,14 +88,14 @@ struct ObLoadArgument
|
||||
K_(file_iter),
|
||||
K_(compression_format));
|
||||
|
||||
void assign(const ObLoadArgument &other) {
|
||||
int assign(const ObLoadArgument &other) {
|
||||
int ret = OB_SUCCESS;
|
||||
load_file_storage_ = other.load_file_storage_;
|
||||
is_default_charset_ = other.is_default_charset_;
|
||||
ignore_rows_ = other.ignore_rows_;
|
||||
dupl_action_ = other.dupl_action_;
|
||||
file_cs_type_ = other.file_cs_type_;
|
||||
file_name_ = other.file_name_;
|
||||
access_info_ = other.access_info_;
|
||||
database_name_ = other.database_name_;
|
||||
table_name_ = other.table_name_;
|
||||
combined_name_ = other.combined_name_;
|
||||
@ -106,6 +106,10 @@ struct ObLoadArgument
|
||||
part_level_ = other.part_level_;
|
||||
file_iter_.copy(other.file_iter_);
|
||||
compression_format_ = other.compression_format_;
|
||||
if (OB_FAIL(access_info_.assign(other.access_info_))) {
|
||||
OB_LOG(WARN, "fail to assign access info", K(ret), K_(other.access_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObLoadFileLocation load_file_storage_;
|
||||
|
@ -258,6 +258,7 @@ standby_fetch_log_bandwidth_limit
|
||||
storage_meta_cache_priority
|
||||
storage_rowsets_size
|
||||
strict_check_os_params
|
||||
sts_credential
|
||||
sync_io_thread_count
|
||||
syslog_compress_func
|
||||
syslog_disk_size
|
||||
|
@ -74,7 +74,7 @@ int ObAdminIOAdapterBenchmarkExecutor::parse_cmd_(int argc, char *argv[])
|
||||
int ret = OB_SUCCESS;
|
||||
int opt = 0;
|
||||
int index = -1;
|
||||
const char *opt_str = "h:d:s:t:r:l:o:n:f:p:b:c:j:e:";
|
||||
const char *opt_str = "h:d:s:t:r:l:o:n:f:p:b:c:j:e:i:";
|
||||
struct option longopts[] = {{"help", 0, NULL, 'h'},
|
||||
{"file-path-prefix", 1, NULL, 'd'},
|
||||
{"storage-info", 1, NULL, 's'},
|
||||
@ -89,6 +89,7 @@ int ObAdminIOAdapterBenchmarkExecutor::parse_cmd_(int argc, char *argv[])
|
||||
{"clean-before-execution", 0, NULL, 'b'},
|
||||
{"clean-after-execution", 0, NULL, 'c'},
|
||||
{"s3_url_encode_type", 0, NULL, 'e'},
|
||||
{"sts_credential", 0, NULL, 'i'},
|
||||
{NULL, 0, NULL, 0}};
|
||||
while (OB_SUCC(ret) && -1 != (opt = getopt_long(argc, argv, opt_str, longopts, &index))) {
|
||||
switch (opt) {
|
||||
@ -188,6 +189,12 @@ int ObAdminIOAdapterBenchmarkExecutor::parse_cmd_(int argc, char *argv[])
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'i': {
|
||||
if (OB_FAIL(set_sts_credential_key(optarg))) {
|
||||
STORAGE_LOG(WARN, "failed to set sts credential", KR(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
print_usage_();
|
||||
exit(1);
|
||||
@ -333,6 +340,9 @@ int ObAdminIOAdapterBenchmarkExecutor::print_usage_()
|
||||
printf("\tob_admin bench_io_adapter -d's3://home/admin/backup_info' "
|
||||
"-s'host=xxx.com&access_id=111&access_key=222®ion=333'\t"
|
||||
"-e'compliantRfc3986Encoding'");
|
||||
printf("\tob_admin bench_io_adapter -d's3://home/admin/backup_info' "
|
||||
"-s'host=xxx.com&role_arn=111®ion=333'\t"
|
||||
"-i'sts_url=xxx&sts_ak=xxx&sts_sk=xxx'");
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -44,6 +44,7 @@ ObAdminExecutor::ObAdminExecutor()
|
||||
// 设置MTL上下文
|
||||
mock_server_tenant_.set(&blocksstable::ObDecodeResourcePool::get_instance());
|
||||
share::ObTenantEnv::set_tenant(&mock_server_tenant_);
|
||||
omt::ObTenantConfigMgr::get_instance().add_tenant_config(OB_SYS_TENANT_ID);
|
||||
|
||||
storage_env_.data_dir_ = data_dir_;
|
||||
storage_env_.sstable_dir_ = sstable_dir_;
|
||||
@ -215,6 +216,28 @@ int ObAdminExecutor::set_s3_url_encode_type(const char *type_str) const
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int ObAdminExecutor::set_sts_credential_key(const char *sts_credential)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(sts_credential)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "sts credential is null", KR(ret), KP(sts_credential));
|
||||
} else {
|
||||
if (OB_FAIL(ObDeviceManager::get_instance().init_devices_env())) {
|
||||
STORAGE_LOG(WARN, "fail to init device env", KR(ret));
|
||||
} else {
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(OB_SYS_TENANT_ID));
|
||||
if (OB_UNLIKELY(!tenant_config.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(
|
||||
WARN, "tenant config is invalid", KR(ret), K(OB_SYS_TENANT_ID));
|
||||
} else {
|
||||
tenant_config->sts_credential = sts_credential;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -45,6 +45,7 @@ protected:
|
||||
int prepare_decoder();
|
||||
int load_config();
|
||||
int set_s3_url_encode_type(const char *type_str) const;
|
||||
int set_sts_credential_key(const char *sts_credential);
|
||||
|
||||
protected:
|
||||
share::ObTenantBase mock_server_tenant_;
|
||||
|
@ -193,7 +193,7 @@ TEST_F(TestDeviceManager, test_device_manager)
|
||||
device_num = manager.get_device_cnt();
|
||||
ASSERT_EQ(1, device_num); //since we do not release automatic
|
||||
|
||||
//MAX_DEVICE_INSTANCE different deivce
|
||||
//MAX_DEVICE_INSTANCE different deivce
|
||||
for (int i = 0; i < max_dev_num; i++ ) {
|
||||
ObObjectStorageInfo tmp_storage_info;
|
||||
tmp_storage_info.device_type_ = ObStorageType::OB_STORAGE_OSS;
|
||||
|
Loading…
x
Reference in New Issue
Block a user