// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "agent/task_worker_pool.h"

#include
#include
#include
#include
#include
#include
#include
#include
#include
// IWYU pragma: no_include
#include
#include
#include // IWYU pragma: keep
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

#include "agent/utils.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "io/fs/s3_file_system.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/task/engine_alter_tablet_task.h"
#include "olap/task/engine_batch_load_task.h"
#include "olap/task/engine_checksum_task.h"
#include "olap/task/engine_clone_task.h"
#include "olap/task/engine_index_change_task.h"
#include "olap/task/engine_publish_version_task.h"
#include "olap/task/engine_storage_migration_task.h"
#include "olap/txn_manager.h"
#include "olap/utils.h"
#include "runtime/exec_env.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/snapshot_loader.h"
#include "service/backend_options.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/random.h"
#include "util/s3_util.h"
#include "util/scoped_cleanup.h"
#include "util/stopwatch.hpp"
#include "util/threadpool.h"
#include "util/time.h"
#include "util/trace.h"

namespace doris {
using namespace ErrorCode;

namespace {

std::mutex s_task_signatures_mtx;
std::unordered_map<TTaskType::type, std::set<int64_t>> s_task_signatures;

std::atomic_ulong s_report_version(time(nullptr) * 10000);

void increase_report_version() {
    s_report_version.fetch_add(1, std::memory_order_relaxed);
}

// FIXME(plat1ko): Paired register and remove task info
bool register_task_info(const TTaskType::type task_type, int64_t signature) {
    if (task_type == TTaskType::type::PUSH_STORAGE_POLICY ||
        task_type == TTaskType::type::PUSH_COOLDOWN_CONF ||
        task_type == TTaskType::type::COMPACTION) {
        // no need to report task of these types
        return true;
    }
    if (signature == -1) {
        // No need to report task with uninitialized signature
        return true;
    }
    std::lock_guard lock(s_task_signatures_mtx);
    auto& set = s_task_signatures[task_type];
    return set.insert(signature).second;
}

void remove_task_info(const TTaskType::type task_type, int64_t signature) {
    size_t queue_size;
    {
        std::lock_guard lock(s_task_signatures_mtx);
        auto& set =
s_task_signatures[task_type]; set.erase(signature); queue_size = set.size(); } VLOG_NOTICE << "remove task info. type=" << task_type << ", signature=" << signature << ", queue_size=" << queue_size; } void finish_task(const TFinishTaskRequest& finish_task_request) { // Return result to FE TMasterResult result; uint32_t try_time = 0; constexpr int TASK_FINISH_MAX_RETRY = 3; while (try_time < TASK_FINISH_MAX_RETRY) { DorisMetrics::instance()->finish_task_requests_total->increment(1); Status client_status = MasterServerClient::instance()->finish_task(finish_task_request, &result); if (client_status.ok()) { break; } else { DorisMetrics::instance()->finish_task_requests_failed->increment(1); LOG_WARNING("failed to finish task") .tag("type", finish_task_request.task_type) .tag("signature", finish_task_request.signature) .error(result.status); try_time += 1; } sleep(1); } } Status get_tablet_info(StorageEngine& engine, const TTabletId tablet_id, const TSchemaHash schema_hash, TTabletInfo* tablet_info) { tablet_info->__set_tablet_id(tablet_id); tablet_info->__set_schema_hash(schema_hash); return engine.tablet_manager()->report_tablet_info(tablet_info); } void random_sleep(int second) { Random rnd(UnixMillis()); sleep(rnd.Uniform(second) + 1); } void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req, int64_t signature, const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) { Status status; std::string_view process_name; switch (task_type) { case TTaskType::ALTER: process_name = "alter tablet"; break; default: std::string task_name; EnumToString(TTaskType, task_type, task_name); LOG(WARNING) << "schema change type invalid. type: " << task_name << ", signature: " << signature; status = Status::NotSupported("Schema change type invalid"); break; } // Check last schema change status, if failed delete tablet file // Do not need to adjust delete success or not // Because if delete failed create rollup will failed TTabletId new_tablet_id = 0; TSchemaHash new_schema_hash = 0; if (status.ok()) { new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id; new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash; EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2); SCOPED_ATTACH_TASK(engine_task.mem_tracker()); status = engine_task.execute(); } if (status.ok()) { increase_report_version(); } // Return result to fe finish_task_request->__set_backend(BackendOptions::get_local_backend()); finish_task_request->__set_report_version(s_report_version); finish_task_request->__set_task_type(task_type); finish_task_request->__set_signature(signature); std::vector finish_tablet_infos; if (status.ok()) { TTabletInfo tablet_info; status = get_tablet_info(engine, new_tablet_id, new_schema_hash, &tablet_info); if (status.ok()) { finish_tablet_infos.push_back(tablet_info); } } if (!status.ok() && !status.is()) { LOG_WARNING("failed to {}", process_name) .tag("signature", agent_task_req.signature) .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) .tag("new_tablet_id", new_tablet_id) .error(status); } else { finish_task_request->__set_finish_tablet_infos(finish_tablet_infos); LOG_INFO("successfully {}", process_name) .tag("signature", agent_task_req.signature) .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) .tag("new_tablet_id", new_tablet_id); } finish_task_request->__set_task_status(status.to_thrift()); } Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateReq& req, 
TabletSharedPtr& tablet, DataDir** dest_store) { int64_t tablet_id = req.tablet_id; tablet = engine.tablet_manager()->get_tablet(tablet_id); if (tablet == nullptr) { return Status::InternalError("could not find tablet {}", tablet_id); } if (req.__isset.data_dir) { // request specify the data dir *dest_store = engine.get_store(req.data_dir); if (*dest_store == nullptr) { return Status::InternalError("could not find data dir {}", req.data_dir); } } else { // this is a storage medium // get data dir by storage medium // judge case when no need to migrate uint32_t count = engine.available_storage_medium_type_count(); if (count <= 1) { return Status::InternalError("available storage medium type count is less than 1"); } // check current tablet storage medium TStorageMedium::type storage_medium = req.storage_medium; TStorageMedium::type src_storage_medium = tablet->data_dir()->storage_medium(); if (src_storage_medium == storage_medium) { return Status::InternalError("tablet is already on specified storage medium {}", storage_medium); } // get a random store of specified storage medium auto stores = engine.get_stores_for_create_tablet(tablet->partition_id(), storage_medium); if (stores.empty()) { return Status::InternalError("failed to get root path for create tablet"); } *dest_store = stores[0]; } if (tablet->data_dir()->path() == (*dest_store)->path()) { LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path()); return Status::Error("tablet is already on specified path: {}", tablet->data_dir()->path()); } // check local disk capacity int64_t tablet_size = tablet->tablet_local_size(); if ((*dest_store)->reach_capacity_limit(tablet_size)) { return Status::Error("reach the capacity limit of path {}, tablet_size={}", (*dest_store)->path(), tablet_size); } return Status::OK(); } // Return `true` if report success bool handle_report(const TReportRequest& request, const TMasterInfo& master_info, std::string_view name) { TMasterResult result; Status status = MasterServerClient::instance()->report(request, &result); if (!status.ok()) [[unlikely]] { LOG_WARNING("failed to report {}", name) .tag("host", master_info.network_address.hostname) .tag("port", master_info.network_address.port) .error(status); return false; } else if (result.status.status_code != TStatusCode::OK) [[unlikely]] { LOG_WARNING("failed to report {}", name) .tag("host", master_info.network_address.hostname) .tag("port", master_info.network_address.port) .error(result.status); return false; } return true; } void _submit_task(const TAgentTaskRequest& task, std::function submit_op) { const TTaskType::type task_type = task.task_type; int64_t signature = task.signature; std::string type_str; EnumToString(TTaskType, task_type, type_str); VLOG_CRITICAL << "submitting task. 
type=" << type_str << ", signature=" << signature; if (!register_task_info(task_type, signature)) { LOG_WARNING("failed to register task").tag("type", type_str).tag("signature", signature); return; } // Set the receiving time of task so that we can determine whether it is timed out later (const_cast(task)).__set_recv_time(time(nullptr)); auto st = submit_op(task); if (!st.ok()) [[unlikely]] { LOG_INFO("failed to submit task").tag("type", type_str).tag("signature", signature); return; } LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature); } bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version"); bvar::Adder ALTER_INVERTED_INDEX_count("task", "ALTER_INVERTED_INDEX"); bvar::Adder CHECK_CONSISTENCY_count("task", "CHECK_CONSISTENCY"); bvar::Adder UPLOAD_count("task", "UPLOAD"); bvar::Adder DOWNLOAD_count("task", "DOWNLOAD"); bvar::Adder MAKE_SNAPSHOT_count("task", "MAKE_SNAPSHOT"); bvar::Adder RELEASE_SNAPSHOT_count("task", "RELEASE_SNAPSHOT"); bvar::Adder MOVE_count("task", "MOVE"); bvar::Adder COMPACTION_count("task", "COMPACTION"); bvar::Adder PUSH_STORAGE_POLICY_count("task", "PUSH_STORAGE_POLICY"); bvar::Adder PUSH_COOLDOWN_CONF_count("task", "PUSH_COOLDOWN_CONF"); bvar::Adder CREATE_count("task", "CREATE_TABLE"); bvar::Adder DROP_count("task", "DROP_TABLE"); bvar::Adder PUBLISH_VERSION_count("task", "PUBLISH_VERSION"); bvar::Adder CLEAR_TRANSACTION_TASK_count("task", "CLEAR_TRANSACTION_TASK"); bvar::Adder DELETE_count("task", "DELETE"); bvar::Adder PUSH_count("task", "PUSH"); bvar::Adder UPDATE_TABLET_META_INFO_count("task", "UPDATE_TABLET_META_INFO"); bvar::Adder ALTER_count("task", "ALTER_TABLE"); bvar::Adder CLONE_count("task", "CLONE"); bvar::Adder STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE"); bvar::Adder GC_BINLOG_count("task", "GC_BINLOG"); bvar::Adder UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION"); void add_task_count(const TAgentTaskRequest& task, int n) { // clang-format off switch (task.task_type) { #define ADD_TASK_COUNT(type) \ case TTaskType::type: \ type##_count << n; \ return; ADD_TASK_COUNT(ALTER_INVERTED_INDEX) ADD_TASK_COUNT(CHECK_CONSISTENCY) ADD_TASK_COUNT(UPLOAD) ADD_TASK_COUNT(DOWNLOAD) ADD_TASK_COUNT(MAKE_SNAPSHOT) ADD_TASK_COUNT(RELEASE_SNAPSHOT) ADD_TASK_COUNT(MOVE) ADD_TASK_COUNT(COMPACTION) ADD_TASK_COUNT(PUSH_STORAGE_POLICY) ADD_TASK_COUNT(PUSH_COOLDOWN_CONF) ADD_TASK_COUNT(CREATE) ADD_TASK_COUNT(DROP) ADD_TASK_COUNT(PUBLISH_VERSION) ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK) ADD_TASK_COUNT(UPDATE_TABLET_META_INFO) ADD_TASK_COUNT(ALTER) ADD_TASK_COUNT(CLONE) ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE) ADD_TASK_COUNT(GC_BINLOG) ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION) #undef ADD_TASK_COUNT case TTaskType::REALTIME_PUSH: case TTaskType::PUSH: if (task.push_req.push_type == TPushType::LOAD_V2) { PUSH_count << n; } else if (task.push_req.push_type == TPushType::DELETE) { DELETE_count << n; } return; default: return; } // clang-format on } bvar::Adder report_task_total("report", "task_total"); bvar::Adder report_task_failed("report", "task_failed"); bvar::Adder report_disk_total("report", "disk_total"); bvar::Adder report_disk_failed("report", "disk_failed"); bvar::Adder report_tablet_total("report", "tablet_total"); bvar::Adder report_tablet_failed("report", "tablet_failed"); } // namespace TaskWorkerPool::TaskWorkerPool(std::string_view name, int worker_count, std::function callback) : _callback(std::move(callback)) { auto st = ThreadPoolBuilder(fmt::format("TaskWP_{}", name)) 
.set_min_threads(worker_count) .set_max_threads(worker_count) .build(&_thread_pool); CHECK(st.ok()) << name << ": " << st; } TaskWorkerPool::~TaskWorkerPool() { stop(); } void TaskWorkerPool::stop() { if (_stopped.exchange(true)) { return; } if (_thread_pool) { _thread_pool->shutdown(); } } void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) { _submit_task(task, [this](auto&& task) { add_task_count(task, 1); return _thread_pool->submit_func([this, task]() { _callback(task); add_task_count(task, -1); }); }); } PriorTaskWorkerPool::PriorTaskWorkerPool( std::string_view name, int normal_worker_count, int high_prior_worker_conut, std::function callback) : _callback(std::move(callback)) { auto st = ThreadPoolBuilder(fmt::format("TaskWP_.{}", name)) .set_min_threads(normal_worker_count) .set_max_threads(normal_worker_count) .build(&_normal_pool); CHECK(st.ok()) << name << ": " << st; st = _normal_pool->submit_func([this] { normal_loop(); }); CHECK(st.ok()) << name << ": " << st; st = ThreadPoolBuilder(fmt::format("HighPriorPool.{}", name)) .set_min_threads(high_prior_worker_conut) .set_max_threads(high_prior_worker_conut) .build(&_high_prior_pool); CHECK(st.ok()) << name << ": " << st; st = _high_prior_pool->submit_func([this] { high_prior_loop(); }); CHECK(st.ok()) << name << ": " << st; } PriorTaskWorkerPool::~PriorTaskWorkerPool() { stop(); } void PriorTaskWorkerPool::stop() { { std::lock_guard lock(_mtx); if (_stopped) { return; } _stopped = true; } _normal_condv.notify_all(); _high_prior_condv.notify_all(); if (_normal_pool) { _normal_pool->shutdown(); } if (_high_prior_pool) { _high_prior_pool->shutdown(); } } void PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) { _submit_task(task, [this](auto&& task) { auto req = std::make_unique(task); add_task_count(*req, 1); if (req->__isset.priority && req->priority == TPriority::HIGH) { std::lock_guard lock(_mtx); _high_prior_queue.push_back(std::move(req)); _high_prior_condv.notify_one(); _normal_condv.notify_one(); } else { std::lock_guard lock(_mtx); _normal_queue.push_back(std::move(req)); _normal_condv.notify_one(); } return Status::OK(); }); } void PriorTaskWorkerPool::normal_loop() { while (true) { std::unique_ptr req; { std::unique_lock lock(_mtx); _normal_condv.wait(lock, [&] { return !_normal_queue.empty() || !_high_prior_queue.empty() || _stopped; }); if (_stopped) { return; } if (!_high_prior_queue.empty()) { req = std::move(_high_prior_queue.front()); _high_prior_queue.pop_front(); } else if (!_normal_queue.empty()) { req = std::move(_normal_queue.front()); _normal_queue.pop_front(); } else { continue; } } _callback(*req); add_task_count(*req, -1); } } void PriorTaskWorkerPool::high_prior_loop() { while (true) { std::unique_ptr req; { std::unique_lock lock(_mtx); _high_prior_condv.wait(lock, [&] { return !_high_prior_queue.empty() || _stopped; }); if (_stopped) { return; } if (_high_prior_queue.empty()) { continue; } req = std::move(_high_prior_queue.front()); _high_prior_queue.pop_front(); } _callback(*req); add_task_count(*req, -1); } } ReportWorker::ReportWorker(std::string name, const TMasterInfo& master_info, int report_interval_s, std::function callback) : _name(std::move(name)) { auto report_loop = [this, &master_info, report_interval_s, callback = std::move(callback)] { StorageEngine::instance()->register_report_listener(this); while (true) { { std::unique_lock lock(_mtx); _condv.wait_for(lock, std::chrono::seconds(report_interval_s), [&] { return _stopped || _signal; }); if (_stopped) { break; } if 
(_signal) { // Consume received signal _signal = false; } } if (master_info.network_address.port == 0) { // port == 0 means not received heartbeat yet LOG(INFO) << "waiting to receive first heartbeat from frontend before doing report"; continue; } callback(); } StorageEngine::instance()->deregister_report_listener(this); }; auto st = Thread::create("ReportWorker", _name, report_loop, &_thread); CHECK(st.ok()) << _name << ": " << st; } ReportWorker::~ReportWorker() { stop(); } void ReportWorker::notify() { { std::lock_guard lock(_mtx); _signal = true; } _condv.notify_all(); } void ReportWorker::stop() { { std::lock_guard lock(_mtx); if (_stopped) { return; } _stopped = true; } _condv.notify_all(); if (_thread) { _thread->join(); } } void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& alter_inverted_index_rq = req.alter_inverted_index_req; LOG(INFO) << "get alter inverted index task. signature=" << req.signature << ", tablet_id=" << alter_inverted_index_rq.tablet_id << ", job_id=" << alter_inverted_index_rq.job_id; Status status = Status::OK(); auto tablet_ptr = engine.tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id); if (tablet_ptr != nullptr) { EngineIndexChangeTask engine_task(alter_inverted_index_rq); SCOPED_ATTACH_TASK(engine_task.mem_tracker()); status = engine_task.execute(); } else { status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id); } // Return result to fe TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); std::vector finish_tablet_infos; if (!status.ok()) { LOG(WARNING) << "failed to alter inverted index task, signature=" << req.signature << ", tablet_id=" << alter_inverted_index_rq.tablet_id << ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status; } else { LOG(INFO) << "successfully alter inverted index task, signature=" << req.signature << ", tablet_id=" << alter_inverted_index_rq.tablet_id << ", job_id=" << alter_inverted_index_rq.job_id; TTabletInfo tablet_info; status = get_tablet_info(engine, alter_inverted_index_rq.tablet_id, alter_inverted_index_rq.schema_hash, &tablet_info); if (status.ok()) { finish_tablet_infos.push_back(tablet_info); } finish_task_request.__set_finish_tablet_infos(finish_tablet_infos); } finish_task_request.__set_task_status(status.to_thrift()); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& req) { LOG(INFO) << "get update tablet meta task. signature=" << req.signature; Status status; const auto& update_tablet_meta_req = req.update_tablet_meta_info_req; for (const auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) { auto tablet = engine.tablet_manager()->get_tablet(tablet_meta_info.tablet_id); if (tablet == nullptr) { status = Status::NotFound("tablet not found"); LOG(WARNING) << "could not find tablet when update tablet meta. 
tablet_id=" << tablet_meta_info.tablet_id; continue; } bool need_to_save = false; if (tablet_meta_info.__isset.partition_id) { // for fix partition_id = 0 LOG(WARNING) << "change be tablet id: " << tablet->tablet_meta()->tablet_id() << "partition id from : " << tablet->tablet_meta()->partition_id() << " to : " << tablet_meta_info.partition_id; auto succ = engine.tablet_manager()->update_tablet_partition_id( tablet_meta_info.partition_id, tablet->tablet_meta()->tablet_id()); if (!succ) { std::string err_msg = fmt::format( "change be tablet id : {} partition_id : {} failed", tablet->tablet_meta()->tablet_id(), tablet_meta_info.partition_id); LOG(WARNING) << err_msg; status = Status::InvalidArgument(err_msg); continue; } need_to_save = true; } if (tablet_meta_info.__isset.storage_policy_id) { tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id); need_to_save = true; } if (tablet_meta_info.__isset.is_in_memory) { tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory( tablet_meta_info.is_in_memory); std::shared_lock rlock(tablet->get_header_lock()); for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory); } tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory); need_to_save = true; } if (tablet_meta_info.__isset.compaction_policy) { if (tablet_meta_info.compaction_policy != CUMULATIVE_SIZE_BASED_POLICY && tablet_meta_info.compaction_policy != CUMULATIVE_TIME_SERIES_POLICY) { status = Status::InvalidArgument( "invalid compaction policy, only support for size_based or " "time_series"); continue; } tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy); need_to_save = true; } if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) { if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { status = Status::InvalidArgument( "only time series compaction policy support time series config"); continue; } tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes( tablet_meta_info.time_series_compaction_goal_size_mbytes); need_to_save = true; } if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) { if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { status = Status::InvalidArgument( "only time series compaction policy support time series config"); continue; } tablet->tablet_meta()->set_time_series_compaction_file_count_threshold( tablet_meta_info.time_series_compaction_file_count_threshold); need_to_save = true; } if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) { if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { status = Status::InvalidArgument( "only time series compaction policy support time series config"); continue; } tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds( tablet_meta_info.time_series_compaction_time_threshold_seconds); need_to_save = true; } if (tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) { if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { status = Status::InvalidArgument( "only time series compaction policy support time series config"); continue; } tablet->tablet_meta()->set_time_series_compaction_empty_rowsets_threshold( tablet_meta_info.time_series_compaction_empty_rowsets_threshold); need_to_save = true; } if 
(tablet_meta_info.__isset.time_series_compaction_level_threshold) { if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { status = Status::InvalidArgument( "only time series compaction policy support time series config"); continue; } tablet->tablet_meta()->set_time_series_compaction_level_threshold( tablet_meta_info.time_series_compaction_level_threshold); need_to_save = true; } if (tablet_meta_info.__isset.replica_id) { tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id); } if (tablet_meta_info.__isset.binlog_config) { // check binlog_config require fields: enable, ttl_seconds, max_bytes, max_history_nums const auto& t_binlog_config = tablet_meta_info.binlog_config; if (!t_binlog_config.__isset.enable || !t_binlog_config.__isset.ttl_seconds || !t_binlog_config.__isset.max_bytes || !t_binlog_config.__isset.max_history_nums) { status = Status::InvalidArgument("invalid binlog config, some fields not set"); LOG(WARNING) << fmt::format( "invalid binlog config, some fields not set, tablet_id={}, " "t_binlog_config={}", tablet_meta_info.tablet_id, apache::thrift::ThriftDebugString(t_binlog_config)); continue; } BinlogConfig new_binlog_config; new_binlog_config = tablet_meta_info.binlog_config; LOG(INFO) << fmt::format( "update tablet meta binlog config. tablet_id={}, old_binlog_config={}, " "new_binlog_config={}", tablet_meta_info.tablet_id, tablet->tablet_meta()->binlog_config().to_string(), new_binlog_config.to_string()); tablet->set_binlog_config(new_binlog_config); need_to_save = true; } if (tablet_meta_info.__isset.enable_single_replica_compaction) { std::shared_lock rlock(tablet->get_header_lock()); tablet->tablet_meta()->mutable_tablet_schema()->set_enable_single_replica_compaction( tablet_meta_info.enable_single_replica_compaction); for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { rowset_meta->tablet_schema()->set_enable_single_replica_compaction( tablet_meta_info.enable_single_replica_compaction); } tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction( tablet_meta_info.enable_single_replica_compaction); need_to_save = true; } if (tablet_meta_info.__isset.disable_auto_compaction) { std::shared_lock rlock(tablet->get_header_lock()); tablet->tablet_meta()->mutable_tablet_schema()->set_disable_auto_compaction( tablet_meta_info.disable_auto_compaction); for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { rowset_meta->tablet_schema()->set_disable_auto_compaction( tablet_meta_info.disable_auto_compaction); } tablet->tablet_schema_unlocked()->set_disable_auto_compaction( tablet_meta_info.disable_auto_compaction); need_to_save = true; } if (tablet_meta_info.__isset.skip_write_index_on_load) { std::shared_lock rlock(tablet->get_header_lock()); tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load( tablet_meta_info.skip_write_index_on_load); for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { rowset_meta->tablet_schema()->set_skip_write_index_on_load( tablet_meta_info.skip_write_index_on_load); } tablet->tablet_schema_unlocked()->set_skip_write_index_on_load( tablet_meta_info.skip_write_index_on_load); need_to_save = true; } if (need_to_save) { std::shared_lock rlock(tablet->get_header_lock()); tablet->save_meta(); } } LOG(INFO) << "finish update tablet meta task. 
signature=" << req.signature; if (req.signature != -1) { TFinishTaskRequest finish_task_request; finish_task_request.__set_task_status(status.to_thrift()); finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } } void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest& req) { uint32_t checksum = 0; const auto& check_consistency_req = req.check_consistency_req; EngineChecksumTask engine_task(check_consistency_req.tablet_id, check_consistency_req.schema_hash, check_consistency_req.version, &checksum); SCOPED_ATTACH_TASK(engine_task.mem_tracker()); Status status = engine_task.execute(); if (!status.ok()) { LOG_WARNING("failed to check consistency") .tag("signature", req.signature) .tag("tablet_id", check_consistency_req.tablet_id) .error(status); } else { LOG_INFO("successfully check consistency") .tag("signature", req.signature) .tag("tablet_id", check_consistency_req.tablet_id) .tag("checksum", checksum); } TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task_request.__set_task_status(status.to_thrift()); finish_task_request.__set_tablet_checksum(static_cast(checksum)); finish_task_request.__set_request_version(check_consistency_req.version); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void report_task_callback(const TMasterInfo& master_info) { TReportRequest request; if (config::report_random_wait) { random_sleep(5); } request.__isset.tasks = true; { std::lock_guard lock(s_task_signatures_mtx); auto& tasks = request.tasks; for (auto&& [task_type, signatures] : s_task_signatures) { auto& set = tasks[task_type]; for (auto&& signature : signatures) { set.insert(signature); } } } request.__set_backend(BackendOptions::get_local_backend()); bool succ = handle_report(request, master_info, "task"); report_task_total << 1; if (!succ) [[unlikely]] { report_task_failed << 1; } } void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) { TReportRequest request; request.__set_backend(BackendOptions::get_local_backend()); request.__isset.disks = true; std::vector data_dir_infos; static_cast(engine.get_all_data_dir_info(&data_dir_infos, true /* update */)); for (auto& root_path_info : data_dir_infos) { TDisk disk; disk.__set_root_path(root_path_info.path); disk.__set_path_hash(root_path_info.path_hash); disk.__set_storage_medium(root_path_info.storage_medium); disk.__set_disk_total_capacity(root_path_info.disk_capacity); disk.__set_data_used_capacity(root_path_info.local_used_capacity); disk.__set_remote_used_capacity(root_path_info.remote_used_capacity); disk.__set_disk_available_capacity(root_path_info.available); disk.__set_trash_used_capacity(root_path_info.trash_used_capacity); disk.__set_used(root_path_info.is_used); request.disks[root_path_info.path] = disk; } request.__set_num_cores(CpuInfo::num_cores()); request.__set_pipeline_executor_size(config::pipeline_executor_size > 0 ? 
config::pipeline_executor_size : CpuInfo::num_cores()); bool succ = handle_report(request, master_info, "disk"); report_disk_total << 1; if (!succ) [[unlikely]] { report_disk_failed << 1; } } void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info) { if (config::report_random_wait) { random_sleep(5); } TReportRequest request; request.__set_backend(BackendOptions::get_local_backend()); request.__isset.tablets = true; uint64_t report_version; for (int i = 0; i < 5; i++) { request.tablets.clear(); report_version = s_report_version; engine.tablet_manager()->build_all_report_tablets_info(&request.tablets); if (report_version == s_report_version) { break; } } if (report_version < s_report_version) { // TODO llj This can only reduce the possibility for report error, but can't avoid it. // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this // report, and the report version has a small probability that it has not been updated in time. When FE // receives this report, it is possible to delete the new tablet. LOG(WARNING) << "report version " << report_version << " change to " << s_report_version; DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1); return; } std::map partitions_version; engine.tablet_manager()->get_partitions_visible_version(&partitions_version); request.__set_partitions_version(std::move(partitions_version)); int64_t max_compaction_score = std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(), DorisMetrics::instance()->tablet_base_max_compaction_score->value()); request.__set_tablet_max_compaction_score(max_compaction_score); request.__set_report_version(report_version); // report storage policy and resource auto& storage_policy_list = request.storage_policy; for (auto [id, version] : get_storage_policy_ids()) { auto& storage_policy = storage_policy_list.emplace_back(); storage_policy.__set_id(id); storage_policy.__set_version(version); } request.__isset.storage_policy = true; auto& resource_list = request.resource; for (auto [id, version] : get_storage_resource_ids()) { auto& resource = resource_list.emplace_back(); resource.__set_id(id); resource.__set_version(version); } request.__isset.resource = true; bool succ = handle_report(request, master_info, "tablet"); report_tablet_total << 1; if (!succ) [[unlikely]] { report_tablet_failed << 1; } } void upload_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { const auto& upload_request = req.upload_req; LOG(INFO) << "get upload task. signature=" << req.signature << ", job_id=" << upload_request.job_id; std::map> tablet_files; std::unique_ptr loader = std::make_unique( env, upload_request.job_id, req.signature, upload_request.broker_addr, upload_request.broker_prop); Status status = loader->init(upload_request.__isset.storage_backend ? upload_request.storage_backend : TStorageBackendType::type::BROKER, upload_request.__isset.location ? 
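                           // the storage backend defaults to BROKER and the location to "" when the request leaves them unset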
upload_request.location : ""); if (status.ok()) { status = loader->upload(upload_request.src_dest_map, &tablet_files); } if (!status.ok()) { LOG_WARNING("failed to upload") .tag("signature", req.signature) .tag("job_id", upload_request.job_id) .error(status); } else { LOG_INFO("successfully upload") .tag("signature", req.signature) .tag("job_id", upload_request.job_id); } TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task_request.__set_task_status(status.to_thrift()); finish_task_request.__set_tablet_files(tablet_files); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void download_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { const auto& download_request = req.download_req; LOG(INFO) << "get download task. signature=" << req.signature << ", job_id=" << download_request.job_id << ", task detail: " << apache::thrift::ThriftDebugString(download_request); // TODO: download std::vector downloaded_tablet_ids; auto status = Status::OK(); if (download_request.__isset.remote_tablet_snapshots) { std::unique_ptr loader = std::make_unique(env, download_request.job_id, req.signature); status = loader->remote_http_download(download_request.remote_tablet_snapshots, &downloaded_tablet_ids); } else { std::unique_ptr loader = std::make_unique( env, download_request.job_id, req.signature, download_request.broker_addr, download_request.broker_prop); status = loader->init(download_request.__isset.storage_backend ? download_request.storage_backend : TStorageBackendType::type::BROKER, download_request.__isset.location ? download_request.location : ""); if (status.ok()) { status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids); } } if (!status.ok()) { LOG_WARNING("failed to download") .tag("signature", req.signature) .tag("job_id", download_request.job_id) .error(status); } else { LOG_INFO("successfully download") .tag("signature", req.signature) .tag("job_id", download_request.job_id); } TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task_request.__set_task_status(status.to_thrift()); finish_task_request.__set_downloaded_tablet_ids(downloaded_tablet_ids); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void make_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& snapshot_request = req.snapshot_req; LOG(INFO) << "get snapshot task. 
signature=" << req.signature; string snapshot_path; bool allow_incremental_clone = false; // not used std::vector snapshot_files; Status status = SnapshotManager::instance()->make_snapshot(snapshot_request, &snapshot_path, &allow_incremental_clone); if (status.ok() && snapshot_request.__isset.list_files) { // list and save all snapshot files // snapshot_path like: data/snapshot/20180417205230.1.86400 // we need to add subdir: tablet_id/schema_hash/ std::vector files; bool exists = true; io::Path path = fmt::format("{}/{}/{}/", snapshot_path, snapshot_request.tablet_id, snapshot_request.schema_hash); status = io::global_local_filesystem()->list(path, true, &files, &exists); if (status.ok()) { for (auto& file : files) { snapshot_files.push_back(file.file_name); } } } if (!status.ok()) { LOG_WARNING("failed to make snapshot") .tag("signature", req.signature) .tag("tablet_id", snapshot_request.tablet_id) .tag("version", snapshot_request.version) .error(status); } else { LOG_INFO("successfully make snapshot") .tag("signature", req.signature) .tag("tablet_id", snapshot_request.tablet_id) .tag("version", snapshot_request.version) .tag("snapshot_path", snapshot_path); } TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task_request.__set_snapshot_path(snapshot_path); finish_task_request.__set_snapshot_files(snapshot_files); finish_task_request.__set_task_status(status.to_thrift()); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void release_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& release_snapshot_request = req.release_snapshot_req; LOG(INFO) << "get release snapshot task. signature=" << req.signature; const string& snapshot_path = release_snapshot_request.snapshot_path; Status status = SnapshotManager::instance()->release_snapshot(snapshot_path); if (!status.ok()) { LOG_WARNING("failed to release snapshot") .tag("signature", req.signature) .tag("snapshot_path", snapshot_path) .error(status); } else { LOG_INFO("successfully release snapshot") .tag("signature", req.signature) .tag("snapshot_path", snapshot_path); } TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task_request.__set_task_status(status.to_thrift()); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void move_dir_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { const auto& move_dir_req = req.move_dir_req; LOG(INFO) << "get move dir task. 
signature=" << req.signature << ", job_id=" << move_dir_req.job_id; Status status; auto tablet = engine.tablet_manager()->get_tablet(move_dir_req.tablet_id); if (tablet == nullptr) { status = Status::InvalidArgument("Could not find tablet"); } else { SnapshotLoader loader(env, move_dir_req.job_id, move_dir_req.tablet_id); status = loader.move(move_dir_req.src, tablet, true); } if (!status.ok()) { LOG_WARNING("failed to move dir") .tag("signature", req.signature) .tag("job_id", move_dir_req.job_id) .tag("tablet_id", move_dir_req.tablet_id) .tag("src", move_dir_req.src) .error(status); } else { LOG_INFO("successfully move dir") .tag("signature", req.signature) .tag("job_id", move_dir_req.job_id) .tag("tablet_id", move_dir_req.tablet_id) .tag("src", move_dir_req.src); } TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task_request.__set_task_status(status.to_thrift()); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void submit_table_compaction_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& compaction_req = req.compaction_req; LOG(INFO) << "get compaction task. signature=" << req.signature << ", compaction_type=" << compaction_req.type; CompactionType compaction_type; if (compaction_req.type == "base") { compaction_type = CompactionType::BASE_COMPACTION; } else { compaction_type = CompactionType::CUMULATIVE_COMPACTION; } auto tablet_ptr = engine.tablet_manager()->get_tablet(compaction_req.tablet_id); if (tablet_ptr != nullptr) { auto* data_dir = tablet_ptr->data_dir(); if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) { LOG(WARNING) << "could not do compaction. tablet_id=" << tablet_ptr->tablet_id() << ", compaction_type=" << compaction_type; return; } Status status = engine.submit_compaction_task(tablet_ptr, compaction_type, false); if (!status.ok()) { LOG(WARNING) << "failed to submit table compaction task. 
error=" << status; } } } void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& push_storage_policy_req = req.push_storage_policy_req; // refresh resource for (auto& resource : push_storage_policy_req.resource) { auto existed_resource = get_storage_resource(resource.id); if (existed_resource.version >= resource.version) { continue; } if (resource.__isset.s3_storage_param) { Status st; S3Conf s3_conf; s3_conf.ak = std::move(resource.s3_storage_param.ak); s3_conf.sk = std::move(resource.s3_storage_param.sk); s3_conf.token = std::move(resource.s3_storage_param.token); s3_conf.endpoint = std::move(resource.s3_storage_param.endpoint); s3_conf.region = std::move(resource.s3_storage_param.region); s3_conf.prefix = std::move(resource.s3_storage_param.root_path); s3_conf.bucket = std::move(resource.s3_storage_param.bucket); s3_conf.connect_timeout_ms = resource.s3_storage_param.conn_timeout_ms; s3_conf.max_connections = resource.s3_storage_param.max_conn; s3_conf.request_timeout_ms = resource.s3_storage_param.request_timeout_ms; // When using cold heat separation in minio, user might use ip address directly, // which needs enable use_virtual_addressing to true s3_conf.use_virtual_addressing = !resource.s3_storage_param.use_path_style; std::shared_ptr fs; if (existed_resource.fs == nullptr) { st = io::S3FileSystem::create(s3_conf, std::to_string(resource.id), nullptr, &fs); } else { fs = std::static_pointer_cast(existed_resource.fs); st = fs->set_conf(s3_conf); } if (!st.ok()) { LOG(WARNING) << "update s3 resource failed: " << st; } else { LOG_INFO("successfully update s3 resource") .tag("resource_id", resource.id) .tag("resource_name", resource.name) .tag("s3_conf", s3_conf.to_string()); put_storage_resource(resource.id, {std::move(fs), resource.version}); } } else if (resource.__isset.hdfs_storage_param) { Status st; std::shared_ptr fs; std::string root_path = resource.hdfs_storage_param.__isset.root_path ? 
resource.hdfs_storage_param.root_path : ""; if (existed_resource.fs == nullptr) { st = io::HdfsFileSystem::create(resource.hdfs_storage_param, std::to_string(resource.id), root_path, nullptr, &fs); } else { fs = std::static_pointer_cast(existed_resource.fs); } if (!st.ok()) { LOG(WARNING) << "update hdfs resource failed: " << st; } else { LOG_INFO("successfully update hdfs resource") .tag("resource_id", resource.id) .tag("resource_name", resource.name) .tag("root_path", fs->root_path().string()); put_storage_resource(resource.id, {std::move(fs), resource.version}); } } else { LOG(WARNING) << "unknown resource=" << resource; } } // drop storage policy for (auto policy_id : push_storage_policy_req.dropped_storage_policy) { delete_storage_policy(policy_id); } // refresh storage policy for (auto& storage_policy : push_storage_policy_req.storage_policy) { auto existed_storage_policy = get_storage_policy(storage_policy.id); if (existed_storage_policy == nullptr || existed_storage_policy->version < storage_policy.version) { auto storage_policy1 = std::make_shared(); storage_policy1->name = std::move(storage_policy.name); storage_policy1->version = storage_policy.version; storage_policy1->cooldown_datetime = storage_policy.cooldown_datetime; storage_policy1->cooldown_ttl = storage_policy.cooldown_ttl; storage_policy1->resource_id = storage_policy.resource_id; LOG_INFO("successfully update storage policy") .tag("storage_policy_id", storage_policy.id) .tag("storage_policy", storage_policy1->to_string()); put_storage_policy(storage_policy.id, std::move(storage_policy1)); } } } void push_cooldown_conf_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& push_cooldown_conf_req = req.push_cooldown_conf; for (const auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) { int64_t tablet_id = cooldown_conf.tablet_id; TabletSharedPtr tablet = engine.tablet_manager()->get_tablet(tablet_id); if (tablet == nullptr) { LOG(WARNING) << "failed to get tablet. 
tablet_id=" << tablet_id; continue; } if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term, cooldown_conf.cooldown_replica_id) && cooldown_conf.cooldown_replica_id == tablet->replica_id() && tablet->tablet_meta()->cooldown_meta_id().initialized()) { Tablet::async_write_cooldown_meta(tablet); } } } void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& create_tablet_req = req.create_tablet_req; RuntimeProfile runtime_profile("CreateTablet"); RuntimeProfile* profile = &runtime_profile; MonotonicStopWatch watch; watch.start(); SCOPED_CLEANUP({ auto elapsed_time = static_cast(watch.elapsed_time()); if (elapsed_time / 1e9 > config::agent_task_trace_threshold_sec) { COUNTER_UPDATE(profile->total_time_counter(), elapsed_time); std::stringstream ss; profile->pretty_print(&ss); LOG(WARNING) << "create tablet " << create_tablet_req.tablet_id << " cost(s) " << elapsed_time / 1e9 << std::endl << ss.str(); } }); DorisMetrics::instance()->create_tablet_requests_total->increment(1); VLOG_NOTICE << "start to create tablet " << create_tablet_req.tablet_id; std::vector finish_tablet_infos; VLOG_NOTICE << "create tablet: " << create_tablet_req; Status status = engine.create_tablet(create_tablet_req, profile); if (!status.ok()) { DorisMetrics::instance()->create_tablet_requests_failed->increment(1); LOG_WARNING("failed to create tablet, reason={}", status.to_string()) .tag("signature", req.signature) .tag("tablet_id", create_tablet_req.tablet_id) .error(status); } else { increase_report_version(); // get path hash of the created tablet TabletSharedPtr tablet; { SCOPED_TIMER(ADD_TIMER(profile, "GetTablet")); tablet = engine.tablet_manager()->get_tablet(create_tablet_req.tablet_id); } DCHECK(tablet != nullptr); TTabletInfo tablet_info; tablet_info.tablet_id = tablet->table_id(); tablet_info.schema_hash = tablet->schema_hash(); tablet_info.version = create_tablet_req.version; // Useless but it is a required field in TTabletInfo tablet_info.version_hash = 0; tablet_info.row_count = 0; tablet_info.data_size = 0; tablet_info.__set_path_hash(tablet->data_dir()->path_hash()); tablet_info.__set_replica_id(tablet->replica_id()); finish_tablet_infos.push_back(tablet_info); LOG_INFO("successfully create tablet") .tag("signature", req.signature) .tag("tablet_id", create_tablet_req.tablet_id); } TFinishTaskRequest finish_task_request; finish_task_request.__set_finish_tablet_infos(finish_tablet_infos); finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_report_version(s_report_version); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task_request.__set_task_status(status.to_thrift()); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& drop_tablet_req = req.drop_tablet_req; Status status; auto dropped_tablet = engine.tablet_manager()->get_tablet(drop_tablet_req.tablet_id, false); if (dropped_tablet != nullptr) { status = engine.tablet_manager()->drop_tablet(drop_tablet_req.tablet_id, drop_tablet_req.replica_id, drop_tablet_req.is_drop_table_or_partition); } else { status = Status::NotFound("could not find tablet {}", drop_tablet_req.tablet_id); } if (status.ok()) { // if tablet is dropped by fe, then the related txn should also be removed engine.txn_manager()->force_rollback_tablet_related_txns( dropped_tablet->data_dir()->get_meta(), 
drop_tablet_req.tablet_id, dropped_tablet->tablet_uid()); LOG_INFO("successfully drop tablet") .tag("signature", req.signature) .tag("tablet_id", drop_tablet_req.tablet_id); } else { LOG_WARNING("failed to drop tablet") .tag("signature", req.signature) .tag("tablet_id", drop_tablet_req.tablet_id) .error(status); } TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task_request.__set_task_status(status.to_thrift()); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void push_callback(const TAgentTaskRequest& req) { const auto& push_req = req.push_req; LOG(INFO) << "get push task. signature=" << req.signature << " push_type=" << push_req.push_type; std::vector tablet_infos; EngineBatchLoadTask engine_task(const_cast(push_req), &tablet_infos); SCOPED_ATTACH_TASK(engine_task.mem_tracker()); auto status = engine_task.execute(); // Return result to fe TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); if (push_req.push_type == TPushType::DELETE) { finish_task_request.__set_request_version(push_req.version); } if (status.ok()) { LOG_INFO("successfully execute push task") .tag("signature", req.signature) .tag("tablet_id", push_req.tablet_id) .tag("push_type", push_req.push_type); increase_report_version(); finish_task_request.__set_finish_tablet_infos(tablet_infos); } else { LOG_WARNING("failed to execute push task") .tag("signature", req.signature) .tag("tablet_id", push_req.tablet_id) .tag("push_type", push_req.push_type) .error(status); } finish_task_request.__set_task_status(status.to_thrift()); finish_task_request.__set_report_version(s_report_version); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } PublishVersionWorkerPool::PublishVersionWorkerPool(StorageEngine& engine) : TaskWorkerPool("PUBLISH_VERSION", config::publish_version_worker_count, [this](const TAgentTaskRequest& task) { publish_version_callback(task); }), _engine(engine) {} PublishVersionWorkerPool::~PublishVersionWorkerPool() = default; void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& req) { const auto& publish_version_req = req.publish_version_req; DorisMetrics::instance()->publish_task_request_total->increment(1); VLOG_NOTICE << "get publish version task. signature=" << req.signature; std::set error_tablet_ids; std::map succ_tablets; // partition_id, tablet_id, publish_version std::vector> discontinuous_version_tablets; std::map table_id_to_num_delta_rows; uint32_t retry_time = 0; Status status; constexpr uint32_t PUBLISH_VERSION_MAX_RETRY = 3; while (retry_time < PUBLISH_VERSION_MAX_RETRY) { succ_tablets.clear(); error_tablet_ids.clear(); table_id_to_num_delta_rows.clear(); EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids, &succ_tablets, &discontinuous_version_tablets, &table_id_to_num_delta_rows); SCOPED_ATTACH_TASK(engine_task.mem_tracker()); status = engine_task.execute(); if (status.ok()) { break; } if (status.is()) { // there are too many missing versions, it has been be added to async // publish task, so no need to retry here. 
if (discontinuous_version_tablets.empty()) { break; } LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, " << "transaction_id: " << publish_version_req.transaction_id; int64_t time_elapsed = time(nullptr) - req.recv_time; if (time_elapsed > config::publish_version_task_timeout_s) { LOG(INFO) << "task elapsed " << time_elapsed << " seconds since it is inserted to queue, it is timeout"; break; } // Version not continuous, put to queue and wait pre version publish task execute PUBLISH_VERSION_count << 1; auto st = _thread_pool->submit_func([this, req] { this->publish_version_callback(req); PUBLISH_VERSION_count << -1; }); if (!st.ok()) [[unlikely]] { PUBLISH_VERSION_count << -1; status = std::move(st); } else { return; } } LOG_WARNING("failed to publish version") .tag("transaction_id", publish_version_req.transaction_id) .tag("error_tablets_num", error_tablet_ids.size()) .tag("retry_time", retry_time) .error(status); ++retry_time; } for (auto& item : discontinuous_version_tablets) { _engine.add_async_publish_task(std::get<0>(item), std::get<1>(item), std::get<2>(item), publish_version_req.transaction_id, false); } TFinishTaskRequest finish_task_request; if (!status.ok()) [[unlikely]] { DorisMetrics::instance()->publish_task_failed_total->increment(1); // if publish failed, return failed, FE will ignore this error and // check error tablet ids and FE will also republish this task LOG_WARNING("failed to publish version") .tag("signature", req.signature) .tag("transaction_id", publish_version_req.transaction_id) .tag("error_tablets_num", error_tablet_ids.size()) .error(status); } else { if (!config::disable_auto_compaction && !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { for (auto [tablet_id, _] : succ_tablets) { TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id); if (tablet != nullptr) { if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) { tablet->published_count.fetch_add(1); int64_t published_count = tablet->published_count.load(); if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) && published_count % 20 == 0) { auto st = _engine.submit_compaction_task( tablet, CompactionType::CUMULATIVE_COMPACTION, true, false); if (!st.ok()) [[unlikely]] { LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id << ", published=" << published_count << " : " << st; } else { LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id << ", published:" << published_count; } } } } else { LOG(WARNING) << "trigger compaction failed, tablet_id:" << tablet_id; } } } uint32_t cost_second = time(nullptr) - req.recv_time; g_publish_version_latency << cost_second; LOG_INFO("successfully publish version") .tag("signature", req.signature) .tag("transaction_id", publish_version_req.transaction_id) .tag("tablets_num", succ_tablets.size()) .tag("cost(s)", cost_second); } status.to_thrift(&finish_task_request.task_status); finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task_request.__set_report_version(s_report_version); finish_task_request.__set_succ_tablets(succ_tablets); finish_task_request.__set_error_tablet_ids( std::vector(error_tablet_ids.begin(), error_tablet_ids.end())); finish_task_request.__set_table_id_to_delta_num_rows(table_id_to_num_delta_rows); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void 
clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& clear_transaction_task_req = req.clear_transaction_task_req; LOG(INFO) << "get clear transaction task. signature=" << req.signature << ", transaction_id=" << clear_transaction_task_req.transaction_id << ", partition_id_size=" << clear_transaction_task_req.partition_id.size(); Status status; if (clear_transaction_task_req.transaction_id > 0) { // transaction_id should be greater than zero. // If it is not greater than zero, no need to execute // the following clear_transaction_task() function. if (!clear_transaction_task_req.partition_id.empty()) { engine.clear_transaction_task(clear_transaction_task_req.transaction_id, clear_transaction_task_req.partition_id); } else { engine.clear_transaction_task(clear_transaction_task_req.transaction_id); } LOG(INFO) << "finish to clear transaction task. signature=" << req.signature << ", transaction_id=" << clear_transaction_task_req.transaction_id; } else { LOG(WARNING) << "invalid transaction id " << clear_transaction_task_req.transaction_id << ". signature= " << req.signature; } TFinishTaskRequest finish_task_request; finish_task_request.__set_task_status(status.to_thrift()); finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void alter_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) { int64_t signature = req.signature; LOG(INFO) << "get alter table task, signature: " << signature; bool is_task_timeout = false; if (req.__isset.recv_time) { int64_t time_elapsed = time(nullptr) - req.recv_time; if (time_elapsed > config::report_task_interval_seconds * 20) { LOG(INFO) << "task elapsed " << time_elapsed << " seconds since it is inserted to queue, it is timeout"; is_task_timeout = true; } } if (!is_task_timeout) { TFinishTaskRequest finish_task_request; TTaskType::type task_type = req.task_type; switch (task_type) { case TTaskType::ALTER: alter_tablet(engine, req, signature, task_type, &finish_task_request); break; default: // pass break; } finish_task(finish_task_request); } remove_task_info(req.task_type, req.signature); } void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) { std::unordered_map gc_tablet_infos; if (!req.__isset.gc_binlog_req) { LOG(WARNING) << "gc binlog task is not valid"; return; } if (!req.gc_binlog_req.__isset.tablet_gc_binlog_infos) { LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid"; return; } const auto& tablet_gc_binlog_infos = req.gc_binlog_req.tablet_gc_binlog_infos; for (auto&& tablet_info : tablet_gc_binlog_infos) { // gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.schema_hash); gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.version); } engine.gc_binlogs(gc_tablet_infos); } void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const TVisibleVersionReq& visible_version_req = req.visible_version_req; engine.tablet_manager()->update_partitions_visible_version( visible_version_req.partition_version); } void clone_callback(StorageEngine& engine, const TMasterInfo& master_info, const TAgentTaskRequest& req) { const auto& clone_req = req.clone_req; DorisMetrics::instance()->clone_requests_total->increment(1); LOG(INFO) << "get clone task. 
signature=" << req.signature; std::vector tablet_infos; EngineCloneTask engine_task(clone_req, master_info, req.signature, &tablet_infos); SCOPED_ATTACH_TASK(engine_task.mem_tracker()); auto status = engine_task.execute(); // Return result to fe TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task_request.__set_task_status(status.to_thrift()); if (!status.ok()) { DorisMetrics::instance()->clone_requests_failed->increment(1); LOG_WARNING("failed to clone tablet") .tag("signature", req.signature) .tag("tablet_id", clone_req.tablet_id) .error(status); } else { LOG_INFO("successfully clone tablet") .tag("signature", req.signature) .tag("tablet_id", clone_req.tablet_id); if (engine_task.is_new_tablet()) { increase_report_version(); finish_task_request.__set_report_version(s_report_version); } finish_task_request.__set_finish_tablet_infos(tablet_infos); } finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& storage_medium_migrate_req = req.storage_medium_migrate_req; // check request and get info TabletSharedPtr tablet; DataDir* dest_store = nullptr; auto status = check_migrate_request(engine, storage_medium_migrate_req, tablet, &dest_store); if (status.ok()) { EngineStorageMigrationTask engine_task(tablet, dest_store); SCOPED_ATTACH_TASK(engine_task.mem_tracker()); status = engine_task.execute(); } // fe should ignore this err if (status.is()) { status = Status::OK(); } if (!status.ok()) { LOG_WARNING("failed to migrate storage medium") .tag("signature", req.signature) .tag("tablet_id", storage_medium_migrate_req.tablet_id) .error(status); } else { LOG_INFO("successfully migrate storage medium") .tag("signature", req.signature) .tag("tablet_id", storage_medium_migrate_req.tablet_id); } TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(BackendOptions::get_local_backend()); finish_task_request.__set_task_type(req.task_type); finish_task_request.__set_signature(req.signature); finish_task_request.__set_task_status(status.to_thrift()); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) { LOG(INFO) << "clean trash start"; DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); }) static_cast(engine.start_trash_sweep(nullptr, true)); static_cast(engine.notify_listener("REPORT_DISK_STATE")); LOG(INFO) << "clean trash finish"; } } // namespace doris