[Enhancement] Garbage collection of unused data on remote storage backend (#10731)

* [Feature](cold_on_s3) support unused remote rowset gc

* return aborted when skip drop tablet

* perform unused remote rowset gc
This commit is contained in:
plat1ko
2022-07-29 14:38:39 +08:00
committed by GitHub
parent eb4721cd80
commit a6537a90cd
37 changed files with 446 additions and 215 deletions

View File

@ -432,7 +432,8 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
drop_tablet_req.tablet_id, false, &err);
if (dropped_tablet != nullptr) {
Status drop_status = StorageEngine::instance()->tablet_manager()->drop_tablet(
drop_tablet_req.tablet_id, drop_tablet_req.replica_id);
drop_tablet_req.tablet_id, drop_tablet_req.replica_id,
drop_tablet_req.is_drop_table_or_partition);
if (!drop_status.ok()) {
LOG(WARNING) << "drop table failed! signature: " << agent_task_req.signature;
error_msgs.push_back("drop table failed!");
@ -442,11 +443,6 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
StorageEngine::instance()->txn_manager()->force_rollback_tablet_related_txns(
dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id,
drop_tablet_req.schema_hash, dropped_tablet->tablet_uid());
// We remove remote rowset directly.
// TODO(cyx): do remove in background
if (drop_tablet_req.is_drop_table_or_partition) {
dropped_tablet->remove_all_remote_rowsets();
}
}
} else {
status_code = TStatusCode::NOT_FOUND;
@ -881,8 +877,7 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
} else {
LOG(INFO) << "set tablet cooldown resource "
<< tablet_meta_info.storage_policy;
tablet->tablet_meta()->set_cooldown_resource(
tablet_meta_info.storage_policy);
tablet->tablet_meta()->set_storage_policy(tablet_meta_info.storage_policy);
}
break;
}

View File

@ -789,6 +789,17 @@ CONF_Int32(s3_transfer_executor_pool_size, "2");
CONF_Bool(enable_time_lut, "true");
#ifdef BE_TEST
// test s3
CONF_String(test_s3_resource, "resource");
CONF_String(test_s3_ak, "ak");
CONF_String(test_s3_sk, "sk");
CONF_String(test_s3_endpoint, "endpoint");
CONF_String(test_s3_region, "region");
CONF_String(test_s3_bucket, "bucket");
CONF_String(test_s3_prefix, "prefix");
#endif
} // namespace config
} // namespace doris

View File

