pick https://github.com/apache/doris/pull/49825
This commit is contained in:
@ -35,6 +35,7 @@
|
||||
#include "olap/schema_change.h"
|
||||
#include "olap/tablet_meta.h"
|
||||
#include "runtime/thread_context.h"
|
||||
#include "util/debug_points.h"
|
||||
#include "util/thread.h"
|
||||
#include "util/trace.h"
|
||||
|
||||
@ -112,13 +113,46 @@ Status FullCompaction::pick_rowsets_to_compact() {
|
||||
}
|
||||
|
||||
Status FullCompaction::modify_rowsets(const Merger::Statistics* stats) {
|
||||
std::vector<RowsetSharedPtr> output_rowsets {_output_rowset};
|
||||
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
|
||||
_tablet->enable_unique_key_merge_on_write()) {
|
||||
RETURN_IF_ERROR(
|
||||
_full_compaction_update_delete_bitmap(_output_rowset, _output_rs_writer.get()));
|
||||
}
|
||||
std::vector<RowsetSharedPtr> output_rowsets(1, _output_rowset);
|
||||
{
|
||||
std::vector<RowsetSharedPtr> tmp_rowsets {};
|
||||
|
||||
// tablet is under alter process. The delete bitmap will be calculated after conversion.
|
||||
if (_tablet->tablet_state() == TABLET_NOTREADY) {
|
||||
LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id="
|
||||
<< _tablet->tablet_id();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
int64_t max_version = _tablet->max_version().second;
|
||||
DCHECK(max_version >= _output_rowset->version().second);
|
||||
if (max_version > _output_rowset->version().second) {
|
||||
RETURN_IF_ERROR(_tablet->capture_consistent_rowsets(
|
||||
{_output_rowset->version().second + 1, max_version}, &tmp_rowsets));
|
||||
}
|
||||
|
||||
for (const auto& it : tmp_rowsets) {
|
||||
const int64_t& cur_version = it->rowset_meta()->start_version();
|
||||
RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(it, _output_rowset, cur_version,
|
||||
_output_rs_writer.get()));
|
||||
}
|
||||
DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.before.block", DBUG_BLOCK);
|
||||
std::lock_guard rowset_update_lock(_tablet->get_rowset_update_lock());
|
||||
std::lock_guard header_lock(_tablet->get_header_lock());
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
for (const auto& it : _tablet->rowset_map()) {
|
||||
const int64_t& cur_version = it.first.first;
|
||||
const RowsetSharedPtr& published_rowset = it.second;
|
||||
if (cur_version > max_version) {
|
||||
RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(
|
||||
published_rowset, _output_rowset, cur_version, _output_rs_writer.get()));
|
||||
}
|
||||
}
|
||||
RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true));
|
||||
DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.sleep", { sleep(5); })
|
||||
_tablet->save_meta();
|
||||
} else {
|
||||
std::lock_guard<std::mutex> rowset_update_wlock(_tablet->get_rowset_update_lock());
|
||||
std::lock_guard<std::shared_mutex> meta_wlock(_tablet->get_header_lock());
|
||||
RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true));
|
||||
@ -148,55 +182,16 @@ Status FullCompaction::_check_all_version(const std::vector<RowsetSharedPtr>& ro
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset,
|
||||
RowsetWriter* rowset_writer) {
|
||||
std::vector<RowsetSharedPtr> tmp_rowsets {};
|
||||
|
||||
// tablet is under alter process. The delete bitmap will be calculated after conversion.
|
||||
if (_tablet->tablet_state() == TABLET_NOTREADY) {
|
||||
LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id="
|
||||
<< _tablet->tablet_id();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
int64_t max_version = _tablet->max_version().second;
|
||||
DCHECK(max_version >= rowset->version().second);
|
||||
if (max_version > rowset->version().second) {
|
||||
RETURN_IF_ERROR(_tablet->capture_consistent_rowsets(
|
||||
{rowset->version().second + 1, max_version}, &tmp_rowsets));
|
||||
}
|
||||
|
||||
for (const auto& it : tmp_rowsets) {
|
||||
const int64_t& cur_version = it->rowset_meta()->start_version();
|
||||
RETURN_IF_ERROR(
|
||||
_full_compaction_calc_delete_bitmap(it, rowset, cur_version, rowset_writer));
|
||||
}
|
||||
|
||||
std::lock_guard rowset_update_lock(_tablet->get_rowset_update_lock());
|
||||
std::lock_guard header_lock(_tablet->get_header_lock());
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
for (const auto& it : _tablet->rowset_map()) {
|
||||
const int64_t& cur_version = it.first.first;
|
||||
const RowsetSharedPtr& published_rowset = it.second;
|
||||
if (cur_version > max_version) {
|
||||
RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(published_rowset, rowset,
|
||||
cur_version, rowset_writer));
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr& published_rowset,
|
||||
const RowsetSharedPtr& rowset,
|
||||
const int64_t& cur_version,
|
||||
int64_t cur_version,
|
||||
RowsetWriter* rowset_writer) {
|
||||
std::vector<segment_v2::SegmentSharedPtr> segments;
|
||||
auto beta_rowset = reinterpret_cast<BetaRowset*>(published_rowset.get());
|
||||
RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
|
||||
RETURN_IF_ERROR(
|
||||
std::static_pointer_cast<BetaRowset>(published_rowset)->load_segments(&segments));
|
||||
DeleteBitmapPtr delete_bitmap =
|
||||
std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
|
||||
std::vector<RowsetSharedPtr> specified_rowsets(1, rowset);
|
||||
std::vector<RowsetSharedPtr> specified_rowsets {rowset};
|
||||
|
||||
OlapStopWatch watch;
|
||||
RETURN_IF_ERROR(_tablet->calc_delete_bitmap(published_rowset, segments, specified_rowsets,
|
||||
@ -213,10 +208,11 @@ Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr
|
||||
<< ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;
|
||||
|
||||
for (const auto& [k, v] : delete_bitmap->delete_bitmap) {
|
||||
_tablet->tablet_meta()->delete_bitmap().merge({std::get<0>(k), std::get<1>(k), cur_version},
|
||||
v);
|
||||
if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) {
|
||||
_tablet->tablet_meta()->delete_bitmap().merge(
|
||||
{std::get<0>(k), std::get<1>(k), cur_version}, v);
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -47,11 +47,8 @@ protected:
|
||||
|
||||
private:
|
||||
Status _check_all_version(const std::vector<RowsetSharedPtr>& rowsets);
|
||||
Status _full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset,
|
||||
RowsetWriter* rowset_writer);
|
||||
Status _full_compaction_calc_delete_bitmap(const RowsetSharedPtr& published_rowset,
|
||||
const RowsetSharedPtr& rowset,
|
||||
const int64_t& cur_version,
|
||||
const RowsetSharedPtr& rowset, int64_t cur_version,
|
||||
RowsetWriter* rowset_writer);
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(FullCompaction);
|
||||
|
||||
@ -0,0 +1,16 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !sql --
|
||||
0 0
|
||||
1 99
|
||||
2 99
|
||||
3 99
|
||||
|
||||
-- !dup_key_count --
|
||||
0
|
||||
|
||||
-- !sql --
|
||||
0 0
|
||||
1 99
|
||||
2 99
|
||||
3 99
|
||||
|
||||
@ -0,0 +1,98 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_full_compaction_mow","nonConcurrent") {
|
||||
if (isCloudMode()) {
|
||||
return
|
||||
}
|
||||
|
||||
def backends = sql_return_maparray('show backends')
|
||||
if (backends.size() > 1) {
|
||||
return
|
||||
}
|
||||
|
||||
def tableName = "test_full_compaction_mow"
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${tableName} (
|
||||
`k` int ,
|
||||
`v` int ,
|
||||
) engine=olap
|
||||
UNIQUE KEY(k)
|
||||
DISTRIBUTED BY HASH(`k`) BUCKETS 1
|
||||
properties(
|
||||
"replication_num" = "1",
|
||||
"disable_auto_compaction" = "true")
|
||||
"""
|
||||
|
||||
sql """ INSERT INTO ${tableName} VALUES (0,00)"""
|
||||
sql """ INSERT INTO ${tableName} VALUES (1,10)"""
|
||||
sql """ INSERT INTO ${tableName} VALUES (2,20)"""
|
||||
sql """ INSERT INTO ${tableName} VALUES (3,30)"""
|
||||
|
||||
def tabletStats = sql_return_maparray("show tablets from ${tableName};")
|
||||
def tabletId = tabletStats[0].TabletId
|
||||
def tabletBackendId = tabletStats[0].BackendId
|
||||
def tabletBackend
|
||||
for (def be : backends) {
|
||||
if (be.BackendId == tabletBackendId) {
|
||||
tabletBackend = be
|
||||
break;
|
||||
}
|
||||
}
|
||||
logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
|
||||
|
||||
GetDebugPoint().clearDebugPointsForAllBEs()
|
||||
|
||||
try {
|
||||
GetDebugPoint().enableDebugPointForAllBEs("FullCompaction.modify_rowsets.before.block")
|
||||
|
||||
// trigger full compaction
|
||||
logger.info("trigger full compaction on BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
|
||||
def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
|
||||
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
|
||||
assert code == 0
|
||||
def compactJson = parseJson(out.trim())
|
||||
assert "success" == compactJson.status.toLowerCase()
|
||||
|
||||
sql """ INSERT INTO ${tableName} VALUES (1,99),(2,99),(3,99);"""
|
||||
qt_sql "select * from ${tableName} order by k;"
|
||||
|
||||
GetDebugPoint().disableDebugPointForAllBEs("FullCompaction.modify_rowsets.before.block")
|
||||
|
||||
// wait for compaction to finish
|
||||
def running = true
|
||||
do {
|
||||
Thread.sleep(1000)
|
||||
(code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
|
||||
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
|
||||
assertEquals(code, 0)
|
||||
def compactionStatus = parseJson(out.trim())
|
||||
assertEquals("success", compactionStatus.status.toLowerCase())
|
||||
running = compactionStatus.run_status
|
||||
} while (running)
|
||||
|
||||
qt_dup_key_count "select count() from (select k, count(*) from ${tableName} group by k having count(*) > 1) t"
|
||||
qt_sql "select * from ${tableName} order by k;"
|
||||
} catch (Exception e) {
|
||||
logger.info(e.getMessage())
|
||||
exception = true;
|
||||
} finally {
|
||||
GetDebugPoint().clearDebugPointsForAllBEs()
|
||||
GetDebugPoint().clearDebugPointsForAllFEs()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user