[Fix](statistics)Handle external table in statistics cleaner. (#23843)

Previously, the Statistics Cleaner only handled OLAP databases and tables.
Statistics of external databases and tables were removed without verification, so external stats could be kept for no more than 2 days, which is the run interval of the Statistics Cleaner thread.
This PR adds verification for external databases and tables.
This commit is contained in:
Jibing-Li
2023-09-08 09:43:46 +08:00
committed by GitHub
parent cb43f07487
commit fb5a77b726
8 changed files with 162 additions and 17 deletions

View File

@ -851,7 +851,8 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
return null;
}
public Map<Long, Table> getIdToTable() {
/**
 * Returns a new {@link HashMap} populated from this database's {@code idToTable} map.
 * The returned map is a separate instance, so adding or removing entries in it does not
 * change the map held by this database.
 *
 * @return a copy of the table-id to table mapping
 */
@Override
public Map<Long, TableIf> getIdToTable() {
return new HashMap<>(idToTable);
}

View File

@ -30,6 +30,7 @@ import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -257,4 +258,6 @@ public interface DatabaseIf<T extends TableIf> {
default long getLastUpdateTime() {
return -1L;
}
/**
 * Returns the tables of this database keyed by table id.
 *
 * @return a map from table id to table
 */
public Map<Long, TableIf> getIdToTable();
}

View File

@ -20,6 +20,7 @@ package org.apache.doris.catalog.external;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.DatabaseProperty;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
@ -44,6 +45,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -360,4 +362,9 @@ public abstract class ExternalDatabase<T extends ExternalTable>
public void createTableForReplay(String tableName, long tableId) {
throw new NotImplementedException("createTable() is not implemented");
}
/**
 * Returns a new {@link HashMap} populated from this external database's {@code idToTbl} map.
 * The returned map is a separate instance, so adding or removing entries in it does not
 * change the map held by this database.
 *
 * @return a copy of the table-id to table mapping
 */
@Override
public Map<Long, TableIf> getIdToTable() {
return new HashMap<>(idToTbl);
}
}

View File

@ -17,7 +17,6 @@
package org.apache.doris.datasource;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
@ -34,6 +33,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import javax.annotation.Nullable;
@ -172,4 +172,6 @@ public interface CatalogIf<T extends DatabaseIf> {
public Collection<DatabaseIf> getAllDbs();
public boolean enableAutoAnalyze();
/**
 * Returns the databases of this catalog keyed by database id.
 *
 * @return a map from database id to database
 */
public ConcurrentHashMap<Long, DatabaseIf> getIdToDb();
}

View File

@ -63,6 +63,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* The abstract class for all types of external catalogs.
@ -605,4 +606,9 @@ public abstract class ExternalCatalog
}
return ret;
}
/**
 * Returns a new {@link ConcurrentHashMap} populated from this external catalog's
 * {@code idToDb} map. The returned map is a separate instance, so adding or removing
 * entries in it does not change the map held by this catalog.
 *
 * @return a copy of the database-id to database mapping
 */
@Override
public ConcurrentHashMap<Long, DatabaseIf> getIdToDb() {
return new ConcurrentHashMap<>(idToDb);
}
}

View File

