[Fix](remote) Fix thread safety issue in cache (#11984)

This commit is contained in:
zxealous
2022-08-24 18:14:14 +08:00
committed by GitHub
parent 2057edbea0
commit 54fc038dc5
10 changed files with 224 additions and 85 deletions

View File

@ -701,6 +701,10 @@ CONF_Validator(max_send_batch_parallelism_per_job,
CONF_Int32(send_batch_thread_pool_thread_num, "64");
// number of send batch thread pool queue size
CONF_Int32(send_batch_thread_pool_queue_size, "102400");
// Maximum number of threads in the download cache thread pool.
CONF_Int32(download_cache_thread_pool_thread_num, "48");
// Maximum queue size of the download cache thread pool.
CONF_Int32(download_cache_thread_pool_queue_size, "102400");
// Limit the number of segment of a newly created rowset.
// The newly created rowset may to be compacted after loading,

View File

@ -121,48 +121,84 @@ Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(cache_done_file, &done_file_exist),
fmt::format("Check local cache done file exist failed. {}", cache_done_file.native()));
std::promise<Status> download_st;
std::future<Status> future = download_st.get_future();
if (!done_file_exist) {
bool cache_file_exist = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(cache_file, &cache_file_exist),
fmt::format("Check local cache file exist failed. {}", cache_file.native()));
if (cache_file_exist) {
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->delete_file(cache_file),
fmt::format("Check local cache file exist failed. {}", cache_file.native()));
ThreadPoolToken* thread_token =
ExecEnv::GetInstance()->get_serial_download_cache_thread_token();
if (thread_token != nullptr) {
auto st = thread_token->submit_func([this, &download_st, cache_done_file, cache_file,
offset, req_size] {
auto func = [this, cache_done_file, cache_file, offset, req_size] {
bool done_file_exist = false;
// Judge again whether cache_done_file exists, it is possible that the cache
// is downloaded while waiting in the thread pool
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists(
cache_done_file, &done_file_exist),
"Check local cache done file exist failed.");
bool cache_file_exist = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(cache_file, &cache_file_exist),
fmt::format("Check local cache file exist failed. {}",
cache_file.native()));
if (done_file_exist && cache_file_exist) {
return Status::OK();
} else if (!done_file_exist && cache_file_exist) {
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->delete_file(cache_file),
fmt::format("Check local cache file exist failed. {}",
cache_file.native()));
}
LOG(INFO) << "Download cache file from remote file: "
<< _remote_file_reader->path().native() << " -> "
<< cache_file.native();
std::unique_ptr<char[]> file_buf(new char[req_size]);
Slice file_slice(file_buf.get(), req_size);
size_t bytes_read = 0;
RETURN_NOT_OK_STATUS_WITH_WARN(
_remote_file_reader->read_at(0, file_slice, &bytes_read),
fmt::format("read remote file failed. {}. offset: {}, size: {}",
_remote_file_reader->path().native(), offset, req_size));
if (bytes_read != req_size) {
LOG(ERROR) << "read remote file failed: "
<< _remote_file_reader->path().native()
<< ", bytes read: " << bytes_read
<< " vs file size: " << _remote_file_reader->size();
return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
}
io::FileWriterPtr file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_file(cache_file, &file_writer),
fmt::format("Create local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->append(file_slice),
fmt::format("Write local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->close(),
fmt::format("Close local cache file failed: {}", cache_file.native()));
io::FileWriterPtr done_file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->create_file(
cache_done_file, &done_file_writer),
fmt::format("Create local done file failed: {}",
cache_done_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(done_file_writer->close(),
fmt::format("Close local done file failed: {}",
cache_done_file.native()));
return Status::OK();
};
download_st.set_value(func());
});
if (!st.ok()) {
LOG(FATAL) << "Failed to submit download cache task to thread pool! "
<< st.get_error_msg();
}
} else {
return Status::InternalError("Failed to get download cache thread token");
}
LOG(INFO) << "Download cache file from remote file: "
<< _remote_file_reader->path().native() << " -> " << cache_file.native();
std::unique_ptr<char[]> file_buf(new char[req_size]);
Slice file_slice(file_buf.get(), req_size);
size_t bytes_read = 0;
RETURN_NOT_OK_STATUS_WITH_WARN(
_remote_file_reader->read_at(0, file_slice, &bytes_read),
fmt::format("read remote file failed. {}. offset: {}, size: {}",
_remote_file_reader->path().native(), offset, req_size));
if (bytes_read != req_size) {
LOG(ERROR) << "read remote file failed: " << _remote_file_reader->path().native()
<< ", bytes read: " << bytes_read
<< " vs file size: " << _remote_file_reader->size();
return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
if (!future.get().ok()) {
return future.get();
}
io::FileWriterPtr file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_file(cache_file, &file_writer),
fmt::format("Create local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->append(file_slice),
fmt::format("Write local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->close(),
fmt::format("Close local cache file failed: {}", cache_file.native()));
io::FileWriterPtr done_file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_file(cache_done_file, &done_file_writer),
fmt::format("Create local done file failed: {}", cache_done_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
done_file_writer->close(),
fmt::format("Close local done file failed: {}", cache_done_file.native()));
}
io::FileReaderSPtr cache_reader;
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &cache_reader));

