[Bug] Fix bug that truncate table may change the storage medium property (#6905)
This commit is contained in:
@ -689,7 +689,6 @@ Status OlapScanNode::get_hints(const TPaloScanRange& scan_range, int block_row_c
|
||||
res = table->split_range(key_range->begin_scan_range, key_range->end_scan_range,
|
||||
block_row_count, &range);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
OLAP_LOG_WARNING("fail to show hints by split range. [res=%d]", res);
|
||||
return Status::InternalError("fail to show hints");
|
||||
}
|
||||
ranges.emplace_back(std::move(range));
|
||||
@ -700,7 +699,6 @@ Status OlapScanNode::get_hints(const TPaloScanRange& scan_range, int block_row_c
|
||||
std::vector<OlapTuple> range;
|
||||
auto res = table->split_range({}, {}, block_row_count, &range);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
OLAP_LOG_WARNING("fail to show hints by split range. [res=%d]", res);
|
||||
return Status::InternalError("fail to show hints");
|
||||
}
|
||||
ranges.emplace_back(std::move(range));
|
||||
|
||||
@ -73,7 +73,7 @@ BloomFilterPredicate::BloomFilterPredicate(const TExprNode& node)
|
||||
_scan_rows(0) {}
|
||||
|
||||
BloomFilterPredicate::~BloomFilterPredicate() {
|
||||
LOG(INFO) << "bloom filter rows:" << _filtered_rows << ",scan_rows:" << _scan_rows
|
||||
VLOG_NOTICE << "bloom filter rows:" << _filtered_rows << ",scan_rows:" << _scan_rows
|
||||
<< ",rate:" << (double)_filtered_rows / _scan_rows;
|
||||
}
|
||||
|
||||
|
||||
@ -58,8 +58,8 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
|
||||
|
||||
Status serialize_status = serialize(&_rpc_context->request, &data, &len);
|
||||
if (serialize_status.ok()) {
|
||||
LOG(INFO) << "Producer:" << _rpc_context->request.ShortDebugString() << addr->hostname
|
||||
<< ":" << addr->port;
|
||||
VLOG_NOTICE << "Producer:" << _rpc_context->request.ShortDebugString() << addr->hostname
|
||||
<< ":" << addr->port;
|
||||
if (len > 0) {
|
||||
DCHECK(data != nullptr);
|
||||
_rpc_context->cntl.request_attachment().append(data, len);
|
||||
|
||||
@ -166,7 +166,9 @@ OLAPStatus AlphaRowset::split_range(const RowCursor& start_key, const RowCursor&
|
||||
std::vector<OlapTuple>* ranges) {
|
||||
if (key_num > _schema->num_short_key_columns()) {
|
||||
// should not happen
|
||||
LOG(WARNING) << "key num " << key_num << " should less than or equal to short key column number: "
|
||||
// But since aloha rowset is deprecated in future and it will not fail the query,
|
||||
// just use VLOG to avoid too many warning logs.
|
||||
VLOG_NOTICE << "key num " << key_num << " should less than or equal to short key column number: "
|
||||
<< _schema->num_short_key_columns();
|
||||
return OLAP_ERR_INVALID_SCHEMA;
|
||||
}
|
||||
|
||||
@ -560,13 +560,19 @@ OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
|
||||
std::vector<Version> missed_versions;
|
||||
calc_missed_versions_unlocked(spec_version.second, &missed_versions);
|
||||
if (missed_versions.empty()) {
|
||||
LOG(WARNING) << "tablet:" << full_name()
|
||||
<< ", version already has been merged. spec_version: " << spec_version;
|
||||
// if version_path is null, it may be a compaction check logic.
|
||||
// so to avoid print too many logs.
|
||||
if (version_path != nullptr) {
|
||||
LOG(WARNING) << "tablet:" << full_name()
|
||||
<< ", version already has been merged. spec_version: " << spec_version;
|
||||
}
|
||||
status = OLAP_ERR_VERSION_ALREADY_MERGED;
|
||||
} else {
|
||||
LOG(WARNING) << "status:" << status << ", tablet:" << full_name()
|
||||
<< ", missed version for version:" << spec_version;
|
||||
_print_missed_versions(missed_versions);
|
||||
if (version_path != nullptr) {
|
||||
LOG(WARNING) << "status:" << status << ", tablet:" << full_name()
|
||||
<< ", missed version for version:" << spec_version;
|
||||
_print_missed_versions(missed_versions);
|
||||
}
|
||||
}
|
||||
}
|
||||
return status;
|
||||
|
||||
@ -45,6 +45,7 @@
|
||||
#include "olap/tablet_meta.h"
|
||||
#include "olap/tablet_meta_manager.h"
|
||||
#include "olap/utils.h"
|
||||
#include "service/backend_options.h"
|
||||
#include "util/doris_metrics.h"
|
||||
#include "util/file_utils.h"
|
||||
#include "util/histogram.h"
|
||||
@ -553,7 +554,7 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaH
|
||||
|
||||
if (tablet == nullptr) {
|
||||
if (err != nullptr) {
|
||||
*err = "tablet does not exist";
|
||||
*err = "tablet does not exist. " + BackendOptions::get_localhost();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
@ -561,7 +562,7 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaH
|
||||
if (!tablet->is_used()) {
|
||||
LOG(WARNING) << "tablet cannot be used. tablet=" << tablet_id;
|
||||
if (err != nullptr) {
|
||||
*err = "tablet cannot be used";
|
||||
*err = "tablet cannot be used. " + BackendOptions::get_localhost();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
@ -71,7 +71,7 @@ DataStreamSender::Channel::Channel(DataStreamSender* parent, const RowDescriptor
|
||||
std::string localhost = BackendOptions::get_localhost();
|
||||
_is_local = _brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port;
|
||||
if (_is_local) {
|
||||
LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id;
|
||||
VLOG_NOTICE << "will use local exechange, dest_node_id:" << _dest_node_id;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -231,7 +231,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
|
||||
|
||||
PBackendService_Stub* stub = ExecEnv::GetInstance()->brpc_stub_cache()->get_stub(
|
||||
targets[i].target_fragment_instance_addr);
|
||||
LOG(INFO) << "send filter " << rpc_contexts[i]->request.filter_id()
|
||||
VLOG_NOTICE << "send filter " << rpc_contexts[i]->request.filter_id()
|
||||
<< " to:" << targets[i].target_fragment_instance_addr.hostname << ":"
|
||||
<< targets[i].target_fragment_instance_addr.port
|
||||
<< rpc_contexts[i]->request.ShortDebugString();
|
||||
|
||||
@ -308,7 +308,7 @@ void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
|
||||
auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
|
||||
UniqueId unique_id(request->query_id());
|
||||
// TODO: avoid copy attachment copy
|
||||
LOG(INFO) << "rpc apply_filter recv";
|
||||
VLOG_NOTICE << "rpc apply_filter recv";
|
||||
Status st = _exec_env->fragment_mgr()->apply_filter(request, attachment.to_string().data());
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "apply filter meet error" << st.to_string();
|
||||
|
||||
@ -49,9 +49,6 @@ import org.apache.doris.thrift.TFinishTaskRequest;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicates;
|
||||
@ -61,6 +58,9 @@ import com.google.common.collect.Collections2;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.File;
|
||||
@ -505,7 +505,7 @@ public class BackupJob extends AbstractJob {
|
||||
// only copy visible indexes
|
||||
List<String> reservedPartitions = tableRef.getPartitionNames() == null ? null
|
||||
: tableRef.getPartitionNames().getPartitionNames();
|
||||
OlapTable copiedTbl = olapTable.selectiveCopy(reservedPartitions, true, IndexExtState.VISIBLE);
|
||||
OlapTable copiedTbl = olapTable.selectiveCopy(reservedPartitions, IndexExtState.VISIBLE, true);
|
||||
if (copiedTbl == null) {
|
||||
status = new Status(ErrCode.COMMON_ERROR, "failed to copy table: " + tblName);
|
||||
return;
|
||||
|
||||
@ -777,6 +777,7 @@ public class Catalog {
|
||||
// 1. check and create dirs and files
|
||||
File meta = new File(metaDir);
|
||||
if (!meta.exists()) {
|
||||
LOG.warn("Doris' meta dir {} does not exist. You need to create it before starting FE", meta.getAbsolutePath());
|
||||
throw new Exception(meta.getAbsolutePath() + " does not exist, will exit");
|
||||
}
|
||||
|
||||
@ -6680,7 +6681,7 @@ public class Catalog {
|
||||
partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
|
||||
}
|
||||
}
|
||||
copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(), true, IndexExtState.VISIBLE);
|
||||
copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(), IndexExtState.VISIBLE, false);
|
||||
} finally {
|
||||
olapTable.readUnlock();
|
||||
}
|
||||
|
||||
@ -1278,34 +1278,37 @@ public class OlapTable extends Table {
|
||||
return table instanceof OlapTable;
|
||||
}
|
||||
|
||||
public OlapTable selectiveCopy(Collection<String> reservedPartitions, boolean resetState, IndexExtState extState) {
|
||||
public OlapTable selectiveCopy(Collection<String> reservedPartitions, IndexExtState extState, boolean isForBackup) {
|
||||
OlapTable copied = new OlapTable();
|
||||
if (!DeepCopy.copy(this, copied, OlapTable.class, FeConstants.meta_version)) {
|
||||
LOG.warn("failed to copy olap table: " + getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
if (resetState) {
|
||||
// remove shadow index from copied table
|
||||
List<MaterializedIndex> shadowIndex = copied.getPartitions().stream().findFirst().get().getMaterializedIndices(IndexExtState.SHADOW);
|
||||
|
||||
// remove shadow index from copied table
|
||||
List<MaterializedIndex> shadowIndex = copied.getPartitions().stream().findFirst().get().getMaterializedIndices(IndexExtState.SHADOW);
|
||||
for (MaterializedIndex deleteIndex : shadowIndex) {
|
||||
LOG.debug("copied table delete shadow index : {}", deleteIndex.getId());
|
||||
copied.deleteIndexInfo(copied.getIndexNameById(deleteIndex.getId()));
|
||||
}
|
||||
copied.setState(OlapTableState.NORMAL);
|
||||
for (Partition partition : copied.getPartitions()) {
|
||||
// remove shadow index from partition
|
||||
for (MaterializedIndex deleteIndex : shadowIndex) {
|
||||
LOG.debug("copied table delete shadow index : {}", deleteIndex.getId());
|
||||
copied.deleteIndexInfo(copied.getIndexNameById(deleteIndex.getId()));
|
||||
partition.deleteRollupIndex(deleteIndex.getId());
|
||||
}
|
||||
copied.setState(OlapTableState.NORMAL);
|
||||
for (Partition partition : copied.getPartitions()) {
|
||||
// remove shadow index from partition
|
||||
for (MaterializedIndex deleteIndex : shadowIndex) {
|
||||
partition.deleteRollupIndex(deleteIndex.getId());
|
||||
}
|
||||
partition.setState(PartitionState.NORMAL);
|
||||
partition.setState(PartitionState.NORMAL);
|
||||
if (isForBackup) {
|
||||
// set storage medium to HDD for backup job, because we want that the backuped table
|
||||
// can be able to restored to another Doris cluster without SSD disk.
|
||||
// But for other operation such as truncate table, keep the origin storage medium.
|
||||
copied.getPartitionInfo().setDataProperty(partition.getId(), new DataProperty(TStorageMedium.HDD));
|
||||
for (MaterializedIndex idx : partition.getMaterializedIndices(extState)) {
|
||||
idx.setState(IndexState.NORMAL);
|
||||
for (Tablet tablet : idx.getTablets()) {
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
replica.setState(ReplicaState.NORMAL);
|
||||
}
|
||||
}
|
||||
for (MaterializedIndex idx : partition.getMaterializedIndices(extState)) {
|
||||
idx.setState(IndexState.NORMAL);
|
||||
for (Tablet tablet : idx.getTablets()) {
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
replica.setState(ReplicaState.NORMAL);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1315,10 +1318,10 @@ public class OlapTable extends Table {
|
||||
// reserve all
|
||||
return copied;
|
||||
}
|
||||
|
||||
|
||||
Set<String> partNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
|
||||
partNames.addAll(copied.getPartitionNames());
|
||||
|
||||
|
||||
for (String partName : partNames) {
|
||||
if (!reservedPartitions.contains(partName)) {
|
||||
copied.dropPartitionAndReserveTablet(partName);
|
||||
|
||||
@ -241,7 +241,7 @@ public class FileSystemManager {
|
||||
return null;
|
||||
}
|
||||
if (fileSystem.getDFSFileSystem() == null) {
|
||||
logger.info("could not find file system for path " + path + " create a new one");
|
||||
logger.info("create file system for new path: " + path);
|
||||
UserGroupInformation ugi = null;
|
||||
|
||||
// create a new filesystem
|
||||
@ -402,7 +402,7 @@ public class FileSystemManager {
|
||||
return null;
|
||||
}
|
||||
if (fileSystem.getDFSFileSystem() == null) {
|
||||
logger.info("could not find file system for path " + path + " create a new one");
|
||||
logger.info("create file system for new path " + path);
|
||||
// create a new filesystem
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(FS_S3A_ACCESS_KEY, accessKey);
|
||||
|
||||
Reference in New Issue
Block a user