@ -283,10 +283,11 @@ public:
template <typename... Args>
static Status ErrorFmt(TStatusCode::type code, const std::string& fmt, Args&&... args) {
// In some cases, fmt contains '{}' but there are no args.
if (sizeof...(args) == 0) {
if constexpr (sizeof...(args) == 0) {
return Status(code, fmt);
} else {
return Status(code, fmt::format(fmt, std::forward<Args>(args)...));
}
return Status(code, fmt::format(fmt, std::forward<Args>(args)...));
}
template <typename... Args>

View File

@ -50,9 +50,8 @@ Status LocalFileReader::close() {
Status LocalFileReader::read_at(size_t offset, Slice result, size_t* bytes_read) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::IOError(
fmt::format("offset exceeds file size(offset: {), file size: {}, path: {})", offset,
_file_size, _path.native()));
return Status::IOError("offset exceeds file size(offset: {), file size: {}, path: {})",
offset, _file_size, _path.native());
}
size_t bytes_req = result.size;
char* to = result.data;
@ -62,12 +61,10 @@ Status LocalFileReader::read_at(size_t offset, Slice result, size_t* bytes_read)
while (bytes_req != 0) {
auto res = ::pread(_fd, to, bytes_req, offset);
if (UNLIKELY(-1 == res && errno != EINTR)) {
return Status::IOError(
fmt::format("cannot read from {}: {}", _path.native(), std::strerror(errno)));
return Status::IOError("cannot read from {}: {}", _path.native(), std::strerror(errno));
}
if (UNLIKELY(res == 0)) {
return Status::IOError(
fmt::format("cannot read from {}: unexpected EOF", _path.native()));
return Status::IOError("cannot read from {}: unexpected EOF", _path.native());
}
if (res > 0) {
to += res;

View File

@ -40,8 +40,7 @@ Status LocalFileSystem::create_file(const Path& path, FileWriterPtr* writer) {
auto fs_path = absolute_path(path);
int fd = ::open(fs_path.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
if (-1 == fd) {
return Status::IOError(
fmt::format("cannot open {}: {}", fs_path.native(), std::strerror(errno)));
return Status::IOError("cannot open {}: {}", fs_path.native(), std::strerror(errno));
}
*writer = std::make_unique<LocalFileWriter>(std::move(fs_path), fd);
return Status::OK();
@ -62,14 +61,16 @@ Status LocalFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
Status LocalFileSystem::delete_file(const Path& path) {
auto fs_path = absolute_path(path);
if (!std::filesystem::exists(fs_path)) {
return Status::OK();
}
if (!std::filesystem::is_regular_file(fs_path)) {
return Status::IOError(fmt::format("{} is not a file", fs_path.native()));
return Status::IOError("{} is not a file", fs_path.native());
}
std::error_code ec;
std::filesystem::remove(fs_path, ec);
if (ec) {
return Status::IOError(
fmt::format("cannot delete {}: {}", fs_path.native(), std::strerror(ec.value())));
return Status::IOError("cannot delete {}: {}", fs_path.native(), std::strerror(ec.value()));
}
return Status::OK();
}
@ -77,35 +78,36 @@ Status LocalFileSystem::delete_file(const Path& path) {
Status LocalFileSystem::create_directory(const Path& path) {
auto fs_path = absolute_path(path);
if (std::filesystem::exists(fs_path)) {
return Status::IOError(fmt::format("{} exists", fs_path.native()));
return Status::IOError("{} exists", fs_path.native());
}
std::error_code ec;
std::filesystem::create_directories(fs_path, ec);
if (ec) {
return Status::IOError(
fmt::format("cannot create {}: {}", fs_path.native(), std::strerror(ec.value())));
return Status::IOError("cannot create {}: {}", fs_path.native(), std::strerror(ec.value()));
}
return Status::OK();
}
Status LocalFileSystem::delete_directory(const Path& path) {
auto fs_path = absolute_path(path);
if (!std::filesystem::exists(fs_path)) {
return Status::OK();
}
if (!std::filesystem::is_directory(fs_path)) {
return Status::IOError(fmt::format("{} is not a directory", fs_path.native()));
return Status::IOError("{} is not a directory", fs_path.native());
}
std::error_code ec;
std::filesystem::remove_all(fs_path, ec);
if (ec) {
return Status::IOError(
fmt::format("cannot delete {}: {}", fs_path.native(), std::strerror(ec.value())));
return Status::IOError("cannot delete {}: {}", fs_path.native(), std::strerror(ec.value()));
}
return Status::OK();
}
Status LocalFileSystem::link_file(const Path& src, const Path& dest) {
if (::link(src.c_str(), dest.c_str()) != 0) {
return Status::IOError(fmt::format("fail to create hard link: {}. from {} to {}",
std::strerror(errno), src.native(), dest.native()));
return Status::IOError("fail to create hard link: {}. from {} to {}", std::strerror(errno),
src.native(), dest.native());
}
return Status::OK();
}
@ -121,8 +123,8 @@ Status LocalFileSystem::file_size(const Path& path, size_t* file_size) const {
std::error_code ec;
*file_size = std::filesystem::file_size(fs_path, ec);
if (ec) {
return Status::IOError(fmt::format("cannot get file size {}: {}", fs_path.native(),
std::strerror(ec.value())));
return Status::IOError("cannot get file size {}: {}", fs_path.native(),
std::strerror(ec.value()));
}
return Status::OK();
}
@ -135,8 +137,7 @@ Status LocalFileSystem::list(const Path& path, std::vector<Path>* files) {
files->push_back(entry.path().filename());
}
if (ec) {
return Status::IOError(
fmt::format("cannot list {}: {}", fs_path.native(), std::strerror(ec.value())));
return Status::IOError("cannot list {}: {}", fs_path.native(), std::strerror(ec.value()));
}
return Status::OK();
}

View File

@ -37,12 +37,10 @@ Status sync_dir(const io::Path& dirname) {
int fd;
RETRY_ON_EINTR(fd, ::open(dirname.c_str(), O_DIRECTORY | O_RDONLY));
if (-1 == fd) {
return Status::IOError(
fmt::format("cannot open {}: {}", dirname.native(), std::strerror(errno)));
return Status::IOError("cannot open {}: {}", dirname.native(), std::strerror(errno));
}
if (0 != ::fdatasync(fd)) {
return Status::IOError(
fmt::format("cannot fdatasync {}: {}", dirname.native(), std::strerror(errno)));
return Status::IOError("cannot fdatasync {}: {}", dirname.native(), std::strerror(errno));
}
::close(fd);
return Status::OK();
@ -102,8 +100,7 @@ Status LocalFileWriter::appendv(const Slice* data, size_t data_cnt) {
ssize_t res;
RETRY_ON_EINTR(res, ::writev(_fd, iov + completed_iov, iov_count));
if (UNLIKELY(res < 0)) {
return Status::IOError(
fmt::format("cannot write to {}: {}", _path.native(), std::strerror(errno)));
return Status::IOError("cannot write to {}: {}", _path.native(), std::strerror(errno));
}
if (LIKELY(res == n_left)) {
@ -139,8 +136,7 @@ Status LocalFileWriter::finalize() {
#if defined(__linux__)
int flags = SYNC_FILE_RANGE_WRITE;
if (sync_file_range(_fd, 0, 0, flags) < 0) {
return Status::IOError(
fmt::format("cannot sync {}: {}", _path.native(), std::strerror(errno)));
return Status::IOError("cannot sync {}: {}", _path.native(), std::strerror(errno));
}
#endif
}
@ -153,8 +149,7 @@ Status LocalFileWriter::_close(bool sync) {
}
if (sync && _dirty) {
if (0 != ::fdatasync(_fd)) {
return Status::IOError(
fmt::format("cannot fdatasync {}: {}", _path.native(), std::strerror(errno)));
return Status::IOError("cannot fdatasync {}: {}", _path.native(), std::strerror(errno));
}
RETURN_IF_ERROR(detail::sync_dir(_path.parent_path()));
_dirty = false;
@ -166,8 +161,7 @@ Status LocalFileWriter::_close(bool sync) {
DorisMetrics::instance()->local_bytes_written_total->increment(_bytes_appended);
if (0 != ::close(_fd)) {
return Status::IOError(
fmt::format("cannot close {}: {}", _path.native(), std::strerror(errno)));
return Status::IOError("cannot close {}: {}", _path.native(), std::strerror(errno));
}
return Status::OK();
}
@ -182,8 +176,7 @@ Status LocalFileWriter::write_at(size_t offset, const Slice& data) {
while (bytes_req != 0) {
auto res = ::pwrite(_fd, from, bytes_req, offset);
if (-1 == res && errno != EINTR) {
return Status::IOError(
fmt::format("cannot write to {}: {}", _path.native(), std::strerror(errno)));
return Status::IOError("cannot write to {}: {}", _path.native(), std::strerror(errno));
}
if (res > 0) {
from += res;

View File

@ -52,9 +52,8 @@ Status S3FileReader::close() {
Status S3FileReader::read_at(size_t offset, Slice result, size_t* bytes_read) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::IOError(
fmt::format("offset exceeds file size(offset: {), file size: {}, path: {})", offset,
_file_size, _path.native()));
return Status::IOError("offset exceeds file size(offset: {), file size: {}, path: {})",
offset, _file_size, _path.native());
}
size_t bytes_req = result.size;
char* to = result.data;
@ -75,13 +74,13 @@ Status S3FileReader::read_at(size_t offset, Slice result, size_t* bytes_read) {
}
auto outcome = client->GetObject(request);
if (!outcome.IsSuccess()) {
return Status::IOError(fmt::format("failed to read from {}: {}", _path.native(),
outcome.GetError().GetMessage()));
return Status::IOError("failed to read from {}: {}", _path.native(),
outcome.GetError().GetMessage());
}
*bytes_read = outcome.GetResult().GetContentLength();
if (*bytes_read != bytes_req) {
return Status::IOError(fmt::format("failed to read from {}(bytes read: {}, bytes req: {})",
_path.native(), *bytes_read, bytes_req));
return Status::IOError("failed to read from {}(bytes read: {}, bytes req: {})",
_path.native(), *bytes_read, bytes_req);
}
DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
return Status::OK();

View File

@ -20,7 +20,9 @@
#include <aws/core/utils/threading/Executor.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/transfer/TransferManager.h>
@ -153,12 +155,13 @@ Status S3FileSystem::delete_file(const Path& path) {
request.WithBucket(_s3_conf.bucket).WithKey(key);
auto outcome = client->DeleteObject(request);
if (!outcome.IsSuccess()) {
return Status::IOError("failed to delete object(endpoint={}, bucket={}, key={}): {}",
_s3_conf.endpoint, _s3_conf.bucket, key,
outcome.GetError().GetMessage());
if (outcome.IsSuccess() ||
outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
return Status::OK();
}
return Status::OK();
return Status::IOError("failed to delete object(endpoint={}, bucket={}, key={}): {}",
_s3_conf.endpoint, _s3_conf.bucket, key,
outcome.GetError().GetMessage());
}
Status S3FileSystem::create_directory(const Path& path) {
@ -166,7 +169,54 @@ Status S3FileSystem::create_directory(const Path& path) {
}
Status S3FileSystem::delete_directory(const Path& path) {
return Status::NotSupported("not support");
auto client = get_client();
CHECK_S3_CLIENT(client);
Aws::S3::Model::ListObjectsV2Request request;
auto prefix = get_key(path);
request.WithBucket(_s3_conf.bucket).WithPrefix(prefix);
Aws::S3::Model::DeleteObjectsRequest delete_request;
delete_request.SetBucket(_s3_conf.bucket);
bool is_trucated = false;
do {
auto outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess()) {
return Status::IOError("failed to list objects(endpoint={}, bucket={}, prefix={}): {}",
_s3_conf.endpoint, _s3_conf.bucket, prefix,
outcome.GetError().GetMessage());
}
const auto& result = outcome.GetResult();
Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
objects.reserve(result.GetContents().size());
for (const auto& obj : result.GetContents()) {
objects.emplace_back().SetKey(obj.GetKey());
}
if (!objects.empty()) {
Aws::S3::Model::Delete del;
del.WithObjects(std::move(objects)).SetQuiet(true);
delete_request.SetDelete(std::move(del));
auto delete_outcome = client->DeleteObjects(delete_request);
if (!delete_outcome.IsSuccess()) {
return Status::IOError(
"failed to delete objects(endpoint={}, bucket={}, prefix={}): {}",
_s3_conf.endpoint, _s3_conf.bucket, prefix,
delete_outcome.GetError().GetMessage());
}
if (!delete_outcome.GetResult().GetErrors().empty()) {
const auto& e = delete_outcome.GetResult().GetErrors().front();
return Status::IOError("fail to delete object(endpoint={}, bucket={}, key={}): {}",
_s3_conf.endpoint, _s3_conf.bucket, e.GetKey(),
e.GetMessage());
}
VLOG_TRACE << "delete " << objects.size()
<< " s3 objects, endpoint: " << _s3_conf.endpoint
<< ", bucket: " << _s3_conf.bucket << ", prefix: " << _s3_conf.prefix;
}
is_trucated = result.GetIsTruncated();
request.SetContinuationToken(result.GetNextContinuationToken());
} while (is_trucated);
return Status::OK();
}
Status S3FileSystem::link_file(const Path& src, const Path& dest) {
@ -181,7 +231,7 @@ Status S3FileSystem::exists(const Path& path, bool* res) const {
auto key = get_key(path);
request.WithBucket(_s3_conf.bucket).WithKey(key);
auto outcome = _client->HeadObject(request);
auto outcome = client->HeadObject(request);
if (outcome.IsSuccess()) {
*res = true;
} else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
@ -202,7 +252,7 @@ Status S3FileSystem::file_size(const Path& path, size_t* file_size) const {
auto key = get_key(path);
request.WithBucket(_s3_conf.bucket).WithKey(key);
auto outcome = _client->HeadObject(request);
auto outcome = client->HeadObject(request);
if (outcome.IsSuccess()) {
*file_size = outcome.GetResult().GetContentLength();
} else {

View File

@ -46,6 +46,7 @@ public:
Status create_directory(const Path& path) override;
// Delete all objects start with path.
Status delete_directory(const Path& path) override;
Status link_file(const Path& src, const Path& dest) override;

View File

@ -92,6 +92,26 @@ Status BaseCompaction::pick_rowsets_to_compact() {
RETURN_NOT_OK(check_version_continuity(_input_rowsets));
RETURN_NOT_OK(_check_rowset_overlapping(_input_rowsets));
// If there are delete predicate rowsets in tablet, start_version > 0 implies some rowsets before
// delete version cannot apply these delete predicates, which can cause incorrect query result.
// So we must abort this base compaction.
// A typical scenario is that some rowsets before cumulative point are on remote storage.
if (_input_rowsets.front()->start_version() > 0) {
bool has_delete_predicate = false;
for (const auto& rs : _input_rowsets) {
if (rs->rowset_meta()->has_delete_predicate()) {
has_delete_predicate = true;
break;
}
}
if (has_delete_predicate) {
LOG(WARNING)
<< "Some rowsets cannot apply delete predicates in base compaction. tablet_id="
<< _tablet->tablet_id();
return Status::OLAPInternalError(OLAP_ERR_BE_NO_SUITABLE_VERSION);
}
}
if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) {
// the tablet is with rowset: [0-1], [2-y]
// and [0-1] has no data. in this situation, no need to do base compaction.

View File

@ -18,6 +18,7 @@
#pragma once
#include <memory>
#include <string>
#include "olap/olap_define.h"
#include "olap/tablet_meta.h"
@ -58,11 +59,9 @@ public:
int16_t shard_id() const;
bool equal(int64_t tablet_id, int32_t schema_hash);
const io::ResourceId& cooldown_resource() const { return _tablet_meta->cooldown_resource(); }
const std::string& storage_policy() const { return _tablet_meta->storage_policy(); }
void set_cooldown_resource(io::ResourceId resource) {
_tablet_meta->set_cooldown_resource(std::move(resource));
}
void set_storage_policy(const std::string& policy) { _tablet_meta->set_storage_policy(policy); }
// properties encapsulated in TabletSchema
virtual const TabletSchema& tablet_schema() const;

View File

@ -463,10 +463,9 @@ void NumBasedCumulativeCompactionPolicy::calculate_cumulative_point(
void CumulativeCompactionPolicy::pick_candidate_rowsets(
const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map,
int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets) {
for (auto& it : rs_version_map) {
// find all rowset version greater than cumulative_point and skip the create time in skip_window_sec
if (it.first.first >= cumulative_point && it.second->is_local()) {
candidate_rowsets->push_back(it.second);
for (const auto& [version, rs] : rs_version_map) {
if (version.first >= cumulative_point && rs->is_local()) {
candidate_rowsets->push_back(rs);
}
}
std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator);

View File

@ -40,6 +40,7 @@
#include "io/fs/path.h"
#include "olap/file_helper.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta_manager.h"
@ -820,4 +821,65 @@ Status DataDir::move_to_trash(const std::string& tablet_path) {
return Status::OK();
}
// Background GC sweep for unused remote rowsets.
// Each REMOTE_ROWSET_GC_PREFIX entry in the local meta store describes one
// uploaded rowset whose segment files should be removed from the remote
// storage backend (entries are written by Tablet::record_unused_remote_rowset).
// Markers are only erased after every segment of that rowset was deleted
// successfully, so partial failures are retried on the next sweep.
void DataDir::perform_remote_rowset_gc() {
// Snapshot all pending GC markers: key -> serialized RemoteRowsetGcPB value.
std::vector<std::pair<std::string, std::string>> gc_kvs;
auto traverse_remote_rowset_func = [&gc_kvs](const std::string& key,
const std::string& value) -> bool {
gc_kvs.emplace_back(key, value);
// Returning true continues the meta iteration.
return true;
};
_meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_ROWSET_GC_PREFIX, traverse_remote_rowset_func);
std::vector<std::string> deleted_keys;
for (auto& [key, val] : gc_kvs) {
// The rowset id is the key suffix after the GC prefix.
auto rowset_id = key.substr(REMOTE_ROWSET_GC_PREFIX.size());
RemoteRowsetGcPB gc_pb;
// NOTE(review): ParseFromString's return value is ignored — a corrupt
// entry would be processed with default-valued fields; consider checking.
gc_pb.ParseFromString(val);
auto fs = io::FileSystemMap::instance()->get(gc_pb.resource_id());
if (!fs) {
// Resource may have been dropped or not registered yet; keep the
// marker and retry on a later sweep.
LOG(WARNING) << "Cannot get file system: " << gc_pb.resource_id();
continue;
}
// GC markers are only ever written for remote (non-local) file systems.
DCHECK(fs->type() != io::FileSystemType::LOCAL);
// Default-constructed Status — presumably OK, so a rowset with zero
// segments falls through to marker removal. TODO confirm default is OK.
Status st;
for (int i = 0; i < gc_pb.num_segments(); ++i) {
auto seg_path = BetaRowset::remote_segment_path(gc_pb.tablet_id(), rowset_id, i);
st = fs->delete_file(seg_path);
if (!st.ok()) {
// Stop at the first failure; remaining segments (and the
// marker) are retried on the next sweep.
LOG(WARNING) << st.to_string();
break;
}
}
if (st.ok()) {
// Safe to move: `key` is not read again after this point.
deleted_keys.push_back(std::move(key));
}
}
// Erase only the markers whose segments were all deleted successfully.
for (const auto& key : deleted_keys) {
_meta->remove(META_COLUMN_FAMILY_INDEX, key);
}
}
// Background GC sweep for dropped tablets' remote data.
// Each REMOTE_TABLET_GC_PREFIX entry maps a tablet id to the storage resource
// holding its remote directory (written by Tablet::remove_all_remote_rowsets).
// The whole remote directory "DATA_PREFIX/<tablet_id>" is deleted, and the
// marker is erased only on success so failures are retried on the next sweep.
void DataDir::perform_remote_tablet_gc() {
    // Snapshot all pending GC markers: key -> resource id.
    std::vector<std::pair<std::string, std::string>> tablet_gc_kvs;
    auto traverse_remote_tablet_func = [&tablet_gc_kvs](const std::string& key,
                                                        const std::string& value) -> bool {
        tablet_gc_kvs.emplace_back(key, value);
        // Returning true continues the meta iteration.
        return true;
    };
    _meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_TABLET_GC_PREFIX, traverse_remote_tablet_func);
    std::vector<std::string> deleted_keys;
    for (auto& [key, resource] : tablet_gc_kvs) {
        // The tablet id is the key suffix after the GC prefix.
        auto tablet_id = key.substr(REMOTE_TABLET_GC_PREFIX.size());
        auto fs = io::FileSystemMap::instance()->get(resource);
        // BUGFIX: `get()` may return null (e.g. the storage resource was
        // dropped or is not registered yet); dereferencing it would crash the
        // GC thread. Keep the marker and retry on a later sweep — consistent
        // with perform_remote_rowset_gc().
        if (!fs) {
            LOG(WARNING) << "Cannot get file system: " << resource;
            continue;
        }
        auto st = fs->delete_directory(DATA_PREFIX + "/" + tablet_id);
        if (st.ok()) {
            // Remote directory is gone; safe to drop the marker below.
            // `key` is not read again after this move.
            deleted_keys.push_back(std::move(key));
        } else {
            LOG(WARNING) << st;
        }
    }
    // Erase only the markers whose remote directory deletion succeeded.
    for (const auto& key : deleted_keys) {
        _meta->remove(META_COLUMN_FAMILY_INDEX, key);
    }
}
} // namespace doris

View File

@ -116,6 +116,10 @@ public:
void perform_path_gc_by_tablet();
void perform_remote_rowset_gc();
void perform_remote_tablet_gc();
// check if the capacity reach the limit after adding the incoming data
// return true if limit reached, otherwise, return false.
// TODO(cmy): for now we can not precisely calculate the capacity Doris used,

View File

@ -127,6 +127,8 @@ const std::string TABLET_ID_KEY = "tablet_id";
const std::string ENABLE_BYTE_TO_BASE64 = "byte_to_base64";
const std::string TABLET_ID_PREFIX = "t_";
const std::string ROWSET_ID_PREFIX = "s_";
const std::string REMOTE_ROWSET_GC_PREFIX = "gc_";
const std::string REMOTE_TABLET_GC_PREFIX = "tgc_";
#if defined(__GNUC__)
#define OLAP_LIKELY(x) __builtin_expect((x), 1)

View File

@ -46,6 +46,12 @@ std::string BetaRowset::local_segment_path(const std::string& tablet_path,
return fmt::format("{}/{}_{}.dat", tablet_path, rowset_id.to_string(), segment_id);
}
std::string BetaRowset::remote_segment_path(int64_t tablet_id, const std::string& rowset_id,
                                            int segment_id) {
    // Remote layout: data/{tablet_id}/{rowset_id}_{segment_id}.dat
    // Build the per-tablet directory first, then the segment file name.
    auto tablet_dir = fmt::format("{}/{}", DATA_PREFIX, tablet_id);
    return fmt::format("{}/{}_{}.dat", tablet_dir, rowset_id, segment_id);
}
std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id,
int segment_id) {
// data/{tablet_id}/{rowset_id}_{seg_num}.dat

View File

@ -19,6 +19,7 @@
#define DORIS_SRC_OLAP_ROWSET_BETA_ROWSET_H_
#include <cstdint>
#include <string>
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@ -49,6 +50,9 @@ public:
static std::string remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id,
int segment_id);
static std::string remote_segment_path(int64_t tablet_id, const std::string& rowset_id,
int segment_id);
Status split_range(const RowCursor& start_key, const RowCursor& end_key,
uint64_t request_block_row_count, size_t key_num,
std::vector<OlapTuple>* ranges) override;

View File

@ -28,12 +28,14 @@
#include <map>
#include <set>
#include "common/status.h"
#include "env/env.h"
#include "gen_cpp/Types_constants.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta.h"
#include "runtime/thread_context.h"
using std::filesystem::path;
@ -360,6 +362,9 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
/// make the full snapshot of the tablet.
{
std::shared_lock rdlock(ref_tablet->get_header_lock());
if (ref_tablet->tablet_state() == TABLET_SHUTDOWN) {
return Status::Aborted("tablet has shutdown");
}
if (request.__isset.missing_version) {
for (int64_t missed_version : request.missing_version) {
Version version = {missed_version, missed_version};
@ -422,6 +427,13 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
CHECK(res.ok()) << res;
ref_tablet->generate_tablet_meta_copy_unlocked(new_tablet_meta);
}
{
std::unique_lock wlock(ref_tablet->get_header_lock());
if (ref_tablet->tablet_state() == TABLET_SHUTDOWN) {
return Status::Aborted("tablet has shutdown");
}
ref_tablet->update_self_owned_remote_rowsets(consistent_rowsets);
}
std::vector<RowsetMetaSharedPtr> rs_metas;
for (auto& rs : consistent_rowsets) {

View File

@ -329,7 +329,7 @@ std::vector<DataDir*> StorageEngine::get_stores() {
stores.reserve(_store_map.size());
std::lock_guard<std::mutex> l(_store_lock);
if (include_unused) {
if constexpr (include_unused) {
for (auto& it : _store_map) {
stores.push_back(it.second);
}
@ -720,6 +720,12 @@ Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
// clean unused rowset metas in OlapMeta
_clean_unused_rowset_metas();
// clean unused rowsets in remote storage backends
for (auto data_dir : get_stores()) {
data_dir->perform_remote_rowset_gc();
data_dir->perform_remote_tablet_gc();
}
return res;
}

View File

@ -18,6 +18,7 @@
#include "olap/tablet.h"
#include <ctype.h>
#include <fmt/core.h>
#include <glog/logging.h>
#include <pthread.h>
#include <rapidjson/prettywriter.h>
@ -1145,10 +1146,8 @@ void Tablet::pick_candidate_rowsets_to_cumulative_compaction(
void Tablet::pick_candidate_rowsets_to_base_compaction(vector<RowsetSharedPtr>* candidate_rowsets) {
std::shared_lock rdlock(_meta_lock);
// FIXME(cyx): If there are delete predicate rowsets in tablet,
// remote rowsets cannot apply these delete predicate, which can cause
// incorrect query result.
for (auto& it : _rs_version_map) {
// Do compaction on local rowsets only.
if (it.first.first < _cumulative_point && it.second->is_local()) {
candidate_rowsets->push_back(it.second);
}
@ -1702,7 +1701,7 @@ Status Tablet::cooldown() {
LOG(WARNING) << "Failed to own cumu_compaction_lock. tablet=" << tablet_id();
return Status::OLAPInternalError(OLAP_ERR_BE_TRY_BE_LOCK_ERROR);
}
auto dest_fs = io::FileSystemMap::instance()->get(cooldown_resource());
auto dest_fs = io::FileSystemMap::instance()->get(storage_policy());
if (!dest_fs) {
return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
}
@ -1716,8 +1715,13 @@ Status Tablet::cooldown() {
auto start = std::chrono::steady_clock::now();
RETURN_IF_ERROR(old_rowset->upload_to(reinterpret_cast<io::RemoteFileSystem*>(dest_fs.get()),
new_rowset_id));
auto st = old_rowset->upload_to(reinterpret_cast<io::RemoteFileSystem*>(dest_fs.get()),
new_rowset_id);
if (!st.ok()) {
record_unused_remote_rowset(new_rowset_id, dest_fs->resource_id(),
old_rowset->num_segments());
return st;
}
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
LOG(INFO) << "Upload rowset " << old_rowset->version() << " " << new_rowset_id.to_string()
@ -1732,14 +1736,30 @@ Status Tablet::cooldown() {
new_rowset_meta->set_fs(dest_fs);
new_rowset_meta->set_creation_time(time(nullptr));
RowsetSharedPtr new_rowset;
RowsetFactory::create_rowset(&_schema, _tablet_path, std::move(new_rowset_meta), &new_rowset);
RowsetFactory::create_rowset(&_schema, _tablet_path, new_rowset_meta, &new_rowset);
std::vector to_add {std::move(new_rowset)};
std::vector to_delete {std::move(old_rowset)};
std::unique_lock meta_wlock(_meta_lock);
modify_rowsets(to_add, to_delete);
save_meta();
bool has_shutdown = false;
{
std::unique_lock meta_wlock(_meta_lock);
has_shutdown = tablet_state() == TABLET_SHUTDOWN;
if (!has_shutdown) {
modify_rowsets(to_add, to_delete);
if (new_rowset_meta->has_delete_predicate()) {
add_delete_predicate(new_rowset_meta->delete_predicate(),
new_rowset_meta->start_version());
}
_self_owned_remote_rowsets.insert(to_add.front());
save_meta();
}
}
if (has_shutdown) {
record_unused_remote_rowset(new_rowset_id, dest_fs->resource_id(),
to_add.front()->num_segments());
return Status::Aborted("tablet {} has shutdown", tablet_id());
}
return Status::OK();
}
@ -1763,13 +1783,13 @@ RowsetSharedPtr Tablet::pick_cooldown_rowset() {
bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) {
// std::shared_lock meta_rlock(_meta_lock);
if (cooldown_resource().empty()) {
if (storage_policy().empty()) {
VLOG_DEBUG << "tablet does not need cooldown, tablet id: " << tablet_id();
return false;
}
auto policy = ExecEnv::GetInstance()->storage_policy_mgr()->get(cooldown_resource());
auto policy = ExecEnv::GetInstance()->storage_policy_mgr()->get(storage_policy());
if (!policy) {
LOG(WARNING) << "Cannot get storage policy: " << cooldown_resource();
LOG(WARNING) << "Cannot get storage policy: " << storage_policy();
return false;
}
auto cooldown_ttl_sec = policy->cooldown_ttl;
@ -1817,28 +1837,26 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) {
return false;
}
void Tablet::remove_all_remote_rowsets() {
std::unique_lock meta_wlock(_meta_lock);
DCHECK(_state == TabletState::TABLET_SHUTDOWN);
Status st;
for (auto& it : _rs_version_map) {
auto& rs = it.second;
if (!rs->is_local()) {
st = rs->remove();
LOG_IF(WARNING, !st.ok()) << "Failed to remove rowset " << rs->version() << " "
<< rs->rowset_id().to_string() << " in tablet " << tablet_id()
<< ": " << st.to_string();
}
}
for (auto& it : _stale_rs_version_map) {
auto& rs = it.second;
if (!rs->is_local()) {
st = rs->remove();
LOG_IF(WARNING, !st.ok()) << "Failed to remove rowset " << rs->version() << " "
<< rs->rowset_id().to_string() << " in tablet " << tablet_id()
<< ": " << st.to_string();
}
// Persist a GC marker so the remote segments of `rowset_id` are deleted later
// by DataDir::perform_remote_rowset_gc(). Used when an uploaded rowset turns
// out to be unused (failed upload, tablet shut down during cooldown, drop).
// `resource` identifies the remote storage backend; `num_segments` tells the
// GC how many segment files to delete.
void Tablet::record_unused_remote_rowset(const RowsetId& rowset_id, const io::ResourceId& resource,
int64_t num_segments) {
auto gc_key = REMOTE_ROWSET_GC_PREFIX + rowset_id.to_string();
RemoteRowsetGcPB gc_pb;
gc_pb.set_resource_id(resource);
gc_pb.set_tablet_id(tablet_id());
gc_pb.set_num_segments(num_segments);
// Best effort: a failed meta put only logs a warning, so the remote rowset
// could leak in that case — presumably accepted as rare; verify if needed.
WARN_IF_ERROR(
_data_dir->get_meta()->put(META_COLUMN_FAMILY_INDEX, gc_key, gc_pb.SerializeAsString()),
fmt::format("Failed to record unused remote rowset(tablet id: {}, rowset id: {})",
tablet_id(), rowset_id.to_string()));
}
// Schedule all of this tablet's remote data for deferred deletion: instead of
// deleting remote files inline, persist a tablet-level GC marker that the
// background sweep (DataDir::perform_remote_tablet_gc) acts on later.
Status Tablet::remove_all_remote_rowsets() {
    // Only valid once the tablet has been shut down.
    DCHECK(_state == TABLET_SHUTDOWN);
    const auto& policy = storage_policy();
    if (policy.empty()) {
        // No remote storage was ever configured for this tablet.
        return Status::OK();
    }
    auto tablet_gc_key = REMOTE_TABLET_GC_PREFIX + std::to_string(tablet_id());
    return _data_dir->get_meta()->put(META_COLUMN_FAMILY_INDEX, tablet_gc_key, policy);
}
const TabletSchema& Tablet::tablet_schema() const {
@ -1887,4 +1905,28 @@ Status Tablet::lookup_row_key(const Slice& encoded_key, RowLocation* row_locatio
return Status::NotFound("can't find key in all rowsets");
}
// Mark every remote rowset owned exclusively by this BE for deferred deletion
// by writing one GC marker per rowset (see record_unused_remote_rowset).
void Tablet::remove_self_owned_remote_rowsets() {
    // Only valid once the tablet has been shut down.
    DCHECK(_state == TABLET_SHUTDOWN);
    for (auto it = _self_owned_remote_rowsets.begin(); it != _self_owned_remote_rowsets.end();
         ++it) {
        const auto& rowset = *it;
        // Only remote rowsets are ever inserted into this set.
        DCHECK(!rowset->is_local());
        record_unused_remote_rowset(rowset->rowset_id(), rowset->rowset_meta()->resource_id(),
                                    rowset->num_segments());
    }
}
// Erase entries from `_self_owned_remote_rowsets` that also appear in
// `rowsets_in_snapshot`: once a rowset is captured in a snapshot it may be
// shared with another BE, so this BE must no longer consider itself the sole
// owner (and must not GC it on drop).
// REQUIRES: caller holds _meta_lock.
void Tablet::update_self_owned_remote_rowsets(
        const std::vector<RowsetSharedPtr>& rowsets_in_snapshot) {
    // Fast path: nothing owned, nothing to release.
    if (_self_owned_remote_rowsets.empty()) {
        return;
    }
    for (const auto& rs : rowsets_in_snapshot) {
        // Only remote rowsets can be in the owned set; skipping local ones
        // avoids a pointless hash lookup.
        if (!rs->is_local()) {
            // erase-by-key is a no-op when the key is absent, so the previous
            // find-then-erase two-step is unnecessary.
            _self_owned_remote_rowsets.erase(rs);
        }
    }
}
} // namespace doris

View File

@ -22,6 +22,7 @@
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "gen_cpp/AgentService_types.h"
@ -30,6 +31,7 @@
#include "olap/base_tablet.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
@ -298,14 +300,22 @@ public:
bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
// Physically remove remote rowsets.
void remove_all_remote_rowsets();
Status remove_all_remote_rowsets();
// Lookup the row location of `encoded_key`, the function sets `row_location` on success.
// NOTE: the method only works in unique key model with primary key index, you will got a
// not supported error in other data model.
Status lookup_row_key(const Slice& encoded_key, RowLocation* row_location, uint32_t version);
void remove_self_owned_remote_rowsets();
// Erase entries in `_self_owned_remote_rowsets` iff they are in `rowsets_in_snapshot`.
// REQUIRES: held _meta_lock
void update_self_owned_remote_rowsets(const std::vector<RowsetSharedPtr>& rowsets_in_snapshot);
void record_unused_remote_rowset(const RowsetId& rowset_id, const io::ResourceId& resource,
int64_t num_segments);
private:
Status _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions) const;
@ -397,6 +407,9 @@ private:
int64_t _last_missed_version;
int64_t _last_missed_time_s;
// Remote rowsets not shared by other BE. We can delete them when drop tablet.
std::unordered_set<RowsetSharedPtr> _self_owned_remote_rowsets; // guarded by _meta_lock
DISALLOW_COPY_AND_ASSIGN(Tablet);
public:

View File

@ -194,7 +194,7 @@ Status TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id,
// If the new tablet is fresher than the existing one, then replace
// the existing tablet with the new one.
// Use default replica_id to ignore whether replica_id is match when drop tablet.
RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, /* replica_id */ 0, keep_files),
RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, /* replica_id */ 0, keep_files, false),
strings::Substitute("failed to drop old tablet when add new tablet. "
"tablet_id=$0",
tablet_id));
@ -356,7 +356,7 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked(
}
// something is wrong, we need clear environment
if (is_tablet_added) {
Status status = _drop_tablet_unlocked(new_tablet_id, request.replica_id, false);
Status status = _drop_tablet_unlocked(new_tablet_id, request.replica_id, false, false);
if (!status.ok()) {
LOG(WARNING) << "fail to drop tablet when create tablet failed. res=" << res;
}
@ -425,7 +425,8 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked(
return nullptr;
}
Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool keep_files) {
Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id,
bool is_drop_table_or_partition) {
auto& shard = _get_tablets_shard(tablet_id);
std::lock_guard wrlock(shard.lock);
if (shard.tablets_under_clone.count(tablet_id) > 0) {
@ -433,12 +434,12 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bo
return Status::Aborted("aborted");
}
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
return _drop_tablet_unlocked(tablet_id, replica_id, keep_files);
return _drop_tablet_unlocked(tablet_id, replica_id, false, is_drop_table_or_partition);
}
// Drop specified tablet.
Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id,
bool keep_files) {
bool keep_files, bool is_drop_table_or_partition) {
LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id << ", replica_id=" << replica_id;
DorisMetrics::instance()->drop_tablet_requests_total->increment(1);
@ -472,6 +473,13 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl
// and the tablet will be loaded at restart time.
// To avoid this exception, we first set the state of the tablet to `SHUTDOWN`.
to_drop_tablet->set_tablet_state(TABLET_SHUTDOWN);
// We must record unused remote rowsets path info to OlapMeta before tablet state is marked as TABLET_SHUTDOWN in OlapMeta,
// otherwise, if the BE shuts down right after saving the tablet state, the path info of these remote rowsets would be lost.
if (is_drop_table_or_partition) {
RETURN_IF_ERROR(to_drop_tablet->remove_all_remote_rowsets());
} else {
to_drop_tablet->remove_self_owned_remote_rowsets();
}
to_drop_tablet->save_meta();
{
std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock);

View File

@ -61,12 +61,9 @@ public:
// task to be fail, even if there is enough space on other disks
Status create_tablet(const TCreateTabletReq& request, std::vector<DataDir*> stores);
// Drop a tablet by description
// If set keep_files == true, files will NOT be deleted when deconstruction.
// Return OLAP_SUCCESS, if run ok
// OLAP_ERR_TABLE_DELETE_NOEXIST_ERROR, if tablet not exist
// Status::OLAPInternalError(OLAP_ERR_NOT_INITED), if not inited
Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool keep_files = false);
// Drop a tablet by description.
// If `is_drop_table_or_partition` is true, we need to remove all remote rowsets in this tablet.
Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool is_drop_table_or_partition);
Status drop_tablets_on_error_root_path(const std::vector<TabletInfo>& tablet_info_vec);
@ -156,7 +153,8 @@ private:
bool _check_tablet_id_exist_unlocked(TTabletId tablet_id);
Status _drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id, bool keep_files);
Status _drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id, bool keep_files,
bool is_drop_table_or_partition);
TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id);
TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id, bool include_deleted,

View File

@ -206,7 +206,7 @@ TabletMeta::TabletMeta(const TabletMeta& b)
_del_predicates(b._del_predicates),
_in_restore_mode(b._in_restore_mode),
_preferred_rowset_type(b._preferred_rowset_type),
_cooldown_resource(b._cooldown_resource),
_storage_policy(b._storage_policy),
_delete_bitmap(b._delete_bitmap) {};
void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn,
@ -461,7 +461,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
_preferred_rowset_type = tablet_meta_pb.preferred_rowset_type();
}
_cooldown_resource = tablet_meta_pb.storage_policy();
_storage_policy = tablet_meta_pb.storage_policy();
if (tablet_meta_pb.has_enable_unique_key_merge_on_write()) {
_enable_unique_key_merge_on_write = tablet_meta_pb.enable_unique_key_merge_on_write();
}
@ -528,7 +528,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
tablet_meta_pb->set_preferred_rowset_type(_preferred_rowset_type);
}
tablet_meta_pb->set_storage_policy(_cooldown_resource);
tablet_meta_pb->set_storage_policy(_storage_policy);
tablet_meta_pb->set_enable_unique_key_merge_on_write(_enable_unique_key_merge_on_write);
{
@ -753,7 +753,7 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
}
if (a._in_restore_mode != b._in_restore_mode) return false;
if (a._preferred_rowset_type != b._preferred_rowset_type) return false;
if (a._cooldown_resource != b._cooldown_resource) return false;
if (a._storage_policy != b._storage_policy) return false;
return true;
}

View File

@ -26,7 +26,6 @@
#include "common/logging.h"
#include "gen_cpp/olap_file.pb.h"
#include "io/fs/file_system.h"
#include "olap/delete_handler.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@ -186,23 +185,24 @@ public:
bool all_beta() const;
const io::ResourceId& cooldown_resource() const {
const std::string& storage_policy() const {
std::shared_lock<std::shared_mutex> rlock(_meta_lock);
return _cooldown_resource;
return _storage_policy;
}
void set_cooldown_resource(io::ResourceId resource) {
void set_storage_policy(const std::string& policy) {
std::unique_lock<std::shared_mutex> wlock(_meta_lock);
VLOG_NOTICE << "set tablet_id : " << _table_id << " cooldown resource from "
<< _cooldown_resource << " to " << resource;
_cooldown_resource = std::move(resource);
VLOG_NOTICE << "set tablet_id : " << _table_id << " storage policy from " << _storage_policy
<< " to " << policy;
_storage_policy = policy;
}
static void init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn,
ColumnPB* column);
DeleteBitmap& delete_bitmap() { return *_delete_bitmap; }
bool enable_unique_key_merge_on_write() { return _enable_unique_key_merge_on_write; }
bool enable_unique_key_merge_on_write() const { return _enable_unique_key_merge_on_write; }
private:
Status _save_meta(DataDir* data_dir);
@ -238,8 +238,7 @@ private:
bool _in_restore_mode = false;
RowsetTypePB _preferred_rowset_type = BETA_ROWSET;
// FIXME(cyx): Currently `cooldown_resource` is equivalent to `storage_policy`.
io::ResourceId _cooldown_resource;
std::string _storage_policy;
// For unique key data model, the feature Merge-on-Write will leverage a primary
// key index and a delete-bitmap to mark duplicate keys as deleted in load stage,

View File

@ -249,7 +249,7 @@ void EngineCloneTask::_set_tablet_info(Status status, bool is_new_tablet) {
<< ", signature:" << _signature << ", version:" << tablet_info.version
<< ", expected_version: " << _clone_req.committed_version;
Status drop_status = StorageEngine::instance()->tablet_manager()->drop_tablet(
_clone_req.tablet_id, _clone_req.replica_id);
_clone_req.tablet_id, _clone_req.replica_id, false);
if (drop_status != Status::OK() &&
drop_status.precise_code() != OLAP_ERR_TABLE_NOT_FOUND) {
// just log

View File

@ -197,6 +197,7 @@ set(OLAP_TEST_FILES
# olap/push_handler_test.cpp
olap/tablet_cooldown_test.cpp
olap/rowid_conversion_test.cpp
olap/remote_rowset_gc_test.cpp
)
set(RUNTIME_TEST_FILES

View File

@ -278,7 +278,7 @@ protected:
tablet.reset();
dup_tablet.reset();
StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
_create_tablet.replica_id);
_create_tablet.replica_id, false);
EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
}
@ -443,8 +443,8 @@ protected:
void TearDown() {
// Remove all dir.
tablet.reset();
k_engine->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
_create_tablet.replica_id);
k_engine->tablet_manager()->drop_tablet(_create_tablet.tablet_id, _create_tablet.replica_id,
false);
EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
}
@ -820,7 +820,7 @@ protected:
tablet.reset();
_delete_handler.finalize();
StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
_create_tablet.replica_id);
_create_tablet.replica_id, false);
EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
}

View File

@ -406,7 +406,7 @@ TEST_F(TestDeltaWriter, open) {
EXPECT_EQ(Status::OK(), res);
SAFE_DELETE(delta_writer);
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id);
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
}
@ -527,7 +527,7 @@ TEST_F(TestDeltaWriter, write) {
}
EXPECT_EQ(1, tablet->num_rows());
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id);
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
delete delta_writer;
}
@ -673,7 +673,7 @@ TEST_F(TestDeltaWriter, vec_write) {
}
ASSERT_EQ(1, tablet->num_rows());
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id);
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
ASSERT_TRUE(res.ok());
delete delta_writer;
}
@ -740,7 +740,7 @@ TEST_F(TestDeltaWriter, sequence_col) {
}
EXPECT_EQ(1, tablet->num_rows());
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id);
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
delete delta_writer;
}
@ -833,7 +833,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
}
ASSERT_EQ(1, tablet->num_rows());
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id);
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
ASSERT_TRUE(res.ok());
delete delta_writer;
}

View File

@ -259,7 +259,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
EXPECT_NE(tablet3, tablet);
// test case 2 end
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id);
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
delete delta_writer;
}

View File

@ -19,11 +19,12 @@
#include <memory>
#include "common/config.h"
#include "common/status.h"
#include "io/fs/file_system_map.h"
#include "io/fs/s3_file_system.h"
#include "olap/delta_writer.h"
#include "olap/snapshot_manager.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "runtime/descriptor_helper.h"
@ -35,47 +36,39 @@ namespace doris {
static StorageEngine* k_engine = nullptr;
static const std::string kTestDir = "./ut_dir/tablet_clone_test";
static std::string kSnapshotDir = "./ut_dir/tablet_clone_test/snapshot";
static const std::string kResourceId = "TabletCloneTest";
static const int64_t kTabletId = 10005;
static const int32_t KSchemaHash = 270068377;
static const std::string AK = "ak";
static const std::string SK = "sk";
static const std::string ENDPOINT = "endpoint";
static const std::string REGION = "region";
static const std::string BUCKET = "bucket";
static const std::string PREFIX = "prefix";
static const std::string kTestDir = "./ut_dir/remote_rowset_gc_test";
static const std::string kResourceId = "RemoteRowsetGcTest";
// remove DISABLED_ when need run this test
#define TabletCloneTest DISABLED_TabletCloneTest
#define private public
class TabletCloneTest : public testing::Test {
#define RemoteRowsetGcTest DISABLED_RemoteRowsetGcTest
class RemoteRowsetGcTest : public testing::Test {
public:
static void SetUpTestSuite() {
S3Conf s3_conf;
s3_conf.ak = AK;
s3_conf.sk = SK;
s3_conf.endpoint = ENDPOINT;
s3_conf.region = REGION;
s3_conf.bucket = BUCKET;
s3_conf.prefix = PREFIX;
s3_conf.ak = config::test_s3_ak;
s3_conf.sk = config::test_s3_sk;
s3_conf.endpoint = config::test_s3_endpoint;
s3_conf.region = config::test_s3_region;
s3_conf.bucket = config::test_s3_bucket;
s3_conf.prefix = "remote_rowset_gc_test";
auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), kResourceId);
ASSERT_TRUE(s3_fs->connect().ok());
io::FileSystemMap::instance()->insert(kResourceId, s3_fs);
config::storage_root_path = kTestDir;
constexpr uint32_t MAX_PATH_LEN = 1024;
char buffer[MAX_PATH_LEN];
EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
config::storage_root_path = std::string(buffer) + "/" + kTestDir;
config::min_file_descriptor_number = 1000;
FileUtils::remove_all(kTestDir);
FileUtils::create_dir(kTestDir);
std::vector<StorePath> paths {{kTestDir, -1}};
FileUtils::remove_all(config::storage_root_path);
FileUtils::create_dir(config::storage_root_path);
std::vector<StorePath> paths {{config::storage_root_path, -1}};
EngineOptions options;
options.store_paths = paths;
doris::StorageEngine::open(options, &k_engine);
k_engine->start_bg_threads();
}
static void TearDownTestSuite() {
@ -145,9 +138,9 @@ static TDescriptorTable create_descriptor_tablet_with_sequence_col() {
return desc_tbl_builder.desc_tbl();
}
TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) {
TEST_F(RemoteRowsetGcTest, normal) {
TCreateTabletReq request;
create_tablet_request_with_sequence_col(kTabletId, KSchemaHash, &request);
create_tablet_request_with_sequence_col(10005, 270068377, &request);
Status st = k_engine->create_tablet(request);
ASSERT_EQ(Status::OK(), st);
@ -161,13 +154,14 @@ TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) {
PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);
WriteRequest write_req = {kTabletId, KSchemaHash, WriteType::LOAD, 20003,
30003, load_id, tuple_desc, &(tuple_desc->slots())};
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
30003, load_id, tuple_desc, &(tuple_desc->slots())};
DeltaWriter* delta_writer = nullptr;
DeltaWriter::open(&write_req, &delta_writer);
ASSERT_NE(delta_writer, nullptr);
MemPool pool;
MemTracker tracker;
MemPool pool(&tracker);
// Tuple 1
{
Tuple* tuple = reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size()));
@ -199,27 +193,39 @@ TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) {
write_req.txn_id, write_req.partition_id, &tablet_related_rs);
for (auto& tablet_rs : tablet_related_rs) {
RowsetSharedPtr rowset = tablet_rs.second;
rowset->rowset_meta()->set_resource_id(kResourceId);
st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_uid(), version);
write_req.tablet_id, write_req.schema_hash,
tablet_rs.first.tablet_uid, version);
ASSERT_EQ(Status::OK(), st);
st = tablet->add_inc_rowset(rowset);
ASSERT_EQ(Status::OK(), st);
}
EXPECT_EQ(1, tablet->num_rows());
TSnapshotRequest snapshot_req;
snapshot_req.tablet_id = kTabletId;
snapshot_req.schema_hash = KSchemaHash;
bool allow_incremental_clone = false;
st = SnapshotManager::instance()->_create_snapshot_files(tablet, snapshot_req, &kSnapshotDir,
&allow_incremental_clone);
tablet->set_storage_policy(kResourceId);
st = tablet->cooldown(); // rowset [0-1]
ASSERT_EQ(Status::OK(), st);
st = SnapshotManager::instance()->convert_rowset_ids(kTestDir, kTabletId, request.replica_id,
KSchemaHash);
ASSERT_NE(Status::OK(), st);
st = tablet->cooldown(); // rowset [2-2]
ASSERT_EQ(Status::OK(), st);
ASSERT_EQ(DorisMetrics::instance()->upload_rowset_count->value(), 1);
delete delta_writer;
auto fs = io::FileSystemMap::instance()->get(kResourceId);
auto rowset = tablet->get_rowset_by_version({2, 2});
ASSERT_TRUE(rowset);
auto seg_path = BetaRowset::remote_segment_path(10005, rowset->rowset_id(), 0);
bool exists = false;
st = fs->exists(seg_path, &exists);
ASSERT_EQ(Status::OK(), st);
ASSERT_TRUE(exists);
st = k_engine->tablet_manager()->drop_tablet(10005, 0, true);
ASSERT_EQ(Status::OK(), st);
tablet->data_dir()->perform_remote_tablet_gc();
st = fs->exists(seg_path, &exists);
ASSERT_EQ(Status::OK(), st);
ASSERT_FALSE(exists);
}
} // namespace doris