View File

@ -17,6 +17,7 @@
#pragma once
#include <future>
#include <memory>
#include "common/status.h"

View File

@ -60,55 +60,95 @@ Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(cache_done_file, &done_file_exist),
"Check local cache done file exist failed.");
std::promise<Status> download_st;
std::future<Status> future = download_st.get_future();
if (!done_file_exist) {
bool cache_dir_exist = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(_cache_dir, &cache_dir_exist),
fmt::format("Check local cache dir exist failed. {}", _cache_dir.native()));
if (!cache_dir_exist) {
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_directory(_cache_dir),
fmt::format("Create local cache dir failed. {}", _cache_dir.native()));
ThreadPoolToken* thread_token =
ExecEnv::GetInstance()->get_serial_download_cache_thread_token();
if (thread_token != nullptr) {
auto st = thread_token->submit_func([this, &download_st, cache_done_file, cache_file] {
auto func = [this, cache_done_file, cache_file] {
bool done_file_exist = false;
bool cache_dir_exist = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(_cache_dir, &cache_dir_exist),
fmt::format("Check local cache dir exist failed. {}",
_cache_dir.native()));
if (!cache_dir_exist) {
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_directory(_cache_dir),
fmt::format("Create local cache dir failed. {}",
_cache_dir.native()));
} else {
// Judge again whether cache_done_file exists, it is possible that the cache
// is downloaded while waiting in the thread pool
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists(
cache_done_file, &done_file_exist),
"Check local cache done file exist failed.");
}
bool cache_file_exist = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(cache_file, &cache_file_exist),
"Check local cache file exist failed.");
if (done_file_exist && cache_file_exist) {
return Status::OK();
} else if (!done_file_exist && cache_file_exist) {
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->delete_file(cache_file),
fmt::format("Check local cache file exist failed. {}",
cache_file.native()));
}
LOG(INFO) << "Download cache file from remote file: "
<< _remote_file_reader->path().native() << " -> "
<< cache_file.native();
std::unique_ptr<char[]> file_buf(new char[_remote_file_reader->size()]);
Slice file_slice(file_buf.get(), _remote_file_reader->size());
size_t bytes_read = 0;
RETURN_NOT_OK_STATUS_WITH_WARN(
_remote_file_reader->read_at(0, file_slice, &bytes_read),
fmt::format("read remote file failed. {}",
_remote_file_reader->path().native()));
if (bytes_read != _remote_file_reader->size()) {
LOG(ERROR) << "read remote file failed: "
<< _remote_file_reader->path().native()
<< ", bytes read: " << bytes_read
<< " vs file size: " << _remote_file_reader->size();
return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
}
io::FileWriterPtr file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_file(cache_file, &file_writer),
fmt::format("Create local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->append(file_slice),
fmt::format("Write local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->close(),
fmt::format("Close local cache file failed: {}", cache_file.native()));
io::FileWriterPtr done_file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->create_file(
cache_done_file, &done_file_writer),
fmt::format("Create local done file failed: {}",
cache_done_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(done_file_writer->close(),
fmt::format("Close local done file failed: {}",
cache_done_file.native()));
return Status::OK();
};
download_st.set_value(func());
});
if (!st.ok()) {
LOG(FATAL) << "Failed to submit download cache task to thread pool! "
<< st.get_error_msg();
return st;
}
} else {
return Status::InternalError("Failed to get download cache thread token");
}
bool cache_file_exist = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(cache_file, &cache_file_exist),
"Check local cache file exist failed.");
if (cache_file_exist) {
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->delete_file(cache_file),
"Check local cache file exist failed.");
if (!future.get().ok()) {
return future.get();
}
LOG(INFO) << "Download cache file from remote file: "
<< _remote_file_reader->path().native() << " -> " << cache_file.native();
std::unique_ptr<char[]> file_buf(new char[_remote_file_reader->size()]);
Slice file_slice(file_buf.get(), _remote_file_reader->size());
size_t bytes_read = 0;
RETURN_NOT_OK_STATUS_WITH_WARN(
_remote_file_reader->read_at(0, file_slice, &bytes_read),
fmt::format("read remote file failed. {}", _remote_file_reader->path().native()));
if (bytes_read != _remote_file_reader->size()) {
LOG(ERROR) << "read remote file failed: " << _remote_file_reader->path().native()
<< ", bytes read: " << bytes_read
<< " vs file size: " << _remote_file_reader->size();
return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
}
io::FileWriterPtr file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_file(cache_file, &file_writer),
fmt::format("Create local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->append(file_slice),
fmt::format("Write local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->close(),
fmt::format("Close local cache file failed: {}", cache_file.native()));
io::FileWriterPtr done_file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_file(cache_done_file, &done_file_writer),
fmt::format("Create local done file failed: {}", cache_done_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
done_file_writer->close(),
fmt::format("Close local done file failed: {}", cache_done_file.native()));
}
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &_cache_file_reader));
_cache_file_size = _cache_file_reader->size();

