[Feature] compaction quickly for small data import (#9804)

* compaction quickly for small data import #9791
1. Merge small rowset versions as soon as possible so that small batches of data can be imported more frequently.
2. A small version is a rowset whose number of rows is less than config::quick_compaction_max_rows (default 1000).
This commit is contained in:
chenlinzhong
2022-06-15 21:48:34 +08:00
committed by GitHub
parent c4871fb306
commit 4dfebb9852
12 changed files with 195 additions and 11 deletions

View File

@ -692,11 +692,13 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
VLOG_NOTICE << "get publish version task, signature:" << agent_task_req.signature;
std::vector<TTabletId> error_tablet_ids;
std::vector<TTabletId> succ_tablet_ids;
uint32_t retry_time = 0;
Status res = Status::OK();
while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
error_tablet_ids.clear();
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids);
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
&succ_tablet_ids);
res = _env->storage_engine()->execute_task(&engine_task);
if (res.ok()) {
break;
@ -718,7 +720,29 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
<< ", error_code=" << res;
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
} else {
LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature;
int submit_tablets = 0;
if (config::enable_quick_compaction && config::quick_compaction_batch_size > 0) {
for (int i = 0; i < succ_tablet_ids.size(); i++) {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
succ_tablet_ids[i]);
if (tablet != nullptr) {
submit_tablets++;
tablet->publised_count++;
if (tablet->publised_count % config::quick_compaction_batch_size == 0) {
StorageEngine::instance()->submit_quick_compaction_task(tablet);
LOG(INFO) << "trigger quick compaction succ, tabletid:"
<< succ_tablet_ids[i]
<< ", publised:" << tablet->publised_count;
}
} else {
LOG(WARNING) << "trigger quick compaction failed, tabletid:"
<< succ_tablet_ids[i];
}
}
LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature
<< ", size:" << succ_tablet_ids.size();
}
}
res.to_thrift(&finish_task_request.task_status);

View File

@ -300,6 +300,9 @@ CONF_mInt32(convert_rowset_thread_num, "0");
// initial sleep interval in seconds of scan alpha rowset
CONF_mInt32(scan_alpha_rowset_min_interval_sec, "3");
// Number of threads in the quick compaction thread pool (used as both its min and max thread count).
CONF_mInt32(quick_compaction_max_threads, "10");
// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
CONF_Int32(max_meta_checkpoint_threads, "-1");
@ -743,6 +746,15 @@ CONF_Int32(parquet_reader_max_buffer_size, "50");
// if it is lower than a specific threshold, the predicate will be disabled.
CONF_mInt32(bloom_filter_predicate_check_row_num, "1000");
// Whether to turn on the quick compaction feature.
CONF_Bool(enable_quick_compaction, "false");
// Rowsets with fewer rows than quick_compaction_max_rows are candidates for quick compaction.
CONF_Int32(quick_compaction_max_rows, "1000");
// A quick compaction is triggered for a tablet once every quick_compaction_batch_size successful publishes.
CONF_Int32(quick_compaction_batch_size, "10");
// Minimum number of picked rowsets required to actually run a quick compaction.
CONF_Int32(quick_compaction_min_rowsets, "10");
} // namespace config
} // namespace doris

View File

