diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index 7cba945dd2..6509f5aa35 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -572,7 +572,7 @@ public: } void wait_for_close() { - for (int i = 0; i < 1000 && _load_stream_mgr->get_load_stream_num() != 0; i++) { + for (int i = 0; i < 3000 && _load_stream_mgr->get_load_stream_num() != 0; i++) { bthread_usleep(1000); } } @@ -851,6 +851,46 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_b EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } +TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) { + MockSinkClient client; + auto st = client.connect_stream(); + EXPECT_TRUE(st.ok()); + + reset_response_stat(); + + // append data + butil::IOBuf append_buf; + PStreamHeader header; + std::string data = "file1 hello world 123 !@#$%^&*()_+"; + write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, + data.length(), data, false); + + EXPECT_EQ(g_response_stat.num, 0); + // CLOSE_LOAD before EOS + close_load(client); + wait_for_ack(1); + EXPECT_EQ(g_response_stat.num, 1); + EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); + EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); + // server will close stream on CLOSE_LOAD + wait_for_close(); + EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); + + // then the late EOS, will not be handled + write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, + data.length(), data, true); + + // duplicated close, will not be handled + close_load(client); + wait_for_ack(2); + EXPECT_EQ(g_response_stat.num, 1); + EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); + EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); + + auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); + EXPECT_EQ(written_data, ""); +} + TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) { MockSinkClient client; auto st = client.connect_stream();