From 8a267f1ac55a4215633e489f8283f11c8e585715 Mon Sep 17 00:00:00 2001 From: Pxl <952130278@qq.com> Date: Thu, 12 Aug 2021 10:07:51 +0800 Subject: [PATCH] [Feature] Support for cleaning the trash actively (#6323) --- be/src/olap/olap_server.cpp | 2 +- be/src/olap/storage_engine.cpp | 15 +- be/src/olap/storage_engine.h | 7 +- be/src/service/backend_service.cpp | 3 + be/src/service/backend_service.h | 2 + docs/.vuepress/sidebar/en.js | 2 + docs/.vuepress/sidebar/zh-CN.js | 1 + .../operation/disk-capacity.md | 169 ++++++++++++++++++ .../Administration/ADMIN CLEAN TRASH.md | 47 +++++ .../operation/disk-capacity.md | 15 ++ .../Administration/ADMIN CLEAN TRASH.md | 47 +++++ fe/fe-core/src/main/cup/sql_parser.cup | 10 +- .../doris/analysis/AdminCleanTrashStmt.java | 73 ++++++++ .../org/apache/doris/catalog/Catalog.java | 27 +++ .../java/org/apache/doris/qe/DdlExecutor.java | 3 + fe/fe-core/src/main/jflex/sql_scanner.flex | 1 + .../apache/doris/common/GenericPoolTest.java | 4 + .../doris/utframe/MockedBackendFactory.java | 5 + gensrc/thrift/BackendService.thrift | 1 + 19 files changed, 427 insertions(+), 7 deletions(-) create mode 100644 docs/en/administrator-guide/operation/disk-capacity.md create mode 100644 docs/en/sql-reference/sql-statements/Administration/ADMIN CLEAN TRASH.md create mode 100644 docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CLEAN TRASH.md create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index ea6904de7c..bad51dc728 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -174,7 +174,7 @@ void StorageEngine::_garbage_sweeper_thread_callback() { curr_interval = std::min(curr_interval, max_interval); // start clean trash and update usage. 
- OLAPStatus res = _start_trash_sweep(&usage); + OLAPStatus res = start_trash_sweep(&usage); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING( "one or more errors occur when sweep trash." diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 2cbb38713a..4983c78380 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -635,15 +635,24 @@ void StorageEngine::_start_clean_fd_cache() { VLOG_TRACE << "end clean file descritpor cache"; } -OLAPStatus StorageEngine::_start_trash_sweep(double* usage) { +OLAPStatus StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) { OLAPStatus res = OLAP_SUCCESS; + + std::unique_lock l(_trash_sweep_lock,std::defer_lock); + if(!l.try_lock()) { + LOG(INFO) << "trash and snapshot sweep is running."; + return res; + } + LOG(INFO) << "start trash and snapshot sweep."; const int32_t snapshot_expire = config::snapshot_expire_time_sec; const int32_t trash_expire = config::trash_file_expire_time_sec; // the guard space should be lower than storage_flood_stage_usage_percent, // so here we multiply 0.9 - const double guard_space = config::storage_flood_stage_usage_percent / 100.0 * 0.9; + // if ignore_guard is true, set guard_space to 0. + const double guard_space = + ignore_guard ? 0 : config::storage_flood_stage_usage_percent / 100.0 * 0.9; std::vector data_dir_infos; RETURN_NOT_OK_LOG(get_all_data_dir_info(&data_dir_infos, false), "failed to get root path stat info when sweep trash.") @@ -687,7 +696,7 @@ OLAPStatus StorageEngine::_start_trash_sweep(double* usage) { } if (usage != nullptr) { - *usage = tmp_usage; + *usage = tmp_usage; // update usage } // clear expire incremental rowset, move deleted tablet to trash diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index feb2b1f1d8..cfe7e8792d 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -170,6 +170,10 @@ public: // start all background threads. 
This should be call after env is ready. Status start_bg_threads(); + // clear trash and snapshot file + // option: update disk usage after sweep + OLAPStatus start_trash_sweep(double* usage, bool ignore_guard = false); + void stop(); void create_cumulative_compaction(TabletSharedPtr best_tablet, @@ -238,8 +242,6 @@ private: void _start_clean_fd_cache(); - // 清理trash和snapshot文件,返回清理后的磁盘使用量 - OLAPStatus _start_trash_sweep(double* usage); // 磁盘状态监测。监测unused_flag路劲新的对应root_path unused标识位, // 当检测到有unused标识时,从内存中删除对应表信息,磁盘数据不动。 // 当磁盘状态为不可用,但未检测到unused标识时,需要从root_path上 @@ -291,6 +293,7 @@ private: EngineOptions _options; std::mutex _store_lock; + std::mutex _trash_sweep_lock; std::map _store_map; uint32_t _available_storage_medium_type_count; diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 87ad432d1c..61a91f0ebc 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -373,4 +373,7 @@ void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, } } +void BackendService::clean_trash() { + StorageEngine::instance()->start_trash_sweep(nullptr, true); +} } // namespace doris diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 1464fe46f6..36e908e65f 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -148,6 +148,8 @@ public: virtual void get_stream_load_record(TStreamLoadRecordResult& result, const int64_t last_stream_record_time) override; + + virtual void clean_trash() override; private: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js index 8528f19527..96090cfb10 100644 --- a/docs/.vuepress/sidebar/en.js +++ b/docs/.vuepress/sidebar/en.js @@ -154,6 +154,7 @@ module.exports = [ directoryPath: "operation/", children: [ "doris-error-code", + "disk-capacity", "metadata-operation", "monitor-alert", 
"multi-tenant", @@ -439,6 +440,7 @@ module.exports = [ directoryPath: "Administration/", children: [ "ADMIN CANCEL REPAIR", + "ADMIN CLEAN TRASH", "ADMIN CHECK TABLET", "ADMIN REPAIR", "ADMIN SET CONFIG", diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js index 296f7431de..f2fbb5f574 100644 --- a/docs/.vuepress/sidebar/zh-CN.js +++ b/docs/.vuepress/sidebar/zh-CN.js @@ -444,6 +444,7 @@ module.exports = [ directoryPath: "Administration/", children: [ "ADMIN CANCEL REPAIR", + "ADMIN CLEAN TRASH", "ADMIN CHECK TABLET", "ADMIN REPAIR", "ADMIN SET CONFIG", diff --git a/docs/en/administrator-guide/operation/disk-capacity.md b/docs/en/administrator-guide/operation/disk-capacity.md new file mode 100644 index 0000000000..71027d9f6e --- /dev/null +++ b/docs/en/administrator-guide/operation/disk-capacity.md @@ -0,0 +1,169 @@ +--- +{ + "title": "Disk Capacity Management", + "language": "en" +} +--- + + + +# Disk Capacity Management + +This document mainly introduces system parameters and processing strategies related to disk storage capacity. + +If Doris' data disk capacity is not controlled, the process will hang because the disk is full. Therefore, we monitor the disk usage and remaining capacity, and control various operations in the Doris system by setting different warning levels, and try to avoid the situation where the disk is full. + +## Glossary + +* FE:Doris Frontend Node. Responsible for metadata management and request access. +* BE:Doris Backend Node. Responsible for query execution and data storage. +* Data Dir:Data directory, each data directory specified in the `storage_root_path` of the BE configuration file `be.conf`. Usually a data directory corresponds to a disk, so the following **disk** also refers to a data directory. + +## Basic Principles + +BE will report disk usage to FE on a regular basis (every minute). FE records these statistical values and restricts various operation requests based on these statistical values. 
+ +Two thresholds, **High Watermark** and **Flood Stage**, are set in FE. Flood Stage is higher than High Watermark. When the disk usage is higher than High Watermark, Doris will restrict the execution of certain operations (such as replica balancing, etc.). If it is higher than Flood Stage, certain operations (such as load data) will be prohibited. + +At the same time, a **Flood Stage** is also set on the BE. Taking into account that FE cannot fully detect the disk usage on BE in a timely manner, and cannot control certain BE operations (such as Compaction). Therefore, Flood Stage on the BE is used for the BE to actively refuse and stop certain operations to achieve the purpose of self-protection. + +## FE Parameter + +**High Watermark:** + +``` +storage_high_watermark_usage_percent: default value is 85 (85%). +storage_min_left_capacity_bytes: default value is 2GB. +``` + +When disk capacity **more than** `storage_high_watermark_usage_percent`, **or** disk free capacity **less than** `storage_min_left_capacity_bytes`, the disk will no longer be used as the destination path for the following operations: + +* Tablet Balance +* Colocation Relocation +* Decommission + +**Flood Stage:** + +``` +storage_flood_stage_usage_percent: default value is 95 (95%). +storage_flood_stage_left_capacity_bytes: default value is 1GB. +``` + +When disk capacity **more than** `storage_flood_stage_usage_percent`, **or** disk free capacity **less than** `storage_flood_stage_left_capacity_bytes`, the disk will no longer be used as the destination path for the following operations: + +* Tablet Balance +* Colocation Relocation +* Replica make up +* Restore +* Load/Insert + +## BE Parameter + +**Flood Stage:** + +``` +capacity_used_percent_flood_stage: default value is 95 (95%). +capacity_min_left_bytes_flood_stage: default value is 1GB. 
+``` + +When disk capacity **more than** `storage_flood_stage_usage_percent`, **and** disk free capacity **less than** `storage_flood_stage_left_capacity_bytes`, the following operations on this disk will be prohibited: + +* Base/Cumulative Compaction +* Data load +* Clone Task (Usually occurs when the replica is repaired or balanced.) +* Push Task (Occurs during the Loading phase of Hadoop import, and the file is downloaded. ) +* Alter Task (Schema Change or Rollup Task.) +* Download Task (The Downloading phase of the recovery operation.) + +## Disk Capacity Release + +When the disk capacity is higher than High Watermark or even Flood Stage, many operations will be prohibited. At this time, you can try to reduce the disk usage and restore the system in the following ways. + +* Delete table or partition + + By deleting tables or partitions, you can quickly reduce the disk space usage and restore the cluster. + **Note: Only the `DROP` operation can achieve the purpose of quickly reducing the disk space usage, the `DELETE` operation cannot.** + + ``` + DROP TABLE tbl; + ALTER TABLE tbl DROP PARTITION p1; + ``` + +* BE expansion + + After backend expansion, data tablets will be automatically balanced to BE nodes with lower disk usage. The expansion operation will make the cluster reach a balanced state in a few hours or days depending on the amount of data and the number of nodes. + +* Modify replica of a table or partition + + You can reduce the number of replica of a table or partition. For example, the default 3 replica can be reduced to 2 replica. Although this method reduces the reliability of the data, it can quickly reduce the disk usage rate and restore the cluster to normal. + This method is usually used in emergency recovery systems. Please restore the number of copies to 3 after reducing the disk usage rate by expanding or deleting data after recovery. 
+ Modifying the replica operation takes effect instantly, and the backends will automatically and asynchronously delete the redundant replica. + + ``` + ALTER TABLE tbl MODIFY PARTITION p1 SET("replication_num" = "2"); + ``` + +* Delete unnecessary files + + When the BE has crashed because the disk is full and cannot be started (this phenomenon may occur due to untimely detection of FE or BE), you need to delete some temporary files in the data directory to ensure that the BE process can start. + Files in the following directories can be deleted directly: + + * log/: Log files in the log directory. + * snapshot/: Snapshot files in the snapshot directory. + * trash/: Trash files in the trash directory. + + **This operation will affect [Restore data from BE Recycle Bin](./tablet-restore-tool.md).** + + If the BE can still be started, you can use `ADMIN CLEAN TRASH ON(BackendHost:BackendHeartBeatPort);` to actively clean up temporary files. **all trash files** and expired snapshot files will be cleaned up, **This will affect the operation of restoring data from the trash bin**. + + + If you do not manually execute `ADMIN CLEAN TRASH`, the system will still automatically execute the cleanup within a few minutes to tens of minutes. There are two situations as follows: + * If the disk usage does not reach 90% of the **Flood Stage**, expired trash files and expired snapshot files will be cleaned up. At this time, some recent files will be retained without affecting the recovery of data. + * If the disk usage has reached 90% of the **Flood Stage**, **all trash files** and expired snapshot files will be cleaned up, **This will affect the operation of restoring data from the trash bin**. + + The time interval for automatic execution can be changed by `min_garbage_sweep_interval` and `max_garbage_sweep_interval` in the configuration items. 
+ + When the recovery fails due to lack of trash files, the following results may be returned: + + ``` + {"status": "Fail","msg": "can find tablet path in trash"} + ``` + +* Delete data file (dangerous!!!) + + When none of the above operations can free up capacity, you need to delete data files to free up space. The data file is in the `data/` directory of the specified data directory. To delete a tablet, you must first ensure that at least one replica of the tablet is normal, otherwise **deleting the only replica will result in data loss**. + + Suppose we want to delete the tablet with id 12345: + + * Find the directory corresponding to Tablet, usually under `data/shard_id/tablet_id/`. like: + + ```data/0/12345/``` + + * Record the tablet id and schema hash. The schema hash is the name of the next-level directory of the previous step. The following is 352781111: + + ```data/0/12345/352781111``` + + * Delete the data directory: + + ```rm -rf data/0/12345/``` + + * Delete tablet metadata (refer to [Tablet metadata management tool](./tablet-meta-tool.md)) + + ```./lib/meta_tool --operation=delete_header --root_path=/path/to/root_path --tablet_id=12345 --schema_hash= 352781111``` \ No newline at end of file diff --git a/docs/en/sql-reference/sql-statements/Administration/ADMIN CLEAN TRASH.md b/docs/en/sql-reference/sql-statements/Administration/ADMIN CLEAN TRASH.md new file mode 100644 index 0000000000..0511a3e47b --- /dev/null +++ b/docs/en/sql-reference/sql-statements/Administration/ADMIN CLEAN TRASH.md @@ -0,0 +1,47 @@ +--- +{ + "title": "ADMIN CLEAN TRASH", + "language": "en" +} +--- + + + +# ADMIN CLEAN TRASH +## description + This statement is used to clean up the trash data in the backend. 
+ Grammar: + ADMIN CLEAN TRASH [ON ("BackendHost1:BackendHeartBeatPort1", "BackendHost2:BackendHeartBeatPort2", ...)]; + + Explain: + Use BackendHost:BackendHeartBeatPort to indicate the backend to be cleaned up; if the ON clause is not added, all backends will be cleaned up. + +## example + + 1. Clean up the trash data of all be nodes. + + ADMIN CLEAN TRASH; + + 2. Clean up the trash data of '192.168.0.1:9050' and '192.168.0.2:9050'. + + ADMIN CLEAN TRASH ON ("192.168.0.1:9050","192.168.0.2:9050"); + +## keyword + ADMIN, CLEAN, TRASH diff --git a/docs/zh-CN/administrator-guide/operation/disk-capacity.md b/docs/zh-CN/administrator-guide/operation/disk-capacity.md index e3e7f5ba50..8616f63575 100644 --- a/docs/zh-CN/administrator-guide/operation/disk-capacity.md +++ b/docs/zh-CN/administrator-guide/operation/disk-capacity.md @@ -127,6 +127,21 @@ capacity_min_left_bytes_flood_stage 默认 1GB。 * snapshot/: 快照目录下的快照文件。 * trash/:回收站中的文件。 + **这种操作会对 [从 BE 回收站中恢复数据](./tablet-restore-tool.md) 产生影响。** + + 如果BE还能够启动,则可以使用`ADMIN CLEAN TRASH ON(BackendHost:BackendHeartBeatPort);`来主动清理临时文件,会清理 **所有** trash文件和过期snapshot文件,**这将影响从回收站恢复数据的操作** 。 + + 如果不手动执行`ADMIN CLEAN TRASH`,系统仍将会在几分钟至几十分钟内自动执行清理,这里分为两种情况: + * 如果磁盘占用未达到 **危险水位(Flood Stage)** 的90%,则会清理过期trash文件和过期snapshot文件,此时会保留一些近期文件而不影响恢复数据。 + * 如果磁盘占用已达到 **危险水位(Flood Stage)** 的90%,则会清理 **所有** trash文件和过期snapshot文件, **此时会影响从回收站恢复数据的操作** 。 + 自动执行的时间间隔可以通过配置项中的`min_garbage_sweep_interval`和`max_garbage_sweep_interval`更改。 + + 出现由于缺少trash文件而导致恢复失败的情况时,可能返回如下结果: + + ``` + {"status": "Fail","msg": "can find tablet path in trash"} + ``` + + * 删除数据文件(危险!!!) 
当以上操作都无法释放空间时,需要通过删除数据文件来释放空间。数据文件在指定数据目录的 `data/` 目录下。删除数据分片(Tablet)必须先确保该 Tablet 至少有一个副本是正常的,否则**删除唯一副本会导致数据丢失**。假设我们要删除 id 为 12345 的 Tablet: diff --git a/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CLEAN TRASH.md b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CLEAN TRASH.md new file mode 100644 index 0000000000..04e49bd7e6 --- /dev/null +++ b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CLEAN TRASH.md @@ -0,0 +1,47 @@ +--- +{ + "title": "ADMIN CLEAN TRASH", + "language": "zh-CN" +} +--- + + + +# ADMIN CLEAN TRASH +## description + 该语句用于清理 backend 内的垃圾数据。 + 语法: + ADMIN CLEAN TRASH [ON ("BackendHost1:BackendHeartBeatPort1", "BackendHost2:BackendHeartBeatPort2", ...)]; + + 说明: + 以 BackendHost:BackendHeartBeatPort 表示需要清理的 backend ,不添加on限定则清理所有 backend 。 + +## example + + 1. 清理所有be节点的垃圾数据。 + + ADMIN CLEAN TRASH; + + 2. 清理'192.168.0.1:9050'和'192.168.0.2:9050'的垃圾数据。 + + ADMIN CLEAN TRASH ON ("192.168.0.1:9050","192.168.0.2:9050"); + +## keyword + ADMIN, CLEAN, TRASH diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index d29bee84fa..c360419d0d 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -235,7 +235,7 @@ parser code {: // Total keywords of doris terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_APPEND, KW_AS, KW_ASC, KW_AUTHORS, KW_ARRAY, KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BINLOG, KW_BITMAP, KW_BITMAP_UNION, KW_BOOLEAN, KW_BROKER, KW_BACKENDS, KW_BY, KW_BUILTIN, - KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CHECK, KW_CLUSTER, KW_CLUSTERS, + KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CHECK, KW_CLUSTER, KW_CLUSTERS, KW_CLEAN, KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLON, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, KW_COUNT, KW_CREATE, 
KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER, KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DAY, KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE, @@ -4940,6 +4940,14 @@ admin_stmt ::= {: RESULT = new AdminShowDataSkewStmt(table_ref); :} + | KW_ADMIN KW_CLEAN KW_TRASH KW_ON LPAREN string_list:backends RPAREN + {: + RESULT = new AdminCleanTrashStmt(backends); + :} + | KW_ADMIN KW_CLEAN KW_TRASH + {: + RESULT = new AdminCleanTrashStmt(null); + :} ; truncate_stmt ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java new file mode 100644 index 0000000000..1f798f0ce9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.mysql.privilege.PrivPredicate; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AdminCleanTrashStmt extends DdlStmt { + private List backends = Lists.newArrayList(); + + public AdminCleanTrashStmt(List backends) { + ImmutableMap backendsInfo = Catalog.getCurrentSystemInfo().getIdToBackend(); + Map backendsID = new HashMap(); + for (Backend backend : backendsInfo.values()) { + backendsID.put(String.valueOf(backend.getHost()) + ":" + String.valueOf(backend.getHeartbeatPort()), backend.getId()); + } + if (backends == null) { + for (Backend backend : backendsInfo.values()) { + this.backends.add(backend); + } + } else { + for (String backend : backends) { + if (backendsID.get(backend) != null) { + this.backends.add(backendsInfo.get(backendsID.get(backend))); + backendsID.remove(backend); // avoid repetition + } + } + } + } + + public List getBackends() { + return backends; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.NO_FORWARD; + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 35c5d621c2..fdc91477bd 100755 --- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -43,6 +43,7 @@ import org.apache.doris.analysis.BackupStmt; import org.apache.doris.analysis.CancelAlterSystemStmt; import org.apache.doris.analysis.CancelAlterTableStmt; import org.apache.doris.analysis.CancelBackupStmt; +import org.apache.doris.analysis.AdminCleanTrashStmt; import org.apache.doris.analysis.ColumnRenameClause; import org.apache.doris.analysis.CreateClusterStmt; import org.apache.doris.analysis.CreateDbStmt; @@ -104,6 +105,7 @@ import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.ConfigBase; +import org.apache.doris.common.ClientPool; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -217,11 +219,13 @@ import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.CreateReplicaTask; import org.apache.doris.task.DropReplicaTask; import org.apache.doris.task.MasterTaskExecutor; +import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTabletType; import org.apache.doris.thrift.TTaskType; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.PublishVersionDaemon; @@ -7073,4 +7077,27 @@ public class Catalog { } } } + + public void cleanTrash(AdminCleanTrashStmt stmt) { + List backends = stmt.getBackends(); + for (Backend backend : backends){ + BackendService.Client client = null; + TNetworkAddress address = null; + boolean ok = false; + try { + address = new TNetworkAddress(backend.getHost(), 
backend.getBePort()); + client = ClientPool.backendPool.borrowObject(address); + client.cleanTrash(); // async + ok = true; + } catch (Exception e) { + LOG.warn("trash clean exec error. backend[{}]", backend.getId(), e); + } finally { + if (ok) { + ClientPool.backendPool.returnObject(address, client); + } else { + ClientPool.backendPool.invalidateObject(address, client); + } + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index f9e76ec169..1d2a5f5902 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -34,6 +34,7 @@ import org.apache.doris.analysis.CancelAlterSystemStmt; import org.apache.doris.analysis.CancelAlterTableStmt; import org.apache.doris.analysis.CancelBackupStmt; import org.apache.doris.analysis.CancelLoadStmt; +import org.apache.doris.analysis.AdminCleanTrashStmt; import org.apache.doris.analysis.CreateClusterStmt; import org.apache.doris.analysis.CreateDataSyncJobStmt; import org.apache.doris.analysis.CreateDbStmt; @@ -260,6 +261,8 @@ public class DdlExecutor { catalog.getSyncJobManager().pauseSyncJob((PauseSyncJobStmt) ddlStmt); } else if (ddlStmt instanceof StopSyncJobStmt) { catalog.getSyncJobManager().stopSyncJob((StopSyncJobStmt) ddlStmt); + } else if (ddlStmt instanceof AdminCleanTrashStmt) { + catalog.cleanTrash((AdminCleanTrashStmt) ddlStmt); } else { throw new DdlException("Unknown statement."); } diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 589f57f0f4..5e99627e3f 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -126,6 +126,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("character", new Integer(SqlParserSymbols.KW_CHAR)); keywordMap.put("charset", new Integer(SqlParserSymbols.KW_CHARSET)); keywordMap.put("check", 
new Integer(SqlParserSymbols.KW_CHECK)); + keywordMap.put("clean", new Integer(SqlParserSymbols.KW_CLEAN)); keywordMap.put("cluster", new Integer(SqlParserSymbols.KW_CLUSTER)); keywordMap.put("clusters", new Integer(SqlParserSymbols.KW_CLUSTERS)); keywordMap.put("collate", new Integer(SqlParserSymbols.KW_COLLATE)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index e134e812b9..3247e5e09c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -238,6 +238,10 @@ public class GenericPoolTest { // TODO Auto-generated method stub return null; } + @Override + public void cleanTrash() throws TException { + // TODO Auto-generated method stub + } } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index f14d59b3e4..94ecb9b458 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -284,6 +284,11 @@ public class MockedBackendFactory { public TStreamLoadRecordResult getStreamLoadRecord(long last_stream_record_time) throws TException { return new TStreamLoadRecordResult(Maps.newHashMap()); } + + @Override + public void cleanTrash() throws TException { + return; + } } // The default Brpc service. diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 1288a2bed2..717fb020c2 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -171,4 +171,5 @@ service BackendService { TStreamLoadRecordResult get_stream_load_record(1: i64 last_stream_record_time); + oneway void clean_trash(); }