[Enhancement](wal) Add wal space back pressure (#26483)

This commit is contained in:
abmdocrt
2023-11-09 12:29:05 +08:00
committed by GitHub
parent 84d1c3ba30
commit 5f62a4462d
8 changed files with 73 additions and 7 deletions

View File

@ -1118,8 +1118,11 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
// Dir of default timezone files
DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
// Max size(bytes) of group commit queues, used for mem back pressure.
DEFINE_Int32(group_commit_max_queue_size, "65536");
// Max size(bytes) of group commit queues, used for mem back pressure, defult 64M.
DEFINE_Int32(group_commit_max_queue_size, "67108864");
// Max size(bytes) of wal disk using, used for disk space back pressure, default 64M.
DEFINE_Int32(wal_max_disk_size, "67108864");
// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
DEFINE_Int32(ingest_binlog_work_pool_size, "-1");

View File

@ -1195,6 +1195,9 @@ DECLARE_String(default_tzfiles_path);
// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size);
// Max size(bytes) of wal disk using, used for disk space back pressure.
DECLARE_Int32(wal_max_disk_size);
// Ingest binlog work pool size
DECLARE_Int32(ingest_binlog_work_pool_size);

View File

@ -19,10 +19,15 @@
#include <thrift/protocol/TDebugProtocol.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <filesystem>
#include <memory>
#include <utility>
#include "io/fs/local_file_system.h"
#include "olap/wal_writer.h"
#include "runtime/client_cache.h"
#include "runtime/fragment_mgr.h"
#include "runtime/plan_fragment_executor.h"
@ -35,11 +40,13 @@ namespace doris {
WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
: _exec_env(exec_env), _stop_background_threads_latch(1) {
doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs);
_all_wal_disk_bytes = std::make_shared<std::atomic_size_t>(0);
}
WalManager::~WalManager() {
LOG(INFO) << "WalManager is destoried";
}
void WalManager::stop() {
_stop = true;
_stop_background_threads_latch.count_down();
@ -117,8 +124,12 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>&
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path));
}
LOG(INFO) << "create wal " << wal_path;
wal_writer = std::make_shared<WalWriter>(wal_path);
wal_writer = std::make_shared<WalWriter>(wal_path, _all_wal_disk_bytes);
RETURN_IF_ERROR(wal_writer->init());
{
std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
_wal_id_to_writer_map.emplace(wal_id, wal_writer);
}
return Status::OK();
}
@ -241,6 +252,17 @@ size_t WalManager::get_wal_table_size(const std::string& table_id) {
Status WalManager::delete_wal(int64_t wal_id) {
{
std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
if (_wal_id_to_writer_map.find(wal_id) != _wal_id_to_writer_map.end()) {
_all_wal_disk_bytes->store(
_all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(),
std::memory_order_relaxed),
std::memory_order_relaxed);
_wal_id_to_writer_map[wal_id]->cv.notify_one();
_wal_id_to_writer_map.erase(wal_id);
}
if (_wal_id_to_writer_map.empty()) {
CHECK_EQ(_all_wal_disk_bytes->load(std::memory_order_relaxed), 0);
}
std::string wal_path = _wal_path_map[wal_id];
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
LOG(INFO) << "delete file=" << wal_path;

View File

@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
#include <memory>
#include "common/config.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/FrontendService_types.h"
@ -56,6 +58,8 @@ private:
std::vector<std::string> _wal_dirs;
std::shared_mutex _wal_lock;
std::unordered_map<int64_t, std::string> _wal_path_map;
std::unordered_map<int64_t, std::shared_ptr<WalWriter>> _wal_id_to_writer_map;
std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
bool _stop = false;
};
} // namespace doris

View File

@ -17,6 +17,9 @@
#include "olap/wal_writer.h"
#include <atomic>
#include "common/config.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
@ -25,7 +28,12 @@
namespace doris {
WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {}
WalWriter::WalWriter(const std::string& file_name,
const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes)
: _file_name(file_name),
_count(0),
_disk_bytes(0),
_all_wal_disk_bytes(all_wal_disk_bytes) {}
WalWriter::~WalWriter() {}
@ -44,6 +52,12 @@ Status WalWriter::finalize() {
}
Status WalWriter::append_blocks(const PBlockArray& blocks) {
{
std::unique_lock l(_mutex);
while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > config::wal_max_disk_size) {
cv.wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));
}
}
size_t total_size = 0;
for (const auto& block : blocks) {
total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE;
@ -62,6 +76,10 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
offset += CHECKSUM_SIZE;
}
DCHECK(offset == total_size);
_disk_bytes += total_size;
_all_wal_disk_bytes->store(
_all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed),
std::memory_order_relaxed);
// write rows
RETURN_IF_ERROR(_file_writer->append({row_binary, offset}));
_count++;

View File

@ -17,9 +17,13 @@
#pragma once
#include <atomic>
#include <memory>
#include "common/status.h"
#include "gen_cpp/internal_service.pb.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "util/lock.h"
namespace doris {
@ -27,23 +31,30 @@ using PBlockArray = std::vector<PBlock*>;
class WalWriter {
public:
explicit WalWriter(const std::string& file_name);
explicit WalWriter(const std::string& file_name,
const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes);
~WalWriter();
Status init();
Status finalize();
Status append_blocks(const PBlockArray& blocks);
size_t disk_bytes() const { return _disk_bytes; };
std::string file_name() { return _file_name; };
static const int64_t LENGTH_SIZE = 8;
static const int64_t CHECKSUM_SIZE = 4;
doris::ConditionVariable cv;
private:
static constexpr size_t MAX_WAL_WRITE_WAIT_TIME = 1000;
std::string _file_name;
io::FileWriterPtr _file_writer;
int64_t _count;
int64_t _batch;
size_t _disk_bytes;
std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
doris::Mutex _mutex;
};
} // namespace doris

View File

@ -78,7 +78,9 @@ public:
void prepare() { static_cast<void>(io::global_local_filesystem()->create_directory(wal_dir)); }
void createWal(const std::string& wal_path) {
auto wal_writer = WalWriter(wal_path);
std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes =
std::make_shared<std::atomic_size_t>(0);
auto wal_writer = WalWriter(wal_path, _all_wal_disk_bytes);
static_cast<void>(wal_writer.init());
static_cast<void>(wal_writer.finalize());
}

View File

@ -17,6 +17,7 @@
#include <gtest/gtest.h>
#include <filesystem>
#include <memory>
#include "agent/be_exec_version_manager.h"
#include "common/object_pool.h"
@ -89,7 +90,9 @@ void generate_block(PBlock& pblock, int row_index) {
TEST_F(WalReaderWriterTest, TestWriteAndRead1) {
std::string file_name = _s_test_data_path + "/abcd123.txt";
auto wal_writer = WalWriter(file_name);
std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes =
std::make_shared<std::atomic_size_t>(0);
auto wal_writer = WalWriter(file_name, _all_wal_disk_bytes);
static_cast<void>(wal_writer.init());
size_t file_len = 0;
int64_t file_size = -1;