diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 4660902333..8198c38e45 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -29,8 +29,8 @@ #include "agent/task_worker_pool.h" #include "agent/topic_subscriber.h" -#include "agent/user_resource_listener.h" #include "agent/utils.h" +#include "agent/workload_group_listener.h" #include "common/logging.h" #include "common/status.h" #include "gutil/strings/substitute.h" @@ -39,10 +39,6 @@ #include "olap/snapshot_manager.h" #include "runtime/exec_env.h" -namespace doris { -class TopicListener; -} // namespace doris - using std::string; using std::vector; @@ -134,9 +130,10 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) #if !defined(BE_TEST) && !defined(__APPLE__) // Add subscriber here and register listeners - TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info); - LOG(INFO) << "Register user resource listener"; - _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener); + std::unique_ptr wg_listener = std::make_unique(exec_env); + LOG(INFO) << "Register workload group listener"; + _topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_GROUP, + std::move(wg_listener)); #endif } diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index daa1823b07..a3e5cbd745 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -23,10 +23,11 @@ #include #include +#include "agent/topic_subscriber.h" + namespace doris { class TaskWorkerPool; -class TopicSubscriber; class ExecEnv; class TAgentPublishRequest; class TAgentResult; @@ -52,6 +53,8 @@ public: // [[deprecated]] void publish_cluster_state(TAgentResult& agent_result, const TAgentPublishRequest& request); + TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); } + private: DISALLOW_COPY_AND_ASSIGN(AgentServer); diff --git 
a/be/src/agent/topic_listener.h b/be/src/agent/topic_listener.h index 0ef9c597f1..af99a78545 100644 --- a/be/src/agent/topic_listener.h +++ b/be/src/agent/topic_listener.h @@ -17,19 +17,14 @@ #pragma once -#include +#include namespace doris { class TopicListener { public: virtual ~TopicListener() {} - // Deal with a single update - // - // Input parameters: - // protocol version: the version for the protocol, listeners should deal with the msg according to the protocol - // topic_update: single update - virtual void handle_update(const TAgentServiceVersion::type& protocol_version, - const TTopicUpdate& topic_update) = 0; + + virtual void handle_topic_info(const TPublishTopicRequest& topic_request) = 0; }; } // namespace doris diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp index c5c4a324eb..c3bcc29c62 100644 --- a/be/src/agent/topic_subscriber.cpp +++ b/be/src/agent/topic_subscriber.cpp @@ -17,7 +17,7 @@ #include "agent/topic_subscriber.h" -#include +#include #include #include @@ -28,38 +28,22 @@ namespace doris { TopicSubscriber::TopicSubscriber() {} -TopicSubscriber::~TopicSubscriber() { - // Delete all listeners in the register - std::map>::iterator it = - _registered_listeners.begin(); - for (; it != _registered_listeners.end(); ++it) { - std::vector& listeners = it->second; - std::vector::iterator listener_it = listeners.begin(); - for (; listener_it != listeners.end(); ++listener_it) { - delete *listener_it; - } - } -} - -void TopicSubscriber::register_listener(TTopicType::type topic_type, TopicListener* listener) { +void TopicSubscriber::register_listener(TTopicInfoType::type topic_type, + std::unique_ptr topic_listener) { // Unique lock here to prevent access to listeners std::lock_guard lock(_listener_mtx); - this->_registered_listeners[topic_type].push_back(listener); + this->_registered_listeners.emplace(topic_type, std::move(topic_listener)); } -void TopicSubscriber::handle_updates(const TAgentPublishRequest& 
agent_publish_request) { - // Shared lock here in order to avoid updates in listeners' map +void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_request) { + // NOTE(wb): if we found there is bottleneck for handle_topic_info by LOG(INFO) + // eg, update workload info may delay other listener, then we need add a thread here + // to handle_topic_info asynchronous std::shared_lock lock(_listener_mtx); - // Currently, not deal with protocol version, the listener should deal with protocol version - const std::vector& topic_updates = agent_publish_request.updates; - std::vector::const_iterator topic_update_it = topic_updates.begin(); - for (; topic_update_it != topic_updates.end(); ++topic_update_it) { - std::vector& listeners = this->_registered_listeners[topic_update_it->type]; - std::vector::iterator listener_it = listeners.begin(); - // Send the update to all listeners with protocol version. - for (; listener_it != listeners.end(); ++listener_it) { - (*listener_it)->handle_update(agent_publish_request.protocol_version, *topic_update_it); - } + LOG(INFO) << "begin handle topic info"; + for (auto& listener_pair : _registered_listeners) { + listener_pair.second->handle_topic_info(topic_request); + LOG(INFO) << "handle topic " << listener_pair.first << " succ"; } } } // namespace doris diff --git a/be/src/agent/topic_subscriber.h b/be/src/agent/topic_subscriber.h index 490bd35d2b..7adcd0ea37 100644 --- a/be/src/agent/topic_subscriber.h +++ b/be/src/agent/topic_subscriber.h @@ -17,7 +17,8 @@ #pragma once -#include +#include +#include #include #include @@ -29,14 +30,15 @@ class TopicListener; class TopicSubscriber { public: TopicSubscriber(); - ~TopicSubscriber(); - // Put the topic type and listener to the map - void register_listener(TTopicType::type topic_type, TopicListener* listener); - // Handle all updates in the request - void handle_updates(const TAgentPublishRequest& agent_publish_request); + ~TopicSubscriber() = default; + + void 
register_listener(TTopicInfoType::type topic_type, + std::unique_ptr topic_listener); + + void handle_topic_info(const TPublishTopicRequest& topic_request); private: - std::map> _registered_listeners; + std::map> _registered_listeners; std::shared_mutex _listener_mtx; }; } // namespace doris diff --git a/be/src/agent/user_resource_listener.cpp b/be/src/agent/user_resource_listener.cpp deleted file mode 100644 index 6a73edf6db..0000000000 --- a/be/src/agent/user_resource_listener.cpp +++ /dev/null @@ -1,105 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "agent/user_resource_listener.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include "common/config.h" -#include "common/status.h" -#include "runtime/client_cache.h" -#include "runtime/exec_env.h" - -namespace doris { - -using std::string; -using apache::thrift::TException; -using apache::thrift::transport::TTransportException; - -// Initialize the resource to cgroups file mapping -// TRESOURCE_IOPS not mapped - -UserResourceListener::UserResourceListener(ExecEnv* exec_env, const TMasterInfo& master_info) - : _master_info(master_info), _exec_env(exec_env) {} - -UserResourceListener::~UserResourceListener() {} - -void UserResourceListener::handle_update(const TAgentServiceVersion::type& protocol_version, - const TTopicUpdate& topic_update) { - std::vector updates = topic_update.updates; - if (updates.size() > 0) { - int64_t new_version = updates[0].int_value; - // Async call to update users resource method - auto res = std::async(std::launch::async, &UserResourceListener::update_users_resource, - this, new_version); - res.get(); - } -} - -void UserResourceListener::update_users_resource(int64_t new_version) { - // Call fe to get latest user resource - Status master_status; - // using 500ms as default timeout value - FrontendServiceConnection client(_exec_env->frontend_client_cache(), - _master_info.network_address, config::thrift_rpc_timeout_ms, - &master_status); - TFetchResourceResult new_fetched_resource; - if (!master_status.ok()) { - LOG(ERROR) << "Get frontend client failed, with address:" - << _master_info.network_address.hostname << ":" - << _master_info.network_address.port; - return; - } - try { - try { - client->fetchResource(new_fetched_resource); - } catch (TTransportException& e) { - // reopen the client and set timeout to 500ms - master_status = client.reopen(config::thrift_rpc_timeout_ms); - - if (!master_status.ok()) { - LOG(WARNING) << "Reopen to get 
frontend client failed, with address:" - << _master_info.network_address.hostname << ":" - << _master_info.network_address.port << ", reason=" << e.what(); - return; - } - LOG(WARNING) << "fetchResource from frontend failed" - << ", reason=" << e.what(); - client->fetchResource(new_fetched_resource); - } - } catch (TException& e) { - // Already try twice, log here - static_cast(client.reopen(config::thrift_rpc_timeout_ms)); - LOG(WARNING) << "retry to fetchResource from " << _master_info.network_address.hostname - << ":" << _master_info.network_address.port << " failed:\n" - << e.what(); - return; - } -} -} // namespace doris diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp new file mode 100644 index 0000000000..bf27861c28 --- /dev/null +++ b/be/src/agent/workload_group_listener.cpp @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "agent/workload_group_listener.h"
+
+#include <charconv>
+#include <set>
+#include <string>
+#include <system_error>
+
+#include "common/logging.h"
+#include "runtime/task_group/task_group.h"
+#include "runtime/task_group/task_group_manager.h"
+#include "util/mem_info.h"
+#include "util/parse_util.h"
+
+namespace doris {
+
+// Collects the ids of every WORKLOAD_GROUP entry in the published topic list and passes
+// that set to TaskGroupManager::delete_task_group_by_ids(), which erases every task group,
+// scheduler and cgroup controller whose id is NOT in the set.
+//
+// Because the set is a keep-list, a missing or wrong id results in a live group being
+// deleted. If any workload group entry cannot be decoded, the whole round is therefore
+// skipped without deleting anything.
+void WorkloadGroupListener::handle_topic_info(const TPublishTopicRequest& topic_request) {
+    std::set<uint64_t> current_wg_ids;
+    for (const TopicInfo& topic_info : topic_request.topic_list) {
+        if (topic_info.topic_type != doris::TTopicInfoType::type::WORKLOAD_GROUP) {
+            continue;
+        }
+
+        // info_map is an optional thrift field, so "id" may be absent; dereferencing
+        // end() would be undefined behaviour.
+        auto id_iter = topic_info.info_map.find("id");
+        if (id_iter == topic_info.info_map.end()) {
+            LOG(WARNING) << "workload group topic info has no id, skip this publish round";
+            return;
+        }
+
+        // Parse into uint64_t, the key type used by the task group manager. Parsing into
+        // a narrower int fails with result_out_of_range for ids above INT_MAX.
+        const std::string& id_str = id_iter->second;
+        const char* const id_end = id_str.data() + id_str.size();
+        uint64_t wg_id = 0;
+        auto [parse_end, ec] = std::from_chars(id_str.data(), id_end, wg_id);
+        // Require the whole string to be consumed so that "12abc" is rejected as well.
+        if (ec != std::errc() || parse_end != id_end) {
+            LOG(WARNING) << "invalid workload group id '" << id_str
+                         << "', skip this publish round";
+            return;
+        }
+
+        current_wg_ids.insert(wg_id);
+    }
+
+    _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
+}
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/agent/user_resource_listener.h b/be/src/agent/workload_group_listener.h
similarity index 54%
rename from be/src/agent/user_resource_listener.h
rename to be/src/agent/workload_group_listener.h
index 6fd25bef67..d31b1c4ef6 100644
--- a/be/src/agent/user_resource_listener.h
+++ b/be/src/agent/workload_group_listener.h
@@ -17,31 +17,21 @@
 
 #pragma once
 
-#include
-#include
+#include
 
 #include "agent/topic_listener.h"
+#include "runtime/exec_env.h"
 
 namespace doris {
 
-class ExecEnv;
-class TMasterInfo;
-
-class UserResourceListener : public TopicListener {
+class WorkloadGroupListener : public TopicListener {
 public:
-    ~UserResourceListener();
-    // Input parameters:
-    // root_cgroups_path: root cgroups allocated by admin to doris
-    UserResourceListener(ExecEnv* exec_env, const TMasterInfo& master_info);
-    // This method should be async
-    virtual void handle_update(const TAgentServiceVersion::type& protocol_version,
-                               const TTopicUpdate& topic_update);
+    ~WorkloadGroupListener() {}
+    WorkloadGroupListener(ExecEnv* exec_env) : _exec_env(exec_env) {}
+
+    void handle_topic_info(const TPublishTopicRequest& topic_request) override;
 
 private:
-
const TMasterInfo& _master_info; ExecEnv* _exec_env; - // Call cgroups mgr to update user's cgroups resource share - // Also refresh local user resource's cache - void update_users_resource(int64_t new_version); }; -} // namespace doris +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 4ef5def6f1..cdd934d5c7 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -189,6 +189,7 @@ void BlockedTaskScheduler::_make_task_run(std::list& local_tasks, TaskScheduler::~TaskScheduler() { stop(); + LOG(INFO) << "Task scheduler " << _name << " shutdown"; } Status TaskScheduler::start() { @@ -363,7 +364,6 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, void TaskScheduler::stop() { if (!this->_shutdown.load()) { this->_shutdown.store(true); - _blocked_task_scheduler->shutdown(); if (_task_queue) { _task_queue->close(); } diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index e6ed60148e..b3c24fa96e 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -122,6 +122,56 @@ Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri return Status::OK(); } +void TaskGroupManager::delete_task_group_by_ids(std::set id_set) { + { + std::lock_guard w_lock(_group_mutex); + for (auto iter = _task_groups.begin(); iter != _task_groups.end();) { + uint64_t tg_id = iter->first; + if (id_set.find(tg_id) == id_set.end()) { + iter = _task_groups.erase(iter); + } else { + iter++; + } + } + } + + // stop task sche may cost some time, so it should not be locked + // task scheduler is stoped in task scheduler's destructor + std::set> task_sche_to_del; + std::set> scan_task_sche_to_del; + { + std::lock_guard lock(_task_scheduler_lock); + for (auto iter = _tg_sche_map.begin(); iter != 
_tg_sche_map.end();) { + uint64_t tg_id = iter->first; + if (id_set.find(tg_id) == id_set.end()) { + task_sche_to_del.insert(std::move(_tg_sche_map[tg_id])); + iter = _tg_sche_map.erase(iter); + } else { + iter++; + } + } + + for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end();) { + uint64_t tg_id = iter->first; + if (id_set.find(tg_id) == id_set.end()) { + scan_task_sche_to_del.insert(std::move(_tg_scan_sche_map[tg_id])); + iter = _tg_scan_sche_map.erase(iter); + } else { + iter++; + } + } + + for (auto iter = _cgroup_ctl_map.begin(); iter != _cgroup_ctl_map.end();) { + uint64_t tg_id = iter->first; + if (id_set.find(tg_id) == id_set.end()) { + iter = _cgroup_ctl_map.erase(iter); + } else { + iter++; + } + } + } +} + void TaskGroupManager::stop() { for (auto& task_sche : _tg_sche_map) { task_sche.second->stop(); diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index e45cdeca7e..ae501e93f3 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -54,6 +54,8 @@ public: Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name, int cpu_hard_limit, ExecEnv* exec_env, QueryContext* query_ctx_ptr); + void delete_task_group_by_ids(std::set id_set); + void stop(); private: diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index e98dd65a8c..8ad55e43e6 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -90,6 +90,11 @@ public: _agent_server->publish_cluster_state(result, request); } + void publish_topic_info(TPublishTopicResult& result, + const TPublishTopicRequest& topic_request) override { + _agent_server->get_topic_subscriber()->handle_topic_info(topic_request); + } + // DorisServer service void exec_plan_fragment(TExecPlanFragmentResult& return_val, const TExecPlanFragmentParams& params) override; diff --git 
a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index ad6f86c4f1..32048458ed 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -144,8 +144,14 @@ public: _wg_name = wg_name; } + ~SimplifiedScanScheduler() { + stop(); + LOG(INFO) << "Scanner sche " << _wg_name << " shutdown"; + } + void stop() { _is_stop.store(true); + _scan_task_queue->shutdown(); _scan_thread_pool->shutdown(); _scan_thread_pool->wait(); } @@ -169,8 +175,9 @@ private: void _work() { while (!_is_stop.load()) { SimplifiedScanTask scan_task; - _scan_task_queue->blocking_get(&scan_task); - scan_task.scan_func(); + if (_scan_task_queue->blocking_get(&scan_task)) { + scan_task.scan_func(); + }; } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c50acaa087..c9b75cb84f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2268,4 +2268,6 @@ public class Config extends ConfigBase { }) public static int sync_image_timeout_second = 300; + @ConfField(mutable = true, masterOnly = true) + public static int publish_topic_info_interval_ms = 30000; // 30s } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 8e46daaf93..9a6d13552d 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -113,6 +113,9 @@ import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.io.Text; +import org.apache.doris.common.publish.TopicPublisher; +import org.apache.doris.common.publish.TopicPublisherThread; +import 
org.apache.doris.common.publish.WorkloadGroupPublisher; import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.HttpURLUtil; @@ -492,6 +495,8 @@ public class Env { private HiveTransactionMgr hiveTransactionMgr; + private TopicPublisherThread topicPublisherThread; + public List getFrontendInfos() { List res = new ArrayList<>(); @@ -726,6 +731,8 @@ public class Env { this.binlogGcer = new BinlogGcer(); this.columnIdFlusher = new ColumnIdFlushDaemon(); this.queryCancelWorker = new QueryCancelWorker(systemInfo); + this.topicPublisherThread = new TopicPublisherThread( + "TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo); } public static void destroyCheckpoint() { @@ -970,6 +977,10 @@ public class Env { } queryCancelWorker.start(); + + TopicPublisher wgPublisher = new WorkloadGroupPublisher(this); + topicPublisherThread.addToTopicPublisherList(wgPublisher); + topicPublisherThread.start(); } // wait until FE is ready. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java index b95469d691..f9d15a1ae5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java @@ -30,15 +30,24 @@ public class AckResponseHandler extends ResponseHandler { this.listener = listener; } + public AckResponseHandler(Collection nodes) { + super(nodes); + this.listener = null; + } + @Override public void onResponse(Backend node) { super.onResponse(node); - listener.onResponse(node); + if (listener != null) { + listener.onResponse(node); + } } @Override public void onFailure(Backend node, Throwable t) { super.onFailure(node, t); - listener.onFailure(node, t); + if (listener != null) { + listener.onFailure(node, t); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java new file mode 100644 index 0000000000..24086cb0e7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.publish; + +import org.apache.doris.thrift.TPublishTopicRequest; + +public interface TopicPublisher { + public void getTopicInfo(TPublishTopicRequest req); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java new file mode 100644 index 0000000000..616c8a30b5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.common.publish;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPublishTopicRequest;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Periodic daemon that asks every registered {@link TopicPublisher} to fill one
+ * {@link TPublishTopicRequest} and then sends that request to every backend known
+ * to the {@link SystemInfoService}, using a small worker pool.
+ */
+public class TopicPublisherThread extends MasterDaemon {
+
+    private static final Logger LOG = LogManager.getLogger(TopicPublisherThread.class);
+
+    private SystemInfoService clusterInfoService;
+
+    private ExecutorService executor = ThreadPoolManager
+            .newDaemonFixedThreadPool(6, 256, "topic-publish-thread", true);
+
+    /**
+     * @param name daemon name passed to {@link MasterDaemon}
+     * @param intervalMs interval passed to {@link MasterDaemon}
+     * @param clusterInfoService source of the backends to publish to
+     */
+    public TopicPublisherThread(String name, long intervalMs,
+            SystemInfoService clusterInfoService) {
+        super(name, intervalMs);
+        this.clusterInfoService = clusterInfoService;
+    }
+
+    private List<TopicPublisher> topicPublisherList = new ArrayList<>();
+
+    /** Registers a publisher whose topic info is included in every publish round. */
+    public void addToTopicPublisherList(TopicPublisher topicPublisher) {
+        this.topicPublisherList.add(topicPublisher);
+    }
+
+    /**
+     * Runs one publish round: collects topic info from all publishers, submits one
+     * {@link TopicPublishWorker} per backend, then waits up to two thirds of
+     * {@code Config.publish_topic_info_interval_ms} for the workers to report back.
+     * Does nothing when {@code Config.enable_workload_group} is false.
+     */
+    @Override
+    protected void runAfterCatalogReady() {
+        if (!Config.enable_workload_group) {
+            return;
+        }
+        LOG.info("begin publish topic info");
+        // step 1: get all publish topic info
+        TPublishTopicRequest request = new TPublishTopicRequest();
+        for (TopicPublisher topicPublisher : topicPublisherList) {
+            topicPublisher.getTopicInfo(request);
+        }
+
+        // step 2: publish topic info to all be
+        Collection<Backend> nodesToPublish = clusterInfoService.getIdToBackend().values();
+        AckResponseHandler handler = new AckResponseHandler(nodesToPublish);
+        for (Backend be : nodesToPublish) {
+            executor.submit(new TopicPublishWorker(request, be, handler));
+        }
+        try {
+            int timeoutMs = Config.publish_topic_info_interval_ms / 3 * 2;
+            if (!handler.awaitAllInMs(timeoutMs)) {
+                Backend[] backends = handler.pendingNodes();
+                if (backends.length > 0) {
+                    LOG.warn("timed out waiting for all nodes to publish. (pending nodes: {})",
+                            Arrays.toString(backends));
+                }
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the owning daemon can observe it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Sends one publish request to one backend and reports completion to the handler. */
+    public class TopicPublishWorker implements Runnable {
+        private TPublishTopicRequest request;
+        private Backend be;
+        private ResponseHandler handler;
+
+        public TopicPublishWorker(TPublishTopicRequest request, Backend node, ResponseHandler handler) {
+            this.request = request;
+            this.be = node;
+            this.handler = handler;
+        }
+
+        @Override
+        public void run() {
+            long beginTime = System.currentTimeMillis();
+            TNetworkAddress addr = new TNetworkAddress(be.getHost(), be.getBePort());
+            BackendService.Client client = null;
+            boolean ok = false;
+            try {
+                client = ClientPool.backendPool.borrowObject(addr);
+                client.publishTopicInfo(request);
+                ok = true;
+                LOG.info("publish topic info to be {} success, time cost={} ms",
+                        be.getHost(), (System.currentTimeMillis() - beginTime));
+            } catch (Exception e) {
+                LOG.warn("publish topic info to be {} error happens: , time cost={} ms",
+                        be.getHost(), (System.currentTimeMillis() - beginTime), e);
+            } finally {
+                // A borrowed client must always go back to the pool, otherwise every
+                // round leaks one connection per backend. A client whose call failed
+                // may be in a broken state, so it is invalidated instead of reused.
+                try {
+                    if (client != null) {
+                        if (ok) {
+                            ClientPool.backendPool.returnObject(addr, client);
+                        } else {
+                            ClientPool.backendPool.invalidateObject(addr, client);
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.warn("release backend client for be {} failed", be.getHost(), e);
+                } finally {
+                    // Always signal the handler so the waiting daemon is not held up.
+                    handler.onResponse(be);
+                }
+            }
+        }
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
new file mode 100644
index 0000000000..2330700ce7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.publish; + +import org.apache.doris.catalog.Env; +import org.apache.doris.thrift.TPublishTopicRequest; +import org.apache.doris.thrift.TopicInfo; + +public class WorkloadGroupPublisher implements TopicPublisher { + + private Env env; + + public WorkloadGroupPublisher(Env env) { + this.env = env; + } + + @Override + public void getTopicInfo(TPublishTopicRequest req) { + for (TopicInfo topicInfo : env.getWorkloadGroupMgr() + .getPublishTopicInfo()) { + req.addToTopicList(topicInfo); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 68115c4c84..b8aebdccab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -25,6 +25,8 @@ import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TPipelineWorkloadGroup; +import org.apache.doris.thrift.TTopicInfoType; +import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; @@ -290,6 +292,16 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable { return new TPipelineWorkloadGroup().setId(id).setName(name).setProperties(clonedHashMap).setVersion(version); } + public TopicInfo toTopicInfo() { + HashMap newHashMap = new HashMap<>(); + newHashMap.put("id", String.valueOf(id)); + TopicInfo topicInfo = new TopicInfo(); + topicInfo.setTopicType(TTopicInfoType.WORKLOAD_GROUP); + topicInfo.setInfoMap(newHashMap); + topicInfo.setTopicKey(name); + return topicInfo; + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 1ec47d053f..5f0a52cab5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -40,6 +40,7 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TUserIdentity; +import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -52,6 +53,7 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -133,6 +135,19 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { return workloadGroups; } + public List getPublishTopicInfo() { + List workloadGroups = new ArrayList(); + readLock(); + try { + for (WorkloadGroup wg : idToWorkloadGroup.values()) { + workloadGroups.add(wg.toTopicInfo()); + } + } finally { + readUnlock(); + } + return workloadGroups; + } + public QueryQueue getWorkloadGroupQueryQueue(ConnectContext context) 
throws UserException { String groupName = getWorkloadGroupNameAndCheckPriv(context); readLock(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index 3ab76732f6..a2f3867de2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -32,6 +32,8 @@ import org.apache.doris.thrift.TExportTaskRequest; import org.apache.doris.thrift.TIngestBinlogRequest; import org.apache.doris.thrift.TIngestBinlogResult; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPublishTopicRequest; +import org.apache.doris.thrift.TPublishTopicResult; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.thrift.TScanBatchResult; import org.apache.doris.thrift.TScanCloseParams; @@ -139,6 +141,11 @@ public class GenericPoolTest { return null; } + @Override + public TPublishTopicResult publishTopicInfo(TPublishTopicRequest request) throws TException { + return null; + } + @Override public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException { // TODO Auto-generated method stub diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index d3f5aeeacc..058e356a32 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -50,6 +50,8 @@ import org.apache.doris.thrift.TIngestBinlogRequest; import org.apache.doris.thrift.TIngestBinlogResult; import org.apache.doris.thrift.TMasterInfo; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPublishTopicRequest; +import org.apache.doris.thrift.TPublishTopicResult; import org.apache.doris.thrift.TRoutineLoadTask; import 
org.apache.doris.thrift.TScanBatchResult; import org.apache.doris.thrift.TScanCloseParams; @@ -299,6 +301,11 @@ public class MockedBackendFactory { return new TAgentResult(new TStatus(TStatusCode.OK)); } + @Override + public TPublishTopicResult publishTopicInfo(TPublishTopicRequest request) throws TException { + return new TPublishTopicResult(new TStatus(TStatusCode.OK)); + } + @Override public TStatus submitExportTask(TExportTaskRequest request) throws TException { return new TStatus(TStatusCode.OK); diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 4b4c260b47..c161fc99e7 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -506,5 +506,4 @@ struct TTopicUpdate { struct TAgentPublishRequest { 1: required TAgentServiceVersion protocol_version 2: required list updates -} - +} \ No newline at end of file diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 3d77eab4ca..a7a9c50aed 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -138,6 +138,24 @@ struct TIngestBinlogResult { 1: optional Status.TStatus status; } +enum TTopicInfoType { + WORKLOAD_GROUP +} + +struct TopicInfo { + 1: optional string topic_key + 2: required TTopicInfoType topic_type + 3: optional map info_map +} + +struct TPublishTopicRequest { + 1: required list topic_list +} + +struct TPublishTopicResult { + 1: required Status.TStatus status +} + service BackendService { // Called by coord to start asynchronous execution of plan fragment in backend. // Returns as soon as all incoming data streams have been set up. @@ -193,4 +211,6 @@ service BackendService { TCheckStorageFormatResult check_storage_format(); TIngestBinlogResult ingest_binlog(1: TIngestBinlogRequest ingest_binlog_request); + + TPublishTopicResult publish_topic_info(1:TPublishTopicRequest topic_request); }