[FEAT MERGE] Add anti-fallback checks for the memory of 500 tenant

This commit is contained in:
obdev
2023-04-25 02:41:25 +00:00
committed by ob-robot
parent c317071f7e
commit fdcaa9a932
135 changed files with 936 additions and 660 deletions

View File

@ -21,6 +21,10 @@ const char *OB_STORAGE_ACCESS_TYPES_STR[] = {"reader", "overwriter", "appender",
ObObjectDevice::ObObjectDevice() : oss_account_(), base_info_(NULL), is_started_(false), lock_(common::ObLatchIds::OBJECT_DEVICE_LOCK)
{
auto attr = SET_USE_500("ObjectDevice");
reader_ctx_pool_.set_attr(attr);
appender_ctx_pool_.set_attr(attr);
overwriter_ctx_pool_.set_attr(attr);
}
int ObObjectDevice::init(const ObIODOpts &opts)
@ -51,7 +55,7 @@ ObObjectDevice::~ObObjectDevice()
}
/*the app logical use call ObBackupIoAdapter::get_and_init_device*/
/*decription: base_info just related to storage_info,
/*decription: base_info just related to storage_info,
base_info is used by the reader/appender*/
int ObObjectDevice::start(const ObIODOpts &opts)
{
@ -105,7 +109,7 @@ void get_opt_value(ObIODOpts *opts, const char* key, const char*& value)
for (int i = 0; i < opts->opt_cnt_; i++) {
if (0 == STRCMP(opts->opts_[i].key_, key)) {
value = opts->opts_[i].value_.value_str;
break;
break;
}
}
}
@ -131,7 +135,7 @@ int ObObjectDevice::get_access_type(ObIODOpts *opts, ObStorageAccessType& access
ObOptValue opt_value;
const char* access_type = NULL;
get_opt_value(opts, "AccessType", access_type);
if (NULL == access_type) {
OB_LOG(WARN, "can not find access type!");
} else if (0 == STRCMP(access_type , OB_STORAGE_ACCESS_TYPES_STR[OB_STORAGE_ACCESS_READER])) {
@ -166,7 +170,7 @@ int ObObjectDevice::open_for_reader(const char *pathname, void*& ctx)
return ret;
}
/*ObStorageOssMultiPartWriter is not used int the current version, if we use, later, the open func of
/*ObStorageOssMultiPartWriter is not used int the current version, if we use, later, the open func of
overwriter maybe need to add para(just like the open func of appender)*/
int ObObjectDevice::open_for_overwriter(const char *pathname, void*& ctx)
{
@ -181,7 +185,7 @@ int ObObjectDevice::open_for_overwriter(const char *pathname, void*& ctx)
} else {
ctx = (void*)overwriter;
}
}
}
return ret;
}
@ -218,8 +222,8 @@ int ObObjectDevice::open_for_appender(const char *pathname, ObIODOpts *opts, voi
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "Invalid open mode!", KCSTRING(open_mode), K(ret));
}
}
if (NULL == append_strategy || 0 == STRCMP(append_strategy, "OB_APPEND_USE_OVERRITE")) {
//just keep the default value
} else if (0 == STRCMP(append_strategy, "OB_APPEND_USE_SLICE_PUT")) {
@ -247,7 +251,7 @@ int ObObjectDevice::open_for_appender(const char *pathname, ObIODOpts *opts, voi
} else {
ctx = appender;
}
}
}
return ret;
}
@ -282,7 +286,7 @@ int ObObjectDevice::release_res(void* ctx, const ObIOFd &fd, ObStorageAccessType
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid access_type!", K(access_type), K(ret));
}
}
if (OB_SUCCESS != (ret_tmp = fd_mng_.release_fd(fd))) {
ret = (OB_SUCCESS == ret) ? ret_tmp : ret;
@ -295,8 +299,8 @@ int ObObjectDevice::release_res(void* ctx, const ObIOFd &fd, ObStorageAccessType
/*
* mode is not used in object device
*/
int ObObjectDevice::open(const char *pathname, const int flags, const mode_t mode,
ObIOFd &fd, ObIODOpts *opts)
int ObObjectDevice::open(const char *pathname, const int flags, const mode_t mode,
ObIOFd &fd, ObIODOpts *opts)
{
UNUSED(flags);
UNUSED(mode);
@ -310,8 +314,8 @@ int ObObjectDevice::open(const char *pathname, const int flags, const mode_t mod
} else if (OB_ISNULL(pathname)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "pathname is null!", K(ret));
}
}
//handle open logical
if (OB_SUCC(ret)) {
if(OB_ISNULL(opts)) {
@ -322,7 +326,7 @@ int ObObjectDevice::open(const char *pathname, const int flags, const mode_t mod
} else {
if (OB_STORAGE_ACCESS_READER == access_type) {
ret = open_for_reader(pathname, ctx);
} else if (OB_STORAGE_ACCESS_APPENDER == access_type ||
} else if (OB_STORAGE_ACCESS_APPENDER == access_type ||
OB_STORAGE_ACCESS_RANDOMWRITER == access_type) {
ret = open_for_appender(pathname, opts, ctx);
} else if (OB_STORAGE_ACCESS_OVERWRITER == access_type) {
@ -340,9 +344,9 @@ int ObObjectDevice::open(const char *pathname, const int flags, const mode_t mod
if (OB_SUCC(ret)) {
if (OB_FAIL(fd_mng_.get_fd(ctx, device_type_, access_type, fd))) {
OB_LOG(WARN, "fail to alloc fd!", K(ret), K(fd), KCSTRING(pathname), K(access_type));
}
}
}
//handle resource free when exception happen
if (OB_FAIL(ret) && !OB_ISNULL(ctx)) {
int tmp_ret = OB_SUCCESS;
@ -365,7 +369,7 @@ int ObObjectDevice::close(const ObIOFd &fd)
fd_mng_.get_fd_flag(fd, flag);
if (!fd_mng_.validate_fd(fd, true)) {
ret = OB_NOT_INIT;
ret = OB_NOT_INIT;
OB_LOG(WARN, "fail to close fd. since fd is invalid!", K(ret) ,K(fd.first_id_), K(fd.second_id_));
} else if (OB_FAIL(fd_mng_.fd_to_ctx(fd, ctx))) {
OB_LOG(WARN, "fail to get ctx accroding fd!", K(ret));
@ -385,19 +389,19 @@ int ObObjectDevice::mkdir(const char *pathname, mode_t mode)
int ObObjectDevice::rmdir(const char *pathname)
{
common::ObString uri(pathname);
return util_.del_dir(uri);
return util_.del_dir(uri);
}
int ObObjectDevice::unlink(const char *pathname)
{
common::ObString uri(pathname);
return util_.del_file(uri);
return util_.del_file(uri);
}
int ObObjectDevice::exist(const char *pathname, bool &is_exist)
{
common::ObString uri(pathname);
return util_.is_exist(uri, is_exist);
return util_.is_exist(uri, is_exist);
}
/*notice: for backup, this interface only return size*/
@ -425,7 +429,7 @@ int ObObjectDevice::scan_dir(const char *dir_name, common::ObBaseDirEntryOperato
} else {
ret = util_.list_files(uri, op);
}
if (OB_FAIL(ret)) {
OB_LOG(WARN, "fail to do list/dir scan!", K(ret), K(is_dir_scan), KCSTRING(dir_name));
}
@ -439,7 +443,7 @@ int ObObjectDevice::is_tagging(const char *pathname, bool &is_tagging)
}
int ObObjectDevice::pread(const ObIOFd &fd, const int64_t offset, const int64_t size,
void *buf, int64_t &read_size, ObIODPreadChecker *checker)
void *buf, int64_t &read_size, ObIODPreadChecker *checker)
{
UNUSED(checker);
int ret = OB_SUCCESS;
@ -463,7 +467,7 @@ int ObObjectDevice::pread(const ObIOFd &fd, const int64_t offset, const int64_t
} else if (OB_FAIL(reader->pread((char*)buf, size, offset, read_size))) {
OB_LOG(WARN, "fail to pread!", K(ret));
}
}
}
return ret;
}
@ -475,7 +479,7 @@ int ObObjectDevice::write(const ObIOFd &fd, const void *buf, const int64_t size,
int ret = OB_SUCCESS;
int flag = -1;
void* ctx = NULL;
fd_mng_.get_fd_flag(fd, flag);
if (!fd_mng_.validate_fd(fd, true)) {
ret = OB_NOT_INIT;
@ -502,7 +506,7 @@ int ObObjectDevice::write(const ObIOFd &fd, const void *buf, const int64_t size,
} else {
write_size = 0;
}
return ret;
return ret;
}
/*object storage does not support random write, so offset is no use
@ -515,7 +519,7 @@ int ObObjectDevice::pwrite(const ObIOFd &fd, const int64_t offset, const int64_t
int ret = OB_SUCCESS;
int flag = -1;
void* ctx = NULL;
UNUSED(offset);
fd_mng_.get_fd_flag(fd, flag);
@ -644,7 +648,7 @@ int ObObjectDevice::mark_blocks(ObIBlockIterator &block_iter)
{
UNUSED(block_iter);
OB_LOG_RET(WARN, OB_NOT_SUPPORTED, "mark_blocks is not support in object device !", K(device_type_));
return OB_NOT_SUPPORTED;
return OB_NOT_SUPPORTED;
}
int ObObjectDevice::alloc_block(const ObIODOpts *opts, ObIOFd &block_id)
@ -745,7 +749,7 @@ int ObObjectDevice::io_cancel(ObIOContext *io_context, ObIOCB *iocb)
UNUSED(io_context);
UNUSED(iocb);
OB_LOG_RET(WARN, OB_NOT_SUPPORTED, "io_cancel is not support in object device !", K(device_type_));
return OB_NOT_SUPPORTED;
return OB_NOT_SUPPORTED;
}
int ObObjectDevice::io_getevents(ObIOContext *io_context, int64_t min_nr,
ObIOEvents *events, struct timespec *timeout)
@ -806,7 +810,7 @@ int ObObjectDevice::check_space_full(const int64_t required_size) const
{
UNUSED(required_size);
OB_LOG_RET(WARN, OB_NOT_SUPPORTED, "check_space_full is not support in object device !", K(device_type_));
return OB_NOT_SUPPORTED;
return OB_NOT_SUPPORTED;
}
int ObObjectDevice::fdatasync(const ObIOFd &fd)