[http][manager] Add manager related http interface. (#6396)

Encapsulate some http interfaces for better management and maintenance of doris clusters.

The http interface includes getting cluster connection information, node information, node configuration information, batch modifying node configuration, and getting query profile.

For details, please refer to the document:  
`docs/zh-CN/administrator-guide/http-actions/fe/manager/`
This commit is contained in:
luozenglin
2021-08-10 10:58:31 +08:00
committed by GitHub
parent 636b30b1d1
commit 0930e89452
28 changed files with 3407 additions and 105 deletions

View File

@ -255,6 +255,10 @@ void Properties::set(const std::string& key, const std::string& val) {
file_conf_map.emplace(key, val);
}
void Properties::set_force(const std::string& key, const std::string& val) {
file_conf_map[key] = val;
}
bool Properties::dump(const std::string& conffile) {
std::vector<std::string> files = {conffile};
Status st = FileSystemUtil::remove_paths(files);
@ -386,19 +390,14 @@ bool persist_config(const std::string& field, const std::string& value) {
std::lock_guard<std::mutex> l(custom_conf_lock);
static const string conffile = string(getenv("DORIS_HOME")) + "/conf/be_custom.conf";
Status st = FileSystemUtil::create_file(conffile);
if (!st.ok()) {
LOG(WARNING) << "failed to create or open be_custom.conf. " << st.get_error_msg();
return false;
}
Properties tmp_props;
if (!tmp_props.load(conffile.c_str())) {
if (!tmp_props.load(conffile.c_str(), false)) {
LOG(WARNING) << "failed to load " << conffile;
return false;
}
tmp_props.set(field, value);
tmp_props.set_force(field, value);
return tmp_props.dump(conffile);
}
@ -430,5 +429,26 @@ Status set_config(const std::string& field, const std::string& value, bool need_
std::mutex* get_mutable_string_config_lock() { return &mutable_string_config_lock; }
std::vector<std::vector<std::string>> get_config_info() {
std::vector<std::vector<std::string>> configs;
std::lock_guard<std::mutex> lock(mutable_string_config_lock);
for (const auto& it : *full_conf_map) {
auto field_it = Register::_s_field_map->find(it.first);
if (field_it == Register::_s_field_map->end()) {
continue;
}
std::vector<std::string> _config;
_config.push_back(it.first);
_config.push_back(field_it->second.type);
_config.push_back(it.second);
_config.push_back(field_it->second.valmutable ? "true":"false");
configs.push_back(_config);
}
return configs;
}
} // namespace config
} // namespace doris

View File

@ -156,6 +156,8 @@ public:
void set(const std::string& key, const std::string& val);
void set_force(const std::string& key, const std::string& val);
// dump props to conf file
bool dump(const std::string& conffile);
@ -181,6 +183,8 @@ bool persist_config(const std::string& field, const std::string& value);
std::mutex* get_mutable_string_config_lock();
std::vector<std::vector<std::string>> get_config_info();
} // namespace config
} // namespace doris

View File

