[fix](clone) Fix clone and alter tablet use same tablet path #34889 (#36858)

cherry pick from #34889
This commit is contained in:
deardeng
2024-06-30 20:40:54 +08:00
committed by GitHub
parent 07278e9dcb
commit 92cbbd2b75
8 changed files with 253 additions and 38 deletions

View File

@ -663,7 +663,7 @@ Status DataDir::load() {
}
// gc unused local tablet dir
void DataDir::_perform_tablet_gc(const std::string& tablet_schema_hash_path) {
void DataDir::_perform_tablet_gc(const std::string& tablet_schema_hash_path, int16_t shard_id) {
if (_stop_bg_worker) {
return;
}
@ -681,12 +681,11 @@ void DataDir::_perform_tablet_gc(const std::string& tablet_schema_hash_path) {
if (!tablet || tablet->data_dir() != this) {
if (tablet) {
LOG(INFO) << "The tablet in path " << tablet_schema_hash_path
<< " is not same with the running one: " << tablet->data_dir()->_path << "/"
<< tablet->tablet_path()
<< " is not same with the running one: " << tablet->tablet_path()
<< ", might be the old tablet after migration, try to move it to trash";
}
StorageEngine::instance()->tablet_manager()->try_delete_unused_tablet_path(
this, tablet_id, schema_hash, tablet_schema_hash_path);
this, tablet_id, schema_hash, tablet_schema_hash_path, shard_id);
return;
}
@ -855,7 +854,14 @@ void DataDir::perform_path_gc() {
std::this_thread::sleep_for(
std::chrono::milliseconds(config::path_gc_check_step_interval_ms));
}
_perform_tablet_gc(tablet_id_path + '/' + schema_hash.file_name);
int16_t shard_id = -1;
try {
shard_id = std::stoi(shard.file_name);
} catch (const std::exception&) {
LOG(WARNING) << "failed to stoi shard_id, shard name=" << shard.file_name;
continue;
}
_perform_tablet_gc(tablet_id_path + '/' + schema_hash.file_name, shard_id);
}
}
}
@ -957,8 +963,16 @@ Status DataDir::move_to_trash(const std::string& tablet_path) {
}
// 5. check parent dir of source file, delete it when empty
RETURN_IF_ERROR(delete_tablet_parent_path_if_empty(tablet_path));
return Status::OK();
}
// Removes the parent directory of `tablet_path` (the tablet_id level directory) when that
// directory has no entries left.
// @param tablet_path path of a tablet (schema_hash level) directory; only its parent is touched.
// @return Status::OK() when nothing had to be deleted or the delete succeeded; otherwise the
//         status produced by list() / delete_directory(), passed on via RETURN_IF_ERROR.
Status DataDir::delete_tablet_parent_path_if_empty(const std::string& tablet_path) {
auto fs_tablet_path = io::Path(tablet_path);
std::string source_parent_dir = fs_tablet_path.parent_path(); // tablet_id level
std::vector<io::FileInfo> sub_files;
// NOTE(review): `exists` is handed to list() below but never read afterwards, so a missing
// parent directory is not told apart from an empty one here.
bool exists = true;
RETURN_IF_ERROR(
io::global_local_filesystem()->list(source_parent_dir, false, &sub_files, &exists));
if (sub_files.empty()) {
@ -966,7 +980,6 @@ Status DataDir::move_to_trash(const std::string& tablet_path) {
// The tablet_id directory is empty, so delete it.
// NOTE(review): this comment used to say the return status need not be examined, but the
// status IS propagated to the caller through RETURN_IF_ERROR - confirm which is intended.
RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(source_parent_dir));
}
return Status::OK();
}

View File

@ -145,6 +145,8 @@ public:
// Move tablet to trash.
Status move_to_trash(const std::string& tablet_path);
static Status delete_tablet_parent_path_if_empty(const std::string& tablet_path);
private:
Status _init_cluster_id();
Status _init_capacity_and_create_shards();
@ -161,7 +163,7 @@ private:
int _path_gc_step {0};
void _perform_tablet_gc(const std::string& tablet_schema_hash_path);
void _perform_tablet_gc(const std::string& tablet_schema_hash_path, int16_t shard_name);
void _perform_rowset_gc(const std::string& tablet_schema_hash_path);

View File

