[feature](cooldown) Decouple storage policy and resource (#15873)

This commit is contained in:
plat1ko
2023-01-31 14:13:47 +08:00
committed by GitHub
parent a8a29427f6
commit 00a598a839
73 changed files with 1108 additions and 1038 deletions

View File

@ -17,6 +17,7 @@
#include "agent/task_worker_pool.h"
#include <gen_cpp/AgentService_types.h>
#include <pthread.h>
#include <sys/stat.h>
@ -24,18 +25,21 @@
#include <chrono>
#include <csignal>
#include <ctime>
#include <memory>
#include <sstream>
#include <string>
#include "common/logging.h"
#include "common/status.h"
#include "env/env.h"
#include "gen_cpp/Types_types.h"
#include "gutil/strings/substitute.h"
#include "io/fs/s3_file_system.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy_mgr.h"
#include "olap/storage_policy.h"
#include "olap/tablet.h"
#include "olap/task/engine_alter_tablet_task.h"
#include "olap/task/engine_batch_load_task.h"
@ -195,14 +199,9 @@ void TaskWorkerPool::start() {
cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
this);
break;
case TaskWorkerType::REFRESH_STORAGE_POLICY:
cb = std::bind<void>(
&TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback, this);
break;
case TaskWorkerType::UPDATE_STORAGE_POLICY:
case TaskWorkerType::PUSH_STORAGE_POLICY:
_worker_count = 1;
cb = std::bind<void>(&TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback,
this);
cb = std::bind<void>(&TaskWorkerPool::_push_storage_policy_worker_thread_callback, this);
break;
case TaskWorkerType::PUSH_COOLDOWN_CONF:
_worker_count = 1;
@ -269,6 +268,10 @@ void TaskWorkerPool::notify_thread() {
}
bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type, int64_t signature) {
if (task_type == TTaskType::type::PUSH_STORAGE_POLICY) {
// no need to report task of these types
return true;
}
std::lock_guard<std::mutex> task_signatures_lock(_s_task_signatures_lock);
std::set<int64_t>& signature_set = _s_task_signatures[task_type];
return signature_set.insert(signature).second;
@ -283,9 +286,7 @@ void TaskWorkerPool::_remove_task_info(const TTaskType::type task_type, int64_t
queue_size = signature_set.size();
}
std::string type_str;
EnumToString(TTaskType, task_type, type_str);
VLOG_NOTICE << "remove task info. type=" << type_str << ", signature=" << signature
VLOG_NOTICE << "remove task info. type=" << task_type << ", signature=" << signature
<< ", queue_size=" << queue_size;
TRACE("remove task info");
}
@ -854,25 +855,27 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
tablet->set_partition_id(tablet_meta_info.partition_id);
} else {
switch (tablet_meta_info.meta_type) {
case TTabletMetaType::PARTITIONID:
case TTabletMetaType::PARTITIONID: // FIXME(plat1ko): deprecate?
tablet->set_partition_id(tablet_meta_info.partition_id);
break;
case TTabletMetaType::INMEMORY:
if (tablet_meta_info.storage_policy.empty()) {
if (tablet_meta_info.__isset.storage_policy_id) {
LOG(INFO) << "set tablet storage_policy_id="
<< tablet_meta_info.storage_policy_id;
tablet->tablet_meta()->set_storage_policy_id(
tablet_meta_info.storage_policy_id);
}
if (tablet_meta_info.__isset.is_in_memory) {
tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
tablet_meta_info.is_in_memory);
// The field is_in_memory should not be in the tablet_schema.
// it should be in the tablet_meta.
for (auto rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) {
for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) {
rowset_meta->tablet_schema()->set_is_in_memory(
tablet_meta_info.is_in_memory);
}
tablet->get_max_version_schema(wrlock)->set_is_in_memory(
tablet_meta_info.is_in_memory);
} else {
LOG(INFO) << "set tablet cooldown resource "
<< tablet_meta_info.storage_policy;
tablet->tablet_meta()->set_storage_policy(tablet_meta_info.storage_policy);
}
break;
}
@ -1232,6 +1235,23 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() {
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
request.__set_tablet_max_compaction_score(max_compaction_score);
request.__set_report_version(report_version);
// report storage policy and resource
auto& storage_policy_list = request.storage_policy;
for (auto [id, version] : get_storage_policy_ids()) {
auto& storage_policy = storage_policy_list.emplace_back();
storage_policy.__set_id(id);
storage_policy.__set_version(version);
}
request.__isset.storage_policy = true;
auto& resource_list = request.resource;
for (auto [id, version] : get_storage_resource_ids()) {
auto& resource = resource_list.emplace_back();
resource.__set_id(id);
resource.__set_version(version);
}
request.__isset.resource = true;
_handle_report(request, ReportType::TABLET);
}
StorageEngine::instance()->deregister_report_listener(this);
@ -1603,63 +1623,9 @@ void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
}
}
void TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback() {
while (_is_work) {
_is_doing_work = false;
{
// wait at most report_task_interval_seconds, or being notified
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait_for(
worker_thread_lock,
std::chrono::seconds(
config::storage_refresh_storage_policy_task_interval_seconds));
}
if (!_is_work) {
break;
}
if (_master_info.network_address.port == 0) {
// port == 0 means not received heartbeat yet
// sleep a short time and try again
LOG(INFO)
<< "waiting to receive first heartbeat from frontend before doing task report";
continue;
}
_is_doing_work = true;
TGetStoragePolicyResult result;
Status status = _master_client->refresh_storage_policy(&result);
if (status.ok() && result.status.status_code == TStatusCode::OK) {
// update storage policy mgr.
StoragePolicyMgr* spm = ExecEnv::GetInstance()->storage_policy_mgr();
for (const auto& iter : result.result_entrys) {
auto policy_ptr = std::make_shared<StoragePolicy>();
policy_ptr->storage_policy_name = iter.policy_name;
policy_ptr->cooldown_datetime = iter.cooldown_datetime;
policy_ptr->cooldown_ttl = iter.cooldown_ttl;
policy_ptr->s3_endpoint = iter.s3_storage_param.s3_endpoint;
policy_ptr->s3_region = iter.s3_storage_param.s3_region;
policy_ptr->s3_ak = iter.s3_storage_param.s3_ak;
policy_ptr->s3_sk = iter.s3_storage_param.s3_sk;
policy_ptr->root_path = iter.s3_storage_param.root_path;
policy_ptr->bucket = iter.s3_storage_param.bucket;
policy_ptr->s3_conn_timeout_ms = iter.s3_storage_param.s3_conn_timeout_ms;
policy_ptr->s3_max_conn = iter.s3_storage_param.s3_max_conn;
policy_ptr->s3_request_timeout_ms = iter.s3_storage_param.s3_request_timeout_ms;
policy_ptr->md5_sum = iter.md5_checksum;
LOG_EVERY_N(INFO, 12) << "refresh storage policy task. policy=" << *policy_ptr;
spm->periodic_put(iter.policy_name, policy_ptr);
}
}
}
}
void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
void TaskWorkerPool::_push_storage_policy_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
TGetStoragePolicy get_storage_policy_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@ -1669,31 +1635,68 @@ void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
}
agent_task_req = _tasks.front();
get_storage_policy_req = agent_task_req.update_policy;
_tasks.pop_front();
}
StoragePolicyMgr* spm = ExecEnv::GetInstance()->storage_policy_mgr();
auto policy_ptr = std::make_shared<StoragePolicy>();
policy_ptr->storage_policy_name = get_storage_policy_req.policy_name;
policy_ptr->cooldown_datetime = get_storage_policy_req.cooldown_datetime;
policy_ptr->cooldown_ttl = get_storage_policy_req.cooldown_ttl;
policy_ptr->s3_endpoint = get_storage_policy_req.s3_storage_param.s3_endpoint;
policy_ptr->s3_region = get_storage_policy_req.s3_storage_param.s3_region;
policy_ptr->s3_ak = get_storage_policy_req.s3_storage_param.s3_ak;
policy_ptr->s3_sk = get_storage_policy_req.s3_storage_param.s3_sk;
policy_ptr->root_path = get_storage_policy_req.s3_storage_param.root_path;
policy_ptr->bucket = get_storage_policy_req.s3_storage_param.bucket;
policy_ptr->s3_conn_timeout_ms = get_storage_policy_req.s3_storage_param.s3_conn_timeout_ms;
policy_ptr->s3_max_conn = get_storage_policy_req.s3_storage_param.s3_max_conn;
policy_ptr->s3_request_timeout_ms =
get_storage_policy_req.s3_storage_param.s3_request_timeout_ms;
policy_ptr->md5_sum = get_storage_policy_req.md5_checksum;
LOG(INFO) << "get storage update policy task. policy=" << *policy_ptr;
spm->update(get_storage_policy_req.policy_name, policy_ptr);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
TPushStoragePolicyReq& push_storage_policy_req = agent_task_req.push_storage_policy_req;
// refresh resource
for (auto& resource : push_storage_policy_req.resource) {
auto existed_resource = get_storage_resource(resource.id);
if (existed_resource.version >= resource.version) {
continue;
}
if (resource.__isset.s3_storage_param) {
S3Conf s3_conf;
s3_conf.ak = std::move(resource.s3_storage_param.ak);
s3_conf.sk = std::move(resource.s3_storage_param.sk);
s3_conf.endpoint = std::move(resource.s3_storage_param.endpoint);
s3_conf.region = std::move(resource.s3_storage_param.region);
s3_conf.prefix = std::move(resource.s3_storage_param.root_path);
s3_conf.bucket = std::move(resource.s3_storage_param.bucket);
s3_conf.connect_timeout_ms = resource.s3_storage_param.conn_timeout_ms;
s3_conf.max_connections = resource.s3_storage_param.max_conn;
s3_conf.request_timeout_ms = resource.s3_storage_param.request_timeout_ms;
std::shared_ptr<io::S3FileSystem> s3_fs;
if (existed_resource.fs == nullptr) {
s3_fs = io::S3FileSystem::create(s3_conf, std::to_string(resource.id));
} else {
s3_fs = std::static_pointer_cast<io::S3FileSystem>(existed_resource.fs);
s3_fs->set_conf(s3_conf);
}
auto st = s3_fs->connect();
if (!st.ok()) {
LOG(WARNING) << "update s3 resource failed: " << st;
} else {
LOG_INFO("successfully update s3 resource")
.tag("resource_id", resource.id)
.tag("resource_name", resource.name)
.tag("s3_conf", s3_conf.to_string());
put_storage_resource(resource.id, {std::move(s3_fs), resource.version});
}
} else {
LOG(WARNING) << "unknown resource=" << resource;
}
}
// drop storage policy
for (auto policy_id : push_storage_policy_req.dropped_storage_policy) {
delete_storage_policy(policy_id);
}
// refresh storage policy
for (auto& storage_policy : push_storage_policy_req.storage_policy) {
auto existed_storage_policy = get_storage_policy(storage_policy.id);
if (existed_storage_policy == nullptr ||
existed_storage_policy->version < storage_policy.version) {
auto storage_policy1 = std::make_shared<StoragePolicy>();
storage_policy1->name = std::move(storage_policy.name);
storage_policy1->version = storage_policy.version;
storage_policy1->cooldown_datetime = storage_policy.cooldown_datetime;
storage_policy1->cooldown_ttl = storage_policy.cooldown_ttl;
storage_policy1->resource_id = storage_policy.resource_id;
LOG_INFO("successfully update storage policy")
.tag("storage_policy_id", storage_policy.id)
.tag("storage_policy", storage_policy1->to_string());
put_storage_policy(storage_policy.id, std::move(storage_policy1));
}
}
}
}