// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "common/config.h" #include "common/status.h" #include "exec/tablet_info.h" #include "gen_cpp/BackendService_types.h" #include "gen_cpp/FrontendService_types.h" #include "gtest/gtest_pred_impl.h" #include "io/fs/local_file_system.h" #include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" #include "olap/tablet_manager.h" #include "olap/txn_manager.h" #include "runtime/descriptor_helper.h" #include "runtime/exec_env.h" #include "runtime/load_stream_mgr.h" #include "util/debug/leakcheck_disabler.h" #include "util/runtime_profile.h" using namespace brpc; namespace doris { static const uint32_t MAX_PATH_LEN = 1024; static std::unique_ptr k_engine; static const std::string zTestDir = "./data_test/data/load_stream_mgr_test"; const int64_t NORMAL_TABLET_ID = 10000; const int64_t ABNORMAL_TABLET_ID = 40000; const int64_t NORMAL_INDEX_ID = 50000; const int64_t ABNORMAL_INDEX_ID = 60000; const int64_t NORMAL_PARTITION_ID = 50000; const int64_t SCHEMA_HASH = 90000; const uint32_t NORMAL_SENDER_ID = 0; const uint32_t ABNORMAL_SENDER_ID = 10000; const int64_t NORMAL_TXN_ID = 600001; const UniqueId NORMAL_LOAD_ID(1, 1); const UniqueId ABNORMAL_LOAD_ID(1, 0); std::string NORMAL_STRING("normal"); std::string ABNORMAL_STRING("abnormal"); void construct_schema(OlapTableSchemaParam* schema) { // construct schema TOlapTableSchemaParam tschema; tschema.db_id = 1; tschema.table_id = 2; tschema.version = 0; // descriptor { TDescriptorTableBuilder dtb; { TTupleDescriptorBuilder tuple_builder; tuple_builder.add_slot(TSlotDescriptorBuilder() .type(TYPE_INT) .column_name("c1") .column_pos(1) .build()); tuple_builder.add_slot(TSlotDescriptorBuilder() .type(TYPE_BIGINT) .column_name("c2") .column_pos(2) .build()); tuple_builder.add_slot(TSlotDescriptorBuilder() .string_type(10) .column_name("c3") .column_pos(3) .build()); tuple_builder.build(&dtb); } { TTupleDescriptorBuilder tuple_builder; tuple_builder.add_slot(TSlotDescriptorBuilder() .type(TYPE_INT) .column_name("c1") .column_pos(1) .build()); tuple_builder.add_slot(TSlotDescriptorBuilder() .type(TYPE_BIGINT) .column_name("c2") .column_pos(2) .build()); tuple_builder.add_slot(TSlotDescriptorBuilder() .string_type(20) .column_name("c3") .column_pos(3) .build()); tuple_builder.build(&dtb); } auto desc_tbl = dtb.desc_tbl(); tschema.slot_descs = desc_tbl.slotDescriptors; tschema.tuple_desc = desc_tbl.tupleDescriptors[0]; } // index tschema.indexes.resize(2); tschema.indexes[0].id = NORMAL_INDEX_ID; tschema.indexes[0].columns = {"c1", "c2", "c3"}; tschema.indexes[1].id = NORMAL_INDEX_ID + 1; tschema.indexes[1].columns = {"c1", "c2", "c3"}; static_cast(schema->init(tschema)); } // copied from delta_writer_test.cpp static void create_tablet_request(int64_t tablet_id, int32_t schema_hash, TCreateTabletReq* request) { request->tablet_id = tablet_id; request->partition_id = 30001; request->__set_version(1); request->tablet_schema.schema_hash = schema_hash; request->tablet_schema.short_key_column_count = 6; request->tablet_schema.keys_type = TKeysType::AGG_KEYS; request->tablet_schema.storage_type = TStorageType::COLUMN; request->__set_storage_format(TStorageFormat::V2); TColumn k1; k1.__set_is_key(true); k1.column_type.type = TPrimitiveType::TINYINT; request->tablet_schema.columns.push_back(k1); TColumn k2; k2.column_name = "k2"; k2.__set_is_key(true); k2.column_type.type = TPrimitiveType::SMALLINT; request->tablet_schema.columns.push_back(k2); TColumn k3; k3.column_name = "k3"; k3.__set_is_key(true); k3.column_type.type = TPrimitiveType::INT; request->tablet_schema.columns.push_back(k3); TColumn k4; k4.column_name = "k4"; k4.__set_is_key(true); k4.column_type.type = TPrimitiveType::BIGINT; request->tablet_schema.columns.push_back(k4); TColumn k5; k5.column_name = "k5"; k5.__set_is_key(true); k5.column_type.type = TPrimitiveType::LARGEINT; request->tablet_schema.columns.push_back(k5); TColumn k6; k6.column_name = "k6"; k6.__set_is_key(true); k6.column_type.type = TPrimitiveType::DATE; request->tablet_schema.columns.push_back(k6); TColumn k7; k7.column_name = "k7"; k7.__set_is_key(true); k7.column_type.type = TPrimitiveType::DATETIME; request->tablet_schema.columns.push_back(k7); TColumn k8; k8.column_name = "k8"; k8.__set_is_key(true); k8.column_type.type = TPrimitiveType::CHAR; k8.column_type.__set_len(4); request->tablet_schema.columns.push_back(k8); TColumn k9; k9.column_name = "k9"; k9.__set_is_key(true); k9.column_type.type = TPrimitiveType::VARCHAR; k9.column_type.__set_len(65); request->tablet_schema.columns.push_back(k9); TColumn k10; k10.column_name = "k10"; k10.__set_is_key(true); k10.column_type.type = TPrimitiveType::DECIMALV2; k10.column_type.__set_precision(6); k10.column_type.__set_scale(3); request->tablet_schema.columns.push_back(k10); TColumn k11; k11.column_name = "k11"; k11.__set_is_key(true); k11.column_type.type = TPrimitiveType::DATEV2; request->tablet_schema.columns.push_back(k11); TColumn v1; v1.column_name = "v1"; v1.__set_is_key(false); v1.column_type.type = TPrimitiveType::TINYINT; v1.__set_aggregation_type(TAggregationType::SUM); request->tablet_schema.columns.push_back(v1); TColumn v2; v2.column_name = "v2"; v2.__set_is_key(false); v2.column_type.type = TPrimitiveType::SMALLINT; v2.__set_aggregation_type(TAggregationType::SUM); request->tablet_schema.columns.push_back(v2); TColumn v3; v3.column_name = "v3"; v3.__set_is_key(false); v3.column_type.type = TPrimitiveType::INT; v3.__set_aggregation_type(TAggregationType::SUM); request->tablet_schema.columns.push_back(v3); TColumn v4; v4.column_name = "v4"; v4.__set_is_key(false); v4.column_type.type = TPrimitiveType::BIGINT; v4.__set_aggregation_type(TAggregationType::SUM); request->tablet_schema.columns.push_back(v4); TColumn v5; v5.column_name = "v5"; v5.__set_is_key(false); v5.column_type.type = TPrimitiveType::LARGEINT; v5.__set_aggregation_type(TAggregationType::SUM); request->tablet_schema.columns.push_back(v5); TColumn v6; v6.column_name = "v6"; v6.__set_is_key(false); v6.column_type.type = TPrimitiveType::DATE; v6.__set_aggregation_type(TAggregationType::REPLACE); request->tablet_schema.columns.push_back(v6); TColumn v7; v7.column_name = "v7"; v7.__set_is_key(false); v7.column_type.type = TPrimitiveType::DATETIME; v7.__set_aggregation_type(TAggregationType::REPLACE); request->tablet_schema.columns.push_back(v7); TColumn v8; v8.column_name = "v8"; v8.__set_is_key(false); v8.column_type.type = TPrimitiveType::CHAR; v8.column_type.__set_len(4); v8.__set_aggregation_type(TAggregationType::REPLACE); request->tablet_schema.columns.push_back(v8); TColumn v9; v9.column_name = "v9"; v9.__set_is_key(false); v9.column_type.type = TPrimitiveType::VARCHAR; v9.column_type.__set_len(65); v9.__set_aggregation_type(TAggregationType::REPLACE); request->tablet_schema.columns.push_back(v9); TColumn v10; v10.column_name = "v10"; v10.__set_is_key(false); v10.column_type.type = TPrimitiveType::DECIMALV2; v10.column_type.__set_precision(6); v10.column_type.__set_scale(3); v10.__set_aggregation_type(TAggregationType::SUM); request->tablet_schema.columns.push_back(v10); TColumn v11; v11.column_name = "v11"; v11.__set_is_key(false); v11.column_type.type = TPrimitiveType::DATEV2; v11.__set_aggregation_type(TAggregationType::REPLACE); request->tablet_schema.columns.push_back(v11); } struct ResponseStat { std::atomic num; std::vector success_tablet_ids; std::vector failed_tablet_ids; }; bthread::Mutex g_stat_lock; static ResponseStat g_response_stat; void reset_response_stat() { std::lock_guard lock_guard(g_stat_lock); g_response_stat.num = 0; g_response_stat.success_tablet_ids.clear(); g_response_stat.failed_tablet_ids.clear(); } class LoadStreamMgrTest : public testing::Test { public: class Handler : public brpc::StreamInputHandler { public: int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override { for (size_t i = 0; i < size; i++) { PLoadStreamResponse response; butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]); response.ParseFromZeroCopyStream(&wrapper); LOG(INFO) << "response " << response.DebugString(); std::lock_guard lock_guard(g_stat_lock); for (auto& id : response.success_tablet_ids()) { g_response_stat.success_tablet_ids.push_back(id); } for (auto& tablet : response.failed_tablets()) { g_response_stat.failed_tablet_ids.push_back(tablet.id()); } g_response_stat.num++; } return 0; } void on_idle_timeout(StreamId id) override { std::cerr << "on_idle_timeout" << std::endl; } void on_closed(StreamId id) override { std::cerr << "on_closed" << std::endl; } }; class StreamService : public PBackendService { public: StreamService(LoadStreamMgr* load_stream_mgr) : _sd(brpc::INVALID_STREAM_ID), _load_stream_mgr(load_stream_mgr) {} virtual ~StreamService() { brpc::StreamClose(_sd); }; virtual void open_load_stream(google::protobuf::RpcController* controller, const POpenLoadStreamRequest* request, POpenLoadStreamResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); std::unique_ptr status = std::make_unique(); brpc::Controller* cntl = static_cast(controller); brpc::StreamOptions stream_options; for (const auto& req : request->tablets()) { TabletManager* tablet_mgr = StorageEngine::instance()->tablet_manager(); TabletSharedPtr tablet = tablet_mgr->get_tablet(req.tablet_id()); if (tablet == nullptr) { cntl->SetFailed("Tablet not found"); status->set_status_code(TStatusCode::NOT_FOUND); response->set_allocated_status(status.get()); static_cast(response->release_status()); return; } auto resp = response->add_tablet_schemas(); resp->set_index_id(req.index_id()); resp->set_enable_unique_key_merge_on_write( tablet->enable_unique_key_merge_on_write()); tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema()); } LoadStream* load_stream; LOG(INFO) << "total streams: " << request->total_streams(); EXPECT_GT(request->total_streams(), 0); auto st = _load_stream_mgr->open_load_stream(request, load_stream); stream_options.handler = load_stream; StreamId streamid; if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) { cntl->SetFailed("Fail to accept stream"); status->set_status_code(TStatusCode::CANCELLED); response->set_allocated_status(status.get()); static_cast(response->release_status()); return; } status->set_status_code(TStatusCode::OK); response->set_allocated_status(status.get()); static_cast(response->release_status()); } private: Handler _receiver; brpc::StreamId _sd; LoadStreamMgr* _load_stream_mgr = nullptr; }; class MockSinkClient { public: MockSinkClient() = default; ~MockSinkClient() { disconnect(); } class MockClosure : public google::protobuf::Closure { public: MockClosure(std::function cb) : _cb(cb) {} void Run() override { _cb(); delete this; } private: std::function _cb; }; Status connect_stream(int64_t sender_id = NORMAL_SENDER_ID, int total_streams = 1) { brpc::Channel channel; std::cerr << "connect_stream" << std::endl; // Initialize the channel, NULL means using default options. brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_BAIDU_STD; options.connection_type = "single"; options.timeout_ms = 10000 /*milliseconds*/; options.max_retry = 3; CHECK_EQ(0, channel.Init("127.0.0.1:18947", nullptr)); // Normally, you should not call a Channel directly, but instead construct // a stub Service wrapping it. stub can be shared by all threads as well. PBackendService_Stub stub(&channel); _stream_options.handler = &_handler; if (brpc::StreamCreate(&_stream, _cntl, &_stream_options) != 0) { LOG(ERROR) << "Fail to create stream"; return Status::InternalError("Fail to create stream"); } POpenLoadStreamRequest request; POpenLoadStreamResponse response; PUniqueId id; id.set_hi(1); id.set_lo(1); auto param = std::make_shared(); construct_schema(param.get()); *request.mutable_schema() = *param->to_protobuf(); *request.mutable_load_id() = id; request.set_txn_id(NORMAL_TXN_ID); request.set_src_id(sender_id); request.set_total_streams(total_streams); auto ptablet = request.add_tablets(); ptablet->set_tablet_id(NORMAL_TABLET_ID); ptablet->set_index_id(NORMAL_INDEX_ID); stub.open_load_stream(&_cntl, &request, &response, nullptr); if (_cntl.Failed()) { std::cerr << "open_load_stream failed" << std::endl; LOG(ERROR) << "Fail to open load stream " << _cntl.ErrorText(); return Status::InternalError("Fail to open load stream"); } return Status::OK(); } void disconnect() const { std::cerr << "disconnect" << std::endl; CHECK_EQ(0, brpc::StreamClose(_stream)); } Status send(butil::IOBuf* buf) { int ret = brpc::StreamWrite(_stream, *buf); if (ret != 0) { LOG(ERROR) << "Fail to write stream"; return Status::InternalError("Fail to write stream"); } LOG(INFO) << "sent by stream successfully" << std::endl; return Status::OK(); } Status close() { return Status::OK(); } private: brpc::StreamId _stream; brpc::Controller _cntl; brpc::StreamOptions _stream_options; Handler _handler; }; LoadStreamMgrTest() : _heavy_work_pool(4, 32, "load_stream_test_heavy"), _light_work_pool(4, 32, "load_stream_test_light") {} void close_load(MockSinkClient& client, const std::vector& tablets_to_commit = {}, uint32_t sender_id = NORMAL_SENDER_ID) { butil::IOBuf append_buf; PStreamHeader header; header.mutable_load_id()->set_hi(1); header.mutable_load_id()->set_lo(1); header.set_opcode(PStreamHeader::CLOSE_LOAD); header.set_src_id(sender_id); for (const auto& tablet : tablets_to_commit) { *header.add_tablets() = tablet; } size_t hdr_len = header.ByteSizeLong(); append_buf.append((char*)&hdr_len, sizeof(size_t)); append_buf.append(header.SerializeAsString()); static_cast(client.send(&append_buf)); } void write_one_tablet(MockSinkClient& client, UniqueId load_id, uint32_t sender_id, int64_t index_id, int64_t tablet_id, uint32_t segid, uint64_t offset, std::string& data, bool segment_eos) { // append data butil::IOBuf append_buf; PStreamHeader header; header.set_opcode(PStreamHeader::APPEND_DATA); header.mutable_load_id()->set_hi(load_id.hi); header.mutable_load_id()->set_lo(load_id.lo); header.set_index_id(index_id); header.set_tablet_id(tablet_id); header.set_segment_id(segid); header.set_segment_eos(segment_eos); header.set_src_id(sender_id); header.set_partition_id(NORMAL_PARTITION_ID); header.set_offset(offset); size_t hdr_len = header.ByteSizeLong(); append_buf.append((char*)&hdr_len, sizeof(size_t)); append_buf.append(header.SerializeAsString()); size_t data_len = data.length(); append_buf.append((char*)&data_len, sizeof(size_t)); append_buf.append(data); LOG(INFO) << "send " << header.DebugString() << data; static_cast(client.send(&append_buf)); } void write_normal(MockSinkClient& client) { write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, 0, NORMAL_STRING, true); } void write_abnormal_load(MockSinkClient& client) { write_one_tablet(client, ABNORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); } void write_abnormal_index(MockSinkClient& client) { write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, ABNORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); } void write_abnormal_sender(MockSinkClient& client) { write_one_tablet(client, NORMAL_LOAD_ID, ABNORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); } void write_abnormal_tablet(MockSinkClient& client) { write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, ABNORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); } void wait_for_ack(int32_t num) { for (int i = 0; i < 1000 && g_response_stat.num < num; i++) { LOG(INFO) << "waiting ack, current " << g_response_stat.num << ", expected " << num; bthread_usleep(1000); } } void wait_for_close() { for (int i = 0; i < 3000 && _load_stream_mgr->get_load_stream_num() != 0; i++) { bthread_usleep(1000); } } void SetUp() override { _server = new brpc::Server(); srand(time(nullptr)); char buffer[MAX_PATH_LEN]; EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); config::storage_root_path = std::string(buffer) + "/data_test"; auto st = io::global_local_filesystem()->delete_directory(config::storage_root_path); ASSERT_TRUE(st.ok()) << st; st = io::global_local_filesystem()->create_directory(config::storage_root_path); ASSERT_TRUE(st.ok()) << st; std::vector paths; paths.emplace_back(config::storage_root_path, -1); doris::EngineOptions options; options.store_paths = paths; k_engine = std::make_unique(options); Status s = k_engine->open(); EXPECT_TRUE(s.ok()) << s.to_string(); doris::ExecEnv::GetInstance()->set_storage_engine(k_engine.get()); EXPECT_TRUE(io::global_local_filesystem()->create_directory(zTestDir).ok()); static_cast(k_engine->start_bg_threads()); _load_stream_mgr = std::make_unique(4); _load_stream_mgr->set_heavy_work_pool(&_heavy_work_pool); _load_stream_mgr->set_light_work_pool(&_light_work_pool); _stream_service = new StreamService(_load_stream_mgr.get()); CHECK_EQ(0, _server->AddService(_stream_service, brpc::SERVER_OWNS_SERVICE)); brpc::ServerOptions server_options; server_options.idle_timeout_sec = 300; { debug::ScopedLeakCheckDisabler disable_lsan; CHECK_EQ(0, _server->Start("127.0.0.1:18947", &server_options)); } for (int i = 0; i < 3; i++) { TCreateTabletReq request; create_tablet_request(NORMAL_TABLET_ID + i, SCHEMA_HASH, &request); auto profile = std::make_unique("test"); Status res = k_engine->create_tablet(request, profile.get()); EXPECT_EQ(Status::OK(), res); } } void TearDown() override { _server->Stop(0); CHECK_EQ(0, _server->Join()); SAFE_DELETE(_server); k_engine.reset(); doris::ExecEnv::GetInstance()->set_storage_engine(nullptr); } std::string read_data(int64_t txn_id, int64_t partition_id, int64_t tablet_id, uint32_t segid) { auto tablet = k_engine->tablet_manager()->get_tablet(tablet_id); std::map tablet_related_rs; k_engine->txn_manager()->get_txn_related_tablets(txn_id, partition_id, &tablet_related_rs); LOG(INFO) << "get txn related tablet, txn_id=" << txn_id << ", tablet_id=" << tablet_id << "partition_id=" << partition_id; for (auto& [tablet, rowset] : tablet_related_rs) { if (tablet.tablet_id != tablet_id || rowset == nullptr) { continue; } auto path = static_cast(rowset.get())->segment_file_path(segid); LOG(INFO) << "read data from " << path; std::ifstream inputFile(path, std::ios::binary); inputFile.seekg(0, std::ios::end); std::streampos file_size = inputFile.tellg(); inputFile.seekg(0, std::ios::beg); // Read the file contents std::unique_ptr buffer = std::make_unique(file_size); inputFile.read(buffer.get(), file_size); return std::string(buffer.get(), file_size); } return std::string(); } ExecEnv* _env; brpc::Server* _server; StreamService* _stream_service; FifoThreadPool _heavy_work_pool; FifoThreadPool _light_work_pool; std::unique_ptr _load_stream_mgr; }; // // one client TEST_F(LoadStreamMgrTest, one_client_normal) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); write_normal(client); reset_response_stat(); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(NORMAL_INDEX_ID); tablet.set_tablet_id(NORMAL_TABLET_ID); tablet.set_num_segments(1); close_load(client, {tablet}, ABNORMAL_SENDER_ID); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); close_load(client, {tablet}); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, one_client_abnormal_load) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); reset_response_stat(); write_abnormal_load(client); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); close_load(client); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, one_client_abnormal_index) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); reset_response_stat(); write_abnormal_index(client); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(ABNORMAL_INDEX_ID); tablet.set_tablet_id(NORMAL_TABLET_ID); tablet.set_num_segments(1); close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); reset_response_stat(); write_abnormal_sender(client); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(NORMAL_INDEX_ID); tablet.set_tablet_id(NORMAL_TABLET_ID); tablet.set_num_segments(1); close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); // on the final close_load, segment num check will fail close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); reset_response_stat(); write_abnormal_tablet(client); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], ABNORMAL_TABLET_ID); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(NORMAL_INDEX_ID); tablet.set_tablet_id(ABNORMAL_TABLET_ID); tablet.set_num_segments(1); close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2); EXPECT_EQ(g_response_stat.failed_tablet_ids[1], ABNORMAL_TABLET_ID); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_bytes) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); reset_response_stat(); // append data butil::IOBuf append_buf; PStreamHeader header; std::string data; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, 0, data, true); EXPECT_EQ(g_response_stat.num, 0); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(NORMAL_INDEX_ID); tablet.set_tablet_id(NORMAL_TABLET_ID); tablet.set_num_segments(1); // CLOSE_LOAD close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); reset_response_stat(); // append data butil::IOBuf append_buf; PStreamHeader header; std::string data = "file1 hello world 123 !@#$%^&*()_+"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, data.length(), data, false); EXPECT_EQ(g_response_stat.num, 0); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(NORMAL_INDEX_ID); tablet.set_tablet_id(NORMAL_TABLET_ID); tablet.set_num_segments(1); // CLOSE_LOAD before EOS close_load(client, {tablet}); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); // then the late EOS, will not be handled write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, data.length(), data, true); // duplicated close, will not be handled close_load(client, {tablet}); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); EXPECT_EQ(written_data, ""); } TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); reset_response_stat(); // append data butil::IOBuf append_buf; PStreamHeader header; std::string data = "file1 hello world 123 !@#$%^&*()_+"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, 0, data, false); write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, data.length(), data, true); EXPECT_EQ(g_response_stat.num, 0); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(NORMAL_INDEX_ID); tablet.set_tablet_id(NORMAL_TABLET_ID); tablet.set_num_segments(1); // CLOSE_LOAD close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); EXPECT_EQ(written_data, data + data); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment_without_eos) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); reset_response_stat(); // append data butil::IOBuf append_buf; PStreamHeader header; std::string data = "file1 hello world 123 !@#$%^&*()_+"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, 0, data, false); EXPECT_EQ(g_response_stat.num, 0); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(NORMAL_INDEX_ID); tablet.set_tablet_id(NORMAL_TABLET_ID); tablet.set_num_segments(1); // CLOSE_LOAD close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); reset_response_stat(); // append data butil::IOBuf append_buf; PStreamHeader header; std::string data = "file1 hello world 123 !@#$%^&*()_+"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1, 0, data, false); write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1, data.length(), data, true); EXPECT_EQ(g_response_stat.num, 0); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(NORMAL_INDEX_ID); tablet.set_tablet_id(NORMAL_TABLET_ID); tablet.set_num_segments(1); // CLOSE_LOAD close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_two_segment) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); reset_response_stat(); // append data butil::IOBuf append_buf; PStreamHeader header; std::string data1 = "file1 hello world 123 !@#$%^&*()_+1"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, 0, data1, false); std::string empty; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, data1.length(), empty, true); std::string data2 = "file1 hello world 123 !@#$%^&*()_+2"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1, 0, data2, true); EXPECT_EQ(g_response_stat.num, 0); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(NORMAL_INDEX_ID); tablet.set_tablet_id(NORMAL_TABLET_ID); tablet.set_num_segments(2); // CLOSE_LOAD close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); EXPECT_EQ(written_data, data1); written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 1); EXPECT_EQ(written_data, data2); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, one_client_one_index_three_tablet) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); reset_response_stat(); // append data butil::IOBuf append_buf; PStreamHeader header; std::string data1 = "file1 hello world 123 !@#$%^&*()_+1"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID + 0, 0, 0, data1, true); write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID + 1, 0, 0, data1, true); std::string data2 = "file1 hello world 123 !@#$%^&*()_+2"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID + 2, 0, 0, data2, true); EXPECT_EQ(g_response_stat.num, 0); PTabletID tablet1; tablet1.set_partition_id(NORMAL_PARTITION_ID); tablet1.set_index_id(NORMAL_INDEX_ID); tablet1.set_tablet_id(NORMAL_TABLET_ID); tablet1.set_num_segments(1); PTabletID tablet2 {tablet1}; tablet2.set_tablet_id(NORMAL_TABLET_ID + 1); PTabletID tablet3 {tablet1}; tablet3.set_tablet_id(NORMAL_TABLET_ID + 2); // CLOSE_LOAD close_load(client, {tablet1, tablet2, tablet3}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close close_load(client, {tablet1, tablet2, tablet3}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); close_load(client, {tablet1, tablet2, tablet3}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 3); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); std::set success_tablet_ids; for (auto& id : g_response_stat.success_tablet_ids) { success_tablet_ids.insert(id); } EXPECT_TRUE(success_tablet_ids.count(NORMAL_TABLET_ID)); EXPECT_TRUE(success_tablet_ids.count(NORMAL_TABLET_ID + 1)); EXPECT_TRUE(success_tablet_ids.count(NORMAL_TABLET_ID + 2)); auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); EXPECT_EQ(written_data, data1); written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID + 1, 0); EXPECT_EQ(written_data, data1); written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID + 2, 0); EXPECT_EQ(written_data, data2); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) { MockSinkClient clients[2]; for (int i = 0; i < 2; i++) { auto st = clients[i].connect_stream(NORMAL_SENDER_ID + i, 2); EXPECT_TRUE(st.ok()); } reset_response_stat(); std::vector segment_data; segment_data.resize(6); // each sender three segments for (int32_t segid = 2; segid >= 0; segid--) { // append data for (int i = 0; i < 2; i++) { butil::IOBuf append_buf; PStreamHeader header; std::string data1 = "sender_id=" + std::to_string(i) + ",segid=" + std::to_string(segid); write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID, NORMAL_TABLET_ID, segid, 0, data1, true); segment_data[i * 3 + segid] = data1; LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data1; } } EXPECT_EQ(g_response_stat.num, 0); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(NORMAL_INDEX_ID); tablet.set_tablet_id(NORMAL_TABLET_ID); tablet.set_num_segments(3); // CLOSE_LOAD close_load(clients[1], {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close close_load(clients[1], {tablet}, 1); wait_for_ack(2); // stream closed, no response will be sent EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); close_load(clients[0], {tablet}, 0); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); size_t sender_pos = written_data.find('='); size_t sender_end = written_data.find(','); EXPECT_NE(sender_pos, std::string::npos); EXPECT_NE(sender_end, std::string::npos); auto sender_str = written_data.substr(sender_pos + 1, sender_end - sender_pos); LOG(INFO) << "sender_str " << sender_str; uint32_t sender_id = std::stoi(sender_str); for (int i = 0; i < 3; i++) { auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i); EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]); } sender_id = (sender_id + 1) % 2; for (int i = 0; i < 3; i++) { auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i + 3); EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]); } // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) { MockSinkClient clients[2]; EXPECT_TRUE(clients[0].connect_stream(NORMAL_SENDER_ID, 2).ok()); reset_response_stat(); std::vector segment_data; segment_data.resize(6); for (int32_t segid = 2; segid >= 0; segid--) { for (int i = 0; i < 2; i++) { std::string data = "sender_id=" + std::to_string(i) + ",segid=" + std::to_string(segid); segment_data[i * 3 + segid] = data; LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data; } } for (int32_t segid = 2; segid >= 0; segid--) { int i = 0; write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID, NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true); } EXPECT_EQ(g_response_stat.num, 0); PTabletID tablet; tablet.set_partition_id(NORMAL_PARTITION_ID); tablet.set_index_id(NORMAL_INDEX_ID); tablet.set_tablet_id(NORMAL_TABLET_ID); tablet.set_num_segments(3); // CLOSE_LOAD close_load(clients[0], {tablet}, 0); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // sender 0 closed, before open sender 1, load stream should still be open EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); EXPECT_TRUE(clients[1].connect_stream(NORMAL_SENDER_ID + 1, 2).ok()); for (int32_t segid = 2; segid >= 0; segid--) { int i = 1; write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID, NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true); } close_load(clients[1], {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); size_t sender_pos = written_data.find('='); size_t sender_end = written_data.find(','); EXPECT_NE(sender_pos, std::string::npos); EXPECT_NE(sender_end, std::string::npos); auto sender_str = written_data.substr(sender_pos + 1, sender_end - sender_pos); LOG(INFO) << "sender_str " << sender_str; uint32_t sender_id = std::stoi(sender_str); for (int i = 0; i < 3; i++) { auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i); EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]); } sender_id = (sender_id + 1) % 2; for (int i = 0; i < 3; i++) { auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i + 3); EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]); } } } // namespace doris