[feature-wip](executor)Fe send topic info to be (#25798)

This commit is contained in:
wangbo
2023-10-26 15:52:48 +08:00
committed by GitHub
parent 2229d82acd
commit 1ba8a9bae4
25 changed files with 421 additions and 182 deletions

View File

@ -29,8 +29,8 @@
#include "agent/task_worker_pool.h"
#include "agent/topic_subscriber.h"
#include "agent/user_resource_listener.h"
#include "agent/utils.h"
#include "agent/workload_group_listener.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
@ -39,10 +39,6 @@
#include "olap/snapshot_manager.h"
#include "runtime/exec_env.h"
namespace doris {
class TopicListener;
} // namespace doris
using std::string;
using std::vector;
@ -134,9 +130,10 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
#if !defined(BE_TEST) && !defined(__APPLE__)
// Add subscriber here and register listeners
TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
LOG(INFO) << "Register user resource listener";
_topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
std::unique_ptr<TopicListener> wg_listener = std::make_unique<WorkloadGroupListener>(exec_env);
LOG(INFO) << "Register workload group listener";
_topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_GROUP,
std::move(wg_listener));
#endif
}

View File

@ -23,10 +23,11 @@
#include <string>
#include <vector>
#include "agent/topic_subscriber.h"
namespace doris {
class TaskWorkerPool;
class TopicSubscriber;
class ExecEnv;
class TAgentPublishRequest;
class TAgentResult;
@ -52,6 +53,8 @@ public:
// [[deprecated]]
void publish_cluster_state(TAgentResult& agent_result, const TAgentPublishRequest& request);
TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); }
private:
DISALLOW_COPY_AND_ASSIGN(AgentServer);

View File

@ -17,19 +17,14 @@
#pragma once
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/BackendService_types.h>
namespace doris {
class TopicListener {
public:
virtual ~TopicListener() {}
// Deal with a single update
//
// Input parameters:
// protocol version: the version for the protocol, listeners should deal with the msg according to the protocol
// topic_update: single update
virtual void handle_update(const TAgentServiceVersion::type& protocol_version,
const TTopicUpdate& topic_update) = 0;
virtual void handle_topic_info(const TPublishTopicRequest& topic_request) = 0;
};
} // namespace doris

View File

