[feature](cold-data) move cold data to object storage without losing any feature (BE) (#10280)
This PR adds rowset-level data upload on the BE side, so a tablet can contain both cold data and hot data, and there is no longer any need to prohibit loading new data into cooled tablets. Each rowset is bound to a `FileSystem`, so the storage layer can read and write rowsets without being aware of the underlying filesystem. The abstracted `RemoteFileSystem` can try local caching strategies at different granularities, instead of caching whole segment files as before. To avoid conflicts with the existing code in be/src/io, we temporarily put the filesystem-related code in the be/src/io/fs directory. In the future, `FileReader`s and `FileWriter`s should be unified.
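The intent can be pictured with a minimal sketch. This is not the PR's actual code: the method names `open_file`, `create_file`, and `upload`, and the `RowsetMeta` shape shown here are placeholders; the real interfaces live under be/src/io/fs.

```cpp
// Sketch of the abstraction described above: a rowset carries a FileSystem
// handle, so readers and writers never need to know whether its segments live
// on a local disk or in object storage. All names below are illustrative.
#include <memory>
#include <string>

class FileReader;  // opaque read handle
class FileWriter;  // opaque write handle

class FileSystem {
public:
    virtual ~FileSystem() = default;
    virtual std::unique_ptr<FileReader> open_file(const std::string& path) = 0;
    virtual std::unique_ptr<FileWriter> create_file(const std::string& path) = 0;
};

// A remote implementation (e.g. S3-backed) can layer a local cache behind this
// interface at whatever granularity it chooses (segment, block, page).
class RemoteFileSystem : public FileSystem {
public:
    // Upload a locally written file (e.g. a cooled rowset's segment) to object storage.
    virtual void upload(const std::string& local_path, const std::string& remote_path) = 0;
};

struct RowsetMeta {
    // Each rowset is bound to the filesystem holding its segments, so hot
    // (local) and cold (remote) rowsets can coexist in one tablet.
    std::shared_ptr<FileSystem> fs;
};
```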
@@ -35,6 +35,7 @@
#include "olap/olap_common.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy_mgr.h"
#include "olap/tablet.h"
#include "olap/task/engine_alter_tablet_task.h"
#include "olap/task/engine_batch_load_task.h"
@@ -42,7 +43,6 @@
#include "olap/task/engine_clone_task.h"
#include "olap/task/engine_publish_version_task.h"
#include "olap/task/engine_storage_migration_task.h"
#include "olap/task/engine_storage_migration_task_v2.h"
#include "olap/utils.h"
#include "runtime/exec_env.h"
#include "runtime/snapshot_loader.h"
@@ -193,9 +193,13 @@ void TaskWorkerPool::start() {
        cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
                             this);
        break;
    case TaskWorkerType::STORAGE_MEDIUM_MIGRATE_V2:
    case TaskWorkerType::REFRESH_STORAGE_POLICY:
        cb = std::bind<void>(
                &TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback, this);
        break;
    case TaskWorkerType::UPDATE_STORAGE_POLICY:
        _worker_count = 1;
        cb = std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_v2_worker_thread_callback,
        cb = std::bind<void>(&TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback,
                             this);
        break;
    default:
@@ -361,6 +365,7 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() {
        TStatus task_status;

        std::vector<TTabletInfo> finish_tablet_infos;
        LOG(INFO) << "create tablet: " << create_tablet_req;
        Status create_status = _env->storage_engine()->create_tablet(create_tablet_req);
        if (!create_status.ok()) {
            LOG(WARNING) << "create table failed. status: " << create_status
@@ -437,6 +442,11 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
            StorageEngine::instance()->txn_manager()->force_rollback_tablet_related_txns(
                    dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id,
                    drop_tablet_req.schema_hash, dropped_tablet->tablet_uid());
            // We remove remote rowset directly.
            // TODO(cyx): do remove in background
            if (drop_tablet_req.is_drop_table_or_partition) {
                dropped_tablet->remove_all_remote_rowsets();
            }
        } else {
            status_code = TStatusCode::NOT_FOUND;
            error_msgs.push_back(err);
@@ -854,8 +864,15 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
                tablet->set_partition_id(tablet_meta_info.partition_id);
                break;
            case TTabletMetaType::INMEMORY:
                tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
                        tablet_meta_info.is_in_memory);
                if (tablet_meta_info.storage_policy.empty()) {
                    tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
                            tablet_meta_info.is_in_memory);
                } else {
                    LOG(INFO) << "set tablet cooldown resource "
                              << tablet_meta_info.storage_policy;
                    tablet->tablet_meta()->set_cooldown_resource(
                            tablet_meta_info.storage_policy);
                }
                break;
            }
        }
@@ -1036,8 +1053,8 @@ Status TaskWorkerPool::_check_migrate_request(const TStorageMediumMigrateReq& re
        return Status::OLAPInternalError(OLAP_REQUEST_FAILED);
    }

    // check disk capacity
    int64_t tablet_size = tablet->tablet_footprint();
    // check local disk capacity
    int64_t tablet_size = tablet->tablet_local_size();
    if ((*dest_store)->reach_capacity_limit(tablet_size)) {
        LOG(WARNING) << "reach the capacity limit of path: " << (*dest_store)->path()
                     << ", tablet size: " << tablet_size;
@@ -1106,10 +1123,12 @@ void TaskWorkerPool::_report_task_worker_thread_callback() {

    while (_is_work) {
        _is_doing_work = false;
        // wait at most report_task_interval_seconds, or being notified
        std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
        _worker_thread_condition_variable.wait_for(
                worker_thread_lock, std::chrono::seconds(config::report_task_interval_seconds));
        {
            // wait at most report_task_interval_seconds, or being notified
            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
            _worker_thread_condition_variable.wait_for(
                    worker_thread_lock, std::chrono::seconds(config::report_task_interval_seconds));
        }
        if (!_is_work) {
            break;
        }
@@ -1143,11 +1162,13 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() {

    while (_is_work) {
        _is_doing_work = false;
        // wait at most report_disk_state_interval_seconds, or being notified
        std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
        _worker_thread_condition_variable.wait_for(
                worker_thread_lock,
                std::chrono::seconds(config::report_disk_state_interval_seconds));
        {
            // wait at most report_disk_state_interval_seconds, or being notified
            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
            _worker_thread_condition_variable.wait_for(
                    worker_thread_lock,
                    std::chrono::seconds(config::report_disk_state_interval_seconds));
        }
        if (!_is_work) {
            break;
        }
@@ -1171,14 +1192,15 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
        map<string, TDisk> disks;
        for (auto& root_path_info : data_dir_infos) {
            TDisk disk;
            disk.__set_root_path(root_path_info.path_desc.filepath);
            disk.__set_root_path(root_path_info.path);
            disk.__set_path_hash(root_path_info.path_hash);
            disk.__set_storage_medium(root_path_info.storage_medium);
            disk.__set_disk_total_capacity(root_path_info.disk_capacity);
            disk.__set_data_used_capacity(root_path_info.data_used_capacity);
            disk.__set_data_used_capacity(root_path_info.local_used_capacity);
            disk.__set_remote_used_capacity(root_path_info.remote_used_capacity);
            disk.__set_disk_available_capacity(root_path_info.available);
            disk.__set_used(root_path_info.is_used);
            disks[root_path_info.path_desc.filepath] = disk;
            disks[root_path_info.path] = disk;
        }
        request.__set_disks(disks);
        _handle_report(request, ReportType::DISK);
@@ -1195,10 +1217,13 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() {
    while (_is_work) {
        _is_doing_work = false;

        // wait at most report_tablet_interval_seconds, or being notified
        std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
        _worker_thread_condition_variable.wait_for(
                worker_thread_lock, std::chrono::seconds(config::report_tablet_interval_seconds));
        {
            // wait at most report_tablet_interval_seconds, or being notified
            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
            _worker_thread_condition_variable.wait_for(
                    worker_thread_lock,
                    std::chrono::seconds(config::report_tablet_interval_seconds));
        }
        if (!_is_work) {
            break;
        }
@@ -1565,7 +1590,6 @@ Status TaskWorkerPool::_move_dir(const TTabletId tablet_id, const std::string& s
        return Status::InvalidArgument("Could not find tablet");
    }

    std::string dest_tablet_dir = tablet->tablet_path_desc().filepath;
    SnapshotLoader loader(_env, job_id, tablet_id);
    Status status = loader.move(src, tablet, overwrite);

@@ -1683,9 +1707,67 @@ void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
        }
    }

void TaskWorkerPool::_storage_medium_migrate_v2_worker_thread_callback() {
void TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback() {
    while (_is_work) {
        _is_doing_work = false;
        {
            // wait at most report_task_interval_seconds, or being notified
            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
            _worker_thread_condition_variable.wait_for(
                    worker_thread_lock,
                    std::chrono::seconds(
                            config::storage_refresh_storage_policy_task_interval_seconds));
        }
        if (!_is_work) {
            break;
        }

        if (_master_info.network_address.port == 0) {
            // port == 0 means not received heartbeat yet
            // sleep a short time and try again
            LOG(INFO)
                    << "waiting to receive first heartbeat from frontend before doing task report";
            continue;
        }

        _is_doing_work = true;

        TGetStoragePolicyResult result;
        Status status = _master_client->refresh_storage_policy(&result);
        if (!status.ok()) {
            LOG(WARNING) << "refresh storage policy status not ok";
        } else if (result.status.status_code != TStatusCode::OK) {
            LOG(WARNING) << "refresh storage policy result status status_code not ok";
        } else {
            // update storage policy mgr.
            StoragePolicyMgr* spm = ExecEnv::GetInstance()->storage_policy_mgr();
            for (const auto& iter : result.result_entrys) {
                shared_ptr<StoragePolicy> policy_ptr = make_shared<StoragePolicy>();
                policy_ptr->storage_policy_name = iter.policy_name;
                policy_ptr->cooldown_datetime = iter.cooldown_datetime;
                policy_ptr->cooldown_ttl = iter.cooldown_ttl;
                policy_ptr->s3_endpoint = iter.s3_storage_param.s3_endpoint;
                policy_ptr->s3_region = iter.s3_storage_param.s3_region;
                policy_ptr->s3_ak = iter.s3_storage_param.s3_ak;
                policy_ptr->s3_sk = iter.s3_storage_param.s3_sk;
                policy_ptr->root_path = iter.s3_storage_param.root_path;
                policy_ptr->bucket = iter.s3_storage_param.bucket;
                policy_ptr->s3_conn_timeout_ms = iter.s3_storage_param.s3_conn_timeout_ms;
                policy_ptr->s3_max_conn = iter.s3_storage_param.s3_max_conn;
                policy_ptr->s3_request_timeout_ms = iter.s3_storage_param.s3_request_timeout_ms;
                policy_ptr->md5_sum = iter.md5_checksum;

                LOG(INFO) << "refresh storage policy task, policy " << *policy_ptr;
                spm->periodic_put(iter.policy_name, std::move(policy_ptr));
            }
        }
    }
}

void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
    while (_is_work) {
        TAgentTaskRequest agent_task_req;
        TGetStoragePolicy get_storage_policy_req;
        {
            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
            while (_is_work && _tasks.empty()) {
@@ -1696,115 +1778,32 @@ void TaskWorkerPool::_storage_medium_migrate_v2_worker_thread_callback() {
            }

            agent_task_req = _tasks.front();
            get_storage_policy_req = agent_task_req.update_policy;
            _tasks.pop_front();
        }
        int64_t signature = agent_task_req.signature;
        LOG(INFO) << "get migration table v2 task, signature: " << agent_task_req.signature;
        bool is_task_timeout = false;
        if (agent_task_req.__isset.recv_time) {
            int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
            if (time_elapsed > config::report_task_interval_seconds * 20) {
                LOG(INFO) << "task elapsed " << time_elapsed
                          << " seconds since it is inserted to queue, it is timeout";
                is_task_timeout = true;
            }
        }
        if (!is_task_timeout) {
            TFinishTaskRequest finish_task_request;
            TTaskType::type task_type = agent_task_req.task_type;
            switch (task_type) {
            case TTaskType::STORAGE_MEDIUM_MIGRATE_V2:
                _storage_medium_migrate_v2(agent_task_req, signature, task_type,
                                           &finish_task_request);
                break;
            default:
                // pass
                break;
            }
            _finish_task(finish_task_request);
        }

        StoragePolicyMgr* spm = ExecEnv::GetInstance()->storage_policy_mgr();
        shared_ptr<StoragePolicy> policy_ptr = make_shared<StoragePolicy>();
        policy_ptr->storage_policy_name = get_storage_policy_req.policy_name;
        policy_ptr->cooldown_datetime = get_storage_policy_req.cooldown_datetime;
        policy_ptr->cooldown_ttl = get_storage_policy_req.cooldown_ttl;
        policy_ptr->s3_endpoint = get_storage_policy_req.s3_storage_param.s3_endpoint;
        policy_ptr->s3_region = get_storage_policy_req.s3_storage_param.s3_region;
        policy_ptr->s3_ak = get_storage_policy_req.s3_storage_param.s3_ak;
        policy_ptr->s3_sk = get_storage_policy_req.s3_storage_param.s3_sk;
        policy_ptr->root_path = get_storage_policy_req.s3_storage_param.root_path;
        policy_ptr->bucket = get_storage_policy_req.s3_storage_param.bucket;
        policy_ptr->s3_conn_timeout_ms = get_storage_policy_req.s3_storage_param.s3_conn_timeout_ms;
        policy_ptr->s3_max_conn = get_storage_policy_req.s3_storage_param.s3_max_conn;
        policy_ptr->s3_request_timeout_ms =
                get_storage_policy_req.s3_storage_param.s3_request_timeout_ms;
        policy_ptr->md5_sum = get_storage_policy_req.md5_checksum;

        LOG(INFO) << "get storage update policy task, update policy " << *policy_ptr;

        spm->update(get_storage_policy_req.policy_name, std::move(policy_ptr));
        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
    }
}

void TaskWorkerPool::_storage_medium_migrate_v2(const TAgentTaskRequest& agent_task_req,
                                                int64_t signature, const TTaskType::type task_type,
                                                TFinishTaskRequest* finish_task_request) {
    Status status = Status::OK();
    TStatus task_status;
    std::vector<string> error_msgs;

    string process_name;
    switch (task_type) {
    case TTaskType::STORAGE_MEDIUM_MIGRATE_V2:
        process_name = "StorageMediumMigrationV2";
        break;
    default:
        std::string task_name;
        EnumToString(TTaskType, task_type, task_name);
        LOG(WARNING) << "Storage medium migration v2 type invalid. type: " << task_name
                     << ", signature: " << signature;
        status = Status::NotSupported("Storage medium migration v2 type invalid");
        break;
    }

    // Check last storage medium migration v2 status, if failed delete tablet file
    // Do not need to adjust delete success or not
    // Because if delete failed task will failed
    TTabletId new_tablet_id;
    TSchemaHash new_schema_hash = 0;
    if (status.ok()) {
        new_tablet_id = agent_task_req.storage_migration_req_v2.new_tablet_id;
        new_schema_hash = agent_task_req.storage_migration_req_v2.new_schema_hash;
        EngineStorageMigrationTaskV2 engine_task(agent_task_req.storage_migration_req_v2);
        Status sc_status = _env->storage_engine()->execute_task(&engine_task);
        if (!sc_status.ok()) {
            if (sc_status.precise_code() == OLAP_ERR_DATA_QUALITY_ERR) {
                error_msgs.push_back("The data quality does not satisfy, please check your data. ");
            }
            status = sc_status;
        } else {
            status = Status::OK();
        }
    }

    if (status.ok()) {
        ++_s_report_version;
        LOG(INFO) << process_name << " finished. signature: " << signature;
    }

    // Return result to fe
    finish_task_request->__set_backend(_backend);
    finish_task_request->__set_report_version(_s_report_version);
    finish_task_request->__set_task_type(task_type);
    finish_task_request->__set_signature(signature);

    std::vector<TTabletInfo> finish_tablet_infos;
    if (status.ok()) {
        TTabletInfo tablet_info;
        status = _get_tablet_info(new_tablet_id, new_schema_hash, signature, &tablet_info);

        if (!status.ok()) {
            LOG(WARNING) << process_name << " success, but get new tablet info failed."
                         << "tablet_id: " << new_tablet_id << ", schema_hash: " << new_schema_hash
                         << ", signature: " << signature;
        } else {
            finish_tablet_infos.push_back(tablet_info);
        }
    }

    if (status.ok()) {
        finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
        LOG(INFO) << process_name << " success. signature: " << signature;
        error_msgs.push_back(process_name + " success");
    } else {
        LOG(WARNING) << process_name << " failed. signature: " << signature;
        error_msgs.push_back(process_name + " failed");
        error_msgs.push_back("status: " + status.to_string());
    }
    task_status.__set_status_code(status.code());
    task_status.__set_error_msgs(error_msgs);
    finish_task_request->__set_task_status(task_status);
}

} // namespace doris