View File

@ -25,7 +25,6 @@
#include "io/fs/s3_file_system.h"
#include "olap/delta_writer.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy_mgr.h"
#include "olap/tablet.h"
#include "runtime/descriptor_helper.h"
#include "runtime/tuple.h"
@ -39,25 +38,18 @@ static StorageEngine* k_engine = nullptr;
static const std::string kTestDir = "./ut_dir/tablet_cooldown_test";
static const std::string kResourceId = "TabletCooldownTest";
static const std::string AK = "ak";
static const std::string SK = "sk";
static const std::string ENDPOINT = "endpoint";
static const std::string REGION = "region";
static const std::string BUCKET = "bucket";
static const std::string PREFIX = "tablet_cooldown_test";
// remove DISABLED_ when need run this test
#define TabletCooldownTest DISABLED_TabletCooldownTest
class TabletCooldownTest : public testing::Test {
public:
static void SetUpTestSuite() {
S3Conf s3_conf;
s3_conf.ak = AK;
s3_conf.sk = SK;
s3_conf.endpoint = ENDPOINT;
s3_conf.region = REGION;
s3_conf.bucket = BUCKET;
s3_conf.prefix = PREFIX;
s3_conf.ak = config::test_s3_ak;
s3_conf.sk = config::test_s3_sk;
s3_conf.endpoint = config::test_s3_endpoint;
s3_conf.region = config::test_s3_region;
s3_conf.bucket = config::test_s3_bucket;
s3_conf.prefix = "tablet_cooldown_test";
auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), kResourceId);
ASSERT_TRUE(s3_fs->connect().ok());
io::FileSystemMap::instance()->insert(kResourceId, s3_fs);
@ -167,7 +159,8 @@ TEST_F(TabletCooldownTest, normal) {
DeltaWriter::open(&write_req, &delta_writer);
ASSERT_NE(delta_writer, nullptr);
MemPool pool;
MemTracker tracker;
MemPool pool(&tracker);
// Tuple 1
{
Tuple* tuple = reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size()));
@ -208,7 +201,7 @@ TEST_F(TabletCooldownTest, normal) {
}
EXPECT_EQ(1, tablet->num_rows());
tablet->set_cooldown_resource(kResourceId);
tablet->set_storage_policy(kResourceId);
st = tablet->cooldown(); // rowset [0-1]
ASSERT_EQ(Status::OK(), st);
st = tablet->cooldown(); // rowset [2-2]