@ -24,7 +24,7 @@ namespace doris {
#define APPLY_FOR_ERROR_CODES(M) \
M(OLAP_SUCCESS, 0, "", false) \
M(OLAP_ERR_OTHER_ERROR, -1, "", true) \
M(OLAP_REQUEST_FAILED, -2, "", true) \
M(OLAP_REQUEST_FAILED, -2, "", false) \
M(OLAP_ERR_OS_ERROR, -100, "", true) \
M(OLAP_ERR_DIR_NOT_EXIST, -101, "", true) \
M(OLAP_ERR_FILE_NOT_EXIST, -102, "", true) \
@ -92,7 +92,7 @@ namespace doris {
M(OLAP_ERR_CE_LOAD_TABLE_ERROR, -303, "", true) \
M(OLAP_ERR_CE_NOT_FINISHED, -304, "", true) \
M(OLAP_ERR_CE_TABLET_ID_EXIST, -305, "", true) \
M(OLAP_ERR_CE_TRY_CE_LOCK_ERROR, -306, "", true) \
M(OLAP_ERR_CE_TRY_CE_LOCK_ERROR, -306, "", false) \
M(OLAP_ERR_TABLE_VERSION_DUPLICATE_ERROR, -400, "", true) \
M(OLAP_ERR_TABLE_VERSION_INDEX_MISMATCH_ERROR, -401, "", true) \
M(OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR, -402, "", true) \
@ -176,8 +176,8 @@ namespace doris {
M(OLAP_ERR_HEADER_LOAD_JSON_HEADER, -1410, "", true) \
M(OLAP_ERR_HEADER_INIT_FAILED, -1411, "", true) \
M(OLAP_ERR_HEADER_PB_PARSE_FAILED, -1412, "", true) \
M(OLAP_ERR_HEADER_HAS_PENDING_DATA, -1413, "", true) \
M(OLAP_ERR_SCHEMA_SCHEMA_INVALID, -1500, "", true) \
M(OLAP_ERR_HEADER_HAS_PENDING_DATA, -1413, "", false) \
M(OLAP_ERR_SCHEMA_SCHEMA_INVALID, -1500, "", false) \
M(OLAP_ERR_SCHEMA_SCHEMA_FIELD_INVALID, -1501, "", true) \
M(OLAP_ERR_ALTER_MULTI_TABLE_ERR, -1600, "", true) \
M(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS, -1601, "", true) \

View File

@ -55,6 +55,57 @@ Status Compaction::execute_compact() {
return st;
}
// Runs one quick-compaction pass on this tablet.
//
// Tries to take the tablet's cumulative compaction lock without blocking, asks
// the tablet for a run of small rowsets and, when at least
// config::quick_compaction_min_rowsets rowsets remain after
// find_longest_consecutive_version(), merges them with do_compaction().
//
// Returns:
//   - OK when the rowsets were merged, or when there was nothing to merge.
//   - OLAP_ERR_CE_TRY_CE_LOCK_ERROR when the cumulative compaction lock is held.
//   - OLAP_ERR_CUMULATIVE_CLONE_OCCURRED when a clone happened after submission.
//   - The failing status when picking rowsets, the version continuity check,
//     or the compaction itself fails.
Status Compaction::quick_rowsets_compact() {
    std::unique_lock<std::mutex> lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock);
    if (!lock.owns_lock()) {
        LOG(WARNING) << "The tablet is under cumulative compaction. tablet="
                     << _tablet->full_name();
        return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR);
    }

    // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked
    // for compaction may change. In this case, current compaction task should not be executed.
    if (_tablet->get_clone_occurred()) {
        _tablet->set_clone_occurred(false);
        return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED);
    }

    _input_rowsets.clear();
    // Remember the version count before merging; it is only used for the success log.
    const int version_count = _tablet->version_count();
    MonotonicStopWatch watch;
    watch.start();

    int64_t permits = 0;
    Status st = _tablet->pick_quick_compaction_rowsets(&_input_rowsets, &permits);
    if (!st.ok()) {
        // Do not continue with a candidate list the tablet failed to produce.
        LOG(WARNING) << "quick_rowsets_compaction failed, cause pick rowsets failed";
        return st;
    }

    // NOTE(review): presumably this trims _input_rowsets to its longest
    // consecutive run and reports the gaps in missedVersions -- confirm
    // against the helper's definition.
    std::vector<Version> missedVersions;
    find_longest_consecutive_version(&_input_rowsets, &missedVersions);
    if (!missedVersions.empty()) {
        LOG(WARNING) << "quick_rowsets_compaction, find missed version"
                     << ",input_size:" << _input_rowsets.size();
    }

    const size_t nums = _input_rowsets.size();
    // Nothing to merge: either no candidates at all, or fewer than the
    // configured minimum. The cast keeps the original unsigned comparison.
    if (nums == 0 || nums < static_cast<size_t>(config::quick_compaction_min_rowsets)) {
        return Status::OK();
    }

    st = check_version_continuity(_input_rowsets);
    if (!st.ok()) {
        LOG(WARNING) << "quick_rowsets_compaction failed, cause version not continuous";
        return st;
    }

    st = do_compaction(permits);
    if (!st.ok()) {
        // Clean up the partially written output and report the failure to the caller.
        gc_output_rowset();
        LOG(WARNING) << "quick_rowsets_compaction failed";
        return st;
    }

    LOG(INFO) << "quick_compaction succ"
              << ", before_versions:" << version_count
              << ", after_versions:" << _tablet->version_count()
              << ", cost:" << (watch.elapsed_time() / 1000 / 1000) << "ms"
              << ", merged: " << nums << ", batch:" << config::quick_compaction_batch_size
              << ", segments:" << permits << ", tabletid:" << _tablet->tablet_id();
    _tablet->set_last_quick_compaction_success_time(UnixMillis());
    return Status::OK();
}
Status Compaction::do_compaction(int64_t permits) {
TRACE("start to do compaction");
_tablet->data_dir()->disks_compaction_score_increment(permits);

View File

@ -48,6 +48,7 @@ public:
// This is only for http CompactionAction
Status compact();
Status quick_rowsets_compact();
virtual Status prepare_compact() = 0;
Status execute_compact();

View File

@ -17,6 +17,8 @@
#include "olap/delta_writer.h"
#include "olap/base_compaction.h"
#include "olap/cumulative_compaction.h"
#include "olap/data_dir.h"
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
@ -98,6 +100,10 @@ Status DeltaWriter::init() {
MemTracker::create_tracker(-1, "DeltaWriter:" + std::to_string(_tablet->tablet_id()));
// check tablet version number
if (_tablet->version_count() > config::max_tablet_version_num) {
//trigger quick compaction
if (config::enable_quick_compaction) {
StorageEngine::instance()->submit_quick_compaction_task(_tablet);
}
LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count()
<< ", exceed limit: " << config::max_tablet_version_num
<< ". tablet: " << _tablet->full_name();

View File

@ -88,6 +88,16 @@ Status StorageEngine::start_bg_threads() {
LOG(INFO) << "alpha rowset scan thread started";
}
ThreadPoolBuilder("CompactionTaskThreadPool")
.set_min_threads(max_thread_num)
.set_max_threads(max_thread_num)
.build(&_compaction_thread_pool);
ThreadPoolBuilder("SmallCompactionTaskThreadPool")
.set_min_threads(config::quick_compaction_max_threads)
.set_max_threads(config::quick_compaction_max_threads)
.build(&_quick_compaction_thread_pool);
// compaction tasks producer thread
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "compaction_tasks_producer_thread",
@ -661,4 +671,16 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet,
return _submit_compaction_task(tablet, compaction_type);
}
// Runs a quick compaction on `tablet` and returns the status reported by
// Compaction::quick_rowsets_compact() instead of unconditionally returning OK,
// so that a caller that inspects the result can see failures.
Status StorageEngine::_handle_quick_compaction(TabletSharedPtr tablet) {
    CumulativeCompaction compact(tablet);
    return compact.quick_rowsets_compact();
}
// Queues a quick compaction of `tablet` on the quick compaction thread pool.
//
// Returns the status of the submission itself (for example, a rejection by the
// pool); the outcome of the compaction is not reported here because the task
// runs asynchronously and its result is discarded by std::bind<void>.
Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {
    return _quick_compaction_thread_pool->submit_func(
            std::bind<void>(&StorageEngine::_handle_quick_compaction, this, tablet));
}
} // namespace doris

View File

@ -194,6 +194,7 @@ public:
void check_cumulative_compaction_config();
Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type);
Status submit_quick_compaction_task(TabletSharedPtr tablet);
private:
// Instance should be inited from `static open()`
@ -271,6 +272,8 @@ private:
Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type);
Status _handle_quick_compaction(TabletSharedPtr);
private:
struct CompactionCandidate {
CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
@ -379,6 +382,7 @@ private:
HeartbeatFlags* _heartbeat_flags;
std::unique_ptr<ThreadPool> _compaction_thread_pool;
std::unique_ptr<ThreadPool> _quick_compaction_thread_pool;
scoped_refptr<Thread> _alpha_rowset_scan_thread;
std::unique_ptr<ThreadPool> _convert_rowset_thread_pool;

View File

@ -872,10 +872,59 @@ void Tablet::calculate_cumulative_point() {
if (ret_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
return;
}
set_cumulative_layer_point(ret_cumulative_point);
}
// Picks the rowsets to merge in a quick compaction.
//
// Rowsets are visited in Rowset::comparator order. Only rowsets that are not
// delete predicates and whose start version is above the cumulative layer
// point are considered. Consecutive considered rowsets with fewer rows than
// config::quick_compaction_max_rows form a "series"; a considered rowset with
// at least that many rows closes the current series. The first longest series
// is appended to `input_rowsets`, and the sum of its segment counts is added
// to `*permits`.
//
// Skipped rowsets (delete predicates, or at/below the cumulative point) do not
// close a series, so the result is not guaranteed to be version-continuous.
//
// Returns OK (possibly without picking anything) unless the tablet failed to
// initialize, in which case OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS is returned.
// Nothing is picked when the feature is disabled, when the row limit is not
// positive, or when the tablet is not in TABLET_RUNNING state.
Status Tablet::pick_quick_compaction_rowsets(std::vector<RowsetSharedPtr>* input_rowsets,
                                             int64_t* permits) {
    const int max_rows = config::quick_compaction_max_rows;
    if (!config::enable_quick_compaction || max_rows <= 0) {
        return Status::OK();
    }
    if (!init_succeeded()) {
        return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS);
    }

