[fix](move-memtable) fix commit may fail due to duplicated reports (#32403)

This commit is contained in:
Kaijie Chen
2024-04-19 10:13:18 +08:00
committed by yiguolei
parent 2675e94a93
commit ffd9da44a2
5 changed files with 414 additions and 29 deletions

View File

@ -200,6 +200,18 @@ public:
std::string to_string();
// Test-only hook: appends `tablet_id` to this stub's list of successfully
// written tablets while holding the success-list mutex.
void add_success_tablet(int64_t tablet_id) {
    std::lock_guard<bthread::Mutex> guard(_success_tablets_mutex);
    _success_tablets.push_back(tablet_id);
}
// Test-only hook: records `reason` as the failure status of `tablet_id` on
// this stub, replacing any reason stored earlier for the same tablet. The
// failed-tablet mutex is held for the duration of the update.
void add_failed_tablet(int64_t tablet_id, Status reason) {
    std::lock_guard<bthread::Mutex> guard(_failed_tablets_mutex);
    _failed_tablets.insert_or_assign(tablet_id, reason);
}
private:
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);

View File

@ -574,39 +574,28 @@ Status VTabletWriterV2::close(Status exec_status) {
// calculate and submit commit info
if (is_last_sink) {
std::unordered_map<int64_t, int> failed_tablets;
std::unordered_map<int64_t, Status> failed_reason;
std::vector<TTabletCommitInfo> tablet_commit_infos;
_load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) {
std::unordered_set<int64_t> known_tablets;
for (const auto& stream : streams) {
for (auto [tablet_id, reason] : stream->failed_tablets()) {
if (known_tablets.contains(tablet_id)) {
continue;
}
known_tablets.insert(tablet_id);
failed_tablets[tablet_id]++;
failed_reason[tablet_id] = reason;
}
for (auto tablet_id : stream->success_tablets()) {
if (known_tablets.contains(tablet_id)) {
continue;
}
known_tablets.insert(tablet_id);
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet_id;
commit_info.backendId = dst_id;
tablet_commit_infos.emplace_back(std::move(commit_info));
DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
auto streams = _load_stream_map->at(_tablets_for_node.begin()->first);
int64_t tablet_id = -1;
for (auto& stream : *streams) {
const auto& tablets = stream->success_tablets();
if (tablets.size() > 0) {
tablet_id = tablets[0];
break;
}
}
if (tablet_id != -1) {
LOG(INFO) << "fault injection: adding failed tablet_id: " << tablet_id;
streams->front()->add_failed_tablet(tablet_id,
Status::InternalError("fault injection"));
} else {
LOG(INFO) << "fault injection: failed to inject failed tablet_id";
}
});
for (auto [tablet_id, replicas] : failed_tablets) {
if (replicas > (_num_replicas - 1) / 2) {
return failed_reason.at(tablet_id);
}
}
std::vector<TTabletCommitInfo> tablet_commit_infos;
RETURN_IF_ERROR(
_create_commit_info(tablet_commit_infos, _load_stream_map, _num_replicas));
_state->tablet_commit_infos().insert(
_state->tablet_commit_infos().end(),
std::make_move_iterator(tablet_commit_infos.begin()),
@ -659,4 +648,43 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
}
}
// Builds the tablet commit infos for a load from the per-backend stream reports.
//
// For every destination backend in `load_stream_map`, each tablet id is taken
// into account at most once (the first report seen across that backend's
// streams wins; within one stream, failures are read before successes):
//   - a failed tablet increments that tablet's failure count and remembers the
//     reported reason (the last backend visited overwrites earlier reasons);
//   - a successful tablet appends a {tabletId, backendId} entry to
//     `tablet_commit_infos`.
//
// Returns the remembered failure reason of the first tablet found whose failure
// count exceeds (num_replicas - 1) / 2, otherwise Status::OK(). Entries already
// appended to `tablet_commit_infos` are left in place when an error is returned.
Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos,
                                            std::shared_ptr<LoadStreamMap> load_stream_map,
                                            int num_replicas) {
    std::unordered_map<int64_t, int> failure_counts;
    std::unordered_map<int64_t, Status> failure_reasons;

    load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) {
        // Tablets already accounted for on this backend; duplicated reports
        // (across streams, or as both failed and successful) are skipped.
        std::unordered_set<int64_t> seen;
        for (const auto& stream : streams) {
            for (const auto& [tablet_id, reason] : stream->failed_tablets()) {
                if (!seen.insert(tablet_id).second) {
                    continue;
                }
                ++failure_counts[tablet_id];
                failure_reasons[tablet_id] = reason;
            }
            for (const int64_t tablet_id : stream->success_tablets()) {
                if (!seen.insert(tablet_id).second) {
                    continue;
                }
                TTabletCommitInfo& info = tablet_commit_infos.emplace_back();
                info.tabletId = tablet_id;
                info.backendId = dst_id;
            }
        }
    });

    // A tablet may fail on at most this many backends before the commit is rejected.
    const int tolerated_failures = (num_replicas - 1) / 2;
    for (const auto& [tablet_id, failures] : failure_counts) {
        if (failures <= tolerated_failures) {
            continue;
        }
        LOG(INFO) << "tablet " << tablet_id
                  << " failed on majority backends: " << failure_reasons[tablet_id];
        return failure_reasons.at(tablet_id);
    }
    return Status::OK();
}
} // namespace doris::vectorized

