Files
doris/be/src/agent/task_worker_pool.cpp
plat1ko 25b6e4deb2 [fix](daemon) Fix incorrect initialization order of daemon services (#23578)
Current initialization dependency:

      Daemon ───┬──► StorageEngine ──► ExecEnv ──► Disk/Mem/CpuInfo
                │
                │
BackendService ─┘
However, original code incorrectly initialize Daemon before StorageEngine.
This PR also stop and join threads of daemon services in their dtor, to ensure Daemon services release resources in reverse order of initialization via RAII.
2023-08-31 19:46:38 +08:00

2005 lines
87 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "agent/task_worker_pool.h"
#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/Status_types.h>
#include <gen_cpp/Types_types.h>
#include <unistd.h>
#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <chrono> // IWYU pragma: keep
#include <ctime>
#include <functional>
#include <memory>
#include <shared_mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "agent/utils.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/ref_counted.h"
#include "gutil/strings/numbers.h"
#include "gutil/strings/substitute.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "io/fs/s3_file_system.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/task/engine_alter_tablet_task.h"
#include "olap/task/engine_batch_load_task.h"
#include "olap/task/engine_checksum_task.h"
#include "olap/task/engine_clone_task.h"
#include "olap/task/engine_index_change_task.h"
#include "olap/task/engine_publish_version_task.h"
#include "olap/task/engine_storage_migration_task.h"
#include "olap/txn_manager.h"
#include "olap/utils.h"
#include "runtime/exec_env.h"
#include "runtime/snapshot_loader.h"
#include "service/backend_options.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/random.h"
#include "util/s3_util.h"
#include "util/scoped_cleanup.h"
#include "util/stopwatch.hpp"
#include "util/threadpool.h"
#include "util/time.h"
#include "util/trace.h"
namespace doris {
using namespace ErrorCode;
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(agent_task_queue_size, MetricUnit::NOUNIT);
const uint32_t TASK_FINISH_MAX_RETRY = 3;
const uint32_t PUBLISH_VERSION_MAX_RETRY = 3;
std::atomic_ulong TaskWorkerPool::_s_report_version(time(nullptr) * 10000);
std::mutex TaskWorkerPool::_s_task_signatures_lock;
std::map<TTaskType::type, std::set<int64_t>> TaskWorkerPool::_s_task_signatures;
static bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version");
TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* env,
const TMasterInfo& master_info, ThreadModel thread_model)
: _master_info(master_info),
_agent_utils(new AgentUtils()),
_env(env),
_stop_background_threads_latch(1),
_is_work(false),
_thread_model(thread_model),
_is_doing_work(false),
_task_worker_type(task_worker_type) {
_backend.__set_host(BackendOptions::get_localhost());
_backend.__set_be_port(config::be_port);
_backend.__set_http_port(config::webserver_port);
string task_worker_type_name = TYPE_STRING(task_worker_type);
_name = strings::Substitute("TaskWorkerPool.$0", task_worker_type_name);
_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
task_worker_type_name, {{"type", task_worker_type_name}});
REGISTER_ENTITY_HOOK_METRIC(_metric_entity, this, agent_task_queue_size, [this]() -> uint64_t {
if (_thread_model == ThreadModel::SINGLE_THREAD) {
return _is_doing_work.load();
} else {
std::lock_guard<std::mutex> lock(_worker_thread_lock);
return _tasks.size();
}
});
}
TaskWorkerPool::~TaskWorkerPool() {
_stop_background_threads_latch.count_down();
stop();
DEREGISTER_ENTITY_HOOK_METRIC(_metric_entity, agent_task_queue_size);
DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
}
void TaskWorkerPool::start() {
// Init task pool and task workers
_is_work = true;
if (_thread_model == ThreadModel::SINGLE_THREAD) {
_worker_count = 1;
}
switch (_task_worker_type) {
case TaskWorkerType::CREATE_TABLE:
break;
case TaskWorkerType::DROP_TABLE:
break;
case TaskWorkerType::PUSH:
case TaskWorkerType::REALTIME_PUSH:
break;
case TaskWorkerType::PUBLISH_VERSION:
break;
case TaskWorkerType::CLEAR_TRANSACTION_TASK:
break;
case TaskWorkerType::DELETE:
break;
case TaskWorkerType::ALTER_TABLE:
break;
case TaskWorkerType::ALTER_INVERTED_INDEX:
_worker_count = config::alter_index_worker_count;
_cb = std::bind<void>(&TaskWorkerPool::_alter_inverted_index_worker_thread_callback, this);
break;
case TaskWorkerType::CLONE:
break;
case TaskWorkerType::STORAGE_MEDIUM_MIGRATE:
break;
case TaskWorkerType::CHECK_CONSISTENCY:
_worker_count = config::check_consistency_worker_count;
_cb = std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback, this);
break;
case TaskWorkerType::REPORT_TASK:
_cb = std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this);
break;
case TaskWorkerType::REPORT_DISK_STATE:
_cb = std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback, this);
break;
case TaskWorkerType::REPORT_OLAP_TABLE:
_cb = std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this);
break;
case TaskWorkerType::UPLOAD:
_worker_count = config::upload_worker_count;
_cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback, this);
break;
case TaskWorkerType::DOWNLOAD:
_worker_count = config::download_worker_count;
_cb = std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this);
break;
case TaskWorkerType::MAKE_SNAPSHOT:
_worker_count = config::make_snapshot_worker_count;
_cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback, this);
break;
case TaskWorkerType::RELEASE_SNAPSHOT:
_worker_count = config::release_snapshot_worker_count;
_cb = std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this);
break;
case TaskWorkerType::MOVE:
_worker_count = 1;
_cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, this);
break;
case TaskWorkerType::UPDATE_TABLET_META_INFO:
_worker_count = 1;
_cb = std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, this);
break;
case TaskWorkerType::SUBMIT_TABLE_COMPACTION:
_worker_count = 1;
_cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
this);
break;
case TaskWorkerType::PUSH_STORAGE_POLICY:
_worker_count = 1;
_cb = std::bind<void>(&TaskWorkerPool::_push_storage_policy_worker_thread_callback, this);
break;
case TaskWorkerType::PUSH_COOLDOWN_CONF:
_worker_count = 1;
_cb = std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback, this);
break;
case TaskWorkerType::GC_BINLOG:
_worker_count = 1;
_cb = std::bind<void>(&TaskWorkerPool::_gc_binlog_worker_thread_callback, this);
break;
default:
// pass
break;
}
CHECK(_thread_model == ThreadModel::MULTI_THREADS || _worker_count == 1);
#ifndef BE_TEST
ThreadPoolBuilder(_name)
.set_min_threads(_worker_count)
.set_max_threads(_worker_count)
.build(&_thread_pool);
for (int i = 0; i < _worker_count; i++) {
auto st = _thread_pool->submit_func(_cb);
CHECK(st.ok()) << st;
}
#endif
}
void TaskWorkerPool::stop() {
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_is_work = false;
_worker_thread_condition_variable.notify_all();
}
_thread_pool->shutdown();
}
void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
const TTaskType::type task_type = task.task_type;
int64_t signature = task.signature;
std::string type_str;
EnumToString(TTaskType, task_type, type_str);
VLOG_CRITICAL << "submitting task. type=" << type_str << ", signature=" << signature;
if (_register_task_info(task_type, signature)) {
// Set the receiving time of task so that we can determine whether it is timed out later
(const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
size_t task_count_in_queue = 0;
{
std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock);
_tasks.push_back(task);
task_count_in_queue = _tasks.size();
_worker_thread_condition_variable.notify_one();
}
LOG_INFO("successfully submit task")
.tag("type", type_str)
.tag("signature", signature)
.tag("queue_size", task_count_in_queue);
} else {
LOG_WARNING("failed to register task").tag("type", type_str).tag("signature", signature);
}
}
void TaskWorkerPool::notify_thread() {
_worker_thread_condition_variable.notify_one();
VLOG_CRITICAL << "notify task worker pool: " << _name;
}
bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type, int64_t signature) {
if (task_type == TTaskType::type::PUSH_STORAGE_POLICY ||
task_type == TTaskType::type::PUSH_COOLDOWN_CONF) {
// no need to report task of these types
return true;
}
if (signature == -1) { // No need to report task with unintialized signature
return true;
}
std::lock_guard<std::mutex> task_signatures_lock(_s_task_signatures_lock);
std::set<int64_t>& signature_set = _s_task_signatures[task_type];
return signature_set.insert(signature).second;
}
void TaskWorkerPool::_remove_task_info(const TTaskType::type task_type, int64_t signature) {
size_t queue_size;
{
std::lock_guard<std::mutex> task_signatures_lock(_s_task_signatures_lock);
std::set<int64_t>& signature_set = _s_task_signatures[task_type];
signature_set.erase(signature);
queue_size = signature_set.size();
}
VLOG_NOTICE << "remove task info. type=" << task_type << ", signature=" << signature
<< ", queue_size=" << queue_size;
}
void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request) {
// Return result to FE
TMasterResult result;
uint32_t try_time = 0;
while (try_time < TASK_FINISH_MAX_RETRY) {
DorisMetrics::instance()->finish_task_requests_total->increment(1);
Status client_status =
MasterServerClient::instance()->finish_task(finish_task_request, &result);
if (client_status.ok()) {
break;
} else {
DorisMetrics::instance()->finish_task_requests_failed->increment(1);
LOG_WARNING("failed to finish task")
.tag("type", finish_task_request.task_type)
.tag("signature", finish_task_request.signature)
.error(result.status);
try_time += 1;
}
sleep(config::sleep_one_second);
}
}
void TaskWorkerPool::_alter_inverted_index_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
auto& alter_inverted_index_rq = agent_task_req.alter_inverted_index_req;
LOG(INFO) << "get alter inverted index task. signature=" << agent_task_req.signature
<< ", tablet_id=" << alter_inverted_index_rq.tablet_id
<< ", job_id=" << alter_inverted_index_rq.job_id;
Status status = Status::OK();
TabletSharedPtr tablet_ptr = StorageEngine::instance()->tablet_manager()->get_tablet(
alter_inverted_index_rq.tablet_id);
if (tablet_ptr != nullptr) {
EngineIndexChangeTask engine_task(alter_inverted_index_rq);
status = StorageEngine::instance()->execute_task(&engine_task);
} else {
status =
Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id);
}
// Return result to fe
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(_backend);
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
std::vector<TTabletInfo> finish_tablet_infos;
if (!status.ok()) {
LOG(WARNING) << "failed to alter inverted index task, signature="
<< agent_task_req.signature
<< ", tablet_id=" << alter_inverted_index_rq.tablet_id
<< ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status;
} else {
LOG(INFO) << "successfully alter inverted index task, signature="
<< agent_task_req.signature
<< ", tablet_id=" << alter_inverted_index_rq.tablet_id
<< ", job_id=" << alter_inverted_index_rq.job_id;
TTabletInfo tablet_info;
status = _get_tablet_info(alter_inverted_index_rq.tablet_id,
alter_inverted_index_rq.schema_hash, agent_task_req.signature,
&tablet_info);
if (status.ok()) {
finish_tablet_infos.push_back(tablet_info);
}
finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
}
finish_task_request.__set_task_status(status.to_thrift());
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
LOG(INFO) << "get update tablet meta task. signature=" << agent_task_req.signature;
Status status;
auto& update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req;
for (auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) {
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
tablet_meta_info.tablet_id);
if (tablet == nullptr) {
status = Status::NotFound("tablet not found");
LOG(WARNING) << "could not find tablet when update tablet meta. tablet_id="
<< tablet_meta_info.tablet_id;
continue;
}
bool need_to_save = false;
if (tablet_meta_info.__isset.storage_policy_id) {
tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id);
need_to_save = true;
}
if (tablet_meta_info.__isset.is_in_memory) {
tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
tablet_meta_info.is_in_memory);
std::shared_lock rlock(tablet->get_header_lock());
for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) {
rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory);
}
tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
need_to_save = true;
}
if (tablet_meta_info.__isset.compaction_policy) {
if (tablet_meta_info.compaction_policy != "size_based" &&
tablet_meta_info.compaction_policy != "time_series") {
status = Status::InvalidArgument(
"invalid compaction policy, only support for size_based or "
"time_series");
continue;
}
tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy);
need_to_save = true;
}
if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
if (tablet->tablet_meta()->compaction_policy() != "time_series") {
status = Status::InvalidArgument(
"only time series compaction policy support time series config");
continue;
}
tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes(
tablet_meta_info.time_series_compaction_goal_size_mbytes);
need_to_save = true;
}
if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
if (tablet->tablet_meta()->compaction_policy() != "time_series") {
status = Status::InvalidArgument(
"only time series compaction policy support time series config");
continue;
}
tablet->tablet_meta()->set_time_series_compaction_file_count_threshold(
tablet_meta_info.time_series_compaction_file_count_threshold);
need_to_save = true;
}
if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
if (tablet->tablet_meta()->compaction_policy() != "time_series") {
status = Status::InvalidArgument(
"only time series compaction policy support time series config");
continue;
}
tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds(
tablet_meta_info.time_series_compaction_time_threshold_seconds);
need_to_save = true;
}
if (tablet_meta_info.__isset.replica_id) {
tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
}
if (tablet_meta_info.__isset.binlog_config) {
// check binlog_config require fields: enable, ttl_seconds, max_bytes, max_history_nums
auto& t_binlog_config = tablet_meta_info.binlog_config;
if (!t_binlog_config.__isset.enable || !t_binlog_config.__isset.ttl_seconds ||
!t_binlog_config.__isset.max_bytes ||
!t_binlog_config.__isset.max_history_nums) {
status = Status::InvalidArgument("invalid binlog config, some fields not set");
LOG(WARNING) << fmt::format(
"invalid binlog config, some fields not set, tablet_id={}, "
"t_binlog_config={}",
tablet_meta_info.tablet_id,
apache::thrift::ThriftDebugString(t_binlog_config));
continue;
}
BinlogConfig new_binlog_config;
new_binlog_config = tablet_meta_info.binlog_config;
LOG(INFO) << fmt::format(
"update tablet meta binlog config. tablet_id={}, old_binlog_config={}, "
"new_binlog_config={}",
tablet_meta_info.tablet_id,
tablet->tablet_meta()->binlog_config().to_string(),
new_binlog_config.to_string());
tablet->set_binlog_config(new_binlog_config);
need_to_save = true;
}
if (tablet_meta_info.__isset.enable_single_replica_compaction) {
std::shared_lock rlock(tablet->get_header_lock());
tablet->tablet_meta()
->mutable_tablet_schema()
->set_enable_single_replica_compaction(
tablet_meta_info.enable_single_replica_compaction);
for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) {
rowset_meta->tablet_schema()->set_enable_single_replica_compaction(
tablet_meta_info.enable_single_replica_compaction);
}
tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction(
tablet_meta_info.enable_single_replica_compaction);
need_to_save = true;
}
if (tablet_meta_info.__isset.skip_write_index_on_load) {
std::shared_lock rlock(tablet->get_header_lock());
tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load(
tablet_meta_info.skip_write_index_on_load);
for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) {
rowset_meta->tablet_schema()->set_skip_write_index_on_load(
tablet_meta_info.skip_write_index_on_load);
}
tablet->tablet_schema_unlocked()->set_skip_write_index_on_load(
tablet_meta_info.skip_write_index_on_load);
need_to_save = true;
}
if (need_to_save) {
std::shared_lock rlock(tablet->get_header_lock());
tablet->save_meta();
}
}
LOG(INFO) << "finish update tablet meta task. signature=" << agent_task_req.signature;
if (agent_task_req.signature != -1) {
TFinishTaskRequest finish_task_request;
finish_task_request.__set_task_status(status.to_thrift());
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
}
void TaskWorkerPool::_check_consistency_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
TCheckConsistencyReq check_consistency_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
check_consistency_req = agent_task_req.check_consistency_req;
_tasks.pop_front();
}
uint32_t checksum = 0;
EngineChecksumTask engine_task(check_consistency_req.tablet_id,
check_consistency_req.schema_hash,
check_consistency_req.version, &checksum);
Status status = StorageEngine::instance()->execute_task(&engine_task);
if (!status.ok()) {
LOG_WARNING("failed to check consistency")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", check_consistency_req.tablet_id)
.error(status);
} else {
LOG_INFO("successfully check consistency")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", check_consistency_req.tablet_id)
.tag("checksum", checksum);
}
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_task_status(status.to_thrift());
finish_task_request.__set_tablet_checksum(static_cast<int64_t>(checksum));
finish_task_request.__set_request_version(check_consistency_req.version);
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
void TaskWorkerPool::_report_task_worker_thread_callback() {
StorageEngine::instance()->register_report_listener(this);
TReportRequest request;
while (_is_work) {
_is_doing_work = false;
{
// wait at most report_task_interval_seconds, or being notified
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait_for(
worker_thread_lock, std::chrono::seconds(config::report_task_interval_seconds));
}
if (!_is_work) {
break;
}
if (_master_info.network_address.port == 0) {
// port == 0 means not received heartbeat yet
// sleep a short time and try again
LOG(INFO)
<< "waiting to receive first heartbeat from frontend before doing task report";
continue;
}
_is_doing_work = true;
// See _random_sleep() comment in _report_disk_state_worker_thread_callback
_random_sleep(5);
{
std::lock_guard<std::mutex> task_signatures_lock(_s_task_signatures_lock);
request.__set_tasks(_s_task_signatures);
request.__set_backend(BackendOptions::get_local_backend());
}
_handle_report(request, ReportType::TASK);
}
StorageEngine::instance()->deregister_report_listener(this);
}
/// disk state report thread will report disk state at a configurable fix interval.
void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
StorageEngine::instance()->register_report_listener(this);
while (_is_work) {
_is_doing_work = false;
{
// wait at most report_disk_state_interval_seconds, or being notified
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait_for(
worker_thread_lock,
std::chrono::seconds(config::report_disk_state_interval_seconds));
}
if (!_is_work) {
break;
}
if (_master_info.network_address.port == 0) {
// port == 0 means not received heartbeat yet
LOG(INFO)
<< "waiting to receive first heartbeat from frontend before doing disk report";
continue;
}
_is_doing_work = true;
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests at the same time
// and can not be processed.
_random_sleep(5);
TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.disks = true;
std::vector<DataDirInfo> data_dir_infos;
StorageEngine::instance()->get_all_data_dir_info(&data_dir_infos, true /* update */);
for (auto& root_path_info : data_dir_infos) {
TDisk disk;
disk.__set_root_path(root_path_info.path);
disk.__set_path_hash(root_path_info.path_hash);
disk.__set_storage_medium(root_path_info.storage_medium);
disk.__set_disk_total_capacity(root_path_info.disk_capacity);
disk.__set_data_used_capacity(root_path_info.local_used_capacity);
disk.__set_remote_used_capacity(root_path_info.remote_used_capacity);
disk.__set_disk_available_capacity(root_path_info.available);
disk.__set_used(root_path_info.is_used);
request.disks[root_path_info.path] = disk;
}
request.__set_num_cores(CpuInfo::num_cores());
request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
? config::pipeline_executor_size
: CpuInfo::num_cores());
_handle_report(request, ReportType::DISK);
}
StorageEngine::instance()->deregister_report_listener(this);
}
void TaskWorkerPool::_report_tablet_worker_thread_callback() {
StorageEngine::instance()->register_report_listener(this);
while (_is_work) {
_is_doing_work = false;
{
// wait at most report_tablet_interval_seconds, or being notified
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait_for(
worker_thread_lock,
std::chrono::seconds(config::report_tablet_interval_seconds));
}
if (!_is_work) {
break;
}
if (_master_info.network_address.port == 0) {
// port == 0 means not received heartbeat yet
LOG(INFO) << "waiting to receive first heartbeat from frontend before doing tablet "
"report";
continue;
}
_is_doing_work = true;
// See _random_sleep() comment in _report_disk_state_worker_thread_callback
_random_sleep(5);
TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.tablets = true;
uint64_t report_version = _s_report_version;
StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info(
&request.tablets);
if (report_version < _s_report_version) {
// TODO llj This can only reduce the possibility for report error, but can't avoid it.
// If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this
// report, and the report version has a small probability that it has not been updated in time. When FE
// receives this report, it is possible to delete the new tablet.
LOG(WARNING) << "report version " << report_version << " change to "
<< _s_report_version;
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
continue;
}
int64_t max_compaction_score =
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
request.__set_tablet_max_compaction_score(max_compaction_score);
request.__set_report_version(report_version);
// report storage policy and resource
auto& storage_policy_list = request.storage_policy;
for (auto [id, version] : get_storage_policy_ids()) {
auto& storage_policy = storage_policy_list.emplace_back();
storage_policy.__set_id(id);
storage_policy.__set_version(version);
}
request.__isset.storage_policy = true;
auto& resource_list = request.resource;
for (auto [id, version] : get_storage_resource_ids()) {
auto& resource = resource_list.emplace_back();
resource.__set_id(id);
resource.__set_version(version);
}
request.__isset.resource = true;
_handle_report(request, ReportType::TABLET);
}
StorageEngine::instance()->deregister_report_listener(this);
}
void TaskWorkerPool::_upload_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
TUploadReq upload_request;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
upload_request = agent_task_req.upload_req;
_tasks.pop_front();
}
LOG(INFO) << "get upload task. signature=" << agent_task_req.signature
<< ", job_id=" << upload_request.job_id;
std::map<int64_t, std::vector<std::string>> tablet_files;
std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
_env, upload_request.job_id, agent_task_req.signature, upload_request.broker_addr,
upload_request.broker_prop);
Status status = loader->init(
upload_request.__isset.storage_backend ? upload_request.storage_backend
: TStorageBackendType::type::BROKER,
upload_request.__isset.location ? upload_request.location : "");
if (status.ok()) {
status = loader->upload(upload_request.src_dest_map, &tablet_files);
}
if (!status.ok()) {
LOG_WARNING("failed to upload")
.tag("signature", agent_task_req.signature)
.tag("job_id", upload_request.job_id)
.error(status);
} else {
LOG_INFO("successfully upload")
.tag("signature", agent_task_req.signature)
.tag("job_id", upload_request.job_id);
}
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_task_status(status.to_thrift());
finish_task_request.__set_tablet_files(tablet_files);
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
void TaskWorkerPool::_download_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
TDownloadReq download_request;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
download_request = agent_task_req.download_req;
_tasks.pop_front();
}
LOG(INFO) << "get download task. signature=" << agent_task_req.signature
<< ", job_id=" << download_request.job_id
<< ", task detail: " << apache::thrift::ThriftDebugString(download_request);
// TODO: download
std::vector<int64_t> downloaded_tablet_ids;
auto status = Status::OK();
if (download_request.__isset.remote_tablet_snapshots) {
std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
_env, download_request.job_id, agent_task_req.signature);
status = loader->remote_http_download(download_request.remote_tablet_snapshots,
&downloaded_tablet_ids);
} else {
std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
_env, download_request.job_id, agent_task_req.signature,
download_request.broker_addr, download_request.broker_prop);
status = loader->init(
download_request.__isset.storage_backend ? download_request.storage_backend
: TStorageBackendType::type::BROKER,
download_request.__isset.location ? download_request.location : "");
if (status.ok()) {
status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids);
}
}
if (!status.ok()) {
LOG_WARNING("failed to download")
.tag("signature", agent_task_req.signature)
.tag("job_id", download_request.job_id)
.error(status);
} else {
LOG_INFO("successfully download")
.tag("signature", agent_task_req.signature)
.tag("job_id", download_request.job_id);
}
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_task_status(status.to_thrift());
finish_task_request.__set_downloaded_tablet_ids(downloaded_tablet_ids);
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
void TaskWorkerPool::_make_snapshot_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
TSnapshotRequest snapshot_request;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
snapshot_request = agent_task_req.snapshot_req;
_tasks.pop_front();
}
LOG(INFO) << "get snapshot task. signature=" << agent_task_req.signature;
string snapshot_path;
bool allow_incremental_clone = false; // not used
std::vector<string> snapshot_files;
Status status = SnapshotManager::instance()->make_snapshot(snapshot_request, &snapshot_path,
&allow_incremental_clone);
if (status.ok() && snapshot_request.__isset.list_files) {
// list and save all snapshot files
// snapshot_path like: data/snapshot/20180417205230.1.86400
// we need to add subdir: tablet_id/schema_hash/
std::vector<io::FileInfo> files;
bool exists = true;
io::Path path = fmt::format("{}/{}/{}/", snapshot_path, snapshot_request.tablet_id,
snapshot_request.schema_hash);
status = io::global_local_filesystem()->list(path, true, &files, &exists);
if (status.ok()) {
for (auto& file : files) {
snapshot_files.push_back(file.file_name);
}
}
}
if (!status.ok()) {
LOG_WARNING("failed to make snapshot")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", snapshot_request.tablet_id)
.tag("version", snapshot_request.version)
.error(status);
} else {
LOG_INFO("successfully make snapshot")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", snapshot_request.tablet_id)
.tag("version", snapshot_request.version)
.tag("snapshot_path", snapshot_path);
}
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_snapshot_path(snapshot_path);
finish_task_request.__set_snapshot_files(snapshot_files);
finish_task_request.__set_task_status(status.to_thrift());
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
void TaskWorkerPool::_release_snapshot_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
TReleaseSnapshotRequest release_snapshot_request;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
release_snapshot_request = agent_task_req.release_snapshot_req;
_tasks.pop_front();
}
LOG(INFO) << "get release snapshot task. signature=" << agent_task_req.signature;
string& snapshot_path = release_snapshot_request.snapshot_path;
Status status = SnapshotManager::instance()->release_snapshot(snapshot_path);
if (!status.ok()) {
LOG_WARNING("failed to release snapshot")
.tag("signature", agent_task_req.signature)
.tag("snapshot_path", snapshot_path)
.error(status);
} else {
LOG_INFO("successfully release snapshot")
.tag("signature", agent_task_req.signature)
.tag("snapshot_path", snapshot_path);
}
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_task_status(status.to_thrift());
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
Status TaskWorkerPool::_get_tablet_info(const TTabletId tablet_id, const TSchemaHash schema_hash,
int64_t signature, TTabletInfo* tablet_info) {
tablet_info->__set_tablet_id(tablet_id);
tablet_info->__set_schema_hash(schema_hash);
return StorageEngine::instance()->tablet_manager()->report_tablet_info(tablet_info);
}
void TaskWorkerPool::_move_dir_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
TMoveDirReq move_dir_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
move_dir_req = agent_task_req.move_dir_req;
_tasks.pop_front();
}
LOG(INFO) << "get move dir task. signature=" << agent_task_req.signature
<< ", job_id=" << move_dir_req.job_id;
Status status = _move_dir(move_dir_req.tablet_id, move_dir_req.src, move_dir_req.job_id,
true /* TODO */);
if (!status.ok()) {
LOG_WARNING("failed to move dir")
.tag("signature", agent_task_req.signature)
.tag("job_id", move_dir_req.job_id)
.tag("tablet_id", move_dir_req.tablet_id)
.tag("src", move_dir_req.src)
.error(status);
} else {
LOG_INFO("successfully move dir")
.tag("signature", agent_task_req.signature)
.tag("job_id", move_dir_req.job_id)
.tag("tablet_id", move_dir_req.tablet_id)
.tag("src", move_dir_req.src);
}
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_task_status(status.to_thrift());
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
Status TaskWorkerPool::_move_dir(const TTabletId tablet_id, const std::string& src, int64_t job_id,
bool overwrite) {
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
return Status::InvalidArgument("Could not find tablet");
}
SnapshotLoader loader(_env, job_id, tablet_id);
return loader.move(src, tablet, overwrite);
}
void TaskWorkerPool::_handle_report(const TReportRequest& request, ReportType type) {
TMasterResult result;
Status status = MasterServerClient::instance()->report(request, &result);
bool is_report_success = false;
if (!status.ok()) {
LOG_WARNING("failed to report {}", TYPE_STRING(type))
.tag("host", _master_info.network_address.hostname)
.tag("port", _master_info.network_address.port)
.error(status);
} else if (result.status.status_code != TStatusCode::OK) {
LOG_WARNING("failed to report {}", TYPE_STRING(type))
.tag("host", _master_info.network_address.hostname)
.tag("port", _master_info.network_address.port)
.error(result.status);
} else {
is_report_success = true;
LOG_INFO("successfully report {}", TYPE_STRING(type))
.tag("host", _master_info.network_address.hostname)
.tag("port", _master_info.network_address.port);
}
switch (type) {
case TASK:
DorisMetrics::instance()->report_task_requests_total->increment(1);
if (!is_report_success) {
DorisMetrics::instance()->report_task_requests_failed->increment(1);
}
break;
case DISK:
DorisMetrics::instance()->report_disk_requests_total->increment(1);
if (!is_report_success) {
DorisMetrics::instance()->report_disk_requests_failed->increment(1);
}
break;
case TABLET:
DorisMetrics::instance()->report_tablet_requests_total->increment(1);
if (!is_report_success) {
DorisMetrics::instance()->report_tablet_requests_failed->increment(1);
}
break;
default:
break;
}
}
void TaskWorkerPool::_random_sleep(int second) {
Random rnd(UnixMillis());
sleep(rnd.Uniform(second) + 1);
}
void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
TCompactionReq compaction_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
compaction_req = agent_task_req.compaction_req;
_tasks.pop_front();
}
LOG(INFO) << "get compaction task. signature=" << agent_task_req.signature
<< ", compaction_type=" << compaction_req.type;
CompactionType compaction_type;
if (compaction_req.type == "base") {
compaction_type = CompactionType::BASE_COMPACTION;
} else {
compaction_type = CompactionType::CUMULATIVE_COMPACTION;
}
TabletSharedPtr tablet_ptr =
StorageEngine::instance()->tablet_manager()->get_tablet(compaction_req.tablet_id);
if (tablet_ptr != nullptr) {
auto data_dir = tablet_ptr->data_dir();
if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) {
LOG(WARNING) << "could not do compaction. tablet_id=" << tablet_ptr->tablet_id()
<< ", compaction_type=" << compaction_type;
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
continue;
}
Status status = StorageEngine::instance()->submit_compaction_task(
tablet_ptr, compaction_type, false);
if (!status.ok()) {
LOG(WARNING) << "failed to submit table compaction task. error=" << status;
}
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
}
void TaskWorkerPool::_push_storage_policy_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
TPushStoragePolicyReq& push_storage_policy_req = agent_task_req.push_storage_policy_req;
// refresh resource
for (auto& resource : push_storage_policy_req.resource) {
auto existed_resource = get_storage_resource(resource.id);
if (existed_resource.version >= resource.version) {
continue;
}
if (resource.__isset.s3_storage_param) {
Status st;
S3Conf s3_conf;
s3_conf.ak = std::move(resource.s3_storage_param.ak);
s3_conf.sk = std::move(resource.s3_storage_param.sk);
s3_conf.endpoint = std::move(resource.s3_storage_param.endpoint);
s3_conf.region = std::move(resource.s3_storage_param.region);
s3_conf.prefix = std::move(resource.s3_storage_param.root_path);
s3_conf.bucket = std::move(resource.s3_storage_param.bucket);
s3_conf.connect_timeout_ms = resource.s3_storage_param.conn_timeout_ms;
s3_conf.max_connections = resource.s3_storage_param.max_conn;
s3_conf.request_timeout_ms = resource.s3_storage_param.request_timeout_ms;
// When using cold heat separation in minio, user might use ip address directly,
// which needs enable use_virtual_addressing to true
s3_conf.use_virtual_addressing = !resource.s3_storage_param.use_path_style;
std::shared_ptr<io::S3FileSystem> fs;
if (existed_resource.fs == nullptr) {
st = io::S3FileSystem::create(s3_conf, std::to_string(resource.id), &fs);
} else {
fs = std::static_pointer_cast<io::S3FileSystem>(existed_resource.fs);
fs->set_conf(s3_conf);
}
if (!st.ok()) {
LOG(WARNING) << "update s3 resource failed: " << st;
} else {
LOG_INFO("successfully update s3 resource")
.tag("resource_id", resource.id)
.tag("resource_name", resource.name)
.tag("s3_conf", s3_conf.to_string());
put_storage_resource(resource.id, {std::move(fs), resource.version});
}
} else {
LOG(WARNING) << "unknown resource=" << resource;
}
}
// drop storage policy
for (auto policy_id : push_storage_policy_req.dropped_storage_policy) {
delete_storage_policy(policy_id);
}
// refresh storage policy
for (auto& storage_policy : push_storage_policy_req.storage_policy) {
auto existed_storage_policy = get_storage_policy(storage_policy.id);
if (existed_storage_policy == nullptr ||
existed_storage_policy->version < storage_policy.version) {
auto storage_policy1 = std::make_shared<StoragePolicy>();
storage_policy1->name = std::move(storage_policy.name);
storage_policy1->version = storage_policy.version;
storage_policy1->cooldown_datetime = storage_policy.cooldown_datetime;
storage_policy1->cooldown_ttl = storage_policy.cooldown_ttl;
storage_policy1->resource_id = storage_policy.resource_id;
LOG_INFO("successfully update storage policy")
.tag("storage_policy_id", storage_policy.id)
.tag("storage_policy", storage_policy1->to_string());
put_storage_policy(storage_policy.id, std::move(storage_policy1));
}
}
}
}
void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
_worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
TPushCooldownConfReq& push_cooldown_conf_req = agent_task_req.push_cooldown_conf;
for (auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) {
int64_t tablet_id = cooldown_conf.tablet_id;
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id;
continue;
}
if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
cooldown_conf.cooldown_replica_id) &&
cooldown_conf.cooldown_replica_id == tablet->replica_id() &&
tablet->tablet_meta()->cooldown_meta_id().initialized()) {
Tablet::async_write_cooldown_meta(tablet);
}
}
}
}
CreateTableTaskPool::CreateTableTaskPool(ExecEnv* env, ThreadModel thread_model)
: TaskWorkerPool(TaskWorkerType::CREATE_TABLE, env, *env->master_info(), thread_model) {
_worker_count = config::create_tablet_worker_count;
_cb = [this]() { _create_tablet_worker_thread_callback(); };
}
void CreateTableTaskPool::_create_tablet_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
const TCreateTabletReq& create_tablet_req = agent_task_req.create_tablet_req;
RuntimeProfile runtime_profile("CreateTablet");
RuntimeProfile* profile = &runtime_profile;
MonotonicStopWatch watch;
watch.start();
SCOPED_CLEANUP({
int64_t elapsed_time = static_cast<int64_t>(watch.elapsed_time());
if (elapsed_time / 1e9 > config::agent_task_trace_threshold_sec) {
COUNTER_UPDATE(profile->total_time_counter(), elapsed_time);
std::stringstream ss;
profile->pretty_print(&ss);
LOG(WARNING) << "create tablet cost(s) " << elapsed_time / 1e9 << std::endl
<< ss.str();
}
});
DorisMetrics::instance()->create_tablet_requests_total->increment(1);
VLOG_NOTICE << "start to create tablet " << create_tablet_req.tablet_id;
std::vector<TTabletInfo> finish_tablet_infos;
VLOG_NOTICE << "create tablet: " << create_tablet_req;
Status status = StorageEngine::instance()->create_tablet(create_tablet_req, profile);
if (!status.ok()) {
DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
LOG_WARNING("failed to create tablet, reason={}", status.to_string())
.tag("signature", agent_task_req.signature)
.tag("tablet_id", create_tablet_req.tablet_id)
.error(status);
} else {
++_s_report_version;
// get path hash of the created tablet
TabletSharedPtr tablet;
{
SCOPED_TIMER(ADD_TIMER(profile, "GetTablet"));
tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
create_tablet_req.tablet_id);
}
DCHECK(tablet != nullptr);
TTabletInfo tablet_info;
tablet_info.tablet_id = tablet->table_id();
tablet_info.schema_hash = tablet->schema_hash();
tablet_info.version = create_tablet_req.version;
// Useless but it is a required field in TTabletInfo
tablet_info.version_hash = 0;
tablet_info.row_count = 0;
tablet_info.data_size = 0;
tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
tablet_info.__set_replica_id(tablet->replica_id());
finish_tablet_infos.push_back(tablet_info);
LOG_INFO("successfully create tablet")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", create_tablet_req.tablet_id);
}
TFinishTaskRequest finish_task_request;
finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_report_version(_s_report_version);
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_task_status(status.to_thrift());
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
DropTableTaskPool::DropTableTaskPool(ExecEnv* env, ThreadModel thread_model)
: TaskWorkerPool(TaskWorkerType::DROP_TABLE, env, *env->master_info(), thread_model) {
_worker_count = config::drop_tablet_worker_count;
_cb = [this]() { _drop_tablet_worker_thread_callback(); };
}
void DropTableTaskPool::_drop_tablet_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
const TDropTabletReq& drop_tablet_req = agent_task_req.drop_tablet_req;
Status status;
TabletSharedPtr dropped_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
drop_tablet_req.tablet_id, false);
if (dropped_tablet != nullptr) {
status = StorageEngine::instance()->tablet_manager()->drop_tablet(
drop_tablet_req.tablet_id, drop_tablet_req.replica_id,
drop_tablet_req.is_drop_table_or_partition);
} else {
status = Status::NotFound("could not find tablet {}", drop_tablet_req.tablet_id);
}
if (status.ok()) {
// if tablet is dropped by fe, then the related txn should also be removed
StorageEngine::instance()->txn_manager()->force_rollback_tablet_related_txns(
dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id,
dropped_tablet->tablet_uid());
LOG_INFO("successfully drop tablet")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", drop_tablet_req.tablet_id);
} else {
LOG_WARNING("failed to drop tablet")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", drop_tablet_req.tablet_id)
.error(status);
}
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_task_status(status.to_thrift());
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
PushTaskPool::PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWokerType type)
: TaskWorkerPool(
type == PushWokerType::LOAD_V2 ? TaskWorkerType::PUSH : TaskWorkerType::DELETE,
env, *env->master_info(), thread_model),
_push_worker_type(type) {
if (_push_worker_type == PushWokerType::LOAD_V2) {
_worker_count =
config::push_worker_count_normal_priority + config::push_worker_count_high_priority;
} else {
_worker_count = config::delete_worker_count;
}
_cb = [this]() { _push_worker_thread_callback(); };
}
void PushTaskPool::_push_worker_thread_callback() {
// gen high priority worker thread
TPriority::type priority = TPriority::NORMAL;
int32_t push_worker_count_high_priority = config::push_worker_count_high_priority;
if (_push_worker_type == PushWokerType::LOAD_V2) {
static uint32_t s_worker_count = 0;
std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock);
if (s_worker_count < push_worker_count_high_priority) {
++s_worker_count;
priority = TPriority::HIGH;
}
}
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
if (priority == TPriority::HIGH) {
const auto it = std::find_if(
_tasks.cbegin(), _tasks.cend(), [](const TAgentTaskRequest& req) {
return req.__isset.priority && req.priority == TPriority::HIGH;
});
if (it == _tasks.cend()) {
// there is no high priority task. notify other thread to handle normal task
_worker_thread_condition_variable.notify_all();
sleep(1);
continue;
}
agent_task_req = *it;
_tasks.erase(it);
} else {
agent_task_req = _tasks.front();
_tasks.pop_front();
}
}
TPushReq& push_req = agent_task_req.push_req;
LOG(INFO) << "get push task. signature=" << agent_task_req.signature
<< ", priority=" << priority << " push_type=" << push_req.push_type;
std::vector<TTabletInfo> tablet_infos;
EngineBatchLoadTask engine_task(push_req, &tablet_infos);
auto status = StorageEngine::instance()->execute_task(&engine_task);
// Return result to fe
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
if (push_req.push_type == TPushType::DELETE) {
finish_task_request.__set_request_version(push_req.version);
}
if (status.ok()) {
LOG_INFO("successfully execute push task")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", push_req.tablet_id)
.tag("push_type", push_req.push_type);
++_s_report_version;
finish_task_request.__set_finish_tablet_infos(tablet_infos);
} else {
LOG_WARNING("failed to execute push task")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", push_req.tablet_id)
.tag("push_type", push_req.push_type)
.error(status);
}
finish_task_request.__set_task_status(status.to_thrift());
finish_task_request.__set_report_version(_s_report_version);
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
PublishVersionTaskPool::PublishVersionTaskPool(ExecEnv* env, ThreadModel thread_model)
: TaskWorkerPool(TaskWorkerType::PUBLISH_VERSION, env, *env->master_info(), thread_model) {
_worker_count = config::publish_version_worker_count;
_cb = [this]() { _publish_version_worker_thread_callback(); };
}
void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
const TPublishVersionRequest& publish_version_req = agent_task_req.publish_version_req;
DorisMetrics::instance()->publish_task_request_total->increment(1);
VLOG_NOTICE << "get publish version task. signature=" << agent_task_req.signature;
std::vector<TTabletId> error_tablet_ids;
std::vector<TTabletId> succ_tablet_ids;
// partition_id, tablet_id, publish_version
std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
uint32_t retry_time = 0;
Status status;
bool is_task_timeout = false;
while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
error_tablet_ids.clear();
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
&succ_tablet_ids, &discontinuous_version_tablets);
status = StorageEngine::instance()->execute_task(&engine_task);
if (status.ok()) {
break;
} else if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
if (time_elapsed > config::publish_version_task_timeout_s) {
LOG(INFO) << "task elapsed " << time_elapsed
<< " seconds since it is inserted to queue, it is timeout";
is_task_timeout = true;
} else {
// version not continuous, put to queue and wait pre version publish
// task execute
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_tasks.push_back(agent_task_req);
_worker_thread_condition_variable.notify_one();
}
LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, "
<< "transaction_id: " << publish_version_req.transaction_id;
break;
} else {
LOG_WARNING("failed to publish version")
.tag("transaction_id", publish_version_req.transaction_id)
.tag("error_tablets_num", error_tablet_ids.size())
.tag("retry_time", retry_time)
.error(status);
++retry_time;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>() && !is_task_timeout) {
continue;
}
for (auto& item : discontinuous_version_tablets) {
StorageEngine::instance()->add_async_publish_task(
std::get<0>(item), std::get<1>(item), std::get<2>(item),
publish_version_req.transaction_id, false);
}
TFinishTaskRequest finish_task_request;
if (!status) {
DorisMetrics::instance()->publish_task_failed_total->increment(1);
// if publish failed, return failed, FE will ignore this error and
// check error tablet ids and FE will also republish this task
LOG_WARNING("failed to publish version")
.tag("signature", agent_task_req.signature)
.tag("transaction_id", publish_version_req.transaction_id)
.tag("error_tablets_num", error_tablet_ids.size())
.error(status);
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
} else {
if (!config::disable_auto_compaction &&
!MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
for (int i = 0; i < succ_tablet_ids.size(); i++) {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
succ_tablet_ids[i]);
if (tablet != nullptr) {
tablet->publised_count++;
if (tablet->publised_count % 10 == 0) {
StorageEngine::instance()->submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, true);
LOG(INFO) << "trigger compaction succ, tabletid:" << succ_tablet_ids[i]
<< ", publised:" << tablet->publised_count;
}
} else {
LOG(WARNING)
<< "trigger compaction failed, tabletid:" << succ_tablet_ids[i];
}
}
}
uint32_t cost_second = time(nullptr) - agent_task_req.recv_time;
g_publish_version_latency << cost_second;
LOG_INFO("successfully publish version")
.tag("signature", agent_task_req.signature)
.tag("transaction_id", publish_version_req.transaction_id)
.tag("tablets_num", succ_tablet_ids.size())
.tag("cost(s)", cost_second);
}
status.to_thrift(&finish_task_request.task_status);
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_report_version(_s_report_version);
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
ClearTransactionTaskPool::ClearTransactionTaskPool(ExecEnv* env, ThreadModel thread_model)
: TaskWorkerPool(TaskWorkerType::CLEAR_TRANSACTION_TASK, env, *env->master_info(),
thread_model) {
_worker_count = config::clear_transaction_task_worker_count;
_cb = [this]() { _clear_transaction_task_worker_thread_callback(); };
}
void ClearTransactionTaskPool::_clear_transaction_task_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
const TClearTransactionTaskRequest& clear_transaction_task_req =
agent_task_req.clear_transaction_task_req;
LOG(INFO) << "get clear transaction task. signature=" << agent_task_req.signature
<< ", transaction_id=" << clear_transaction_task_req.transaction_id
<< ", partition_id_size=" << clear_transaction_task_req.partition_id.size();
Status status;
if (clear_transaction_task_req.transaction_id > 0) {
// transaction_id should be greater than zero.
// If it is not greater than zero, no need to execute
// the following clear_transaction_task() function.
if (!clear_transaction_task_req.partition_id.empty()) {
StorageEngine::instance()->clear_transaction_task(
clear_transaction_task_req.transaction_id,
clear_transaction_task_req.partition_id);
} else {
StorageEngine::instance()->clear_transaction_task(
clear_transaction_task_req.transaction_id);
}
LOG(INFO) << "finish to clear transaction task. signature=" << agent_task_req.signature
<< ", transaction_id=" << clear_transaction_task_req.transaction_id;
} else {
LOG(WARNING) << "invalid transaction id " << clear_transaction_task_req.transaction_id
<< ". signature= " << agent_task_req.signature;
}
TFinishTaskRequest finish_task_request;
finish_task_request.__set_task_status(status.to_thrift());
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
AlterTableTaskPool::AlterTableTaskPool(ExecEnv* env, ThreadModel thread_model)
: TaskWorkerPool(TaskWorkerType::ALTER_TABLE, env, *env->master_info(), thread_model) {
_worker_count = config::alter_tablet_worker_count;
_cb = [this]() { _alter_tablet_worker_thread_callback(); };
}
void AlterTableTaskPool::_alter_tablet_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
int64_t signature = agent_task_req.signature;
LOG(INFO) << "get alter table task, signature: " << signature;
bool is_task_timeout = false;
if (agent_task_req.__isset.recv_time) {
int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
if (time_elapsed > config::report_task_interval_seconds * 20) {
LOG(INFO) << "task elapsed " << time_elapsed
<< " seconds since it is inserted to queue, it is timeout";
is_task_timeout = true;
}
}
if (!is_task_timeout) {
TFinishTaskRequest finish_task_request;
TTaskType::type task_type = agent_task_req.task_type;
switch (task_type) {
case TTaskType::ALTER:
_alter_tablet(agent_task_req, signature, task_type, &finish_task_request);
break;
default:
// pass
break;
}
_finish_task(finish_task_request);
}
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
void AlterTableTaskPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int64_t signature,
const TTaskType::type task_type,
TFinishTaskRequest* finish_task_request) {
Status status;
string process_name;
switch (task_type) {
case TTaskType::ALTER:
process_name = "alter tablet";
break;
default:
std::string task_name;
EnumToString(TTaskType, task_type, task_name);
LOG(WARNING) << "schema change type invalid. type: " << task_name
<< ", signature: " << signature;
status = Status::NotSupported("Schema change type invalid");
break;
}
// Check last schema change status, if failed delete tablet file
// Do not need to adjust delete success or not
// Because if delete failed create rollup will failed
TTabletId new_tablet_id = 0;
TSchemaHash new_schema_hash = 0;
if (status.ok()) {
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2);
status = StorageEngine::instance()->execute_task(&engine_task);
}
if (status.ok()) {
++_s_report_version;
}
// Return result to fe
finish_task_request->__set_backend(BackendOptions::get_local_backend());
finish_task_request->__set_report_version(_s_report_version);
finish_task_request->__set_task_type(task_type);
finish_task_request->__set_signature(signature);
std::vector<TTabletInfo> finish_tablet_infos;
if (status.ok()) {
TTabletInfo tablet_info;
status = _get_tablet_info(new_tablet_id, new_schema_hash, signature, &tablet_info);
if (status.ok()) {
finish_tablet_infos.push_back(tablet_info);
}
}
if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
LOG_WARNING("failed to {}", process_name)
.tag("signature", agent_task_req.signature)
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
.tag("new_tablet_id", new_tablet_id)
.error(status);
} else {
finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
LOG_INFO("successfully {}", process_name)
.tag("signature", agent_task_req.signature)
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
.tag("new_tablet_id", new_tablet_id);
}
finish_task_request->__set_task_status(status.to_thrift());
}
void TaskWorkerPool::_gc_binlog_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
std::unordered_map<int64_t, int64_t> gc_tablet_infos;
if (!agent_task_req.__isset.gc_binlog_req) {
LOG(WARNING) << "gc binlog task is not valid";
return;
}
if (!agent_task_req.gc_binlog_req.__isset.tablet_gc_binlog_infos) {
LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid";
return;
}
auto& tablet_gc_binlog_infos = agent_task_req.gc_binlog_req.tablet_gc_binlog_infos;
for (auto& tablet_info : tablet_gc_binlog_infos) {
// gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.schema_hash);
gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.version);
}
StorageEngine::instance()->gc_binlogs(gc_tablet_infos);
}
}
CloneTaskPool::CloneTaskPool(ExecEnv* env, ThreadModel thread_model)
: TaskWorkerPool(TaskWorkerType::CLONE, env, *env->master_info(), thread_model) {
_worker_count = config::clone_worker_count;
_cb = [this]() { _clone_worker_thread_callback(); };
}
void CloneTaskPool::_clone_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
const TCloneReq& clone_req = agent_task_req.clone_req;
DorisMetrics::instance()->clone_requests_total->increment(1);
LOG(INFO) << "get clone task. signature=" << agent_task_req.signature;
std::vector<TTabletInfo> tablet_infos;
EngineCloneTask engine_task(clone_req, _master_info, agent_task_req.signature,
&tablet_infos);
auto status = StorageEngine::instance()->execute_task(&engine_task);
// Return result to fe
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_task_status(status.to_thrift());
if (!status.ok()) {
DorisMetrics::instance()->clone_requests_failed->increment(1);
LOG_WARNING("failed to clone tablet")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", clone_req.tablet_id)
.error(status);
} else {
LOG_INFO("successfully clone tablet")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", clone_req.tablet_id);
finish_task_request.__set_finish_tablet_infos(tablet_infos);
}
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
StorageMediumMigrateTaskPool::StorageMediumMigrateTaskPool(ExecEnv* env, ThreadModel thread_model)
: TaskWorkerPool(TaskWorkerType::STORAGE_MEDIUM_MIGRATE, env, *env->master_info(),
thread_model) {
_worker_count = config::storage_medium_migrate_count;
_cb = [this]() { _storage_medium_migrate_worker_thread_callback(); };
}
void StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
if (!_is_work) {
return;
}
agent_task_req = _tasks.front();
_tasks.pop_front();
}
const TStorageMediumMigrateReq& storage_medium_migrate_req =
agent_task_req.storage_medium_migrate_req;
// check request and get info
TabletSharedPtr tablet;
DataDir* dest_store = nullptr;
auto status = _check_migrate_request(storage_medium_migrate_req, tablet, &dest_store);
if (status.ok()) {
EngineStorageMigrationTask engine_task(tablet, dest_store);
status = StorageEngine::instance()->execute_task(&engine_task);
}
if (!status.ok()) {
LOG_WARNING("failed to migrate storage medium")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", storage_medium_migrate_req.tablet_id)
.error(status);
} else {
LOG_INFO("successfully migrate storage medium")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", storage_medium_migrate_req.tablet_id);
}
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_task_status(status.to_thrift());
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMediumMigrateReq& req,
TabletSharedPtr& tablet,
DataDir** dest_store) {
int64_t tablet_id = req.tablet_id;
tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
return Status::InternalError("could not find tablet {}", tablet_id);
}
if (req.__isset.data_dir) {
// request specify the data dir
*dest_store = StorageEngine::instance()->get_store(req.data_dir);
if (*dest_store == nullptr) {
return Status::InternalError("could not find data dir {}", req.data_dir);
}
} else {
// this is a storage medium
// get data dir by storage medium
// judge case when no need to migrate
uint32_t count = StorageEngine::instance()->available_storage_medium_type_count();
if (count <= 1) {
return Status::InternalError("available storage medium type count is less than 1");
}
// check current tablet storage medium
TStorageMedium::type storage_medium = req.storage_medium;
TStorageMedium::type src_storage_medium = tablet->data_dir()->storage_medium();
if (src_storage_medium == storage_medium) {
return Status::InternalError("tablet is already on specified storage medium {}",
storage_medium);
}
// get a random store of specified storage medium
auto stores = StorageEngine::instance()->get_stores_for_create_tablet(storage_medium);
if (stores.empty()) {
return Status::InternalError("failed to get root path for create tablet");
}
*dest_store = stores[0];
}
if (tablet->data_dir()->path() == (*dest_store)->path()) {
return Status::InternalError("tablet is already on specified path {}",
tablet->data_dir()->path());
}
// check local disk capacity
int64_t tablet_size = tablet->tablet_local_size();
if ((*dest_store)->reach_capacity_limit(tablet_size)) {
return Status::InternalError("reach the capacity limit of path {}, tablet_size={}",
(*dest_store)->path(), tablet_size);
}
return Status::OK();
}
} // namespace doris