    // Upper bound on the number of series examined in one call.
    constexpr int kMaxSeriesNum = 1000;

    std::shared_lock rdlock(_meta_lock);
    if (tablet_state() != TABLET_RUNNING) {
        return Status::OK();
    }

    std::vector<RowsetSharedPtr> sorted_rowsets;
    sorted_rowsets.reserve(_rs_version_map.size());
    for (const auto& entry : _rs_version_map) {
        sorted_rowsets.push_back(entry.second);
    }
    std::sort(sorted_rowsets.begin(), sorted_rowsets.end(), Rowset::comparator);

    // Track only the series being built and the longest one seen so far. This
    // avoids indexing a fixed-size table of series, which previously allowed a
    // write one element past its end once the series limit was reached.
    std::vector<RowsetSharedPtr> longest;
    std::vector<RowsetSharedPtr> current;
    int closed_series = 0;
    for (const auto& rowset : sorted_rowsets) {
        const bool is_delete = version_for_delete_predicate(rowset->version());
        if (is_delete || rowset->start_version() <= 0 ||
            rowset->start_version() <= cumulative_layer_point()) {
            continue;
        }
        if (rowset->num_rows() < max_rows) {
            current.push_back(rowset);
            continue;
        }
        // A large rowset closes the current series. Strict '>' keeps the
        // earliest series when several have the same length.
        if (current.size() > longest.size()) {
            longest.swap(current);
        }
        current.clear();
        if (++closed_series >= kMaxSeriesNum) {
            break;
        }
    }
    // The trailing series is not closed by a large rowset; compare it too.
    if (current.size() > longest.size()) {
        longest.swap(current);
    }