View File

@ -117,7 +117,7 @@ TEST_F(TabletMgrTest, CreateTablet) {
create_st = _tablet_mgr->create_tablet(create_tablet_req, data_dirs);
EXPECT_TRUE(create_st == Status::OK());
Status drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id);
Status drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id, false);
EXPECT_TRUE(drop_st == Status::OK());
tablet.reset();
Status trash_st = _tablet_mgr->start_trash_sweep();
@ -172,7 +172,7 @@ TEST_F(TabletMgrTest, CreateTabletWithSequence) {
Status check_meta_st = TabletMetaManager::get_meta(_data_dir, 111, 3333, new_tablet_meta);
EXPECT_TRUE(check_meta_st == Status::OK());
Status drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id);
Status drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id, false);
EXPECT_TRUE(drop_st == Status::OK());
tablet.reset();
Status trash_st = _tablet_mgr->start_trash_sweep();
@ -206,13 +206,13 @@ TEST_F(TabletMgrTest, DropTablet) {
EXPECT_TRUE(tablet != nullptr);
// drop unexist tablet will be success
Status drop_st = _tablet_mgr->drop_tablet(1121, create_tablet_req.replica_id);
Status drop_st = _tablet_mgr->drop_tablet(1121, create_tablet_req.replica_id, false);
EXPECT_TRUE(drop_st == Status::OK());
tablet = _tablet_mgr->get_tablet(111);
EXPECT_TRUE(tablet != nullptr);
// drop exist tablet will be success
drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id);
drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id, false);
EXPECT_TRUE(drop_st == Status::OK());
tablet = _tablet_mgr->get_tablet(111);
EXPECT_TRUE(tablet == nullptr);

