[Feature] Support for cleaning the trash actively (#6323)
This commit is contained in:
@ -235,7 +235,7 @@ parser code {:
|
||||
// Total keywords of doris
|
||||
terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_APPEND, KW_AS, KW_ASC, KW_AUTHORS, KW_ARRAY,
|
||||
KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BINLOG, KW_BITMAP, KW_BITMAP_UNION, KW_BOOLEAN, KW_BROKER, KW_BACKENDS, KW_BY, KW_BUILTIN,
|
||||
KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CHECK, KW_CLUSTER, KW_CLUSTERS,
|
||||
KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CHECK, KW_CLUSTER, KW_CLUSTERS, KW_CLEAN,
|
||||
KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLON, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED,
|
||||
KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, KW_COUNT, KW_CREATE, KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER,
|
||||
KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DAY, KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE,
|
||||
@ -4940,6 +4940,14 @@ admin_stmt ::=
|
||||
{:
|
||||
RESULT = new AdminShowDataSkewStmt(table_ref);
|
||||
:}
|
||||
| KW_ADMIN KW_CLEAN KW_TRASH KW_ON LPAREN string_list:backends RPAREN
|
||||
{:
|
||||
RESULT = new AdminCleanTrashStmt(backends);
|
||||
:}
|
||||
| KW_ADMIN KW_CLEAN KW_TRASH
|
||||
{:
|
||||
RESULT = new AdminCleanTrashStmt(null);
|
||||
:}
|
||||
;
|
||||
|
||||
truncate_stmt ::=
|
||||
|
||||
@ -0,0 +1,73 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class AdminCleanTrashStmt extends DdlStmt {
|
||||
private List<Backend> backends = Lists.newArrayList();
|
||||
|
||||
public AdminCleanTrashStmt(List<String> backends) {
|
||||
ImmutableMap<Long, Backend> backendsInfo = Catalog.getCurrentSystemInfo().getIdToBackend();
|
||||
Map<String, Long> backendsID = new HashMap<String, Long>();
|
||||
for (Backend backend : backendsInfo.values()) {
|
||||
backendsID.put(String.valueOf(backend.getHost()) + ":" + String.valueOf(backend.getHeartbeatPort()), backend.getId());
|
||||
}
|
||||
if (backends == null) {
|
||||
for (Backend backend : backendsInfo.values()) {
|
||||
this.backends.add(backend);
|
||||
}
|
||||
} else {
|
||||
for (String backend : backends) {
|
||||
if (backendsID.get(backend) != null) {
|
||||
this.backends.add(backendsInfo.get(backendsID.get(backend)));
|
||||
backendsID.remove(backend); // avoid repetition
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public List<Backend> getBackends() {
|
||||
return backends;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException {
|
||||
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedirectStatus getRedirectStatus() {
|
||||
return RedirectStatus.NO_FORWARD;
|
||||
}
|
||||
}
|
||||
@ -43,6 +43,7 @@ import org.apache.doris.analysis.BackupStmt;
|
||||
import org.apache.doris.analysis.CancelAlterSystemStmt;
|
||||
import org.apache.doris.analysis.CancelAlterTableStmt;
|
||||
import org.apache.doris.analysis.CancelBackupStmt;
|
||||
import org.apache.doris.analysis.AdminCleanTrashStmt;
|
||||
import org.apache.doris.analysis.ColumnRenameClause;
|
||||
import org.apache.doris.analysis.CreateClusterStmt;
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
@ -104,6 +105,7 @@ import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ConfigBase;
|
||||
import org.apache.doris.common.ClientPool;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
@ -217,11 +219,13 @@ import org.apache.doris.task.AgentTaskQueue;
|
||||
import org.apache.doris.task.CreateReplicaTask;
|
||||
import org.apache.doris.task.DropReplicaTask;
|
||||
import org.apache.doris.task.MasterTaskExecutor;
|
||||
import org.apache.doris.thrift.BackendService;
|
||||
import org.apache.doris.thrift.TStorageFormat;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.thrift.TStorageType;
|
||||
import org.apache.doris.thrift.TTabletType;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
|
||||
import org.apache.doris.transaction.GlobalTransactionMgr;
|
||||
import org.apache.doris.transaction.PublishVersionDaemon;
|
||||
@ -7073,4 +7077,27 @@ public class Catalog {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void cleanTrash(AdminCleanTrashStmt stmt) {
|
||||
List<Backend> backends = stmt.getBackends();
|
||||
for (Backend backend : backends){
|
||||
BackendService.Client client = null;
|
||||
TNetworkAddress address = null;
|
||||
boolean ok = false;
|
||||
try {
|
||||
address = new TNetworkAddress(backend.getHost(), backend.getBePort());
|
||||
client = ClientPool.backendPool.borrowObject(address);
|
||||
client.cleanTrash(); // async
|
||||
ok = true;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("trash clean exec error. backend[{}]", backend.getId(), e);
|
||||
} finally {
|
||||
if (ok) {
|
||||
ClientPool.backendPool.returnObject(address, client);
|
||||
} else {
|
||||
ClientPool.backendPool.invalidateObject(address, client);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -34,6 +34,7 @@ import org.apache.doris.analysis.CancelAlterSystemStmt;
|
||||
import org.apache.doris.analysis.CancelAlterTableStmt;
|
||||
import org.apache.doris.analysis.CancelBackupStmt;
|
||||
import org.apache.doris.analysis.CancelLoadStmt;
|
||||
import org.apache.doris.analysis.AdminCleanTrashStmt;
|
||||
import org.apache.doris.analysis.CreateClusterStmt;
|
||||
import org.apache.doris.analysis.CreateDataSyncJobStmt;
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
@ -260,6 +261,8 @@ public class DdlExecutor {
|
||||
catalog.getSyncJobManager().pauseSyncJob((PauseSyncJobStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof StopSyncJobStmt) {
|
||||
catalog.getSyncJobManager().stopSyncJob((StopSyncJobStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof AdminCleanTrashStmt) {
|
||||
catalog.cleanTrash((AdminCleanTrashStmt) ddlStmt);
|
||||
} else {
|
||||
throw new DdlException("Unknown statement.");
|
||||
}
|
||||
|
||||
@ -126,6 +126,7 @@ import org.apache.doris.qe.SqlModeHelper;
|
||||
keywordMap.put("character", new Integer(SqlParserSymbols.KW_CHAR));
|
||||
keywordMap.put("charset", new Integer(SqlParserSymbols.KW_CHARSET));
|
||||
keywordMap.put("check", new Integer(SqlParserSymbols.KW_CHECK));
|
||||
keywordMap.put("clean", new Integer(SqlParserSymbols.KW_CLEAN));
|
||||
keywordMap.put("cluster", new Integer(SqlParserSymbols.KW_CLUSTER));
|
||||
keywordMap.put("clusters", new Integer(SqlParserSymbols.KW_CLUSTERS));
|
||||
keywordMap.put("collate", new Integer(SqlParserSymbols.KW_COLLATE));
|
||||
|
||||
@ -238,6 +238,10 @@ public class GenericPoolTest {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
public void cleanTrash() throws TException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@ -284,6 +284,11 @@ public class MockedBackendFactory {
|
||||
public TStreamLoadRecordResult getStreamLoadRecord(long last_stream_record_time) throws TException {
|
||||
return new TStreamLoadRecordResult(Maps.newHashMap());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanTrash() throws TException {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// The default Brpc service.
|
||||
|
||||
Reference in New Issue
Block a user