@ -49,7 +49,7 @@ add_library(Webserver STATIC
action/stream_load.cpp
action/meta_action.cpp
action/compaction_action.cpp
action/update_config_action.cpp
action/config_action.cpp
# action/multi_start.cpp
# action/multi_show.cpp
# action/multi_commit.cpp

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "http/action/update_config_action.h"
#include "http/action/config_action.h"
#include <rapidjson/document.h>
#include <rapidjson/prettywriter.h>
@ -40,7 +40,34 @@ const static std::string HEADER_JSON = "application/json";
const static std::string PERSIST_PARAM = "persist";
void UpdateConfigAction::handle(HttpRequest* req) {
void ConfigAction::handle(HttpRequest* req) {
if (_type == ConfigActionType::UPDATE_CONFIG) {
handle_update_config(req);
} else if (_type == ConfigActionType::SHOW_CONFIG) {
handle_show_config(req);
}
}
void ConfigAction::handle_show_config(HttpRequest* req) {
std::vector<std::vector<std::string>> config_info = config::get_config_info();
rapidjson::StringBuffer str_buf;
rapidjson::Writer<rapidjson::StringBuffer> writer(str_buf);
writer.StartArray();
for (const auto& _config : config_info) {
writer.StartArray();
for (const std::string& config_filed : _config) {
writer.String(config_filed.c_str());
}
writer.EndArray();
}
writer.EndArray();
HttpChannel::send_reply(req, str_buf.GetString());
}
void ConfigAction::handle_update_config(HttpRequest* req) {
LOG(INFO) << req->debug_string();
Status s;

View File

@ -21,13 +21,27 @@
namespace doris {
enum ConfigActionType {
UPDATE_CONFIG = 1,
SHOW_CONFIG = 2,
};
// Update BE config.
class UpdateConfigAction : public HttpHandler {
class ConfigAction : public HttpHandler {
public:
UpdateConfigAction() {}
virtual ~UpdateConfigAction() {}
ConfigAction(ConfigActionType type): _type(type) {}
virtual ~ConfigAction() {}
void handle(HttpRequest* req) override;
private:
ConfigActionType _type;
void handle_update_config(HttpRequest* req);
void handle_show_config(HttpRequest* req);
};
} // namespace doris

View File

@ -31,7 +31,7 @@
#include "http/action/tablets_distribution_action.h"
#include "http/action/tablet_migration_action.h"
#include "http/action/tablets_info_action.h"
#include "http/action/update_config_action.h"
#include "http/action/config_action.h"
#include "http/default_path_handlers.h"
#include "http/download_action.h"
#include "http/ev_http_server.h"
@ -144,9 +144,14 @@ Status HttpService::start() {
_ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status",
run_status_compaction_action);
UpdateConfigAction* update_config_action = _pool.add(new UpdateConfigAction());
ConfigAction* update_config_action =
_pool.add(new ConfigAction(ConfigActionType::UPDATE_CONFIG));
_ev_http_server->register_handler(HttpMethod::POST, "/api/update_config", update_config_action);
ConfigAction* show_config_action =
_pool.add(new ConfigAction(ConfigActionType::SHOW_CONFIG));
_ev_http_server->register_handler(HttpMethod::GET, "/api/show_config", show_config_action);
_ev_http_server->start();
return Status::OK();
}

View File

@ -88,6 +88,15 @@ module.exports = [
title: "FE",
directoryPath: "fe/",
children: [
{
title: "MANAGER",
directoryPath: "manager/",
children: [
"cluster-action",
"node-action",
"query-profile-action",
],
},
"bootstrap-action",
"cancel-load-action",
"check-decommission-action",

View File

@ -87,6 +87,15 @@ module.exports = [
title: "FE",
directoryPath: "fe/",
children: [
{
title: "MANAGER",
directoryPath: "manager/",
children: [
"cluster-action",
"node-action",
"query-profile-action",
],
},
"bootstrap-action",
"cancel-load-action",
"check-decommission-action",

View File

@ -0,0 +1,77 @@
---
{
"title": "Cluster Action",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Cluster Action
## Request
`GET /rest/v2/manager/cluster/cluster_info/conn_info`
## Cluster Connection Information
`GET /rest/v2/manager/cluster/cluster_info/conn_info`
### Description
Used to get cluster http, mysql connection information.
### Response
```
{
"msg": "success",
"code": 0,
"data": {
"http": [
"fe_host:http_ip"
],
"mysql": [
"fe_host:query_ip"
]
},
"count": 0
}
```
### Examples
```
GET /rest/v2/manager/cluster/cluster_info/conn_info
Response:
{
"msg": "success",
"code": 0,
"data": {
"http": [
"127.0.0.1:8030"
],
"mysql": [
"127.0.0.1:9030"
]
},
"count": 0
}
```

View File

@ -0,0 +1,435 @@
---
{
"title": "Node Action",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Node Action
## Request
s
`GET /rest/v2/manager/node/frontends`
`GET /rest/v2/manager/node/backends`
`GET /rest/v2/manager/node/brokers`
`GET /rest/v2/manager/node/configuration_name`
`GET /rest/v2/manager/node/node_list`
`POST /rest/v2/manager/node/configuration_info`
`POST /rest/v2/manager/node/set_config/fe`
`POST /rest/v2/manager/node/set_config/be`
## Get information about fe, be, broker nodes
`GET /rest/v2/manager/node/frontends`
`GET /rest/v2/manager/node/backends`
`GET /rest/v2/manager/node/brokers`
### Description
Used to get cluster to get fe, be, broker node information.
### Response
```
frontends:
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"Name",
"IP",
"HostName",
"EditLogPort",
"HttpPort",
"QueryPort",
"RpcPort",
"Role",
"IsMaster",
"ClusterId",
"Join",
"Alive",
"ReplayedJournalId",
"LastHeartbeat",
"IsHelper",
"ErrMsg",
"Version"
],
"rows": [
[
...
]
]
},
"count": 0
}
```
```
backends:
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"BackendId",
"Cluster",
"IP",
"HostName",
"HeartbeatPort",
"BePort",
"HttpPort",
"BrpcPort",
"LastStartTime",
"LastHeartbeat",
"Alive",
"SystemDecommissioned",
"ClusterDecommissioned",
"TabletNum",
"DataUsedCapacity",
"AvailCapacity",
"TotalCapacity",
"UsedPct",
"MaxDiskUsedPct",
"ErrMsg",
"Version",
"Status"
],
"rows": [
[
...
]
]
},
"count": 0
}
```
```
brokers:
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"Name",
"IP",
"HostName",
"Port",
"Alive",
"LastStartTime",
"LastUpdateTime",
"ErrMsg"
],
"rows": [
[
...
]
]
},
"count": 0
}
```
## Get node configuration information
`GET /rest/v2/manager/node/configuration_name`
`GET /rest/v2/manager/node/node_list`
`POST /rest/v2/manager/node/configuration_info`
### Description
configuration_name Used to get the name of the node configuration item.
node_list Get the list of nodes.
configuration_info to get the node configuration details.
### Query parameters
`GET /rest/v2/manager/node/configuration_name`
none
`GET /rest/v2/manager/node/node_list`
none
`POST /rest/v2/manager/node/configuration_info`
* type
The value is fe or be, which specifies to get the configuration information of fe or the configuration information of be.
### Request body
`GET /rest/v2/manager/node/configuration_name`
none
`GET /rest/v2/manager/node/node_list`
none
`POST /rest/v2/manager/node/configuration_info`
```
{
"conf_name": [
""
],
"node": [
""
]
}
If no body is included, the parameters in the body use the default values.
conf_name specifies which configuration items to return, the default is all configuration items.
node is used to specify which node's configuration information is returned, the default is all fe nodes or be nodes configuration information.
```
### Response
`GET /rest/v2/manager/node/configuration_name`
```
{
"msg": "success",
"code": 0,
"data": {
"backend":[
""
],
"frontend":[
""
]
},
"count": 0
}
```
`GET /rest/v2/manager/node/node_list`
```
{
"msg": "success",
"code": 0,
"data": {
"backend": [
""
],
"frontend": [
""
]
},
"count": 0
}
```
`POST /rest/v2/manager/node/configuration_info?type=fe`
```
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"配置项",
"节点",
"节点类型",
"配置值类型",
"MasterOnly",
"配置值",
"可修改"
],
"rows": [
[
""
]
]
},
"count": 0
}
```
`POST /rest/v2/manager/node/configuration_info?type=be`
```
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"配置项",
"节点",
"节点类型",
"配置值类型",
"配置值",
"可修改"
],
"rows": [
[
""
]
]
},
"count": 0
}
```
### Examples
1. Get the fe agent_task_resend_wait_time_ms configuration information:
POST /rest/v2/manager/node/configuration_info?type=fe
body:
```
{
"conf_name":[
"agent_task_resend_wait_time_ms"
]
}
```
Response:
```
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"配置项",
"节点",
"节点类型",
"配置值类型",
"MasterOnly",
"配置值",
"可修改"
],
"rows": [
[
"agent_task_resend_wait_time_ms",
"127.0.0.1:8030",
"FE",
"long",
"true",
"50000",
"true"
]
]
},
"count": 0
}
```
## Modify configuration values
`POST /rest/v2/manager/node/set_config/fe`
`POST /rest/v2/manager/node/set_config/be`
### Description
Used to modify fe or be node configuration values
### Request body
```
{
"config_name":{
"node":[
""
],
"value":"",
"persist":
}
}
config_name is the corresponding configuration item.
node is a keyword indicating the list of nodes to be modified;
value is the value of the configuration.
persist is true for permanent modification and false for temporary modification. persist means permanent modification, false means temporary modification. permanent modification takes effect after reboot, temporary modification fails after reboot.
```
### Response
`GET /rest/v2/manager/node/configuration_name`
```
{
"msg": "",
"code": 0,
"data": {
"failed":[
{
"config_name":"name",
"value"="",
"node":"",
"err_info":""
}
]
},
"count": 0
}
failed Indicates a configuration message that failed to be modified.
```
### Examples
1. Modify the agent_task_resend_wait_time_ms and alter_table_timeout_second configuration values in the fe 127.0.0.1:8030 node:
POST /rest/v2/manager/node/set_config/fe
body:
```
{
"agent_task_resend_wait_time_ms":{
"node":[
"127.0.0.1:8030"
],
"value":"10000",
"persist":"true"
},
"alter_table_timeout_second":{
"node":[
"127.0.0.1:8030"
],
"value":"true",
"persist":"true"
}
}
```
Response:
```
{
"msg": "success",
"code": 0,
"data": {
"failed": [
{
"config_name": "alter_table_timeout_second",
"node": "10.81.85.89:8837",
"err_info": "Unsupported configuration value type.",
"value": "true"
}
]
},
"count": 0
}
gent_task_resend_wait_time_ms configuration value modified successfully, alter_table_timeout_second modification failed.
```

View File

@ -0,0 +1,308 @@
---
{
"title": "Query Profile Action",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Query Profile Action
## Request
`GET /rest/v2/manager/query/query_info`
`GET /rest/v2/manager/query/sql/{query_id}`
`GET /rest/v2/manager/query/profile/text/{query_id}`
`GET /rest/v2/manager/query/profile/fragments/{query_id}`
`GET /rest/v2/manager/query/profile/graph/{query_id}`
## Get the query information
`GET /rest/v2/manager/query/query_info`
### Description
Gets information about select queries for all fe nodes in the cluster.
### Query parameters
* `query_id`
Optional, specifies the query ID of the query to be returned, default returns information for all queries.
* `search`
Optional, specifies that query information containing strings is returned, currently only string matches are performed.
* `is_all_node`
Optional, if true, returns query information for all fe nodes, if false, returns query information for the current fe node. The default is true.
### Response
```
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"Query ID",
"FE节点",
"查询用户",
"执行数据库",
"Sql",
"查询类型",
"开始时间",
"结束时间",
"执行时长",
"状态"
],
"rows": [
[
...
]
]
},
"count": 0
}
```
### Examples
```
GET /rest/v2/manager/query/query_info
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"Query ID",
"FE节点",
"查询用户",
"执行数据库",
"Sql",
"查询类型",
"开始时间",
"结束时间",
"执行时长",
"状态"
],
"rows": [
[
"d7c93d9275334c35-9e6ac5f295a7134b",
"127.0.0.1:8030",
"root",
"default_cluster:testdb",
"select c.id, c.name, p.age, p.phone, c.date, c.cost from cost c join people p on c.id = p.id where p.age > 20 order by c.id",
"Query",
"2021-07-29 16:59:12",
"2021-07-29 16:59:12",
"109ms",
"EOF"
]
]
},
"count": 0
}
```
## Get the sql and text profile for the specified query
`GET /rest/v2/manager/query/sql/{query_id}`
`GET /rest/v2/manager/query/profile/text/{query_id}`
### Description
Get the sql and profile text for the specified query id.
### Path parameters
* `query_id`
The query id.
### Query parameters
* `is_all_node`
Optional, if true then query for the specified query id in all fe nodes, if false then query for the specified query id in the currently connected fe nodes. The default is true.
### Response
```
{
"msg": "success",
"code": 0,
"data": {
"sql": ""
},
"count": 0
}
```
```
{
"msg": "success",
"code": 0,
"data": {
"profile": ""
},
"count": 0
}
```
### Examples
1. get sql.
```
GET /rest/v2/manager/query/sql/d7c93d9275334c35-9e6ac5f295a7134b
Response:
{
"msg": "success",
"code": 0,
"data": {
"sql": "select c.id, c.name, p.age, p.phone, c.date, c.cost from cost c join people p on c.id = p.id where p.age > 20 order by c.id"
},
"count": 0
}
```
## Get the specified query fragment and instance information
`GET /rest/v2/manager/query/profile/fragments/{query_id}`
### Description
Get the fragment name, instance id and execution time for the specified query id.
### Path parameters
* `query_id`
The query id.
### Query parameters
* `is_all_node`
Optional, if true then query for the specified query id in all fe nodes, if false then query for the specified query id in the currently connected fe nodes. The default is true.
### Response
```
{
"msg": "success",
"code": 0,
"data": [
{
"fragment_id": "",
"time": "",
"instance_id": {
"": ""
}
}
],
"count": 0
}
```
### Examples
```
GET /rest/v2/manager/query/profile/fragments/d7c93d9275334c35-9e6ac5f295a7134b
Response:
{
"msg": "success",
"code": 0,
"data": [
{
"fragment_id": "0",
"time": "36.169ms",
"instance_id": {
"d7c93d9275334c35-9e6ac5f295a7134e": "36.169ms"
}
},
{
"fragment_id": "1",
"time": "20.710ms",
"instance_id": {
"d7c93d9275334c35-9e6ac5f295a7134c": "20.710ms"
}
},
{
"fragment_id": "2",
"time": "7.83ms",
"instance_id": {
"d7c93d9275334c35-9e6ac5f295a7134d": "7.83ms"
}
}
],
"count": 0
}
```
## Get the specified query id tree profile information
`GET /rest/v2/manager/query/profile/graph/{query_id}`
### Description
Get the tree profile information of the specified query id, same as `show query profile` command.
### Path parameters
* `query_id`
The query id.
### Query parameters
* `fragment_id` and `instance_id`
Optional, both parameters must be specified or not.
If both are not specified, a simple tree of profiles is returned, equivalent to `show query profile '/query_id'`;
If both are specified, a detailed profile tree is returned, which is equivalent to `show query profile '/query_id/fragment_id/instance_id'`.
* `is_all_node`
Optional, if true then query information about the specified query id in all fe nodes, if false then query information about the specified query id in the currently connected fe nodes. The default is true.
### Response
```
{
"msg": "success",
"code": 0,
"data": {
"graph":""
},
"count": 0
}
```

View File

@ -48,6 +48,12 @@ None
Whether to persist the modified configuration. The default is false, which means it is not persisted. If it is true, the modified configuration item will be written into the `fe_custom.conf` file and will still take effect after FE is restarted.
* `reset_persist`
Whether or not to clear the original persist configuration only takes effect when the persist parameter is true. For compatibility with the original version, reset_persist defaults to true.
If persist is set to true and reset_persist is not set or reset_persist is true, the configuration in the `fe_custom.conf` file will be cleared before this modified configuration is written to `fe_custom.conf`.
If persist is set to true and reset_persist is false, this modified configuration item will be incrementally added to `fe_custom.conf`.
## Request body
None
@ -60,57 +66,85 @@ None
"code": 0,
"data": {
"set": {
"storage_min_left_capacity_bytes": "1024",
"qe_max_connection": "2048"
"key": "value"
},
"err": {
"replica_ack_policy": "SIMPLE_MAJORITY"
}
"err": [
{
"config_name": "",
"config_value": "",
"err_info": ""
}
],
"persist":""
},
"count": 0
}
```
The `set` field indicates the successfully set configuration. The `err` field indicates the configuration that failed to be set.
The `set` field indicates the successfully set configuration. The `err` field indicates the configuration that failed to be set. The `persist` field indicates persistent information.
## Examples
1. Set the two configuration values of `max_bytes_per_broker_scanner` and `max_broker_concurrency`.
1. Set the values of `storage_min_left_capacity_bytes`, `replica_ack_policy` and `agent_task_resend_wait_time_ms`.
```
GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&max_broker_concurrency=20
GET /api/_set_config?storage_min_left_capacity_bytes=1024&replica_ack_policy=SIMPLE_MAJORITY&agent_task_resend_wait_time_ms=true
Response:
{
"msg": "success",
"code": 0,
"data": {
"set": {
"max_bytes_per_broker_scanner": "21474836480",
"max_broker_concurrency": "20"
},
"err": {}
},
"count": 0
"msg": "success",
"code": 0,
"data": {
"set": {
"storage_min_left_capacity_bytes": "1024"
},
"err": [
{
"config_name": "replica_ack_policy",
"config_value": "SIMPLE_MAJORITY",
"err_info": "Not support dynamic modification."
},
{
"config_name": "agent_task_resend_wait_time_ms",
"config_value": "true",
"err_info": "Unsupported configuration value type."
}
],
"persist": ""
},
"count": 0
}
storage_min_left_capacity_bytes Successfully;
replica_ack_policy Failed, because the configuration item does not support dynamic modification.
agent_task_resend_wait_time_ms Failed, failed to set the boolean type because the configuration item is of type long.
```
2. Set `max_bytes_per_broker_scanner` and persist it.
```
GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&persist=true
GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&persist=true&reset_persist=false
Response:
{
"msg": "success",
"code": 0,
"data": {
"set": {
"max_bytes_per_broker_scanner": "21474836480"
},
"err": {},
"persist": "ok"
},
"count": 0
"msg": "success",
"code": 0,
"data": {
"set": {
"max_bytes_per_broker_scanner": "21474836480"
},
"err": [],
"persist": "ok"
},
"count": 0
}
```
The fe/conf directory generates the fe_custom.conf file:
```
#THIS IS AN AUTO GENERATED CONFIG FILE.
#You can modify this file manually, and the configurations in this file
#will overwrite the configurations in fe.conf
#Wed Jul 28 12:43:14 CST 2021
max_bytes_per_broker_scanner=21474836480
```

View File

@ -0,0 +1,77 @@
---
{
"title": "Cluster Action",
"language": "zh-CN"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Cluster Action
## Request
`GET /rest/v2/manager/cluster/cluster_info/conn_info`
## 集群连接信息
`GET /rest/v2/manager/cluster/cluster_info/conn_info`
### Description
用于获取集群http、mysql连接信息。
### Response
```
{
"msg": "success",
"code": 0,
"data": {
"http": [
"fe_host:http_ip"
],
"mysql": [
"fe_host:query_ip"
]
},
"count": 0
}
```
### Examples
```
GET /rest/v2/manager/cluster/cluster_info/conn_info
Response:
{
"msg": "success",
"code": 0,
"data": {
"http": [
"127.0.0.1:8030"
],
"mysql": [
"127.0.0.1:9030"
]
},
"count": 0
}
```

View File

@ -0,0 +1,435 @@
---
{
"title": "Node Action",
"language": "zh-CN"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Node Action
## Request
`GET /rest/v2/manager/node/frontends`
`GET /rest/v2/manager/node/backends`
`GET /rest/v2/manager/node/brokers`
`GET /rest/v2/manager/node/configuration_name`
`GET /rest/v2/manager/node/node_list`
`POST /rest/v2/manager/node/configuration_info`
`POST /rest/v2/manager/node/set_config/fe`
`POST /rest/v2/manager/node/set_config/be`
## 获取fe, be, broker节点信息
`GET /rest/v2/manager/node/frontends`
`GET /rest/v2/manager/node/backends`
`GET /rest/v2/manager/node/brokers`
### Description
用于获取集群获取fe, be, broker节点信息。
### Response
```
frontends:
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"Name",
"IP",
"HostName",
"EditLogPort",
"HttpPort",
"QueryPort",
"RpcPort",
"Role",
"IsMaster",
"ClusterId",
"Join",
"Alive",
"ReplayedJournalId",
"LastHeartbeat",
"IsHelper",
"ErrMsg",
"Version"
],
"rows": [
[
...
]
]
},
"count": 0
}
```
```
backends:
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"BackendId",
"Cluster",
"IP",
"HostName",
"HeartbeatPort",
"BePort",
"HttpPort",
"BrpcPort",
"LastStartTime",
"LastHeartbeat",
"Alive",
"SystemDecommissioned",
"ClusterDecommissioned",
"TabletNum",
"DataUsedCapacity",
"AvailCapacity",
"TotalCapacity",
"UsedPct",
"MaxDiskUsedPct",
"ErrMsg",
"Version",
"Status"
],
"rows": [
[
...
]
]
},
"count": 0
}
```
```
brokers:
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"Name",
"IP",
"HostName",
"Port",
"Alive",
"LastStartTime",
"LastUpdateTime",
"ErrMsg"
],
"rows": [
[
...
]
]
},
"count": 0
}
```
## 获取节点配置信息
`GET /rest/v2/manager/node/configuration_name`
`GET /rest/v2/manager/node/node_list`
`POST /rest/v2/manager/node/configuration_info`
### Description
configuration_name 用于获取节点配置项名称。
node_list 用于获取节点列表。
configuration_info 用于获取节点配置详细信息。
### Query parameters
`GET /rest/v2/manager/node/configuration_name`
`GET /rest/v2/manager/node/node_list`
`POST /rest/v2/manager/node/configuration_info`
* type
值为 fe 或 be, 用于指定获取fe的配置信息或be的配置信息。
### Request body
`GET /rest/v2/manager/node/configuration_name`
`GET /rest/v2/manager/node/node_list`
`POST /rest/v2/manager/node/configuration_info`
```
{
"conf_name": [
""
],
"node": [
""
]
}
若不带body,body中的参数都使用默认值。
conf_name 用于指定返回哪些配置项的信息, 默认返回所有配置项信息;
node 用于指定返回哪些节点的配置项信息,默认为全部fe节点或be节点配置项信息。
```
### Response
`GET /rest/v2/manager/node/configuration_name`
```
{
"msg": "success",
"code": 0,
"data": {
"backend":[
""
],
"frontend":[
""
]
},
"count": 0
}
```
`GET /rest/v2/manager/node/node_list`
```
{
"msg": "success",
"code": 0,
"data": {
"backend": [
""
],
"frontend": [
""
]
},
"count": 0
}
```
`POST /rest/v2/manager/node/configuration_info?type=fe`
```
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"配置项",
"节点",
"节点类型",
"配置值类型",
"MasterOnly",
"配置值",
"可修改"
],
"rows": [
[
""
]
]
},
"count": 0
}
```
`POST /rest/v2/manager/node/configuration_info?type=be`
```
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"配置项",
"节点",
"节点类型",
"配置值类型",
"配置值",
"可修改"
],
"rows": [
[
""
]
]
},
"count": 0
}
```
### Examples
1. 获取fe agent_task_resend_wait_time_ms 配置项信息:
POST /rest/v2/manager/node/configuration_info?type=fe
body:
```
{
"conf_name":[
"agent_task_resend_wait_time_ms"
]
}
```
Response:
```
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"配置项",
"节点",
"节点类型",
"配置值类型",
"MasterOnly",
"配置值",
"可修改"
],
"rows": [
[
"agent_task_resend_wait_time_ms",
"127.0.0.1:8030",
"FE",
"long",
"true",
"50000",
"true"
]
]
},
"count": 0
}
```
## 修改配置值
`POST /rest/v2/manager/node/set_config/fe`
`POST /rest/v2/manager/node/set_config/be`
### Description
用于修改fe或be节点配置值
### Request body
```
{
"config_name":{
"node":[
""
],
"value":"",
"persist":
}
}
config_name为对应的配置项;
node为关键字,表示要修改的节点列表;
value为配置的值;
persist为 true 表示永久修改, false 表示临时修改。永久修改重启后能生效, 临时修改重启后失效。
```
### Response
`GET /rest/v2/manager/node/configuration_name`
```
{
"msg": "",
"code": 0,
"data": {
"failed":[
{
"config_name":"name",
"value"="",
"node":"",
"err_info":""
}
]
},
"count": 0
}
failed 表示修改失败的配置信息。
```
### Examples
1. 修改fe 127.0.0.1:8030 节点中 agent_task_resend_wait_time_ms 和alter_table_timeout_second 配置值:
POST /rest/v2/manager/node/set_config/fe
body:
```
{
"agent_task_resend_wait_time_ms":{
"node":[
"127.0.0.1:8030"
],
"value":"10000",
"persist":"true"
},
"alter_table_timeout_second":{
"node":[
"127.0.0.1:8030"
],
"value":"true",
"persist":"true"
}
}
```
Response:
```
{
"msg": "success",
"code": 0,
"data": {
"failed": [
{
"config_name": "alter_table_timeout_second",
"node": "10.81.85.89:8837",
"err_info": "Unsupported configuration value type.",
"value": "true"
}
]
},
"count": 0
}
agent_task_resend_wait_time_ms 配置值修改成功,alter_table_timeout_second 修改失败。
```

View File

@ -0,0 +1,308 @@
---
{
"title": "Query Profile Action",
"language": "zh-CN"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Query Profile Action
## Request
`GET /rest/v2/manager/query/query_info`
`GET /rest/v2/manager/query/sql/{query_id}`
`GET /rest/v2/manager/query/profile/text/{query_id}`
`GET /rest/v2/manager/query/profile/fragments/{query_id}`
`GET /rest/v2/manager/query/profile/graph/{query_id}`
## 获取查询信息
`GET /rest/v2/manager/query/query_info`
### Description
可获取集群所有 fe 节点 select 查询信息。
### Query parameters
* `query_id`
可选,指定返回查询的queryID, 默认返回所有查询的信息。
* `search`
可选,指定返回包含字符串的查询信息,目前仅进行字符串匹配。
* `is_all_node`
可选,若为 true 则返回所有fe节点的查询信息,若为 false 则返回当前fe节点的查询信息。默认为true。
### Response
```
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"Query ID",
"FE节点",
"查询用户",
"执行数据库",
"Sql",
"查询类型",
"开始时间",
"结束时间",
"执行时长",
"状态"
],
"rows": [
[
...
]
]
},
"count": 0
}
```
### Examples
```
GET /rest/v2/manager/query/query_info
{
"msg": "success",
"code": 0,
"data": {
"column_names": [
"Query ID",
"FE节点",
"查询用户",
"执行数据库",
"Sql",
"查询类型",
"开始时间",
"结束时间",
"执行时长",
"状态"
],
"rows": [
[
"d7c93d9275334c35-9e6ac5f295a7134b",
"127.0.0.1:8030",
"root",
"default_cluster:testdb",
"select c.id, c.name, p.age, p.phone, c.date, c.cost from cost c join people p on c.id = p.id where p.age > 20 order by c.id",
"Query",
"2021-07-29 16:59:12",
"2021-07-29 16:59:12",
"109ms",
"EOF"
]
]
},
"count": 0
}
```
## 获取指定查询的sql和文本profile
`GET /rest/v2/manager/query/sql/{query_id}`
`GET /rest/v2/manager/query/profile/text/{query_id}`
### Description
用于获取指定query id的sql和profile文本。
### Path parameters
* `query_id`
query id。
### Query parameters
* `is_all_node`
可选,若为 true 则在所有fe节点中查询指定query id的信息,若为 false 则在当前连接的fe节点中查询指定query id的信息。默认为true。
### Response
```
{
"msg": "success",
"code": 0,
"data": {
"sql": ""
},
"count": 0
}
```
```
{
"msg": "success",
"code": 0,
"data": {
"profile": ""
},
"count": 0
}
```
### Examples
1. 获取 sql:
```
GET /rest/v2/manager/query/sql/d7c93d9275334c35-9e6ac5f295a7134b
Response:
{
"msg": "success",
"code": 0,
"data": {
"sql": "select c.id, c.name, p.age, p.phone, c.date, c.cost from cost c join people p on c.id = p.id where p.age > 20 order by c.id"
},
"count": 0
}
```
## 获取指定查询fragment和instance信息
`GET /rest/v2/manager/query/profile/fragments/{query_id}`
### Description
用于获取指定query id的fragment名称,instance id和执行时长。
### Path parameters
* `query_id`
query id。
### Query parameters
* `is_all_node`
可选,若为 true 则在所有fe节点中查询指定query id的信息,若为 false 则在当前连接的fe节点中查询指定query id的信息。默认为true。
### Response
```
{
"msg": "success",
"code": 0,
"data": [
{
"fragment_id": "",
"time": "",
"instance_id": {
"": ""
}
}
],
"count": 0
}
```
### Examples
```
GET /rest/v2/manager/query/profile/fragments/d7c93d9275334c35-9e6ac5f295a7134b
Response:
{
"msg": "success",
"code": 0,
"data": [
{
"fragment_id": "0",
"time": "36.169ms",
"instance_id": {
"d7c93d9275334c35-9e6ac5f295a7134e": "36.169ms"
}
},
{
"fragment_id": "1",
"time": "20.710ms",
"instance_id": {
"d7c93d9275334c35-9e6ac5f295a7134c": "20.710ms"
}
},
{
"fragment_id": "2",
"time": "7.83ms",
"instance_id": {
"d7c93d9275334c35-9e6ac5f295a7134d": "7.83ms"
}
}
],
"count": 0
}
```
## 获取指定query id树状profile信息
`GET /rest/v2/manager/query/profile/graph/{query_id}`
### Description
获取指定query id树状profile信息,同 `show query profile` 指令。
### Path parameters
* `query_id`
query id。
### Query parameters
* `fragment_id` 和 `instance_id`
可选,这两个参数需同时指定或同时不指定。
同时不指定则返回profile 简易树形图,相当于`show query profile '/query_id'`;
同时指定则返回指定instance详细profile树形图,相当于`show query profile '/query_id/fragment_id/instance_id'`.
* `is_all_node`
可选,若为 true 则在所有fe节点中查询指定query id的信息,若为 false 则在当前连接的fe节点中查询指定query id的信息。默认为true。
### Response
```
{
"msg": "success",
"code": 0,
"data": {
"graph":""
},
"count": 0
}
```

View File

@ -48,6 +48,12 @@ under the License.
是否要将修改的配置持久化。默认为 false,即不持久化。如果为 true,这修改后的配置项会写入 `fe_custom.conf` 文件中,并在 FE 重启后仍会生效。
* `reset_persist`
是否要清空原来的持久化配置,只在 persist 参数为 true 时生效。为了兼容原来的版本,reset_persist 默认为 true。
如果 persist 设为 true,不设置 reset_persist 或 reset_persist 为 true,将先清空`fe_custom.conf`文件中的配置再将本次修改的配置写入`fe_custom.conf`
如果 persist 设为 true,reset_persist 为 false,本次修改的配置项将会增量添加到`fe_custom.conf`
## Request body
@ -60,56 +66,84 @@ under the License.
"code": 0,
"data": {
"set": {
"storage_min_left_capacity_bytes": "1024",
"qe_max_connection": "2048"
"key": "value"
},
"err": {
"replica_ack_policy": "SIMPLE_MAJORITY"
}
"err": [
{
"config_name": "",
"config_value": "",
"err_info": ""
}
],
"persist":""
},
"count": 0
}
```
`set` 字段表示设置成功的配置。`err` 字段表示设置失败的配置。
`set` 字段表示设置成功的配置。`err` 字段表示设置失败的配置。 `persist` 字段表示持久化信息。
## Examples
1. 设置 `max_bytes_per_broker_scanner``max_broker_concurrency` 个配置的值。
1. 设置 `storage_min_left_capacity_bytes``replica_ack_policy``agent_task_resend_wait_time_ms` 个配置的值。
```
GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&max_broker_concurrency=20
GET /api/_set_config?storage_min_left_capacity_bytes=1024&replica_ack_policy=SIMPLE_MAJORITY&agent_task_resend_wait_time_ms=true
Response:
{
"msg": "success",
"code": 0,
"data": {
"set": {
"max_bytes_per_broker_scanner": "21474836480",
"max_broker_concurrency": "20"
},
"err": {}
},
"count": 0
"msg": "success",
"code": 0,
"data": {
"set": {
"storage_min_left_capacity_bytes": "1024"
},
"err": [
{
"config_name": "replica_ack_policy",
"config_value": "SIMPLE_MAJORITY",
"err_info": "Not support dynamic modification."
},
{
"config_name": "agent_task_resend_wait_time_ms",
"config_value": "true",
"err_info": "Unsupported configuration value type."
}
],
"persist": ""
},
"count": 0
}
storage_min_left_capacity_bytes 设置成功;
replica_ack_policy 设置失败,原因是该配置项不支持动态修改;
agent_task_resend_wait_time_ms 设置失败,因为该配置项类型为long, 设置boolean类型失败。
```
2. 设置 `max_bytes_per_broker_scanner` 并持久化
```
GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&persist=true
GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&persist=true&reset_persist=false
Response:
{
"msg": "success",
"code": 0,
"data": {
"set": {
"max_bytes_per_broker_scanner": "21474836480"
},
"err": {},
"persist": "ok"
},
"count": 0
"msg": "success",
"code": 0,
"data": {
"set": {
"max_bytes_per_broker_scanner": "21474836480"
},
"err": [],
"persist": "ok"
},
"count": 0
}
```
fe/conf 目录生成fe_custom.conf:
```
#THIS IS AN AUTO GENERATED CONFIG FILE.
#You can modify this file manually, and the configurations in this file
#will overwrite the configurations in fe.conf
#Wed Jul 28 12:43:14 CST 2021
max_bytes_per_broker_scanner=21474836480
```