View File

@ -114,6 +114,13 @@ public:
Status on_partitions_created(TCreatePartitionResult* result);
#ifndef BE_TEST
private:
#endif
// Fills `tablet_commit_infos` with one {tabletId, backendId} entry for each
// tablet reported as successful by the streams of each backend in
// `load_stream_map`, ignoring duplicated reports of a tablet from the same
// backend. Returns the recorded failure status of a tablet that failed on more
// than (num_replicas - 1) / 2 backends, otherwise Status::OK().
// Static, and declared outside the private section when BE_TEST is defined,
// so that unit tests can call it directly.
static Status _create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos,
std::shared_ptr<LoadStreamMap> load_stream_map,
int num_replicas);
private:
Status _init_row_distribution();

View File

@ -0,0 +1,239 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/writer/vtablet_writer_v2.h"
#include <gtest/gtest.h>
#include "vec/sink/load_stream_map_pool.h"
#include "vec/sink/load_stream_stub.h"
namespace doris {
// gtest fixture for the VTabletWriterV2 commit-info tests. It holds no state;
// the suite-level set-up and tear-down hooks are intentionally empty.
class TestVTabletWriterV2 : public ::testing::Test {
public:
    TestVTabletWriterV2() = default;
    ~TestVTabletWriterV2() override = default;

