Refactor object storage multipart upload interface to distinguish close, complete and abort operations

This commit is contained in:
obdev
2024-02-02 12:12:33 +00:00
committed by ob-robot
parent 9ddb3fb67b
commit 6c8cde0b84
28 changed files with 506 additions and 144 deletions

View File

@ -333,6 +333,10 @@ int init_task_executor(const char *base_uri,
if (OB_FAIL(alloc_executor<DelTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_IS_EXIST) {
if (OB_FAIL(alloc_executor<IsExistTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unknown benchmark task type", K(ret), K(config));
@ -627,18 +631,21 @@ int MultipartWriteTaskExecutor::execute()
if (OB_FAIL(device_handle->pwrite(fd, cur_offset, cur_part_size,
write_buf_, actual_write_size))) {
OB_LOG(WARN, "fail to upload part",
K(ret), K_(base_uri), K(cur_offset), K(cur_part_size));
K(ret), K_(base_uri), K(cur_offset), K(cur_part_size), K(fd));
} else {
cur_offset += cur_part_size;
}
}
const int64_t close_start_time_us = ObTimeUtility::current_time();
if (FAILEDx(adapter.close_device_and_fd(device_handle, fd))) {
OB_LOG(WARN, "fail to close device handle", K(ret), K_(base_uri));
} else {
metrics_.close_time_ms_map_.log_entry(close_start_time_us);
if (FAILEDx(device_handle->complete(fd))) {
OB_LOG(WARN, "fail to complete multipart writer", K(ret), K_(base_uri), K(fd));
}
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(adapter.close_device_and_fd(device_handle, fd))) {
OB_LOG(WARN, "fail to close device handle", K(ret), K(tmp_ret), K_(base_uri));
}
metrics_.close_time_ms_map_.log_entry(close_start_time_us);
if (OB_SUCC(ret)) {
metrics_.operation_num_++;
@ -781,5 +788,57 @@ int DelTaskExecutor::execute()
return ret;
}
/*--------------------------------Is Exist Task Executor--------------------------------*/
IsExistTaskExecutor::IsExistTaskExecutor()
: ITaskExecutor(), obj_num_(-1)
{
}
int IsExistTaskExecutor::init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) {
OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config));
} else if (OB_UNLIKELY(config.obj_num_ <= 0)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid arguments", K(ret), K(config));
} else {
obj_num_ = config.obj_num_;
is_inited_ = true;
}
if (OB_FAIL(ret)) {
reset();
}
return ret;
}
int IsExistTaskExecutor::execute()
{
int ret = OB_SUCCESS;
const int64_t object_id = ObRandom::rand(0, obj_num_ - 1);
const int64_t start_time_us = ObTimeUtility::current_time();
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "ReadTaskExecutor not init", K(ret), K_(base_uri));
} else if (OB_FAIL(prepare_(object_id))) {
OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id));
} else {
ObBackupIoAdapter adapter;
bool is_exist = false;
if (OB_FAIL(adapter.is_exist(base_uri_, storage_info_, is_exist))) {
OB_LOG(WARN, "fail to check is exist",
K(ret), K_(base_uri), KPC_(storage_info), K(object_id));
} else {
metrics_.operation_num_++;
metrics_.total_op_time_ms_map_.log_entry(start_time_us);
}
}
finish_(ret);
return ret;
}
} //tools
} //oceanbase