@ -17,7 +17,7 @@
#include "agent/topic_subscriber.h"
#include <gen_cpp/AgentService_types.h>
#include <pthread.h>
#include <mutex>
#include <utility>
@ -28,38 +28,22 @@ namespace doris {
TopicSubscriber::TopicSubscriber() {}
TopicSubscriber::~TopicSubscriber() {
// Delete all listeners in the register
std::map<TTopicType::type, std::vector<TopicListener*>>::iterator it =
_registered_listeners.begin();
for (; it != _registered_listeners.end(); ++it) {
std::vector<TopicListener*>& listeners = it->second;
std::vector<TopicListener*>::iterator listener_it = listeners.begin();
for (; listener_it != listeners.end(); ++listener_it) {
delete *listener_it;
}
}
}
void TopicSubscriber::register_listener(TTopicType::type topic_type, TopicListener* listener) {
void TopicSubscriber::register_listener(TTopicInfoType::type topic_type,
std::unique_ptr<TopicListener> topic_listener) {
// Unique lock here to prevent access to listeners
std::lock_guard<std::shared_mutex> lock(_listener_mtx);
this->_registered_listeners[topic_type].push_back(listener);
this->_registered_listeners.emplace(topic_type, std::move(topic_listener));
}
void TopicSubscriber::handle_updates(const TAgentPublishRequest& agent_publish_request) {
// Shared lock here in order to avoid updates in listeners' map
void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_request) {
// NOTE(wb): if we found there is bottleneck for handle_topic_info by LOG(INFO)
// eg, update workload info may delay other listener, then we need add a thread here
// to handle_topic_info asynchronous
std::shared_lock lock(_listener_mtx);
// Currently, not deal with protocol version, the listener should deal with protocol version
const std::vector<TTopicUpdate>& topic_updates = agent_publish_request.updates;
std::vector<TTopicUpdate>::const_iterator topic_update_it = topic_updates.begin();
for (; topic_update_it != topic_updates.end(); ++topic_update_it) {
std::vector<TopicListener*>& listeners = this->_registered_listeners[topic_update_it->type];
std::vector<TopicListener*>::iterator listener_it = listeners.begin();
// Send the update to all listeners with protocol version.
for (; listener_it != listeners.end(); ++listener_it) {
(*listener_it)->handle_update(agent_publish_request.protocol_version, *topic_update_it);
}
LOG(INFO) << "begin handle topic info";
for (auto& listener_pair : _registered_listeners) {
listener_pair.second->handle_topic_info(topic_request);
LOG(INFO) << "handle topic " << listener_pair.first << " succ";
}
}
} // namespace doris

View File

@ -17,7 +17,8 @@
#pragma once
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/BackendService_types.h>
#include <glog/logging.h>
#include <map>
#include <shared_mutex>
@ -29,14 +30,15 @@ class TopicListener;
class TopicSubscriber {
public:
TopicSubscriber();
~TopicSubscriber();
// Put the topic type and listener to the map
void register_listener(TTopicType::type topic_type, TopicListener* listener);
// Handle all updates in the request
void handle_updates(const TAgentPublishRequest& agent_publish_request);
~TopicSubscriber() = default;
void register_listener(TTopicInfoType::type topic_type,
std::unique_ptr<TopicListener> topic_listener);
void handle_topic_info(const TPublishTopicRequest& topic_request);
private:
std::map<TTopicType::type, std::vector<TopicListener*>> _registered_listeners;
std::map<TTopicInfoType::type, std::unique_ptr<TopicListener>> _registered_listeners;
std::shared_mutex _listener_mtx;
};
} // namespace doris

View File

@ -1,105 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "agent/user_resource_listener.h"
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransportException.h>
#include <future>
#include <ostream>
#include <string>
#include <vector>
#include "common/config.h"
#include "common/status.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
namespace doris {
using std::string;
using apache::thrift::TException;
using apache::thrift::transport::TTransportException;
// Initialize the resource to cgroups file mapping
// TRESOURCE_IOPS not mapped
UserResourceListener::UserResourceListener(ExecEnv* exec_env, const TMasterInfo& master_info)
: _master_info(master_info), _exec_env(exec_env) {}
UserResourceListener::~UserResourceListener() {}
void UserResourceListener::handle_update(const TAgentServiceVersion::type& protocol_version,
const TTopicUpdate& topic_update) {
std::vector<TTopicItem> updates = topic_update.updates;
if (updates.size() > 0) {
int64_t new_version = updates[0].int_value;
// Async call to update users resource method
auto res = std::async(std::launch::async, &UserResourceListener::update_users_resource,
this, new_version);
res.get();
}
}
void UserResourceListener::update_users_resource(int64_t new_version) {
// Call fe to get latest user resource
Status master_status;
// using 500ms as default timeout value
FrontendServiceConnection client(_exec_env->frontend_client_cache(),
_master_info.network_address, config::thrift_rpc_timeout_ms,
&master_status);
TFetchResourceResult new_fetched_resource;
if (!master_status.ok()) {
LOG(ERROR) << "Get frontend client failed, with address:"
<< _master_info.network_address.hostname << ":"
<< _master_info.network_address.port;
return;
}
try {
try {
client->fetchResource(new_fetched_resource);
} catch (TTransportException& e) {
// reopen the client and set timeout to 500ms
master_status = client.reopen(config::thrift_rpc_timeout_ms);
if (!master_status.ok()) {
LOG(WARNING) << "Reopen to get frontend client failed, with address:"
<< _master_info.network_address.hostname << ":"
<< _master_info.network_address.port << ", reason=" << e.what();
return;
}
LOG(WARNING) << "fetchResource from frontend failed"
<< ", reason=" << e.what();
client->fetchResource(new_fetched_resource);
}
} catch (TException& e) {
// Already try twice, log here
static_cast<void>(client.reopen(config::thrift_rpc_timeout_ms));
LOG(WARNING) << "retry to fetchResource from " << _master_info.network_address.hostname
<< ":" << _master_info.network_address.port << " failed:\n"
<< e.what();
return;
}
}
} // namespace doris

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "agent/workload_group_listener.h"
#include "runtime/task_group/task_group.h"
#include "runtime/task_group/task_group_manager.h"
#include "util/mem_info.h"
#include "util/parse_util.h"
namespace doris {
void WorkloadGroupListener::handle_topic_info(const TPublishTopicRequest& topic_request) {
std::set<uint64_t> current_wg_ids;
for (const TopicInfo& topic_info : topic_request.topic_list) {
if (topic_info.topic_type != doris::TTopicInfoType::type::WORKLOAD_GROUP) {
continue;
}
int wg_id = 0;
auto iter2 = topic_info.info_map.find("id");
std::from_chars(iter2->second.c_str(), iter2->second.c_str() + iter2->second.size(), wg_id);
current_wg_ids.insert(wg_id);
}
_exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
}
} // namespace doris

View File

@ -17,31 +17,21 @@
#pragma once
#include <gen_cpp/AgentService_types.h>
#include <stdint.h>
#include <glog/logging.h>
#include "agent/topic_listener.h"
#include "runtime/exec_env.h"
namespace doris {
class ExecEnv;
class TMasterInfo;
class UserResourceListener : public TopicListener {
class WorkloadGroupListener : public TopicListener {
public:
~UserResourceListener();
// Input parameters:
// root_cgroups_path: root cgroups allocated by admin to doris
UserResourceListener(ExecEnv* exec_env, const TMasterInfo& master_info);
// This method should be async
virtual void handle_update(const TAgentServiceVersion::type& protocol_version,
const TTopicUpdate& topic_update);
~WorkloadGroupListener() {}
WorkloadGroupListener(ExecEnv* exec_env) : _exec_env(exec_env) {}
void handle_topic_info(const TPublishTopicRequest& topic_request) override;
private:
const TMasterInfo& _master_info;
ExecEnv* _exec_env;
// Call cgroups mgr to update user's cgroups resource share
// Also refresh local user resource's cache
void update_users_resource(int64_t new_version);
};
} // namespace doris
} // namespace doris

View File

@ -189,6 +189,7 @@ void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
TaskScheduler::~TaskScheduler() {
stop();
LOG(INFO) << "Task scheduler " << _name << " shutdown";
}
Status TaskScheduler::start() {
@ -363,7 +364,6 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
void TaskScheduler::stop() {
if (!this->_shutdown.load()) {
this->_shutdown.store(true);
_blocked_task_scheduler->shutdown();
if (_task_queue) {
_task_queue->close();
}

View File

@ -122,6 +122,56 @@ Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
return Status::OK();
}
void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> id_set) {
{
std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
uint64_t tg_id = iter->first;
if (id_set.find(tg_id) == id_set.end()) {
iter = _task_groups.erase(iter);
} else {
iter++;
}
}
}
// stop task sche may cost some time, so it should not be locked
// task scheduler is stoped in task scheduler's destructor
std::set<std::unique_ptr<doris::pipeline::TaskScheduler>> task_sche_to_del;
std::set<std::unique_ptr<vectorized::SimplifiedScanScheduler>> scan_task_sche_to_del;
{
std::lock_guard<std::mutex> lock(_task_scheduler_lock);
for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end();) {
uint64_t tg_id = iter->first;
if (id_set.find(tg_id) == id_set.end()) {
task_sche_to_del.insert(std::move(_tg_sche_map[tg_id]));
iter = _tg_sche_map.erase(iter);
} else {
iter++;
}
}
for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end();) {
uint64_t tg_id = iter->first;
if (id_set.find(tg_id) == id_set.end()) {
scan_task_sche_to_del.insert(std::move(_tg_scan_sche_map[tg_id]));
iter = _tg_scan_sche_map.erase(iter);
} else {
iter++;
}
}
for (auto iter = _cgroup_ctl_map.begin(); iter != _cgroup_ctl_map.end();) {
uint64_t tg_id = iter->first;
if (id_set.find(tg_id) == id_set.end()) {
iter = _cgroup_ctl_map.erase(iter);
} else {
iter++;
}
}
}
}
void TaskGroupManager::stop() {
for (auto& task_sche : _tg_sche_map) {
task_sche.second->stop();

View File

@ -54,6 +54,8 @@ public:
Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name, int cpu_hard_limit,
ExecEnv* exec_env, QueryContext* query_ctx_ptr);
void delete_task_group_by_ids(std::set<uint64_t> id_set);
void stop();
private:

View File

@ -90,6 +90,11 @@ public:
_agent_server->publish_cluster_state(result, request);
}
void publish_topic_info(TPublishTopicResult& result,
const TPublishTopicRequest& topic_request) override {
_agent_server->get_topic_subscriber()->handle_topic_info(topic_request);
}
// DorisServer service
void exec_plan_fragment(TExecPlanFragmentResult& return_val,
const TExecPlanFragmentParams& params) override;

