[CP] [CP] add option to configure s3 addressing model

This commit is contained in:
nautaa 2024-10-29 06:44:52 +00:00 committed by ob-robot
parent 6f6a1f5db8
commit f1a0605889
5 changed files with 66 additions and 1 deletions

View File

@ -291,6 +291,12 @@ int ObObjectStorageInfo::parse_storage_info_(const char *storage_info, bool &has
} else if (OB_FAIL(set_storage_info_field_(token, extension_, sizeof(extension_)))) {
LOG_WARN("failed to set delete mode", K(ret), K(token));
}
} else if (0 == strncmp(ADDRESSING_MODEL, token, strlen(ADDRESSING_MODEL))) {
if (OB_FAIL(check_addressing_model_(token + strlen(ADDRESSING_MODEL)))) {
OB_LOG(WARN, "failed to check addressing model", K(ret), K(token));
} else if (OB_FAIL(set_storage_info_field_(token, extension_, sizeof(extension_)))) {
LOG_WARN("failed to set addressing model", K(ret), K(token));
}
} else if (0 == strncmp(CHECKSUM_TYPE, token, strlen(CHECKSUM_TYPE))) {
const char *checksum_type_str = token + strlen(CHECKSUM_TYPE);
if (OB_FAIL(set_checksum_type_(checksum_type_str))) {
@ -322,6 +328,20 @@ int ObObjectStorageInfo::check_delete_mode_(const char *delete_mode)
return ret;
}
int ObObjectStorageInfo::check_addressing_model_(const char *addressing_model) const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(addressing_model)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid args", K(ret), KP(addressing_model));
} else if (0 != strcmp(addressing_model, ADDRESSING_MODEL_VIRTUAL_HOSTED_STYLE)
&& 0 != strcmp(addressing_model, ADDRESSING_MODEL_PATH_STYLE)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "addressing model is invalid", K(ret), K(addressing_model));
}
return ret;
}
bool is_oss_supported_checksum(const ObStorageChecksumType checksum_type)
{
return checksum_type == ObStorageChecksumType::OB_NO_CHECKSUM_ALGO

View File

@ -42,11 +42,20 @@ const char *const REGION = "s3_region=";
const char *const MAX_IOPS = "max_iops=";
const char *const MAX_BANDWIDTH = "max_bandwidth=";
const char *const ADDRESSING_MODEL = "addressing_model=";
const char *const ADDRESSING_MODEL_VIRTUAL_HOSTED_STYLE = "virtual_hosted_style";
const char *const ADDRESSING_MODEL_PATH_STYLE = "path_style";
const char *const CHECKSUM_TYPE = "checksum_type=";
const char *const CHECKSUM_TYPE_NO_CHECKSUM = "no_checksum";
const char *const CHECKSUM_TYPE_MD5 = "md5";
const char *const CHECKSUM_TYPE_CRC32 = "crc32";
enum ObStorageAddressingModel
{
OB_VIRTUAL_HOSTED_STYLE = 0,
OB_PATH_STYLE = 1,
};
enum ObStorageChecksumType : uint8_t
{
OB_NO_CHECKSUM_ALGO = 0,
@ -98,6 +107,7 @@ protected:
virtual int get_access_key_(char *key_buf, const int64_t key_buf_len) const;
virtual int parse_storage_info_(const char *storage_info, bool &has_appid);
int check_delete_mode_(const char *delete_mode);
int check_addressing_model_(const char *addressing_model) const;
int set_checksum_type_(const char *checksum_type_str);
int set_storage_info_field_(const char *info, char *field, const int64_t length);

View File

@ -208,6 +208,9 @@ int ObS3Client::init_s3_client_configuration_(const ObS3Account &account,
config.payloadSigningPolicy = Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never;
config.endpointOverride = account.endpoint_;
config.executor = nullptr;
if (account.addressing_model_ == ObStorageAddressingModel::OB_PATH_STYLE) {
config.useVirtualAddressing = false;
}
// Default maxRetries is 10
std::shared_ptr<ObStorageS3DisabledRetryStrategy> retryStrategy =
@ -1022,6 +1025,7 @@ void ObS3Account::reset()
MEMSET(endpoint_, 0, sizeof(endpoint_));
MEMSET(access_id_, 0, sizeof(access_id_));
MEMSET(secret_key_, 0, sizeof(secret_key_));
addressing_model_ = ObStorageAddressingModel::OB_VIRTUAL_HOSTED_STYLE;
}
int64_t ObS3Account::hash() const
@ -1031,6 +1035,7 @@ int64_t ObS3Account::hash() const
hash_value = murmurhash(endpoint_, static_cast<int32_t>(strlen(endpoint_)), hash_value);
hash_value = murmurhash(access_id_, static_cast<int32_t>(strlen(access_id_)), hash_value);
hash_value = murmurhash(secret_key_, static_cast<int32_t>(strlen(secret_key_)), hash_value);
hash_value = murmurhash(&addressing_model_, sizeof(addressing_model_), hash_value);
return hash_value;
}
@ -1084,6 +1089,15 @@ int ObS3Account::parse_from(const char *storage_info_str, const int64_t size)
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "delete mode is invalid", K(ret), KCSTRING(token));
}
} else if (0 == strncmp(ADDRESSING_MODEL, token, strlen(ADDRESSING_MODEL))) {
if (0 == strcmp(token + strlen(ADDRESSING_MODEL), ADDRESSING_MODEL_VIRTUAL_HOSTED_STYLE)) {
addressing_model_ = ObStorageAddressingModel::OB_VIRTUAL_HOSTED_STYLE;
} else if (0 == strcmp(token + strlen(ADDRESSING_MODEL), ADDRESSING_MODEL_PATH_STYLE)) {
addressing_model_ = ObStorageAddressingModel::OB_PATH_STYLE;
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "addressing model is invalid", K(ret), KCSTRING(token));
}
} else {
OB_LOG(DEBUG, "unknown s3 info", K(*token));
}

