Change cumulative compaction for decoupling storage from computation (#1576)
1. Calculate the cumulative point when a tablet is loaded for the first time. 2. Simplify the rowset-picking logic around delete predicates. 3. Save the tablet meta and modify rowsets only once after cumulative compaction.
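For orientation, a condensed sketch of the flow this commit introduces, based on Compaction::do_compaction() and the subclass compact() methods shown in the diff below (locking, metrics and error handling are omitted; this is a reading aid, not the literal code):

OLAPStatus BaseCompaction::compact() {
    // 1. pick rowsets to compact; the policy lives in the subclass
    RETURN_NOT_OK(pick_rowsets_to_compact());
    // 2. merge the input rowsets into one output rowset; do_compaction() also
    //    calls modify_rowsets(), which swaps the rowsets and saves the tablet
    //    meta exactly once
    RETURN_NOT_OK(do_compaction());
    // 3. mark success so gc_unused_rowsets() retires the inputs, not the output
    _state = CompactionState::SUCCESS;
    RETURN_NOT_OK(gc_unused_rowsets());
    return OLAP_SUCCESS;
}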
@@ -30,6 +30,7 @@ add_library(Olap STATIC
bloom_filter_reader.cpp
bloom_filter_writer.cpp
byte_buffer.cpp
compaction.cpp
comparison_predicate.cpp
compress.cpp
cumulative_compaction.cpp
@@ -16,513 +16,106 @@
|
||||
// under the License.
|
||||
|
||||
#include "olap/base_compaction.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "olap/delete_handler.h"
|
||||
#include "olap/merger.h"
|
||||
#include "olap/rowset/column_data.h"
|
||||
#include "olap/storage_engine.h"
|
||||
#include "olap/tablet_meta.h"
|
||||
#include "olap/rowset/segment_group.h"
|
||||
#include "olap/tablet.h"
|
||||
#include "olap/utils.h"
|
||||
#include "util/doris_metrics.h"
|
||||
|
||||
using std::list;
|
||||
using std::map;
|
||||
using std::string;
|
||||
using std::vector;
|
||||
|
||||
namespace doris {
|
||||
|
||||
OLAPStatus BaseCompaction::init(TabletSharedPtr tablet, bool is_manual_trigger) {
|
||||
// The tablet is loaded into memory on its first query, PUSH, or similar operation.
// If the tablet has not been loaded, nothing is currently operating on it, so do not run base compaction.
|
||||
if (!tablet->init_succeeded()) {
|
||||
BaseCompaction::BaseCompaction(TabletSharedPtr tablet)
|
||||
: Compaction(tablet)
|
||||
{ }
|
||||
|
||||
BaseCompaction::~BaseCompaction() { }
|
||||
|
||||
OLAPStatus BaseCompaction::compact() {
|
||||
if (!_tablet->init_succeeded()) {
|
||||
return OLAP_ERR_INPUT_PARAMETER_ERROR;
|
||||
}
|
||||
|
||||
LOG(INFO) << "init base compaction handler. [tablet=" << tablet->full_name() << "]";
|
||||
|
||||
_tablet = tablet;
|
||||
|
||||
// 1. Try to acquire the base compaction lock.
|
||||
if (!_try_base_compaction_lock()) {
|
||||
LOG(WARNING) << "another base compaction is running. tablet=" << tablet->full_name();
|
||||
MutexLock lock(_tablet->get_base_lock(), TRY_LOCK);
|
||||
if (!lock.own_lock()) {
|
||||
LOG(WARNING) << "another base compaction is running. tablet=" << _tablet->full_name();
|
||||
return OLAP_ERR_BE_TRY_BE_LOCK_ERROR;
|
||||
}
|
||||
|
||||
// 2. Check whether the base compaction trigger policy is satisfied.
|
||||
VLOG(3) << "check whether satisfy base compaction policy.";
|
||||
bool is_policy_satisfied = false;
|
||||
vector<Version> candidate_versions;
|
||||
is_policy_satisfied = _check_whether_satisfy_policy(is_manual_trigger, &candidate_versions);
|
||||
// 1. pick rowsets to compact
|
||||
RETURN_NOT_OK(pick_rowsets_to_compact());
|
||||
|
||||
// 2.1 If the policy is not satisfied, release the base compaction lock and return an error code.
|
||||
if (!is_policy_satisfied) {
|
||||
_release_base_compaction_lock();
|
||||
return OLAP_ERR_BE_NO_SUITABLE_VERSION;
|
||||
}
|
||||
// 2. do base compaction, merge rowsets
|
||||
RETURN_NOT_OK(do_compaction());
|
||||
|
||||
// 2.2 If the policy is satisfied, trigger base compaction.
// Do not release the base compaction lock here; it is released after run() finishes.
|
||||
if (!_validate_need_merged_versions(candidate_versions)) {
|
||||
LOG(FATAL) << "error! invalid need merged versions";
|
||||
_release_base_compaction_lock();
|
||||
return OLAP_ERR_BE_INVALID_NEED_MERGED_VERSIONS;
|
||||
}
|
||||
// 3. set state to success
|
||||
_state = CompactionState::SUCCESS;
|
||||
|
||||
_need_merged_versions = candidate_versions;
|
||||
// 4. garbage collect input rowsets after base compaction
|
||||
RETURN_NOT_OK(gc_unused_rowsets());
|
||||
|
||||
// 5. add metric to base compaction
|
||||
DorisMetrics::base_compaction_deltas_total.increment(_input_rowsets.size());
|
||||
DorisMetrics::base_compaction_bytes_total.increment(_input_rowsets_size);
|
||||
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus BaseCompaction::run() {
|
||||
LOG(INFO) << "start base compaction. tablet=" << _tablet->full_name()
|
||||
<< ", old_base_version=" << _old_base_version.second
|
||||
<< ", new_base_version=" << _new_base_version.second;
|
||||
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
OlapStopWatch stage_watch;
|
||||
|
||||
// 1. Compute the version hash of the new base.
|
||||
VersionHash new_base_version_hash;
|
||||
res = _tablet->compute_all_versions_hash(_need_merged_versions, &new_base_version_hash);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to calculate new base version hash. tablet=" << _tablet->full_name()
|
||||
<< ", new_base_version=" << _new_base_version.second;
|
||||
_garbage_collection();
|
||||
return res;
|
||||
OLAPStatus BaseCompaction::pick_rowsets_to_compact() {
|
||||
_input_rowsets.clear();
|
||||
_tablet->pick_candicate_rowsets_to_base_compaction(&_input_rowsets);
|
||||
if (_input_rowsets.size() <= 1) {
|
||||
LOG(WARNING) << "There is no enough rowsets to cumulative compaction."
|
||||
<< ", the size of rowsets to compact=" << _input_rowsets.size()
|
||||
<< ", cumulative_point=" << _tablet->cumulative_layer_point();
|
||||
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
|
||||
}
|
||||
|
||||
VLOG(10) << "new_base_version_hash:" << new_base_version_hash;
|
||||
std::sort(_input_rowsets.begin(), _input_rowsets.end(), Rowset::comparator);
|
||||
RETURN_NOT_OK(check_version_continuity(_input_rowsets));
|
||||
|
||||
// 2. Acquire the data sources needed to build the new base.
|
||||
vector<RowsetSharedPtr> rowsets;
|
||||
res = _tablet->capture_consistent_rowsets(_need_merged_versions, &rowsets);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to acquire need data sources. tablet=" << _tablet->full_name()
|
||||
<< ", version=" << _new_base_version.second;
|
||||
_garbage_collection();
|
||||
return res;
|
||||
}
|
||||
|
||||
{
|
||||
DorisMetrics::base_compaction_deltas_total.increment(_need_merged_versions.size());
|
||||
int64_t merge_bytes = 0;
|
||||
for (auto& rowset : rowsets) {
|
||||
merge_bytes += rowset->data_disk_size();
|
||||
}
|
||||
DorisMetrics::base_compaction_bytes_total.increment(merge_bytes);
|
||||
}
|
||||
|
||||
// 3. Run base compaction.
// This step may take a long time.
|
||||
stage_watch.reset();
|
||||
RowsetId rowset_id = 0;
|
||||
RETURN_NOT_OK(_tablet->next_rowset_id(&rowset_id));
|
||||
RowsetWriterContext context;
|
||||
context.rowset_id = rowset_id;
|
||||
context.tablet_uid = _tablet->tablet_uid();
|
||||
context.tablet_id = _tablet->tablet_id();
|
||||
context.partition_id = _tablet->partition_id();
|
||||
context.tablet_schema_hash = _tablet->schema_hash();
|
||||
context.rowset_type = ALPHA_ROWSET;
|
||||
context.rowset_path_prefix = _tablet->tablet_path();
|
||||
context.tablet_schema = &(_tablet->tablet_schema());
|
||||
context.rowset_state = VISIBLE;
|
||||
context.data_dir = _tablet->data_dir();
|
||||
context.version = _new_base_version;
|
||||
context.version_hash = new_base_version_hash;
|
||||
|
||||
_rs_writer.reset(new (std::nothrow)AlphaRowsetWriter());
|
||||
if (_rs_writer == nullptr) {
|
||||
LOG(WARNING) << "fail to new rowset.";
|
||||
_garbage_collection();
|
||||
return OLAP_ERR_MALLOC_ERROR;
|
||||
}
|
||||
RETURN_NOT_OK(_rs_writer->init(context));
|
||||
res = _do_base_compaction(new_base_version_hash, rowsets);
|
||||
_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + std::to_string(_rs_writer->rowset_id()));
|
||||
// Release ColumnData objects that are no longer used.
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to do base version. tablet=" << _tablet->full_name()
|
||||
<< ", version=" << _new_base_version.second;
|
||||
_garbage_collection();
|
||||
return res;
|
||||
}
|
||||
|
||||
// validate that delete action is right
|
||||
// if error happened, sleep 1 hour. Report a fatal log every 1 minute
|
||||
if (_validate_delete_file_action() != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "failed to do base compaction. delete action has error.";
|
||||
_garbage_collection();
|
||||
return OLAP_ERR_BE_ERROR_DELETE_ACTION;
|
||||
}
|
||||
|
||||
// 4. make new versions visible.
|
||||
// If success, remove files belong to old versions;
|
||||
// If fail, gc files belong to new versions.
|
||||
vector<RowsetSharedPtr> unused_rowsets;
|
||||
vector<Version> unused_versions;
|
||||
_get_unused_versions(&unused_versions);
|
||||
res = _tablet->capture_consistent_rowsets(unused_versions, &unused_rowsets);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name()
|
||||
<< ", version=" << _new_base_version.second;
|
||||
_garbage_collection();
|
||||
return res;
|
||||
}
|
||||
|
||||
res = _update_header(unused_rowsets);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to update header. tablet=" << _tablet->full_name()
|
||||
<< ", version=" << _new_base_version.first << "-" << _new_base_version.second;
|
||||
_garbage_collection();
|
||||
return res;
|
||||
}
|
||||
_delete_old_files(&unused_rowsets);
|
||||
|
||||
_release_base_compaction_lock();
|
||||
|
||||
LOG(INFO) << "succeed to do base compaction. tablet=" << _tablet->full_name()
|
||||
<< ", base_version=" << _new_base_version.first << "-" << _new_base_version.second
|
||||
<< ". elapsed time of doing base compaction"
|
||||
<< ", time=" << stage_watch.get_elapse_second() << "s";
|
||||
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
static bool version_comparator(const Version& lhs, const Version& rhs) {
|
||||
return lhs.second < rhs.second;
|
||||
}
|
||||
|
||||
bool BaseCompaction::_check_whether_satisfy_policy(bool is_manual_trigger,
|
||||
vector<Version>* candidate_versions) {
|
||||
ReadLock rdlock(_tablet->get_header_lock_ptr());
|
||||
int64_t cumulative_layer_point = _tablet->cumulative_layer_point();
|
||||
if (cumulative_layer_point == -1) {
|
||||
LOG(FATAL) << "tablet has an unreasonable cumulative layer point. [tablet='" << _tablet->full_name()
|
||||
<< "' cumulative_layer_point=" << cumulative_layer_point << "]";
|
||||
return false;
|
||||
}
|
||||
|
||||
// Decrement cumulative_layer_point by 1 here to simplify the calculation below.
|
||||
--cumulative_layer_point;
|
||||
|
||||
vector<Version> path_versions;
|
||||
if (OLAP_SUCCESS != _tablet->capture_consistent_versions(Version(0, cumulative_layer_point), &path_versions)) {
|
||||
LOG(WARNING) << "fail to select shortest version path. start=0, end=" << cumulative_layer_point;
|
||||
return false;
|
||||
}
|
||||
|
||||
// base_compaction_layer_point should be the end version of the second-to-last cumulative file before cumulative_layer_point.
|
||||
int64_t base_creation_time = 0;
|
||||
size_t base_size = 0;
|
||||
int32_t base_compaction_layer_point = -1;
|
||||
for (unsigned int index = 0; index < path_versions.size(); ++index) {
|
||||
Version temp = path_versions[index];
|
||||
// base file
|
||||
if (temp.first == 0) {
|
||||
_old_base_version = temp;
|
||||
base_size = _tablet->get_rowset_size_by_version(temp);
|
||||
base_creation_time = _tablet->get_rowset_by_version(temp)->creation_time();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (temp.second == cumulative_layer_point) {
|
||||
base_compaction_layer_point = temp.first - 1;
|
||||
_latest_cumulative = temp;
|
||||
_new_base_version = Version(0, base_compaction_layer_point);
|
||||
}
|
||||
}
|
||||
|
||||
// Only one base file and one delta file.
|
||||
if (base_compaction_layer_point == -1) {
|
||||
VLOG(3) << "can't do base compaction: no cumulative files."
|
||||
<< "tablet=" << _tablet->full_name()
|
||||
<< ", base_version=0-" << _old_base_version.second
|
||||
<< ", cumulative_layer_point=" << cumulative_layer_point + 1;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Only one cumulative file.
|
||||
if (base_compaction_layer_point == _old_base_version.second) {
|
||||
VLOG(3) << "can't do base compaction: only one cumulative file."
|
||||
<< "tablet=" << _tablet->full_name()
|
||||
<< ", base_version=0-" << _old_base_version.second
|
||||
<< ", cumulative_layer_point=" << cumulative_layer_point + 1;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Use the shortest-path algorithm to select the cumulative versions that can be merged.
|
||||
if (OLAP_SUCCESS != _tablet->capture_consistent_versions(_new_base_version, candidate_versions)) {
|
||||
LOG(WARNING) << "fail to select shortest version path."
|
||||
<< "start=" << _new_base_version.first
|
||||
<< ", end=" << _new_base_version.second;
|
||||
return false;
|
||||
}
|
||||
|
||||
std::sort(candidate_versions->begin(), candidate_versions->end(), version_comparator);
|
||||
|
||||
// If START_BASE_COMPACTION was triggered manually, skip the base compaction policy check
// and ignore delete-version expiration; run base compaction as long as there are mergeable cumulative files.
|
||||
if (is_manual_trigger) {
|
||||
VLOG(3) << "manual trigger base compaction. tablet=" << _tablet->full_name();
|
||||
return true;
|
||||
}
|
||||
|
||||
// Sum up the total size of the cumulative version files that can be merged.
|
||||
size_t cumulative_total_size = 0;
|
||||
for (vector<Version>::const_iterator version_iter = candidate_versions->begin();
|
||||
version_iter != candidate_versions->end(); ++version_iter) {
|
||||
Version temp = *version_iter;
|
||||
// Skip the base file.
|
||||
if (temp.first == 0) {
|
||||
continue;
|
||||
}
|
||||
// cumulative file
|
||||
cumulative_total_size += _tablet->get_rowset_size_by_version(temp);
|
||||
}
|
||||
|
||||
// Check whether the base compaction trigger conditions are satisfied.
// Base compaction is triggered when: condition 1 || condition 2 || condition 3.
// Condition 1: the number of cumulative files exceeds a threshold.
|
||||
const uint32_t base_compaction_num_cumulative_deltas
|
||||
= config::base_compaction_num_cumulative_deltas;
|
||||
// candidate_versions contains the base file, so subtract 1 here.
|
||||
if (candidate_versions->size() - 1 >= base_compaction_num_cumulative_deltas) {
|
||||
// 1. the number of cumulative rowsets must reach the base_compaction_num_cumulative_deltas threshold
|
||||
if (_input_rowsets.size() > config::base_compaction_num_cumulative_deltas) {
|
||||
LOG(INFO) << "satisfy the base compaction policy. tablet="<< _tablet->full_name()
|
||||
<< ", num_cumulative_deltas=" << candidate_versions->size() - 1
|
||||
<< ", base_compaction_num_cumulative_deltas=" << base_compaction_num_cumulative_deltas;
|
||||
return true;
|
||||
<< ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
|
||||
<< ", base_compaction_num_cumulative_rowsets=" << config::base_compaction_num_cumulative_deltas;
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
// Condition 2: the total size of all cumulative files exceeds a certain ratio of the base file size.
|
||||
const double base_cumulative_delta_ratio = config::base_cumulative_delta_ratio;
|
||||
// 2. the ratio between the base rowset and all input cumulative rowsets reaches the threshold
|
||||
int64_t base_size = 0;
|
||||
int64_t cumulative_total_size = 0;
|
||||
for (auto& rowset : _input_rowsets) {
|
||||
if (rowset->start_version() != 0) {
|
||||
cumulative_total_size += rowset->data_disk_size();
|
||||
} else {
|
||||
base_size = rowset->data_disk_size();
|
||||
}
|
||||
}
|
||||
|
||||
double base_cumulative_delta_ratio = config::base_cumulative_delta_ratio;
|
||||
double cumulative_base_ratio = static_cast<double>(cumulative_total_size) / base_size;
|
||||
|
||||
if (cumulative_base_ratio > base_cumulative_delta_ratio) {
|
||||
LOG(INFO) << "satisfy the base compaction policy. tablet=" << _tablet->full_name()
|
||||
<< ", cumualtive_total_size=" << cumulative_total_size
|
||||
<< ", base_size=" << base_size
|
||||
<< ", cumulative_base_ratio=" << cumulative_base_ratio
|
||||
<< ", policy_ratio=" << base_cumulative_delta_ratio;
|
||||
return true;
|
||||
<< ", cumualtive_total_size=" << cumulative_total_size
|
||||
<< ", base_size=" << base_size
|
||||
<< ", cumulative_base_ratio=" << cumulative_base_ratio
|
||||
<< ", policy_ratio=" << base_cumulative_delta_ratio;
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
// Condition 3: the time since the last base compaction exceeds the configured interval.
|
||||
const uint32_t interval_since_last_operation = config::base_compaction_interval_seconds_since_last_operation;
|
||||
int64_t interval_since_last_be = time(NULL) - base_creation_time;
|
||||
if (interval_since_last_be > interval_since_last_operation) {
|
||||
// 3. the interval since the last base compaction reaches the threshold
|
||||
int64_t base_creation_time = _input_rowsets[0]->creation_time();
|
||||
int64_t interval_threshold = config::base_compaction_interval_seconds_since_last_operation;
|
||||
int64_t interval_since_last_base_compaction = time(NULL) - base_creation_time;
|
||||
if (interval_since_last_base_compaction > interval_threshold) {
|
||||
LOG(INFO) << "satisfy the base compaction policy. tablet=" << _tablet->full_name()
|
||||
<< ", interval_since_last_be=" << interval_since_last_be
|
||||
<< ", policy_interval=" << interval_since_last_operation;
|
||||
return true;
|
||||
<< ", interval_since_last_base_compaction=" << interval_since_last_base_compaction
|
||||
<< ", interval_threshold=" << interval_threshold;
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
VLOG(3) << "don't satisfy the base compaction policy. tablet=" << _tablet->full_name()
|
||||
<< ", cumulative_files_number=" << candidate_versions->size() - 1
|
||||
<< ", cumulative_base_ratio=" << cumulative_base_ratio
|
||||
<< ", interval_since_last_be=" << interval_since_last_be;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash,
|
||||
const vector<RowsetSharedPtr>& rowsets) {
|
||||
OlapStopWatch watch;
|
||||
vector<RowsetReaderSharedPtr> rs_readers;
|
||||
for (auto& rowset : rowsets) {
|
||||
RowsetReaderSharedPtr rs_reader(rowset->create_reader());
|
||||
if (rs_reader == nullptr) {
|
||||
LOG(WARNING) << "rowset create reader failed. rowset:" << rowset->rowset_id();
|
||||
return OLAP_ERR_ROWSET_CREATE_READER;
|
||||
}
|
||||
rs_readers.push_back(rs_reader);
|
||||
}
|
||||
|
||||
LOG(INFO) << "start merge new base. tablet=" << _tablet->full_name()
|
||||
<< ", version=" << _new_base_version.second;
|
||||
// 2. Run the merge step of base compaction.
// Note: whether storage is row-column or columnar, always use the Merger class for the merge;
// MassiveMerger must not be used. Reason: in MassiveMerger the base file is not read through a Reader,
// so delete conditions would not take effect and the data would not actually be deleted.
// Idea: if MassiveMerger must be used, one option is:
// 1. add a check here for whether this base compaction carries delete conditions,
//    i.e. check the delete_handler in the Reader's ReaderParams;
// 2. if there are delete conditions, use Merger instead of MassiveMerger;
// 3. otherwise MassiveMerger can be used.
|
||||
uint64_t merged_rows = 0;
|
||||
uint64_t filted_rows = 0;
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
|
||||
Merger merger(_tablet, _rs_writer, READER_BASE_COMPACTION);
|
||||
res = merger.merge(rs_readers, &merged_rows, &filted_rows);
|
||||
// 3. If the merge fails, clean up and return an error code.
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to make new base version. res=" << res
|
||||
<< ", tablet=" << _tablet->full_name()
|
||||
<< ", version=" << _new_base_version.first
|
||||
<< "-" << _new_base_version.second;
|
||||
return OLAP_ERR_BE_MERGE_ERROR;
|
||||
}
|
||||
RowsetSharedPtr new_base = _rs_writer->build();
|
||||
if (new_base == nullptr) {
|
||||
LOG(WARNING) << "rowset writer build failed. writer version:"
|
||||
<< _rs_writer->version().first << "-" << _rs_writer->version().second;
|
||||
return OLAP_ERR_MALLOC_ERROR;
|
||||
}
|
||||
|
||||
// 4. If the merge succeeds, load the olap index of the new base file.
|
||||
_new_rowsets.push_back(new_base);
|
||||
|
||||
VLOG(10) << "merge new base success, start load index. tablet=" << _tablet->full_name()
|
||||
<< ", version=" << _new_base_version.second;
|
||||
|
||||
// Check row num changes
|
||||
uint64_t source_rows = 0;
|
||||
for (auto& rowset : rowsets) {
|
||||
source_rows += rowset->num_rows();
|
||||
}
|
||||
bool row_nums_check = config::row_nums_check;
|
||||
if (row_nums_check) {
|
||||
if (source_rows != new_base->num_rows() + merged_rows + filted_rows) {
|
||||
LOG(WARNING) << "fail to check row num!"
|
||||
<< "source_rows=" << source_rows
|
||||
<< ", merged_rows=" << merged_rows
|
||||
<< ", filted_rows=" << filted_rows
|
||||
<< ", new_index_rows=" << new_base->num_rows();
|
||||
return OLAP_ERR_CHECK_LINES_ERROR;
|
||||
}
|
||||
} else {
|
||||
LOG(INFO) << "all row nums."
|
||||
<< "source_rows=" << source_rows
|
||||
<< ", merged_rows=" << merged_rows
|
||||
<< ", filted_rows=" << filted_rows
|
||||
<< ", new_index_rows=" << new_base->num_rows()
|
||||
<< ", merged_version_num=" << _need_merged_versions.size()
|
||||
<< ", time_us=" << watch.get_elapse_time_us();
|
||||
}
|
||||
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus BaseCompaction::_update_header(const vector<RowsetSharedPtr>& unused_rowsets) {
|
||||
WriteLock wrlock(_tablet->get_header_lock_ptr());
|
||||
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
// modify_rowsets may fail non-transactionally with a very small probability, so it is treated as a FATAL error here.
|
||||
res = _tablet->modify_rowsets(_new_rowsets, unused_rowsets);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(FATAL) << "fail to replace data sources. res" << res
|
||||
<< ", tablet=" << _tablet->full_name()
|
||||
<< ", new_base_version=" << _new_base_version.second
|
||||
<< ", old_base_verison=" << _old_base_version.second;
|
||||
return res;
|
||||
}
|
||||
|
||||
// If saving the header fails, all newly added information is lost on the next startup, which is a severe error.
// There is no good way to handle this for now, so report FATAL.
|
||||
res = _tablet->save_meta();
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(FATAL) << "fail to save tablet meta. res=" << res
|
||||
<< ", tablet=" << _tablet->full_name()
|
||||
<< ", new_base_version=" << _new_base_version.second
|
||||
<< ", old_base_version=" << _old_base_version.second;
|
||||
return OLAP_ERR_BE_SAVE_HEADER_ERROR;
|
||||
}
|
||||
_new_rowsets.clear();
|
||||
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
void BaseCompaction::_delete_old_files(vector<RowsetSharedPtr>* unused_indices) {
|
||||
if (!unused_indices->empty()) {
|
||||
StorageEngine* storage_engine = StorageEngine::instance();
|
||||
|
||||
for (vector<RowsetSharedPtr>::iterator it = unused_indices->begin();
|
||||
it != unused_indices->end(); ++it) {
|
||||
storage_engine->add_unused_rowset(*it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void BaseCompaction::_garbage_collection() {
|
||||
// Clean up the version files that have already been generated.
|
||||
StorageEngine* storage_engine = StorageEngine::instance();
|
||||
for (vector<RowsetSharedPtr>::iterator it = _new_rowsets.begin();
|
||||
it != _new_rowsets.end(); ++it) {
|
||||
storage_engine->add_unused_rowset(*it);
|
||||
}
|
||||
_new_rowsets.clear();
|
||||
|
||||
_release_base_compaction_lock();
|
||||
}
|
||||
|
||||
bool BaseCompaction::_validate_need_merged_versions(
|
||||
const vector<Version>& candidate_versions) {
|
||||
if (candidate_versions.size() <= 1) {
|
||||
LOG(WARNING) << "unenough versions need to be merged. size=" << candidate_versions.size();
|
||||
return false;
|
||||
}
|
||||
|
||||
// 1. validate versions in candidate_versions are continuous
|
||||
// Skip the first element
|
||||
for (unsigned int index = 1; index < candidate_versions.size(); ++index) {
|
||||
Version previous_version = candidate_versions[index - 1];
|
||||
Version current_version = candidate_versions[index];
|
||||
if (current_version.first != previous_version.second + 1) {
|
||||
LOG(WARNING) << "wrong need merged version. "
|
||||
<< "previous_version=" << previous_version.first
|
||||
<< "-" << previous_version.second
|
||||
<< ", current_version=" << current_version.first
|
||||
<< "-" << current_version.second;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// 2. validate m_new_base_version is OK
|
||||
if (_new_base_version.first != 0
|
||||
|| _new_base_version.first != candidate_versions.begin()->first
|
||||
|| _new_base_version.second != candidate_versions.rbegin()->second) {
|
||||
LOG(WARNING) << "new_base_version is wrong."
|
||||
<< " new_base_version=" << _new_base_version.first
|
||||
<< "-" << _new_base_version.second
|
||||
<< ", vector_version=" << candidate_versions.begin()->first
|
||||
<< "-" << candidate_versions.rbegin()->second;
|
||||
return false;
|
||||
}
|
||||
|
||||
VLOG(10) << "valid need merged version";
|
||||
return true;
|
||||
}
|
||||
|
||||
OLAPStatus BaseCompaction::_validate_delete_file_action() {
|
||||
// 1. acquire the latest version to make sure all is right after deleting files
|
||||
ReadLock rdlock(_tablet->get_header_lock_ptr());
|
||||
const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
|
||||
if (rowset == nullptr) {
|
||||
LOG(INFO) << "version is -1 when validate_delete_file_action";
|
||||
return OLAP_ERR_BE_ERROR_DELETE_ACTION;
|
||||
}
|
||||
Version spec_version = Version(0, rowset->end_version());
|
||||
vector<RowsetReaderSharedPtr> rs_readers;
|
||||
_tablet->capture_rs_readers(spec_version, &rs_readers);
|
||||
|
||||
if (rs_readers.empty()) {
|
||||
LOG(INFO) << "acquire data sources failed. version="
|
||||
<< spec_version.first << "-" << spec_version.second;
|
||||
return OLAP_ERR_BE_ERROR_DELETE_ACTION;
|
||||
}
|
||||
|
||||
VLOG(3) << "delete file action is OK";
|
||||
|
||||
return OLAP_SUCCESS;
|
||||
LOG(INFO) << "don't satisfy the base compaction policy. tablet=" << _tablet->full_name()
|
||||
<< ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
|
||||
<< ", cumulative_base_ratio=" << cumulative_base_ratio
|
||||
<< ", interval_since_last_base_compaction=" << interval_since_last_base_compaction;
|
||||
return OLAP_ERR_BE_NO_SUITABLE_VERSION;
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@@ -18,167 +18,33 @@
|
||||
#ifndef DORIS_BE_SRC_OLAP_BASE_COMPACTION_H
|
||||
#define DORIS_BE_SRC_OLAP_BASE_COMPACTION_H
|
||||
|
||||
#include <map>
|
||||
#include <string>
|
||||
|
||||
#include "olap/olap_common.h"
|
||||
#include "olap/olap_define.h"
|
||||
#include "olap/tablet.h"
|
||||
#include "rowset/rowset_id_generator.h"
|
||||
#include "rowset/alpha_rowset_writer.h"
|
||||
#include "olap/compaction.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class Rowset;
|
||||
class RowsetReader;
|
||||
// BaseCompaction is derived from Compaction.
|
||||
// BaseCompaction implements:
// 1. its own policy to pick rowsets;
// 2. the compaction that produces a new rowset.
|
||||
|
||||
// @brief Implements the handling of the START_BASE_COMPACTION command and returns the result.
|
||||
class BaseCompaction {
|
||||
class BaseCompaction : public Compaction {
|
||||
public:
|
||||
BaseCompaction() :
|
||||
_new_base_version(0, 0),
|
||||
_old_base_version(0, 0),
|
||||
_base_compaction_locked(false),
|
||||
_rs_writer(nullptr) {}
|
||||
BaseCompaction(TabletSharedPtr tablet);
|
||||
~BaseCompaction() override;
|
||||
|
||||
virtual ~BaseCompaction() {
|
||||
_release_base_compaction_lock();
|
||||
OLAPStatus compact() override;
|
||||
|
||||
protected:
|
||||
OLAPStatus pick_rowsets_to_compact() override;
|
||||
std::string compaction_name() const override {
|
||||
return "base compaction";
|
||||
}
|
||||
|
||||
// Initialize BaseCompaction. This mainly:
// 1. checks whether the base compaction policy is satisfied;
// 2. if so, computes which versions need to be merged.
//
// Input parameters:
// - tablet: shared pointer to the tablet on which base compaction will run
// - is_manual_trigger
//   - true: triggered manually by the START_BASE_COMPACTION command
//   - false: triggered by the base compaction policy
//
// Return value:
// - OLAP_SUCCESS if init succeeds, i.e. base compaction can run;
// - otherwise, the corresponding error code.
|
||||
OLAPStatus init(TabletSharedPtr tablet, bool is_manual_trigger = false);
|
||||
|
||||
// Run BaseCompaction. This may take a long time.
//
// Return value:
// - OLAP_SUCCESS on success;
// - otherwise, the corresponding error code.
|
||||
OLAPStatus run();
|
||||
ReaderType compaction_type() const override {
|
||||
return ReaderType::READER_BASE_COMPACTION;
|
||||
}
|
||||
|
||||
private:
|
||||
// Check whether the current state satisfies the base compaction trigger policy.
//
// Input parameters:
// - is_manual_trigger: whether the START_BASE_COMPACTION command was triggered manually
// Output parameters:
// - candidate_versions: the cumulative files that base compaction can merge
//
// Return value:
// - true if the trigger policy is satisfied
// - false otherwise
|
||||
bool _check_whether_satisfy_policy(bool is_manual_trigger,
|
||||
std::vector<Version>* candidate_versions);
|
||||
|
||||
// Build the new base.
//
// Input parameters:
// - new_base_version_hash: VersionHash of the new base
// - rs_readers: the RowsetReaders* needed to build the new base
// - row_count: row_count produced while building the base
//
// Return value:
// - OLAP_SUCCESS on success;
// - otherwise, the corresponding error code.
|
||||
OLAPStatus _do_base_compaction(VersionHash new_base_version_hash,
|
||||
const std::vector<RowsetSharedPtr>& rowsets);
|
||||
|
||||
// Update the header so that the changes become visible.
// Input parameters:
// - unused_rowsets: the Rowset* that need to be physically deleted
//
// Return value:
// - OLAP_SUCCESS on success;
// - otherwise, the corresponding error code.
|
||||
OLAPStatus _update_header(const std::vector<RowsetSharedPtr>& unused_rowsets);
|
||||
|
||||
// Delete the rowsets that are no longer used.
//
// Input parameters:
// - unused_rowsets: the Rowset* that need to be physically deleted
//
// Return value:
// - OLAP_SUCCESS on success;
// - otherwise, the corresponding error code.
|
||||
void _delete_old_files(std::vector<RowsetSharedPtr>* unused_indices);
|
||||
|
||||
// Called to clean up when other functions fail.
|
||||
void _garbage_collection();
|
||||
|
||||
// Validate that the resulting candidate_versions are correct.
//
// Return value:
// - false if incorrect
// - true if correct
|
||||
bool _validate_need_merged_versions(const std::vector<Version>& candidate_versions);
|
||||
|
||||
// Validate that the delete-file action is correct.
//
// Return value:
// - OLAP_ERR_BE_ERROR_DELETE_ACTION if incorrect
// - OLAP_SUCCESS if correct
|
||||
OLAPStatus _validate_delete_file_action();
|
||||
|
||||
void _get_unused_versions(std::vector<Version>* unused_versions) {
|
||||
unused_versions->clear();
|
||||
|
||||
std::vector<Version> all_versions;
|
||||
_tablet->list_versions(&all_versions);
|
||||
for (std::vector<Version>::const_iterator iter = all_versions.begin();
|
||||
iter != all_versions.end(); ++iter) {
|
||||
if (iter->first <= _new_base_version.second) {
|
||||
unused_versions->push_back(*iter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Compare two versions by their version.second values.
|
||||
static bool _version_compare(const Version& left, const Version& right) {
|
||||
return left.second < right.second;
|
||||
}
|
||||
|
||||
bool _try_base_compaction_lock() {
|
||||
if (_tablet->try_base_compaction_lock()) {
|
||||
_base_compaction_locked = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void _release_base_compaction_lock() {
|
||||
if (_base_compaction_locked) {
|
||||
_tablet->release_base_compaction_lock();
|
||||
_base_compaction_locked = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Pointer to the tablet being operated on.
TabletSharedPtr _tablet;
// Version of the new base.
Version _new_base_version;
// Version of the existing base.
Version _old_base_version;
// The existing cumulative with the largest version number.
Version _latest_cumulative;
// Versions of the cumulative files to be merged during this base compaction.
std::vector<Version> _need_merged_versions;
// Rowsets corresponding to the newly added versions.
std::vector<RowsetSharedPtr> _new_rowsets;
|
||||
bool _base_compaction_locked;
|
||||
RowsetWriterSharedPtr _rs_writer;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(BaseCompaction);
|
||||
};
|
||||
|
||||
|
||||
185
be/src/olap/compaction.cpp
Normal file
@@ -0,0 +1,185 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "olap/compaction.h"
|
||||
|
||||
using std::vector;
|
||||
|
||||
namespace doris {
|
||||
|
||||
Compaction::Compaction(TabletSharedPtr tablet)
|
||||
: _tablet(tablet),
|
||||
_input_rowsets_size(0),
|
||||
_input_row_num(0),
|
||||
_state(CompactionState::INITED)
|
||||
{ }
|
||||
|
||||
Compaction::~Compaction() { }
|
||||
|
||||
OLAPStatus Compaction::do_compaction() {
|
||||
LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name();
|
||||
|
||||
OlapStopWatch watch;
|
||||
|
||||
// 1. prepare the output version and version hash
|
||||
_output_version = Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version());
|
||||
_tablet->compute_version_hash_from_rowsets(_input_rowsets, &_output_version_hash);
|
||||
|
||||
RETURN_NOT_OK(construct_output_rowset_writer());
|
||||
RETURN_NOT_OK(construct_input_rowset_readers());
|
||||
|
||||
Merger merger(_tablet, compaction_type(), _output_rs_writer, _input_rs_readers);
|
||||
OLAPStatus res = merger.merge();
|
||||
|
||||
// 2. If the merge fails, clean up and return an error code.
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to do " << compaction_name()
|
||||
<< ". res=" << res
|
||||
<< ", tablet=" << _tablet->full_name()
|
||||
<< ", output_version=" << _output_version.first
|
||||
<< "-" << _output_version.second;
|
||||
return res;
|
||||
}
|
||||
|
||||
_output_rowset = _output_rs_writer->build();
|
||||
if (_output_rowset == nullptr) {
|
||||
LOG(WARNING) << "rowset writer build failed. writer version:"
|
||||
<< ", output_version=" << _output_version.first
|
||||
<< "-" << _output_version.second;
|
||||
return OLAP_ERR_MALLOC_ERROR;
|
||||
}
|
||||
|
||||
// 3. check correctness
|
||||
RETURN_NOT_OK(check_correctness(merger));
|
||||
|
||||
// 4. modify rowsets in memory
|
||||
RETURN_NOT_OK(modify_rowsets());
|
||||
|
||||
LOG(INFO) << "succeed to do " << compaction_name()
|
||||
<< ". tablet=" << _tablet->full_name()
|
||||
<< ", output_version=" << _output_version.first
|
||||
<< "-" << _output_version.second
|
||||
<< ". elapsed time=" << watch.get_elapse_second() << "s.";
|
||||
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus Compaction::construct_output_rowset_writer() {
|
||||
RowsetId rowset_id = 0;
|
||||
RETURN_NOT_OK(_tablet->next_rowset_id(&rowset_id));
|
||||
RowsetWriterContext context;
|
||||
context.rowset_id = rowset_id;
|
||||
context.tablet_uid = _tablet->tablet_uid();
|
||||
context.tablet_id = _tablet->tablet_id();
|
||||
context.partition_id = _tablet->partition_id();
|
||||
context.tablet_schema_hash = _tablet->schema_hash();
|
||||
context.rowset_type = ALPHA_ROWSET;
|
||||
context.rowset_path_prefix = _tablet->tablet_path();
|
||||
context.tablet_schema = &(_tablet->tablet_schema());
|
||||
context.rowset_state = VISIBLE;
|
||||
context.data_dir = _tablet->data_dir();
|
||||
context.version = _output_version;
|
||||
context.version_hash = _output_version_hash;
|
||||
|
||||
_output_rs_writer.reset(new (std::nothrow)AlphaRowsetWriter());
|
||||
RETURN_NOT_OK(_output_rs_writer->init(context));
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus Compaction::construct_input_rowset_readers() {
|
||||
for (auto& rowset : _input_rowsets) {
|
||||
RowsetReaderSharedPtr rs_reader(rowset->create_reader());
|
||||
if (rs_reader == nullptr) {
|
||||
LOG(WARNING) << "rowset create reader failed. rowset:" << rowset->rowset_id();
|
||||
return OLAP_ERR_ROWSET_CREATE_READER;
|
||||
}
|
||||
_input_rs_readers.push_back(rs_reader);
|
||||
}
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus Compaction::modify_rowsets() {
|
||||
std::vector<RowsetSharedPtr> output_rowsets;
|
||||
output_rowsets.push_back(_output_rowset);
|
||||
|
||||
WriteLock wrlock(_tablet->get_header_lock_ptr());
|
||||
OLAPStatus res = _tablet->modify_rowsets(output_rowsets, _input_rowsets);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(FATAL) << "fail to replace data sources. res" << res
|
||||
<< ", tablet=" << _tablet->full_name()
|
||||
<< ", compaction__version=" << _output_version.first
|
||||
<< "-" << _output_version.second;
|
||||
return res;
|
||||
}
|
||||
|
||||
res = _tablet->save_meta();
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(FATAL) << "fail to save tablet meta. res=" << res
|
||||
<< ", tablet=" << _tablet->full_name()
|
||||
<< ", compaction_version=" << _output_version.first
|
||||
<< "-" << _output_version.second;
|
||||
return OLAP_ERR_BE_SAVE_HEADER_ERROR;
|
||||
}
|
||||
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus Compaction::gc_unused_rowsets() {
|
||||
StorageEngine* storage_engine = StorageEngine::instance();
|
||||
if (_state != CompactionState::SUCCESS) {
|
||||
storage_engine->add_unused_rowset(_output_rowset);
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
for (auto& rowset : _input_rowsets) {
|
||||
storage_engine->add_unused_rowset(rowset);
|
||||
}
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus Compaction::check_version_continuity(const vector<RowsetSharedPtr>& rowsets) {
|
||||
RowsetSharedPtr prev_rowset = rowsets.front();
|
||||
for (size_t i = 1; i < rowsets.size(); ++i) {
|
||||
RowsetSharedPtr rowset = rowsets[i];
|
||||
if (rowset->start_version() != prev_rowset->end_version() + 1) {
|
||||
LOG(WARNING) << "There are missed versions among rowsets. "
|
||||
<< "prev_rowset verison=" << prev_rowset->start_version()
|
||||
<< "-" << prev_rowset->end_version()
|
||||
<< ", rowset version=" << rowset->start_version()
|
||||
<< "-" << rowset->end_version();
|
||||
return OLAP_ERR_CUMULATIVE_MISS_VERSION;
|
||||
}
|
||||
prev_rowset = rowset;
|
||||
}
|
||||
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus Compaction::check_correctness(const Merger& merger) {
|
||||
// 1. check row number
|
||||
if (_input_row_num != _output_rowset->num_rows() + merger.merged_rows() + merger.filted_rows()) {
|
||||
LOG(FATAL) << "row_num does not match between cumulative input and output! "
|
||||
<< "input_row_num=" << _input_row_num
|
||||
<< ", merged_row_num=" << merger.merged_rows()
|
||||
<< ", filted_row_num=" << merger.filted_rows()
|
||||
<< ", output_row_num=" << _output_rowset->num_rows();
|
||||
return OLAP_ERR_CHECK_LINES_ERROR;
|
||||
}
|
||||
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
91
be/src/olap/compaction.h
Normal file
@@ -0,0 +1,91 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#ifndef DORIS_BE_SRC_OLAP_COMPACTION_H
|
||||
#define DORIS_BE_SRC_OLAP_COMPACTION_H
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include "olap/merger.h"
|
||||
#include "olap/olap_common.h"
|
||||
#include "olap/olap_define.h"
|
||||
#include "olap/storage_engine.h"
|
||||
#include "olap/tablet.h"
|
||||
#include "olap/tablet_meta.h"
|
||||
#include "olap/utils.h"
|
||||
#include "rowset/alpha_rowset_writer.h"
|
||||
#include "rowset/rowset_id_generator.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class Merger;
|
||||
|
||||
// This class is the base class for compaction.
// The entry point of this class is compact().
// Any compaction goes through four procedures:
//  1. pick the rowsets that satisfy the compaction policy
//  2. do the compaction
//  3. modify rowsets
//  4. gc unused rowsets
|
||||
class Compaction {
|
||||
public:
|
||||
Compaction(TabletSharedPtr tablet);
|
||||
virtual ~Compaction();
|
||||
|
||||
virtual OLAPStatus compact() = 0;
|
||||
|
||||
protected:
|
||||
virtual OLAPStatus pick_rowsets_to_compact() = 0;
|
||||
virtual std::string compaction_name() const = 0;
|
||||
virtual ReaderType compaction_type() const = 0;
|
||||
|
||||
OLAPStatus do_compaction();
|
||||
OLAPStatus modify_rowsets();
|
||||
OLAPStatus gc_unused_rowsets();
|
||||
|
||||
OLAPStatus construct_output_rowset_writer();
|
||||
OLAPStatus construct_input_rowset_readers();
|
||||
|
||||
OLAPStatus check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
|
||||
OLAPStatus check_correctness(const Merger& merger);
|
||||
|
||||
protected:
|
||||
TabletSharedPtr _tablet;
|
||||
|
||||
std::vector<RowsetSharedPtr> _input_rowsets;
|
||||
std::vector<RowsetReaderSharedPtr> _input_rs_readers;
|
||||
int64_t _input_rowsets_size;
|
||||
int64_t _input_row_num;
|
||||
|
||||
RowsetSharedPtr _output_rowset;
|
||||
RowsetWriterSharedPtr _output_rs_writer;
|
||||
|
||||
enum CompactionState {
|
||||
INITED = 0,
|
||||
SUCCESS = 1
|
||||
};
|
||||
CompactionState _state;
|
||||
|
||||
Version _output_version;
|
||||
VersionHash _output_version_hash;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(Compaction);
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
#endif // DORIS_BE_SRC_OLAP_COMPACTION_H
|
||||
@@ -16,553 +16,110 @@
|
||||
// under the License.
|
||||
|
||||
#include "olap/cumulative_compaction.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <list>
|
||||
#include <vector>
|
||||
|
||||
#include "olap/storage_engine.h"
|
||||
#include "util/doris_metrics.h"
|
||||
#include "olap/rowset/alpha_rowset_writer.h"
|
||||
|
||||
using std::list;
|
||||
using std::nothrow;
|
||||
using std::sort;
|
||||
using std::vector;
|
||||
|
||||
namespace doris {
|
||||
|
||||
OLAPStatus CumulativeCompaction::init(TabletSharedPtr tablet) {
|
||||
LOG(INFO) << "init cumulative compaction handler. tablet=" << tablet->full_name();
|
||||
CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet)
|
||||
: Compaction(tablet),
|
||||
_cumulative_rowset_size_threshold(config::cumulative_compaction_budgeted_bytes)
|
||||
{ }
|
||||
|
||||
if (_is_init) {
|
||||
LOG(WARNING) << "cumulative handler has been inited. tablet=" << tablet->full_name();
|
||||
return OLAP_ERR_CUMULATIVE_REPEAT_INIT;
|
||||
}
|
||||
CumulativeCompaction::~CumulativeCompaction() { }
|
||||
|
||||
if (!tablet->init_succeeded()) {
|
||||
OLAPStatus CumulativeCompaction::compact() {
|
||||
if (!_tablet->init_succeeded()) {
|
||||
return OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS;
|
||||
}
|
||||
|
||||
_tablet = tablet;
|
||||
_max_delta_file_size = config::cumulative_compaction_budgeted_bytes;
|
||||
|
||||
if (!_tablet->try_cumulative_lock()) {
|
||||
LOG(INFO) << "skip compaction, because another cumulative is running. tablet=" << _tablet->full_name();
|
||||
MutexLock lock(_tablet->get_cumulative_lock(), TRY_LOCK);
|
||||
if (!lock.own_lock()) {
|
||||
LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name();
|
||||
return OLAP_ERR_CE_TRY_CE_LOCK_ERROR;
|
||||
}
|
||||
|
||||
_tablet->obtain_header_rdlock();
|
||||
_old_cumulative_layer_point = _tablet->cumulative_layer_point();
|
||||
_tablet->release_header_lock();
|
||||
// If it is -1, this tablet has never had a cumulative layer point set;
// set it here.
|
||||
if (_old_cumulative_layer_point == -1) {
|
||||
LOG(INFO) << "tablet has an unreasonable cumulative layer point. tablet=" << _tablet->full_name()
|
||||
<< ", cumulative_layer_point=" << _old_cumulative_layer_point;
|
||||
_tablet->release_cumulative_lock();
|
||||
return OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS;
|
||||
}
|
||||
// 1.calculate cumulative point
|
||||
RETURN_NOT_OK(_tablet->calculate_cumulative_point());
|
||||
|
||||
_tablet->obtain_header_wrlock();
|
||||
OLAPStatus res = _calculate_need_merged_versions();
|
||||
_tablet->release_header_lock();
|
||||
if (res != OLAP_SUCCESS) {
|
||||
_tablet->release_cumulative_lock();
|
||||
LOG(INFO) << "no suitable delta versions. don't do cumulative compaction now.";
|
||||
return res;
|
||||
}
|
||||
// 2. pick rowsets to compact
|
||||
RETURN_NOT_OK(pick_rowsets_to_compact());
|
||||
|
||||
if (!_validate_need_merged_versions()) {
|
||||
_tablet->release_cumulative_lock();
|
||||
LOG(FATAL) << "error! invalid need merged versions.";
|
||||
return OLAP_ERR_CUMULATIVE_INVALID_NEED_MERGED_VERSIONS;
|
||||
}
|
||||
// 3. do cumulative compaction, merge rowsets
|
||||
RETURN_NOT_OK(do_compaction());
|
||||
|
||||
// 4. set state to success
|
||||
_state = CompactionState::SUCCESS;
|
||||
|
||||
// 5. set cumulative point
|
||||
_tablet->set_cumulative_layer_point(_input_rowsets.back()->end_version() + 1);
|
||||
|
||||
// 6. garbage collect input rowsets after cumulative compaction
|
||||
RETURN_NOT_OK(gc_unused_rowsets());
|
||||
|
||||
// 7. add metric to cumulative compaction
|
||||
DorisMetrics::cumulative_compaction_deltas_total.increment(_input_rowsets.size());
|
||||
DorisMetrics::cumulative_compaction_bytes_total.increment(_input_rowsets_size);
|
||||
|
||||
_is_init = true;
|
||||
_cumulative_version = Version(_need_merged_versions.begin()->first,
|
||||
_need_merged_versions.rbegin()->first);
|
||||
_rs_writer.reset(new (std::nothrow)AlphaRowsetWriter());
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus CumulativeCompaction::run() {
|
||||
if (!_is_init) {
|
||||
_tablet->release_cumulative_lock();
|
||||
LOG(WARNING) << "cumulative handler is not inited.";
|
||||
return OLAP_ERR_NOT_INITED;
|
||||
OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
|
||||
std::vector<RowsetSharedPtr> candidate_rowsets;
|
||||
_tablet->pick_candicate_rowsets_to_cumulative_compaction(&candidate_rowsets);
|
||||
|
||||
if (candidate_rowsets.size() <= 1) {
|
||||
LOG(WARNING) << "There is no enough rowsets to cumulative compaction."
|
||||
<< ", the size of rowsets to compact=" << candidate_rowsets.size()
|
||||
<< ", cumulative_point=" << _tablet->cumulative_layer_point();
|
||||
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
|
||||
}
|
||||
|
||||
// 0. Preparation.
|
||||
LOG(INFO) << "start cumulative compaction. tablet=" << _tablet->full_name()
|
||||
<< ", cumulative_version=" << _cumulative_version.first << "-"
|
||||
<< _cumulative_version.second;
|
||||
OlapStopWatch watch;
|
||||
std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
|
||||
RETURN_NOT_OK(check_version_continuity(candidate_rowsets));
|
||||
|
||||
// 1. Compute the version hash of the new cumulative file.
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
res = _tablet->compute_all_versions_hash(_need_merged_versions, &_cumulative_version_hash);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
_tablet->release_cumulative_lock();
|
||||
LOG(WARNING) << "failed to computer cumulative version hash."
|
||||
<< " tablet=" << _tablet->full_name()
|
||||
<< ", cumulative_version=" << _cumulative_version.first
|
||||
<< "-" << _cumulative_version.second;
|
||||
return res;
|
||||
}
|
||||
|
||||
// 2. Acquire the data files for the delta files to be merged.
|
||||
res = _tablet->capture_consistent_rowsets(_need_merged_versions, &_rowsets);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
_tablet->release_cumulative_lock();
|
||||
LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name()
|
||||
<< ", version=" << _cumulative_version.first
|
||||
<< "-" << _cumulative_version.second;
|
||||
return res;
|
||||
}
|
||||
|
||||
{
|
||||
DorisMetrics::cumulative_compaction_deltas_total.increment(_need_merged_versions.size());
|
||||
int64_t merge_bytes = 0;
|
||||
for (auto& rowset : _rowsets) {
|
||||
merge_bytes += rowset->data_disk_size();
|
||||
}
|
||||
DorisMetrics::cumulative_compaction_bytes_total.increment(merge_bytes);
|
||||
}
|
||||
|
||||
do {
|
||||
// 3. Build the olap index for the new cumulative file.
|
||||
RowsetId rowset_id = 0;
|
||||
RETURN_NOT_OK(_tablet->next_rowset_id(&rowset_id));
|
||||
RowsetWriterContext context;
|
||||
context.rowset_id = rowset_id;
|
||||
context.tablet_uid = _tablet->tablet_uid();
|
||||
context.tablet_id = _tablet->tablet_id();
|
||||
context.partition_id = _tablet->partition_id();
|
||||
context.tablet_schema_hash = _tablet->schema_hash();
|
||||
context.rowset_type = ALPHA_ROWSET;
|
||||
context.rowset_path_prefix = _tablet->tablet_path();
|
||||
context.tablet_schema = &(_tablet->tablet_schema());
|
||||
context.rowset_state = VISIBLE;
|
||||
context.data_dir = _tablet->data_dir();
|
||||
context.version = _cumulative_version;
|
||||
context.version_hash = _cumulative_version_hash;
|
||||
_rs_writer->init(context);
|
||||
|
||||
// 4. Run the cumulative compaction merge.
|
||||
for (auto& rowset : _rowsets) {
|
||||
RowsetReaderSharedPtr rs_reader(rowset->create_reader());
|
||||
if (rs_reader == nullptr) {
|
||||
LOG(WARNING) << "rowset create reader failed. rowset:" << rowset->rowset_id();
|
||||
_tablet->release_cumulative_lock();
|
||||
return OLAP_ERR_ROWSET_CREATE_READER;
|
||||
}
|
||||
_rs_readers.push_back(rs_reader);
|
||||
}
|
||||
res = _do_cumulative_compaction();
|
||||
_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + std::to_string(_rs_writer->rowset_id()));
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "failed to do cumulative compaction."
|
||||
<< ", tablet=" << _tablet->full_name()
|
||||
<< ", cumulative_version=" << _cumulative_version.first
|
||||
<< "-" << _cumulative_version.second;
|
||||
break;
|
||||
}
|
||||
} while (0);
|
||||
|
||||
// 5. If an error occurs, clean up.
|
||||
if (res != OLAP_SUCCESS) {
|
||||
StorageEngine::instance()->add_unused_rowset(_rowset);
|
||||
}
|
||||
|
||||
_tablet->release_cumulative_lock();
|
||||
|
||||
LOG(INFO) << "succeed to do cumulative compaction. tablet=" << _tablet->full_name()
|
||||
<< ", cumulative_version=" << _cumulative_version.first
|
||||
<< "-" << _cumulative_version.second
|
||||
<< ". elapsed time of doing cumulative compaction"
|
||||
<< ", time=" << watch.get_elapse_second() << "s";
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAPStatus CumulativeCompaction::_calculate_need_merged_versions() {
|
||||
Versions delta_versions;
|
||||
OLAPStatus res = _get_delta_versions(&delta_versions);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(INFO) << "failed to get delta versions. res=" << res;
|
||||
return res;
|
||||
}
|
||||
|
||||
// Subtract 1 here so the delta with the latest version is never merged into the cumulative,
// because a push may re-import the delta with the latest version.
|
||||
uint32_t delta_number = delta_versions.size() - 1;
|
||||
uint32_t index = 0;
|
||||
// Look for mergeable delta files among the delta files.
// They may be split into several intervals by deletes or by large delta files (>= max_delta_file_size), e.g.:
// v1, v2, v3, D, v4, v5, D, v6, v7
// Search interval by interval until suitable mergeable delta files are found.
|
||||
while (index < delta_number) {
|
||||
Versions need_merged_versions;
|
||||
size_t total_size = 0;
|
||||
|
||||
// Look for mergeable delta files within one interval.
|
||||
for (; index < delta_number; ++index) {
|
||||
// If the total size of the mergeable delta files found so far is >= _max_delta_file_size,
// the merge can proceed; stop searching.
|
||||
if (total_size >= _max_delta_file_size) {
|
||||
std::vector<RowsetSharedPtr> transient_rowsets;
|
||||
for (size_t i = 0; i < candidate_rowsets.size() - 1; ++i) {
|
||||
// The VersionHash will be calculated from the chosen rowsets.
// If the latest singleton rowset is chosen, the VersionHash
// will differ from the value recorded in FE.
// So the latest singleton rowset is reserved.
|
||||
RowsetSharedPtr rowset = candidate_rowsets[i];
|
||||
if (_tablet->version_for_delete_predicate(rowset->version())) {
|
||||
if (transient_rowsets.size() > config::cumulative_compaction_num_singleton_deltas) {
|
||||
_input_rowsets = transient_rowsets;
|
||||
break;
|
||||
}
|
||||
|
||||
Version delta = delta_versions[index];
|
||||
size_t delta_size = _tablet->get_rowset_size_by_version(delta);
|
||||
// If a large delta file or a delete-version file is encountered:
|
||||
if (delta_size >= _max_delta_file_size
|
||||
|| _tablet->version_for_delete_predicate(delta)
|
||||
|| _tablet->version_for_load_deletion(delta)) {
|
||||
// 1) If need_merged_versions is empty, these files are at the start of the interval; skip them.
|
||||
if (need_merged_versions.empty()) {
|
||||
continue;
|
||||
} else {
|
||||
// 2) Otherwise the end of the interval has been reached; break out of the loop.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
need_merged_versions.push_back(delta);
|
||||
total_size += delta_size;
|
||||
}
|
||||
|
||||
// No mergeable delta files in this interval; continue with the next interval.
|
||||
if (need_merged_versions.empty()) {
|
||||
transient_rowsets.clear();
|
||||
continue;
|
||||
}
|
||||
|
||||
// If the interval has only one delta, or all its deltas are empty, check whether it can be merged
// with the large delta at the end of the interval, or with the version right before the interval.
|
||||
if (need_merged_versions.size() == 1 || total_size == 0) {
|
||||
// If the end of the interval is a large delta version, merge with it.
|
||||
if (index < delta_number
|
||||
&& _tablet->get_rowset_size_by_version(delta_versions[index]) >=
|
||||
_max_delta_file_size) {
|
||||
need_merged_versions.push_back(delta_versions[index]);
|
||||
++index;
|
||||
}
|
||||
// If the version before the interval can be merged, add it to the mergeable versions.
|
||||
Version delta_before_interval;
|
||||
if (_find_previous_version(need_merged_versions[0], &delta_before_interval)) {
|
||||
need_merged_versions.insert(need_merged_versions.begin(),
|
||||
delta_before_interval);
|
||||
}
|
||||
|
||||
// If there is still only one delta to merge, skip it and do not merge.
|
||||
if (need_merged_versions.size() == 1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
_need_merged_versions.swap(need_merged_versions);
|
||||
_new_cumulative_layer_point = delta_versions[index].first;
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
// If there are multiple mergeable files, the cumulative compaction merge can proceed.
// If there is only one mergeable file, do not trigger the merge, for efficiency.
|
||||
if (need_merged_versions.size() != 1) {
|
||||
// If the version right before the mergeable interval has not reached the maximum delta file size,
// merge the interval's files into that previous version.
|
||||
Version delta;
|
||||
if (_find_previous_version(need_merged_versions[0], &delta)) {
|
||||
need_merged_versions.insert(need_merged_versions.begin(), delta);
|
||||
}
|
||||
|
||||
_need_merged_versions.swap(need_merged_versions);
|
||||
_new_cumulative_layer_point = delta_versions[index].first;
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
// No mergeable delta files were found, so the merge cannot run, but the new cumulative_layer_point
// must still be set. Otherwise the next cumulative compaction would scan the same files,
// again find nothing to merge, and so on: it would loop forever and cumulative compaction would never run.
|
||||
_tablet->set_cumulative_layer_point(delta_versions[index].first);
|
||||
_tablet->save_meta();
|
||||
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
|
||||
}
|
||||
|
||||
static bool version_comparator(const Version& lhs, const Version& rhs) {
|
||||
return lhs.second < rhs.second;
|
||||
}
|
||||
|
||||
OLAPStatus CumulativeCompaction::_get_delta_versions(Versions* delta_versions) {
|
||||
delta_versions->clear();
|
||||
|
||||
Versions all_versions;
|
||||
_tablet->list_versions(&all_versions);
|
||||
|
||||
for (Versions::const_iterator version = all_versions.begin();
|
||||
version != all_versions.end(); ++version) {
|
||||
if (version->first == version->second && version->first >= _old_cumulative_layer_point) {
|
||||
delta_versions->push_back(*version);
|
||||
}
|
||||
transient_rowsets.push_back(rowset);
|
||||
}
|
||||
|
||||
if (delta_versions->size() == 0) {
|
||||
LOG(INFO) << "no delta versions. cumulative_point=" << _old_cumulative_layer_point;
|
||||
if (transient_rowsets.size() > config::cumulative_compaction_num_singleton_deltas) {
|
||||
_input_rowsets = transient_rowsets;
|
||||
}
|
||||
|
||||
if (_input_rowsets.empty()) {
|
||||
// No rowsets were chosen for cumulative compaction.
// In this case the cumulative point should still be set;
// otherwise the next round would pick no rowsets either.
|
||||
_tablet->set_cumulative_layer_point(candidate_rowsets.back()->start_version());
|
||||
}
|
||||
|
||||
if (_input_rowsets.size() <= 1) {
|
||||
LOG(WARNING) << "There is no enough rowsets to cumulative compaction."
|
||||
<< ", the size of rowsets to compact=" << candidate_rowsets.size()
|
||||
<< ", cumulative_point=" << _tablet->cumulative_layer_point();
|
||||
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
|
||||
}
|
||||
|
||||
// If the size equals 1, this delta must be the one the cumulative point refers to.
// Since the latest delta is always kept out of the cumulative file, return an error and skip cumulative compaction.
|
||||
if (delta_versions->size() == 1) {
|
||||
delta_versions->clear();
|
||||
VLOG(10) << "only one delta version. no new delta versions."
|
||||
<< " cumulative_point=" << _old_cumulative_layer_point;
|
||||
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
|
||||
for (auto& rowset : _input_rowsets) {
|
||||
_input_rowsets_size += rowset->data_disk_size();
|
||||
_input_row_num += rowset->num_rows();
|
||||
}
|
||||
|
||||
sort(delta_versions->begin(), delta_versions->end(), version_comparator);
|
||||
|
||||
// can't do cumulative expansion if there is a hole among the versions
|
||||
Versions versions_path;
|
||||
OLAPStatus select_status = _tablet->capture_consistent_versions(
|
||||
Version(delta_versions->front().first, delta_versions->back().second), &versions_path);
|
||||
if (select_status != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "can't do cumulative expansion if fail to select shortest version path."
|
||||
<< " tablet=" << _tablet->full_name()
|
||||
<< ", start=" << delta_versions->front().first
|
||||
<< ", end=" << delta_versions->back().second;
|
||||
return select_status;
|
||||
}
|
||||
|
||||
return OLAP_SUCCESS;
|
||||
}
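
The old _get_delta_versions() above is being replaced by rowset-based picking, which is why the transient_rowsets, candidate_rowsets and _input_rowsets lines from the new pick_rowsets_to_compact() appear interleaved with it in this hunk. As a rough, standalone illustration of that policy only (not the committed implementation), the sketch below models rowsets as plain version pairs: collect singleton deltas at or after the cumulative point and compact them once more than cumulative_compaction_num_singleton_deltas have accumulated. The SimpleVersion alias and the free function are assumptions made for this example.

// Illustrative only: a simplified model of the cumulative pick policy, assuming a
// rowset is just an inclusive [start, end] version pair. The real code works on
// RowsetSharedPtr and Tablet state.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

using SimpleVersion = std::pair<int64_t, int64_t>;

// Pick singleton deltas (start == end) at or after the cumulative point and
// return them only when more than `num_singleton_deltas` have accumulated.
std::vector<SimpleVersion> pick_cumulative_candidates(
        std::vector<SimpleVersion> rowsets,
        int64_t cumulative_point,
        size_t num_singleton_deltas) {
    std::sort(rowsets.begin(), rowsets.end());
    std::vector<SimpleVersion> transient;
    for (const auto& v : rowsets) {
        if (v.first >= cumulative_point && v.first == v.second) {
            transient.push_back(v);
        }
    }
    if (transient.size() > num_singleton_deltas) {
        return transient;          // enough singleton deltas: compact them
    }
    return {};                     // not enough yet: wait for more loads
}

int main() {
    // versions: base [0-10], then singleton deltas 11..14
    std::vector<SimpleVersion> rowsets = {{0, 10}, {11, 11}, {12, 12}, {13, 13}, {14, 14}};
    auto picked = pick_cumulative_candidates(rowsets, /*cumulative_point=*/11,
                                             /*num_singleton_deltas=*/3);
    std::cout << "picked " << picked.size() << " rowsets\n";  // prints: picked 4 rowsets
    return 0;
}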

bool CumulativeCompaction::_find_previous_version(const Version current_version,
Version* previous_version) {
Versions all_versions;
if (OLAP_SUCCESS != _tablet->capture_consistent_versions(Version(0, current_version.second),
&all_versions)) {
LOG(WARNING) << "fail to select shortest version path. start=0, end=" << current_version.second;
return false;
}

// previous_version.second should equal current_version.first - 1
for (Versions::const_iterator version = all_versions.begin();
version != all_versions.end(); ++version) {
// Skip base version
if (version->first == 0) {
continue;
}

if (version->second == current_version.first - 1) {
if (_tablet->version_for_delete_predicate(*version)
|| _tablet->version_for_load_deletion(*version)) {
return false;
}

size_t data_size = _tablet->get_rowset_size_by_version(*version);
if (data_size >= _max_delta_file_size) {
return false;
}

*previous_version = *version;
return true;
}
}

return false;
}

OLAPStatus CumulativeCompaction::_do_cumulative_compaction() {
OLAPStatus res = OLAP_SUCCESS;
OlapStopWatch watch;
Merger merger(_tablet, _rs_writer, READER_CUMULATIVE_COMPACTION);

// 1. merge delta files into new cumulative file
uint64_t merged_rows = 0;
uint64_t filted_rows = 0;
res = merger.merge(_rs_readers, &merged_rows, &filted_rows);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to do cumulative merge."
<< " tablet=" << _tablet->full_name()
<< ", cumulative_version=" << _cumulative_version.first
<< "-" << _cumulative_version.second;
return res;
}

_rowset = _rs_writer->build();
if (_rowset == nullptr) {
LOG(WARNING) << "rowset writer build failed. writer version:"
<< _rs_writer->version().first << "-" << _rs_writer->version().second;
return OLAP_ERR_MALLOC_ERROR;
}

// 2. Check row num changes
uint64_t source_rows = 0;
for (auto rowset : _rowsets) {
source_rows += rowset->num_rows();
}
bool row_nums_check = config::row_nums_check;
if (row_nums_check) {
if (source_rows != _rowset->num_rows() + merged_rows + filted_rows) {
LOG(FATAL) << "fail to check row num! "
<< "source_rows=" << source_rows
<< ", merged_rows=" << merged_rows
<< ", filted_rows=" << filted_rows
<< ", new_index_rows=" << _rowset->num_rows();
return OLAP_ERR_CHECK_LINES_ERROR;
}
} else {
LOG(INFO) << "all row nums. source_rows=" << source_rows
<< ", merged_rows=" << merged_rows
<< ", filted_rows=" << filted_rows
<< ", new_index_rows=" << _rowset->num_rows()
<< ", merged_version_num=" << _need_merged_versions.size()
<< ", time_us=" << watch.get_elapse_time_us();
}

// 3. add new cumulative file into tablet
vector<RowsetSharedPtr> unused_rowsets;
_tablet->obtain_header_wrlock();
res = _tablet->capture_consistent_rowsets(_need_merged_versions, &unused_rowsets);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name()
<< ", version=" << _cumulative_version.first
<< "-" << _cumulative_version.second;
_tablet->release_header_lock();
return res;
}
res = _update_header(unused_rowsets);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to update header for new cumulative."
<< "tablet=" << _tablet->full_name()
<< ", cumulative_version=" << _cumulative_version.first
<< "-" << _cumulative_version.second;
_tablet->release_header_lock();
return res;
}

// 4. validate that delete action is right
res = _validate_delete_file_action();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "delete action of cumulative compaction has error. roll back."
<< "tablet=" << _tablet->full_name()
<< ", cumulative_version=" << _cumulative_version.first
<< "-" << _cumulative_version.second;
// if error happened, roll back
OLAPStatus ret = _roll_back(unused_rowsets);
if (ret != OLAP_SUCCESS) {
LOG(FATAL) << "roll back failed. [tablet=" << _tablet->full_name() << "]";
}

_tablet->release_header_lock();
return res;
}
// 5. if the merge succeeded, set the new cumulative_layer_point
_tablet->set_cumulative_layer_point(_new_cumulative_layer_point);
_tablet->save_meta();
_tablet->release_header_lock();

// 6. delete delta files which have been merged into new cumulative file
_delete_unused_rowsets(&unused_rowsets);

return res;
}

OLAPStatus CumulativeCompaction::_update_header(const vector<RowsetSharedPtr>& unused_rowsets) {
vector<RowsetSharedPtr> new_rowsets;
new_rowsets.push_back(_rowset);

OLAPStatus res = OLAP_SUCCESS;
res = _tablet->modify_rowsets(new_rowsets, unused_rowsets);
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "failed to replace data sources. res=" << res
<< ", tablet=" << _tablet->full_name();
return res;
}

res = _tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "failed to save header. res=" << res
<< ", tablet=" << _tablet->full_name();
return res;
}

return res;
}

void CumulativeCompaction::_delete_unused_rowsets(vector<RowsetSharedPtr>* unused_rowsets) {
if (!unused_rowsets->empty()) {
StorageEngine* storage_engine = StorageEngine::instance();

for (vector<RowsetSharedPtr>::iterator it = unused_rowsets->begin();
it != unused_rowsets->end(); ++it) {
storage_engine->add_unused_rowset(*it);
}
}
}

bool CumulativeCompaction::_validate_need_merged_versions() {
// 1. validate versions in _need_merged_versions are continuous
// Skip the first element
for (unsigned int index = 1; index < _need_merged_versions.size(); ++index) {
Version previous_version = _need_merged_versions[index - 1];
Version current_version = _need_merged_versions[index];
if (current_version.first != previous_version.second + 1) {
LOG(WARNING) << "wrong need merged version. "
<< "previous_version=" << previous_version.first
<< "-" << previous_version.second
<< ", current_version=" << current_version.first
<< "-" << current_version.second;
return false;
}
}

return true;
}

OLAPStatus CumulativeCompaction::_validate_delete_file_action() {
// 1. acquire the new cumulative version to make sure that all is right after deleting files
Version spec_version = Version(0, _cumulative_version.second);
vector<RowsetReaderSharedPtr> rs_readers;
_tablet->capture_rs_readers(spec_version, &rs_readers);
if (rs_readers.empty()) {
LOG(WARNING) << "acquire data source failed. "
<< "spec_version=" << spec_version.first << "-" << spec_version.second;
return OLAP_ERR_CUMULATIVE_ERROR_DELETE_ACTION;
}

return OLAP_SUCCESS;
}

OLAPStatus CumulativeCompaction::_roll_back(vector<RowsetSharedPtr>& old_olap_indices) {
vector<RowsetSharedPtr> unused_rowsets;
OLAPStatus res = _tablet->capture_consistent_rowsets(_cumulative_version, &unused_rowsets);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name()
<< ", version=" << _cumulative_version.first
<< "-" << _cumulative_version.second;
return res;
}

// unused_rowsets will only contain the new cumulative index;
// we don't need to delete it here; we will delete the new cumulative index in the end.

res = OLAP_SUCCESS;
res = _tablet->modify_rowsets(old_olap_indices, unused_rowsets);
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "failed to replace data sources. [tablet=" << _tablet->full_name() << "]";
return res;
}

res = _tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "failed to save header. [tablet=" << _tablet->full_name() << "]";
return res;
}

return res;
}

} // namespace doris

@ -18,147 +18,33 @@
#ifndef DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_H
#define DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_H

#include <list>
#include <map>
#include <string>
#include <vector>

#include "olap/merger.h"
#include "olap/olap_define.h"
#include "olap/tablet.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/rowset/alpha_rowset_writer.h"
#include "olap/compaction.h"

namespace doris {

class Rowset;

class CumulativeCompaction {
class CumulativeCompaction : public Compaction {
public:
CumulativeCompaction() :
_is_init(false),
_old_cumulative_layer_point(0),
_new_cumulative_layer_point(0),
_max_delta_file_size(0),
_rowset(nullptr),
_rs_writer(nullptr) {}
CumulativeCompaction(TabletSharedPtr tablet);
~CumulativeCompaction() override;

OLAPStatus compact() override;

protected:
OLAPStatus pick_rowsets_to_compact() override;

std::string compaction_name() const override {
return "cumulative compaction";
}

ReaderType compaction_type() const override {
return ReaderType::READER_CUMULATIVE_COMPACTION;
}

~CumulativeCompaction() {}

// Initialize the CumulativeCompaction object, including:
// - checking whether cumulative compaction should be triggered
// - computing the delta files that can be merged
//
// Input:
// - tablet: the tablet on which cumulative compaction will run
//
// Returns:
// - OLAP_SUCCESS if cumulative compaction is triggered
// - otherwise, the corresponding error code
OLAPStatus init(TabletSharedPtr tablet);

// Run cumulative compaction.
//
// Returns:
// - OLAP_SUCCESS on success
// - otherwise, the corresponding error code
OLAPStatus run();

private:

// Compute the delta files that can be merged and the new cumulative layer point.
//
// Returns:
// - OLAP_SUCCESS on success
// - otherwise, the corresponding error code
OLAPStatus _calculate_need_merged_versions();

// Get the delta files the table currently has.
//
// Output:
// - delta_versions: filled with the versions of the table's existing delta files
//
// Returns:
// - OLAP_SUCCESS on success
// - otherwise, the corresponding error code
OLAPStatus _get_delta_versions(Versions* delta_versions);

// Find the version immediately before a given version, i.e. current_version.first = previous_version.second + 1.
//
// Input:
// - current_version: the given version
//
// Output:
// - previous_version: the version immediately before the given one
//
// Returns:
// - true if found
// - false otherwise
bool _find_previous_version(const Version current_version, Version* previous_version);

// Run the cumulative compaction merge.
//
// Returns:
// - OLAP_SUCCESS on success
// - otherwise, the corresponding error code
OLAPStatus _do_cumulative_compaction();

// Load the newly merged cumulative file into the tablet.
//
// Output:
// - unused_rowsets: the olap indexes of the delta files that are no longer used
//
// Returns:
// - OLAP_SUCCESS on success
// - otherwise, the corresponding error code
OLAPStatus _update_header(const std::vector<RowsetSharedPtr>& unused_rowsets);

// Delete the delta files that are no longer used.
//
// Input/output:
// - unused_rowsets: the olap indexes of the unused delta files to delete
void _delete_unused_rowsets(std::vector<RowsetSharedPtr>* unused_rowsets);

// Validate that _need_merged_versions is correct.
//
// Returns:
// - false if incorrect
// - true if correct
bool _validate_need_merged_versions();

// Validate that the delete-file action is correct; the header lock must be held before calling.
//
// Returns:
// - OLAP_ERR_CUMULATIVE_ERROR_DELETE_ACTION if incorrect
// - OLAP_SUCCESS if correct
OLAPStatus _validate_delete_file_action();

// Restore the file versions in the header and the table's data source.
OLAPStatus _roll_back(std::vector<RowsetSharedPtr>& old_olap_indices);

// whether this CumulativeCompaction object has been initialized
bool _is_init;
// the table's current cumulative layer point
int64_t _old_cumulative_layer_point;
// the new cumulative layer point once cumulative compaction finishes
int64_t _new_cumulative_layer_point;
// the maximum size of a cumulative file;
// a delta file larger than this is treated as a cumulative file
size_t _max_delta_file_size;
// the tablet on which cumulative compaction runs
TabletSharedPtr _tablet;
// version of the new cumulative file
Version _cumulative_version;
// version hash of the new cumulative file
VersionHash _cumulative_version_hash;
// olap index of the new cumulative file
RowsetSharedPtr _rowset;
RowsetWriterSharedPtr _rs_writer;
// data files of the mergeable deltas
std::vector<RowsetSharedPtr> _rowsets;
std::vector<RowsetReaderSharedPtr> _rs_readers;
// versions of the mergeable deltas
std::vector<Version> _need_merged_versions;
int64_t _cumulative_rowset_size_threshold;

DISALLOW_COPY_AND_ASSIGN(CumulativeCompaction);
};

@ -35,20 +35,38 @@ namespace doris {

Merger::Merger(TabletSharedPtr tablet, RowsetWriterSharedPtr writer, ReaderType type) :
_tablet(tablet),
_rs_writer(writer),
_output_rs_writer(writer),
_reader_type(type),
_row_count(0) {}

Merger::Merger(TabletSharedPtr tablet, ReaderType type, RowsetWriterSharedPtr writer,
const std::vector<RowsetReaderSharedPtr>& rs_readers) :
_tablet(tablet),
_output_rs_writer(writer),
_input_rs_readers(rs_readers),
_reader_type(type),
_row_count(0) {}

OLAPStatus Merger::merge(const vector<RowsetReaderSharedPtr>& rs_readers,
uint64_t* merged_rows, uint64_t* filted_rows) {
int64_t* merged_rows, int64_t* filted_rows) {
_input_rs_readers = rs_readers;
OLAPStatus res = merge();
if (res == OLAP_SUCCESS) {
*merged_rows = _merged_rows;
*filted_rows = _filted_rows;
}
return res;
}

OLAPStatus Merger::merge() {
// Create and initiate reader for scanning and multi-merging specified
// OLAPDatas.
Reader reader;
ReaderParams reader_params;
reader_params.tablet = _tablet;
reader_params.reader_type = _reader_type;
reader_params.rs_readers = rs_readers;
reader_params.version = _rs_writer->version();
reader_params.rs_readers = _input_rs_readers;
reader_params.version = _output_rs_writer->version();

if (OLAP_SUCCESS != reader.init(reader_params)) {
LOG(WARNING) << "fail to initiate reader. tablet=" << _tablet->full_name();
@ -67,7 +85,6 @@ OLAPStatus Merger::merge(const vector<RowsetReaderSharedPtr>& rs_readers,
row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
// The following procedure would last for long time, half of one day, etc.
while (!has_error) {

// Read one row into row_cursor
OLAPStatus res = reader.next_row_with_aggregation(&row_cursor, &eof);
if (OLAP_SUCCESS == res && eof) {
@ -79,7 +96,7 @@ OLAPStatus Merger::merge(const vector<RowsetReaderSharedPtr>& rs_readers,
break;
}

if (OLAP_SUCCESS != _rs_writer->add_row(row_cursor)) {
if (OLAP_SUCCESS != _output_rs_writer->add_row(row_cursor)) {
LOG(WARNING) << "add row to builder failed. tablet=" << _tablet->full_name();
has_error = true;
break;
@ -89,18 +106,17 @@ OLAPStatus Merger::merge(const vector<RowsetReaderSharedPtr>& rs_readers,
++_row_count;
}

if (_rs_writer->flush() != OLAP_SUCCESS) {
if (_output_rs_writer->flush() != OLAP_SUCCESS) {
LOG(WARNING) << "fail to finalize writer. "
<< "tablet=" << _tablet->full_name();
has_error = true;
}

if (!has_error) {
*merged_rows = reader.merged_rows();
*filted_rows = reader.filtered_rows();
_merged_rows = reader.merged_rows();
_filted_rows = reader.filtered_rows();
}

return has_error ? OLAP_ERR_OTHER_ERROR : OLAP_SUCCESS;
}

} // namespace doris

@ -31,24 +31,31 @@ class Merger {
public:
// parameter index is created by caller, and it is empty.
Merger(TabletSharedPtr tablet, RowsetWriterSharedPtr writer, ReaderType type);
Merger(TabletSharedPtr tablet, ReaderType type,
RowsetWriterSharedPtr writer, const std::vector<RowsetReaderSharedPtr>& rs_readers);

virtual ~Merger() {};

// @brief read from multiple OLAPData and SegmentGroup, then write into single OLAPData and SegmentGroup
// @return OLAPStatus: OLAP_SUCCESS or FAIL
// @note it will take long time to finish.
OLAPStatus merge(const std::vector<RowsetReaderSharedPtr>& rs_readers,
uint64_t* merged_rows, uint64_t* filted_rows);
OLAPStatus merge(const vector<RowsetReaderSharedPtr>& rs_readers,
int64_t* merged_rows, int64_t* filted_rows);
OLAPStatus merge();

// number of rows accumulated during the merge
uint64_t row_count() {
return _row_count;
}
inline int64_t row_count() const { return _row_count; }
inline int64_t merged_rows() const { return _merged_rows; }
inline int64_t filted_rows() const { return _filted_rows; }
private:
TabletSharedPtr _tablet;
RowsetWriterSharedPtr _rs_writer;
RowsetWriterSharedPtr _output_rs_writer;

std::vector<RowsetReaderSharedPtr> _input_rs_readers;
ReaderType _reader_type;
uint64_t _row_count;
int64_t _row_count;
int64_t _merged_rows;
int64_t _filted_rows;

DISALLOW_COPY_AND_ASSIGN(Merger);
};
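
For context on the counters that the reworked Merger now keeps as members, the self-contained toy below sketches what merged_rows and filted_rows mean during a merge: rows folded into an existing key count as merged, rows dropped by a delete filter count as filtered, and source_rows == output_rows + merged_rows + filted_rows is the invariant the row-number check after compaction verifies. The ToyMergeResult type and the int key/value model are assumptions for illustration, not Doris types.

// Illustrative only: a self-contained model of the merged/filtered row counters.
// Keys are ints, values are summed on aggregation, and negative keys are "deleted".
#include <cstdint>
#include <iostream>
#include <map>
#include <utility>
#include <vector>

struct ToyMergeResult {
    std::map<int, int64_t> output;  // key -> aggregated value
    int64_t merged_rows = 0;        // rows folded into an existing key
    int64_t filted_rows = 0;        // rows dropped by the delete filter
};

ToyMergeResult toy_merge(const std::vector<std::vector<std::pair<int, int64_t>>>& inputs) {
    ToyMergeResult res;
    for (const auto& rowset : inputs) {
        for (const auto& row : rowset) {
            if (row.first < 0) {            // pretend a delete predicate filters this row
                ++res.filted_rows;
                continue;
            }
            auto it = res.output.find(row.first);
            if (it == res.output.end()) {
                res.output.emplace(row.first, row.second);
            } else {
                it->second += row.second;   // aggregate duplicate key
                ++res.merged_rows;
            }
        }
    }
    return res;
}

int main() {
    std::vector<std::vector<std::pair<int, int64_t>>> inputs = {
        {{1, 10}, {2, 20}}, {{2, 5}, {-3, 7}}, {{4, 1}}};
    ToyMergeResult r = toy_merge(inputs);
    int64_t source_rows = 2 + 2 + 1;
    // The invariant checked after compaction:
    std::cout << std::boolalpha
              << (source_rows == (int64_t)r.output.size() + r.merged_rows + r.filted_rows)
              << "\n";  // prints: true
    return 0;
}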

@ -333,6 +333,7 @@ enum OLAPStatus {
OLAP_ERR_CUMULATIVE_FAILED_ACQUIRE_DATA_SOURCE = -2003,
OLAP_ERR_CUMULATIVE_INVALID_NEED_MERGED_VERSIONS = -2004,
OLAP_ERR_CUMULATIVE_ERROR_DELETE_ACTION = -2005,
OLAP_ERR_CUMULATIVE_MISS_VERSION = -2006,

// OLAPMeta
// [-3000, -3100)

@ -32,7 +32,7 @@ OLAPStatus OlapSnapshotConverter::to_olap_header(const TabletMetaPB& tablet_meta
LOG(FATAL) << "tablet schema does not have cumulative_layer_point."
<< " tablet id = " << tablet_meta_pb.tablet_id();
}
olap_header->set_cumulative_layer_point(tablet_meta_pb.cumulative_layer_point());
olap_header->set_cumulative_layer_point(-1);
if (!tablet_meta_pb.schema().has_num_short_key_columns()) {
LOG(FATAL) << "tablet schema does not have num_short_key_columns."
<< " tablet id = " << tablet_meta_pb.tablet_id();
@ -99,7 +99,7 @@ OLAPStatus OlapSnapshotConverter::to_tablet_meta_pb(const OLAPHeaderMessage& ola
tablet_meta_pb->set_shard_id(olap_header.shard());
}
tablet_meta_pb->set_creation_time(olap_header.creation_time());
tablet_meta_pb->set_cumulative_layer_point(olap_header.cumulative_layer_point());
tablet_meta_pb->set_cumulative_layer_point(-1);

TabletSchemaPB* schema = tablet_meta_pb->mutable_schema();
for (auto& column_msg : olap_header.column()) {

@ -125,6 +125,10 @@
_need_delete_file = true;
}

static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) {
return left->end_version() < right->end_version();
}

protected:
// allow subclass to add custom logic when rowset is being published
virtual void make_visible_extra(Version version, VersionHash version_hash) {}

@ -1108,8 +1108,8 @@ bool SchemaChangeWithSorting::_external_sorting(
TabletSharedPtr new_tablet) {
Merger merger(new_tablet, rowset_writer, READER_ALTER_TABLE);

uint64_t merged_rows = 0;
uint64_t filtered_rows = 0;
int64_t merged_rows = 0;
int64_t filtered_rows = 0;
vector<RowsetReaderSharedPtr> rs_readers;
for (vector<RowsetSharedPtr>::iterator it = src_rowsets.begin();
it != src_rowsets.end(); ++it) {
@ -1248,7 +1248,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
new_tablet->modify_rowsets(std::vector<RowsetSharedPtr>(), rowsets_to_delete);
// inherit cumulative_layer_point from base_tablet
// check if new_tablet.ce_point > base_tablet.ce_point?
new_tablet->set_cumulative_layer_point(base_tablet->cumulative_layer_point());
new_tablet->set_cumulative_layer_point(-1);
// save tablet meta
res = new_tablet->save_meta();
if (res != OLAP_SUCCESS) {
@ -1519,7 +1519,7 @@ OLAPStatus SchemaChangeHandler::process_alter_tablet(AlterTabletType type,
}

// inherit cumulative_layer_point from base_tablet
new_tablet->set_cumulative_layer_point(base_tablet->cumulative_layer_point());
new_tablet->set_cumulative_layer_point(-1);

// get history versions to be changed
res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed);

@ -567,26 +567,13 @@ void StorageEngine::perform_cumulative_compaction(DataDir* data_dir) {
if (best_tablet == nullptr) { return; }

DorisMetrics::cumulative_compaction_request_total.increment(1);
CumulativeCompaction cumulative_compaction;
OLAPStatus res = cumulative_compaction.init(best_tablet);
if (res != OLAP_SUCCESS) {
if (res != OLAP_ERR_CUMULATIVE_REPEAT_INIT && res != OLAP_ERR_CE_TRY_CE_LOCK_ERROR) {
best_tablet->set_last_compaction_failure_time(UnixMillis());
if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) {
LOG(WARNING) << "failed to init cumulative compaction"
<< ", table=" << best_tablet->full_name()
<< ", res=" << res;
DorisMetrics::cumulative_compaction_request_failed.increment(1);
}
}
return;
}
CumulativeCompaction cumulative_compaction(best_tablet);

res = cumulative_compaction.run();
OLAPStatus res = cumulative_compaction.compact();
if (res != OLAP_SUCCESS) {
DorisMetrics::cumulative_compaction_request_failed.increment(1);
best_tablet->set_last_compaction_failure_time(UnixMillis());
LOG(WARNING) << "failed to do cumulative compaction"
LOG(WARNING) << "failed to do cumulative compaction. res=" << res
<< ", table=" << best_tablet->full_name()
<< ", res=" << res;
return;
@ -599,26 +586,13 @@ void StorageEngine::perform_base_compaction(DataDir* data_dir) {
if (best_tablet == nullptr) { return; }

DorisMetrics::base_compaction_request_total.increment(1);
BaseCompaction base_compaction;
OLAPStatus res = base_compaction.init(best_tablet);
if (res != OLAP_SUCCESS) {
if (res != OLAP_ERR_BE_TRY_BE_LOCK_ERROR && res != OLAP_ERR_BE_NO_SUITABLE_VERSION) {
DorisMetrics::base_compaction_request_failed.increment(1);
best_tablet->set_last_compaction_failure_time(UnixMillis());
LOG(WARNING) << "failed to init base compaction"
<< ", table=" << best_tablet->full_name()
<< ", res=" << res;
}
return;
}

res = base_compaction.run();
BaseCompaction base_compaction(best_tablet);
OLAPStatus res = base_compaction.compact();
if (res != OLAP_SUCCESS) {
DorisMetrics::base_compaction_request_failed.increment(1);
best_tablet->set_last_compaction_failure_time(UnixMillis());
LOG(WARNING) << "failed to init base compaction"
<< ", table=" << best_tablet->full_name()
<< ", res=" << res;
LOG(WARNING) << "failed to do base compaction. res=" << res
<< ", table=" << best_tablet->full_name();
return;
}
best_tablet->set_last_compaction_failure_time(0);

@ -150,6 +150,8 @@ OLAPStatus Tablet::init_once() {
_inc_rs_version_map[version] = rowset;
}

_cumulative_point = -1;

return res;
}

@ -670,6 +672,17 @@ OLAPStatus Tablet::compute_all_versions_hash(const vector<Version>& versions,
return OLAP_SUCCESS;
}

void Tablet::compute_version_hash_from_rowsets(
const std::vector<RowsetSharedPtr>& rowsets, VersionHash* version_hash) const {
DCHECK(version_hash != nullptr) << "invalid parameter, version_hash is nullptr";
int64_t v_hash = 0;
for (auto& rowset : rowsets) {
v_hash ^= rowset->version_hash();
}
*version_hash = v_hash;
}


void Tablet::calc_missed_versions(int64_t spec_version,
vector<Version>* missed_versions) {
ReadLock rdlock(&_meta_lock);
@ -736,6 +749,38 @@ OLAPStatus Tablet::max_continuous_version_from_begining(Version* version, Versio
return OLAP_SUCCESS;
}

OLAPStatus Tablet::calculate_cumulative_point() {
WriteLock wrlock(&_meta_lock);
if (_cumulative_point != -1) {
return OLAP_SUCCESS;
}

std::list<Version> existing_versions;
for (auto& rs : _tablet_meta->all_rs_metas()) {
existing_versions.emplace_back(rs->version());
}

// sort the existing versions in ascending order
existing_versions.sort([](const Version& a, const Version& b) {
// simple because 2 versions are certainly not overlapping
return a.first < b.first;
});

int64_t prev_version = -1;
for (const Version& version : existing_versions) {
if (version.first > prev_version + 1) {
break;
}
if (version.first == version.second) {
_cumulative_point = version.first;
break;
}
prev_version = version.second;
_cumulative_point = version.second + 1;
}
return OLAP_SUCCESS;
}
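
As a quick sanity check of the loop above, this standalone sketch re-implements the same rule on plain version pairs: walk the versions in ascending order, stop at the first hole, and put the cumulative point at the first singleton version, or just past the last contiguous compacted rowset. The SimpleVersion alias and the free function are assumptions for illustration; the real logic lives in Tablet::calculate_cumulative_point().

// Illustrative only: the cumulative-point rule on plain [start, end] pairs,
// mirroring Tablet::calculate_cumulative_point() above.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

using SimpleVersion = std::pair<int64_t, int64_t>;

int64_t calculate_cumulative_point(std::vector<SimpleVersion> versions) {
    std::sort(versions.begin(), versions.end(),
              [](const SimpleVersion& a, const SimpleVersion& b) { return a.first < b.first; });
    int64_t cumulative_point = -1;
    int64_t prev_version = -1;
    for (const SimpleVersion& v : versions) {
        if (v.first > prev_version + 1) {
            break;                                // hole in the version chain: stop
        }
        if (v.first == v.second) {
            cumulative_point = v.first;           // first singleton delta marks the point
            break;
        }
        prev_version = v.second;
        cumulative_point = v.second + 1;          // everything so far is already compacted
    }
    return cumulative_point;
}

int main() {
    // [0-5] and [6-9] are compacted rowsets, [10-10] and [11-11] are fresh deltas.
    std::cout << calculate_cumulative_point({{0, 5}, {6, 9}, {10, 10}, {11, 11}}) << "\n";  // 10
    // Only compacted rowsets so far: the point sits right after the last one.
    std::cout << calculate_cumulative_point({{0, 5}, {6, 9}}) << "\n";                      // 10
    return 0;
}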

OLAPStatus Tablet::split_range(
const OlapTuple& start_key_strings,
const OlapTuple& end_key_strings,
@ -937,4 +982,23 @@ TabletInfo Tablet::get_tablet_info() {
return TabletInfo(tablet_id(), schema_hash(), tablet_uid());
}

void Tablet::pick_candicate_rowsets_to_cumulative_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets) {
ReadLock rdlock(&_meta_lock);
for (auto& it : _rs_version_map) {
if (it.first.first >= _cumulative_point
&& it.first.first == it.first.second) {
candidate_rowsets->push_back(it.second);
}
}
}

void Tablet::pick_candicate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets) {
ReadLock rdlock(&_meta_lock);
for (auto& it : _rs_version_map) {
if (it.first.first < _cumulative_point) {
candidate_rowsets->push_back(it.second);
}
}
}

} // namespace doris

@ -165,14 +165,14 @@
inline Mutex* get_push_lock() { return &_ingest_lock; }

// base lock
inline bool try_base_compaction_lock() { return _base_lock.trylock() == OLAP_SUCCESS; }
inline void obtain_base_compaction_lock() { _base_lock.lock(); }
inline void release_base_compaction_lock() { _base_lock.unlock(); }
inline Mutex* get_base_lock() { return &_base_lock; }

// cumulative lock
inline bool try_cumulative_lock() { return (OLAP_SUCCESS == _cumulative_lock.trylock()); }
inline void obtain_cumulative_lock() { _cumulative_lock.lock(); }
inline void release_cumulative_lock() { _cumulative_lock.unlock(); }
inline Mutex* get_cumulative_lock() { return &_cumulative_lock; }

inline RWMutex* get_migration_lock_ptr() { return &_migration_lock; }

@ -182,6 +182,8 @@
const uint32_t calc_base_compaction_score() const;
OLAPStatus compute_all_versions_hash(const std::vector<Version>& versions,
VersionHash* version_hash) const;
void compute_version_hash_from_rowsets(const std::vector<RowsetSharedPtr>& rowsets,
VersionHash* version_hash) const;

// operation for clone
void calc_missed_versions(int64_t spec_version, vector<Version>* missed_versions);
@ -232,6 +234,11 @@

TabletInfo get_tablet_info();

void pick_candicate_rowsets_to_cumulative_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets);
void pick_candicate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets);

OLAPStatus calculate_cumulative_point();

private:
void _print_missed_versions(const std::vector<Version>& missed_versions) const;
OLAPStatus _check_added_rowset(const RowsetSharedPtr& rowset);
@ -257,6 +264,7 @@
std::atomic<bool> _is_bad; // if this tablet is broken, set to true. default is false
std::atomic<int64_t> _last_compaction_failure_time; // timestamp of last compaction failure

int64_t _cumulative_point;
DISALLOW_COPY_AND_ASSIGN(Tablet);
};

@ -315,11 +323,11 @@ inline void Tablet::set_creation_time(int64_t creation_time) {
}

inline const int64_t Tablet::cumulative_layer_point() const {
return _tablet_meta->cumulative_layer_point();
return _cumulative_point;
}

void inline Tablet::set_cumulative_layer_point(const int64_t new_point) {
return _tablet_meta->set_cumulative_layer_point(new_point);
_cumulative_point = new_point;
}

inline bool Tablet::equal(int64_t tablet_id, int32_t schema_hash) {

@ -745,18 +745,16 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
}

if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
if (!table_ptr->try_cumulative_lock()) {
MutexLock lock(table_ptr->get_cumulative_lock(), TRY_LOCK);
if (!lock.own_lock()) {
continue;
} else {
table_ptr->release_cumulative_lock();
}
}

if (compaction_type == CompactionType::BASE_COMPACTION) {
if (!table_ptr->try_base_compaction_lock()) {
MutexLock lock(table_ptr->get_base_lock(), TRY_LOCK);
if (!lock.own_lock()) {
continue;
} else {
table_ptr->release_base_compaction_lock();
}
}

@ -1244,7 +1242,6 @@ OLAPStatus TabletManager::_create_inital_rowset(
return res;
}
}
tablet->set_cumulative_layer_point(request.version + 1);
res = tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to save header. [tablet=" << tablet->full_name() << "]";

@ -690,7 +690,7 @@ OLAPStatus EngineCloneTask::_finish_clone(TabletSharedPtr tablet, const string&

// if full clone success, need to update cumulative layer point
if (!is_incremental_clone && res == OLAP_SUCCESS) {
tablet->set_cumulative_layer_point(cloned_tablet_meta.cumulative_layer_point());
tablet->set_cumulative_layer_point(-1);
}

} while (0);

@ -192,17 +192,28 @@
class MutexLock {
public:
// block until the lock is obtained (unless try_lock is set)
explicit MutexLock(Mutex* mutex) : _mutex(mutex) {
_mutex->lock();
explicit MutexLock(Mutex* mutex, bool try_lock = false)
: _mutex(mutex), _locked(false) {
if (try_lock) {
_locked = (_mutex->trylock() == OLAP_SUCCESS);
} else {
_mutex->lock();
_locked = true;
}
}

// unlock on destruction, but only if the lock was actually acquired
~MutexLock() {
_mutex->unlock();
if (_locked) {
_mutex->unlock();
}
}

inline bool own_lock() const { return _locked; }

private:
Mutex* _mutex;

bool _locked;
DISALLOW_COPY_AND_ASSIGN(MutexLock);
};
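
The destructor now checks _locked because, with a try-lock, the guard may not own the mutex at all, and unlocking a mutex that was never acquired is an error. A minimal standalone equivalent of the pattern, written against std::mutex rather than the Doris Mutex/OLAP_SUCCESS types (so the names here are assumptions, not the committed code), looks like this:

// Illustrative only: the same try-lock RAII pattern expressed with std::mutex.
#include <iostream>
#include <mutex>
#include <thread>

class ScopedTryLock {
public:
    explicit ScopedTryLock(std::mutex* mutex, bool try_lock = false)
            : _mutex(mutex), _locked(false) {
        if (try_lock) {
            _locked = _mutex->try_lock();   // may fail; caller must check own_lock()
        } else {
            _mutex->lock();
            _locked = true;
        }
    }
    ~ScopedTryLock() {
        if (_locked) {                      // never unlock a mutex we do not own
            _mutex->unlock();
        }
    }
    ScopedTryLock(const ScopedTryLock&) = delete;
    ScopedTryLock& operator=(const ScopedTryLock&) = delete;
    bool own_lock() const { return _locked; }

private:
    std::mutex* _mutex;
    bool _locked;
};

int main() {
    std::mutex compaction_mutex;
    ScopedTryLock holder(&compaction_mutex);             // blocking acquire
    bool other_got_it = true;
    std::thread t([&] {
        ScopedTryLock attempt(&compaction_mutex, /*try_lock=*/true);
        other_got_it = attempt.own_lock();               // false: main thread holds it
    });
    t.join();
    std::cout << holder.own_lock() << " " << other_got_it << "\n";  // prints: 1 0
    return 0;
}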

@ -248,25 +259,25 @@
class ReadLock {
public:
explicit ReadLock(RWMutex* mutex, bool try_lock = false)
: _mutex(mutex), locked(false) {
: _mutex(mutex), _locked(false) {
if (try_lock) {
locked = this->_mutex->tryrdlock() == OLAP_SUCCESS;
_locked = this->_mutex->tryrdlock() == OLAP_SUCCESS;
} else {
this->_mutex->rdlock();
locked = true;
_locked = true;
}
}
~ReadLock() {
if (locked) {
if (_locked) {
this->_mutex->unlock();
}
}

bool own_lock() { return locked; }
bool own_lock() { return _locked; }

private:
RWMutex* _mutex;
bool locked;
bool _locked;
DISALLOW_COPY_AND_ASSIGN(ReadLock);
};

@ -278,25 +289,25 @@
class WriteLock {
public:
explicit WriteLock(RWMutex* mutex, bool try_lock = false)
: _mutex(mutex), locked(false) {
: _mutex(mutex), _locked(false) {
if (try_lock) {
locked = this->_mutex->trywrlock() == OLAP_SUCCESS;
_locked = this->_mutex->trywrlock() == OLAP_SUCCESS;
} else {
this->_mutex->wrlock();
locked = true;
_locked = true;
}
}
~WriteLock() {
if (locked) {
if (_locked) {
this->_mutex->unlock();
}
}

bool own_lock() { return locked; }
bool own_lock() { return _locked; }

private:
RWMutex* _mutex;
bool locked;
bool _locked;
DISALLOW_COPY_AND_ASSIGN(WriteLock);
};