@ -1034,6 +1034,8 @@ Status StorageEngine::_do_sweep(const std::string& scan_root, const time_t& loca
string path_name = sorted_path.string();
if (difftime(local_now, mktime(&local_tm_create)) >= actual_expire) {
res = io::global_local_filesystem()->delete_directory(path_name);
LOG(INFO) << "do sweep delete directory " << path_name << " local_now " << local_now
<< "actual_expire " << actual_expire << " res " << res;
if (!res.ok()) {
continue;
}

View File

@ -59,6 +59,7 @@
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/histogram.h"
#include "util/metrics.h"
@ -522,9 +523,6 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id,
bool is_drop_table_or_partition) {
auto& shard = _get_tablets_shard(tablet_id);
std::lock_guard wrlock(shard.lock);
if (shard.tablets_under_clone.count(tablet_id) > 0) {
return Status::Aborted("tablet {} is under clone, skip drop task", tablet_id);
}
return _drop_tablet_unlocked(tablet_id, replica_id, false, is_drop_table_or_partition);
}
@ -535,6 +533,9 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl
<< ", is_drop_table_or_partition=" << is_drop_table_or_partition;
DorisMetrics::instance()->drop_tablet_requests_total->increment(1);
RETURN_IF_ERROR(register_transition_tablet(tablet_id, "drop tablet"));
Defer defer {[&]() { unregister_transition_tablet(tablet_id, "drop tablet"); }};
// Fetch tablet which need to be dropped
TabletSharedPtr to_drop_tablet = _get_tablet_unlocked(tablet_id);
if (to_drop_tablet == nullptr) {
@ -542,12 +543,14 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl
<< "tablet_id=" << tablet_id;
return Status::OK();
}
// We should compare replica id to avoid dropping new cloned tablet.
// Iff request replica id is 0, FE may be an older release, then we drop this tablet as before.
if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) {
return Status::Aborted("replica_id not match({} vs {})", to_drop_tablet->replica_id(),
replica_id);
}
_remove_tablet_from_partition(to_drop_tablet);
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
tablet_map.erase(tablet_id);
@ -1045,6 +1048,7 @@ void TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>*
}
Status TabletManager::start_trash_sweep() {
DBUG_EXECUTE_IF("TabletManager.start_trash_sweep.sleep", DBUG_BLOCK);
std::unique_lock<std::mutex> lock(_gc_tablets_lock, std::defer_lock);
if (!lock.try_lock()) {
return Status::OK();
@ -1117,6 +1121,33 @@ Status TabletManager::start_trash_sweep() {
}
bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) {
RETURN_IF_ERROR(register_transition_tablet(tablet->tablet_id(), "move to trash"));
Defer defer {[&]() { unregister_transition_tablet(tablet->tablet_id(), "move to trash"); }};
TabletSharedPtr tablet_in_not_shutdown = get_tablet(tablet->tablet_id());
if (tablet_in_not_shutdown) {
TSchemaHash schema_hash_not_shutdown = tablet_in_not_shutdown->schema_hash();
size_t path_hash_not_shutdown = tablet_in_not_shutdown->data_dir()->path_hash();
if (tablet->schema_hash() == schema_hash_not_shutdown &&
tablet->data_dir()->path_hash() == path_hash_not_shutdown) {
tablet->clear_cache();
// the live (in-memory) tablet and this shutdown tablet use different paths, i.e. their shard_id differs
if (tablet_in_not_shutdown->tablet_path() != tablet->tablet_path()) {
LOG(INFO) << "tablet path not eq shutdown tablet path, move it to trash, tablet_id="
<< tablet_in_not_shutdown->tablet_id()
<< " mem manager tablet path=" << tablet_in_not_shutdown->tablet_path()
<< " shutdown tablet path=" << tablet->tablet_path();
return tablet->data_dir()->move_to_trash(tablet->tablet_path());
} else {
LOG(INFO) << "tablet path eq shutdown tablet path, not move to trash, tablet_id="
<< tablet_in_not_shutdown->tablet_id()
<< " mem manager tablet path=" << tablet_in_not_shutdown->tablet_path()
<< " shutdown tablet path=" << tablet->tablet_path();
return true;
}
}
}
TabletMetaSharedPtr tablet_meta(new TabletMeta());
int64_t get_meta_ts = MonotonicMicros();
Status check_st = TabletMetaManager::get_meta(tablet->data_dir(), tablet->tablet_id(),
@ -1184,6 +1215,15 @@ bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) {
return false;
}
if (exists) {
if (check_st.is<META_KEY_NOT_FOUND>()) {
LOG(INFO) << "could not find tablet meta in rocksdb, so just delete it path "
<< "tablet_id=" << tablet->tablet_id()
<< ", schema_hash=" << tablet->schema_hash()
<< ", delete tablet_path=" << tablet_path;
RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_path));
RETURN_IF_ERROR(DataDir::delete_tablet_parent_path_if_empty(tablet_path));
return true;
}
LOG(WARNING) << "errors while load meta from store, skip this tablet. "
<< "tablet_id=" << tablet->tablet_id()
<< ", schema_hash=" << tablet->schema_hash();
@ -1198,21 +1238,68 @@ bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) {
}
}
bool TabletManager::register_clone_tablet(int64_t tablet_id) {
Status TabletManager::register_transition_tablet(int64_t tablet_id, std::string reason) {
tablets_shard& shard = _get_tablets_shard(tablet_id);
std::lock_guard<std::shared_mutex> wrlock(shard.lock);
return shard.tablets_under_clone.insert(tablet_id).second;
std::thread::id thread_id = std::this_thread::get_id();
std::lock_guard<std::mutex> lk(shard.lock_for_transition);
if (auto search = shard.tablets_under_transition.find(tablet_id);
search == shard.tablets_under_transition.end()) {
// not found
shard.tablets_under_transition[tablet_id] = std::make_tuple(reason, thread_id, 1);
LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason
<< " lock times=1 thread_id_in_map=" << thread_id;
return Status::OK();
} else {
// found
auto& [r, thread_id_in_map, lock_times] = search->second;
if (thread_id != thread_id_in_map) {
// other thread, failed
LOG(INFO) << "tablet_id = " << tablet_id << " is doing " << r
<< " thread_id_in_map=" << thread_id_in_map << " , add reason=" << reason
<< " thread_id=" << thread_id;
return Status::InternalError<false>("{} failed try later, tablet_id={}", reason,
tablet_id);
}
// add lock times
++lock_times;
LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason
<< " lock times=" << lock_times << " thread_id_in_map=" << thread_id_in_map;
return Status::OK();
}
}
void TabletManager::unregister_clone_tablet(int64_t tablet_id) {
void TabletManager::unregister_transition_tablet(int64_t tablet_id, std::string reason) {
tablets_shard& shard = _get_tablets_shard(tablet_id);
std::lock_guard<std::shared_mutex> wrlock(shard.lock);
shard.tablets_under_clone.erase(tablet_id);
std::thread::id thread_id = std::this_thread::get_id();
std::lock_guard<std::mutex> lk(shard.lock_for_transition);
if (auto search = shard.tablets_under_transition.find(tablet_id);
search == shard.tablets_under_transition.end()) {
// impossible, bug
DCHECK(false) << "tablet " << tablet_id
<< " must be found, before unreg must have been reg";
} else {
auto& [r, thread_id_in_map, lock_times] = search->second;
if (thread_id_in_map != thread_id) {
// impossible, bug
DCHECK(false) << "tablet " << tablet_id << " unreg thread must same reg thread";
}
// sub lock times
--lock_times;
if (lock_times != 0) {
LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason
<< " left=" << lock_times << " thread_id_in_map=" << thread_id_in_map;
} else {
LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason
<< " thread_id_in_map=" << thread_id_in_map;
shard.tablets_under_transition.erase(tablet_id);
}
}
}
void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id,
SchemaHash schema_hash,
const string& schema_hash_path) {
const string& schema_hash_path,
int16_t shard_id) {
// acquire the read lock, so that there is no creating tablet or load tablet from meta tasks
// create tablet and load tablet task should check whether the dir exists
tablets_shard& shard = _get_tablets_shard(tablet_id);
@ -1221,13 +1308,21 @@ void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId t
// check if meta already exists
TabletMetaSharedPtr tablet_meta(new TabletMeta());
Status check_st = TabletMetaManager::get_meta(data_dir, tablet_id, schema_hash, tablet_meta);
if (check_st.ok()) {
LOG(INFO) << "tablet meta exists in meta store, skip delete the path " << schema_hash_path;
if (check_st.ok() && tablet_meta->shard_id() == shard_id) {
return;
}
if (shard.tablets_under_clone.count(tablet_id) > 0) {
LOG(INFO) << "tablet is under clone, skip delete the path " << schema_hash_path;
LOG(INFO) << "tablet meta not exists, try delete tablet path " << schema_hash_path;
bool succ = register_transition_tablet(tablet_id, "path gc");
if (!succ) {
return;
}
Defer defer {[&]() { unregister_transition_tablet(tablet_id, "path gc"); }};
TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id);
if (tablet != nullptr && tablet->tablet_path() == schema_hash_path) {
LOG(INFO) << "tablet , skip delete the path " << schema_hash_path;
return;
}

View File

@ -137,7 +137,8 @@ public:
Status start_trash_sweep();
void try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id,
SchemaHash schema_hash, const std::string& schema_hash_path);
SchemaHash schema_hash, const std::string& schema_hash_path,
int16_t shard_id);
void update_root_path_info(std::map<std::string, DataDirInfo>* path_map,
size_t* tablet_counter);
@ -153,8 +154,8 @@ public:
void obtain_specific_quantity_tablets(std::vector<TabletInfo>& tablets_info, int64_t num);
// returns Status::OK() if the tablet is registered; returns an error if another thread has already registered it
bool register_clone_tablet(int64_t tablet_id);
void unregister_clone_tablet(int64_t tablet_id);
Status register_transition_tablet(int64_t tablet_id, std::string reason);
void unregister_transition_tablet(int64_t tablet_id, std::string reason);
void get_tablets_distribution_on_different_disks(
std::map<int64_t, std::map<DataDir*, int64_t>>& tablets_num_on_disk,
@ -225,12 +226,15 @@ private:
tablets_shard() = default;
tablets_shard(tablets_shard&& shard) {
tablet_map = std::move(shard.tablet_map);
tablets_under_clone = std::move(shard.tablets_under_clone);
tablets_under_transition = std::move(shard.tablets_under_transition);
}
// protects tablet_map (tablets_under_transition is guarded by lock_for_transition instead)
mutable std::shared_mutex lock;
tablet_map_t tablet_map;
std::set<int64_t> tablets_under_clone;
std::mutex lock_for_transition;
// tablets undergoing drop, clone, path gc, move to trash or disk migrate are recorded in tablets_under_transition
// tablet_id -> <reason, thread_id, lock_times>
std::map<int64_t, std::tuple<std::string, std::thread::id, int64_t>>
tablets_under_transition;
};
struct Partition {

View File

@ -155,12 +155,7 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&
}
Status EngineCloneTask::execute() {
// register the tablet to avoid it is deleted by gc thread during clone process
if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id)) {
return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
}
Status st = _do_clone();
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
StorageEngine::instance()->tablet_manager()->update_partitions_visible_version(
{{_clone_req.partition_id, _clone_req.version}});
return st;
@ -174,6 +169,13 @@ Status EngineCloneTask::_do_clone() {
Status status = Status::OK();
string src_file_path;
TBackend src_host;
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->register_transition_tablet(
_clone_req.tablet_id, "clone"));
Defer defer {[&]() {
StorageEngine::instance()->tablet_manager()->unregister_transition_tablet(
_clone_req.tablet_id, "clone");
}};
// Check local tablet exist or not
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id);
@ -184,14 +186,8 @@ Status EngineCloneTask::_do_clone() {
if (tablet && tablet->tablet_state() == TABLET_NOTREADY) {
LOG(WARNING) << "tablet state is not ready when clone, need to drop old tablet, tablet_id="
<< tablet->tablet_id();
// can not drop tablet when under clone. so unregister clone tablet firstly.
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet(
tablet->tablet_id(), tablet->replica_id(), false));
if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet(
_clone_req.tablet_id)) {
return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
}
tablet.reset();
}
bool is_new_tablet = tablet == nullptr;
@ -274,8 +270,21 @@ Status EngineCloneTask::_do_clone() {
<< ". signature: " << _signature;
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_dir),
"failed to delete useless clone dir ");
WARN_IF_ERROR(DataDir::delete_tablet_parent_path_if_empty(tablet_dir),
"failed to delete parent dir");
}};
bool exists = true;
Status exists_st = io::global_local_filesystem()->exists(tablet_dir, &exists);
if (!exists_st) {
LOG(WARNING) << "cant get path=" << tablet_dir << " state, st=" << exists_st;
return exists_st;
}
if (exists) {
LOG(WARNING) << "before clone dest path=" << tablet_dir << " exist, remote it first";
RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_dir));
}
bool allow_incremental_clone = false;
RETURN_IF_ERROR_(status,
_make_and_download_snapshots(*store, tablet_dir, &src_host, &src_file_path,

View File

@ -199,6 +199,13 @@ Status EngineStorageMigrationTask::_migrate() {
LOG(INFO) << "begin to process tablet migrate. "
<< "tablet_id=" << tablet_id << ", dest_store=" << _dest_store->path();
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->register_transition_tablet(
_tablet->tablet_id(), "disk migrate"));
Defer defer {[&]() {
StorageEngine::instance()->tablet_manager()->unregister_transition_tablet(
_tablet->tablet_id(), "disk migrate");
}};
DorisMetrics::instance()->storage_migrate_requests_total->increment(1);
int32_t start_version = 0;
int32_t end_version = 0;
@ -313,6 +320,7 @@ Status EngineStorageMigrationTask::_migrate() {
if (!res.ok()) {
// remove the dir directly to avoid filling the disk with junk data; it is safe to remove
RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(full_path));
RETURN_IF_ERROR(DataDir::delete_tablet_parent_path_if_empty(full_path));
}
return res;
}

View File

@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.apache.doris.regression.suite.ClusterOptions
import org.junit.Assert
// Regression test: lowers a table's replication_num from 3 to 2 and raises it back to 3 while
// the BE debug point "TabletManager.start_trash_sweep.sleep" is enabled, then checks that all
// tablet replicas converge to the same Version and State.
suite('test_drop_clone_tablet_path_race') {
// Cluster options: debug points enabled, FE config overrides, three backends.
def options = new ClusterOptions()
options.enableDebugPoints()
// NOTE(review): presumably these make the tablet checker/scheduler react quickly and keep
// disk watermarks from blocking scheduling - confirm against the FE config docs.
options.feConfigs += [
'tablet_checker_interval_ms=100',
'schedule_slot_num_per_hdd_path=1000',
'storage_high_watermark_usage_percent=99',
'storage_flood_stage_usage_percent=99',
]
options.beNum = 3
docker(options) {
def table = "t1"
// Polls SHOW TABLETS (up to 120 tries, 1 second apart) until it returns exactly `size` rows
// and every row has the same Version and State as the first row; fails the test otherwise.
def checkFunc = {size ->
boolean succ = false
for (int i = 0; i < 120; i++) {
def result = sql_return_maparray """SHOW TABLETS FROM ${table}"""
if (result.size() == size) {
// Use the first row as the reference that all other rows must match.
def version = result[0].Version
def state = result[0].State
succ = result.every { it.Version.equals(version) && it.State.equals(state) }
if (succ) {
break
}
}
sleep(1000)
}
Assert.assertTrue(succ)
}
sql """DROP TABLE IF EXISTS ${table}"""
// 10 buckets with replication_num 3, so SHOW TABLETS is expected to list 30 rows at full
// replication (see checkFunc(30) below).
sql """
CREATE TABLE `${table}` (
`id` int(11) NULL,
`name` varchar(255) NULL,
`score` int(11) SUM NULL
) ENGINE=OLAP
AGGREGATE KEY(`id`, `name`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
'replication_num' = '3'
);
"""
try {
// Enable the trash-sweep debug point on every BE for the duration of the test.
// Original note here was "10h" - presumably how long the debug point blocks; TODO confirm.
GetDebugPoint().enableDebugPointForAllBEs("TabletManager.start_trash_sweep.sleep")
// Load 100 single-row inserts so the tablets have data and advancing versions.
for(int i= 0; i < 100; ++i) {
sql """INSERT INTO ${table} values (${i}, "${i}str", ${i} * 100)"""
}
// Drop one replica per tablet: 10 buckets * 2 replicas = 20 rows expected.
sql """ALTER TABLE ${table} MODIFY PARTITION(${table}) SET ("replication_num" = "2")"""
checkFunc(20)
// Add the replica back: 10 buckets * 3 replicas = 30 rows expected.
sql """ALTER TABLE ${table} MODIFY PARTITION(${table}) SET ("replication_num" = "3")"""
checkFunc(30)
} finally {
// Always turn the debug point off again, even if a check above failed.
GetDebugPoint().disableDebugPointForAllBEs("TabletManager.start_trash_sweep.sleep")
}
}
}