View File

@ -136,7 +136,7 @@ struct ObS3Account
void reset();
bool is_valid() const { return is_valid_; }
int64_t hash() const;
TO_STRING_KV(K_(is_valid), K_(delete_mode), K_(region), K_(endpoint), K_(access_id));
TO_STRING_KV(K_(is_valid), K_(delete_mode), K_(region), K_(endpoint), K_(access_id), K_(addressing_model));
int parse_from(const char *storage_info_str, const int64_t size);
int set_field(const char *value, char *field, const uint32_t field_length);
@ -147,6 +147,7 @@ struct ObS3Account
char endpoint_[MAX_S3_ENDPOINT_LENGTH];
char access_id_[MAX_S3_ACCESS_ID_LENGTH]; // ak
char secret_key_[MAX_S3_SECRET_KEY_LENGTH]; // sk
ObStorageAddressingModel addressing_model_;
};
class ObS3MemoryManager : public Aws::Utils::Memory::MemorySystemInterface

View File

@ -187,6 +187,26 @@ TEST(ObObjectStorageInfo, s3)
ASSERT_EQ(OB_INVALID_ARGUMENT, info1.set(uri, storage_info));
}
TEST(ObObjectStorageInfo, s3_compatible)
{
const char *uri = "s3://backup_dir?host=xxx.com&access_id=111&access_key=222&s3_region=333";
ObObjectStorageInfo info1;
const char *storage_info = "";
storage_info = "host=xxx.com&access_id=111&access_key=222&s3_region=333&delete_mode=delete&addressing_model=";
info1.reset();
ASSERT_EQ(OB_INVALID_ARGUMENT, info1.set(uri, storage_info));
storage_info = "host=xxx.com&access_id=111&access_key=222&s3_region=333&delete_mode=delete&addressing_model=path_style";
info1.reset();
ASSERT_EQ(OB_SUCCESS, info1.set(uri, storage_info));
storage_info = "host=xxx.com&access_id=111&access_key=222&s3_region=333&addressing_model=virtual_hosted_style";
info1.reset();
ASSERT_EQ(OB_SUCCESS, info1.set(uri, storage_info));
storage_info = "host=xxx.com&access_id=111&access_key=222&s3_region=333&addressing_model=xxx";
info1.reset();
ASSERT_EQ(OB_INVALID_ARGUMENT, info1.set(uri, storage_info));
}
int main(int argc, char **argv)
{
system("rm -f test_storage_info.log*");