View File

@ -199,7 +199,9 @@ public class ConfigBase {
f.setDouble(null, Double.parseDouble(confVal));
break;
case "boolean":
f.setBoolean(null, Boolean.parseBoolean(confVal));
if (isBoolean(confVal)) {
f.setBoolean(null, Boolean.parseBoolean(confVal));
}
break;
case "String":
f.set(null, confVal);
@ -235,7 +237,9 @@ public class ConfigBase {
case "boolean[]":
boolean[] ba = new boolean[sa.length];
for (int i = 0; i < ba.length; i++) {
ba[i] = Boolean.parseBoolean(sa[i]);
if (isBoolean(sa[i])) {
ba[i] = Boolean.parseBoolean(sa[i]);
}
}
f.set(null, ba);
break;
@ -247,6 +251,13 @@ public class ConfigBase {
}
}
private static boolean isBoolean(String s) {
if (s.equalsIgnoreCase("true") || s.equalsIgnoreCase("false")) {
return true;
}
throw new IllegalArgumentException("type mismatch");
}
public static Map<String, Field> getAllMutableConfigs() {
Map<String, Field> mutableConfigs = Maps.newHashMap();
Field fields[] = ConfigBase.confClass.getFields();
@ -357,14 +368,13 @@ public class ConfigBase {
return anno.masterOnly();
}
// overwrite configs to customConfFile.
// use synchronized to make sure only one thread modify this file
public synchronized static void persistConfig(Map<String, String> customConf) throws IOException {
public synchronized static void persistConfig(Map<String, String> customConf, boolean resetPersist) throws IOException {
File file = new File(customConfFile);
if (!file.exists()) {
file.createNewFile();
} else {
// clear the file content
} else if (resetPersist){
// clear the customConfFile content
try (PrintWriter writer = new PrintWriter(file)) {
writer.print("");
}

View File

@ -129,6 +129,10 @@ public class MultiProfileTreeBuilder {
return singleBuilder.getFragmentTreeRoot();
}
public List<ProfileTreeBuilder.FragmentInstances> getFragmentInstances(String executionId) throws AnalysisException{
return getExecutionProfileTreeBuilder(executionId).getFragmentsInstances();
}
private ProfileTreeBuilder getExecutionProfileTreeBuilder(String executionId) throws AnalysisException {
ProfileTreeBuilder singleBuilder = idToSingleTreeBuilder.get(executionId);
if (singleBuilder == null) {

View File

@ -17,12 +17,15 @@
package org.apache.doris.common.profile;
import lombok.Getter;
import lombok.Setter;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Counter;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.thrift.TUnit;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -69,6 +72,8 @@ public class ProfileTreeBuilder {
// the tree root of the entire query profile tree
private ProfileTreeNode fragmentTreeRoot;
private List<FragmentInstances> fragmentsInstances = Lists.newArrayList();
// Match string like:
// EXCHANGE_NODE (id=3):(Active: 103.899ms, % non-child: 2.27%)
// Extract "EXCHANGE_NODE" and "3"
@ -112,6 +117,10 @@ public class ProfileTreeBuilder {
return instanceActiveTimeMap.get(fragmentId);
}
public List<FragmentInstances> getFragmentsInstances() {
return fragmentsInstances;
}
public void build() throws UserException {
reset();
checkProfile();
@ -125,6 +134,7 @@ public class ProfileTreeBuilder {
instanceTreeMap.clear();
instanceActiveTimeMap.clear();
fragmentTreeRoot = null;
fragmentsInstances.clear();
}
private void checkProfile() throws UserException {
@ -149,13 +159,20 @@ public class ProfileTreeBuilder {
// 1. Get max active time of instances in this fragment
List<Triple<String, String, Long>> instanceIdAndActiveTimeList = Lists.newArrayList();
List<String> instances = Lists.newArrayList();
Map<String, String> instanceIdToTime = Maps.newHashMap();
long maxActiveTimeNs = 0;
for (Pair<RuntimeProfile, Boolean> pair : fragmentChildren) {
Triple<String, String, Long> instanceIdAndActiveTime = getInstanceIdHostAndActiveTime(pair.first);
instanceIdToTime.put(instanceIdAndActiveTime.getLeft(),
RuntimeProfile.printCounter(instanceIdAndActiveTime.getRight(), TUnit.TIME_NS));
maxActiveTimeNs = Math.max(instanceIdAndActiveTime.getRight(), maxActiveTimeNs);
instanceIdAndActiveTimeList.add(instanceIdAndActiveTime);
instances.add(instanceIdAndActiveTime.getLeft());
}
instanceActiveTimeMap.put(fragmentId, instanceIdAndActiveTimeList);
fragmentsInstances.add(new FragmentInstances(fragmentId,
RuntimeProfile.printCounter(maxActiveTimeNs, TUnit.TIME_NS), instanceIdToTime));
// 2. Build tree for all fragments
// All instance in a fragment are same, so use first instance to build the fragment tree
@ -342,4 +359,21 @@ public class ProfileTreeBuilder {
}
return new ImmutableTriple<>(m.group(1), m.group(2) + ":" + m.group(3), activeTimeNs);
}
@Getter
@Setter
public static class FragmentInstances {
@JsonProperty("fragment_id")
private String fragmentId;
@JsonProperty("time")
private String maxActiveTimeNs;
@JsonProperty("instance_id")
private Map<String, String> instanceIdToTime;
public FragmentInstances(String fragmentId, String maxActiveTimeNs, Map<String, String> instanceIdToTime) {
this.fragmentId = fragmentId;
this.maxActiveTimeNs = maxActiveTimeNs;
this.instanceIdToTime = instanceIdToTime;
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.common.util;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.profile.MultiProfileTreeBuilder;
import org.apache.doris.common.profile.ProfileTreeBuilder;
import org.apache.doris.common.profile.ProfileTreeNode;
import com.google.common.base.Strings;
@ -263,7 +264,15 @@ public class ProfileManager {
// Return the tasks info of the specified load job
// Columns: TaskId, ActiveTime
public List<List<String>> getLoadJobTaskList(String jobId) throws AnalysisException {
MultiProfileTreeBuilder builder;
MultiProfileTreeBuilder builder = getMultiProfileTreeBuilder(jobId);
return builder.getSubTaskInfo();
}
public List<ProfileTreeBuilder.FragmentInstances> getFragmentsAndInstances(String queryId) throws AnalysisException{
return getMultiProfileTreeBuilder(queryId).getFragmentInstances(queryId);
}
private MultiProfileTreeBuilder getMultiProfileTreeBuilder(String jobId) throws AnalysisException{
readLock.lock();
try {
ProfileElement element = queryIdToProfileMap.get(jobId);
@ -271,11 +280,9 @@ public class ProfileManager {
throw new AnalysisException("failed to get task ids. err: "
+ (element == null ? "not found" : element.errMsg));
}
builder = element.builder;
return element.builder;
} finally {
readLock.unlock();
}
return builder.getSubTaskInfo();
}
}

View File

@ -50,6 +50,7 @@ public class SetConfigAction extends RestBaseAction {
private static final Logger LOG = LogManager.getLogger(SetConfigAction.class);
private static final String PERSIST_PARAM = "persist";
private static final String RESET_PERSIST = "reset_persist";
public SetConfigAction(ActionController controller) {
super(controller);
@ -65,6 +66,7 @@ public class SetConfigAction extends RestBaseAction {
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
boolean needPersist = false;
boolean resetPersist = true;
Map<String, List<String>> configs = request.getAllParameters();
if (configs.containsKey(PERSIST_PARAM)) {
List<String> val = configs.remove(PERSIST_PARAM);
@ -72,6 +74,12 @@ public class SetConfigAction extends RestBaseAction {
needPersist = true;
}
}
if (configs.containsKey(RESET_PERSIST)) {
List<String> val = configs.remove(RESET_PERSIST);
if (val.size() == 1 && val.get(0).equals("false")) {
resetPersist = false;
}
}
Map<String, String> setConfigs = Maps.newHashMap();
Map<String, String> errConfigs = Maps.newHashMap();
@ -114,7 +122,7 @@ public class SetConfigAction extends RestBaseAction {
String persistMsg = "";
if (needPersist) {
try {
ConfigBase.persistConfig(setConfigs);
ConfigBase.persistConfig(setConfigs, resetPersist);
persistMsg = "ok";
} catch (IOException e) {
LOG.warn("failed to persist config", e);

View File

@ -17,6 +17,9 @@
package org.apache.doris.httpv2.rest;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.ConfigBase.ConfField;
@ -24,7 +27,10 @@ import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.clearspring.analytics.util.Lists;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -34,7 +40,9 @@ import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -49,6 +57,7 @@ public class SetConfigAction extends RestBaseController {
private static final Logger LOG = LogManager.getLogger(SetConfigAction.class);
private static final String PERSIST_PARAM = "persist";
private static final String RESET_PERSIST = "reset_persist";
@RequestMapping(path = "/api/_set_config", method = RequestMethod.GET)
protected Object set_config(HttpServletRequest request, HttpServletResponse response) {
@ -56,16 +65,26 @@ public class SetConfigAction extends RestBaseController {
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
boolean needPersist = false;
Map<String, String[]> configs = request.getParameterMap();
boolean resetPersist = true;
Map<String, String[]> tempConfigs = request.getParameterMap();
Map<String, String[]> configs = Maps.newHashMap();
configs.putAll(tempConfigs);
if (configs.containsKey(PERSIST_PARAM)) {
String[] val = configs.remove(PERSIST_PARAM);
if (val.length == 1 && val[0].equals("true")) {
needPersist = true;
}
}
if (configs.containsKey(RESET_PERSIST)) {
String[] val = configs.remove(RESET_PERSIST);
if (val.length == 1 && val[0].equals("false")) {
resetPersist = false;
}
}
Map<String, String> setConfigs = Maps.newHashMap();
Map<String, String> errConfigs = Maps.newHashMap();
List<ErrConfig> errConfigs = Lists.newArrayList();
LOG.debug("get config from url: {}, need persist: {}", configs, needPersist);
@ -73,29 +92,41 @@ public class SetConfigAction extends RestBaseController {
for (Field f : fields) {
// ensure that field has "@ConfField" annotation
ConfField anno = f.getAnnotation(ConfField.class);
if (anno == null || !anno.mutable()) {
continue;
}
if (anno.masterOnly() && !Catalog.getCurrentCatalog().isMaster()) {
if (anno == null) {
continue;
}
// ensure that field has property string
String confKey = anno.value().equals("") ? f.getName() : anno.value();
String[] confVals = configs.get(confKey);
if (confVals == null || confVals.length == 0) {
if (confVals == null) {
continue;
}
if (confVals.length > 1) {
if (confVals.length != 1) {
errConfigs.add(new ErrConfig(confKey, "", "No or multiple configuration values."));
continue;
}
if (!anno.mutable()) {
errConfigs.add(new ErrConfig(confKey, confVals[0], "Not support dynamic modification."));
continue;
}
if (anno.masterOnly() && !Catalog.getCurrentCatalog().isMaster()) {
errConfigs.add(new ErrConfig(confKey, confVals[0], "Not support modification on non-master"));
continue;
}
try {
ConfigBase.setConfigField(f, confVals[0]);
} catch (IllegalArgumentException e){
errConfigs.add(new ErrConfig(confKey, confVals[0], "Unsupported configuration value type."));
continue;
} catch (Exception e) {
LOG.warn("failed to set config {}:{}, {}", confKey, confVals[0], e.getMessage());
errConfigs.add(new ErrConfig(confKey, confVals[0], e.getMessage()));
continue;
}
@ -105,7 +136,7 @@ public class SetConfigAction extends RestBaseController {
String persistMsg = "";
if (needPersist) {
try {
ConfigBase.persistConfig(setConfigs);
ConfigBase.persistConfig(setConfigs, resetPersist);
persistMsg = "ok";
} catch (IOException e) {
LOG.warn("failed to persist config", e);
@ -113,19 +144,56 @@ public class SetConfigAction extends RestBaseController {
}
}
List<String> errConfigNames = errConfigs.stream().map(ErrConfig::getConfigName).collect(Collectors.toList());
for (String key : configs.keySet()) {
if (!setConfigs.containsKey(key)) {
if (!setConfigs.containsKey(key) && !errConfigNames.contains(key)) {
String[] confVals = configs.get(key);
String confVal = confVals.length == 1 ? confVals[0] : "invalid value";
errConfigs.put(key, confVal);
errConfigs.add(new ErrConfig(key, confVal, "invalid config"));
}
}
Map<String, Object> resultMap = Maps.newHashMap();
resultMap.put("set", setConfigs);
resultMap.put("err", errConfigs);
resultMap.put("persist", persistMsg);
return ResponseEntityBuilder.ok(new SetConfigEntity(setConfigs, errConfigs, persistMsg));
}
return ResponseEntityBuilder.ok(resultMap);
@Setter
@AllArgsConstructor
public static class ErrConfig{
@SerializedName(value = "config_name")
@JsonProperty("config_name")
private String configName;
@SerializedName(value = "config_value")
@JsonProperty("config_value")
private String configValue;
@SerializedName(value = "err_info")
@JsonProperty("err_info")
private String errInfo;
public String getConfigName() {
return configName;
}
public String getConfigValue() {
return configValue;
}
public String getErrInfo() {
return errInfo;
}
}
@Getter
@Setter
@AllArgsConstructor
public static class SetConfigEntity{
@SerializedName(value = "set")
@JsonProperty("set")
Map<String, String> setConfigs;
@SerializedName(value = "err")
@JsonProperty("err")
List<ErrConfig> errConfigs;
@SerializedName(value = "persist")
@JsonProperty("persist")
String persistMsg;
}
}

View File

@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.httpv2.rest.manager;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.Config;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.RestBaseController;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Frontend;
import com.google.common.collect.Maps;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/*
* Used to return the cluster information for the manager.
*/
@RestController
@RequestMapping("/rest/v2/manager/cluster")
public class ClusterAction extends RestBaseController {
// Returns mysql and http connection information for the cluster.
// {
// "mysql":[
// ""
// ],
// "http":[
// ""
// ]
// }
@RequestMapping(path = "/cluster_info/conn_info", method = RequestMethod.GET)
public Object clusterInfo(HttpServletRequest request, HttpServletResponse response) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
Map<String, List<String>> result = Maps.newHashMap();
List<String> frontends = Catalog.getCurrentCatalog().getFrontends(null)
.stream().filter(Frontend::isAlive)
.map(Frontend::getHost)
.collect(Collectors.toList());
result.put("mysql", frontends.stream().map(ip -> ip + ":" + Config.query_port).collect(Collectors.toList()));
result.put("http", frontends.stream().map(ip -> ip + ":" + Config.http_port).collect(Collectors.toList()));
return ResponseEntityBuilder.ok(result);
}
}

View File

@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.httpv2.rest.manager;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.system.Frontend;
import com.google.gson.reflect.TypeToken;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.parquet.Strings;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/*
* used to forward http requests from manager to be.
*/
public class HttpUtils {
static final int REQUEST_SUCCESS_CODE = 0;
static List<Pair<String, Integer>> getFeList() {
return Catalog.getCurrentCatalog().getFrontends(null)
.stream().filter(Frontend::isAlive).map(fe -> new Pair<>(fe.getHost(), Config.http_port))
.collect(Collectors.toList());
}
static String concatUrl(Pair<String, Integer> ipPort, String path, Map<String, String> arguments) {
StringBuilder url = new StringBuilder("http://")
.append(ipPort.first).append(":").append(ipPort.second).append(path);
boolean isFirst = true;
for (Map.Entry<String, String> entry : arguments.entrySet()) {
if (!Strings.isNullOrEmpty(entry.getValue())) {
if (isFirst) {
url.append("?");
} else {
url.append("&");
}
isFirst = false;
url.append(entry.getKey()).append("=").append(entry.getValue());
}
}
return url.toString();
}
static String doGet(String url, Map<String, String> headers) throws IOException {
HttpGet httpGet = new HttpGet(url);
setRequestConfig(httpGet, headers);
return executeRequest(httpGet);
}
static String doPost(String url, Map<String, String> headers, Object body) throws IOException {
HttpPost httpPost = new HttpPost(url);
if (Objects.nonNull(body)) {
String jsonString = GsonUtils.GSON.toJson(body);
StringEntity stringEntity = new StringEntity(jsonString, "UTF-8");
httpPost.setEntity(stringEntity);
}
setRequestConfig(httpPost, headers);
return executeRequest(httpPost);
}
private static void setRequestConfig(HttpRequestBase request, Map<String, String> headers) {
if (null != headers) {
for (String key : headers.keySet()) {
request.setHeader(key, headers.get(key));
}
}
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(2000)
.setConnectionRequestTimeout(2000)
.setSocketTimeout(2000)
.build();
request.setConfig(config);
}
private static String executeRequest(HttpRequestBase request) throws IOException {
CloseableHttpClient client = HttpClientBuilder.create().build();
return client.execute(request, httpResponse -> EntityUtils.toString(httpResponse.getEntity()));
}
static String parseResponse(String response) {
ResponseBody responseEntity = GsonUtils.GSON.fromJson(response, new TypeToken<ResponseBody>() {}.getType());
if (responseEntity.getCode() != REQUEST_SUCCESS_CODE) {
throw new RuntimeException(responseEntity.getMsg());
}
return GsonUtils.GSON.toJson(responseEntity.getData());
}
}

View File

@ -0,0 +1,835 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.httpv2.rest.manager;
import lombok.Getter;
import lombok.Setter;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.proc.ProcService;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.RestBaseController;
import org.apache.doris.httpv2.rest.SetConfigAction;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.httpclient.HttpException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/*
* Used to return all node information, configuration information and modify node config.
*/
@RestController
@RequestMapping("/rest/v2/manager/node")
public class NodeAction extends RestBaseController {
private static final Logger LOG = LogManager.getLogger(NodeAction.class);
private static final Pattern PATTERN = Pattern.compile(":");
public static final String AUTHORIZATION = "Authorization";
private static final int HTTP_WAIT_TIME_SECONDS = 2;
public static final String CONFIG = "配置项";
public static final String NODE_IP_PORT = "节点";
public static final String NODE_TYPE = "节点类型";
public static final String CONFIG_TYPE = "配置值类型";
public static final String MASTER_ONLY = "MasterOnly";
public static final String CONFIG_VALUE = "配置值";
public static final String IS_MUTABLE = "可修改";
public static final ImmutableList<String> FE_CONFIG_TITLE_NAMES = new ImmutableList.Builder<String>()
.add(CONFIG).add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE)
.add(MASTER_ONLY).add(CONFIG_VALUE).add(IS_MUTABLE)
.build();
public static final ImmutableList<String> BE_CONFIG_TITLE_NAMES = new ImmutableList.Builder<String>()
.add(CONFIG).add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE)
.add(CONFIG_VALUE).add(IS_MUTABLE)
.build();
private Object httpExecutorLock = new Object();
private volatile static ExecutorService httpExecutor = null;
// Returns all fe information, similar to 'show frontends'.
@RequestMapping(path = "/frontends", method = RequestMethod.GET)
public Object frontends_info(HttpServletRequest request, HttpServletResponse response) throws AnalysisException {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
return fetchNodeInfo("/frontends");
}
// Returns all be information, similar to 'show backends'.
@RequestMapping(path = "/backends", method = RequestMethod.GET)
public Object backends_info(HttpServletRequest request, HttpServletResponse response) throws AnalysisException {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
return fetchNodeInfo("/backends");
}
// Returns all broker information, similar to 'show broker'.
@RequestMapping(path = "/brokers", method = RequestMethod.GET)
public Object brokers_info(HttpServletRequest request, HttpServletResponse response) throws AnalysisException {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
return fetchNodeInfo("/brokers");
}
// {
// "column_names": [
// ""
// ],
// "rows": [
// [
// ""
// ]
// ]
// }
private Object fetchNodeInfo(String procPath) throws AnalysisException {
try {
ProcResult procResult = ProcService.getInstance().open(procPath).fetchResult();
List<String> columnNames = Lists.newArrayList(procResult.getColumnNames());
return ResponseEntityBuilder.ok(new NodeInfo(columnNames, procResult.getRows()));
} catch (Exception e) {
LOG.warn(e);
throw e;
}
}
@Getter
@Setter
public static class NodeInfo {
public List<String> column_names;
public List<List<String>> rows;
public NodeInfo(List<String> column_names, List<List<String>> rows) {
this.column_names = column_names;
this.rows = rows;
}
}
// Return fe and be all configuration names.
// {
// "frontend": [
// ""
// ],
// "backend": [
// ""
// ]
// }
@RequestMapping(path = "/configuration_name", method = RequestMethod.GET)
public Object configurationName(HttpServletRequest request, HttpServletResponse response) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
Map<String, List<String>> result = Maps.newHashMap();
try {
result.put("frontend", Lists.newArrayList(Config.dump().keySet()));
List<String> beConfigNames = Lists.newArrayList();
List<Long> beIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
if (!beIds.isEmpty()) {
Backend be = Catalog.getCurrentSystemInfo().getBackend(beIds.get(0));
String url = "http://" + be.getHost() + ":" + be.getHttpPort() + "/api/show_config";
String questResult = HttpUtils.doGet(url, null);
List<List<String>> configs = GsonUtils.GSON.fromJson(questResult,
new TypeToken<List<List<String>>>() {
}.getType());
for (List<String> config : configs) {
beConfigNames.add(config.get(0));
}
}
result.put("backend", beConfigNames);
} catch (Exception e) {
LOG.warn(e);
return ResponseEntityBuilder.internalError(e.getMessage());
}
return ResponseEntityBuilder.ok(result);
}
// Return all living fe and be nodes.
// {
// "frontend": [
// "host:httpPort"
// ],
// "backend": [
// "host:httpPort""
// ]
// }
@RequestMapping(path = "/node_list", method = RequestMethod.GET)
public Object nodeList(HttpServletRequest request, HttpServletResponse response) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
Map<String, List<String>> result = Maps.newHashMap();
result.put("frontend", getFeList());
result.put("backend", getBeList());
return ResponseEntityBuilder.ok(result);
}
private static List<String> getFeList() {
return Catalog.getCurrentCatalog().getFrontends(null)
.stream().filter(Frontend::isAlive)
.map(fe -> fe.getHost() + ":" + Config.http_port)
.collect(Collectors.toList());
}
private static List<String> getBeList() {
return Catalog.getCurrentSystemInfo().getBackendIds(true)
.stream().map(beId -> {
Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
return be.getHost() + ":" + be.getHttpPort();
})
.collect(Collectors.toList());
}
/*
* this http interface is used to return configuration information requested by other fe.
*/
@RequestMapping(path = "/config", method = RequestMethod.GET)
public Object config(HttpServletRequest request, HttpServletResponse response) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
List<List<String>> results = Lists.newArrayList();
try {
List<List<String>> configs = ConfigBase.getConfigInfo(null);
// Sort all configs by config key.
Collections.sort(configs, Comparator.comparing(o -> o.get(0)));
// reorder the fields
for (List<String> config : configs) {
List<String> list = Lists.newArrayList();
list.add(config.get(0));
list.add(config.get(2));
list.add(config.get(4));
list.add(config.get(1));
list.add(config.get(3));
results.add(list);
}
} catch (DdlException e) {
LOG.warn(e);
return ResponseEntityBuilder.internalError(e.getMessage());
}
return results;
}
// Return the configuration information of fe or be.
//
// for fe:
// {
// "column_names": [
// "配置项",
// "节点",
// "节点类型",
// "配置类型",
// "仅master",
// "配置值",
// "可修改"
// ],
// "rows": [
// [
// ""
// ]
// ]
// }
//
// for be:
// {
// "column_names": [
// "配置项",
// "节点",
// "节点类型",
// "配置类型",
// "配置值",
// "可修改"
// ],
// "rows": [
// [
// ""
// ]
// ]
// }
@RequestMapping(path = "/configuration_info", method = RequestMethod.POST)
public Object configurationInfo(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "type") String type,
@RequestBody(required = false) ConfigInfoRequestBody requestBody) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
initHttpExecutor();
if (requestBody == null) {
requestBody = new ConfigInfoRequestBody();
}
List<Pair<String, Integer>> hostPorts;
if (type.equalsIgnoreCase("fe")) {
if (requestBody.getNodes() != null && !requestBody.getNodes().isEmpty()) {
hostPorts = parseHostPort(requestBody.getNodes());
} else {
hostPorts = parseHostPort(getFeList());
}
List<Map.Entry<String, Integer>> errNodes = Lists.newArrayList();
List<List<String>> data = handleConfigurationInfo(hostPorts, request.getHeader(AUTHORIZATION),
"/rest/v2/manager/node/config", "FE", requestBody.getConfNames(), errNodes);
if (!errNodes.isEmpty()) {
LOG.warn("Failed to get fe node configuration information from:{}", errNodes.toString());
}
return ResponseEntityBuilder.ok(new NodeInfo(FE_CONFIG_TITLE_NAMES, data));
} else if (type.equalsIgnoreCase("be")) {
if (requestBody.getNodes() != null && !requestBody.getNodes().isEmpty()) {
hostPorts = parseHostPort(requestBody.getNodes());
} else {
hostPorts = parseHostPort(getBeList());
}
List<Map.Entry<String, Integer>> errNodes = Lists.newArrayList();
List<List<String>> data = handleConfigurationInfo(hostPorts, request.getHeader(AUTHORIZATION),
"/api/show_config", "BE", requestBody.getConfNames(), errNodes);
if (!errNodes.isEmpty()) {
LOG.warn("Failed to get be node configuration information from:{}", errNodes.toString());
}
return ResponseEntityBuilder.ok(new NodeInfo(BE_CONFIG_TITLE_NAMES, data));
}
return ResponseEntityBuilder.badRequest("Unsupported type: " + type + ". Only types of fe or be are " +
"supported");
}
// Use thread pool to concurrently fetch configuration information from specified fe or be nodes.
private List<List<String>> handleConfigurationInfo(List<Pair<String, Integer>> hostPorts,
String authorization, String questPath,
String nodeType, List<String> confNames,
List<Map.Entry<String, Integer>> errNodes) {
// The configuration information returned by each node is a List<List<String>> type,
// configInfoTotal is used to store the configuration information of all nodes.
List<List<List<String>>> configInfoTotal = Lists.newArrayList();
MarkedCountDownLatch<String, Integer> configRequestDoneSignal = new MarkedCountDownLatch<>(hostPorts.size());
for (int i = 0; i < hostPorts.size(); ++i) {
configInfoTotal.add(Lists.newArrayList());
Pair<String, Integer> hostPort = hostPorts.get(i);
configRequestDoneSignal.addMark(hostPort.first + ":" + hostPort.second, -1);
String url = "http://" + hostPort.first + ":" + hostPort.second + questPath;
httpExecutor.submit(new HttpConfigInfoTask(url, hostPort, authorization, nodeType, confNames,
configRequestDoneSignal, configInfoTotal.get(i)));
}
List<List<String>> resultConfigs = Lists.newArrayList();
try {
configRequestDoneSignal.await(HTTP_WAIT_TIME_SECONDS, TimeUnit.SECONDS);
for (List<List<String>> lists : configInfoTotal) {
resultConfigs.addAll(lists);
}
} catch (InterruptedException e) {
errNodes.addAll(configRequestDoneSignal.getLeftMarks());
}
return resultConfigs;
}
private void initHttpExecutor() {
if (httpExecutor == null) {
synchronized (httpExecutorLock) {
if (httpExecutor == null) {
httpExecutor = ThreadPoolManager.newDaemonFixedThreadPool(5, 100,
"node-config-update-pool", true);
}
}
}
}
static List<Pair<String, Integer>> parseHostPort(List<String> nodes) {
List<Pair<String, Integer>> hostPorts = Lists.newArrayList();
for (String node : nodes) {
try {
Pair<String, Integer> ipPort = SystemInfoService.validateHostAndPort(node);
hostPorts.add(ipPort);
} catch (Exception e) {
LOG.warn(e);
}
}
return hostPorts;
}
private class HttpConfigInfoTask implements Runnable {
private String url;
private Pair<String, Integer> hostPort;
private String authorization;
private String nodeType;
private List<String> confNames;
private MarkedCountDownLatch<String, Integer> configRequestDoneSignal;
private List<List<String>> config;
public HttpConfigInfoTask(String url, Pair<String, Integer> hostPort, String authorization, String nodeType,
List<String> confNames,
MarkedCountDownLatch<String, Integer> configRequestDoneSignal,
List<List<String>> config) {
this.url = url;
this.hostPort = hostPort;
this.authorization = authorization;
this.nodeType = nodeType;
this.confNames = confNames;
this.configRequestDoneSignal = configRequestDoneSignal;
this.config = config;
}
@Override
public void run() {
String configInfo;
try {
configInfo = HttpUtils.doGet(url, ImmutableMap.<String, String>builder().put(AUTHORIZATION,
authorization).build());
List<List<String>> configs = GsonUtils.GSON.fromJson(configInfo,
new TypeToken<List<List<String>>>() {
}.getType());
for (List<String> conf : configs) {
if (confNames == null || confNames.isEmpty() || confNames.contains(conf.get(0))) {
addConfig(conf);
}
}
configRequestDoneSignal.markedCountDown(hostPort.first + ":" + hostPort.second, -1);
} catch (Exception e) {
LOG.warn("get config from {}:{} failed.", hostPort.first, hostPort.second, e);
configRequestDoneSignal.countDown();
}
}
private void addConfig(List<String> conf) {
conf.add(1, hostPort.first + ":" + hostPort.second);
conf.add(2, nodeType);
config.add(conf);
}
}
// Modify fe configuration.
//
// request body:
//{
// "config_name":{
// "node":[
// ""
// ],
// "value":"",
// "persist":""
// }
//}
//
// return data:
// {
// "failed":[
// {
// "config_name":"",
// "value"="",
// "node":"",
// "err_info":""
// }
// ]
// }
@RequestMapping(path = "/set_config/fe", method = RequestMethod.POST)
public Object setConfigFe(HttpServletRequest request, HttpServletResponse response,
@RequestBody Map<String, SetConfigRequestBody> requestBody) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
List<Map<String, String>> failedTotal = Lists.newArrayList();
List<NodeConfigs> nodeConfigList = parseSetConfigNodes(requestBody, failedTotal);
List<Pair<String, Integer>> aliveFe = Catalog.getCurrentCatalog().getFrontends(null)
.stream().filter(Frontend::isAlive).map(fe -> new Pair<>(fe.getHost(), Config.http_port))
.collect(Collectors.toList());
checkNodeIsAlive(nodeConfigList, aliveFe, failedTotal);
Map<String, String> header = Maps.newHashMap();
header.put(AUTHORIZATION, request.getHeader(AUTHORIZATION));
for (NodeConfigs nodeConfigs : nodeConfigList) {
if (!nodeConfigs.getConfigs(true).isEmpty()) {
String url = concatFeSetConfigUrl(nodeConfigs, true);
try {
String responsePersist = HttpUtils.doGet(url, header);
parseFeSetConfigResponse(responsePersist, nodeConfigs.getHostPort(), failedTotal);
} catch (Exception e) {
addSetConfigErrNode(nodeConfigs.getConfigs(true), nodeConfigs.getHostPort(),
e.getMessage(), failedTotal);
}
}
if (!nodeConfigs.getConfigs(false).isEmpty()) {
String url = concatFeSetConfigUrl(nodeConfigs, false);
try {
String responseTemp = HttpUtils.doGet(url, header);
parseFeSetConfigResponse(responseTemp, nodeConfigs.getHostPort(), failedTotal);
} catch (Exception e) {
addSetConfigErrNode(nodeConfigs.getConfigs(false), nodeConfigs.getHostPort(),
e.getMessage(), failedTotal);
}
}
}
Map<String, List<Map<String, String>>> data = Maps.newHashMap();
data.put("failed", failedTotal);
return ResponseEntityBuilder.ok(data);
}
private void addSetConfigErrNode(Map<String, String> configs, Pair<String, Integer> hostPort, String err,
List<Map<String, String>> failedTotal) {
for (Map.Entry<String, String> entry : configs.entrySet()) {
Map<String, String> failed = Maps.newHashMap();
addFailedConfig(entry.getKey(), entry.getValue(), hostPort.first + ":" +
hostPort.second, err, failed);
failedTotal.add(failed);
}
}
private void parseFeSetConfigResponse(String response, Pair<String, Integer> hostPort,
List<Map<String, String>> failedTotal) throws HttpException {
JsonObject jsonObject = JsonParser.parseString(response).getAsJsonObject();
if (jsonObject.get("code").getAsInt() != HttpUtils.REQUEST_SUCCESS_CODE) {
throw new HttpException(jsonObject.get("msg").getAsString());
}
SetConfigAction.SetConfigEntity setConfigEntity = GsonUtils.GSON.fromJson(jsonObject.get("data").getAsJsonObject(),
SetConfigAction.SetConfigEntity.class);
for (SetConfigAction.ErrConfig errConfig : setConfigEntity.getErrConfigs()) {
Map<String, String> failed = Maps.newHashMap();
addFailedConfig(errConfig.getConfigName(), errConfig.getConfigValue(),
hostPort.first + ":" + hostPort.second, errConfig.getErrInfo(), failed);
failedTotal.add(failed);
}
}
private static void addFailedConfig(String configName, String value, String node, String errInfo,
Map<String, String> failed) {
failed.put("config_name", configName);
failed.put("value", value);
failed.put("node", node);
failed.put("err_info", errInfo);
}
private String concatFeSetConfigUrl(NodeConfigs nodeConfigs, boolean isPersist) {
StringBuffer sb = new StringBuffer();
Pair<String, Integer> hostPort = nodeConfigs.getHostPort();
sb.append("http://").append(hostPort.first).append(":").append(hostPort.second).append("/api/_set_config");
Map<String, String> configs = nodeConfigs.getConfigs(isPersist);
boolean addAnd = false;
for (Map.Entry<String, String> entry : configs.entrySet()) {
if (addAnd) {
sb.append("&");
} else {
sb.append("?");
addAnd = true;
}
sb.append(entry.getKey()).append("=").append(entry.getValue());
}
if (isPersist) {
sb.append("&persist=true&reset_persist=false");
}
return sb.toString();
}
// Modify fe configuration.
// The request body and return data are in the same format as fe
@RequestMapping(path = "/set_config/be", method = RequestMethod.POST)
public Object setConfigBe(HttpServletRequest request, HttpServletResponse response,
@RequestBody Map<String, SetConfigRequestBody> requestBody) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
List<Map<String, String>> failedTotal = Lists.newArrayList();
List<NodeConfigs> nodeConfigList = parseSetConfigNodes(requestBody, failedTotal);
List<Pair<String, Integer>> aliveBe = Catalog.getCurrentSystemInfo().getBackendIds(true)
.stream().map(beId -> {
Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
return new Pair<>(be.getHost(), be.getHttpPort());
})
.collect(Collectors.toList());
checkNodeIsAlive(nodeConfigList, aliveBe, failedTotal);
handleBeSetConfig(nodeConfigList, request.getHeader(AUTHORIZATION), failedTotal);
failedTotal = failedTotal.stream().filter(e -> !e.isEmpty()).collect(Collectors.toList());
Map<String, List<Map<String, String>>> data = Maps.newHashMap();
data.put("failed", failedTotal);
return ResponseEntityBuilder.ok(data);
}
// Parsing request body into List<NodeConfigs>
private List<NodeConfigs> parseSetConfigNodes(Map<String, SetConfigRequestBody> requestBody,
List<Map<String, String>> errNodes) {
List<NodeConfigs> nodeConfigsList = Lists.newArrayList();
for (String configName : requestBody.keySet()) {
SetConfigRequestBody configPara = requestBody.get(configName);
String value = configPara.getValue();
boolean persist = configPara.isPersist();
if (value == null || configPara.getNodes() == null) {
continue;
}
for (String node : configPara.getNodes()) {
Pair<String, Integer> ipPort;
try {
ipPort = SystemInfoService.validateHostAndPort(node);
} catch (Exception e) {
Map<String, String> failed = Maps.newHashMap();
addFailedConfig(configName, configPara.getValue(), node, "node invalid", failed);
errNodes.add(failed);
continue;
}
boolean find = false;
for (NodeConfigs nodeConfigs : nodeConfigsList) {
Pair<String, Integer> hostPort = nodeConfigs.getHostPort();
if (ipPort.first.equals(hostPort.first) && ipPort.second.equals(hostPort.second)) {
find = true;
nodeConfigs.addConfig(configName, value, persist);
}
}
if (!find) {
NodeConfigs newNodeConfigs = new NodeConfigs(ipPort.first, ipPort.second);
nodeConfigsList.add(newNodeConfigs);
newNodeConfigs.addConfig(configName, value, persist);
}
}
}
return nodeConfigsList;
}
private void checkNodeIsAlive(List<NodeConfigs> nodeConfigsList, List<Pair<String, Integer>> aliveNodes,
List<Map<String, String>> failedNodes) {
Iterator<NodeConfigs> it = nodeConfigsList.iterator();
while (it.hasNext()) {
NodeConfigs node = it.next();
boolean isExist = false;
for (Pair<String, Integer> aliveHostPort : aliveNodes) {
if (aliveHostPort.first.equals(node.getHostPort().first)
&& aliveHostPort.second.equals(node.getHostPort().second)) {
isExist = true;
break;
}
}
if (!isExist) {
addSetConfigErrNode(node.getConfigs(true), node.getHostPort(),
"Node does not exist or is not alive", failedNodes);
addSetConfigErrNode(node.getConfigs(false), node.getHostPort(),
"Node does not exist or is not alive", failedNodes);
it.remove();
}
}
}
private List<Map<String, String>> handleBeSetConfig(List<NodeConfigs> nodeConfigList, String authorization,
List<Map<String, String>> failedTotal) {
initHttpExecutor();
int configNum = nodeConfigList.stream()
.mapToInt(e -> e.getConfigs(true).size() + e.getConfigs(false).size()).sum();
MarkedCountDownLatch<String, Integer> beSetConfigCountDownSignal = new MarkedCountDownLatch<>(configNum);
for (NodeConfigs nodeConfigs : nodeConfigList) {
submitBeSetConfigTask(nodeConfigs, true, authorization, beSetConfigCountDownSignal, failedTotal);
submitBeSetConfigTask(nodeConfigs, false, authorization, beSetConfigCountDownSignal, failedTotal);
}
try {
beSetConfigCountDownSignal.await(HTTP_WAIT_TIME_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("set be config exception:", e);
} finally {
List<Map.Entry<String, Integer>> leftNode = beSetConfigCountDownSignal.getLeftMarks();
for (Map.Entry<String, Integer> failedNode : leftNode) {
Map<String, String> failed = parseNodeConfig(failedNode.getKey());
if (!failed.isEmpty()) {
failed.put("err_info", "Connection timeout");
failedTotal.add(failed);
}
}
}
return failedTotal;
}
private void submitBeSetConfigTask(NodeConfigs nodeConfigs, boolean isPersist, String authorization,
MarkedCountDownLatch<String, Integer> beSetConfigCountDownSignal,
List<Map<String, String>> failedTotal) {
if (!nodeConfigs.getConfigs(isPersist).isEmpty()) {
for (Map.Entry<String, String> entry : nodeConfigs.getConfigs(isPersist).entrySet()) {
failedTotal.add(Maps.newHashMap());
Pair<String, Integer> hostPort = nodeConfigs.getHostPort();
beSetConfigCountDownSignal.addMark(concatNodeConfig(hostPort.first, hostPort.second,
entry.getKey(), entry.getValue()), -1);
String url = concatBeSetConfigUrl(hostPort.first, hostPort.second, entry.getKey(),
entry.getValue(), isPersist);
httpExecutor.submit(new HttpSetConfigTask(url, hostPort, authorization, entry.getKey(),
entry.getValue(), beSetConfigCountDownSignal,
failedTotal.get(failedTotal.size() - 1)));
}
}
}
private String concatBeSetConfigUrl(String host, Integer port, String configName,
String configValue, boolean isPersist) {
StringBuilder stringBuffer = new StringBuilder();
stringBuffer.append("http://").append(host).append(":").append(port)
.append("/api/update_config")
.append("?").append(configName).append("=").append(configValue);
if (isPersist) {
stringBuffer.append("&persist=true");
}
return stringBuffer.toString();
}
private String concatNodeConfig(String host, Integer port, String configName, String configValue) {
return host + ":" + port + ":" + configName + ":" + configValue;
}
private Map<String, String> parseNodeConfig(String nodeConfig) {
Map<String, String> map = Maps.newHashMap();
String[] splitStrings = PATTERN.split(nodeConfig);
if (splitStrings.length == 4) {
addFailedConfig(splitStrings[2], splitStrings[3], splitStrings[0] + ":" + splitStrings[1], "", map);
}
return map;
}
private class HttpSetConfigTask implements Runnable {
private String url;
private Pair<String, Integer> hostPort;
private String authorization;
private String configName;
private String configValue;
private MarkedCountDownLatch<String, Integer> beSetConfigDoneSignal;
private Map<String, String> failed;
public HttpSetConfigTask(String url, Pair<String, Integer> hostPort, String authorization, String configName,
String configValue, MarkedCountDownLatch<String, Integer> beSetConfigDoneSignal,
Map<String, String> failed) {
this.url = url;
this.hostPort = hostPort;
this.authorization = authorization;
this.configName = configName;
this.configValue = configValue;
this.beSetConfigDoneSignal = beSetConfigDoneSignal;
this.failed = failed;
}
@Override
public void run() {
try {
String response = HttpUtils.doPost(url, ImmutableMap.<String, String>builder().put(AUTHORIZATION,
authorization).build(), null);
JsonObject jsonObject = JsonParser.parseString(response).getAsJsonObject();
String status = jsonObject.get("status").getAsString();
if (!status.equals("OK")) {
addFailedConfig(configName, configValue, hostPort.first + ":" + hostPort.second,
jsonObject.get("msg").getAsString(), failed);
}
beSetConfigDoneSignal.markedCountDown(concatNodeConfig(hostPort.first, hostPort.second,
configName, configValue), -1);
} catch (Exception e) {
LOG.warn("set be:{} config:{} failed.", hostPort.first + ":" + hostPort.second,
configName + "=" + configValue, e);
beSetConfigDoneSignal.countDown();
}
}
}
// Store persistent and non-persistent configuration information that needs to be modified on the node.
public static class NodeConfigs {
private Pair<String, Integer> hostPort;
private Map<String, String> persistConfigs;
private Map<String, String> nonPersistConfigs;
public NodeConfigs(String host, Integer httpPort) {
hostPort = new Pair<>(host, httpPort);
persistConfigs = Maps.newHashMap();
nonPersistConfigs = Maps.newHashMap();
}
public Pair<String, Integer> getHostPort() {
return hostPort;
}
public void addConfig(String name, String value, boolean persist) {
if (persist) {
persistConfigs.put(name, value);
} else {
nonPersistConfigs.put(name, value);
}
}
public Map<String, String> getConfigs(boolean isPersist) {
return isPersist ? persistConfigs : nonPersistConfigs;
}
}
@Getter
@Setter
public static class ConfigInfoRequestBody {
@JsonProperty("conf_name")
public List<String> confNames;
@JsonProperty("node")
public List<String> nodes;
}
@Getter
@Setter
public static class SetConfigRequestBody {
@JsonProperty("node")
private List<String> nodes;
private String value;
private boolean persist;
}
}

View File

@ -0,0 +1,339 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.httpv2.rest.manager;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.profile.ProfileTreeNode;
import org.apache.doris.common.profile.ProfileTreePrinter;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.RestBaseController;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/*
* Used to return query information and query profile.
*/
@RestController
@RequestMapping("/rest/v2/manager/query")
public class QueryProfileAction extends RestBaseController {
private static final Logger LOG = LogManager.getLogger(QueryProfileAction.class);
public static final String QUERY_ID = "Query ID";
public static final String NODE = "FE节点";
public static final String USER = "查询用户";
public static final String DEFAULT_DB = "执行数据库";
public static final String SQL_STATEMENT = "Sql";
public static final String QUERY_TYPE = "查询类型";
public static final String START_TIME = "开始时间";
public static final String END_TIME = "结束时间";
public static final String TOTAL = "执行时长";
public static final String QUERY_STATE = "状态";
private static final String QUERY_ID_PARA = "query_id";
private static final String SEARCH_PARA = "search";
private static final String IS_ALL_NODE_PARA = "is_all_node";
private static final String FRAGMENT_ID = "fragment_id";
private static final String INSTANCE_ID = "instance_id";
public static final ImmutableList<String> QUERY_TITLE_NAMES = new ImmutableList.Builder<String>()
.add(QUERY_ID).add(NODE).add(USER).add(DEFAULT_DB).add(SQL_STATEMENT)
.add(QUERY_TYPE).add(START_TIME).add(END_TIME).add(TOTAL).add(QUERY_STATE)
.build();
private List<String> requestAllFe(String httpPath, Map<String, String> arguments, String authorization) {
List<Pair<String, Integer>> frontends = HttpUtils.getFeList();
ImmutableMap<String, String> header = ImmutableMap.<String, String>builder()
.put(NodeAction.AUTHORIZATION, authorization).build();
List<String> dataList = Lists.newArrayList();
for (Pair<String, Integer> ipPort : frontends) {
String url = HttpUtils.concatUrl(ipPort, httpPath, arguments);
try {
String data = HttpUtils.parseResponse(HttpUtils.doGet(url, header));
if (!Strings.isNullOrEmpty(data) && !data.equals("{}")) {
dataList.add(data);
}
} catch (Exception e) {
LOG.warn("request url {} error", url, e);
}
}
return dataList;
}
@RequestMapping(path = "/query_info", method = RequestMethod.GET)
public Object queryInfo(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = QUERY_ID_PARA, required = false) String queryId,
@RequestParam(value = SEARCH_PARA, required = false) String search,
@RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true")
boolean isAllNode) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
List<List<String>> queries = Lists.newArrayList();
if (isAllNode) {
// Get query information for all fe
String httpPath = "/rest/v2/manager/query/query_info";
Map<String, String> arguments = Maps.newHashMap();
arguments.put(QUERY_ID_PARA, queryId);
if (!Strings.isNullOrEmpty(search)) {
try {
// search may contain special characters that need to be encoded.
search = URLEncoder.encode(search, StandardCharsets.UTF_8.toString());
} catch (UnsupportedEncodingException ignored) {
// Encoding exception, ignore search parameter.
}
}
arguments.put(SEARCH_PARA, search);
arguments.put(IS_ALL_NODE_PARA, "false");
List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION));
for (String data : dataList) {
try {
NodeAction.NodeInfo nodeInfo = GsonUtils.GSON.fromJson(data,
new TypeToken<NodeAction.NodeInfo>() {
}.getType());
queries.addAll(nodeInfo.getRows());
} catch (Exception e) {
LOG.warn("parse query info error: {}", data, e);
}
}
return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(QUERY_TITLE_NAMES, queries));
}
queries = ProfileManager.getInstance().getAllQueries().stream()
.filter(profile -> profile.get(4).equals("Query")).collect(Collectors.toList());
if (!Strings.isNullOrEmpty(queryId)) {
queries = queries.stream().filter(q -> q.get(0).equals(queryId)).collect(Collectors.toList());
}
// add node information
for (List<String> query : queries) {
query.add(1, Catalog.getCurrentCatalog().getSelfNode().first + ":" + Config.http_port);
}
if (!Strings.isNullOrEmpty(search)) {
List<List<String>> tempQueries = Lists.newArrayList();
for (List<String> query : queries) {
for (String field : query) {
if (field.contains(search)) {
tempQueries.add(query);
break;
}
}
}
queries = tempQueries;
}
return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(QUERY_TITLE_NAMES, queries));
}
// Returns the sql for the specified query id.
@RequestMapping(path = "/sql/{query_id}", method = RequestMethod.GET)
public Object queryInfo(HttpServletRequest request, HttpServletResponse response,
@PathVariable("query_id") String queryId,
@RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true")
boolean isAllNode) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
Map<String, String> querySql = Maps.newHashMap();
if (isAllNode) {
String httpPath = "/rest/v2/manager/query/sql/" + queryId;
ImmutableMap<String, String> arguments = ImmutableMap.<String, String>builder()
.put(IS_ALL_NODE_PARA, "false").build();
List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION));
if (!dataList.isEmpty()) {
try {
String sql = JsonParser.parseString(dataList.get(0)).getAsJsonObject().get("sql").getAsString();
querySql.put("sql", sql);
return ResponseEntityBuilder.ok(querySql);
} catch (Exception e) {
LOG.warn("parse sql error: {}", dataList.get(0), e);
}
}
} else {
List<List<String>> queries = ProfileManager.getInstance().getAllQueries().stream()
.filter(query -> query.get(0).equals(queryId)).collect(Collectors.toList());
if (!queries.isEmpty()) {
querySql.put("sql", queries.get(0).get(3));
}
}
return ResponseEntityBuilder.ok(querySql);
}
// Returns the text profile for the specified query id.
@RequestMapping(path = "/profile/text/{query_id}", method = RequestMethod.GET)
public Object queryProfileText(HttpServletRequest request, HttpServletResponse response,
@PathVariable("query_id") String queryId,
@RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true")
boolean isAllNode) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
Map<String, String> profileMap = Maps.newHashMap();
if (isAllNode) {
String httpPath = "/rest/v2/manager/query/profile/text/" + queryId;
ImmutableMap<String, String> arguments = ImmutableMap.<String, String>builder()
.put(IS_ALL_NODE_PARA, "false").build();
List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION));
if (!dataList.isEmpty()) {
try {
String profile =
JsonParser.parseString(dataList.get(0)).getAsJsonObject().get("profile").getAsString();
profileMap.put("profile", profile);
return ResponseEntityBuilder.ok(profileMap);
} catch (Exception e) {
LOG.warn("parse profile text error: {}", dataList.get(0), e);
}
}
} else {
String profile = ProfileManager.getInstance().getProfile(queryId);
if (!Strings.isNullOrEmpty(profile)) {
profileMap.put("profile", profile);
}
}
return ResponseEntityBuilder.ok(profileMap);
}
// Returns the fragments and instances for the specified query id.
// [
// {
// "fragment_id":"",
// "time":"",
// "instance_id":[
// ""
// ]
// }
// ]
@RequestMapping(path = "/profile/fragments/{query_id}", method = RequestMethod.GET)
public Object fragments(HttpServletRequest request, HttpServletResponse response,
@PathVariable("query_id") String queryId,
@RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true")
boolean isAllNode) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
if (isAllNode) {
String httpPath = "/rest/v2/manager/query/profile/fragments/" + queryId;
ImmutableMap<String, String> arguments = ImmutableMap.<String, String>builder()
.put(IS_ALL_NODE_PARA, "false").build();
List<Pair<String, Integer>> frontends = HttpUtils.getFeList();
ImmutableMap<String, String> header = ImmutableMap.<String, String>builder()
.put(NodeAction.AUTHORIZATION, request.getHeader(NodeAction.AUTHORIZATION)).build();
for (Pair<String, Integer> ipPort : frontends) {
String url = HttpUtils.concatUrl(ipPort, httpPath, arguments);
try {
String responseJson = HttpUtils.doGet(url, header);
int code = JsonParser.parseString(responseJson).getAsJsonObject().get("code").getAsInt();
if (code == HttpUtils.REQUEST_SUCCESS_CODE) {
return responseJson;
}
} catch (Exception e) {
LOG.warn(e);
}
}
} else {
try {
return ResponseEntityBuilder.ok(ProfileManager.getInstance().getFragmentsAndInstances(queryId));
} catch (AnalysisException e) {
return ResponseEntityBuilder.badRequest(e.getMessage());
}
}
return ResponseEntityBuilder.badRequest("not found query id");
}
// Returns the graph profile for the specified query id.
@RequestMapping(path = "/profile/graph/{query_id}", method = RequestMethod.GET)
public Object queryProfileGraph(HttpServletRequest request, HttpServletResponse response,
@PathVariable("query_id") String queryId,
@RequestParam(value = FRAGMENT_ID, required = false) String fragmentId,
@RequestParam(value = INSTANCE_ID, required = false) String instanceId,
@RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true")
boolean isAllNode) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
Map<String, String> graph = Maps.newHashMap();
List<String> results;
if (isAllNode) {
String httpPath = "/rest/v2/manager/query/profile/graph/" + queryId;
Map<String, String> arguments = Maps.newHashMap();
arguments.put(FRAGMENT_ID, fragmentId);
arguments.put(INSTANCE_ID, instanceId);
arguments.put(IS_ALL_NODE_PARA, "false");
List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION));
if (!dataList.isEmpty()) {
try {
String profileGraph =
JsonParser.parseString(dataList.get(0)).getAsJsonObject().get("graph").getAsString();
graph.put("graph", profileGraph);
return ResponseEntityBuilder.ok(graph);
} catch (Exception e) {
LOG.warn("parse profile graph error: {}", dataList.get(0), e);
}
}
} else {
try {
if (Strings.isNullOrEmpty(fragmentId) || Strings.isNullOrEmpty(instanceId)) {
ProfileTreeNode treeRoot = ProfileManager.getInstance().getFragmentProfileTree(queryId, queryId);
results = Lists.newArrayList(ProfileTreePrinter.printFragmentTree(treeRoot));
} else {
ProfileTreeNode treeRoot = ProfileManager.getInstance().getInstanceProfileTree(queryId, queryId,
fragmentId, instanceId);
results = Lists.newArrayList(ProfileTreePrinter.printInstanceTree(treeRoot));
}
graph.put("graph", results.get(0));
} catch (Exception e) {
LOG.warn("get profile graph error, queryId:{}, fragementId:{}, instanceId:{}",
queryId, fragmentId, instanceId, e);
}
}
return ResponseEntityBuilder.ok(graph);
}
}

View File

@ -148,6 +148,7 @@ public class StmtExecutor implements ProfileWriter {
private RuntimeProfile plannerRuntimeProfile;
private final Object writeProfileLock = new Object();
private volatile boolean isFinishedProfile = false;
private String queryType = "Query";
private volatile Coordinator coord = null;
private MasterOpExecutor masterOpExecutor = null;
private RedirectStatus redirectStatus = null;
@ -195,10 +196,13 @@ public class StmtExecutor implements ProfileWriter {
profile.addChild(summaryProfile);
summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(context.queryId()));
summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(context.getStartTime()));
summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp));
summaryProfile.addInfoString(ProfileManager.END_TIME,
waiteBeReport ? TimeUtils.longToTimeString(currentTimestamp) : "N/A");
summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs));
summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Query");
summaryProfile.addInfoString(ProfileManager.QUERY_STATE, context.getState().toString());
summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, queryType);
summaryProfile.addInfoString(ProfileManager.QUERY_STATE,
!waiteBeReport && context.getState().getStateType().equals(MysqlStateType.OK) ?
"RUNNING" : context.getState().toString());
summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION);
summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser());
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase());
@ -209,8 +213,12 @@ public class StmtExecutor implements ProfileWriter {
summaryProfile.addChild(plannerRuntimeProfile);
profile.addChild(coord.getQueryProfile());
} else {
summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp));
summaryProfile.addInfoString(ProfileManager.END_TIME,
waiteBeReport ? TimeUtils.longToTimeString(currentTimestamp) : "N/A");
summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs));
summaryProfile.addInfoString(ProfileManager.QUERY_STATE,
!waiteBeReport && context.getState().getStateType().equals(MysqlStateType.OK) ?
"RUNNING" : context.getState().toString());
}
plannerProfile.initRuntimeProfile(plannerRuntimeProfile);
@ -360,6 +368,7 @@ public class StmtExecutor implements ProfileWriter {
try {
handleInsertStmt();
if (!((InsertStmt) parsedStmt).getQueryStmt().isExplain()) {
queryType = "Insert";
writeProfile(true);
}
} catch (Throwable t) {

View File

@ -262,7 +262,7 @@ public class ExportExportingTask extends MasterTask {
summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp));
summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs));
summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Query");
summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Export");
summaryProfile.addInfoString(ProfileManager.QUERY_STATE, job.getState().toString());
summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION);
summaryProfile.addInfoString(ProfileManager.USER, "xxx");