View File

@ -17,6 +17,7 @@
#pragma once
#include <future>
#include <memory>
#include "common/status.h"

View File

@ -125,6 +125,14 @@ public:
// Accessors for the scan and send-batch thread pools. Each returns a non-owning pointer;
// the limited-scan and send-batch pools are held in std::unique_ptr members of this class.
PriorityThreadPool* remote_scan_thread_pool() { return _remote_scan_thread_pool; }
ThreadPool* limited_scan_thread_pool() { return _limited_scan_thread_pool.get(); }
ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
// Returns a non-owning pointer to the thread pool used to download cache from remote
// storage; the pool itself is held in the _download_cache_thread_pool unique_ptr.
ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); }
void set_serial_download_cache_thread_token() {
_serial_download_cache_thread_token =
download_cache_thread_pool()->new_token(ThreadPool::ExecutionMode::SERIAL, 1);
}
// Returns a non-owning pointer to the serial download cache token, or nullptr if
// set_serial_download_cache_thread_token() has not stored one yet.
ThreadPoolToken* get_serial_download_cache_thread_token() {
return _serial_download_cache_thread_token.get();
}
// Accessors for manager/cache objects. The backing members are raw pointers that are
// initialised to nullptr in their declarations, so these may return nullptr until assigned.
CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
ResultCache* result_cache() { return _result_cache; }
@ -207,6 +215,12 @@ private:
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
std::unique_ptr<ThreadPool> _send_batch_thread_pool;
// Threadpool used to download cache from remote storage
std::unique_ptr<ThreadPool> _download_cache_thread_pool;
// A token used to submit download cache task serially
std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
CgroupsMgr* _cgroups_mgr = nullptr;
FragmentMgr* _fragment_mgr = nullptr;
ResultCache* _result_cache = nullptr;

View File