@ -3133,7 +3133,8 @@ public class InternalCatalog implements CatalogIf<Database> {
return newChecksum;
}
public ConcurrentHashMap<Long, Database> getIdToDb() {
/**
 * Returns a new {@link ConcurrentHashMap} populated from the internal catalog's
 * {@code idToDb} map. The returned map is a separate instance, so adding or removing
 * entries in it does not change the map held by this catalog.
 *
 * @return a copy of the database-id to database mapping
 */
@Override
public ConcurrentHashMap<Long, DatabaseIf> getIdToDb() {
return new ConcurrentHashMap<>(idToDb);
}

View File

@ -17,11 +17,11 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.MasterDaemon;
@ -30,6 +30,7 @@ import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.SystemInfoService;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
@ -55,12 +56,8 @@ public class StatisticsCleaner extends MasterDaemon {
private OlapTable histStatsTbl;
private Map<Long, CatalogIf> idToCatalog;
/* Internal DB only */
private Map<Long, Database> idToDb;
/* Internal tbl only */
private Map<Long, Table> idToTbl;
private Map<Long, DatabaseIf> idToDb;
private Map<Long, TableIf> idToTbl;
private Map<Long, MaterializedIndexMeta> idToMVIdx;
@ -114,15 +111,23 @@ public class StatisticsCleaner extends MasterDaemon {
}
idToCatalog = Env.getCurrentEnv().getCatalogMgr().getIdToCatalog();
idToDb = Env.getCurrentEnv().getInternalCatalog().getIdToDb();
idToDb = constructDbMap();
idToTbl = constructTblMap();
idToMVIdx = constructIdxMap();
return true;
}
private Map<Long, Table> constructTblMap() {
Map<Long, Table> idToTbl = new HashMap<>();
for (Database db : idToDb.values()) {
/**
 * Merges the id-to-database maps of every catalog in {@code idToCatalog} into one map.
 *
 * @return a new map holding the entries returned by {@code getIdToDb()} of each catalog
 */
private Map<Long, DatabaseIf> constructDbMap() {
    final Map<Long, DatabaseIf> mergedDbs = Maps.newHashMap();
    // Each catalog contributes all of its databases; entries are added catalog by catalog.
    idToCatalog.values().forEach(catalog -> mergedDbs.putAll(catalog.getIdToDb()));
    return mergedDbs;
}
private Map<Long, TableIf> constructTblMap() {
Map<Long, TableIf> idToTbl = new HashMap<>();
for (DatabaseIf db : idToDb.values()) {
idToTbl.putAll(db.getIdToTable());
}
return idToTbl;
@ -130,7 +135,7 @@ public class StatisticsCleaner extends MasterDaemon {
private Map<Long, MaterializedIndexMeta> constructIdxMap() {
Map<Long, MaterializedIndexMeta> idToMVIdx = new HashMap<>();
for (Table t : idToTbl.values()) {
for (TableIf t : idToTbl.values()) {
if (t instanceof OlapTable) {
OlapTable olapTable = (OlapTable) t;
olapTable.getCopyOfIndexIdToMeta()
@ -213,7 +218,7 @@ public class StatisticsCleaner extends MasterDaemon {
continue;
}
Table t = idToTbl.get(tblId);
TableIf t = idToTbl.get(tblId);
String colId = statsId.colId;
if (t.getColumn(colId) == null) {
expiredStats.ids.add(id);

View File

@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Regression test for the statistics cleaner on an external (Hive) catalog:
//   1. analyze an external database and check the collected column stats,
//   2. run "drop expired stats" and check the stats of the still-existing catalog are kept,
//   3. drop the catalog, run "drop expired stats" again and check its stats rows are gone.
suite("test_hive_statistic_clean", "p2,external,hive,external_remote,external_remote_hive") {
    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
        String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
        String catalog_name = "test_hive_statistic_clean"
        sql """drop catalog if exists ${catalog_name};"""
        sql """
            create catalog if not exists ${catalog_name} properties (
                'type'='hms',
                'hadoop.username' = 'hadoop',
                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
            );
        """
        logger.info("catalog " + catalog_name + " created")

        sql """analyze database ${catalog_name}.statistics with sync"""
        sql """use ${catalog_name}.statistics"""

        // Expected values of result columns 1..7 of `show column stats`, keyed by column name.
        // Result column 0 holds the column name itself.
        def expectedStats = [
            "lo_quantity"  : ["100.0", "46.0", "0.0", "404.0", "4.0", "1", "50"],
            "lo_orderkey"  : ["100.0", "26.0", "0.0", "404.0", "4.0", "1", "98"],
            "lo_linenumber": ["100.0", "7.0", "0.0", "404.0", "4.0", "1", "7"]
        ]

        // Asserts that every column listed in expectedStats has exactly one stats row
        // carrying the expected values.
        def assertStatsPresent = {
            expectedStats.each { column, expected ->
                def rows = sql """show column stats `statistics` (${column})"""
                assertTrue(rows.size() == 1)
                assertTrue(rows[0][0] == column)
                expected.eachWithIndex { value, idx ->
                    assertTrue(rows[0][idx + 1] == value)
                }
            }
        }

        // Stats must exist right after analyze ...
        assertStatsPresent()

        // ... and must survive a cleaner run while the catalog still exists.
        sql """drop expired stats"""
        assertStatsPresent()

        // Remember the catalog id before dropping the catalog, so its stats rows can be looked up.
        def ctlId = null
        def catalogs = sql """show proc '/catalogs'"""
        for (int i = 0; i < catalogs.size(); i++) {
            if (catalogs[i][1] == catalog_name) {
                ctlId = catalogs[i][0]
            }
        }
        // Without this check a failed lookup would turn the query below into
        // `catalog_id=null`, which returns no rows and lets the test pass vacuously.
        assertTrue(ctlId != null)

        sql """drop catalog ${catalog_name}"""
        sql """drop expired stats"""
        def remaining = sql """select * from internal.__internal_schema.column_statistics where catalog_id=${ctlId}"""
        assertTrue(remaining.size() == 0)
    }
}