[Feature](config) allow update multiple be configs in one request (#23702)
This commit is contained in:
@ -1416,32 +1416,34 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
|
||||
return true;
|
||||
}
|
||||
|
||||
#define UPDATE_FIELD(FIELD, VALUE, TYPE, PERSIST) \
|
||||
if (strcmp((FIELD).type, #TYPE) == 0) { \
|
||||
TYPE new_value; \
|
||||
if (!convert((VALUE), new_value)) { \
|
||||
return Status::InvalidArgument("convert '{}' as {} failed", VALUE, #TYPE); \
|
||||
} \
|
||||
TYPE& ref_conf_value = *reinterpret_cast<TYPE*>((FIELD).storage); \
|
||||
TYPE old_value = ref_conf_value; \
|
||||
if (RegisterConfValidator::_s_field_validator != nullptr) { \
|
||||
auto validator = RegisterConfValidator::_s_field_validator->find((FIELD).name); \
|
||||
if (validator != RegisterConfValidator::_s_field_validator->end() && \
|
||||
!(validator->second)()) { \
|
||||
ref_conf_value = old_value; \
|
||||
return Status::InvalidArgument("validate {}={} failed", (FIELD).name, new_value); \
|
||||
} \
|
||||
} \
|
||||
ref_conf_value = new_value; \
|
||||
if (full_conf_map != nullptr) { \
|
||||
std::ostringstream oss; \
|
||||
oss << new_value; \
|
||||
(*full_conf_map)[(FIELD).name] = oss.str(); \
|
||||
} \
|
||||
if (PERSIST) { \
|
||||
RETURN_IF_ERROR(persist_config(std::string((FIELD).name), VALUE)); \
|
||||
} \
|
||||
return Status::OK(); \
|
||||
#define UPDATE_FIELD(FIELD, VALUE, TYPE, PERSIST) \
|
||||
if (strcmp((FIELD).type, #TYPE) == 0) { \
|
||||
TYPE new_value; \
|
||||
if (!convert((VALUE), new_value)) { \
|
||||
return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("convert '{}' as {} failed", \
|
||||
VALUE, #TYPE); \
|
||||
} \
|
||||
TYPE& ref_conf_value = *reinterpret_cast<TYPE*>((FIELD).storage); \
|
||||
TYPE old_value = ref_conf_value; \
|
||||
if (RegisterConfValidator::_s_field_validator != nullptr) { \
|
||||
auto validator = RegisterConfValidator::_s_field_validator->find((FIELD).name); \
|
||||
if (validator != RegisterConfValidator::_s_field_validator->end() && \
|
||||
!(validator->second)()) { \
|
||||
ref_conf_value = old_value; \
|
||||
return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("validate {}={} failed", \
|
||||
(FIELD).name, new_value); \
|
||||
} \
|
||||
} \
|
||||
ref_conf_value = new_value; \
|
||||
if (full_conf_map != nullptr) { \
|
||||
std::ostringstream oss; \
|
||||
oss << new_value; \
|
||||
(*full_conf_map)[(FIELD).name] = oss.str(); \
|
||||
} \
|
||||
if (PERSIST) { \
|
||||
RETURN_IF_ERROR(persist_config(std::string((FIELD).name), VALUE)); \
|
||||
} \
|
||||
return Status::OK(); \
|
||||
}
|
||||
|
||||
// write config to be_custom.conf
|
||||
@ -1466,11 +1468,12 @@ Status set_config(const std::string& field, const std::string& value, bool need_
|
||||
bool force) {
|
||||
auto it = Register::_s_field_map->find(field);
|
||||
if (it == Register::_s_field_map->end()) {
|
||||
return Status::NotFound("'{}' is not found", field);
|
||||
return Status::Error<ErrorCode::NOT_FOUND, false>("'{}' is not found", field);
|
||||
}
|
||||
|
||||
if (!force && !it->second.valmutable) {
|
||||
return Status::NotSupported("'{}' is not support to modify", field);
|
||||
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR, false>(
|
||||
"'{}' is not support to modify", field);
|
||||
}
|
||||
|
||||
UPDATE_FIELD(it->second, value, bool, need_persist);
|
||||
@ -1485,8 +1488,8 @@ Status set_config(const std::string& field, const std::string& value, bool need_
|
||||
}
|
||||
|
||||
// The other types are not thread safe to change dynamically.
|
||||
return Status::NotSupported("'{}' is type of '{}' which is not support to modify", field,
|
||||
it->second.type);
|
||||
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR, false>(
|
||||
"'{}' is type of '{}' which is not support to modify", field, it->second.type);
|
||||
}
|
||||
|
||||
Status set_fuzzy_config(const std::string& field, const std::string& value) {
|
||||
|
||||
@ -89,61 +89,53 @@ void ConfigAction::handle_update_config(HttpRequest* req) {
|
||||
|
||||
Status s;
|
||||
std::string msg;
|
||||
// We only support set one config at a time, and along with a optional param "persist".
|
||||
// So the number of query params should at most be 2.
|
||||
if (req->params()->size() > 2 || req->params()->size() < 1) {
|
||||
rapidjson::Document root;
|
||||
root.SetObject();
|
||||
rapidjson::Document results;
|
||||
results.SetArray();
|
||||
if (req->params()->size() < 1) {
|
||||
s = Status::InvalidArgument("");
|
||||
msg = "Now only support to set a single config once, via 'config_name=new_value', and with "
|
||||
"an optional parameter 'persist'.";
|
||||
} else {
|
||||
if (req->params()->size() == 1) {
|
||||
const std::string& config = req->params()->begin()->first;
|
||||
const std::string& new_value = req->params()->begin()->second;
|
||||
s = config::set_config(config, new_value, false);
|
||||
bool need_persist = false;
|
||||
if (req->params()->find(PERSIST_PARAM)->second.compare("true") == 0) {
|
||||
need_persist = true;
|
||||
}
|
||||
for (const auto& [key, value] : *req->params()) {
|
||||
if (key == PERSIST_PARAM) {
|
||||
continue;
|
||||
}
|
||||
s = config::set_config(key, value, need_persist);
|
||||
if (s.ok()) {
|
||||
LOG(INFO) << "set_config " << config << "=" << new_value << " success";
|
||||
LOG(INFO) << "set_config " << key << "=" << value
|
||||
<< " success. persist: " << need_persist;
|
||||
} else {
|
||||
LOG(WARNING) << "set_config " << config << "=" << new_value << " failed";
|
||||
msg = strings::Substitute("set $0=$1 failed, reason: $2", config, new_value,
|
||||
LOG(WARNING) << "set_config " << key << "=" << value << " failed";
|
||||
msg = strings::Substitute("set $0=$1 failed, reason: $2.", key, value,
|
||||
s.to_string());
|
||||
}
|
||||
} else if (req->params()->size() == 2) {
|
||||
if (req->params()->find(PERSIST_PARAM) == req->params()->end()) {
|
||||
s = Status::InvalidArgument("");
|
||||
msg = "Now only support to set a single config once, via 'config_name=new_value', "
|
||||
"and with an optional parameter 'persist'.";
|
||||
} else {
|
||||
bool need_persist = false;
|
||||
if (req->params()->find(PERSIST_PARAM)->second.compare("true") == 0) {
|
||||
need_persist = true;
|
||||
}
|
||||
for (auto const& iter : *(req->params())) {
|
||||
if (iter.first.compare(PERSIST_PARAM) == 0) {
|
||||
continue;
|
||||
}
|
||||
s = config::set_config(iter.first, iter.second, need_persist);
|
||||
if (s.ok()) {
|
||||
LOG(INFO) << "set_config " << iter.first << "=" << iter.second
|
||||
<< " success. persist: " << need_persist;
|
||||
} else {
|
||||
LOG(WARNING)
|
||||
<< "set_config " << iter.first << "=" << iter.second << " failed";
|
||||
msg = strings::Substitute("set $0=$1 failed, reason: $2", iter.first,
|
||||
iter.second, s.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
std::string status(s.ok() ? "OK" : "BAD");
|
||||
rapidjson::Value result;
|
||||
result.SetObject();
|
||||
rapidjson::Value(key.c_str(), key.size(), results.GetAllocator());
|
||||
result.AddMember("config_name",
|
||||
rapidjson::Value(key.c_str(), key.size(), results.GetAllocator()),
|
||||
results.GetAllocator());
|
||||
result.AddMember(
|
||||
"status",
|
||||
rapidjson::Value(status.c_str(), status.size(), results.GetAllocator()),
|
||||
results.GetAllocator());
|
||||
result.AddMember("msg",
|
||||
rapidjson::Value(msg.c_str(), msg.size(), results.GetAllocator()),
|
||||
results.GetAllocator());
|
||||
results.PushBack(result, results.GetAllocator());
|
||||
}
|
||||
}
|
||||
|
||||
std::string status(s.ok() ? "OK" : "BAD");
|
||||
rapidjson::Document root;
|
||||
root.SetObject();
|
||||
root.AddMember("status", rapidjson::Value(status.c_str(), status.size()), root.GetAllocator());
|
||||
root.AddMember("msg", rapidjson::Value(msg.c_str(), msg.size()), root.GetAllocator());
|
||||
rapidjson::StringBuffer strbuf;
|
||||
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
|
||||
root.Accept(writer);
|
||||
results.Accept(writer);
|
||||
|
||||
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
|
||||
HttpChannel::send_reply(req, HttpStatus::OK, strbuf.GetString());
|
||||
|
||||
@ -54,27 +54,54 @@ None
|
||||
|
||||
### Query
|
||||
|
||||
```
|
||||
[["agent_task_trace_threshold_sec","int32_t","2","true"], ...]
|
||||
```
|
||||
```
|
||||
[["agent_task_trace_threshold_sec","int32_t","2","true"], ...]
|
||||
```
|
||||
|
||||
### Update
|
||||
```
|
||||
```
|
||||
[
|
||||
{
|
||||
"config_name": "agent_task_trace_threshold_sec",
|
||||
"status": "OK",
|
||||
"msg": ""
|
||||
}
|
||||
```
|
||||
]
|
||||
```
|
||||
|
||||
```
|
||||
[
|
||||
{
|
||||
"config_name": "agent_task_trace_threshold_sec",
|
||||
"status": "OK",
|
||||
"msg": ""
|
||||
},
|
||||
{
|
||||
"config_name": "enable_segcompaction",
|
||||
"status": "BAD",
|
||||
"msg": "set enable_segcompaction=false failed, reason: [NOT_IMPLEMENTED_ERROR]'enable_segcompaction' is not support to modify."
|
||||
},
|
||||
{
|
||||
"config_name": "enable_time_lut",
|
||||
"status": "BAD",
|
||||
"msg": "set enable_time_lut=false failed, reason: [NOT_IMPLEMENTED_ERROR]'enable_time_lut' is not support to modify."
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## Examples
|
||||
|
||||
|
||||
```
|
||||
curl http://127.0.0.1:8040/api/show_config
|
||||
```
|
||||
|
||||
```
|
||||
curl -X POST "http://127.0.0.1:8040/api/update_config?agent_task_trace_threshold_sec=2&persist=true"
|
||||
|
||||
```
|
||||
```
|
||||
curl "http://127.0.0.1:8040/api/show_config"
|
||||
```
|
||||
|
||||
```
|
||||
curl -X POST "http://127.0.0.1:8040/api/update_config?agent_task_trace_threshold_sec=2&persist=true"
|
||||
|
||||
```
|
||||
|
||||
```
|
||||
curl -X POST "http://127.0.0.1:8040/api/update_config?agent_task_trace_threshold_sec=2&enable_merge_on_write_correctness_check=true&persist=true"
|
||||
```
|
||||
|
||||
|
||||
@ -54,27 +54,52 @@ under the License.
|
||||
|
||||
### 查询
|
||||
|
||||
```
|
||||
[["agent_task_trace_threshold_sec","int32_t","2","true"], ...]
|
||||
```
|
||||
```
|
||||
[["agent_task_trace_threshold_sec","int32_t","2","true"], ...]
|
||||
```
|
||||
|
||||
### 更新
|
||||
```
|
||||
```
|
||||
[
|
||||
{
|
||||
"config_name": "agent_task_trace_threshold_sec",
|
||||
"status": "OK",
|
||||
"msg": ""
|
||||
}
|
||||
```
|
||||
]
|
||||
```
|
||||
|
||||
```
|
||||
[
|
||||
{
|
||||
"config_name": "agent_task_trace_threshold_sec",
|
||||
"status": "OK",
|
||||
"msg": ""
|
||||
},
|
||||
{
|
||||
"config_name": "enable_segcompaction",
|
||||
"status": "BAD",
|
||||
"msg": "set enable_segcompaction=false failed, reason: [NOT_IMPLEMENTED_ERROR]'enable_segcompaction' is not support to modify."
|
||||
},
|
||||
{
|
||||
"config_name": "enable_time_lut",
|
||||
"status": "BAD",
|
||||
"msg": "set enable_time_lut=false failed, reason: [NOT_IMPLEMENTED_ERROR]'enable_time_lut' is not support to modify."
|
||||
}
|
||||
]
|
||||
```
|
||||
## Examples
|
||||
|
||||
|
||||
```
|
||||
curl "http://127.0.0.1:8040/api/show_config"
|
||||
```
|
||||
|
||||
```
|
||||
curl -X POST "http://127.0.0.1:8040/api/update_config?agent_task_trace_threshold_sec=2&persist=true"
|
||||
|
||||
```
|
||||
```
|
||||
curl "http://127.0.0.1:8040/api/show_config"
|
||||
```
|
||||
|
||||
```
|
||||
curl -X POST "http://127.0.0.1:8040/api/update_config?agent_task_trace_threshold_sec=2&persist=true"
|
||||
|
||||
```
|
||||
|
||||
```
|
||||
curl -X POST "http://127.0.0.1:8040/api/update_config?agent_task_trace_threshold_sec=2&enable_merge_on_write_correctness_check=true&persist=true"
|
||||
```
|
||||
@ -24,18 +24,17 @@ defaultDb = "regression_test"
|
||||
// init cmd like: select @@session.tx_read_only
|
||||
// at each time we connect.
|
||||
// add allowLoadLocalInfile so that the jdbc can execute mysql load data from client.
|
||||
jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
|
||||
targetJdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
|
||||
jdbcUrl = "jdbc:mysql://127.0.0.1:9134/?useLocalSessionState=true&allowLoadLocalInfile=true"
|
||||
targetJdbcUrl = "jdbc:mysql://127.0.0.1:9134/?useLocalSessionState=true&allowLoadLocalInfile=true"
|
||||
jdbcUser = "root"
|
||||
jdbcPassword = ""
|
||||
|
||||
feSourceThriftAddress = "127.0.0.1:9020"
|
||||
feTargetThriftAddress = "127.0.0.1:9020"
|
||||
syncerAddress = "127.0.0.1:9190"
|
||||
feSourceThriftAddress = "127.0.0.1:9124"
|
||||
feTargetThriftAddress = "127.0.0.1:9124"
|
||||
feSyncerUser = "root"
|
||||
feSyncerPassword = ""
|
||||
|
||||
feHttpAddress = "127.0.0.1:8030"
|
||||
feHttpAddress = "127.0.0.1:8134"
|
||||
feHttpUser = "root"
|
||||
feHttpPassword = ""
|
||||
|
||||
@ -85,7 +84,6 @@ pg_14_port=5442
|
||||
oracle_11_port=1521
|
||||
sqlserver_2022_port=1433
|
||||
clickhouse_22_port=8123
|
||||
doris_port=9030
|
||||
|
||||
// hive catalog test config
|
||||
// To enable hive test, you need first start hive container.
|
||||
@ -110,9 +108,6 @@ extHdfsPort = 4007
|
||||
extHiveHmsUser = "****"
|
||||
extHiveHmsPassword= "***********"
|
||||
|
||||
//paimon catalog test config for bigdata
|
||||
enableExternalPaimonTest = false
|
||||
|
||||
//mysql jdbc connector test config for bigdata
|
||||
enableExternalMysqlTest = false
|
||||
extMysqlHost = "***.**.**.**"
|
||||
@ -145,5 +140,3 @@ max_failure_num=0
|
||||
|
||||
// used for exporting test
|
||||
s3ExportBucketName = ""
|
||||
|
||||
externalEnvIp="127.0.0.1"
|
||||
|
||||
90
regression-test/suites/update/test_update_configs.groovy
Normal file
90
regression-test/suites/update/test_update_configs.groovy
Normal file
@ -0,0 +1,90 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_update_configs", "p0") {
|
||||
|
||||
String backend_id;
|
||||
def backendId_to_backendIP = [:]
|
||||
def backendId_to_backendHttpPort = [:]
|
||||
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
|
||||
backend_id = backendId_to_backendIP.keySet()[0]
|
||||
def beIp = backendId_to_backendIP.get(backend_id)
|
||||
def bePort = backendId_to_backendHttpPort.get(backend_id)
|
||||
def (code, out, err) = show_be_config(beIp, bePort)
|
||||
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
|
||||
assertEquals(code, 0)
|
||||
def configList = parseJson(out.trim())
|
||||
assert configList instanceof List
|
||||
|
||||
boolean disableAutoCompaction = true
|
||||
boolean enablePrefetch = true
|
||||
boolean enableSegcompaction = true
|
||||
for (Object ele in (List) configList) {
|
||||
assert ele instanceof List<String>
|
||||
if (((List<String>) ele)[0] == "disable_auto_compaction") {
|
||||
disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
|
||||
}
|
||||
if (((List<String>) ele)[0] == "enable_prefetch") {
|
||||
enablePrefetch = Boolean.parseBoolean(((List<String>) ele)[2])
|
||||
}
|
||||
if (((List<String>) ele)[0] == "enable_segcompaction") {
|
||||
enableSegcompaction = Boolean.parseBoolean(((List<String>) ele)[2])
|
||||
}
|
||||
}
|
||||
logger.info("disable_auto_compaction:${disableAutoCompaction}, enable_prefetch:${enablePrefetch}, enable_segcompaction:${enableSegcompaction}")
|
||||
|
||||
curl("POST", String.format("http://%s:%s/api/update_config?%s=%s&%s=%s&%s=%s", beIp, bePort, "disable_auto_compaction", String.valueOf(!disableAutoCompaction), "enable_prefetch", String.valueOf(!enablePrefetch), "enable_segcompaction", String.valueOf(!enableSegcompaction)))
|
||||
|
||||
|
||||
(code, out, err) = show_be_config(beIp, bePort)
|
||||
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
|
||||
assertEquals(code, 0)
|
||||
configList2 = parseJson(out.trim())
|
||||
assert configList instanceof List
|
||||
for (Object ele in (List) configList2) {
|
||||
assert ele instanceof List<String>
|
||||
if (((List<String>) ele)[0] == "disable_auto_compaction") {
|
||||
logger.info("disable_auto_compaction: ${((List<String>) ele)[2]}")
|
||||
assertEquals(((List<String>) ele)[2], String.valueOf(!disableAutoCompaction))
|
||||
}
|
||||
if (((List<String>) ele)[0] == "enable_prefetch") {
|
||||
logger.info("enable_prefetch: ${((List<String>) ele)[3]}")
|
||||
assertEquals(((List<String>) ele)[2], String.valueOf(!enablePrefetch))
|
||||
}
|
||||
if (((List<String>) ele)[0] == "enable_segcompaction") {
|
||||
// enable_segcompaction is not mutable
|
||||
logger.info("enable_segcompaction: ${((List<String>) ele)[3]}")
|
||||
assertEquals(((List<String>) ele)[2], String.valueOf(enableSegcompaction))
|
||||
}
|
||||
}
|
||||
|
||||
curl("POST", String.format("http://%s:%s/api/update_config?%s=%s&%s=%s", beIp, bePort, "disable_auto_compaction", String.valueOf(disableAutoCompaction), "enable_prefetch", String.valueOf(enablePrefetch)))
|
||||
|
||||
(code, out, err) = show_be_config(beIp, bePort)
|
||||
assertEquals(code, 0)
|
||||
configList = parseJson(out.trim())
|
||||
assert configList instanceof List
|
||||
for (Object ele in (List) configList) {
|
||||
assert ele instanceof List<String>
|
||||
if (((List<String>) ele)[0] == "disable_auto_compaction") {
|
||||
assertEquals(((List<String>) ele)[2], String.valueOf(disableAutoCompaction))
|
||||
}
|
||||
if (((List<String>) ele)[0] == "enable_prefetch") {
|
||||
assertEquals(((List<String>) ele)[2], String.valueOf(enablePrefetch))
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user