[FEAT MERGE] Backup support AWS S3

Co-authored-by: donglou-zhang <zhangleisoft2012@163.com>
Co-authored-by: xuhuleon <xuhuleon@qq.com>
This commit is contained in:
LoLolobster
2023-12-13 13:43:20 +00:00
committed by ob-robot
parent 051cb24288
commit dd84275032
68 changed files with 12175 additions and 1129 deletions

View File

@ -17,15 +17,18 @@ namespace oceanbase
namespace common
{
const char *OB_STORAGE_ACCESS_TYPES_STR[] = {"reader", "overwriter", "appender", "random_write"};
const char *OB_STORAGE_ACCESS_TYPES_STR[] = {"reader", "adaptive_reader", "overwriter",
"appender", "random_write", "multipart_writer"};
ObObjectDevice::ObObjectDevice()
: storage_info_(), is_started_(false), lock_(common::ObLatchIds::OBJECT_DEVICE_LOCK)
{
ObMemAttr attr = SET_USE_500("ObjectDevice");
reader_ctx_pool_.set_attr(attr);
adaptive_reader_ctx_pool_.set_attr(attr);
appender_ctx_pool_.set_attr(attr);
overwriter_ctx_pool_.set_attr(attr);
multipart_writer_ctx_pool_.set_attr(attr);
}
int ObObjectDevice::init(const ObIODOpts &opts)
@ -41,7 +44,9 @@ void ObObjectDevice::destroy()
if (is_started_) {
appender_ctx_pool_.reset();
reader_ctx_pool_.reset();
adaptive_reader_ctx_pool_.reset();
overwriter_ctx_pool_.reset();
multipart_writer_ctx_pool_.reset();
//close the util
util_.close();
//baseinfo will be free with allocator
@ -131,12 +136,16 @@ int ObObjectDevice::get_access_type(ObIODOpts *opts, ObStorageAccessType& access
OB_LOG(WARN, "can not find access type!");
} else if (0 == STRCMP(access_type , OB_STORAGE_ACCESS_TYPES_STR[OB_STORAGE_ACCESS_READER])) {
access_type_flag = OB_STORAGE_ACCESS_READER;
} else if (0 == STRCMP(access_type , OB_STORAGE_ACCESS_TYPES_STR[OB_STORAGE_ACCESS_ADAPTIVE_READER])) {
access_type_flag = OB_STORAGE_ACCESS_ADAPTIVE_READER;
} else if (0 == STRCMP(access_type , OB_STORAGE_ACCESS_TYPES_STR[OB_STORAGE_ACCESS_OVERWRITER])) {
access_type_flag = OB_STORAGE_ACCESS_OVERWRITER;
} else if (0 == STRCMP(access_type , OB_STORAGE_ACCESS_TYPES_STR[OB_STORAGE_ACCESS_APPENDER])) {
access_type_flag = OB_STORAGE_ACCESS_APPENDER;
} else if (0 == STRCMP(access_type , OB_STORAGE_ACCESS_TYPES_STR[OB_STORAGE_ACCESS_RANDOMWRITER])) {
access_type_flag = OB_STORAGE_ACCESS_RANDOMWRITER;
} else if (0 == STRCMP(access_type , OB_STORAGE_ACCESS_TYPES_STR[OB_STORAGE_ACCESS_MULTIPART_WRITER])) {
access_type_flag = OB_STORAGE_ACCESS_MULTIPART_WRITER;
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invaild access type!", KCSTRING(access_type));
@ -165,6 +174,27 @@ int ObObjectDevice::open_for_reader(const char *pathname, void*& ctx)
return ret;
}
int ObObjectDevice::open_for_adaptive_reader_(const char *pathname, void *&ctx)
{
int ret = OB_SUCCESS;
ObStorageAdaptiveReader *adaptive_reader = adaptive_reader_ctx_pool_.alloc();
if (OB_ISNULL(adaptive_reader)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to alloc mem for object device adaptive_reader! ", K(ret), K(pathname));
} else {
if (OB_FAIL(adaptive_reader->open(pathname, &storage_info_))) {
OB_LOG(WARN, "fail to open for read!", K(ret), K(pathname), K_(storage_info));
} else {
ctx = (void*)adaptive_reader;
}
}
if (OB_FAIL(ret) && OB_NOT_NULL(adaptive_reader)) {
adaptive_reader_ctx_pool_.free(adaptive_reader);
adaptive_reader = nullptr;
}
return ret;
}
/*ObStorageOssMultiPartWriter is not used int the current version, if we use, later, the open func of
overwriter maybe need to add para(just like the open func of appender)*/
int ObObjectDevice::open_for_overwriter(const char *pathname, void*& ctx)
@ -232,6 +262,27 @@ int ObObjectDevice::open_for_appender(const char *pathname, ObIODOpts *opts, voi
return ret;
}
int ObObjectDevice::open_for_multipart_writer_(const char *pathname, void *&ctx)
{
int ret = OB_SUCCESS;
ObStorageMultiPartWriter *multipart_writer = multipart_writer_ctx_pool_.alloc();
if (OB_ISNULL(multipart_writer)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to alloc multipart_writer!", K(ret), K(pathname));
} else {
if (OB_FAIL(multipart_writer->open(pathname, &storage_info_))) {
OB_LOG(WARN, "fail to open for multipart_writer!", K(ret), K(pathname), K_(storage_info));
} else {
ctx = (void*)multipart_writer;
}
}
if (OB_FAIL(ret) && OB_NOT_NULL(multipart_writer)) {
multipart_writer_ctx_pool_.free(multipart_writer);
multipart_writer = nullptr;
}
return ret;
}
int ObObjectDevice::release_res(void* ctx, const ObIOFd &fd, ObStorageAccessType access_type)
{
int ret = OB_SUCCESS;
@ -254,12 +305,24 @@ int ObObjectDevice::release_res(void* ctx, const ObIOFd &fd, ObStorageAccessType
OB_LOG(WARN, "fail to close the reader!", K(ret), K(access_type));
}
reader_ctx_pool_.free(reader);
} else if (OB_STORAGE_ACCESS_ADAPTIVE_READER == access_type) {
ObStorageAdaptiveReader *adaptive_reader = static_cast<ObStorageAdaptiveReader*>(ctx);
if (OB_FAIL(adaptive_reader->close())) {
OB_LOG(WARN, "fail to close the adaptive reader!", K(ret), K(access_type));
}
adaptive_reader_ctx_pool_.free(adaptive_reader);
} else if (OB_STORAGE_ACCESS_OVERWRITER == access_type) {
ObStorageWriter *overwriter = static_cast<ObStorageWriter*>(ctx);
if (OB_FAIL(overwriter->close())) {
OB_LOG(WARN, "fail to close the overwriter!", K(ret), K(access_type));
}
overwriter_ctx_pool_.free(overwriter);
} else if (OB_STORAGE_ACCESS_MULTIPART_WRITER == access_type) {
ObStorageMultiPartWriter *multipart_writer = static_cast<ObStorageMultiPartWriter*>(ctx);
if (OB_FAIL(multipart_writer->close())) {
OB_LOG(WARN, "fail to close the multipart writer!", K(ret), K(access_type));
}
multipart_writer_ctx_pool_.free(multipart_writer);
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid access_type!", K(access_type), K(ret));
@ -303,11 +366,15 @@ int ObObjectDevice::open(const char *pathname, const int flags, const mode_t mod
} else {
if (OB_STORAGE_ACCESS_READER == access_type) {
ret = open_for_reader(pathname, ctx);
} else if (OB_STORAGE_ACCESS_ADAPTIVE_READER == access_type) {
ret = open_for_adaptive_reader_(pathname, ctx);
} else if (OB_STORAGE_ACCESS_APPENDER == access_type ||
OB_STORAGE_ACCESS_RANDOMWRITER == access_type) {
ret = open_for_appender(pathname, opts, ctx);
} else if (OB_STORAGE_ACCESS_OVERWRITER == access_type) {
ret = open_for_overwriter(pathname, ctx);
} else if (OB_STORAGE_ACCESS_MULTIPART_WRITER == access_type) {
ret = open_for_multipart_writer_(pathname, ctx);
}
if (OB_FAIL(ret)) {
@ -342,7 +409,7 @@ int ObObjectDevice::close(const ObIOFd &fd)
{
int ret = OB_SUCCESS;
int flag = -1;
void* ctx = NULL;
void *ctx = NULL;
fd_mng_.get_fd_flag(fd, flag);
if (!fd_mng_.validate_fd(fd, true)) {
@ -356,6 +423,43 @@ int ObObjectDevice::close(const ObIOFd &fd)
return ret;
}
int ObObjectDevice::seal_for_adaptive(const ObIOFd &fd)
{
int ret = OB_SUCCESS;
int flag = -1;
void *ctx = NULL;
fd_mng_.get_fd_flag(fd, flag);
if (!fd_mng_.validate_fd(fd, true)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "fd is not init!", K(fd.first_id_), K(fd.second_id_));
} else if (OB_FAIL(fd_mng_.fd_to_ctx(fd, ctx))) {
OB_LOG(WARN, "fail to get ctx accroding fd!", K(ret));
} else if (OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "fd ctx is null!", K(flag), K(ret));
} else if (flag == OB_STORAGE_ACCESS_RANDOMWRITER || flag == OB_STORAGE_ACCESS_APPENDER) {
ObStorageAppender *appender = static_cast<ObStorageAppender*>(ctx);
if (OB_FAIL(appender->seal_for_adaptive())) {
OB_LOG(WARN, "fail to seal!", K(ret), K(flag));
}
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "unknow access type, not a appender fd!", K(flag), K(ret));
}
return ret;
}
int ObObjectDevice::del_unmerged_parts(const char *pathname)
{
int ret = OB_SUCCESS;
common::ObString uri(pathname);
if (OB_FAIL(util_.del_unmerged_parts(uri))) {
OB_LOG(WARN, "fail to del unmerged parts", K(ret), K(uri));
}
return ret;
}
int ObObjectDevice::mkdir(const char *pathname, mode_t mode)
{
UNUSED(mode);
@ -371,24 +475,66 @@ int ObObjectDevice::rmdir(const char *pathname)
int ObObjectDevice::unlink(const char *pathname)
{
return inner_unlink_(pathname, false/*is_adaptive*/);
}
int ObObjectDevice::adaptive_unlink(const char *pathname)
{
const bool is_adaptive = true;
return inner_unlink_(pathname, is_adaptive);
}
int ObObjectDevice::inner_unlink_(const char *pathname, const bool is_adaptive)
{
int ret = OB_SUCCESS;
common::ObString uri(pathname);
return util_.del_file(uri);
if (OB_FAIL(util_.del_file(uri, is_adaptive))) {
OB_LOG(WARN, "fail to del file", K(ret), K(uri), K(is_adaptive));
}
return ret;
}
int ObObjectDevice::exist(const char *pathname, bool &is_exist)
{
return inner_exist_(pathname, is_exist, false/*is_adaptive*/);
}
int ObObjectDevice::adaptive_exist(const char *pathname, bool &is_exist)
{
const bool is_adaptive = true;
return inner_exist_(pathname, is_exist, is_adaptive);
}
int ObObjectDevice::inner_exist_(const char *pathname, bool &is_exist, const bool is_adaptive)
{
int ret = OB_SUCCESS;
common::ObString uri(pathname);
return util_.is_exist(uri, is_exist);
if (OB_FAIL(util_.is_exist(uri, is_adaptive, is_exist))) {
OB_LOG(WARN, "fail to check if the file exists", K(ret), K(uri), K(is_adaptive));
}
return ret;
}
/*notice: for backup, this interface only return size*/
int ObObjectDevice::stat(const char *pathname, ObIODFileStat &statbuf)
{
return inner_stat_(pathname, statbuf, false/*is_adaptive*/);
}
int ObObjectDevice::adaptive_stat(const char *pathname, ObIODFileStat &statbuf)
{
const bool is_adaptive = true;
return inner_stat_(pathname, statbuf, is_adaptive);
}
int ObObjectDevice::inner_stat_(const char *pathname,
ObIODFileStat &statbuf, const bool is_adaptive)
{
int ret = OB_SUCCESS;
int64_t length = 0;
common::ObString uri(pathname);
if (OB_FAIL(util_.get_file_length(uri, length))) {
OB_LOG(WARN, "fail to get fail length!", K(ret));
if (OB_FAIL(util_.get_file_length(uri, is_adaptive, length))) {
OB_LOG(WARN, "fail to get file length!", K(ret), K(uri), K(is_adaptive));
} else {
statbuf.size_ = length;
}
@ -396,15 +542,27 @@ int ObObjectDevice::stat(const char *pathname, ObIODFileStat &statbuf)
}
int ObObjectDevice::scan_dir(const char *dir_name, common::ObBaseDirEntryOperator &op)
{
return inner_scan_dir_(dir_name, op, false/*is_adaptive*/);
}
int ObObjectDevice::adaptive_scan_dir(const char *dir_name, ObBaseDirEntryOperator &op)
{
const bool is_adaptive = true;
return inner_scan_dir_(dir_name, op, is_adaptive);
}
int ObObjectDevice::inner_scan_dir_(const char *dir_name,
ObBaseDirEntryOperator &op, const bool is_adaptive)
{
common::ObString uri(dir_name);
int ret = OB_SUCCESS;
bool is_dir_scan = false;
if (op.is_dir_scan()) {
ret = util_.list_directories(uri, op);
ret = util_.list_directories(uri, is_adaptive, op);
is_dir_scan = true;
} else {
ret = util_.list_files(uri, op);
ret = util_.list_files(uri, is_adaptive, op);
}
if (OB_FAIL(ret)) {
@ -425,25 +583,30 @@ int ObObjectDevice::pread(const ObIOFd &fd, const int64_t offset, const int64_t
UNUSED(checker);
int ret = OB_SUCCESS;
int flag = -1;
void* ctx = NULL;
void *ctx = NULL;
fd_mng_.get_fd_flag(fd, flag);
if (!fd_mng_.validate_fd(fd, true)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "fd is not init!", K(fd.first_id_), K(fd.second_id_));
} else if (flag != OB_STORAGE_ACCESS_READER) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "fd is not a reader fd!", K(flag), K(ret));
} else if (OB_FAIL(fd_mng_.fd_to_ctx(fd, ctx))) {
OB_LOG(WARN, "fail to get ctx accroding fd!", K(ret));
} else {
} else if (OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "fd ctx is null!", K(flag), K(ret));
} else if (flag == OB_STORAGE_ACCESS_READER) {
ObStorageReader *reader = static_cast<ObStorageReader*>(ctx);
if (OB_ISNULL(reader)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "fd ctx is null!", K(flag), K(ret));
} else if (OB_FAIL(reader->pread((char*)buf, size, offset, read_size))) {
OB_LOG(WARN, "fail to pread!", K(ret));
if (OB_FAIL(reader->pread((char*)buf, size, offset, read_size))) {
OB_LOG(WARN, "fail to do normal pread!", K(ret), K(flag));
}
} else if (flag == OB_STORAGE_ACCESS_ADAPTIVE_READER) {
ObStorageAdaptiveReader *adaptive_reader = static_cast<ObStorageAdaptiveReader*>(ctx);
if (OB_FAIL(adaptive_reader->pread((char*)buf, size, offset, read_size))) {
OB_LOG(WARN, "fail to do adaptive reader pread!", K(ret), K(flag));
}
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "fd is not a reader fd!", K(flag), K(ret));
}
return ret;
}
@ -463,16 +626,24 @@ int ObObjectDevice::write(const ObIOFd &fd, const void *buf, const int64_t size,
OB_LOG(WARN, "fd is not init!", K(fd.first_id_), K(fd.second_id_), K(ret));
} else if (OB_FAIL(fd_mng_.fd_to_ctx(fd, ctx))) {
OB_LOG(WARN, "fail to get ctx accroding fd!", K(ret));
} else if (OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "fd ctx is null!", K(flag), K(ret));
} else if (flag == OB_STORAGE_ACCESS_OVERWRITER) {
ObStorageWriter* overwriter = static_cast<ObStorageWriter*>(ctx);
ObStorageWriter *overwriter = static_cast<ObStorageWriter*>(ctx);
if (OB_FAIL(overwriter->write((char*)buf, size))) {
OB_LOG(WARN, "fail to do overwrite write!", K(ret));
}
} else if (flag == OB_STORAGE_ACCESS_APPENDER) {
ObStorageAppender* appender = static_cast<ObStorageAppender*>(ctx);
ObStorageAppender *appender = static_cast<ObStorageAppender*>(ctx);
if (OB_FAIL(appender->write((char*)buf, size))) {
OB_LOG(WARN, "fail to do append write!", K(ret));
}
} else if (flag == OB_STORAGE_ACCESS_MULTIPART_WRITER) {
ObStorageMultiPartWriter *multipart_writer = static_cast<ObStorageMultiPartWriter*>(ctx);
if (OB_FAIL(multipart_writer->write((char*)buf, size))) {
OB_LOG(WARN, "fail to do multipart writer write!", K(ret));
}
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "unknow access type, not a writable type!", K(flag), K(ret));
@ -505,11 +676,19 @@ int ObObjectDevice::pwrite(const ObIOFd &fd, const int64_t offset, const int64_t
OB_LOG(WARN, "fd is not init!", K(fd.first_id_), K(fd.second_id_));
} else if (OB_FAIL(fd_mng_.fd_to_ctx(fd, ctx))) {
OB_LOG(WARN, "fail to get ctx accroding fd!", K(ret));
} else if (OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "fd ctx is null!", K(flag), K(ret));
} else if (flag == OB_STORAGE_ACCESS_RANDOMWRITER) {
ObStorageAppender* appender = static_cast<ObStorageAppender*>(ctx);
ObStorageAppender *appender = static_cast<ObStorageAppender*>(ctx);
if (OB_FAIL(appender->pwrite((char*)buf, size, offset))) {
OB_LOG(WARN, "fail to do appender pwrite!", K(ret));
}
} else if (flag == OB_STORAGE_ACCESS_MULTIPART_WRITER) {
ObStorageMultiPartWriter *multipart_writer = static_cast<ObStorageMultiPartWriter*>(ctx);
if (OB_FAIL(multipart_writer->pwrite((char*)buf, size, offset))) {
OB_LOG(WARN, "fail to do multipart writer pwrite!", K(ret));
}
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "unknow access type, not a writable type!", K(flag), K(ret));
@ -550,9 +729,7 @@ int ObObjectDevice::reconfig(const ObIODOpts &opts)
int ObObjectDevice::seal_file(const ObIOFd &fd)
{
UNUSED(fd);
OB_LOG_RET(WARN, OB_NOT_SUPPORTED, "seal file is not support in object device !", K(fd));
return OB_NOT_SUPPORTED;
return seal_for_adaptive(fd);
}
int ObObjectDevice::scan_dir(const char *dir_name, int (*func)(const dirent *entry))