[ut](move-memtable) add CLOSE_LOAD before EOS ut case (#29253)

Signed-off-by: freemandealer <freeman.zhang1992@gmail.com>
This commit is contained in:
zhengyu
2023-12-29 00:33:34 +08:00
committed by GitHub
parent 99a1e066b5
commit efea006f3a

View File

@ -572,7 +572,7 @@ public:
}
void wait_for_close() {
for (int i = 0; i < 1000 && _load_stream_mgr->get_load_stream_num() != 0; i++) {
for (int i = 0; i < 3000 && _load_stream_mgr->get_load_stream_num() != 0; i++) {
bthread_usleep(1000);
}
}
@ -851,6 +851,46 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_b
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
// append data
butil::IOBuf append_buf;
PStreamHeader header;
std::string data = "file1 hello world 123 !@#$%^&*()_+";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
data.length(), data, false);
EXPECT_EQ(g_response_stat.num, 0);
// CLOSE_LOAD before EOS
close_load(client);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
// then the late EOS, will not be handled
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
data.length(), data, true);
// duplicated close, will not be handled
close_load(client);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0);
EXPECT_EQ(written_data, "");
}
TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) {
MockSinkClient client;
auto st = client.connect_stream();