[enhancement](checksum) use vertorized engine in checksum (#15260)
This commit is contained in:
@ -520,7 +520,8 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
|
||||
// other reader type:
|
||||
// QUERY will filter the row in query layer to keep right result use where clause.
|
||||
// CUMULATIVE_COMPACTION will lost the filter_delete info of base rowset
|
||||
if (read_params.reader_type == READER_BASE_COMPACTION) {
|
||||
if (read_params.reader_type == READER_BASE_COMPACTION ||
|
||||
read_params.reader_type == READER_CHECKSUM) {
|
||||
_filter_delete = true;
|
||||
}
|
||||
|
||||
@ -528,4 +529,51 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
|
||||
read_params.version.second);
|
||||
}
|
||||
|
||||
Status TabletReader::init_reader_params_and_create_block(
|
||||
TabletSharedPtr tablet, ReaderType reader_type,
|
||||
const std::vector<RowsetSharedPtr>& input_rowsets,
|
||||
TabletReader::ReaderParams* reader_params, vectorized::Block* block) {
|
||||
reader_params->tablet = tablet;
|
||||
reader_params->reader_type = reader_type;
|
||||
reader_params->version =
|
||||
Version(input_rowsets.front()->start_version(), input_rowsets.back()->end_version());
|
||||
|
||||
for (auto& rowset : input_rowsets) {
|
||||
RowsetReaderSharedPtr rs_reader;
|
||||
RETURN_NOT_OK(rowset->create_reader(&rs_reader));
|
||||
reader_params->rs_readers.push_back(std::move(rs_reader));
|
||||
}
|
||||
|
||||
std::vector<RowsetMetaSharedPtr> rowset_metas(input_rowsets.size());
|
||||
std::transform(input_rowsets.begin(), input_rowsets.end(), rowset_metas.begin(),
|
||||
[](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
|
||||
TabletSchemaSPtr read_tablet_schema =
|
||||
tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema();
|
||||
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
|
||||
merge_tablet_schema->copy_from(*read_tablet_schema);
|
||||
{
|
||||
std::shared_lock rdlock(tablet->get_header_lock());
|
||||
auto& delete_preds = tablet->delete_predicates();
|
||||
std::copy(delete_preds.cbegin(), delete_preds.cend(),
|
||||
std::inserter(reader_params->delete_predicates,
|
||||
reader_params->delete_predicates.begin()));
|
||||
}
|
||||
// Merge the columns in delete predicate that not in latest schema in to current tablet schema
|
||||
for (auto& del_pred_pb : reader_params->delete_predicates) {
|
||||
merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_pb->version()));
|
||||
}
|
||||
reader_params->tablet_schema = merge_tablet_schema;
|
||||
if (tablet->enable_unique_key_merge_on_write()) {
|
||||
reader_params->delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
|
||||
}
|
||||
|
||||
reader_params->return_columns.resize(read_tablet_schema->num_columns());
|
||||
std::iota(reader_params->return_columns.begin(), reader_params->return_columns.end(), 0);
|
||||
reader_params->origin_return_columns = &reader_params->return_columns;
|
||||
|
||||
*block = read_tablet_schema->create_block();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -142,6 +142,10 @@ public:
|
||||
OlapReaderStatistics* mutable_stats() { return &_stats; }
|
||||
|
||||
virtual bool update_profile(RuntimeProfile* profile) { return false; }
|
||||
static Status init_reader_params_and_create_block(
|
||||
TabletSharedPtr tablet, ReaderType reader_type,
|
||||
const std::vector<RowsetSharedPtr>& input_rowsets,
|
||||
TabletReader::ReaderParams* reader_params, vectorized::Block* block);
|
||||
|
||||
protected:
|
||||
friend class vectorized::VCollectIterator;
|
||||
|
||||
@ -18,12 +18,13 @@
|
||||
#include "olap/task/engine_checksum_task.h"
|
||||
|
||||
#include "runtime/thread_context.h"
|
||||
#include "vec/olap/block_reader.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_hash,
|
||||
TVersion version, uint32_t* checksum)
|
||||
: _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version) {
|
||||
: _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) {
|
||||
_mem_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
MemTrackerLimiter::Type::CONSISTENCY,
|
||||
"EngineChecksumTask#tabletId=" + std::to_string(tablet_id));
|
||||
@ -38,7 +39,52 @@ Status EngineChecksumTask::_compute_checksum() {
|
||||
LOG(INFO) << "begin to process compute checksum."
|
||||
<< "tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash
|
||||
<< ", version=" << _version;
|
||||
return Status::InternalError("Not implemented yet");
|
||||
|
||||
if (_checksum == nullptr) {
|
||||
return Status::InvalidArgument("invalid checksum which is nullptr");
|
||||
}
|
||||
|
||||
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(_tablet_id);
|
||||
if (nullptr == tablet) {
|
||||
return Status::InternalError("could not find tablet {}", _tablet_id);
|
||||
}
|
||||
|
||||
std::vector<RowsetSharedPtr> input_rowsets;
|
||||
Version version(0, _version);
|
||||
Status acquire_reader_st = tablet->capture_consistent_rowsets(version, &input_rowsets);
|
||||
if (acquire_reader_st != Status::OK()) {
|
||||
LOG(WARNING) << "fail to captute consistent rowsets. tablet=" << tablet->full_name()
|
||||
<< "res=" << acquire_reader_st;
|
||||
return acquire_reader_st;
|
||||
}
|
||||
vectorized::BlockReader reader;
|
||||
TabletReader::ReaderParams reader_params;
|
||||
vectorized::Block block;
|
||||
RETURN_NOT_OK(TabletReader::init_reader_params_and_create_block(
|
||||
tablet, READER_CHECKSUM, input_rowsets, &reader_params, &block))
|
||||
|
||||
auto res = reader.init(reader_params);
|
||||
if (!res.ok()) {
|
||||
LOG(WARNING) << "initiate reader fail. res = " << res;
|
||||
return res;
|
||||
}
|
||||
|
||||
bool eof = false;
|
||||
SipHash block_hash;
|
||||
uint64_t rows = 0;
|
||||
while (!eof) {
|
||||
RETURN_IF_ERROR(reader.next_block_with_aggregation(&block, nullptr, nullptr, &eof));
|
||||
rows += block.rows();
|
||||
|
||||
block.update_hash(block_hash);
|
||||
block.clear_column_data();
|
||||
}
|
||||
uint64_t checksum64 = block_hash.get64();
|
||||
*_checksum = (checksum64 >> 32) ^ (checksum64 & 0xffffffff);
|
||||
|
||||
LOG(INFO) << "success to finish compute checksum. tablet_id = " << _tablet_id
|
||||
<< ", rows = " << rows << ", checksum=" << *_checksum;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -43,6 +43,7 @@ private:
|
||||
TTabletId _tablet_id;
|
||||
TSchemaHash _schema_hash;
|
||||
TVersion _version;
|
||||
uint32_t* _checksum;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
}; // EngineTask
|
||||
|
||||
|
||||
Reference in New Issue
Block a user