@ -70,6 +70,8 @@ namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(query_mem_consumption, MetricUnit::BYTES, "", mem_consumption,
Labels({{"type", "query"}}));
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(load_mem_consumption, MetricUnit::BYTES, "", mem_consumption,
@ -129,6 +131,13 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
.set_max_queue_size(config::send_batch_thread_pool_queue_size)
.build(&_send_batch_thread_pool);
ThreadPoolBuilder("DownloadCacheThreadPool")
.set_min_threads(1)
.set_max_threads(config::download_cache_thread_pool_thread_num)
.set_max_queue_size(config::download_cache_thread_pool_queue_size)
.build(&_download_cache_thread_pool);
set_serial_download_cache_thread_token();
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
_cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
@ -320,12 +329,20 @@ void ExecEnv::_register_metrics() {
REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size,
[this]() { return _send_batch_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num,
[this]() { return _download_cache_thread_pool->num_threads(); });
REGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size,
[this]() { return _download_cache_thread_pool->get_queue_size(); });
}
// Deregisters the thread pool hook metrics: the scanner queue-size gauge plus the
// thread-count and queue-size gauges of the send-batch and download-cache pools.
// Counterpart of _register_metrics(); keep the two lists in sync when a pool metric
// is added or removed.
void ExecEnv::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size);
}
void ExecEnv::_destroy() {

View File

@ -198,6 +198,8 @@ public:
UIntGauge* add_batch_task_queue_size;
UIntGauge* send_batch_thread_pool_thread_num;
UIntGauge* send_batch_thread_pool_queue_size;
UIntGauge* download_cache_thread_pool_thread_num;
UIntGauge* download_cache_thread_pool_queue_size;
// Upload metrics
UIntGauge* upload_total_byte;

View File

@ -1106,6 +1106,18 @@ This configuration is used for the context gc thread scheduling cycle. Note: The
* Type: int32
* Description: The queue length of the SendBatch thread pool. In NodeChannels' sending data tasks, the SendBatch operation of each NodeChannel will be submitted as a thread task to the thread pool waiting to be scheduled, and after the number of submitted tasks exceeds the length of the thread pool queue, subsequent submitted tasks will be blocked until there is an empty slot in the queue.
### `download_cache_thread_pool_thread_num`
* Type: int32
* Description: The number of threads in the DownloadCache thread pool. In the download cache task of FileCache, the download cache operation will be submitted to the thread pool as a thread task and wait to be scheduled. This parameter determines the size of the DownloadCache thread pool.
* Default value: 48
### `download_cache_thread_pool_queue_size`
* Type: int32
* Description: The queue length of the DownloadCache thread pool. In the download cache task of FileCache, the download cache operation will be submitted to the thread pool as a thread task and wait to be scheduled. After the number of submitted tasks exceeds the length of the thread pool queue, subsequent submitted tasks will be blocked until there is an empty slot in the queue.
* Default value: 102400
### `single_replica_load_brpc_port`
* Type: int32

View File

@ -1108,6 +1108,18 @@ routine load任务的线程池大小。 这应该大于 FE 配置 'max_concurren
* 描述:SendBatch线程池的队列长度。在NodeChannel的发送数据任务之中,每一个NodeChannel的SendBatch操作会作为一个线程task提交到线程池之中等待被调度,而提交的任务数目超过线程池队列的长度之后,后续提交的任务将阻塞直到队列之中有新的空缺。
* 默认值:102400
### `download_cache_thread_pool_thread_num`
* 类型: int32
* 描述: DownloadCache线程池线程数目. 在FileCache的缓存下载任务之中, 缓存下载操作会作为一个线程task提交到线程池之中等待被调度,该参数决定了DownloadCache线程池的大小。
* 默认值:48
### `download_cache_thread_pool_queue_size`
* 类型: int32
* 描述: DownloadCache线程池的队列长度。在FileCache的缓存下载任务之中,缓存下载操作会作为一个线程task提交到线程池之中等待被调度,而提交的任务数目超过线程池队列的长度之后,后续提交的任务将阻塞直到队列之中有新的空缺。
* 默认值:102400
### `serialize_batch`
默认值:false