    static void SetUpTestSuite() {}
    static void TearDownTestSuite() {}
};
// Passed as the second constructor argument of every LoadStreamMap built by
// the tests in this file (presumably the id of the sending node — confirm
// against the LoadStreamMap constructor).
const int64_t src_id = 1000;
// Test helper: looks up (or creates) the streams of backend `node_id` in
// `load_stream_map` and registers the given tablet reports on the first
// stream (index 0). Calling it again for the same node adds further reports
// to that same stream.
static void add_stream(std::shared_ptr<LoadStreamMap> load_stream_map, int64_t node_id,
                       std::vector<int64_t> success_tablets,
                       std::unordered_map<int64_t, Status> failed_tablets) {
    auto node_streams = load_stream_map->get_or_create(node_id);
    for (const int64_t& ok_tablet : success_tablets) {
        node_streams->at(0)->add_success_tablet(ok_tablet);
    }
    for (const auto& failure : failed_tablets) {
        node_streams->at(0)->add_failed_tablet(failure.first, failure.second);
    }
}
// One replica, one backend reporting tablets 1 and 2 as successful:
// the call succeeds and yields two commit infos.
TEST_F(TestVTabletWriterV2, one_replica) {
    constexpr int kReplicaCount = 1;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1, 2}, {});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(commit_infos.size(), 2);
}
// One replica, and the only backend reports tablet 2 as failed:
// the failure reason is returned.
TEST_F(TestVTabletWriterV2, one_replica_fail) {
    constexpr int kReplicaCount = 1;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1}, {{2, Status::InternalError("test")}});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    ASSERT_EQ(status, Status::InternalError("test"));
}
// Two replicas, both backends report tablets 1 and 2 as successful:
// the call succeeds and yields four commit infos.
TEST_F(TestVTabletWriterV2, two_replica) {
    constexpr int kReplicaCount = 2;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1, 2}, {});
    add_stream(stream_map, 1002, {1, 2}, {});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(commit_infos.size(), 4);
}
// Two replicas, tablet 2 fails on one of the two backends: with two replicas
// a single failure already exceeds (2 - 1) / 2, so the reason is returned.
TEST_F(TestVTabletWriterV2, two_replica_fail) {
    constexpr int kReplicaCount = 2;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1}, {{2, Status::InternalError("test")}});
    add_stream(stream_map, 1002, {1, 2}, {});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    ASSERT_EQ(status, Status::InternalError("test"));
}
// Three replicas, all three backends report tablets 1 and 2 as successful:
// the call succeeds and yields six commit infos.
TEST_F(TestVTabletWriterV2, normal) {
    constexpr int kReplicaCount = 3;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1, 2}, {});
    add_stream(stream_map, 1002, {1, 2}, {});
    add_stream(stream_map, 1003, {1, 2}, {});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(commit_infos.size(), 6);
}
// Three replicas, backend 1002 does not report tablet 2 at all (neither
// success nor failure): the call succeeds with five commit infos.
TEST_F(TestVTabletWriterV2, miss_one) {
    constexpr int kReplicaCount = 3;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1, 2}, {});
    add_stream(stream_map, 1002, {1}, {});
    add_stream(stream_map, 1003, {1, 2}, {});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(commit_infos.size(), 5);
}
// Three replicas, tablet 2 is reported by backend 1001 only and is missing
// (not failed) on the other two: a missing report is not counted as a
// failure here, so the call succeeds with four commit infos.
TEST_F(TestVTabletWriterV2, miss_two) {
    constexpr int kReplicaCount = 3;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1, 2}, {});
    add_stream(stream_map, 1002, {1}, {});
    add_stream(stream_map, 1003, {1}, {});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(commit_infos.size(), 4);
}
// Three replicas, tablet 2 fails on backend 1002 only: one failure is
// tolerated, so the call succeeds with five commit infos.
TEST_F(TestVTabletWriterV2, fail_one) {
    constexpr int kReplicaCount = 3;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1, 2}, {});
    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
    add_stream(stream_map, 1003, {1, 2}, {});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(commit_infos.size(), 5);
}
// Three replicas, backend 1002 reports the same success (tablet 1) and the
// same failure (tablet 2) twice.
TEST_F(TestVTabletWriterV2, fail_one_duplicate) {
    constexpr int kReplicaCount = 3;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1, 2}, {});
    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
    add_stream(stream_map, 1003, {1, 2}, {});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    // Duplicated reports from the same node must be counted only once.
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(commit_infos.size(), 5);
}
// Three replicas, backend 1002 fails both tablets 1 and 2: each tablet has
// only one failure, so the call succeeds with four commit infos.
TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_same_node) {
    constexpr int kReplicaCount = 3;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1, 2}, {});
    add_stream(stream_map, 1002, {},
               {{1, Status::InternalError("test")}, {2, Status::InternalError("test")}});
    add_stream(stream_map, 1003, {1, 2}, {});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(commit_infos.size(), 4);
}
// Three replicas, tablet 2 fails on backend 1002 and tablet 1 fails on
// backend 1003: each tablet has only one failure, so the call succeeds with
// four commit infos.
TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_diff_node) {
    constexpr int kReplicaCount = 3;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1, 2}, {});
    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
    add_stream(stream_map, 1003, {2}, {{1, Status::InternalError("test")}});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(commit_infos.size(), 4);
}
// Three replicas, tablet 2 fails on two of the three backends.
TEST_F(TestVTabletWriterV2, fail_two_same_tablet) {
    constexpr int kReplicaCount = 3;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1, 2}, {});
    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
    add_stream(stream_map, 1003, {1}, {{2, Status::InternalError("test")}});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    // The BE must detect that a majority of replicas failed and abort the commit.
    ASSERT_EQ(status, Status::InternalError("test"));
}
// Three replicas, tablet 2 fails on two backends and is not reported at all
// by the third.
TEST_F(TestVTabletWriterV2, fail_two_miss_one_same_tablet) {
    constexpr int kReplicaCount = 3;
    UniqueId load_id;
    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
    add_stream(stream_map, 1001, {1}, {});
    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
    add_stream(stream_map, 1003, {1}, {{2, Status::InternalError("test")}});

    std::vector<TTabletCommitInfo> commit_infos;
    const Status status = vectorized::VTabletWriterV2::_create_commit_info(
            commit_infos, stream_map, kReplicaCount);
    // The BE must detect that a majority of replicas failed and abort the commit.
    ASSERT_EQ(status, Status::InternalError("test"));
}
} // namespace doris