View File

@ -144,8 +144,14 @@ public:
_wg_name = wg_name;
}
~SimplifiedScanScheduler() {
stop();
LOG(INFO) << "Scanner sche " << _wg_name << " shutdown";
}
void stop() {
_is_stop.store(true);
_scan_task_queue->shutdown();
_scan_thread_pool->shutdown();
_scan_thread_pool->wait();
}
@ -169,8 +175,9 @@ private:
void _work() {
while (!_is_stop.load()) {
SimplifiedScanTask scan_task;
_scan_task_queue->blocking_get(&scan_task);
scan_task.scan_func();
if (_scan_task_queue->blocking_get(&scan_task)) {
scan_task.scan_func();
};
}
}

View File

@ -2268,4 +2268,6 @@ public class Config extends ConfigBase {
})
public static int sync_image_timeout_second = 300;
@ConfField(mutable = true, masterOnly = true)
public static int publish_topic_info_interval_ms = 30000; // 30s
}

View File

@ -113,6 +113,9 @@ import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.publish.TopicPublisher;
import org.apache.doris.common.publish.TopicPublisherThread;
import org.apache.doris.common.publish.WorkloadGroupPublisher;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.HttpURLUtil;
@ -492,6 +495,8 @@ public class Env {
private HiveTransactionMgr hiveTransactionMgr;
private TopicPublisherThread topicPublisherThread;
public List<TFrontendInfo> getFrontendInfos() {
List<TFrontendInfo> res = new ArrayList<>();
@ -726,6 +731,8 @@ public class Env {
this.binlogGcer = new BinlogGcer();
this.columnIdFlusher = new ColumnIdFlushDaemon();
this.queryCancelWorker = new QueryCancelWorker(systemInfo);
this.topicPublisherThread = new TopicPublisherThread(
"TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo);
}
public static void destroyCheckpoint() {
@ -970,6 +977,10 @@ public class Env {
}
queryCancelWorker.start();
TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
topicPublisherThread.addToTopicPublisherList(wgPublisher);
topicPublisherThread.start();
}
// wait until FE is ready.

View File

@ -30,15 +30,24 @@ public class AckResponseHandler extends ResponseHandler {
this.listener = listener;
}
public AckResponseHandler(Collection<Backend> nodes) {
super(nodes);
this.listener = null;
}
@Override
public void onResponse(Backend node) {
super.onResponse(node);
listener.onResponse(node);
if (listener != null) {
listener.onResponse(node);
}
}
@Override
public void onFailure(Backend node, Throwable t) {
super.onFailure(node, t);
listener.onFailure(node, t);
if (listener != null) {
listener.onFailure(node, t);
}
}
}

View File

@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.publish;
import org.apache.doris.thrift.TPublishTopicRequest;
public interface TopicPublisher {
public void getTopicInfo(TPublishTopicRequest req);
}

View File

@ -0,0 +1,121 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.publish;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
public class TopicPublisherThread extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(TopicPublisherThread.class);
private SystemInfoService clusterInfoService;
private ExecutorService executor = ThreadPoolManager
.newDaemonFixedThreadPool(6, 256, "topic-publish-thread", true);
public TopicPublisherThread(String name, long intervalMs,
SystemInfoService clusterInfoService) {
super(name, intervalMs);
this.clusterInfoService = clusterInfoService;
}
private List<TopicPublisher> topicPublisherList = new ArrayList<>();
public void addToTopicPublisherList(TopicPublisher topicPublisher) {
this.topicPublisherList.add(topicPublisher);
}
@Override
protected void runAfterCatalogReady() {
if (!Config.enable_workload_group) {
return;
}
LOG.info("begin publish topic info");
// step 1: get all publish topic info
TPublishTopicRequest request = new TPublishTopicRequest();
for (TopicPublisher topicPublisher : topicPublisherList) {
topicPublisher.getTopicInfo(request);
}
// step 2: publish topic info to all be
Collection<Backend> nodesToPublish = clusterInfoService.getIdToBackend().values();
AckResponseHandler handler = new AckResponseHandler(nodesToPublish);
for (Backend be : nodesToPublish) {
executor.submit(new TopicPublishWorker(request, be, handler));
}
try {
int timeoutMs = Config.publish_topic_info_interval_ms / 3 * 2;
if (!handler.awaitAllInMs(timeoutMs)) {
Backend[] backends = handler.pendingNodes();
if (backends.length > 0) {
LOG.warn("timed out waiting for all nodes to publish. (pending nodes: {})",
Arrays.toString(backends));
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public class TopicPublishWorker implements Runnable {
private TPublishTopicRequest request;
private Backend be;
private ResponseHandler handler;
public TopicPublishWorker(TPublishTopicRequest request, Backend node, ResponseHandler handler) {
this.request = request;
this.be = node;
this.handler = handler;
}
@Override
public void run() {
long beginTime = System.currentTimeMillis();
try {
TNetworkAddress addr = new TNetworkAddress(be.getHost(), be.getBePort());
BackendService.Client client = ClientPool.backendPool.borrowObject(addr);
client.publishTopicInfo(request);
LOG.info("publish topic info to be {} success, time cost={} ms",
be.getHost(), (System.currentTimeMillis() - beginTime));
} catch (Exception e) {
LOG.warn("publish topic info to be {} error happens: , time cost={} ms",
be.getHost(), (System.currentTimeMillis() - beginTime), e);
} finally {
handler.onResponse(be);
}
}
}
}

View File

@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.publish;
import org.apache.doris.catalog.Env;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TopicInfo;
public class WorkloadGroupPublisher implements TopicPublisher {
private Env env;
public WorkloadGroupPublisher(Env env) {
this.env = env;
}
@Override
public void getTopicInfo(TPublishTopicRequest req) {
for (TopicInfo topicInfo : env.getWorkloadGroupMgr()
.getPublishTopicInfo()) {
req.addToTopicList(topicInfo);
}
}
}

View File

@ -25,6 +25,8 @@ import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
@ -290,6 +292,16 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
return new TPipelineWorkloadGroup().setId(id).setName(name).setProperties(clonedHashMap).setVersion(version);
}
public TopicInfo toTopicInfo() {
HashMap<String, String> newHashMap = new HashMap<>();
newHashMap.put("id", String.valueOf(id));
TopicInfo topicInfo = new TopicInfo();
topicInfo.setTopicType(TTopicInfoType.WORKLOAD_GROUP);
topicInfo.setInfoMap(newHashMap);
topicInfo.setTopicKey(name);
return topicInfo;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);

View File

@ -40,6 +40,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUserIdentity;
import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
@ -52,6 +53,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -133,6 +135,19 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
return workloadGroups;
}
public List<TopicInfo> getPublishTopicInfo() {
List<TopicInfo> workloadGroups = new ArrayList();
readLock();
try {
for (WorkloadGroup wg : idToWorkloadGroup.values()) {
workloadGroups.add(wg.toTopicInfo());
}
} finally {
readUnlock();
}
return workloadGroups;
}
public QueryQueue getWorkloadGroupQueryQueue(ConnectContext context) throws UserException {
String groupName = getWorkloadGroupNameAndCheckPriv(context);
readLock();

View File

@ -32,6 +32,8 @@ import org.apache.doris.thrift.TExportTaskRequest;
import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TPublishTopicResult;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
@ -139,6 +141,11 @@ public class GenericPoolTest {
return null;
}
@Override
public TPublishTopicResult publishTopicInfo(TPublishTopicRequest request) throws TException {
return null;
}
@Override
public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException {
// TODO Auto-generated method stub

View File

@ -50,6 +50,8 @@ import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TMasterInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TPublishTopicResult;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
@ -299,6 +301,11 @@ public class MockedBackendFactory {
return new TAgentResult(new TStatus(TStatusCode.OK));
}
@Override
public TPublishTopicResult publishTopicInfo(TPublishTopicRequest request) throws TException {
return new TPublishTopicResult(new TStatus(TStatusCode.OK));
}
@Override
public TStatus submitExportTask(TExportTaskRequest request) throws TException {
return new TStatus(TStatusCode.OK);

View File

@ -506,5 +506,4 @@ struct TTopicUpdate {
struct TAgentPublishRequest {
1: required TAgentServiceVersion protocol_version
2: required list<TTopicUpdate> updates
}
}

View File

@ -138,6 +138,24 @@ struct TIngestBinlogResult {
1: optional Status.TStatus status;
}
enum TTopicInfoType {
WORKLOAD_GROUP
}
struct TopicInfo {
1: optional string topic_key
2: required TTopicInfoType topic_type
3: optional map<string, string> info_map
}
struct TPublishTopicRequest {
1: required list<TopicInfo> topic_list
}
struct TPublishTopicResult {
1: required Status.TStatus status
}
service BackendService {
// Called by coord to start asynchronous execution of plan fragment in backend.
// Returns as soon as all incoming data streams have been set up.
@ -193,4 +211,6 @@ service BackendService {
TCheckStorageFormatResult check_storage_format();
TIngestBinlogResult ingest_binlog(1: TIngestBinlogRequest ingest_binlog_request);
TPublishTopicResult publish_topic_info(1:TPublishTopicRequest topic_request);
}