diff --git a/be/src/common/configbase.cpp b/be/src/common/configbase.cpp index b44fc9663d..7593f76d60 100644 --- a/be/src/common/configbase.cpp +++ b/be/src/common/configbase.cpp @@ -255,6 +255,10 @@ void Properties::set(const std::string& key, const std::string& val) { file_conf_map.emplace(key, val); } +void Properties::set_force(const std::string& key, const std::string& val) { + file_conf_map[key] = val; +} + bool Properties::dump(const std::string& conffile) { std::vector files = {conffile}; Status st = FileSystemUtil::remove_paths(files); @@ -386,19 +390,14 @@ bool persist_config(const std::string& field, const std::string& value) { std::lock_guard l(custom_conf_lock); static const string conffile = string(getenv("DORIS_HOME")) + "/conf/be_custom.conf"; - Status st = FileSystemUtil::create_file(conffile); - if (!st.ok()) { - LOG(WARNING) << "failed to create or open be_custom.conf. " << st.get_error_msg(); - return false; - } Properties tmp_props; - if (!tmp_props.load(conffile.c_str())) { + if (!tmp_props.load(conffile.c_str(), false)) { LOG(WARNING) << "failed to load " << conffile; return false; } - tmp_props.set(field, value); + tmp_props.set_force(field, value); return tmp_props.dump(conffile); } @@ -430,5 +429,26 @@ Status set_config(const std::string& field, const std::string& value, bool need_ std::mutex* get_mutable_string_config_lock() { return &mutable_string_config_lock; } +std::vector> get_config_info() { + std::vector> configs; + std::lock_guard lock(mutable_string_config_lock); + for (const auto& it : *full_conf_map) { + auto field_it = Register::_s_field_map->find(it.first); + if (field_it == Register::_s_field_map->end()) { + continue; + } + + std::vector _config; + _config.push_back(it.first); + + _config.push_back(field_it->second.type); + _config.push_back(it.second); + _config.push_back(field_it->second.valmutable ? "true":"false"); + + configs.push_back(_config); + } + return configs; +} + } // namespace config } // namespace doris diff --git a/be/src/common/configbase.h b/be/src/common/configbase.h index e21be85647..6773e367ee 100644 --- a/be/src/common/configbase.h +++ b/be/src/common/configbase.h @@ -156,6 +156,8 @@ public: void set(const std::string& key, const std::string& val); + void set_force(const std::string& key, const std::string& val); + // dump props to conf file bool dump(const std::string& conffile); @@ -181,6 +183,8 @@ bool persist_config(const std::string& field, const std::string& value); std::mutex* get_mutable_string_config_lock(); +std::vector> get_config_info(); + } // namespace config } // namespace doris diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt index 4d7caaceb1..eb42232678 100644 --- a/be/src/http/CMakeLists.txt +++ b/be/src/http/CMakeLists.txt @@ -49,7 +49,7 @@ add_library(Webserver STATIC action/stream_load.cpp action/meta_action.cpp action/compaction_action.cpp - action/update_config_action.cpp + action/config_action.cpp # action/multi_start.cpp # action/multi_show.cpp # action/multi_commit.cpp diff --git a/be/src/http/action/update_config_action.cpp b/be/src/http/action/config_action.cpp similarity index 83% rename from be/src/http/action/update_config_action.cpp rename to be/src/http/action/config_action.cpp index d12588a8df..e4dab39820 100644 --- a/be/src/http/action/update_config_action.cpp +++ b/be/src/http/action/config_action.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "http/action/update_config_action.h" +#include "http/action/config_action.h" #include #include @@ -40,7 +40,34 @@ const static std::string HEADER_JSON = "application/json"; const static std::string PERSIST_PARAM = "persist"; -void UpdateConfigAction::handle(HttpRequest* req) { +void ConfigAction::handle(HttpRequest* req) { + if (_type == ConfigActionType::UPDATE_CONFIG) { + handle_update_config(req); + } else if (_type == ConfigActionType::SHOW_CONFIG) { + handle_show_config(req); + } +} + +void ConfigAction::handle_show_config(HttpRequest* req) { + std::vector> config_info = config::get_config_info(); + + rapidjson::StringBuffer str_buf; + rapidjson::Writer writer(str_buf); + + writer.StartArray(); + for (const auto& _config : config_info) { + writer.StartArray(); + for (const std::string& config_filed : _config) { + writer.String(config_filed.c_str()); + } + writer.EndArray(); + } + + writer.EndArray(); + HttpChannel::send_reply(req, str_buf.GetString()); +} + +void ConfigAction::handle_update_config(HttpRequest* req) { LOG(INFO) << req->debug_string(); Status s; diff --git a/be/src/http/action/update_config_action.h b/be/src/http/action/config_action.h similarity index 74% rename from be/src/http/action/update_config_action.h rename to be/src/http/action/config_action.h index 97965a218d..27836e62c1 100644 --- a/be/src/http/action/update_config_action.h +++ b/be/src/http/action/config_action.h @@ -21,13 +21,27 @@ namespace doris { +enum ConfigActionType { + UPDATE_CONFIG = 1, + SHOW_CONFIG = 2, +}; + // Update BE config. -class UpdateConfigAction : public HttpHandler { +class ConfigAction : public HttpHandler { public: - UpdateConfigAction() {} - virtual ~UpdateConfigAction() {} + ConfigAction(ConfigActionType type): _type(type) {} + + virtual ~ConfigAction() {} void handle(HttpRequest* req) override; + +private: + ConfigActionType _type; + + void handle_update_config(HttpRequest* req); + + void handle_show_config(HttpRequest* req); + }; } // namespace doris diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 78d9dc9975..1b7c6037e1 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -31,7 +31,7 @@ #include "http/action/tablets_distribution_action.h" #include "http/action/tablet_migration_action.h" #include "http/action/tablets_info_action.h" -#include "http/action/update_config_action.h" +#include "http/action/config_action.h" #include "http/default_path_handlers.h" #include "http/download_action.h" #include "http/ev_http_server.h" @@ -144,9 +144,14 @@ Status HttpService::start() { _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status", run_status_compaction_action); - UpdateConfigAction* update_config_action = _pool.add(new UpdateConfigAction()); + ConfigAction* update_config_action = + _pool.add(new ConfigAction(ConfigActionType::UPDATE_CONFIG)); _ev_http_server->register_handler(HttpMethod::POST, "/api/update_config", update_config_action); + ConfigAction* show_config_action = + _pool.add(new ConfigAction(ConfigActionType::SHOW_CONFIG)); + _ev_http_server->register_handler(HttpMethod::GET, "/api/show_config", show_config_action); + _ev_http_server->start(); return Status::OK(); } diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js index b5f23fc74c..cfd6faa949 100644 --- a/docs/.vuepress/sidebar/en.js +++ b/docs/.vuepress/sidebar/en.js @@ -88,6 +88,15 @@ module.exports = [ title: "FE", directoryPath: "fe/", children: [ + { + title: "MANAGER", + directoryPath: "manager/", + children: [ + "cluster-action", + "node-action", + "query-profile-action", + ], + }, "bootstrap-action", "cancel-load-action", "check-decommission-action", diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js index ba1ac66e3d..27e95ef56b 100644 --- a/docs/.vuepress/sidebar/zh-CN.js +++ b/docs/.vuepress/sidebar/zh-CN.js @@ -87,6 +87,15 @@ module.exports = [ title: "FE", directoryPath: "fe/", children: [ + { + title: "MANAGER", + directoryPath: "manager/", + children: [ + "cluster-action", + "node-action", + "query-profile-action", + ], + }, "bootstrap-action", "cancel-load-action", "check-decommission-action", diff --git a/docs/en/administrator-guide/http-actions/fe/manager/cluster-action.md b/docs/en/administrator-guide/http-actions/fe/manager/cluster-action.md new file mode 100644 index 0000000000..b9328cc249 --- /dev/null +++ b/docs/en/administrator-guide/http-actions/fe/manager/cluster-action.md @@ -0,0 +1,77 @@ +--- +{ + "title": "Cluster Action", + "language": "en" +} +--- + + + +# Cluster Action + +## Request + +`GET /rest/v2/manager/cluster/cluster_info/conn_info` + +## Cluster Connection Information + +`GET /rest/v2/manager/cluster/cluster_info/conn_info` + +### Description + +Used to get cluster http, mysql connection information. + +### Response + +``` +{ + "msg": "success", + "code": 0, + "data": { + "http": [ + "fe_host:http_ip" + ], + "mysql": [ + "fe_host:query_ip" + ] + }, + "count": 0 +} +``` + +### Examples + ``` + GET /rest/v2/manager/cluster/cluster_info/conn_info + + Response: + { + "msg": "success", + "code": 0, + "data": { + "http": [ + "127.0.0.1:8030" + ], + "mysql": [ + "127.0.0.1:9030" + ] + }, + "count": 0 + } + ``` \ No newline at end of file diff --git a/docs/en/administrator-guide/http-actions/fe/manager/node-action.md b/docs/en/administrator-guide/http-actions/fe/manager/node-action.md new file mode 100644 index 0000000000..217a28af08 --- /dev/null +++ b/docs/en/administrator-guide/http-actions/fe/manager/node-action.md @@ -0,0 +1,435 @@ +--- +{ + "title": "Node Action", + "language": "en" +} +--- + + + +# Node Action + +## Request +s +`GET /rest/v2/manager/node/frontends` + +`GET /rest/v2/manager/node/backends` + +`GET /rest/v2/manager/node/brokers` + +`GET /rest/v2/manager/node/configuration_name` + +`GET /rest/v2/manager/node/node_list` + +`POST /rest/v2/manager/node/configuration_info` + +`POST /rest/v2/manager/node/set_config/fe` + +`POST /rest/v2/manager/node/set_config/be` + +## Get information about fe, be, broker nodes + +`GET /rest/v2/manager/node/frontends` + +`GET /rest/v2/manager/node/backends` + +`GET /rest/v2/manager/node/brokers` + +### Description + +Used to get cluster to get fe, be, broker node information. + +### Response + +``` +frontends: +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "Name", + "IP", + "HostName", + "EditLogPort", + "HttpPort", + "QueryPort", + "RpcPort", + "Role", + "IsMaster", + "ClusterId", + "Join", + "Alive", + "ReplayedJournalId", + "LastHeartbeat", + "IsHelper", + "ErrMsg", + "Version" + ], + "rows": [ + [ + ... + ] + ] + }, + "count": 0 +} +``` + +``` +backends: +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "BackendId", + "Cluster", + "IP", + "HostName", + "HeartbeatPort", + "BePort", + "HttpPort", + "BrpcPort", + "LastStartTime", + "LastHeartbeat", + "Alive", + "SystemDecommissioned", + "ClusterDecommissioned", + "TabletNum", + "DataUsedCapacity", + "AvailCapacity", + "TotalCapacity", + "UsedPct", + "MaxDiskUsedPct", + "ErrMsg", + "Version", + "Status" + ], + "rows": [ + [ + ... + ] + ] + }, + "count": 0 +} +``` + +``` +brokers: +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "Name", + "IP", + "HostName", + "Port", + "Alive", + "LastStartTime", + "LastUpdateTime", + "ErrMsg" + ], + "rows": [ + [ + ... + ] + ] + }, + "count": 0 +} +``` + +## Get node configuration information + +`GET /rest/v2/manager/node/configuration_name` + +`GET /rest/v2/manager/node/node_list` + +`POST /rest/v2/manager/node/configuration_info` + +### Description + +configuration_name Used to get the name of the node configuration item. +node_list Get the list of nodes. +configuration_info to get the node configuration details. + +### Query parameters +`GET /rest/v2/manager/node/configuration_name` +none + +`GET /rest/v2/manager/node/node_list` +none + +`POST /rest/v2/manager/node/configuration_info` + +* type + The value is fe or be, which specifies to get the configuration information of fe or the configuration information of be. + +### Request body + +`GET /rest/v2/manager/node/configuration_name` +none + +`GET /rest/v2/manager/node/node_list` +none + +`POST /rest/v2/manager/node/configuration_info` +``` +{ + "conf_name": [ + "" + ], + "node": [ + "" + ] +} + +If no body is included, the parameters in the body use the default values. +conf_name specifies which configuration items to return, the default is all configuration items. +node is used to specify which node's configuration information is returned, the default is all fe nodes or be nodes configuration information. +``` + +### Response +`GET /rest/v2/manager/node/configuration_name` +``` +{ + "msg": "success", + "code": 0, + "data": { + "backend":[ + "" + ], + "frontend":[ + "" + ] + }, + "count": 0 +} +``` + +`GET /rest/v2/manager/node/node_list` +``` +{ + "msg": "success", + "code": 0, + "data": { + "backend": [ + "" + ], + "frontend": [ + "" + ] + }, + "count": 0 +} +``` + +`POST /rest/v2/manager/node/configuration_info?type=fe` +``` +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "配置项", + "节点", + "节点类型", + "配置值类型", + "MasterOnly", + "配置值", + "可修改" + ], + "rows": [ + [ + "" + ] + ] + }, + "count": 0 +} +``` + +`POST /rest/v2/manager/node/configuration_info?type=be` +``` +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "配置项", + "节点", + "节点类型", + "配置值类型", + "配置值", + "可修改" + ], + "rows": [ + [ + "" + ] + ] + }, + "count": 0 +} +``` + +### Examples + +1. Get the fe agent_task_resend_wait_time_ms configuration information: + + POST /rest/v2/manager/node/configuration_info?type=fe + body: + ``` + { + "conf_name":[ + "agent_task_resend_wait_time_ms" + ] + } + ``` + + Response: + ``` + { + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "配置项", + "节点", + "节点类型", + "配置值类型", + "MasterOnly", + "配置值", + "可修改" + ], + "rows": [ + [ + "agent_task_resend_wait_time_ms", + "127.0.0.1:8030", + "FE", + "long", + "true", + "50000", + "true" + ] + ] + }, + "count": 0 + } + ``` + +## Modify configuration values + +`POST /rest/v2/manager/node/set_config/fe` + +`POST /rest/v2/manager/node/set_config/be` + +### Description + +Used to modify fe or be node configuration values + +### Request body +``` +{ + "config_name":{ + "node":[ + "" + ], + "value":"", + "persist": + } +} + +config_name is the corresponding configuration item. +node is a keyword indicating the list of nodes to be modified; +value is the value of the configuration. +persist is true for permanent modification and false for temporary modification. persist means permanent modification, false means temporary modification. permanent modification takes effect after reboot, temporary modification fails after reboot. +``` + +### Response +`GET /rest/v2/manager/node/configuration_name` +``` +{ + "msg": "", + "code": 0, + "data": { + "failed":[ + { + "config_name":"name", + "value"="", + "node":"", + "err_info":"" + } + ] + }, + "count": 0 +} + +failed Indicates a configuration message that failed to be modified. +``` + +### Examples + +1. Modify the agent_task_resend_wait_time_ms and alter_table_timeout_second configuration values in the fe 127.0.0.1:8030 node: + + POST /rest/v2/manager/node/set_config/fe + body: + ``` + { + "agent_task_resend_wait_time_ms":{ + "node":[ + "127.0.0.1:8030" + ], + "value":"10000", + "persist":"true" + }, + "alter_table_timeout_second":{ + "node":[ + "127.0.0.1:8030" + ], + "value":"true", + "persist":"true" + } + } + ``` + + Response: + ``` + { + "msg": "success", + "code": 0, + "data": { + "failed": [ + { + "config_name": "alter_table_timeout_second", + "node": "10.81.85.89:8837", + "err_info": "Unsupported configuration value type.", + "value": "true" + } + ] + }, + "count": 0 + } + + gent_task_resend_wait_time_ms configuration value modified successfully, alter_table_timeout_second modification failed. + ``` \ No newline at end of file diff --git a/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md b/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md new file mode 100644 index 0000000000..037ca5a437 --- /dev/null +++ b/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md @@ -0,0 +1,308 @@ +--- +{ + "title": "Query Profile Action", + "language": "en" +} +--- + + + +# Query Profile Action + +## Request + +`GET /rest/v2/manager/query/query_info` + +`GET /rest/v2/manager/query/sql/{query_id}` + +`GET /rest/v2/manager/query/profile/text/{query_id}` + +`GET /rest/v2/manager/query/profile/fragments/{query_id}` + +`GET /rest/v2/manager/query/profile/graph/{query_id}` + +## Get the query information + +`GET /rest/v2/manager/query/query_info` + +### Description + +Gets information about select queries for all fe nodes in the cluster. + +### Query parameters + +* `query_id` + + Optional, specifies the query ID of the query to be returned, default returns information for all queries. + +* `search` + + Optional, specifies that query information containing strings is returned, currently only string matches are performed. + +* `is_all_node` + + Optional, if true, returns query information for all fe nodes, if false, returns query information for the current fe node. The default is true. + + +### Response + +``` +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "Query ID", + "FE节点", + "查询用户", + "执行数据库", + "Sql", + "查询类型", + "开始时间", + "结束时间", + "执行时长", + "状态" + ], + "rows": [ + [ + ... + ] + ] + }, + "count": 0 +} +``` + +### Examples +``` +GET /rest/v2/manager/query/query_info + +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "Query ID", + "FE节点", + "查询用户", + "执行数据库", + "Sql", + "查询类型", + "开始时间", + "结束时间", + "执行时长", + "状态" + ], + "rows": [ + [ + "d7c93d9275334c35-9e6ac5f295a7134b", + "127.0.0.1:8030", + "root", + "default_cluster:testdb", + "select c.id, c.name, p.age, p.phone, c.date, c.cost from cost c join people p on c.id = p.id where p.age > 20 order by c.id", + "Query", + "2021-07-29 16:59:12", + "2021-07-29 16:59:12", + "109ms", + "EOF" + ] + ] + }, + "count": 0 +} +``` + +## Get the sql and text profile for the specified query + +`GET /rest/v2/manager/query/sql/{query_id}` + +`GET /rest/v2/manager/query/profile/text/{query_id}` + +### Description + +Get the sql and profile text for the specified query id. + +### Path parameters + +* `query_id` + + The query id. + +### Query parameters + +* `is_all_node` + + Optional, if true then query for the specified query id in all fe nodes, if false then query for the specified query id in the currently connected fe nodes. The default is true. + +### Response + +``` +{ + "msg": "success", + "code": 0, + "data": { + "sql": "" + }, + "count": 0 +} +``` + +``` +{ + "msg": "success", + "code": 0, + "data": { + "profile": "" + }, + "count": 0 +} +``` + +### Examples + +1. get sql. + + ``` + GET /rest/v2/manager/query/sql/d7c93d9275334c35-9e6ac5f295a7134b + + Response: + { + "msg": "success", + "code": 0, + "data": { + "sql": "select c.id, c.name, p.age, p.phone, c.date, c.cost from cost c join people p on c.id = p.id where p.age > 20 order by c.id" + }, + "count": 0 + } + ``` + +## Get the specified query fragment and instance information + +`GET /rest/v2/manager/query/profile/fragments/{query_id}` + +### Description + +Get the fragment name, instance id and execution time for the specified query id. + +### Path parameters + +* `query_id` + + The query id. + +### Query parameters + +* `is_all_node` + + Optional, if true then query for the specified query id in all fe nodes, if false then query for the specified query id in the currently connected fe nodes. The default is true. + +### Response + +``` +{ + "msg": "success", + "code": 0, + "data": [ + { + "fragment_id": "", + "time": "", + "instance_id": { + "": "" + } + } + ], + "count": 0 +} +``` + +### Examples + + ``` + GET /rest/v2/manager/query/profile/fragments/d7c93d9275334c35-9e6ac5f295a7134b + + Response: + { + "msg": "success", + "code": 0, + "data": [ + { + "fragment_id": "0", + "time": "36.169ms", + "instance_id": { + "d7c93d9275334c35-9e6ac5f295a7134e": "36.169ms" + } + }, + { + "fragment_id": "1", + "time": "20.710ms", + "instance_id": { + "d7c93d9275334c35-9e6ac5f295a7134c": "20.710ms" + } + }, + { + "fragment_id": "2", + "time": "7.83ms", + "instance_id": { + "d7c93d9275334c35-9e6ac5f295a7134d": "7.83ms" + } + } + ], + "count": 0 + } + ``` + +## Get the specified query id tree profile information + +`GET /rest/v2/manager/query/profile/graph/{query_id}` + +### Description + +Get the tree profile information of the specified query id, same as `show query profile` command. + +### Path parameters + +* `query_id` + + The query id. + +### Query parameters + +* `fragment_id` and `instance_id` + + Optional, both parameters must be specified or not. + If both are not specified, a simple tree of profiles is returned, equivalent to `show query profile '/query_id'`; + If both are specified, a detailed profile tree is returned, which is equivalent to `show query profile '/query_id/fragment_id/instance_id'`. + +* `is_all_node` + + Optional, if true then query information about the specified query id in all fe nodes, if false then query information about the specified query id in the currently connected fe nodes. The default is true. + +### Response + +``` +{ + "msg": "success", + "code": 0, + "data": { + "graph":"" + }, + "count": 0 +} +``` \ No newline at end of file diff --git a/docs/en/administrator-guide/http-actions/fe/set-config-action.md b/docs/en/administrator-guide/http-actions/fe/set-config-action.md index bc1d786a21..a14492e0f7 100644 --- a/docs/en/administrator-guide/http-actions/fe/set-config-action.md +++ b/docs/en/administrator-guide/http-actions/fe/set-config-action.md @@ -48,6 +48,12 @@ None Whether to persist the modified configuration. The default is false, which means it is not persisted. If it is true, the modified configuration item will be written into the `fe_custom.conf` file and will still take effect after FE is restarted. +* `reset_persist` + Whether or not to clear the original persist configuration only takes effect when the persist parameter is true. For compatibility with the original version, reset_persist defaults to true. + If persist is set to true and reset_persist is not set or reset_persist is true, the configuration in the `fe_custom.conf` file will be cleared before this modified configuration is written to `fe_custom.conf`. + If persist is set to true and reset_persist is false, this modified configuration item will be incrementally added to `fe_custom.conf`. + + ## Request body None @@ -60,57 +66,85 @@ None "code": 0, "data": { "set": { - "storage_min_left_capacity_bytes": "1024", - "qe_max_connection": "2048" + "key": "value" }, - "err": { - "replica_ack_policy": "SIMPLE_MAJORITY" - } + "err": [ + { + "config_name": "", + "config_value": "", + "err_info": "" + } + ], + "persist":"" }, "count": 0 } ``` -The `set` field indicates the successfully set configuration. The `err` field indicates the configuration that failed to be set. +The `set` field indicates the successfully set configuration. The `err` field indicates the configuration that failed to be set. The `persist` field indicates persistent information. ## Examples -1. Set the two configuration values of `max_bytes_per_broker_scanner` and `max_broker_concurrency`. +1. Set the values of `storage_min_left_capacity_bytes`, `replica_ack_policy` and `agent_task_resend_wait_time_ms`. ``` - GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&max_broker_concurrency=20 + GET /api/_set_config?storage_min_left_capacity_bytes=1024&replica_ack_policy=SIMPLE_MAJORITY&agent_task_resend_wait_time_ms=true Response: { - "msg": "success", - "code": 0, - "data": { - "set": { - "max_bytes_per_broker_scanner": "21474836480", - "max_broker_concurrency": "20" - }, - "err": {} - }, - "count": 0 + "msg": "success", + "code": 0, + "data": { + "set": { + "storage_min_left_capacity_bytes": "1024" + }, + "err": [ + { + "config_name": "replica_ack_policy", + "config_value": "SIMPLE_MAJORITY", + "err_info": "Not support dynamic modification." + }, + { + "config_name": "agent_task_resend_wait_time_ms", + "config_value": "true", + "err_info": "Unsupported configuration value type." + } + ], + "persist": "" + }, + "count": 0 } + + storage_min_left_capacity_bytes Successfully; + replica_ack_policy Failed, because the configuration item does not support dynamic modification. + agent_task_resend_wait_time_ms Failed, failed to set the boolean type because the configuration item is of type long. ``` 2. Set `max_bytes_per_broker_scanner` and persist it. ``` - GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&persist=true + GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&persist=true&reset_persist=false Response: { - "msg": "success", - "code": 0, - "data": { - "set": { - "max_bytes_per_broker_scanner": "21474836480" - }, - "err": {}, - "persist": "ok" - }, - "count": 0 + "msg": "success", + "code": 0, + "data": { + "set": { + "max_bytes_per_broker_scanner": "21474836480" + }, + "err": [], + "persist": "ok" + }, + "count": 0 } + ``` + + The fe/conf directory generates the fe_custom.conf file: + ``` + #THIS IS AN AUTO GENERATED CONFIG FILE. + #You can modify this file manually, and the configurations in this file + #will overwrite the configurations in fe.conf + #Wed Jul 28 12:43:14 CST 2021 + max_bytes_per_broker_scanner=21474836480 ``` \ No newline at end of file diff --git a/docs/zh-CN/administrator-guide/http-actions/fe/manager/cluster-action.md b/docs/zh-CN/administrator-guide/http-actions/fe/manager/cluster-action.md new file mode 100644 index 0000000000..74bf68d86e --- /dev/null +++ b/docs/zh-CN/administrator-guide/http-actions/fe/manager/cluster-action.md @@ -0,0 +1,77 @@ +--- +{ + "title": "Cluster Action", + "language": "zh-CN" +} +--- + + + +# Cluster Action + +## Request + +`GET /rest/v2/manager/cluster/cluster_info/conn_info` + +## 集群连接信息 + +`GET /rest/v2/manager/cluster/cluster_info/conn_info` + +### Description + +用于获取集群http、mysql连接信息。 + +### Response + +``` +{ + "msg": "success", + "code": 0, + "data": { + "http": [ + "fe_host:http_ip" + ], + "mysql": [ + "fe_host:query_ip" + ] + }, + "count": 0 +} +``` + +### Examples + ``` + GET /rest/v2/manager/cluster/cluster_info/conn_info + + Response: + { + "msg": "success", + "code": 0, + "data": { + "http": [ + "127.0.0.1:8030" + ], + "mysql": [ + "127.0.0.1:9030" + ] + }, + "count": 0 + } + ``` \ No newline at end of file diff --git a/docs/zh-CN/administrator-guide/http-actions/fe/manager/node-action.md b/docs/zh-CN/administrator-guide/http-actions/fe/manager/node-action.md new file mode 100644 index 0000000000..2eb2b7d41d --- /dev/null +++ b/docs/zh-CN/administrator-guide/http-actions/fe/manager/node-action.md @@ -0,0 +1,435 @@ +--- +{ + "title": "Node Action", + "language": "zh-CN" +} +--- + + + +# Node Action + +## Request + +`GET /rest/v2/manager/node/frontends` + +`GET /rest/v2/manager/node/backends` + +`GET /rest/v2/manager/node/brokers` + +`GET /rest/v2/manager/node/configuration_name` + +`GET /rest/v2/manager/node/node_list` + +`POST /rest/v2/manager/node/configuration_info` + +`POST /rest/v2/manager/node/set_config/fe` + +`POST /rest/v2/manager/node/set_config/be` + +## 获取fe, be, broker节点信息 + +`GET /rest/v2/manager/node/frontends` + +`GET /rest/v2/manager/node/backends` + +`GET /rest/v2/manager/node/brokers` + +### Description + +用于获取集群获取fe, be, broker节点信息。 + +### Response + +``` +frontends: +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "Name", + "IP", + "HostName", + "EditLogPort", + "HttpPort", + "QueryPort", + "RpcPort", + "Role", + "IsMaster", + "ClusterId", + "Join", + "Alive", + "ReplayedJournalId", + "LastHeartbeat", + "IsHelper", + "ErrMsg", + "Version" + ], + "rows": [ + [ + ... + ] + ] + }, + "count": 0 +} +``` + +``` +backends: +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "BackendId", + "Cluster", + "IP", + "HostName", + "HeartbeatPort", + "BePort", + "HttpPort", + "BrpcPort", + "LastStartTime", + "LastHeartbeat", + "Alive", + "SystemDecommissioned", + "ClusterDecommissioned", + "TabletNum", + "DataUsedCapacity", + "AvailCapacity", + "TotalCapacity", + "UsedPct", + "MaxDiskUsedPct", + "ErrMsg", + "Version", + "Status" + ], + "rows": [ + [ + ... + ] + ] + }, + "count": 0 +} +``` + +``` +brokers: +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "Name", + "IP", + "HostName", + "Port", + "Alive", + "LastStartTime", + "LastUpdateTime", + "ErrMsg" + ], + "rows": [ + [ + ... + ] + ] + }, + "count": 0 +} +``` + +## 获取节点配置信息 + +`GET /rest/v2/manager/node/configuration_name` + +`GET /rest/v2/manager/node/node_list` + +`POST /rest/v2/manager/node/configuration_info` + +### Description + +configuration_name 用于获取节点配置项名称。 +node_list 用于获取节点列表。 +configuration_info 用于获取节点配置详细信息。 + +### Query parameters +`GET /rest/v2/manager/node/configuration_name` +无 + +`GET /rest/v2/manager/node/node_list` +无 + +`POST /rest/v2/manager/node/configuration_info` + +* type + 值为 fe 或 be, 用于指定获取fe的配置信息或be的配置信息。 + +### Request body + +`GET /rest/v2/manager/node/configuration_name` +无 + +`GET /rest/v2/manager/node/node_list` +无 + +`POST /rest/v2/manager/node/configuration_info` +``` +{ + "conf_name": [ + "" + ], + "node": [ + "" + ] +} + +若不带body,body中的参数都使用默认值。 +conf_name 用于指定返回哪些配置项的信息, 默认返回所有配置项信息; +node 用于指定返回哪些节点的配置项信息,默认为全部fe节点或be节点配置项信息。 +``` + +### Response +`GET /rest/v2/manager/node/configuration_name` +``` +{ + "msg": "success", + "code": 0, + "data": { + "backend":[ + "" + ], + "frontend":[ + "" + ] + }, + "count": 0 +} +``` + +`GET /rest/v2/manager/node/node_list` +``` +{ + "msg": "success", + "code": 0, + "data": { + "backend": [ + "" + ], + "frontend": [ + "" + ] + }, + "count": 0 +} +``` + +`POST /rest/v2/manager/node/configuration_info?type=fe` +``` +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "配置项", + "节点", + "节点类型", + "配置值类型", + "MasterOnly", + "配置值", + "可修改" + ], + "rows": [ + [ + "" + ] + ] + }, + "count": 0 +} +``` + +`POST /rest/v2/manager/node/configuration_info?type=be` +``` +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "配置项", + "节点", + "节点类型", + "配置值类型", + "配置值", + "可修改" + ], + "rows": [ + [ + "" + ] + ] + }, + "count": 0 +} +``` + +### Examples + +1. 获取fe agent_task_resend_wait_time_ms 配置项信息: + + POST /rest/v2/manager/node/configuration_info?type=fe + body: + ``` + { + "conf_name":[ + "agent_task_resend_wait_time_ms" + ] + } + ``` + + Response: + ``` + { + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "配置项", + "节点", + "节点类型", + "配置值类型", + "MasterOnly", + "配置值", + "可修改" + ], + "rows": [ + [ + "agent_task_resend_wait_time_ms", + "127.0.0.1:8030", + "FE", + "long", + "true", + "50000", + "true" + ] + ] + }, + "count": 0 + } + ``` + +## 修改配置值 + +`POST /rest/v2/manager/node/set_config/fe` + +`POST /rest/v2/manager/node/set_config/be` + +### Description + +用于修改fe或be节点配置值 + +### Request body +``` +{ + "config_name":{ + "node":[ + "" + ], + "value":"", + "persist": + } +} + +config_name为对应的配置项; +node为关键字,表示要修改的节点列表; +value为配置的值; +persist为 true 表示永久修改, false 表示临时修改。永久修改重启后能生效, 临时修改重启后失效。 +``` + +### Response +`GET /rest/v2/manager/node/configuration_name` +``` +{ + "msg": "", + "code": 0, + "data": { + "failed":[ + { + "config_name":"name", + "value"="", + "node":"", + "err_info":"" + } + ] + }, + "count": 0 +} + +failed 表示修改失败的配置信息。 +``` + +### Examples + +1. 修改fe 127.0.0.1:8030 节点中 agent_task_resend_wait_time_ms 和alter_table_timeout_second 配置值: + + POST /rest/v2/manager/node/set_config/fe + body: + ``` + { + "agent_task_resend_wait_time_ms":{ + "node":[ + "127.0.0.1:8030" + ], + "value":"10000", + "persist":"true" + }, + "alter_table_timeout_second":{ + "node":[ + "127.0.0.1:8030" + ], + "value":"true", + "persist":"true" + } + } + ``` + + Response: + ``` + { + "msg": "success", + "code": 0, + "data": { + "failed": [ + { + "config_name": "alter_table_timeout_second", + "node": "10.81.85.89:8837", + "err_info": "Unsupported configuration value type.", + "value": "true" + } + ] + }, + "count": 0 + } + + agent_task_resend_wait_time_ms 配置值修改成功,alter_table_timeout_second 修改失败。 + ``` \ No newline at end of file diff --git a/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md b/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md new file mode 100644 index 0000000000..f1acaebbe7 --- /dev/null +++ b/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md @@ -0,0 +1,308 @@ +--- +{ + "title": "Query Profile Action", + "language": "zh-CN" +} +--- + + + +# Query Profile Action + +## Request + +`GET /rest/v2/manager/query/query_info` + +`GET /rest/v2/manager/query/sql/{query_id}` + +`GET /rest/v2/manager/query/profile/text/{query_id}` + +`GET /rest/v2/manager/query/profile/fragments/{query_id}` + +`GET /rest/v2/manager/query/profile/graph/{query_id}` + +## 获取查询信息 + +`GET /rest/v2/manager/query/query_info` + +### Description + +可获取集群所有 fe 节点 select 查询信息。 + +### Query parameters + +* `query_id` + + 可选,指定返回查询的queryID, 默认返回所有查询的信息。 + +* `search` + + 可选,指定返回包含字符串的查询信息,目前仅进行字符串匹配。 + +* `is_all_node` + + 可选,若为 true 则返回所有fe节点的查询信息,若为 false 则返回当前fe节点的查询信息。默认为true。 + + +### Response + +``` +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "Query ID", + "FE节点", + "查询用户", + "执行数据库", + "Sql", + "查询类型", + "开始时间", + "结束时间", + "执行时长", + "状态" + ], + "rows": [ + [ + ... + ] + ] + }, + "count": 0 +} +``` + +### Examples +``` +GET /rest/v2/manager/query/query_info + +{ + "msg": "success", + "code": 0, + "data": { + "column_names": [ + "Query ID", + "FE节点", + "查询用户", + "执行数据库", + "Sql", + "查询类型", + "开始时间", + "结束时间", + "执行时长", + "状态" + ], + "rows": [ + [ + "d7c93d9275334c35-9e6ac5f295a7134b", + "127.0.0.1:8030", + "root", + "default_cluster:testdb", + "select c.id, c.name, p.age, p.phone, c.date, c.cost from cost c join people p on c.id = p.id where p.age > 20 order by c.id", + "Query", + "2021-07-29 16:59:12", + "2021-07-29 16:59:12", + "109ms", + "EOF" + ] + ] + }, + "count": 0 +} +``` + +## 获取指定查询的sql和文本profile + +`GET /rest/v2/manager/query/sql/{query_id}` + +`GET /rest/v2/manager/query/profile/text/{query_id}` + +### Description + +用于获取指定query id的sql和profile文本。 + +### Path parameters + +* `query_id` + + query id。 + +### Query parameters + +* `is_all_node` + + 可选,若为 true 则在所有fe节点中查询指定query id的信息,若为 false 则在当前连接的fe节点中查询指定query id的信息。默认为true。 + +### Response + +``` +{ + "msg": "success", + "code": 0, + "data": { + "sql": "" + }, + "count": 0 +} +``` + +``` +{ + "msg": "success", + "code": 0, + "data": { + "profile": "" + }, + "count": 0 +} +``` + +### Examples + +1. 获取 sql: + + ``` + GET /rest/v2/manager/query/sql/d7c93d9275334c35-9e6ac5f295a7134b + + Response: + { + "msg": "success", + "code": 0, + "data": { + "sql": "select c.id, c.name, p.age, p.phone, c.date, c.cost from cost c join people p on c.id = p.id where p.age > 20 order by c.id" + }, + "count": 0 + } + ``` + +## 获取指定查询fragment和instance信息 + +`GET /rest/v2/manager/query/profile/fragments/{query_id}` + +### Description + +用于获取指定query id的fragment名称,instance id和执行时长。 + +### Path parameters + +* `query_id` + + query id。 + +### Query parameters + +* `is_all_node` + + 可选,若为 true 则在所有fe节点中查询指定query id的信息,若为 false 则在当前连接的fe节点中查询指定query id的信息。默认为true。 + +### Response + +``` +{ + "msg": "success", + "code": 0, + "data": [ + { + "fragment_id": "", + "time": "", + "instance_id": { + "": "" + } + } + ], + "count": 0 +} +``` + +### Examples + + ``` + GET /rest/v2/manager/query/profile/fragments/d7c93d9275334c35-9e6ac5f295a7134b + + Response: + { + "msg": "success", + "code": 0, + "data": [ + { + "fragment_id": "0", + "time": "36.169ms", + "instance_id": { + "d7c93d9275334c35-9e6ac5f295a7134e": "36.169ms" + } + }, + { + "fragment_id": "1", + "time": "20.710ms", + "instance_id": { + "d7c93d9275334c35-9e6ac5f295a7134c": "20.710ms" + } + }, + { + "fragment_id": "2", + "time": "7.83ms", + "instance_id": { + "d7c93d9275334c35-9e6ac5f295a7134d": "7.83ms" + } + } + ], + "count": 0 + } + ``` + +## 获取指定query id树状profile信息 + +`GET /rest/v2/manager/query/profile/graph/{query_id}` + +### Description + +获取指定query id树状profile信息,同 `show query profile` 指令。 + +### Path parameters + +* `query_id` + + query id。 + +### Query parameters + +* `fragment_id` 和 `instance_id` + + 可选,这两个参数需同时指定或同时不指定。 + 同时不指定则返回profile 简易树形图,相当于`show query profile '/query_id'`; + 同时指定则返回指定instance详细profile树形图,相当于`show query profile '/query_id/fragment_id/instance_id'`. + +* `is_all_node` + + 可选,若为 true 则在所有fe节点中查询指定query id的信息,若为 false 则在当前连接的fe节点中查询指定query id的信息。默认为true。 + +### Response + +``` +{ + "msg": "success", + "code": 0, + "data": { + "graph":"" + }, + "count": 0 +} +``` diff --git a/docs/zh-CN/administrator-guide/http-actions/fe/set-config-action.md b/docs/zh-CN/administrator-guide/http-actions/fe/set-config-action.md index 6b00ceb4a1..ffe1caa140 100644 --- a/docs/zh-CN/administrator-guide/http-actions/fe/set-config-action.md +++ b/docs/zh-CN/administrator-guide/http-actions/fe/set-config-action.md @@ -48,6 +48,12 @@ under the License. 是否要将修改的配置持久化。默认为 false,即不持久化。如果为 true,这修改后的配置项会写入 `fe_custom.conf` 文件中,并在 FE 重启后仍会生效。 +* `reset_persist` + + 是否要清空原来的持久化配置,只在 persist 参数为 true 时生效。为了兼容原来的版本,reset_persist 默认为 true。 + 如果 persist 设为 true,不设置 reset_persist 或 reset_persist 为 true,将先清空`fe_custom.conf`文件中的配置再将本次修改的配置写入`fe_custom.conf`; + 如果 persist 设为 true,reset_persist 为 false,本次修改的配置项将会增量添加到`fe_custom.conf`。 + ## Request body 无 @@ -60,56 +66,84 @@ under the License. "code": 0, "data": { "set": { - "storage_min_left_capacity_bytes": "1024", - "qe_max_connection": "2048" + "key": "value" }, - "err": { - "replica_ack_policy": "SIMPLE_MAJORITY" - } + "err": [ + { + "config_name": "", + "config_value": "", + "err_info": "" + } + ], + "persist":"" }, "count": 0 } ``` -`set` 字段表示设置成功的配置。`err` 字段表示设置失败的配置。 +`set` 字段表示设置成功的配置。`err` 字段表示设置失败的配置。 `persist` 字段表示持久化信息。 ## Examples -1. 设置 `max_bytes_per_broker_scanner` 和 `max_broker_concurrency` 两个配置的值。 +1. 设置 `storage_min_left_capacity_bytes` 、 `replica_ack_policy` 和 `agent_task_resend_wait_time_ms` 三个配置的值。 ``` - GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&max_broker_concurrency=20 + GET /api/_set_config?storage_min_left_capacity_bytes=1024&replica_ack_policy=SIMPLE_MAJORITY&agent_task_resend_wait_time_ms=true Response: { - "msg": "success", - "code": 0, - "data": { - "set": { - "max_bytes_per_broker_scanner": "21474836480", - "max_broker_concurrency": "20" - }, - "err": {} - }, - "count": 0 + "msg": "success", + "code": 0, + "data": { + "set": { + "storage_min_left_capacity_bytes": "1024" + }, + "err": [ + { + "config_name": "replica_ack_policy", + "config_value": "SIMPLE_MAJORITY", + "err_info": "Not support dynamic modification." + }, + { + "config_name": "agent_task_resend_wait_time_ms", + "config_value": "true", + "err_info": "Unsupported configuration value type." + } + ], + "persist": "" + }, + "count": 0 } + + storage_min_left_capacity_bytes 设置成功; + replica_ack_policy 设置失败,原因是该配置项不支持动态修改; + agent_task_resend_wait_time_ms 设置失败,因为该配置项类型为long, 设置boolean类型失败。 ``` 2. 设置 `max_bytes_per_broker_scanner` 并持久化 ``` - GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&persist=true + GET /api/_set_config?max_bytes_per_broker_scanner=21474836480&persist=true&reset_persist=false Response: { - "msg": "success", - "code": 0, - "data": { - "set": { - "max_bytes_per_broker_scanner": "21474836480" - }, - "err": {}, - "persist": "ok" - }, - "count": 0 + "msg": "success", + "code": 0, + "data": { + "set": { + "max_bytes_per_broker_scanner": "21474836480" + }, + "err": [], + "persist": "ok" + }, + "count": 0 } + ``` + + fe/conf 目录生成fe_custom.conf: + ``` + #THIS IS AN AUTO GENERATED CONFIG FILE. + #You can modify this file manually, and the configurations in this file + #will overwrite the configurations in fe.conf + #Wed Jul 28 12:43:14 CST 2021 + max_bytes_per_broker_scanner=21474836480 ``` \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ConfigBase.java b/fe/fe-core/src/main/java/org/apache/doris/common/ConfigBase.java index 2c43e51e9c..b7298aa7df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ConfigBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ConfigBase.java @@ -199,7 +199,9 @@ public class ConfigBase { f.setDouble(null, Double.parseDouble(confVal)); break; case "boolean": - f.setBoolean(null, Boolean.parseBoolean(confVal)); + if (isBoolean(confVal)) { + f.setBoolean(null, Boolean.parseBoolean(confVal)); + } break; case "String": f.set(null, confVal); @@ -235,7 +237,9 @@ public class ConfigBase { case "boolean[]": boolean[] ba = new boolean[sa.length]; for (int i = 0; i < ba.length; i++) { - ba[i] = Boolean.parseBoolean(sa[i]); + if (isBoolean(sa[i])) { + ba[i] = Boolean.parseBoolean(sa[i]); + } } f.set(null, ba); break; @@ -247,6 +251,13 @@ public class ConfigBase { } } + private static boolean isBoolean(String s) { + if (s.equalsIgnoreCase("true") || s.equalsIgnoreCase("false")) { + return true; + } + throw new IllegalArgumentException("type mismatch"); + } + public static Map getAllMutableConfigs() { Map mutableConfigs = Maps.newHashMap(); Field fields[] = ConfigBase.confClass.getFields(); @@ -357,14 +368,13 @@ public class ConfigBase { return anno.masterOnly(); } - // overwrite configs to customConfFile. // use synchronized to make sure only one thread modify this file - public synchronized static void persistConfig(Map customConf) throws IOException { + public synchronized static void persistConfig(Map customConf, boolean resetPersist) throws IOException { File file = new File(customConfFile); if (!file.exists()) { file.createNewFile(); - } else { - // clear the file content + } else if (resetPersist){ + // clear the customConfFile content try (PrintWriter writer = new PrintWriter(file)) { writer.print(""); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/MultiProfileTreeBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/MultiProfileTreeBuilder.java index 08af2b9431..c6d23e3fcb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/MultiProfileTreeBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/MultiProfileTreeBuilder.java @@ -129,6 +129,10 @@ public class MultiProfileTreeBuilder { return singleBuilder.getFragmentTreeRoot(); } + public List getFragmentInstances(String executionId) throws AnalysisException{ + return getExecutionProfileTreeBuilder(executionId).getFragmentsInstances(); + } + private ProfileTreeBuilder getExecutionProfileTreeBuilder(String executionId) throws AnalysisException { ProfileTreeBuilder singleBuilder = idToSingleTreeBuilder.get(executionId); if (singleBuilder == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java index eea7574b8e..01c70c5f56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java @@ -17,12 +17,15 @@ package org.apache.doris.common.profile; +import lombok.Getter; +import lombok.Setter; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.Counter; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.thrift.TUnit; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -69,6 +72,8 @@ public class ProfileTreeBuilder { // the tree root of the entire query profile tree private ProfileTreeNode fragmentTreeRoot; + private List fragmentsInstances = Lists.newArrayList(); + // Match string like: // EXCHANGE_NODE (id=3):(Active: 103.899ms, % non-child: 2.27%) // Extract "EXCHANGE_NODE" and "3" @@ -112,6 +117,10 @@ public class ProfileTreeBuilder { return instanceActiveTimeMap.get(fragmentId); } + public List getFragmentsInstances() { + return fragmentsInstances; + } + public void build() throws UserException { reset(); checkProfile(); @@ -125,6 +134,7 @@ public class ProfileTreeBuilder { instanceTreeMap.clear(); instanceActiveTimeMap.clear(); fragmentTreeRoot = null; + fragmentsInstances.clear(); } private void checkProfile() throws UserException { @@ -149,13 +159,20 @@ public class ProfileTreeBuilder { // 1. Get max active time of instances in this fragment List> instanceIdAndActiveTimeList = Lists.newArrayList(); + List instances = Lists.newArrayList(); + Map instanceIdToTime = Maps.newHashMap(); long maxActiveTimeNs = 0; for (Pair pair : fragmentChildren) { Triple instanceIdAndActiveTime = getInstanceIdHostAndActiveTime(pair.first); + instanceIdToTime.put(instanceIdAndActiveTime.getLeft(), + RuntimeProfile.printCounter(instanceIdAndActiveTime.getRight(), TUnit.TIME_NS)); maxActiveTimeNs = Math.max(instanceIdAndActiveTime.getRight(), maxActiveTimeNs); instanceIdAndActiveTimeList.add(instanceIdAndActiveTime); + instances.add(instanceIdAndActiveTime.getLeft()); } instanceActiveTimeMap.put(fragmentId, instanceIdAndActiveTimeList); + fragmentsInstances.add(new FragmentInstances(fragmentId, + RuntimeProfile.printCounter(maxActiveTimeNs, TUnit.TIME_NS), instanceIdToTime)); // 2. Build tree for all fragments // All instance in a fragment are same, so use first instance to build the fragment tree @@ -342,4 +359,21 @@ public class ProfileTreeBuilder { } return new ImmutableTriple<>(m.group(1), m.group(2) + ":" + m.group(3), activeTimeNs); } + + @Getter + @Setter + public static class FragmentInstances { + @JsonProperty("fragment_id") + private String fragmentId; + @JsonProperty("time") + private String maxActiveTimeNs; + @JsonProperty("instance_id") + private Map instanceIdToTime; + + public FragmentInstances(String fragmentId, String maxActiveTimeNs, Map instanceIdToTime) { + this.fragmentId = fragmentId; + this.maxActiveTimeNs = maxActiveTimeNs; + this.instanceIdToTime = instanceIdToTime; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index 28dd935a27..05103f83ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -19,6 +19,7 @@ package org.apache.doris.common.util; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.profile.MultiProfileTreeBuilder; +import org.apache.doris.common.profile.ProfileTreeBuilder; import org.apache.doris.common.profile.ProfileTreeNode; import com.google.common.base.Strings; @@ -263,7 +264,15 @@ public class ProfileManager { // Return the tasks info of the specified load job // Columns: TaskId, ActiveTime public List> getLoadJobTaskList(String jobId) throws AnalysisException { - MultiProfileTreeBuilder builder; + MultiProfileTreeBuilder builder = getMultiProfileTreeBuilder(jobId); + return builder.getSubTaskInfo(); + } + + public List getFragmentsAndInstances(String queryId) throws AnalysisException{ + return getMultiProfileTreeBuilder(queryId).getFragmentInstances(queryId); + } + + private MultiProfileTreeBuilder getMultiProfileTreeBuilder(String jobId) throws AnalysisException{ readLock.lock(); try { ProfileElement element = queryIdToProfileMap.get(jobId); @@ -271,11 +280,9 @@ public class ProfileManager { throw new AnalysisException("failed to get task ids. err: " + (element == null ? "not found" : element.errMsg)); } - builder = element.builder; + return element.builder; } finally { readLock.unlock(); } - - return builder.getSubTaskInfo(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/rest/SetConfigAction.java b/fe/fe-core/src/main/java/org/apache/doris/http/rest/SetConfigAction.java index dc1c74d939..a79a2beaf0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/http/rest/SetConfigAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/http/rest/SetConfigAction.java @@ -50,6 +50,7 @@ public class SetConfigAction extends RestBaseAction { private static final Logger LOG = LogManager.getLogger(SetConfigAction.class); private static final String PERSIST_PARAM = "persist"; + private static final String RESET_PERSIST = "reset_persist"; public SetConfigAction(ActionController controller) { super(controller); @@ -65,6 +66,7 @@ public class SetConfigAction extends RestBaseAction { checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); boolean needPersist = false; + boolean resetPersist = true; Map> configs = request.getAllParameters(); if (configs.containsKey(PERSIST_PARAM)) { List val = configs.remove(PERSIST_PARAM); @@ -72,6 +74,12 @@ public class SetConfigAction extends RestBaseAction { needPersist = true; } } + if (configs.containsKey(RESET_PERSIST)) { + List val = configs.remove(RESET_PERSIST); + if (val.size() == 1 && val.get(0).equals("false")) { + resetPersist = false; + } + } Map setConfigs = Maps.newHashMap(); Map errConfigs = Maps.newHashMap(); @@ -114,7 +122,7 @@ public class SetConfigAction extends RestBaseAction { String persistMsg = ""; if (needPersist) { try { - ConfigBase.persistConfig(setConfigs); + ConfigBase.persistConfig(setConfigs, resetPersist); persistMsg = "ok"; } catch (IOException e) { LOG.warn("failed to persist config", e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java index 7dc591483f..7124c338e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java @@ -17,6 +17,9 @@ package org.apache.doris.httpv2.rest; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.ConfigBase; import org.apache.doris.common.ConfigBase.ConfField; @@ -24,7 +27,10 @@ import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import com.clearspring.analytics.util.Lists; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,7 +40,9 @@ import org.springframework.web.bind.annotation.RestController; import java.io.IOException; import java.lang.reflect.Field; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -49,6 +57,7 @@ public class SetConfigAction extends RestBaseController { private static final Logger LOG = LogManager.getLogger(SetConfigAction.class); private static final String PERSIST_PARAM = "persist"; + private static final String RESET_PERSIST = "reset_persist"; @RequestMapping(path = "/api/_set_config", method = RequestMethod.GET) protected Object set_config(HttpServletRequest request, HttpServletResponse response) { @@ -56,16 +65,26 @@ public class SetConfigAction extends RestBaseController { checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); boolean needPersist = false; - Map configs = request.getParameterMap(); + boolean resetPersist = true; + + Map tempConfigs = request.getParameterMap(); + Map configs = Maps.newHashMap(); + configs.putAll(tempConfigs); if (configs.containsKey(PERSIST_PARAM)) { String[] val = configs.remove(PERSIST_PARAM); if (val.length == 1 && val[0].equals("true")) { needPersist = true; } } + if (configs.containsKey(RESET_PERSIST)) { + String[] val = configs.remove(RESET_PERSIST); + if (val.length == 1 && val[0].equals("false")) { + resetPersist = false; + } + } Map setConfigs = Maps.newHashMap(); - Map errConfigs = Maps.newHashMap(); + List errConfigs = Lists.newArrayList(); LOG.debug("get config from url: {}, need persist: {}", configs, needPersist); @@ -73,29 +92,41 @@ public class SetConfigAction extends RestBaseController { for (Field f : fields) { // ensure that field has "@ConfField" annotation ConfField anno = f.getAnnotation(ConfField.class); - if (anno == null || !anno.mutable()) { - continue; - } - if (anno.masterOnly() && !Catalog.getCurrentCatalog().isMaster()) { + if (anno == null) { continue; } // ensure that field has property string String confKey = anno.value().equals("") ? f.getName() : anno.value(); String[] confVals = configs.get(confKey); - if (confVals == null || confVals.length == 0) { + if (confVals == null) { continue; } - if (confVals.length > 1) { + if (confVals.length != 1) { + errConfigs.add(new ErrConfig(confKey, "", "No or multiple configuration values.")); + continue; + } + + if (!anno.mutable()) { + errConfigs.add(new ErrConfig(confKey, confVals[0], "Not support dynamic modification.")); + continue; + } + + if (anno.masterOnly() && !Catalog.getCurrentCatalog().isMaster()) { + errConfigs.add(new ErrConfig(confKey, confVals[0], "Not support modification on non-master")); continue; } try { ConfigBase.setConfigField(f, confVals[0]); + } catch (IllegalArgumentException e){ + errConfigs.add(new ErrConfig(confKey, confVals[0], "Unsupported configuration value type.")); + continue; } catch (Exception e) { LOG.warn("failed to set config {}:{}, {}", confKey, confVals[0], e.getMessage()); + errConfigs.add(new ErrConfig(confKey, confVals[0], e.getMessage())); continue; } @@ -105,7 +136,7 @@ public class SetConfigAction extends RestBaseController { String persistMsg = ""; if (needPersist) { try { - ConfigBase.persistConfig(setConfigs); + ConfigBase.persistConfig(setConfigs, resetPersist); persistMsg = "ok"; } catch (IOException e) { LOG.warn("failed to persist config", e); @@ -113,19 +144,56 @@ public class SetConfigAction extends RestBaseController { } } + List errConfigNames = errConfigs.stream().map(ErrConfig::getConfigName).collect(Collectors.toList()); for (String key : configs.keySet()) { - if (!setConfigs.containsKey(key)) { + if (!setConfigs.containsKey(key) && !errConfigNames.contains(key)) { String[] confVals = configs.get(key); String confVal = confVals.length == 1 ? confVals[0] : "invalid value"; - errConfigs.put(key, confVal); + errConfigs.add(new ErrConfig(key, confVal, "invalid config")); } } - Map resultMap = Maps.newHashMap(); - resultMap.put("set", setConfigs); - resultMap.put("err", errConfigs); - resultMap.put("persist", persistMsg); + return ResponseEntityBuilder.ok(new SetConfigEntity(setConfigs, errConfigs, persistMsg)); + } - return ResponseEntityBuilder.ok(resultMap); + @Setter + @AllArgsConstructor + public static class ErrConfig{ + @SerializedName(value = "config_name") + @JsonProperty("config_name") + private String configName; + @SerializedName(value = "config_value") + @JsonProperty("config_value") + private String configValue; + @SerializedName(value = "err_info") + @JsonProperty("err_info") + private String errInfo; + + public String getConfigName() { + return configName; + } + + public String getConfigValue() { + return configValue; + } + + public String getErrInfo() { + return errInfo; + } + } + + @Getter + @Setter + @AllArgsConstructor + public static class SetConfigEntity{ + @SerializedName(value = "set") + @JsonProperty("set") + Map setConfigs; + @SerializedName(value = "err") + @JsonProperty("err") + List errConfigs; + @SerializedName(value = "persist") + @JsonProperty("persist") + String persistMsg; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java new file mode 100644 index 0000000000..9b4af46de9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.rest.manager; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.Config; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.httpv2.rest.RestBaseController; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Frontend; + +import com.google.common.collect.Maps; + +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/* + * Used to return the cluster information for the manager. + */ +@RestController +@RequestMapping("/rest/v2/manager/cluster") +public class ClusterAction extends RestBaseController { + + // Returns mysql and http connection information for the cluster. + // { + // "mysql":[ + // "" + // ], + // "http":[ + // "" + // ] + // } + @RequestMapping(path = "/cluster_info/conn_info", method = RequestMethod.GET) + public Object clusterInfo(HttpServletRequest request, HttpServletResponse response) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + Map> result = Maps.newHashMap(); + List frontends = Catalog.getCurrentCatalog().getFrontends(null) + .stream().filter(Frontend::isAlive) + .map(Frontend::getHost) + .collect(Collectors.toList()); + + result.put("mysql", frontends.stream().map(ip -> ip + ":" + Config.query_port).collect(Collectors.toList())); + result.put("http", frontends.stream().map(ip -> ip + ":" + Config.http_port).collect(Collectors.toList())); + return ResponseEntityBuilder.ok(result); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java new file mode 100644 index 0000000000..90e2c2289c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.rest.manager; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.httpv2.entity.ResponseBody; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.system.Frontend; + +import com.google.gson.reflect.TypeToken; + +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.apache.parquet.Strings; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/* + * used to forward http requests from manager to be. + */ +public class HttpUtils { + static final int REQUEST_SUCCESS_CODE = 0; + + static List> getFeList() { + return Catalog.getCurrentCatalog().getFrontends(null) + .stream().filter(Frontend::isAlive).map(fe -> new Pair<>(fe.getHost(), Config.http_port)) + .collect(Collectors.toList()); + } + + static String concatUrl(Pair ipPort, String path, Map arguments) { + StringBuilder url = new StringBuilder("http://") + .append(ipPort.first).append(":").append(ipPort.second).append(path); + boolean isFirst = true; + for (Map.Entry entry : arguments.entrySet()) { + if (!Strings.isNullOrEmpty(entry.getValue())) { + if (isFirst) { + url.append("?"); + } else { + url.append("&"); + } + isFirst = false; + url.append(entry.getKey()).append("=").append(entry.getValue()); + } + } + return url.toString(); + } + + static String doGet(String url, Map headers) throws IOException { + HttpGet httpGet = new HttpGet(url); + setRequestConfig(httpGet, headers); + return executeRequest(httpGet); + } + + static String doPost(String url, Map headers, Object body) throws IOException { + HttpPost httpPost = new HttpPost(url); + if (Objects.nonNull(body)) { + String jsonString = GsonUtils.GSON.toJson(body); + StringEntity stringEntity = new StringEntity(jsonString, "UTF-8"); + httpPost.setEntity(stringEntity); + } + + setRequestConfig(httpPost, headers); + return executeRequest(httpPost); + } + + private static void setRequestConfig(HttpRequestBase request, Map headers) { + if (null != headers) { + for (String key : headers.keySet()) { + request.setHeader(key, headers.get(key)); + } + } + + RequestConfig config = RequestConfig.custom() + .setConnectTimeout(2000) + .setConnectionRequestTimeout(2000) + .setSocketTimeout(2000) + .build(); + request.setConfig(config); + } + + private static String executeRequest(HttpRequestBase request) throws IOException { + CloseableHttpClient client = HttpClientBuilder.create().build(); + return client.execute(request, httpResponse -> EntityUtils.toString(httpResponse.getEntity())); + } + + static String parseResponse(String response) { + ResponseBody responseEntity = GsonUtils.GSON.fromJson(response, new TypeToken() {}.getType()); + if (responseEntity.getCode() != REQUEST_SUCCESS_CODE) { + throw new RuntimeException(responseEntity.getMsg()); + } + return GsonUtils.GSON.toJson(responseEntity.getData()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java new file mode 100644 index 0000000000..123cd8d032 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java @@ -0,0 +1,835 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.rest.manager; + +import lombok.Getter; +import lombok.Setter; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.ConfigBase; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.Pair; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.proc.ProcResult; +import org.apache.doris.common.proc.ProcService; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.httpv2.rest.RestBaseController; +import org.apache.doris.httpv2.rest.SetConfigAction; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.system.Frontend; +import org.apache.doris.system.SystemInfoService; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + +import org.apache.commons.httpclient.HttpException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/* + * Used to return all node information, configuration information and modify node config. + */ +@RestController +@RequestMapping("/rest/v2/manager/node") +public class NodeAction extends RestBaseController { + private static final Logger LOG = LogManager.getLogger(NodeAction.class); + private static final Pattern PATTERN = Pattern.compile(":"); + + public static final String AUTHORIZATION = "Authorization"; + private static final int HTTP_WAIT_TIME_SECONDS = 2; + + public static final String CONFIG = "配置项"; + public static final String NODE_IP_PORT = "节点"; + public static final String NODE_TYPE = "节点类型"; + public static final String CONFIG_TYPE = "配置值类型"; + public static final String MASTER_ONLY = "MasterOnly"; + public static final String CONFIG_VALUE = "配置值"; + public static final String IS_MUTABLE = "可修改"; + + public static final ImmutableList FE_CONFIG_TITLE_NAMES = new ImmutableList.Builder() + .add(CONFIG).add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE) + .add(MASTER_ONLY).add(CONFIG_VALUE).add(IS_MUTABLE) + .build(); + + public static final ImmutableList BE_CONFIG_TITLE_NAMES = new ImmutableList.Builder() + .add(CONFIG).add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE) + .add(CONFIG_VALUE).add(IS_MUTABLE) + .build(); + + private Object httpExecutorLock = new Object(); + private volatile static ExecutorService httpExecutor = null; + + // Returns all fe information, similar to 'show frontends'. + @RequestMapping(path = "/frontends", method = RequestMethod.GET) + public Object frontends_info(HttpServletRequest request, HttpServletResponse response) throws AnalysisException { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + return fetchNodeInfo("/frontends"); + } + + // Returns all be information, similar to 'show backends'. + @RequestMapping(path = "/backends", method = RequestMethod.GET) + public Object backends_info(HttpServletRequest request, HttpServletResponse response) throws AnalysisException { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + return fetchNodeInfo("/backends"); + } + + // Returns all broker information, similar to 'show broker'. + @RequestMapping(path = "/brokers", method = RequestMethod.GET) + public Object brokers_info(HttpServletRequest request, HttpServletResponse response) throws AnalysisException { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + return fetchNodeInfo("/brokers"); + } + + // { + // "column_names": [ + // "" + // ], + // "rows": [ + // [ + // "" + // ] + // ] + // } + private Object fetchNodeInfo(String procPath) throws AnalysisException { + try { + ProcResult procResult = ProcService.getInstance().open(procPath).fetchResult(); + List columnNames = Lists.newArrayList(procResult.getColumnNames()); + return ResponseEntityBuilder.ok(new NodeInfo(columnNames, procResult.getRows())); + } catch (Exception e) { + LOG.warn(e); + throw e; + } + } + + @Getter + @Setter + public static class NodeInfo { + public List column_names; + public List> rows; + + public NodeInfo(List column_names, List> rows) { + this.column_names = column_names; + this.rows = rows; + } + } + + // Return fe and be all configuration names. + // { + // "frontend": [ + // "" + // ], + // "backend": [ + // "" + // ] + // } + @RequestMapping(path = "/configuration_name", method = RequestMethod.GET) + public Object configurationName(HttpServletRequest request, HttpServletResponse response) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + Map> result = Maps.newHashMap(); + try { + result.put("frontend", Lists.newArrayList(Config.dump().keySet())); + + List beConfigNames = Lists.newArrayList(); + List beIds = Catalog.getCurrentSystemInfo().getBackendIds(true); + if (!beIds.isEmpty()) { + Backend be = Catalog.getCurrentSystemInfo().getBackend(beIds.get(0)); + String url = "http://" + be.getHost() + ":" + be.getHttpPort() + "/api/show_config"; + String questResult = HttpUtils.doGet(url, null); + List> configs = GsonUtils.GSON.fromJson(questResult, + new TypeToken>>() { + }.getType()); + for (List config : configs) { + beConfigNames.add(config.get(0)); + } + } + result.put("backend", beConfigNames); + } catch (Exception e) { + LOG.warn(e); + return ResponseEntityBuilder.internalError(e.getMessage()); + } + return ResponseEntityBuilder.ok(result); + } + + // Return all living fe and be nodes. + // { + // "frontend": [ + // "host:httpPort" + // ], + // "backend": [ + // "host:httpPort"" + // ] + // } + @RequestMapping(path = "/node_list", method = RequestMethod.GET) + public Object nodeList(HttpServletRequest request, HttpServletResponse response) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + Map> result = Maps.newHashMap(); + result.put("frontend", getFeList()); + result.put("backend", getBeList()); + return ResponseEntityBuilder.ok(result); + } + + private static List getFeList() { + return Catalog.getCurrentCatalog().getFrontends(null) + .stream().filter(Frontend::isAlive) + .map(fe -> fe.getHost() + ":" + Config.http_port) + .collect(Collectors.toList()); + } + + private static List getBeList() { + return Catalog.getCurrentSystemInfo().getBackendIds(true) + .stream().map(beId -> { + Backend be = Catalog.getCurrentSystemInfo().getBackend(beId); + return be.getHost() + ":" + be.getHttpPort(); + }) + .collect(Collectors.toList()); + } + + /* + * this http interface is used to return configuration information requested by other fe. + */ + @RequestMapping(path = "/config", method = RequestMethod.GET) + public Object config(HttpServletRequest request, HttpServletResponse response) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + List> results = Lists.newArrayList(); + try { + List> configs = ConfigBase.getConfigInfo(null); + // Sort all configs by config key. + Collections.sort(configs, Comparator.comparing(o -> o.get(0))); + + // reorder the fields + for (List config : configs) { + List list = Lists.newArrayList(); + list.add(config.get(0)); + list.add(config.get(2)); + list.add(config.get(4)); + list.add(config.get(1)); + list.add(config.get(3)); + results.add(list); + } + } catch (DdlException e) { + LOG.warn(e); + return ResponseEntityBuilder.internalError(e.getMessage()); + } + return results; + } + + // Return the configuration information of fe or be. + // + // for fe: + // { + // "column_names": [ + // "配置项", + // "节点", + // "节点类型", + // "配置类型", + // "仅master", + // "配置值", + // "可修改" + // ], + // "rows": [ + // [ + // "" + // ] + // ] + // } + // + // for be: + // { + // "column_names": [ + // "配置项", + // "节点", + // "节点类型", + // "配置类型", + // "配置值", + // "可修改" + // ], + // "rows": [ + // [ + // "" + // ] + // ] + // } + @RequestMapping(path = "/configuration_info", method = RequestMethod.POST) + public Object configurationInfo(HttpServletRequest request, HttpServletResponse response, + @RequestParam(value = "type") String type, + @RequestBody(required = false) ConfigInfoRequestBody requestBody) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + initHttpExecutor(); + + if (requestBody == null) { + requestBody = new ConfigInfoRequestBody(); + } + List> hostPorts; + if (type.equalsIgnoreCase("fe")) { + if (requestBody.getNodes() != null && !requestBody.getNodes().isEmpty()) { + hostPorts = parseHostPort(requestBody.getNodes()); + } else { + hostPorts = parseHostPort(getFeList()); + } + + List> errNodes = Lists.newArrayList(); + List> data = handleConfigurationInfo(hostPorts, request.getHeader(AUTHORIZATION), + "/rest/v2/manager/node/config", "FE", requestBody.getConfNames(), errNodes); + if (!errNodes.isEmpty()) { + LOG.warn("Failed to get fe node configuration information from:{}", errNodes.toString()); + } + return ResponseEntityBuilder.ok(new NodeInfo(FE_CONFIG_TITLE_NAMES, data)); + } else if (type.equalsIgnoreCase("be")) { + if (requestBody.getNodes() != null && !requestBody.getNodes().isEmpty()) { + hostPorts = parseHostPort(requestBody.getNodes()); + } else { + hostPorts = parseHostPort(getBeList()); + } + + List> errNodes = Lists.newArrayList(); + List> data = handleConfigurationInfo(hostPorts, request.getHeader(AUTHORIZATION), + "/api/show_config", "BE", requestBody.getConfNames(), errNodes); + if (!errNodes.isEmpty()) { + LOG.warn("Failed to get be node configuration information from:{}", errNodes.toString()); + } + return ResponseEntityBuilder.ok(new NodeInfo(BE_CONFIG_TITLE_NAMES, data)); + } + return ResponseEntityBuilder.badRequest("Unsupported type: " + type + ". Only types of fe or be are " + + "supported"); + } + + // Use thread pool to concurrently fetch configuration information from specified fe or be nodes. + private List> handleConfigurationInfo(List> hostPorts, + String authorization, String questPath, + String nodeType, List confNames, + List> errNodes) { + // The configuration information returned by each node is a List> type, + // configInfoTotal is used to store the configuration information of all nodes. + List>> configInfoTotal = Lists.newArrayList(); + MarkedCountDownLatch configRequestDoneSignal = new MarkedCountDownLatch<>(hostPorts.size()); + for (int i = 0; i < hostPorts.size(); ++i) { + configInfoTotal.add(Lists.newArrayList()); + + Pair hostPort = hostPorts.get(i); + configRequestDoneSignal.addMark(hostPort.first + ":" + hostPort.second, -1); + String url = "http://" + hostPort.first + ":" + hostPort.second + questPath; + httpExecutor.submit(new HttpConfigInfoTask(url, hostPort, authorization, nodeType, confNames, + configRequestDoneSignal, configInfoTotal.get(i))); + } + List> resultConfigs = Lists.newArrayList(); + try { + configRequestDoneSignal.await(HTTP_WAIT_TIME_SECONDS, TimeUnit.SECONDS); + for (List> lists : configInfoTotal) { + resultConfigs.addAll(lists); + } + } catch (InterruptedException e) { + errNodes.addAll(configRequestDoneSignal.getLeftMarks()); + } + + return resultConfigs; + } + + private void initHttpExecutor() { + if (httpExecutor == null) { + synchronized (httpExecutorLock) { + if (httpExecutor == null) { + httpExecutor = ThreadPoolManager.newDaemonFixedThreadPool(5, 100, + "node-config-update-pool", true); + } + } + } + } + + static List> parseHostPort(List nodes) { + List> hostPorts = Lists.newArrayList(); + for (String node : nodes) { + try { + Pair ipPort = SystemInfoService.validateHostAndPort(node); + hostPorts.add(ipPort); + } catch (Exception e) { + LOG.warn(e); + } + } + return hostPorts; + } + + private class HttpConfigInfoTask implements Runnable { + private String url; + private Pair hostPort; + private String authorization; + private String nodeType; + private List confNames; + private MarkedCountDownLatch configRequestDoneSignal; + private List> config; + + public HttpConfigInfoTask(String url, Pair hostPort, String authorization, String nodeType, + List confNames, + MarkedCountDownLatch configRequestDoneSignal, + List> config) { + this.url = url; + this.hostPort = hostPort; + this.authorization = authorization; + this.nodeType = nodeType; + this.confNames = confNames; + this.configRequestDoneSignal = configRequestDoneSignal; + this.config = config; + } + + @Override + public void run() { + String configInfo; + try { + configInfo = HttpUtils.doGet(url, ImmutableMap.builder().put(AUTHORIZATION, + authorization).build()); + List> configs = GsonUtils.GSON.fromJson(configInfo, + new TypeToken>>() { + }.getType()); + for (List conf : configs) { + if (confNames == null || confNames.isEmpty() || confNames.contains(conf.get(0))) { + addConfig(conf); + } + } + configRequestDoneSignal.markedCountDown(hostPort.first + ":" + hostPort.second, -1); + } catch (Exception e) { + LOG.warn("get config from {}:{} failed.", hostPort.first, hostPort.second, e); + configRequestDoneSignal.countDown(); + } + } + + private void addConfig(List conf) { + conf.add(1, hostPort.first + ":" + hostPort.second); + conf.add(2, nodeType); + config.add(conf); + } + } + + // Modify fe configuration. + // + // request body: + //{ + // "config_name":{ + // "node":[ + // "" + // ], + // "value":"", + // "persist":"" + // } + //} + // + // return data: + // { + // "failed":[ + // { + // "config_name":"", + // "value"="", + // "node":"", + // "err_info":"" + // } + // ] + // } + @RequestMapping(path = "/set_config/fe", method = RequestMethod.POST) + public Object setConfigFe(HttpServletRequest request, HttpServletResponse response, + @RequestBody Map requestBody) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + List> failedTotal = Lists.newArrayList(); + List nodeConfigList = parseSetConfigNodes(requestBody, failedTotal); + List> aliveFe = Catalog.getCurrentCatalog().getFrontends(null) + .stream().filter(Frontend::isAlive).map(fe -> new Pair<>(fe.getHost(), Config.http_port)) + .collect(Collectors.toList()); + checkNodeIsAlive(nodeConfigList, aliveFe, failedTotal); + + Map header = Maps.newHashMap(); + header.put(AUTHORIZATION, request.getHeader(AUTHORIZATION)); + + for (NodeConfigs nodeConfigs : nodeConfigList) { + if (!nodeConfigs.getConfigs(true).isEmpty()) { + String url = concatFeSetConfigUrl(nodeConfigs, true); + try { + String responsePersist = HttpUtils.doGet(url, header); + parseFeSetConfigResponse(responsePersist, nodeConfigs.getHostPort(), failedTotal); + } catch (Exception e) { + addSetConfigErrNode(nodeConfigs.getConfigs(true), nodeConfigs.getHostPort(), + e.getMessage(), failedTotal); + } + } + if (!nodeConfigs.getConfigs(false).isEmpty()) { + String url = concatFeSetConfigUrl(nodeConfigs, false); + try { + String responseTemp = HttpUtils.doGet(url, header); + parseFeSetConfigResponse(responseTemp, nodeConfigs.getHostPort(), failedTotal); + } catch (Exception e) { + addSetConfigErrNode(nodeConfigs.getConfigs(false), nodeConfigs.getHostPort(), + e.getMessage(), failedTotal); + } + } + + } + Map>> data = Maps.newHashMap(); + data.put("failed", failedTotal); + return ResponseEntityBuilder.ok(data); + } + + private void addSetConfigErrNode(Map configs, Pair hostPort, String err, + List> failedTotal) { + for (Map.Entry entry : configs.entrySet()) { + Map failed = Maps.newHashMap(); + addFailedConfig(entry.getKey(), entry.getValue(), hostPort.first + ":" + + hostPort.second, err, failed); + failedTotal.add(failed); + } + } + + private void parseFeSetConfigResponse(String response, Pair hostPort, + List> failedTotal) throws HttpException { + JsonObject jsonObject = JsonParser.parseString(response).getAsJsonObject(); + if (jsonObject.get("code").getAsInt() != HttpUtils.REQUEST_SUCCESS_CODE) { + throw new HttpException(jsonObject.get("msg").getAsString()); + } + SetConfigAction.SetConfigEntity setConfigEntity = GsonUtils.GSON.fromJson(jsonObject.get("data").getAsJsonObject(), + SetConfigAction.SetConfigEntity.class); + for (SetConfigAction.ErrConfig errConfig : setConfigEntity.getErrConfigs()) { + Map failed = Maps.newHashMap(); + addFailedConfig(errConfig.getConfigName(), errConfig.getConfigValue(), + hostPort.first + ":" + hostPort.second, errConfig.getErrInfo(), failed); + failedTotal.add(failed); + } + } + + private static void addFailedConfig(String configName, String value, String node, String errInfo, + Map failed) { + failed.put("config_name", configName); + failed.put("value", value); + failed.put("node", node); + failed.put("err_info", errInfo); + } + + private String concatFeSetConfigUrl(NodeConfigs nodeConfigs, boolean isPersist) { + StringBuffer sb = new StringBuffer(); + Pair hostPort = nodeConfigs.getHostPort(); + sb.append("http://").append(hostPort.first).append(":").append(hostPort.second).append("/api/_set_config"); + Map configs = nodeConfigs.getConfigs(isPersist); + boolean addAnd = false; + for (Map.Entry entry : configs.entrySet()) { + if (addAnd) { + sb.append("&"); + } else { + sb.append("?"); + addAnd = true; + } + sb.append(entry.getKey()).append("=").append(entry.getValue()); + } + if (isPersist) { + sb.append("&persist=true&reset_persist=false"); + } + return sb.toString(); + } + + // Modify fe configuration. + // The request body and return data are in the same format as fe + @RequestMapping(path = "/set_config/be", method = RequestMethod.POST) + public Object setConfigBe(HttpServletRequest request, HttpServletResponse response, + @RequestBody Map requestBody) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + List> failedTotal = Lists.newArrayList(); + List nodeConfigList = parseSetConfigNodes(requestBody, failedTotal); + List> aliveBe = Catalog.getCurrentSystemInfo().getBackendIds(true) + .stream().map(beId -> { + Backend be = Catalog.getCurrentSystemInfo().getBackend(beId); + return new Pair<>(be.getHost(), be.getHttpPort()); + }) + .collect(Collectors.toList()); + checkNodeIsAlive(nodeConfigList, aliveBe, failedTotal); + + handleBeSetConfig(nodeConfigList, request.getHeader(AUTHORIZATION), failedTotal); + failedTotal = failedTotal.stream().filter(e -> !e.isEmpty()).collect(Collectors.toList()); + + Map>> data = Maps.newHashMap(); + data.put("failed", failedTotal); + return ResponseEntityBuilder.ok(data); + } + + // Parsing request body into List + private List parseSetConfigNodes(Map requestBody, + List> errNodes) { + List nodeConfigsList = Lists.newArrayList(); + for (String configName : requestBody.keySet()) { + SetConfigRequestBody configPara = requestBody.get(configName); + String value = configPara.getValue(); + boolean persist = configPara.isPersist(); + if (value == null || configPara.getNodes() == null) { + continue; + } + + for (String node : configPara.getNodes()) { + Pair ipPort; + try { + ipPort = SystemInfoService.validateHostAndPort(node); + } catch (Exception e) { + Map failed = Maps.newHashMap(); + addFailedConfig(configName, configPara.getValue(), node, "node invalid", failed); + errNodes.add(failed); + continue; + } + boolean find = false; + for (NodeConfigs nodeConfigs : nodeConfigsList) { + Pair hostPort = nodeConfigs.getHostPort(); + if (ipPort.first.equals(hostPort.first) && ipPort.second.equals(hostPort.second)) { + find = true; + nodeConfigs.addConfig(configName, value, persist); + } + } + if (!find) { + NodeConfigs newNodeConfigs = new NodeConfigs(ipPort.first, ipPort.second); + nodeConfigsList.add(newNodeConfigs); + newNodeConfigs.addConfig(configName, value, persist); + } + } + } + return nodeConfigsList; + } + + private void checkNodeIsAlive(List nodeConfigsList, List> aliveNodes, + List> failedNodes) { + Iterator it = nodeConfigsList.iterator(); + while (it.hasNext()) { + NodeConfigs node = it.next(); + boolean isExist = false; + for (Pair aliveHostPort : aliveNodes) { + if (aliveHostPort.first.equals(node.getHostPort().first) + && aliveHostPort.second.equals(node.getHostPort().second)) { + isExist = true; + break; + } + } + if (!isExist) { + addSetConfigErrNode(node.getConfigs(true), node.getHostPort(), + "Node does not exist or is not alive", failedNodes); + addSetConfigErrNode(node.getConfigs(false), node.getHostPort(), + "Node does not exist or is not alive", failedNodes); + it.remove(); + } + } + } + + private List> handleBeSetConfig(List nodeConfigList, String authorization, + List> failedTotal) { + initHttpExecutor(); + + int configNum = nodeConfigList.stream() + .mapToInt(e -> e.getConfigs(true).size() + e.getConfigs(false).size()).sum(); + MarkedCountDownLatch beSetConfigCountDownSignal = new MarkedCountDownLatch<>(configNum); + for (NodeConfigs nodeConfigs : nodeConfigList) { + submitBeSetConfigTask(nodeConfigs, true, authorization, beSetConfigCountDownSignal, failedTotal); + submitBeSetConfigTask(nodeConfigs, false, authorization, beSetConfigCountDownSignal, failedTotal); + } + try { + beSetConfigCountDownSignal.await(HTTP_WAIT_TIME_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("set be config exception:", e); + } finally { + List> leftNode = beSetConfigCountDownSignal.getLeftMarks(); + for (Map.Entry failedNode : leftNode) { + Map failed = parseNodeConfig(failedNode.getKey()); + if (!failed.isEmpty()) { + failed.put("err_info", "Connection timeout"); + failedTotal.add(failed); + } + } + } + return failedTotal; + } + + private void submitBeSetConfigTask(NodeConfigs nodeConfigs, boolean isPersist, String authorization, + MarkedCountDownLatch beSetConfigCountDownSignal, + List> failedTotal) { + if (!nodeConfigs.getConfigs(isPersist).isEmpty()) { + for (Map.Entry entry : nodeConfigs.getConfigs(isPersist).entrySet()) { + failedTotal.add(Maps.newHashMap()); + Pair hostPort = nodeConfigs.getHostPort(); + beSetConfigCountDownSignal.addMark(concatNodeConfig(hostPort.first, hostPort.second, + entry.getKey(), entry.getValue()), -1); + + String url = concatBeSetConfigUrl(hostPort.first, hostPort.second, entry.getKey(), + entry.getValue(), isPersist); + httpExecutor.submit(new HttpSetConfigTask(url, hostPort, authorization, entry.getKey(), + entry.getValue(), beSetConfigCountDownSignal, + failedTotal.get(failedTotal.size() - 1))); + } + } + } + + private String concatBeSetConfigUrl(String host, Integer port, String configName, + String configValue, boolean isPersist) { + StringBuilder stringBuffer = new StringBuilder(); + stringBuffer.append("http://").append(host).append(":").append(port) + .append("/api/update_config") + .append("?").append(configName).append("=").append(configValue); + if (isPersist) { + stringBuffer.append("&persist=true"); + } + return stringBuffer.toString(); + } + + private String concatNodeConfig(String host, Integer port, String configName, String configValue) { + return host + ":" + port + ":" + configName + ":" + configValue; + } + + private Map parseNodeConfig(String nodeConfig) { + Map map = Maps.newHashMap(); + String[] splitStrings = PATTERN.split(nodeConfig); + if (splitStrings.length == 4) { + addFailedConfig(splitStrings[2], splitStrings[3], splitStrings[0] + ":" + splitStrings[1], "", map); + } + return map; + } + + private class HttpSetConfigTask implements Runnable { + private String url; + private Pair hostPort; + private String authorization; + private String configName; + private String configValue; + private MarkedCountDownLatch beSetConfigDoneSignal; + private Map failed; + + public HttpSetConfigTask(String url, Pair hostPort, String authorization, String configName, + String configValue, MarkedCountDownLatch beSetConfigDoneSignal, + Map failed) { + this.url = url; + this.hostPort = hostPort; + this.authorization = authorization; + this.configName = configName; + this.configValue = configValue; + this.beSetConfigDoneSignal = beSetConfigDoneSignal; + this.failed = failed; + } + + @Override + public void run() { + try { + String response = HttpUtils.doPost(url, ImmutableMap.builder().put(AUTHORIZATION, + authorization).build(), null); + JsonObject jsonObject = JsonParser.parseString(response).getAsJsonObject(); + String status = jsonObject.get("status").getAsString(); + if (!status.equals("OK")) { + addFailedConfig(configName, configValue, hostPort.first + ":" + hostPort.second, + jsonObject.get("msg").getAsString(), failed); + } + beSetConfigDoneSignal.markedCountDown(concatNodeConfig(hostPort.first, hostPort.second, + configName, configValue), -1); + } catch (Exception e) { + LOG.warn("set be:{} config:{} failed.", hostPort.first + ":" + hostPort.second, + configName + "=" + configValue, e); + beSetConfigDoneSignal.countDown(); + } + } + } + + // Store persistent and non-persistent configuration information that needs to be modified on the node. + public static class NodeConfigs { + private Pair hostPort; + private Map persistConfigs; + private Map nonPersistConfigs; + + public NodeConfigs(String host, Integer httpPort) { + hostPort = new Pair<>(host, httpPort); + persistConfigs = Maps.newHashMap(); + nonPersistConfigs = Maps.newHashMap(); + } + + public Pair getHostPort() { + return hostPort; + } + + public void addConfig(String name, String value, boolean persist) { + if (persist) { + persistConfigs.put(name, value); + } else { + nonPersistConfigs.put(name, value); + } + } + + public Map getConfigs(boolean isPersist) { + return isPersist ? persistConfigs : nonPersistConfigs; + } + + } + + @Getter + @Setter + public static class ConfigInfoRequestBody { + @JsonProperty("conf_name") + public List confNames; + + @JsonProperty("node") + public List nodes; + } + + @Getter + @Setter + public static class SetConfigRequestBody { + @JsonProperty("node") + private List nodes; + + private String value; + + private boolean persist; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java new file mode 100644 index 0000000000..28e96ddbe3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java @@ -0,0 +1,339 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.rest.manager; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.common.profile.ProfileTreeNode; +import org.apache.doris.common.profile.ProfileTreePrinter; +import org.apache.doris.common.util.ProfileManager; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.httpv2.rest.RestBaseController; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/* + * Used to return query information and query profile. + */ +@RestController +@RequestMapping("/rest/v2/manager/query") +public class QueryProfileAction extends RestBaseController { + private static final Logger LOG = LogManager.getLogger(QueryProfileAction.class); + + public static final String QUERY_ID = "Query ID"; + public static final String NODE = "FE节点"; + public static final String USER = "查询用户"; + public static final String DEFAULT_DB = "执行数据库"; + public static final String SQL_STATEMENT = "Sql"; + public static final String QUERY_TYPE = "查询类型"; + public static final String START_TIME = "开始时间"; + public static final String END_TIME = "结束时间"; + public static final String TOTAL = "执行时长"; + public static final String QUERY_STATE = "状态"; + + private static final String QUERY_ID_PARA = "query_id"; + private static final String SEARCH_PARA = "search"; + private static final String IS_ALL_NODE_PARA = "is_all_node"; + private static final String FRAGMENT_ID = "fragment_id"; + private static final String INSTANCE_ID = "instance_id"; + + public static final ImmutableList QUERY_TITLE_NAMES = new ImmutableList.Builder() + .add(QUERY_ID).add(NODE).add(USER).add(DEFAULT_DB).add(SQL_STATEMENT) + .add(QUERY_TYPE).add(START_TIME).add(END_TIME).add(TOTAL).add(QUERY_STATE) + .build(); + + private List requestAllFe(String httpPath, Map arguments, String authorization) { + List> frontends = HttpUtils.getFeList(); + ImmutableMap header = ImmutableMap.builder() + .put(NodeAction.AUTHORIZATION, authorization).build(); + List dataList = Lists.newArrayList(); + for (Pair ipPort : frontends) { + String url = HttpUtils.concatUrl(ipPort, httpPath, arguments); + try { + String data = HttpUtils.parseResponse(HttpUtils.doGet(url, header)); + if (!Strings.isNullOrEmpty(data) && !data.equals("{}")) { + dataList.add(data); + } + } catch (Exception e) { + LOG.warn("request url {} error", url, e); + } + } + return dataList; + } + + @RequestMapping(path = "/query_info", method = RequestMethod.GET) + public Object queryInfo(HttpServletRequest request, HttpServletResponse response, + @RequestParam(value = QUERY_ID_PARA, required = false) String queryId, + @RequestParam(value = SEARCH_PARA, required = false) String search, + @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") + boolean isAllNode) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + List> queries = Lists.newArrayList(); + if (isAllNode) { + // Get query information for all fe + String httpPath = "/rest/v2/manager/query/query_info"; + Map arguments = Maps.newHashMap(); + arguments.put(QUERY_ID_PARA, queryId); + if (!Strings.isNullOrEmpty(search)) { + try { + // search may contain special characters that need to be encoded. + search = URLEncoder.encode(search, StandardCharsets.UTF_8.toString()); + } catch (UnsupportedEncodingException ignored) { + // Encoding exception, ignore search parameter. + } + } + arguments.put(SEARCH_PARA, search); + arguments.put(IS_ALL_NODE_PARA, "false"); + + List dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION)); + for (String data : dataList) { + try { + NodeAction.NodeInfo nodeInfo = GsonUtils.GSON.fromJson(data, + new TypeToken() { + }.getType()); + queries.addAll(nodeInfo.getRows()); + } catch (Exception e) { + LOG.warn("parse query info error: {}", data, e); + } + } + return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(QUERY_TITLE_NAMES, queries)); + } + + queries = ProfileManager.getInstance().getAllQueries().stream() + .filter(profile -> profile.get(4).equals("Query")).collect(Collectors.toList()); + if (!Strings.isNullOrEmpty(queryId)) { + queries = queries.stream().filter(q -> q.get(0).equals(queryId)).collect(Collectors.toList()); + } + + // add node information + for (List query : queries) { + query.add(1, Catalog.getCurrentCatalog().getSelfNode().first + ":" + Config.http_port); + } + + if (!Strings.isNullOrEmpty(search)) { + List> tempQueries = Lists.newArrayList(); + for (List query : queries) { + for (String field : query) { + if (field.contains(search)) { + tempQueries.add(query); + break; + } + } + } + queries = tempQueries; + } + + return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(QUERY_TITLE_NAMES, queries)); + } + + // Returns the sql for the specified query id. + @RequestMapping(path = "/sql/{query_id}", method = RequestMethod.GET) + public Object queryInfo(HttpServletRequest request, HttpServletResponse response, + @PathVariable("query_id") String queryId, + @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") + boolean isAllNode) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + Map querySql = Maps.newHashMap(); + if (isAllNode) { + String httpPath = "/rest/v2/manager/query/sql/" + queryId; + ImmutableMap arguments = ImmutableMap.builder() + .put(IS_ALL_NODE_PARA, "false").build(); + List dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION)); + if (!dataList.isEmpty()) { + try { + String sql = JsonParser.parseString(dataList.get(0)).getAsJsonObject().get("sql").getAsString(); + querySql.put("sql", sql); + return ResponseEntityBuilder.ok(querySql); + } catch (Exception e) { + LOG.warn("parse sql error: {}", dataList.get(0), e); + } + } + } else { + List> queries = ProfileManager.getInstance().getAllQueries().stream() + .filter(query -> query.get(0).equals(queryId)).collect(Collectors.toList()); + if (!queries.isEmpty()) { + querySql.put("sql", queries.get(0).get(3)); + } + } + return ResponseEntityBuilder.ok(querySql); + } + + // Returns the text profile for the specified query id. + @RequestMapping(path = "/profile/text/{query_id}", method = RequestMethod.GET) + public Object queryProfileText(HttpServletRequest request, HttpServletResponse response, + @PathVariable("query_id") String queryId, + @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") + boolean isAllNode) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + Map profileMap = Maps.newHashMap(); + if (isAllNode) { + String httpPath = "/rest/v2/manager/query/profile/text/" + queryId; + ImmutableMap arguments = ImmutableMap.builder() + .put(IS_ALL_NODE_PARA, "false").build(); + List dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION)); + if (!dataList.isEmpty()) { + try { + String profile = + JsonParser.parseString(dataList.get(0)).getAsJsonObject().get("profile").getAsString(); + profileMap.put("profile", profile); + return ResponseEntityBuilder.ok(profileMap); + } catch (Exception e) { + LOG.warn("parse profile text error: {}", dataList.get(0), e); + } + } + } else { + String profile = ProfileManager.getInstance().getProfile(queryId); + if (!Strings.isNullOrEmpty(profile)) { + profileMap.put("profile", profile); + } + } + return ResponseEntityBuilder.ok(profileMap); + } + + // Returns the fragments and instances for the specified query id. + // [ + // { + // "fragment_id":"", + // "time":"", + // "instance_id":[ + // "" + // ] + // } + // ] + @RequestMapping(path = "/profile/fragments/{query_id}", method = RequestMethod.GET) + public Object fragments(HttpServletRequest request, HttpServletResponse response, + @PathVariable("query_id") String queryId, + @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") + boolean isAllNode) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + if (isAllNode) { + String httpPath = "/rest/v2/manager/query/profile/fragments/" + queryId; + ImmutableMap arguments = ImmutableMap.builder() + .put(IS_ALL_NODE_PARA, "false").build(); + List> frontends = HttpUtils.getFeList(); + ImmutableMap header = ImmutableMap.builder() + .put(NodeAction.AUTHORIZATION, request.getHeader(NodeAction.AUTHORIZATION)).build(); + for (Pair ipPort : frontends) { + String url = HttpUtils.concatUrl(ipPort, httpPath, arguments); + try { + String responseJson = HttpUtils.doGet(url, header); + int code = JsonParser.parseString(responseJson).getAsJsonObject().get("code").getAsInt(); + if (code == HttpUtils.REQUEST_SUCCESS_CODE) { + return responseJson; + } + } catch (Exception e) { + LOG.warn(e); + } + } + } else { + try { + return ResponseEntityBuilder.ok(ProfileManager.getInstance().getFragmentsAndInstances(queryId)); + } catch (AnalysisException e) { + return ResponseEntityBuilder.badRequest(e.getMessage()); + } + } + return ResponseEntityBuilder.badRequest("not found query id"); + } + + // Returns the graph profile for the specified query id. + @RequestMapping(path = "/profile/graph/{query_id}", method = RequestMethod.GET) + public Object queryProfileGraph(HttpServletRequest request, HttpServletResponse response, + @PathVariable("query_id") String queryId, + @RequestParam(value = FRAGMENT_ID, required = false) String fragmentId, + @RequestParam(value = INSTANCE_ID, required = false) String instanceId, + @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") + boolean isAllNode) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + Map graph = Maps.newHashMap(); + List results; + + if (isAllNode) { + String httpPath = "/rest/v2/manager/query/profile/graph/" + queryId; + Map arguments = Maps.newHashMap(); + arguments.put(FRAGMENT_ID, fragmentId); + arguments.put(INSTANCE_ID, instanceId); + arguments.put(IS_ALL_NODE_PARA, "false"); + List dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION)); + if (!dataList.isEmpty()) { + try { + String profileGraph = + JsonParser.parseString(dataList.get(0)).getAsJsonObject().get("graph").getAsString(); + graph.put("graph", profileGraph); + return ResponseEntityBuilder.ok(graph); + } catch (Exception e) { + LOG.warn("parse profile graph error: {}", dataList.get(0), e); + } + } + } else { + try { + if (Strings.isNullOrEmpty(fragmentId) || Strings.isNullOrEmpty(instanceId)) { + ProfileTreeNode treeRoot = ProfileManager.getInstance().getFragmentProfileTree(queryId, queryId); + results = Lists.newArrayList(ProfileTreePrinter.printFragmentTree(treeRoot)); + } else { + ProfileTreeNode treeRoot = ProfileManager.getInstance().getInstanceProfileTree(queryId, queryId, + fragmentId, instanceId); + results = Lists.newArrayList(ProfileTreePrinter.printInstanceTree(treeRoot)); + } + graph.put("graph", results.get(0)); + } catch (Exception e) { + LOG.warn("get profile graph error, queryId:{}, fragementId:{}, instanceId:{}", + queryId, fragmentId, instanceId, e); + } + } + return ResponseEntityBuilder.ok(graph); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 4cf5033d1c..a2af4bc754 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -148,6 +148,7 @@ public class StmtExecutor implements ProfileWriter { private RuntimeProfile plannerRuntimeProfile; private final Object writeProfileLock = new Object(); private volatile boolean isFinishedProfile = false; + private String queryType = "Query"; private volatile Coordinator coord = null; private MasterOpExecutor masterOpExecutor = null; private RedirectStatus redirectStatus = null; @@ -195,10 +196,13 @@ public class StmtExecutor implements ProfileWriter { profile.addChild(summaryProfile); summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(context.queryId())); summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(context.getStartTime())); - summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp)); + summaryProfile.addInfoString(ProfileManager.END_TIME, + waiteBeReport ? TimeUtils.longToTimeString(currentTimestamp) : "N/A"); summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); - summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Query"); - summaryProfile.addInfoString(ProfileManager.QUERY_STATE, context.getState().toString()); + summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, queryType); + summaryProfile.addInfoString(ProfileManager.QUERY_STATE, + !waiteBeReport && context.getState().getStateType().equals(MysqlStateType.OK) ? + "RUNNING" : context.getState().toString()); summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION); summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser()); summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase()); @@ -209,8 +213,12 @@ public class StmtExecutor implements ProfileWriter { summaryProfile.addChild(plannerRuntimeProfile); profile.addChild(coord.getQueryProfile()); } else { - summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp)); + summaryProfile.addInfoString(ProfileManager.END_TIME, + waiteBeReport ? TimeUtils.longToTimeString(currentTimestamp) : "N/A"); summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); + summaryProfile.addInfoString(ProfileManager.QUERY_STATE, + !waiteBeReport && context.getState().getStateType().equals(MysqlStateType.OK) ? + "RUNNING" : context.getState().toString()); } plannerProfile.initRuntimeProfile(plannerRuntimeProfile); @@ -360,6 +368,7 @@ public class StmtExecutor implements ProfileWriter { try { handleInsertStmt(); if (!((InsertStmt) parsedStmt).getQueryStmt().isExplain()) { + queryType = "Insert"; writeProfile(true); } } catch (Throwable t) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java index 2adfb23eb3..f3c3c105ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java @@ -262,7 +262,7 @@ public class ExportExportingTask extends MasterTask { summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp)); summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); - summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Query"); + summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Export"); summaryProfile.addInfoString(ProfileManager.QUERY_STATE, job.getState().toString()); summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION); summaryProfile.addInfoString(ProfileManager.USER, "xxx");