View File

@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
import org.apache.doris.regression.util.Http
// Regression suite: runs an INSERT ... SELECT with enable_memtable_on_sink_node
// turned on while the BE debug point "VTabletWriterV2.close.add_failed_tablet"
// is enabled on all backends.
suite("test_commit_info_fault_injection", "nonConcurrent") {
// Count the backends; the body below only runs on a cluster of exactly three.
def res = sql"show backends;"
logger.info(res.toString())
def beNums = 0;
res.each { item ->
beNums++;
logger.info(item.toString())
}
// Both tables below are created with replication_num = 3.
if (beNums == 3){
// Remember the current value of enable_memtable_on_sink_node so that it can
// be restored at the end of the suite.
result = sql "show VARIABLES like \'enable_memtable_on_sink_node\'"
log.info(result.toString())
original_status = result[0][1]
sql """ set enable_memtable_on_sink_node=true """
// Source table, filled by the stream load of baseall.txt below.
sql """
CREATE TABLE IF NOT EXISTS `baseall` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "3")
"""
// Target table of the INSERT ... SELECT executed under fault injection.
sql """
CREATE TABLE IF NOT EXISTS `test` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace_if_not_null null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "3")
"""
// Start from a clean state: no debug points enabled on any BE.
GetDebugPoint().clearDebugPointsForAllBEs()
streamLoad {
table "baseall"
db "regression_test_fault_injection_p0"
set 'column_separator', ','
file "baseall.txt"
}
// Enables debug point `injection` on all BEs, runs the insert, and always
// disables the debug point again afterwards.
// NOTE: `error_msg` is only compared against the exception message when the
// insert throws; if the insert succeeds, no assertion is made.
def load_with_injection = { injection, error_msg->
try {
GetDebugPoint().enableDebugPointForAllBEs(injection)
sql "insert into test select * from baseall where k1 <= 3"
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains(error_msg))
} finally {
GetDebugPoint().disableDebugPointForAllBEs(injection)
}
}
// A commit failure on a single replica must not fail the load: it should still succeed.
load_with_injection("VTabletWriterV2.close.add_failed_tablet", "sucess")
// Restore the session variable to the value recorded at the start.
sql """set enable_memtable_on_sink_node = ${original_status}"""
}
}