    for (const auto& rowset : longest) {
        *permits += rowset->num_segments();
        input_rowsets->push_back(rowset);
    }
    return Status::OK();
}
Status Tablet::split_range(const OlapTuple& start_key_strings, const OlapTuple& end_key_strings,
uint64_t request_block_row_count, std::vector<OlapTuple>* ranges) {
DCHECK(ranges != nullptr);

View File

@ -73,6 +73,8 @@ public:
// Used in clone task, to update local meta when finishing a clone job
Status revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowsets_to_clone,
const std::vector<Version>& versions_to_delete);
Status pick_quick_compaction_rowsets(std::vector<RowsetSharedPtr>* input_rowsets,
int64_t* permits);
const int64_t cumulative_layer_point() const;
void set_cumulative_layer_point(int64_t new_point);
@ -190,6 +192,10 @@ public:
_last_cumu_compaction_success_millis = millis;
}
// Stores `millis` as the timestamp of the last successful quick compaction.
void set_last_quick_compaction_success_time(int64_t millis) {
    _last_quick_compaction_success_time_millis.store(millis);
}
int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; }
void set_last_base_compaction_success_time(int64_t millis) {
_last_base_compaction_success_millis = millis;
@ -337,7 +343,7 @@ private:
std::atomic<int64_t> _last_cumu_compaction_success_millis;
// timestamp of last base compaction success
std::atomic<int64_t> _last_base_compaction_success_millis;
std::atomic<int64_t> _last_quick_compaction_success_time_millis;
std::atomic<int64_t> _cumulative_point;
std::atomic<int32_t> _newly_created_rowset_num;
std::atomic<int64_t> _last_checkpoint_time;
@ -367,6 +373,7 @@ private:
public:
IntCounter* flush_bytes;
IntCounter* flush_count;
std::atomic<int64_t> publised_count = 0;
};
inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {

View File

@ -28,8 +28,11 @@ namespace doris {
using std::map;
EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
std::vector<TTabletId>* error_tablet_ids)
: _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids) {}
std::vector<TTabletId>* error_tablet_ids,
std::vector<TTabletId>* succ_tablet_ids)
: _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablet_ids(succ_tablet_ids) {}
Status EnginePublishVersionTask::finish() {
Status res = Status::OK();
@ -106,6 +109,9 @@ Status EnginePublishVersionTask::finish() {
res = publish_status;
continue;
}
if (_succ_tablet_ids != nullptr) {
_succ_tablet_ids->push_back(tablet_info.tablet_id);
}
partition_related_tablet_infos.erase(tablet_info);
VLOG_NOTICE << "publish version successfully on tablet. tablet=" << tablet->full_name()
<< ", transaction_id=" << transaction_id << ", version=" << version.first

View File

@ -27,7 +27,8 @@ namespace doris {
class EnginePublishVersionTask : public EngineTask {
public:
EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
vector<TTabletId>* error_tablet_ids);
vector<TTabletId>* error_tablet_ids,
std::vector<TTabletId>* succ_tablet_ids = nullptr);
~EnginePublishVersionTask() {}
virtual Status finish() override;
@ -35,6 +36,7 @@ public:
private:
const TPublishVersionRequest& _publish_version_req;
vector<TTabletId>* _error_tablet_ids;
vector<TTabletId>* _succ_tablet_ids;
};
} // namespace doris