[Bug] Fix resource tag bug, add documents and some other bug fix (#6708)
1. Fix bug of UNKNOWN Operation Type 91 2. Support using resource_tag property of user to limit the usage of BE 3. Add new FE config `disable_tablet_scheduler` to disable tablet scheduler. 4. Add documents for resource tag. 5. Modify the default value of FE config `default_db_data_quota_bytes` to 1PB. 6. Add a new BE config `disable_compaction_trace_log` to disable the trace log of compaction time cost. 7. Modify the default value of BE config `remote_storage_read_buffer_mb` to 16MB 8. Fix `show backends` results error 9. Add new BE config `external_table_connect_timeout_sec` to set the timeout when connecting to odbc and mysql table. 10. Modify issue template to enable blank issue, for release note or other specific usage. 11. Fix a bug in alpha_row_set split_range() function.
This commit is contained in:
2
.github/ISSUE_TEMPLATE/config.yml
vendored
2
.github/ISSUE_TEMPLATE/config.yml
vendored
@ -15,7 +15,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
blank_issues_enabled: false
|
||||
blank_issues_enabled: true
|
||||
contact_links:
|
||||
- name: Ask a question or get support
|
||||
url: https://github.com/apache/incubator-doris/discussions
|
||||
|
||||
@ -298,6 +298,7 @@ CONF_mInt64(row_step_for_compaction_merge_log, "0");
|
||||
// Threshold to logging compaction trace, in seconds.
|
||||
CONF_mInt32(base_compaction_trace_threshold, "60");
|
||||
CONF_mInt32(cumulative_compaction_trace_threshold, "10");
|
||||
CONF_mBool(disable_compaction_trace_log, "true");
|
||||
|
||||
// Threshold to logging agent task trace, in seconds.
|
||||
CONF_mInt32(agent_task_trace_threshold_sec, "2");
|
||||
@ -587,7 +588,7 @@ CONF_mInt32(zone_map_row_num_threshold, "20");
|
||||
CONF_Int32(aws_log_level, "3");
|
||||
|
||||
// the buffer size when read data from remote storage like s3
|
||||
CONF_mInt32(remote_storage_read_buffer_mb, "256");
|
||||
CONF_mInt32(remote_storage_read_buffer_mb, "16");
|
||||
|
||||
// Default level of MemTracker to show in web page
|
||||
// now MemTracker support two level:
|
||||
@ -629,6 +630,9 @@ CONF_Int32(send_batch_thread_pool_queue_size, "102400");
|
||||
// When doing compaction, each segment may take at least 1MB buffer.
|
||||
CONF_mInt32(max_segment_num_per_rowset, "100");
|
||||
|
||||
// The connection timeout when connecting to external table such as odbc table.
|
||||
CONF_mInt32(external_table_connect_timeout_sec, "5");
|
||||
|
||||
} // namespace config
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -22,6 +22,8 @@
|
||||
#include "common/logging.h"
|
||||
#include "mysql_scanner.h"
|
||||
|
||||
#include "common/config.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
MysqlScanner::MysqlScanner(const MysqlScannerParam& param)
|
||||
@ -36,6 +38,7 @@ MysqlScanner::~MysqlScanner() {
|
||||
if (_my_conn) {
|
||||
mysql_close(_my_conn);
|
||||
_my_conn = NULL;
|
||||
mysql_library_end();
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,6 +56,9 @@ Status MysqlScanner::open() {
|
||||
|
||||
VLOG_CRITICAL << "MysqlScanner::Connect";
|
||||
|
||||
unsigned int mysql_ct = config::external_table_connect_timeout_sec;
|
||||
mysql_options(_my_conn, MYSQL_OPT_CONNECT_TIMEOUT, &mysql_ct);
|
||||
mysql_options(_my_conn, MYSQL_OPT_READ_TIMEOUT, &mysql_ct);
|
||||
if (NULL == mysql_real_connect(_my_conn, _my_param.host.c_str(), _my_param.user.c_str(),
|
||||
_my_param.passwd.c_str(), _my_param.db.c_str(),
|
||||
atoi(_my_param.port.c_str()), NULL, _my_param.client_flag)) {
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <codecvt>
|
||||
|
||||
#include "common/config.h"
|
||||
#include "common/logging.h"
|
||||
#include "exprs/expr.h"
|
||||
#include "runtime/primitive_type.h"
|
||||
@ -100,6 +101,10 @@ Status ODBCConnector::open() {
|
||||
"set env attr");
|
||||
// Allocate a connection handle
|
||||
ODBC_DISPOSE(_env, SQL_HANDLE_ENV, SQLAllocHandle(SQL_HANDLE_DBC, _env, &_dbc), "alloc dbc");
|
||||
// Set connect timeout
|
||||
unsigned int timeout = config::external_table_connect_timeout_sec;
|
||||
SQLSetConnectAttr(_dbc, SQL_LOGIN_TIMEOUT, (SQLPOINTER) timeout, 0);
|
||||
SQLSetConnectAttr(_dbc, SQL_ATTR_CONNECTION_TIMEOUT, (SQLPOINTER) timeout, 0);
|
||||
// Connect to the Database
|
||||
ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC,
|
||||
SQLDriverConnect(_dbc, NULL, (SQLCHAR*)_connect_string.c_str(), SQL_NTS, NULL, 0,
|
||||
|
||||
@ -162,8 +162,14 @@ OLAPStatus AlphaRowset::remove_old_files(std::vector<std::string>* files_to_remo
|
||||
}
|
||||
|
||||
OLAPStatus AlphaRowset::split_range(const RowCursor& start_key, const RowCursor& end_key,
|
||||
uint64_t request_block_row_count,
|
||||
uint64_t request_block_row_count, size_t key_num,
|
||||
std::vector<OlapTuple>* ranges) {
|
||||
if (key_num > _schema->num_short_key_columns()) {
|
||||
// should not happen
|
||||
LOG(WARNING) << "key num " << key_num << " should less than or equal to short key column number: "
|
||||
<< _schema->num_short_key_columns();
|
||||
return OLAP_ERR_INVALID_SCHEMA;
|
||||
}
|
||||
EntrySlice entry;
|
||||
RowBlockPosition start_pos;
|
||||
RowBlockPosition end_pos;
|
||||
@ -184,12 +190,12 @@ OLAPStatus AlphaRowset::split_range(const RowCursor& start_key, const RowCursor&
|
||||
if (expected_rows == 0) {
|
||||
LOG(WARNING) << "expected_rows less than 1. [request_block_row_count = "
|
||||
<< request_block_row_count << "]";
|
||||
return OLAP_ERR_TABLE_NOT_FOUND;
|
||||
return OLAP_ERR_INVALID_SCHEMA;
|
||||
}
|
||||
|
||||
// 找到startkey对应的起始位置
|
||||
// find the start position of start key
|
||||
RowCursor helper_cursor;
|
||||
if (helper_cursor.init(*_schema, _schema->num_short_key_columns()) != OLAP_SUCCESS) {
|
||||
if (helper_cursor.init(*_schema, key_num) != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to parse strings to key with RowCursor type.";
|
||||
return OLAP_ERR_INVALID_SCHEMA;
|
||||
}
|
||||
@ -220,14 +226,14 @@ OLAPStatus AlphaRowset::split_range(const RowCursor& start_key, const RowCursor&
|
||||
RowCursor cur_start_key;
|
||||
RowCursor last_start_key;
|
||||
|
||||
if (cur_start_key.init(*_schema, _schema->num_short_key_columns()) != OLAP_SUCCESS ||
|
||||
last_start_key.init(*_schema, _schema->num_short_key_columns()) != OLAP_SUCCESS) {
|
||||
if (cur_start_key.init(*_schema, key_num) != OLAP_SUCCESS ||
|
||||
last_start_key.init(*_schema, key_num) != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to init cursor";
|
||||
return OLAP_ERR_INIT_FAILED;
|
||||
}
|
||||
|
||||
std::vector<uint32_t> cids;
|
||||
for (uint32_t cid = 0; cid < _schema->num_short_key_columns(); ++cid) {
|
||||
for (uint32_t cid = 0; cid < key_num; ++cid) {
|
||||
cids.push_back(cid);
|
||||
}
|
||||
|
||||
@ -387,6 +393,9 @@ std::shared_ptr<SegmentGroup> AlphaRowset::_segment_group_with_largest_size() {
|
||||
size_t largest_segment_group_sizes = 0;
|
||||
|
||||
for (auto segment_group : _segment_groups) {
|
||||
if (!segment_group->index_loaded()) {
|
||||
continue;
|
||||
}
|
||||
if (segment_group->empty() || segment_group->zero_num_rows()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -46,7 +46,7 @@ public:
|
||||
std::shared_ptr<RowsetReader>* result) override;
|
||||
|
||||
OLAPStatus split_range(const RowCursor& start_key, const RowCursor& end_key,
|
||||
uint64_t request_block_row_count,
|
||||
uint64_t request_block_row_count, size_t key_num,
|
||||
std::vector<OlapTuple>* ranges) override;
|
||||
|
||||
OLAPStatus remove() override;
|
||||
|
||||
@ -76,7 +76,7 @@ OLAPStatus BetaRowset::create_reader(const std::shared_ptr<MemTracker>& parent_t
|
||||
}
|
||||
|
||||
OLAPStatus BetaRowset::split_range(const RowCursor& start_key, const RowCursor& end_key,
|
||||
uint64_t request_block_row_count,
|
||||
uint64_t request_block_row_count, size_t key_num,
|
||||
std::vector<OlapTuple>* ranges) {
|
||||
ranges->emplace_back(start_key.to_tuple());
|
||||
ranges->emplace_back(end_key.to_tuple());
|
||||
|
||||
@ -46,7 +46,7 @@ public:
|
||||
int segment_id);
|
||||
|
||||
OLAPStatus split_range(const RowCursor& start_key, const RowCursor& end_key,
|
||||
uint64_t request_block_row_count,
|
||||
uint64_t request_block_row_count, size_t key_num,
|
||||
std::vector<OlapTuple>* ranges) override;
|
||||
|
||||
OLAPStatus remove() override;
|
||||
|
||||
@ -131,7 +131,7 @@ public:
|
||||
// The first/last tuple must be start_key/end_key.to_tuple(). If we can't divide the input range,
|
||||
// the result `ranges` should be [start_key.to_tuple(), end_key.to_tuple()]
|
||||
virtual OLAPStatus split_range(const RowCursor& start_key, const RowCursor& end_key,
|
||||
uint64_t request_block_row_count,
|
||||
uint64_t request_block_row_count, size_t key_num,
|
||||
std::vector<OlapTuple>* ranges) = 0;
|
||||
|
||||
const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; }
|
||||
|
||||
@ -864,6 +864,7 @@ OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings, const OlapTup
|
||||
uint64_t request_block_row_count, std::vector<OlapTuple>* ranges) {
|
||||
DCHECK(ranges != nullptr);
|
||||
|
||||
size_t key_num = 0;
|
||||
RowCursor start_key;
|
||||
// 如果有startkey,用startkey初始化;反之则用minkey初始化
|
||||
if (start_key_strings.size() > 0) {
|
||||
@ -876,6 +877,7 @@ OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings, const OlapTup
|
||||
LOG(WARNING) << "init end key failed";
|
||||
return OLAP_ERR_INVALID_SCHEMA;
|
||||
}
|
||||
key_num = start_key_strings.size();
|
||||
} else {
|
||||
if (start_key.init(_schema, num_short_key_columns()) != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to initial key strings with RowCursor type.";
|
||||
@ -884,6 +886,7 @@ OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings, const OlapTup
|
||||
|
||||
start_key.allocate_memory_for_string_type(_schema);
|
||||
start_key.build_min_key();
|
||||
key_num = num_short_key_columns();
|
||||
}
|
||||
|
||||
RowCursor end_key;
|
||||
@ -919,7 +922,7 @@ OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings, const OlapTup
|
||||
ranges->emplace_back(end_key.to_tuple());
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
return rowset->split_range(start_key, end_key, request_block_row_count, ranges);
|
||||
return rowset->split_range(start_key, end_key, request_block_row_count, key_num, ranges);
|
||||
}
|
||||
|
||||
// NOTE: only used when create_table, so it is sure that there is no concurrent reader and writer.
|
||||
@ -1367,7 +1370,7 @@ void Tablet::execute_compaction(CompactionType compaction_type) {
|
||||
MonotonicStopWatch watch;
|
||||
watch.start();
|
||||
SCOPED_CLEANUP({
|
||||
if (watch.elapsed_time() / 1e9 > config::cumulative_compaction_trace_threshold) {
|
||||
if (!config::disable_compaction_trace_log && watch.elapsed_time() / 1e9 > config::cumulative_compaction_trace_threshold) {
|
||||
LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL);
|
||||
}
|
||||
});
|
||||
@ -1389,7 +1392,7 @@ void Tablet::execute_compaction(CompactionType compaction_type) {
|
||||
MonotonicStopWatch watch;
|
||||
watch.start();
|
||||
SCOPED_CLEANUP({
|
||||
if (watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold) {
|
||||
if (!config::disable_compaction_trace_log && watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold) {
|
||||
LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL);
|
||||
}
|
||||
});
|
||||
|
||||
@ -208,6 +208,7 @@ module.exports = [
|
||||
"time-zone",
|
||||
"variables",
|
||||
"update",
|
||||
"multi-tenant",
|
||||
],
|
||||
sidebarDepth: 1,
|
||||
},
|
||||
|
||||
@ -209,6 +209,7 @@ module.exports = [
|
||||
"time-zone",
|
||||
"variables",
|
||||
"update",
|
||||
"multi-tenant",
|
||||
],
|
||||
sidebarDepth: 1,
|
||||
},
|
||||
|
||||
@ -349,6 +349,14 @@ CumulativeCompaction skips the most recently released increments to prevent comp
|
||||
|
||||
Similar to `base_compaction_trace_threshold`.
|
||||
|
||||
### disable_compaction_trace_log
|
||||
|
||||
* Type: bool
|
||||
* Description: disable the trace log of compaction
|
||||
* Default value: true
|
||||
|
||||
If set to true, the `cumulative_compaction_trace_threshold` and `base_compaction_trace_threshold` won't work and log is disabled.
|
||||
|
||||
### `cumulative_compaction_policy`
|
||||
|
||||
* Type: string
|
||||
@ -1425,3 +1433,17 @@ The size of the buffer before flashing
|
||||
* Type: int32
|
||||
* Description: Used to limit the number of segments in the newly generated rowset when importing. If the threshold is exceeded, the import will fail with error -238. Too many segments will cause compaction to take up a lot of memory and cause OOM errors.
|
||||
* Default value: 100
|
||||
|
||||
### `remote_storage_read_buffer_mb`
|
||||
|
||||
* Type: int32
|
||||
* Description: The cache size used when reading files on hdfs or object storage.
|
||||
* Default value: 16MB
|
||||
|
||||
Increasing this value can reduce the number of calls to read remote data, but it will increase memory overhead.
|
||||
|
||||
### `external_table_connect_timeout_sec`
|
||||
|
||||
* Type: int32
|
||||
* Description: The timeout when establishing connection with external table such as ODBC table.
|
||||
* Default value: 5 seconds
|
||||
|
||||
@ -222,7 +222,7 @@ Maximum percentage of data that can be filtered (due to reasons such as data is
|
||||
|
||||
### default_db_data_quota_bytes
|
||||
|
||||
Default:1TB
|
||||
Default:1PB
|
||||
|
||||
IsMutable:true
|
||||
|
||||
|
||||
222
docs/en/administrator-guide/multi-tenant.md
Normal file
222
docs/en/administrator-guide/multi-tenant.md
Normal file
@ -0,0 +1,222 @@
|
||||
---
|
||||
{
|
||||
"title": "Multi-tenancy",
|
||||
"language": "en"
|
||||
}
|
||||
---
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# Multi-tenancy
|
||||
|
||||
The main purpose of Doris's multi-tenant and resource isolation solution is to reduce interference between multiple users when performing data operations in the same Doris cluster, and to allocate cluster resources to each user more reasonably.
|
||||
|
||||
The scheme is mainly divided into two parts, one is the division of resource groups at the node level in the cluster, and the other is the resource limit for a single query.
|
||||
|
||||
## Nodes in Doris
|
||||
|
||||
First, let's briefly introduce the node composition of Doris. There are two types of nodes in a Doris cluster: Frontend (FE) and Backend (BE).
|
||||
|
||||
FE is mainly responsible for metadata management, cluster management, user request access and query plan analysis.
|
||||
|
||||
BE is mainly responsible for data storage and execution of query plans.
|
||||
|
||||
FE does not participate in the processing and calculation of user data, so it is a node with low resource consumption. The BE is responsible for all data calculations and task processing, and is a resource-consuming node. Therefore, the resource division and resource restriction schemes introduced in this article are all aimed at BE nodes. Because the FE node consumes relatively low resources and can also be scaled horizontally, there is usually no need to isolate and restrict resources, and the FE node can be shared by all users.
|
||||
|
||||
## Node resource division
|
||||
|
||||
Node resource division refers to setting tags for BE nodes in a Doris cluster, and the BE nodes with the same tags form a resource group. Resource group can be regarded as a management unit of data storage and calculation. Below we use a specific example to introduce the use of resource groups.
|
||||
|
||||
1. Set labels for BE nodes
|
||||
|
||||
Assume that the current Doris cluster has 6 BE nodes. They are host[1-6] respectively. In the initial situation, all nodes belong to a default resource group (Default).
|
||||
|
||||
We can use the following command to divide these 6 nodes into 3 resource groups: group_a, group_b, group_c:
|
||||
|
||||
```sql
|
||||
alter system modify backend "host1:9050" set ("tag.location": "group_a");
|
||||
alter system modify backend "host2:9050" set ("tag.location": "group_a");
|
||||
alter system modify backend "host3:9050" set ("tag.location": "group_b");
|
||||
alter system modify backend "host4:9050" set ("tag.location": "group_b");
|
||||
alter system modify backend "host5:9050" set ("tag.location": "group_c");
|
||||
alter system modify backend "host6:9050" set ("tag.location": "group_c");
|
||||
```
|
||||
|
||||
Here we combine `host[1-2]` to form a resource group `group_a`, `host[3-4]` to form a resource group `group_b`, and `host[5-6]` to form a resource group `group_c`.
|
||||
|
||||
> Note: One BE only supports setting one Tag.
|
||||
|
||||
2. Distribution of data according to resource groups
|
||||
|
||||
After the resource group is divided. We can distribute different copies of user data in different resource groups. Assume a user table UserTable. We want to store a copy in each of the three resource groups, which can be achieved by the following table creation statement:
|
||||
|
||||
```sql
|
||||
create table UserTable
|
||||
(k1 int, k2 int)
|
||||
distributed by hash(k1) buckets 1
|
||||
properties(
|
||||
"replica_allocation"
|
||||
=
|
||||
"tag.location.group_a:1, tag.location.group_b:1, tag.location.group_c:1"
|
||||
)
|
||||
```
|
||||
|
||||
In this way, the data in the UserTable table will be stored in the form of 3 copies in the nodes where the resource groups group_a, group_b, and group_c are located.
|
||||
|
||||
The following figure shows the current node division and data distribution:
|
||||
|
||||
```
|
||||
┌────────────────────────────────────────────────────┐
|
||||
│ │
|
||||
│ ┌──────────────────┐ ┌──────────────────┐ │
|
||||
│ │ host1 │ │ host2 │ │
|
||||
│ │ ┌─────────────┐ │ │ │ │
|
||||
│ group_a │ │ replica1 │ │ │ │ │
|
||||
│ │ └─────────────┘ │ │ │ │
|
||||
│ │ │ │ │ │
|
||||
│ └──────────────────┘ └──────────────────┘ │
|
||||
│ │
|
||||
├────────────────────────────────────────────────────┤
|
||||
├────────────────────────────────────────────────────┤
|
||||
│ │
|
||||
│ ┌──────────────────┐ ┌──────────────────┐ │
|
||||
│ │ host3 │ │ host4 │ │
|
||||
│ │ │ │ ┌─────────────┐ │ │
|
||||
│ group_b │ │ │ │ replica2 │ │ │
|
||||
│ │ │ │ └─────────────┘ │ │
|
||||
│ │ │ │ │ │
|
||||
│ └──────────────────┘ └──────────────────┘ │
|
||||
│ │
|
||||
├────────────────────────────────────────────────────┤
|
||||
├────────────────────────────────────────────────────┤
|
||||
│ │
|
||||
│ ┌──────────────────┐ ┌──────────────────┐ │
|
||||
│ │ host5 │ │ host6 │ │
|
||||
│ │ │ │ ┌─────────────┐ │ │
|
||||
│ group_c │ │ │ │ replica3 │ │ │
|
||||
│ │ │ │ └─────────────┘ │ │
|
||||
│ │ │ │ │ │
|
||||
│ └──────────────────┘ └──────────────────┘ │
|
||||
│ │
|
||||
└────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
3. Use different resource groups for data query
|
||||
|
||||
After the execution of the first two steps is completed, we can limit a user's query by setting the user's resource usage permissions, and can only use the nodes in the specified resource group to execute.
|
||||
|
||||
For example, we can use the following statement to restrict user1 to only use nodes in the `group_a` resource group for data query, user2 can only use the `group_b` resource group, and user3 can use 3 resource groups at the same time:
|
||||
|
||||
```sql
|
||||
set property for 'user1' 'resource_tags.location' = 'group_a';
|
||||
set property for 'user2' 'resource_tags.location' = 'group_b';
|
||||
set property for 'user3' 'resource_tags.location' = 'group_a, group_b, group_c';
|
||||
```
|
||||
|
||||
After the setting is complete, when user1 initiates a query on the UserTable table, it will only access the data copy on the nodes in the `group_a` resource group, and the query will only use the node computing resources in the `group_a` resource group. The query of user3 can use copies and computing resources in any resource group.
|
||||
|
||||
In this way, we have achieved physical resource isolation for different user queries by dividing nodes and restricting user resource usage. Furthermore, we can create different users for different business departments and restrict each user to a different resource group, in order to avoid resource interference between different business departments. For example, suppose there is a business table in the cluster that needs to be shared by all 9 business departments, but we want to avoid resource preemption between departments as much as possible. Then we can create 3 replicas of this table and store them in 3 resource groups. Next, we create 9 users for the 9 business departments, and limit every 3 users to one resource group. In this way, the degree of competition for resources is reduced from 9 to 3.
|
||||
|
||||
On the other hand, for the isolation of online and offline tasks. We can use resource groups to achieve this. For example, we can divide nodes into two resource groups, Online and Offline. The table data is still stored in 3 copies, of which 2 copies are stored in the Online resource group, and 1 copy is stored in the Offline resource group. The Online resource group is mainly used for online data services with high concurrency and low latency. Some large queries or offline ETL operations can be executed using nodes in the Offline resource group. So as to realize the ability to provide online and offline services simultaneously in a unified cluster.
|
||||
|
||||
## Single query resource limit
|
||||
|
||||
The resource group method mentioned earlier is resource isolation and restriction at the node level. In the resource group, resource preemption problems may still occur. For example, as mentioned above, the three business departments are arranged in the same resource group. Although the degree of resource competition is reduced, the queries of these three departments may still affect each other.
|
||||
|
||||
Therefore, in addition to the resource group solution, Doris also provides a single query resource restriction function.
|
||||
|
||||
At present, Doris's resource restrictions on single queries are mainly divided into two aspects: CPU and memory restrictions.
|
||||
|
||||
1. Memory Limitation
|
||||
|
||||
Doris can limit the maximum memory overhead that a query is allowed to use. To ensure that the memory resources of the cluster will not be fully occupied by a query. We can set the memory limit in the following ways:
|
||||
|
||||
```
|
||||
// Set the session variable exec_mem_limit. Then all subsequent queries in the session (within the connection) use this memory limit.
|
||||
set exec_mem_limit=1G;
|
||||
// Set the global variable exec_mem_limit. Then all subsequent queries of all new sessions (new connections) use this memory limit.
|
||||
set global exec_mem_limit=1G;
|
||||
// Set the variable exec_mem_limit in SQL. Then the variable only affects this SQL.
|
||||
select /*+ SET_VAR(exec_mem_limit=1G) */ id, name from tbl where xxx;
|
||||
```
|
||||
|
||||
Because Doris' query engine is based on the full-memory MPP query framework. Therefore, when the memory usage of a query exceeds the limit, the query will be terminated. Therefore, when a query cannot run under a reasonable memory limit, we need to solve it through some SQL optimization methods or cluster expansion.
|
||||
|
||||
2. CPU limitations
|
||||
|
||||
Users can limit the CPU resources of the query in the following ways:
|
||||
|
||||
```
|
||||
// Set the session variable cpu_resource_limit. Then all queries in the session (within the connection) will use this CPU limit.
|
||||
set cpu_resource_limit = 2;
|
||||
// Set the user's attribute cpu_resource_limit, then all queries of this user will use this CPU limit. The priority of this attribute is higher than the session variable cpu_resource_limit
|
||||
set property for 'user1' 'cpu_resource_limit' = '3';
|
||||
```
|
||||
|
||||
The value of `cpu_resource_limit` is a relative value. The larger the value, the more CPU resources can be used. However, the upper limit of the CPU that can be used by a query also depends on the number of partitions and buckets of the table. In principle, the maximum CPU usage of a query is positively related to the number of tablets involved in the query. In extreme cases, assuming that a query involves only one tablet, even if `cpu_resource_limit` is set to a larger value, only 1 CPU resource can be used.
|
||||
|
||||
Through memory and CPU resource limits. We can divide user queries into more fine-grained resources within a resource group. For example, we can make some offline tasks with low timeliness requirements, but with a large amount of calculation, use less CPU resources and more memory resources. Some delay-sensitive online tasks use more CPU resources and reasonable memory resources.
|
||||
|
||||
## Best practices and forward compatibility
|
||||
|
||||
Tag division and CPU limitation are new features in version 0.15. In order to ensure a smooth upgrade from the old version, Doris has made the following forward compatibility:
|
||||
|
||||
1. Each BE node will have a default Tag: `"tag.location": "default"`.
|
||||
2. The BE node added through the `alter system add backend` statement will also set Tag: `"tag.location": "default"` by default.
|
||||
3. The replica distribution of all tables is changed by default to: `"tag.location.default:xx"`, where xx is the original replica count.
|
||||
4. Users can still specify the number of replicas in the table creation statement by `"replication_num" = "xx"`; this attribute will be automatically converted to: `"tag.location.default:xx"`. This ensures that there is no need to modify the original table creation statements.
|
||||
5. By default, the memory limit for a single query is 2GB per node, and CPU resources are unlimited, which is consistent with the original behavior. In addition, the user's `resource_tags.location` attribute is empty by default, that is, the user can access BEs with any Tag, which is also consistent with the original behavior.
|
||||
|
||||
Here we give an example of the steps to start using the resource division function after upgrading from the original cluster to version 0.15:
|
||||
|
||||
1. Turn off data repair and balance logic
|
||||
|
||||
After the upgrade, the default Tag of each BE is `"tag.location": "default"`, and the default replica distribution of each table is `"tag.location.default:xx"`. If you directly modify the Tag of a BE, the system will automatically detect the change in replica distribution and start data redistribution, which may occupy some system resources. Therefore, we can turn off the data repair and balance logic before modifying the Tags, to ensure that no replicas are redistributed while we are planning resources.
|
||||
|
||||
```
|
||||
ADMIN SET FRONTEND CONFIG ("disable_balance" = "true");
|
||||
ADMIN SET FRONTEND CONFIG ("disable_tablet_scheduler" = "true");
|
||||
```
|
||||
|
||||
2. Set Tag and table copy distribution
|
||||
|
||||
Next, you can use the `alter system modify backend` statement to set the BE Tag. And through the `alter table` statement to modify the copy distribution strategy of the table. Examples are as follows:
|
||||
|
||||
```
|
||||
alter system modify backend "host1:9050, 1212:9050" set ("tag.location": "group_a");
|
||||
alter table my_table modify partition p1 set ("replica_allocation" = "tag.location.group_a:2");
|
||||
```
|
||||
|
||||
3. Turn on data repair and balance logic
|
||||
|
||||
After the Tag and replica distribution are set, we can turn on the data repair and balance logic to trigger data redistribution.
|
||||
|
||||
```
|
||||
ADMIN SET FRONTEND CONFIG ("disable_balance" = "false");
|
||||
ADMIN SET FRONTEND CONFIG ("disable_tablet_scheduler" = "false");
|
||||
```
|
||||
|
||||
This process will last for a period of time depending on the amount of data involved, and it will cause some colocation tables to fail colocation planning (because replicas are being migrated). You can view the progress via `show proc "/cluster_balance/"`. You can also judge the progress by the number of `UnhealthyTabletNum` in `show proc "/statistic"`. When `UnhealthyTabletNum` drops to 0, the data redistribution is complete.
|
||||
|
||||
4. Set the user's resource label permissions.
|
||||
|
||||
After the data is redistributed. We can start to set the user's resource label permissions. Because by default, the user's `resource_tags.location` attribute is empty, that is, the BE of any tag can be accessed. Therefore, in the previous steps, the normal query of existing users will not be affected. When the `resource_tags.location` property is not empty, the user will be restricted from accessing the BE of the specified Tag.
|
||||
|
||||
Through the above 4 steps, we can smoothly use the resource division function after the original cluster is upgraded.
|
||||
@ -237,6 +237,14 @@ distribution_info
|
||||
* `replication_num`
|
||||
|
||||
Number of copies. The default number of copies is 3. If the number of BE nodes is less than 3, you need to specify that the number of copies is less than or equal to the number of BE nodes.
|
||||
|
||||
After version 0.15, this attribute will be automatically converted to the `replica_allocation` attribute, such as:
|
||||
|
||||
`"replication_num" = "3"` will be automatically converted to `"replica_allocation" = "tag.location.default:3"`
|
||||
|
||||
* `replica_allocation`
|
||||
|
||||
Set the copy distribution according to Tag. This attribute can completely cover the function of the `replication_num` attribute.
|
||||
|
||||
* `storage_medium/storage_cooldown_time`
|
||||
|
||||
@ -478,6 +486,38 @@ distribution_info
|
||||
PROPERTIES("replication_num" = "3");
|
||||
```
|
||||
|
||||
10. Set the replica of the table through the `replica_allocation` property.
|
||||
|
||||
```sql
|
||||
CREATE TABLE example_db.table_hash
|
||||
(
|
||||
k1 TINYINT,
|
||||
k2 DECIMAL(10, 2) DEFAULT "10.5"
|
||||
)
|
||||
DISTRIBUTED BY HASH(k1) BUCKETS 32
|
||||
PROPERTIES (
|
||||
"replica_allocation"="tag.location.group_a:1, tag.location.group_b:2"
|
||||
);
|
||||
|
||||
CREATE TABLE example_db.dynamic_partition
|
||||
(
|
||||
k1 DATE,
|
||||
k2 INT,
|
||||
k3 SMALLINT,
|
||||
v1 VARCHAR(2048),
|
||||
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
|
||||
)
|
||||
PARTITION BY RANGE (k1) ()
|
||||
DISTRIBUTED BY HASH(k2) BUCKETS 32
|
||||
PROPERTIES(
|
||||
"dynamic_partition.time_unit" = "DAY",
|
||||
"dynamic_partition.start" = "-3",
|
||||
"dynamic_partition.end" = "3",
|
||||
"dynamic_partition.prefix" = "p",
|
||||
"dynamic_partition.buckets" = "32",
|
||||
"dynamic_partition.replica_allocation" = "tag.location.group_a:3"
|
||||
);
|
||||
```
|
||||
### Keywords
|
||||
|
||||
CREATE, TABLE
|
||||
|
||||
@ -44,6 +44,7 @@ sql_block_rules: set sql block rules。After setting, if the query user execute
|
||||
cpu_resource_limit: limit the cpu resource usage of a query. See session variable `cpu_resource_limit`.
|
||||
resource.cpu_share: cpu resource assignment. (Deprecated)
|
||||
load_cluster.{cluster_name}.priority: assigns priority to a specified cluster, which can be HIGH or NORMAL
|
||||
resource_tags: Specify the user's resource tag permissions.
|
||||
|
||||
Ordinary user rights:
|
||||
quota.normal: Resource allocation at the normal level.
|
||||
@ -89,6 +90,9 @@ SET PROPERTY FOR 'jack' 'sql_block_rules' = 'rule1, rule2';
|
||||
10. Modify the cpu resource usage limit for jack
|
||||
SET PROPERTY FOR 'jack' 'cpu_resource_limit' = '2';
|
||||
|
||||
11. Modify user's resource tag permission
|
||||
SET PROPERTY FOR 'jack' 'resource_tags.location' = 'group_a, group_b';
|
||||
|
||||
## keyword
|
||||
SET, PROPERTY
|
||||
|
||||
|
||||
@ -47,6 +47,8 @@ ALTER SYSTEM DROP BROKER broker_name "host:port"[,"host:port"...];
|
||||
ALTER SYSTEM DROP ALL BROKER broker_name
|
||||
9) Set up a Load error hub for centralized display of import error information
|
||||
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES ("key" = "value"[, ...]);
|
||||
10) Modify property of BE
|
||||
ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]);
|
||||
|
||||
Explain:
|
||||
1) Host can be hostname or IP address
|
||||
@ -115,5 +117,9 @@ ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
|
||||
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
|
||||
("type"= "null");
|
||||
|
||||
9. Modify BE resource tag
|
||||
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location": "group_a");
|
||||
|
||||
## keyword
|
||||
ALTER,SYSTEM,BACKEND,BROKER,FREE
|
||||
|
||||
@ -261,7 +261,8 @@ Syntax:
|
||||
PROPERTIES (
|
||||
"storage_medium" = "[SSD|HDD]",
|
||||
["storage_cooldown_time" = "yyyy-MM-dd HH:mm:ss"],
|
||||
["replication_num" = "3"]
|
||||
["replication_num" = "3"],
|
||||
["replica_allocation" = "xxx"]
|
||||
)
|
||||
```
|
||||
|
||||
@ -271,6 +272,8 @@ Syntax:
|
||||
Default is 30 days.
|
||||
Format: "yyyy-MM-dd HH:mm:ss"
|
||||
replication_num: Replication number of a partition. Default is 3.
|
||||
replica_allocation: Specify the distribution of replicas according to the resource tag.
|
||||
|
||||
If the table is not range partitioned, this property takes effect at the table level. Otherwise, it takes effect at the partition level.
|
||||
User can specify different properties for different partition by `ADD PARTITION` or `MODIFY PARTITION` statements.
|
||||
2) If Engine type is olap, user can set bloom filter index for column.
|
||||
@ -682,12 +685,7 @@ Syntax:
|
||||
)
|
||||
ENGINE=olap
|
||||
DUPLICATE KEY(k1, k2, k3)
|
||||
PARTITION BY RANGE (k1)
|
||||
(
|
||||
PARTITION p1 VALUES LESS THAN ("2014-01-01"),
|
||||
PARTITION p2 VALUES LESS THAN ("2014-06-01"),
|
||||
PARTITION p3 VALUES LESS THAN ("2014-12-01")
|
||||
)
|
||||
PARTITION BY RANGE (k1) ()
|
||||
DISTRIBUTED BY HASH(k2) BUCKETS 32
|
||||
PROPERTIES(
|
||||
"storage_medium" = "SSD",
|
||||
@ -752,6 +750,39 @@ Syntax:
|
||||
);
|
||||
```
|
||||
|
||||
16. Specify the replica distribution of the table through replica_allocation
|
||||
|
||||
```
|
||||
CREATE TABLE example_db.table_hash
|
||||
(
|
||||
k1 TINYINT,
|
||||
k2 DECIMAL(10, 2) DEFAULT "10.5"
|
||||
)
|
||||
DISTRIBUTED BY HASH(k1) BUCKETS 32
|
||||
PROPERTIES (
|
||||
"replica_allocation"="tag.location.group_a:1, tag.location.group_b:2"
|
||||
);
|
||||
|
||||
CREATE TABLE example_db.dynamic_partition
|
||||
(
|
||||
k1 DATE,
|
||||
k2 INT,
|
||||
k3 SMALLINT,
|
||||
v1 VARCHAR(2048),
|
||||
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
|
||||
)
|
||||
PARTITION BY RANGE (k1) ()
|
||||
DISTRIBUTED BY HASH(k2) BUCKETS 32
|
||||
PROPERTIES(
|
||||
"dynamic_partition.time_unit" = "DAY",
|
||||
"dynamic_partition.start" = "-3",
|
||||
"dynamic_partition.end" = "3",
|
||||
"dynamic_partition.prefix" = "p",
|
||||
"dynamic_partition.buckets" = "32",
|
||||
"dynamic_partition.replica_allocation" = "tag.location.group_a:3"
|
||||
);
|
||||
```
|
||||
|
||||
## keyword
|
||||
|
||||
CREATE,TABLE
|
||||
|
||||
@ -347,6 +347,14 @@ CumulativeCompaction会跳过最近发布的增量,以防止压缩可能被查
|
||||
|
||||
与base_compaction_trace_threshold类似。
|
||||
|
||||
### disable_compaction_trace_log
|
||||
|
||||
* 类型: bool
|
||||
* 描述: 关闭compaction的trace日志
|
||||
* 默认值: true
|
||||
|
||||
如果设置为true,`cumulative_compaction_trace_threshold` 和 `base_compaction_trace_threshold` 将不起作用。并且trace日志将关闭。
|
||||
|
||||
### `cumulative_compaction_policy`
|
||||
|
||||
* 类型:string
|
||||
@ -1431,7 +1439,6 @@ webserver默认工作线程数
|
||||
```
|
||||
* 默认值: 3
|
||||
|
||||
|
||||
### `mem_tracker_level`
|
||||
|
||||
* 类型: int16
|
||||
@ -1447,3 +1454,17 @@ webserver默认工作线程数
|
||||
* 类型: int32
|
||||
* 描述: 用于限制导入时,新产生的rowset中的segment数量。如果超过阈值,导入会失败并报错 -238。过多的 segment 会导致compaction占用大量内存引发 OOM 错误。
|
||||
* 默认值: 100
|
||||
|
||||
### `remote_storage_read_buffer_mb`
|
||||
|
||||
* 类型: int32
|
||||
* 描述: 读取hdfs或者对象存储上的文件时,使用的缓存大小。
|
||||
* 默认值: 16MB
|
||||
|
||||
增大这个值,可以减少远端数据读取的调用次数,但会增加内存开销。
|
||||
|
||||
### `external_table_connect_timeout_sec`
|
||||
|
||||
* 类型: int32
|
||||
* 描述: 和外部表建立连接的超时时间。
|
||||
* 默认值: 5秒
|
||||
|
||||
@ -216,7 +216,7 @@ workers 线程池默认不做设置,根据自己需要进行设置
|
||||
|
||||
### default_db_data_quota_bytes
|
||||
|
||||
默认值:1TB
|
||||
默认值:1PB
|
||||
|
||||
是否可以动态配置:true
|
||||
|
||||
@ -2055,3 +2055,13 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
|
||||
|
||||
是否使用压缩格式发送查询计划结构体。开启后,可以降低约50%的查询计划结构体大小,从而避免一些 "send fragment timeout" 错误。
|
||||
但是在某些高并发小查询场景下,可能会降低约10%的并发度。
|
||||
|
||||
### disable_tablet_scheduler
|
||||
|
||||
默认值:false
|
||||
|
||||
是否可以动态配置:true
|
||||
|
||||
是否为 Master FE 节点独有的配置项:false
|
||||
|
||||
如果设置为true,将关闭副本修复和均衡逻辑。
|
||||
|
||||
222
docs/zh-CN/administrator-guide/multi-tenant.md
Normal file
222
docs/zh-CN/administrator-guide/multi-tenant.md
Normal file
@ -0,0 +1,222 @@
|
||||
---
|
||||
{
|
||||
"title": "多租户和资源划分",
|
||||
"language": "zh-CN"
|
||||
}
|
||||
---
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# 多租户和资源划分
|
||||
|
||||
Doris 的多租户和资源隔离方案,主要目的是为了多用户在同一 Doris 集群内进行数据操作时,减少相互之间的干扰,能够将集群资源更合理的分配给各用户。
|
||||
|
||||
该方案主要分为两部分,一是集群内节点级别的资源组划分,二是针对单个查询的资源限制。
|
||||
|
||||
## Doris 中的节点
|
||||
|
||||
首先先简单介绍一下 Doris 的节点组成。一个 Doris 集群中有两类节点:Frontend(FE) 和 Backend(BE)。
|
||||
|
||||
FE 主要负责元数据管理、集群管理、用户请求的接入和查询计划的解析等工作。
|
||||
|
||||
BE 主要负责数据存储、查询计划的执行等工作。
|
||||
|
||||
FE 不参与用户数据的处理计算等工作,因此是一个资源消耗较低的节点。而 BE 负责所有的数据计算、任务处理,属于资源消耗型的节点。因此,本文所介绍的资源划分及资源限制方案,都是针对 BE 节点的。FE 节点因为资源消耗相对较低,并且还可以横向扩展,因此通常无需做资源上的隔离和限制,FE 节点由所有用户共享即可。
|
||||
|
||||
## 节点资源划分
|
||||
|
||||
节点资源划分,是指将一个 Doris 集群内的 BE 节点设置标签(Tag),标签相同的 BE 节点组成一个资源组(Resource Group)。资源组可以看作是数据存储和计算的一个管理单元。下面我们通过一个具体示例,来介绍资源组的使用方式。
|
||||
|
||||
1. 为 BE 节点设置标签
|
||||
|
||||
假设当前 Doris 集群有 6 个 BE 节点。分别为 host[1-6]。在初始情况下,所有节点都属于一个默认资源组(Default)。
|
||||
|
||||
我们可以使用以下命令将这6个节点划分成3个资源组:group_a、group_b、group_c:
|
||||
|
||||
```sql
|
||||
alter system modify backend "host1:9050" set ("tag.location": "group_a");
|
||||
alter system modify backend "host2:9050" set ("tag.location": "group_a");
|
||||
alter system modify backend "host3:9050" set ("tag.location": "group_b");
|
||||
alter system modify backend "host4:9050" set ("tag.location": "group_b");
|
||||
alter system modify backend "host5:9050" set ("tag.location": "group_c");
|
||||
alter system modify backend "host6:9050" set ("tag.location": "group_c");
|
||||
```
|
||||
|
||||
这里我们将 `host[1-2]` 组成资源组 `group_a`,`host[3-4]` 组成资源组 `group_b`,`host[5-6]` 组成资源组 `group_c`。
|
||||
|
||||
> 注:一个 BE 只支持设置一个 Tag。
|
||||
|
||||
2. 按照资源组分配数据分布
|
||||
|
||||
资源组划分好后。我们可以将用户数据的不同副本分布在不同资源组内。假设一张用户表 UserTable。我们希望在3个资源组内各存放一个副本,则可以通过如下建表语句实现:
|
||||
|
||||
```sql
|
||||
create table UserTable
|
||||
(k1 int, k2 int)
|
||||
distributed by hash(k1) buckets 1
|
||||
properties(
|
||||
"replica_allocation"
|
||||
=
|
||||
"tag.location.group_a:1, tag.location.group_b:1, tag.location.group_c:1"
|
||||
)
|
||||
```
|
||||
|
||||
这样一来,表 UserTable 中的数据,将会以3副本的形式,分别存储在资源组 group_a、group_b、group_c所在的节点中。
|
||||
|
||||
下图展示了当前的节点划分和数据分布:
|
||||
|
||||
```
|
||||
┌────────────────────────────────────────────────────┐
|
||||
│ │
|
||||
│ ┌──────────────────┐ ┌──────────────────┐ │
|
||||
│ │ host1 │ │ host2 │ │
|
||||
│ │ ┌─────────────┐ │ │ │ │
|
||||
│ group_a │ │ replica1 │ │ │ │ │
|
||||
│ │ └─────────────┘ │ │ │ │
|
||||
│ │ │ │ │ │
|
||||
│ └──────────────────┘ └──────────────────┘ │
|
||||
│ │
|
||||
├────────────────────────────────────────────────────┤
|
||||
├────────────────────────────────────────────────────┤
|
||||
│ │
|
||||
│ ┌──────────────────┐ ┌──────────────────┐ │
|
||||
│ │ host3 │ │ host4 │ │
|
||||
│ │ │ │ ┌─────────────┐ │ │
|
||||
│ group_b │ │ │ │ replica2 │ │ │
|
||||
│ │ │ │ └─────────────┘ │ │
|
||||
│ │ │ │ │ │
|
||||
│ └──────────────────┘ └──────────────────┘ │
|
||||
│ │
|
||||
├────────────────────────────────────────────────────┤
|
||||
├────────────────────────────────────────────────────┤
|
||||
│ │
|
||||
│ ┌──────────────────┐ ┌──────────────────┐ │
|
||||
│ │ host5 │ │ host6 │ │
|
||||
│ │ │ │ ┌─────────────┐ │ │
|
||||
│ group_c │ │ │ │ replica3 │ │ │
|
||||
│ │ │ │ └─────────────┘ │ │
|
||||
│ │ │ │ │ │
|
||||
│ └──────────────────┘ └──────────────────┘ │
|
||||
│ │
|
||||
└────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
3. 使用不同资源组进行数据查询
|
||||
|
||||
在前两步执行完成后,我们就可以通过设置用户的资源使用权限,来限制某一用户的查询,只能使用指定资源组中的节点来执行。
|
||||
|
||||
比如我们可以通过以下语句,限制 user1 只能使用 `group_a` 资源组中的节点进行数据查询,user2 只能使用 `group_b` 资源组,而 user3 可以同时使用 3 个资源组:
|
||||
|
||||
```sql
|
||||
set property for 'user1' 'resource_tags.location' : 'group_a';
|
||||
set property for 'user2' 'resource_tags.location' : 'group_b';
|
||||
set property for 'user3' 'resource_tags.location' : 'group_a, group_b, group_c';
|
||||
```
|
||||
|
||||
设置完成后,user1 在发起对 UserTable 表的查询时,只会访问 `group_a` 资源组内节点上的数据副本,并且查询仅会使用 `group_a` 资源组内的节点计算资源。而 user3 的查询可以使用任意资源组内的副本和计算资源。
|
||||
|
||||
这样,我们通过对节点的划分,以及对用户的资源使用限制,实现了不同用户查询上的物理资源隔离。更进一步,我们可以给不同的业务部门创建不同的用户,并限制每个用户使用不同的资源组。以避免不同业务部分之间使用资源干扰。比如集群内有一张业务表需要共享给所有9个业务部门使用,但是希望能够尽量避免不同部门之间的资源抢占。则我们可以为这张表创建3个副本,分别存储在3个资源组中。接下来,我们为9个业务部门创建9个用户,每3个用户限制使用一个资源组。这样,资源的竞争程度就由9降低到了3。
|
||||
|
||||
另一方面,针对在线和离线任务的隔离。我们可以利用资源组的方式实现。比如我们可以将节点划分为 Online 和 Offline 两个资源组。表数据依然以3副本的方式存储,其中 2 个副本存放在 Online 资源组,1 个副本存放在 Offline 资源组。Online 资源组主要用于高并发低延迟的在线数据服务,而一些大查询或离线ETL操作,则可以使用 Offline 资源组中的节点执行。从而实现在统一集群内同时提供在线和离线服务的能力。
|
||||
|
||||
## 单查询资源限制
|
||||
|
||||
前面提到的资源组方法是节点级别的资源隔离和限制。而在资源组内,依然可能发生资源抢占问题。比如前文提到的将3个业务部门安排在同一资源组内。虽然降低了资源竞争程度,但是这3个部门的查询依然有可能相互影响。
|
||||
|
||||
因此,除了资源组方案外,Doris 还提供了对单查询的资源限制功能。
|
||||
|
||||
目前 Doris 对单查询的资源限制主要分为 CPU 和 内存限制两方面。
|
||||
|
||||
1. 内存限制
|
||||
|
||||
Doris 可以限制一个查询被允许使用的最大内存开销。以保证集群的内存资源不会被某一个查询全部占用。我们可以通过以下方式设置内存限制:
|
||||
|
||||
```
|
||||
// 设置会话变量 exec_mem_limit。则之后该会话内(连接内)的所有查询都使用这个内存限制。
|
||||
set exec_mem_limit=1G;
|
||||
// 设置全局变量 exec_mem_limit。则之后所有新会话(新连接)的所有查询都使用这个内存限制。
|
||||
set global exec_mem_limit=1G;
|
||||
// 在 SQL 中设置变量 exec_mem_limit。则该变量仅影响这个 SQL。
|
||||
select /*+ SET_VAR(exec_mem_limit=1G) */ id, name from tbl where xxx;
|
||||
```
|
||||
|
||||
因为 Doris 的查询引擎是基于全内存的 MPP 查询框架。因此当一个查询的内存使用超过限制后,查询会被终止。因此,当一个查询无法在合理的内存限制下运行时,我们就需要通过一些 SQL 优化手段,或者集群扩容的方式来解决了。
|
||||
|
||||
2. CPU 限制
|
||||
|
||||
用户可以通过以下方式限制查询的 CPU 资源:
|
||||
|
||||
```
|
||||
// 设置会话变量 cpu_resource_limit。则之后该会话内(连接内)的所有查询都使用这个CPU限制。
|
||||
set cpu_resource_limit = 2
|
||||
// 设置用户的属性 cpu_resource_limit,则所有该用户的查询情况都使用这个CPU限制。该属性的优先级高于会话变量 cpu_resource_limit
|
||||
set property for 'user1' 'cpu_resource_limit' = '3';
|
||||
```
|
||||
|
||||
`cpu_resource_limit` 的取值是一个相对值,取值越大则能够使用的 CPU 资源越多。但一个查询能使用的CPU上限也取决于表的分区分桶数。原则上,一个查询的最大 CPU 使用量和查询涉及到的 tablet 数量正相关。极端情况下,假设一个查询仅涉及到一个 tablet,则即使 `cpu_resource_limit` 设置一个较大值,也仅能使用 1 个 CPU 资源。
|
||||
|
||||
通过内存和CPU的资源限制。我们可以在一个资源组内,将用户的查询进行更细粒度的资源划分。比如我们可以让部分时效性要求不高,但是计算量很大的离线任务使用更少的CPU资源和更多的内存资源。而部分延迟敏感的在线任务,使用更多的CPU资源以及合理的内存资源。
|
||||
|
||||
## 最佳实践和向前兼容
|
||||
|
||||
Tag 划分和 CPU 限制是 0.15 版本中的新功能。为了保证可以从老版本平滑升级,Doris 做了如下的向前兼容:
|
||||
|
||||
1. 每个 BE 节点会有一个默认的 Tag:`"tag.location": "default"`。
|
||||
2. 通过 `alter system add backend` 语句新增的 BE 节点也会默认设置 Tag:`"tag.location": "default"`。
|
||||
3. 所有表的副本分布默认修改为:`"tag.location.default:xx"`。其中 xx 为原副本数量。
|
||||
4. 用户依然可以通过 `"replication_num" = "xx"` 在建表语句中指定副本数,这种属性将会自动转换成:`"tag.location.default:xx"`。从而保证无需修改原建表语句。
|
||||
5. 默认情况下,单查询的内存限制为单节点2GB,CPU资源无限制,和原有行为保持一致。且用户的 `resource_tags.location` 属性为空,即默认情况下,用户可以访问任意 Tag 的 BE,和原有行为保持一致。
|
||||
|
||||
这里我们给出一个从原集群升级到 0.15 版本后,开始使用资源划分功能的步骤示例:
|
||||
|
||||
1. 关闭数据修复与均衡逻辑
|
||||
|
||||
因为升级后,BE的默认Tag为 `"tag.location": "default"`,而表的默认副本分布为:`"tag.location.default:xx"`。所以如果直接修改 BE 的 Tag,系统会自动检测到副本分布的变化,从而开始数据重分布。这可能会占用部分系统资源。所以我们可以在修改 Tag 前,先关闭数据修复与均衡逻辑,以保证我们在规划资源时,不会有副本重分布的操作。
|
||||
|
||||
```
|
||||
ADMIN SET FRONTEND CONFIG ("disable_balance" = "true");
|
||||
ADMIN SET FRONTEND CONFIG ("disable_tablet_scheduler" = "true");
|
||||
```
|
||||
|
||||
2. 设置 Tag 和表副本分布
|
||||
|
||||
接下来可以通过 `alter system modify backend` 语句进行 BE 的 Tag 设置。以及通过 `alter table` 语句修改表的副本分布策略。示例如下:
|
||||
|
||||
```
|
||||
alter system modify backend "host1:9050, host2:9050" set ("tag.location": "group_a");
|
||||
alter table my_table modify partition p1 set ("replica_allocation" = "tag.location.group_a:2");
|
||||
```
|
||||
|
||||
3. 开启数据修复与均衡逻辑
|
||||
|
||||
在 Tag 和副本分布都设置完毕后,我们可以开启数据修复与均衡逻辑来触发数据的重分布了。
|
||||
|
||||
```
|
||||
ADMIN SET FRONTEND CONFIG ("disable_balance" = "false");
|
||||
ADMIN SET FRONTEND CONFIG ("disable_tablet_scheduler" = "false");
|
||||
```
|
||||
|
||||
该过程根据涉及到的数据量会持续一段时间。并且会导致部分 colocation table 无法进行 colocation 规划(因为副本在迁移中)。可以通过 `show proc "/cluster_balance"` 来查看进度。也可以通过 `show proc "/statistic"` 中 `UnhealthyTabletNum` 的数量来判断进度。当 `UnhealthyTabletNum` 降为 0 时,则代表数据重分布完毕。
|
||||
|
||||
4. 设置用户的资源标签权限。
|
||||
|
||||
等数据重分布完毕后。我们就可以开始设置用户的资源标签权限了。因为默认情况下,用户的 `resource_tags.location` 属性为空,即可以访问任意 Tag 的 BE。所以在前面步骤中,不会影响到已有用户的正常查询。当 `resource_tags.location` 属性非空时,用户将被限制访问指定 Tag 的 BE。
|
||||
|
||||
通过以上4步,我们可以较为平滑的在原有集群升级后,使用资源划分功能。
|
||||
@ -238,6 +238,14 @@ distribution_info
|
||||
|
||||
副本数。默认副本数为3。如果 BE 节点数量小于3,则需指定副本数小于等于 BE 节点数量。
|
||||
|
||||
在 0.15 版本后,该属性将自动转换成 `replica_allocation` 属性,如:
|
||||
|
||||
`"replication_num" = "3"` 会自动转换成 `"replica_allocation" = "tag.location.default:3"`
|
||||
|
||||
* `replica_allocation`
|
||||
|
||||
根据 Tag 设置副本分布情况。该属性可以完全覆盖 `replication_num` 属性的功能。
|
||||
|
||||
* `storage_medium/storage_cooldown_time`
|
||||
|
||||
数据存储介质。`storage_medium` 用于声明表数据的初始存储介质,而 `storage_cooldown_time` 用于设定到期时间。示例:
|
||||
@ -478,6 +486,40 @@ distribution_info
|
||||
PROPERTIES("replication_num" = "3");
|
||||
```
|
||||
|
||||
10. 通过 `replica_allocation` 属性设置表的副本。
|
||||
|
||||
```sql
|
||||
CREATE TABLE example_db.table_hash
|
||||
(
|
||||
k1 TINYINT,
|
||||
k2 DECIMAL(10, 2) DEFAULT "10.5"
|
||||
)
|
||||
DISTRIBUTED BY HASH(k1) BUCKETS 32
|
||||
PROPERTIES (
|
||||
"replica_allocation"="tag.location.group_a:1, tag.location.group_b:2"
|
||||
);
|
||||
|
||||
|
||||
CREATE TABLE example_db.dynamic_partition
|
||||
(
|
||||
k1 DATE,
|
||||
k2 INT,
|
||||
k3 SMALLINT,
|
||||
v1 VARCHAR(2048),
|
||||
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
|
||||
)
|
||||
PARTITION BY RANGE (k1) ()
|
||||
DISTRIBUTED BY HASH(k2) BUCKETS 32
|
||||
PROPERTIES(
|
||||
"dynamic_partition.time_unit" = "DAY",
|
||||
"dynamic_partition.start" = "-3",
|
||||
"dynamic_partition.end" = "3",
|
||||
"dynamic_partition.prefix" = "p",
|
||||
"dynamic_partition.buckets" = "32",
|
||||
"dynamic_partition.replica_allocation" = "tag.location.group_a:3"
|
||||
);
|
||||
```
|
||||
|
||||
### Keywords
|
||||
|
||||
CREATE, TABLE
|
||||
|
||||
@ -44,6 +44,7 @@ under the License.
|
||||
cpu_resource_limit: 限制查询的cpu资源。详见会话变量 `cpu_resource_limit` 的介绍。
|
||||
resource.cpu_share: cpu资源分配。(已废弃)
|
||||
load_cluster.{cluster_name}.priority: 为指定的cluster分配优先级,可以为 HIGH 或 NORMAL
|
||||
resource_tags:指定用户的资源标签权限。
|
||||
|
||||
普通用户权限:
|
||||
quota.normal: normal级别的资源分配。
|
||||
@ -89,6 +90,9 @@ under the License.
|
||||
10. 修改用户jack的 cpu 使用限制
|
||||
SET PROPERTY FOR 'jack' 'cpu_resource_limit' = '2';
|
||||
|
||||
11. 修改用户的资源标签权限
|
||||
SET PROPERTY FOR 'jack' 'resource_tags.location' = 'group_a, group_b';
|
||||
|
||||
## keyword
|
||||
SET, PROPERTY
|
||||
|
||||
|
||||
@ -47,6 +47,8 @@ under the License.
|
||||
ALTER SYSTEM DROP ALL BROKER broker_name
|
||||
9) 设置一个 Load error hub,用于集中展示导入时的错误信息
|
||||
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES ("key" = "value"[, ...]);
|
||||
10) 修改一个 BE 节点的属性
|
||||
ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]);
|
||||
|
||||
说明:
|
||||
1) host 可以是主机名或者ip地址
|
||||
@ -72,6 +74,10 @@ under the License.
|
||||
broker: broker 的名称
|
||||
path: 远端存储路径
|
||||
other properties: 其他访问远端存储所必须的信息,比如认证信息等。
|
||||
|
||||
7) 修改 BE 节点属性目前支持以下属性:
|
||||
|
||||
1. tag.location:资源标签
|
||||
|
||||
## example
|
||||
|
||||
@ -114,6 +120,10 @@ under the License.
|
||||
8. 删除当前的 load error hub
|
||||
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
|
||||
("type"= "null");
|
||||
|
||||
9. 修改 BE 的资源标签
|
||||
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location": "group_a");
|
||||
|
||||
## keyword
|
||||
ALTER,SYSTEM,BACKEND,BROKER,FREE
|
||||
|
||||
@ -282,6 +282,7 @@ under the License.
|
||||
"storage_medium" = "[SSD|HDD]",
|
||||
["storage_cooldown_time" = "yyyy-MM-dd HH:mm:ss"],
|
||||
["replication_num" = "3"]
|
||||
["replica_allocation" = "xxx"]
|
||||
)
|
||||
```
|
||||
|
||||
@ -290,7 +291,8 @@ under the License.
|
||||
storage_cooldown_time: 当设置存储介质为 SSD 时,指定该分区在 SSD 上的存储到期时间。
|
||||
默认存放 30 天。
|
||||
格式为:"yyyy-MM-dd HH:mm:ss"
|
||||
replication_num: 指定分区的副本数。默认为 3
|
||||
replication_num: 指定分区的副本数。默认为 3。
|
||||
replica_allocation: 按照资源标签来指定副本分布。
|
||||
|
||||
当表为单分区表时,这些属性为表的属性。
|
||||
当表为两级分区时,这些属性为附属于每一个分区。
|
||||
@ -722,12 +724,7 @@ under the License.
|
||||
)
|
||||
ENGINE=olap
|
||||
DUPLICATE KEY(k1, k2, k3)
|
||||
PARTITION BY RANGE (k1)
|
||||
(
|
||||
PARTITION p1 VALUES LESS THAN ("2014-01-01"),
|
||||
PARTITION p2 VALUES LESS THAN ("2014-06-01"),
|
||||
PARTITION p3 VALUES LESS THAN ("2014-12-01")
|
||||
)
|
||||
PARTITION BY RANGE (k1) ()
|
||||
DISTRIBUTED BY HASH(k2) BUCKETS 32
|
||||
PROPERTIES(
|
||||
"storage_medium" = "SSD",
|
||||
@ -794,8 +791,40 @@ under the License.
|
||||
);
|
||||
```
|
||||
|
||||
## keyword
|
||||
```
|
||||
CREATE,TABLE
|
||||
16. 通过 replica_allocation 指定表的副本分布
|
||||
|
||||
```
|
||||
CREATE TABLE example_db.table_hash
|
||||
(
|
||||
k1 TINYINT,
|
||||
k2 DECIMAL(10, 2) DEFAULT "10.5"
|
||||
)
|
||||
DISTRIBUTED BY HASH(k1) BUCKETS 32
|
||||
PROPERTIES (
|
||||
"replica_allocation"="tag.location.group_a:1, tag.location.group_b:2"
|
||||
);
|
||||
|
||||
|
||||
CREATE TABLE example_db.dynamic_partition
|
||||
(
|
||||
k1 DATE,
|
||||
k2 INT,
|
||||
k3 SMALLINT,
|
||||
v1 VARCHAR(2048),
|
||||
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
|
||||
)
|
||||
PARTITION BY RANGE (k1) ()
|
||||
DISTRIBUTED BY HASH(k2) BUCKETS 32
|
||||
PROPERTIES(
|
||||
"dynamic_partition.time_unit" = "DAY",
|
||||
"dynamic_partition.start" = "-3",
|
||||
"dynamic_partition.end" = "3",
|
||||
"dynamic_partition.prefix" = "p",
|
||||
"dynamic_partition.buckets" = "32",
|
||||
"dynamic_partition.replica_allocation" = "tag.location.group_a:3"
|
||||
);
|
||||
```
|
||||
|
||||
## keyword
|
||||
|
||||
CREATE,TABLE
|
||||
|
||||
@ -40,15 +40,15 @@ import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.collect.Table;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -247,6 +247,10 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
isGroupStable = false;
|
||||
LOG.debug("get unhealthy tablet {} in colocate table. status: {}", tablet.getId(), st);
|
||||
|
||||
if (!tablet.readyToBeRepaired(Priority.HIGH)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
TabletSchedCtx tabletCtx = new TabletSchedCtx(
|
||||
TabletSchedCtx.Type.REPAIR, db.getClusterName(),
|
||||
db.getId(), tableId, partition.getId(), index.getId(), tablet.getId(),
|
||||
@ -259,10 +263,9 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
tabletCtx.setTabletOrderIdx(idx);
|
||||
|
||||
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
|
||||
if (res == AddResult.LIMIT_EXCEED) {
|
||||
// tablet in scheduler exceed limit, skip this group and check next one.
|
||||
LOG.info("number of scheduling tablets in tablet scheduler"
|
||||
+ " exceed to limit. stop colocate table check");
|
||||
if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) {
|
||||
// tablet in scheduler exceed limit, or scheduler is disabled, skip this group and check next one.
|
||||
LOG.info("tablet scheduler return: {}. stop colocate table check", res.name());
|
||||
break OUT;
|
||||
}
|
||||
}
|
||||
|
||||
@ -393,9 +393,8 @@ public class TabletChecker extends MasterDaemon {
|
||||
tabletCtx.setOrigPriority(statusWithPrio.second);
|
||||
|
||||
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
|
||||
if (res == AddResult.LIMIT_EXCEED) {
|
||||
LOG.info("number of scheduling tablets in tablet scheduler"
|
||||
+ " exceed to limit. stop tablet checker");
|
||||
if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) {
|
||||
LOG.info("tablet scheduler return: {}. stop tablet checker", res.name());
|
||||
return LoopControlStatus.BREAK_OUT;
|
||||
} else if (res == AddResult.ADDED) {
|
||||
counter.addToSchedulerTabletNum++;
|
||||
|
||||
@ -138,7 +138,8 @@ public class TabletScheduler extends MasterDaemon {
|
||||
public enum AddResult {
|
||||
ADDED, // success to add
|
||||
ALREADY_IN, // already added, skip
|
||||
LIMIT_EXCEED // number of pending tablets exceed the limit
|
||||
LIMIT_EXCEED, // number of pending tablets exceed the limit
|
||||
DISABLED // scheduler has been disabled.
|
||||
}
|
||||
|
||||
public TabletScheduler(Catalog catalog, SystemInfoService infoService, TabletInvertedIndex invertedIndex,
|
||||
@ -216,6 +217,9 @@ public class TabletScheduler extends MasterDaemon {
|
||||
* if force is true, do not check if tablet is already added before.
|
||||
*/
|
||||
public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) {
|
||||
if (!force && Config.disable_tablet_scheduler) {
|
||||
return AddResult.DISABLED;
|
||||
}
|
||||
if (!force && containsTablet(tablet.getTabletId())) {
|
||||
return AddResult.ALREADY_IN;
|
||||
}
|
||||
@ -1081,8 +1085,8 @@ public class TabletScheduler extends MasterDaemon {
|
||||
* and waiting to be scheduled.
|
||||
*/
|
||||
private void selectTabletsForBalance() {
|
||||
if (Config.disable_balance) {
|
||||
LOG.info("balance is disabled. skip selecting tablets for balance");
|
||||
if (Config.disable_balance || Config.disable_tablet_scheduler) {
|
||||
LOG.info("balance or tablet scheduler is disabled. skip selecting tablets for balance");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -700,44 +700,6 @@ public class Config extends ConfigBase {
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int tablet_delete_timeout_second = 2;
|
||||
/**
|
||||
* Clone checker's running interval.
|
||||
*/
|
||||
@ConfField public static int clone_checker_interval_second = 300;
|
||||
/**
|
||||
* Default timeout of a single clone job. Set long enough to fit your replica size.
|
||||
* The larger the replica data size is, the more time is will cost to finish clone.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int clone_job_timeout_second = 7200; // 2h
|
||||
/**
|
||||
* Concurrency of LOW priority clone jobs.
|
||||
* Concurrency of High priority clone jobs is currently unlimited.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int clone_max_job_num = 100;
|
||||
/**
|
||||
* LOW priority clone job's delay trigger time.
|
||||
* A clone job contains a tablet which need to be cloned(recovery or migration).
|
||||
* If the priority is LOW, it will be delayed *clone_low_priority_delay_second*
|
||||
* after the job creation and then be executed.
|
||||
* This is to avoid a large number of clone jobs running at same time only because a host is down for a short time.
|
||||
*
|
||||
* NOTICE that this config(and *clone_normal_priority_delay_second* as well)
|
||||
* will not work if it's smaller then *clone_checker_interval_second*
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int clone_low_priority_delay_second = 600;
|
||||
/**
|
||||
* NORMAL priority clone job's delay trigger time.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int clone_normal_priority_delay_second = 300;
|
||||
/**
|
||||
* HIGH priority clone job's delay trigger time.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int clone_high_priority_delay_second = 0;
|
||||
/**
|
||||
* the minimal delay seconds between a replica is failed and fe try to recovery it using clone.
|
||||
*/
|
||||
@ -1380,7 +1342,7 @@ public class Config extends ConfigBase {
|
||||
* Used to set default db data quota bytes.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static long default_db_data_quota_bytes = 1024 * 1024 * 1024 * 1024L; // 1TB
|
||||
public static long default_db_data_quota_bytes = 1024L * 1024 * 1024 * 1024 * 1024L; // 1PB
|
||||
|
||||
/*
|
||||
* Maximum percentage of data that can be filtered (due to reasons such as data is irregularly)
|
||||
@ -1508,4 +1470,10 @@ public class Config extends ConfigBase {
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = false)
|
||||
public static boolean use_compact_thrift_rpc = true;
|
||||
|
||||
/*
|
||||
* If set to true, the tablet scheduler will not work, so that all tablet repair/balance task will not work.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static boolean disable_tablet_scheduler = false;
|
||||
}
|
||||
|
||||
@ -52,7 +52,7 @@ public class BackendsProcDir implements ProcDirInterface {
|
||||
.add("BePort").add("HttpPort").add("BrpcPort").add("LastStartTime").add("LastHeartbeat").add("Alive")
|
||||
.add("SystemDecommissioned").add("ClusterDecommissioned").add("TabletNum")
|
||||
.add("DataUsedCapacity").add("AvailCapacity").add("TotalCapacity").add("UsedPct")
|
||||
.add("MaxDiskUsedPct").add("ErrMsg").add("Version").add("Status")
|
||||
.add("MaxDiskUsedPct").add("Tag").add("ErrMsg").add("Version").add("Status")
|
||||
.build();
|
||||
|
||||
public static final int HOSTNAME_INDEX = 3;
|
||||
@ -166,18 +166,20 @@ public class BackendsProcDir implements ProcDirInterface {
|
||||
backendInfo.add(String.format("%.2f", backend.getMaxDiskUsedPct() * 100) + " %");
|
||||
// tag
|
||||
backendInfo.add(backend.getTag().toString());
|
||||
|
||||
// err msg
|
||||
backendInfo.add(backend.getHeartbeatErrMsg());
|
||||
// version
|
||||
backendInfo.add(backend.getVersion());
|
||||
// status
|
||||
backendInfo.add(new Gson().toJson(backend.getBackendStatus()));
|
||||
|
||||
comparableBackendInfos.add(backendInfo);
|
||||
}
|
||||
|
||||
// backends proc node get result too slow, add log to observer.
|
||||
LOG.info("backends proc get tablet num cost: {}, total cost: {}",
|
||||
watch.elapsed(TimeUnit.MILLISECONDS), (System.currentTimeMillis() - start));
|
||||
|
||||
LOG.debug("backends proc get tablet num cost: {}, total cost: {}",
|
||||
watch.elapsed(TimeUnit.MILLISECONDS), (System.currentTimeMillis() - start));
|
||||
|
||||
// sort by cluster name, host name
|
||||
ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(1, 3);
|
||||
comparableBackendInfos.sort(comparator);
|
||||
|
||||
@ -17,9 +17,6 @@
|
||||
|
||||
package org.apache.doris.httpv2.restv2;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
@ -38,21 +35,25 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* And meta info like databases, tables and schema
|
||||
*/
|
||||
@ -138,7 +139,6 @@ public class MetaInfoActionV2 extends RestBaseController {
|
||||
return ResponseEntityBuilder.badRequest("Only support 'default_cluster' now");
|
||||
}
|
||||
|
||||
|
||||
String fullDbName = getFullDbName(dbName);
|
||||
Database db;
|
||||
try {
|
||||
|
||||
@ -318,6 +318,7 @@ public class JournalEntity implements Writable {
|
||||
}
|
||||
case OperationType.OP_ADD_BACKEND:
|
||||
case OperationType.OP_DROP_BACKEND:
|
||||
case OperationType.OP_MODIFY_BACKEND:
|
||||
case OperationType.OP_BACKEND_STATE_CHANGE: {
|
||||
data = Backend.read(in);
|
||||
isRead = true;
|
||||
|
||||
@ -286,6 +286,9 @@ public class MysqlProto {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// set resource tag if has
|
||||
context.setResourceTags(Catalog.getCurrentCatalog().getAuth().getResourceTags(qualifiedUser));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@ -35,14 +35,14 @@ import org.apache.doris.load.DppConfig;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
@ -288,10 +288,17 @@ public class UserProperty implements Writable {
|
||||
if (!keyArr[1].equals(Tag.TYPE_LOCATION)) {
|
||||
throw new DdlException("Only support location tag now");
|
||||
}
|
||||
try {
|
||||
resourceTags = parseLocationResoureTags(value);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DdlException(PROP_RESOURCE_TAGS + " parse failed: " + e.getMessage());
|
||||
|
||||
if (Strings.isNullOrEmpty(value)) {
|
||||
// This is for compatibility. empty value means to unset the resource tag property.
|
||||
// So that user will have permission to query all tags.
|
||||
resourceTags = Sets.newHashSet();
|
||||
} else {
|
||||
try {
|
||||
resourceTags = parseLocationResoureTags(value);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DdlException(PROP_RESOURCE_TAGS + " parse failed: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new DdlException("Unknown user property(" + key + ")");
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.qe;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import org.apache.doris.analysis.InsertStmt;
|
||||
import org.apache.doris.analysis.KillStmt;
|
||||
import org.apache.doris.analysis.SqlParser;
|
||||
@ -81,8 +80,6 @@ public class ConnectProcessor {
|
||||
this.ctx = context;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// COM_INIT_DB: change current database of this session.
|
||||
private void handleInitDb() {
|
||||
String dbName = new String(packetBuf.array(), 1, packetBuf.limit() - 1);
|
||||
|
||||
@ -17,8 +17,6 @@
|
||||
|
||||
package org.apache.doris.common.proc;
|
||||
|
||||
import mockit.Expectations;
|
||||
import mockit.Mocked;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
@ -31,6 +29,9 @@ import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import mockit.Expectations;
|
||||
import mockit.Mocked;
|
||||
|
||||
public class BackendsProcDirTest {
|
||||
private Backend b1;
|
||||
private Backend b2;
|
||||
@ -184,5 +185,4 @@ public class BackendsProcDirTest {
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertTrue(result instanceof BaseProcResult);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -17,8 +17,6 @@
|
||||
|
||||
package org.apache.doris.common.proc;
|
||||
|
||||
import mockit.Expectations;
|
||||
import mockit.Mocked;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
@ -34,6 +32,9 @@ import org.junit.Test;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import mockit.Expectations;
|
||||
import mockit.Mocked;
|
||||
|
||||
public class DbsProcDirTest {
|
||||
private Database db1;
|
||||
private Database db2;
|
||||
@ -185,10 +186,10 @@ public class DbsProcDirTest {
|
||||
Assert.assertTrue(result instanceof BaseProcResult);
|
||||
|
||||
Assert.assertEquals(Lists.newArrayList("DbId", "DbName", "TableNum", "Quota", "LastConsistencyCheckTime", "ReplicaQuota"),
|
||||
result.getColumnNames());
|
||||
result.getColumnNames());
|
||||
List<List<String>> rows = Lists.newArrayList();
|
||||
rows.add(Arrays.asList(String.valueOf(db1.getId()), db1.getFullName(), "0", "1024.000 GB", FeConstants.null_string, "1073741824"));
|
||||
rows.add(Arrays.asList(String.valueOf(db2.getId()), db2.getFullName(), "0", "1024.000 GB", FeConstants.null_string, "1073741824"));
|
||||
rows.add(Arrays.asList(String.valueOf(db1.getId()), db1.getFullName(), "0", "1024.000 TB", FeConstants.null_string, "1073741824"));
|
||||
rows.add(Arrays.asList(String.valueOf(db2.getId()), db2.getFullName(), "0", "1024.000 TB", FeConstants.null_string, "1073741824"));
|
||||
Assert.assertEquals(rows, result.getRows());
|
||||
}
|
||||
|
||||
|
||||
@ -31,6 +31,8 @@ import org.apache.doris.catalog.TabletMeta;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.proc.BackendsProcDir;
|
||||
import org.apache.doris.common.proc.ProcResult;
|
||||
import org.apache.doris.planner.OlapScanNode;
|
||||
import org.apache.doris.planner.PlanFragment;
|
||||
import org.apache.doris.planner.Planner;
|
||||
@ -190,6 +192,14 @@ public class DemoMultiBackendsTest {
|
||||
PlanFragment fragment = fragments.get(1);
|
||||
Assert.assertTrue(fragment.getPlanRoot() instanceof OlapScanNode);
|
||||
Assert.assertEquals(0, fragment.getChildren().size());
|
||||
|
||||
// test show backends;
|
||||
BackendsProcDir dir = new BackendsProcDir(Catalog.getCurrentSystemInfo());
|
||||
ProcResult result = dir.fetchResult();
|
||||
Assert.assertEquals(BackendsProcDir.TITLE_NAMES.size(), result.getColumnNames().size());
|
||||
Assert.assertEquals("{\"location\" : \"default\"}", result.getRows().get(0).get(19));
|
||||
Assert.assertEquals("{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1}",
|
||||
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 1));
|
||||
}
|
||||
|
||||
private static void updateReplicaPathHash() {
|
||||
|
||||
Reference in New Issue
Block a user