Cherry-picked from #44680 Co-authored-by: Kaijie Chen <chenkaijie@selectdb.com>
This commit is contained in:
committed by
GitHub
parent
b332217584
commit
cf2e2113db
@ -219,11 +219,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64
|
||||
add_failed_tablet(tablet_id, _status);
|
||||
return _status;
|
||||
}
|
||||
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
|
||||
if (segment_id != 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
});
|
||||
DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
|
||||
PStreamHeader header;
|
||||
header.set_src_id(_src_id);
|
||||
*header.mutable_load_id() = _load_id;
|
||||
@ -245,11 +241,7 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
|
||||
add_failed_tablet(tablet_id, _status);
|
||||
return _status;
|
||||
}
|
||||
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
|
||||
if (segment_id != 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
});
|
||||
DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
|
||||
PStreamHeader header;
|
||||
header.set_src_id(_src_id);
|
||||
*header.mutable_load_id() = _load_id;
|
||||
@ -339,6 +331,10 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i
|
||||
|
||||
Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
|
||||
DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
|
||||
if (!_is_open.load()) {
|
||||
// we don't need to close wait on non-open streams
|
||||
return Status::OK();
|
||||
}
|
||||
if (!_is_closing.load()) {
|
||||
return _status;
|
||||
}
|
||||
|
||||
@ -268,14 +268,20 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
|
||||
}
|
||||
|
||||
Status VTabletWriterV2::_open_streams() {
|
||||
bool fault_injection_skip_be = true;
|
||||
int fault_injection_skip_be = 0;
|
||||
bool any_backend = false;
|
||||
bool any_success = false;
|
||||
for (auto& [dst_id, _] : _tablets_for_node) {
|
||||
auto streams = _load_stream_map->get_or_create(dst_id);
|
||||
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", {
|
||||
if (fault_injection_skip_be) {
|
||||
fault_injection_skip_be = false;
|
||||
if (fault_injection_skip_be < 1) {
|
||||
fault_injection_skip_be++;
|
||||
continue;
|
||||
}
|
||||
});
|
||||
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
|
||||
if (fault_injection_skip_be < 2) {
|
||||
fault_injection_skip_be++;
|
||||
continue;
|
||||
}
|
||||
});
|
||||
|
||||
@ -75,14 +75,15 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") {
|
||||
file "baseall.txt"
|
||||
}
|
||||
|
||||
def load_with_injection = { injection, error_msg->
|
||||
def load_with_injection = { injection, error_msg, success=false->
|
||||
try {
|
||||
sql "truncate table test"
|
||||
GetDebugPoint().enableDebugPointForAllBEs(injection)
|
||||
sql "insert into test select * from baseall where k1 <= 3"
|
||||
assertTrue(success, String.format("Expected Exception '%s', actual success", error_msg))
|
||||
} catch(Exception e) {
|
||||
logger.info(e.getMessage())
|
||||
assertTrue(e.getMessage().contains(error_msg))
|
||||
assertTrue(e.getMessage().contains(error_msg), e.toString())
|
||||
} finally {
|
||||
GetDebugPoint().disableDebugPointForAllBEs(injection)
|
||||
}
|
||||
@ -90,15 +91,17 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") {
|
||||
|
||||
// StreamSinkFileWriter appendv write segment failed one replica
|
||||
// success
|
||||
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess")
|
||||
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess", true)
|
||||
// StreamSinkFileWriter appendv write segment failed two replica
|
||||
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "add segment failed")
|
||||
// StreamSinkFileWriter appendv write segment failed all replica
|
||||
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas")
|
||||
// test segment num check when LoadStreamStub missed tail segments
|
||||
load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch")
|
||||
load_with_injection("LoadStreamStub.skip_send_segment", "segment num mismatch")
|
||||
// test one backend open failure
|
||||
load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success")
|
||||
load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success", true)
|
||||
// test two backend open failure
|
||||
load_with_injection("VTabletWriterV2._open_streams.skip_two_backends", "not enough streams 1/3")
|
||||
sql """ set enable_memtable_on_sink_node=false """
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user