View File

@ -281,7 +281,7 @@ TEST_F(TestTablet, cooldown_policy) {
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr));
_tablet->init();
_tablet->set_cooldown_resource("test_policy_name");
_tablet->set_storage_policy("test_policy_name");
_tablet->_rs_version_map[ptr1->version()] = rowset1;
_tablet->_rs_version_map[ptr2->version()] = rowset2;

View File

@ -283,10 +283,12 @@ public class BackendLoadStatistic {
// Else if capacity used percent > 75%, set capacityCoefficient to 1.
// Else, capacityCoefficient changed smoothly from 0.5 to 1 with used capacity increasing
// Function: (2 * usedCapacityPercent - 0.5)
loadScore.capacityCoefficient = usedCapacityPercent < 0.5 ? 0.5
: (usedCapacityPercent > Config.capacity_used_percent_high_water ? 1.0
: (2 * usedCapacityPercent - 0.5));
loadScore.replicaNumCoefficient = 1 - loadScore.capacityCoefficient;
if (!Config.be_rebalancer_fuzzy_test) {
loadScore.capacityCoefficient = usedCapacityPercent < 0.5 ? 0.5
: (usedCapacityPercent > Config.capacity_used_percent_high_water ? 1.0
: (2 * usedCapacityPercent - 0.5));
loadScore.replicaNumCoefficient = 1 - loadScore.capacityCoefficient;
}
loadScore.score = capacityProportion * loadScore.capacityCoefficient
+ replicaNumProportion * loadScore.replicaNumCoefficient;

View File

@ -1685,7 +1685,7 @@ public class Config extends ConfigBase {
* It's used to test the reliability in single replica case when tablet scheduling are frequent.
* Default is false.
*/
@ConfField(mutable = false, masterOnly = true)
@ConfField(mutable = true, masterOnly = true)
public static boolean be_rebalancer_fuzzy_test = false;
/**

View File

@ -115,6 +115,13 @@ message RowsetMetaPB {
optional SegmentsOverlapPB segments_overlap_pb = 51 [default = OVERLAP_UNKNOWN];
}
// unused remote rowsets garbage collection kv value
message RemoteRowsetGcPB {
required string resource_id = 1;
required int64 tablet_id = 2;
required int64 num_segments = 3;
}
message AlphaRowsetExtraMetaPB {
repeated SegmentGroupPB segment_groups = 1;
}