From 04830af0398be5ebd09c8ae86333068a879e702c Mon Sep 17 00:00:00 2001 From: TengJianPing <18241664+jacktengg@users.noreply.github.com> Date: Sat, 5 Nov 2022 10:19:51 +0800 Subject: [PATCH] [fix](tablet sink) fallback to non-vectorized interface in tablet_sink if is in progress of upgrding from 1.1-lts to 1.2-lts (#13966) --- be/src/exec/tablet_sink.cpp | 46 +++- be/src/exec/tablet_sink.h | 4 +- be/src/vec/sink/vtablet_sink.cpp | 174 +++++++++----- be/src/vec/sink/vtablet_sink.h | 4 + be/test/vec/exec/vtablet_sink_test.cpp | 318 ++++++++++++++----------- 5 files changed, 341 insertions(+), 205 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index b9108998f2..762feacb15 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -338,6 +338,49 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { return Status::OK(); } +// Used for vectorized engine. +// TODO(cmy): deprecated, need refactor +Status NodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) { + SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); + // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed. + auto st = none_of({_cancelled, _eos_is_produced}); + if (!st.ok()) { + if (_cancelled) { + std::lock_guard l(_cancel_msg_lock); + return Status::InternalError("add row failed. " + _cancel_msg); + } else { + return std::move(st.prepend("already stopped, can't add row. cancelled/eos: ")); + } + } + + constexpr size_t BATCH_SIZE_FOR_SEND = 2 * 1024 * 1024; //2M + auto row_no = _cur_batch->add_row(); + if (row_no == RowBatch::INVALID_ROW_INDEX || + _cur_batch->tuple_data_pool()->total_allocated_bytes() > BATCH_SIZE_FOR_SEND) { + { + SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); + std::lock_guard l(_pending_batches_lock); + _pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes(); + //To simplify the add_row logic, postpone adding batch into req until the time of sending req + _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); + _pending_batches_num++; + } + + _cur_batch.reset(new RowBatch(*_row_desc, _batch_size)); + _cur_add_batch_request.clear_tablet_ids(); + + row_no = _cur_batch->add_row(); + } + DCHECK_NE(row_no, RowBatch::INVALID_ROW_INDEX); + + _cur_batch->get_row(row_no)->set_tuple( + 0, block_row.first->deep_copy_tuple(*_tuple_desc, _cur_batch->tuple_data_pool(), + block_row.second, 0, true)); + _cur_batch->commit_last_row(); + _cur_add_batch_request.add_tablet_ids(tablet_id); + return Status::OK(); +} + void NodeChannel::mark_close() { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); auto st = none_of({_cancelled, _eos_is_produced}); @@ -883,6 +926,7 @@ Status OlapTableSink::prepare(RuntimeState* state) { _load_mem_limit = state->get_load_mem_limit(); // open all channels + bool use_vec = _is_vectorized && state->be_exec_version() > 0; const auto& partitions = _partition->get_partitions(); for (int i = 0; i < _schema->indexes().size(); ++i) { // collect all tablets belong to this rollup @@ -896,7 +940,7 @@ Status OlapTableSink::prepare(RuntimeState* state) { tablets.emplace_back(std::move(tablet_with_partition)); } } - _channels.emplace_back(new IndexChannel(this, index->index_id, _is_vectorized)); + _channels.emplace_back(new IndexChannel(this, index->index_id, use_vec)); RETURN_IF_ERROR(_channels.back()->init(state, tablets)); } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 3e8d66fdbd..1ba4b46a60 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -187,6 +187,8 @@ public: Status add_row(Tuple* tuple, int64_t tablet_id); + Status add_row(const BlockRow& block_row, int64_t tablet_id); + virtual Status add_block(vectorized::Block* block, const std::pair, std::vector>& payload) { @@ -236,7 +238,7 @@ public: void clear_all_batches(); - virtual void clear_all_blocks() { LOG(FATAL) << "NodeChannel::clear_all_blocks not supported"; } + virtual void clear_all_blocks() {} std::string channel_info() const { return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.host, diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index e19bf5499c..126e65cf85 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -446,6 +446,46 @@ size_t VOlapTableSink::get_pending_bytes() const { return mem_consumption; } +Status VOlapTableSink::find_tablet(RuntimeState* state, vectorized::Block* block, int row_index, + const VOlapTablePartition** partition, uint32_t& tablet_index, + bool& stop_processing, bool& is_continue) { + Status status = Status::OK(); + *partition = nullptr; + tablet_index = 0; + BlockRow block_row; + block_row = {block, row_index}; + if (!_vpartition->find_partition(&block_row, partition)) { + RETURN_IF_ERROR(state->append_error_msg_to_file( + []() -> std::string { return ""; }, + [&]() -> std::string { + fmt::memory_buffer buf; + fmt::format_to(buf, "no partition for this tuple. tuple={}", + block->dump_data(row_index, 1)); + return fmt::to_string(buf); + }, + &stop_processing)); + _number_filtered_rows++; + if (stop_processing) { + return Status::EndOfFile("Encountered unqualified data, stop processing"); + } + is_continue = true; + return status; + } + _partition_ids.emplace((*partition)->id); + if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) { + if (_partition_to_tablet_map.find((*partition)->id) == _partition_to_tablet_map.end()) { + tablet_index = _vpartition->find_tablet(&block_row, **partition); + _partition_to_tablet_map.emplace((*partition)->id, tablet_index); + } else { + tablet_index = _partition_to_tablet_map[(*partition)->id]; + } + } else { + tablet_index = _vpartition->find_tablet(&block_row, **partition); + } + + return status; +} + Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) { INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VOlapTableSink::send"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -493,7 +533,6 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) _convert_to_dest_desc_block(&block); } - BlockRow block_row; SCOPED_RAW_TIMER(&_send_data_ns); // This is just for passing compilation. bool stop_processing = false; @@ -501,76 +540,85 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) _partition_to_tablet_map.clear(); } - std::vector, std::vector>>> - channel_to_payload; - channel_to_payload.resize(_channels.size()); - for (int i = 0; i < num_rows; ++i) { - if (filtered_rows > 0 && _filter_bitmap.Get(i)) { - continue; - } - const VOlapTablePartition* partition = nullptr; - uint32_t tablet_index = 0; - block_row = {&block, i}; - if (!_vpartition->find_partition(&block_row, &partition)) { - RETURN_IF_ERROR(state->append_error_msg_to_file( - []() -> std::string { return ""; }, - [&]() -> std::string { - fmt::memory_buffer buf; - fmt::format_to(buf, "no partition for this tuple. tuple={}", - block.dump_data(i, 1)); - return fmt::to_string(buf); - }, - &stop_processing)); - _number_filtered_rows++; - if (stop_processing) { - return Status::EndOfFile("Encountered unqualified data, stop processing"); + bool use_vec = _is_vectorized && state->be_exec_version() > 0; + if (use_vec) { + std::vector, std::vector>>> + channel_to_payload; + channel_to_payload.resize(_channels.size()); + for (int i = 0; i < num_rows; ++i) { + if (filtered_rows > 0 && _filter_bitmap.Get(i)) { + continue; } - continue; - } - _partition_ids.emplace(partition->id); - if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) { - if (_partition_to_tablet_map.find(partition->id) == _partition_to_tablet_map.end()) { - tablet_index = _vpartition->find_tablet(&block_row, *partition); - _partition_to_tablet_map.emplace(partition->id, tablet_index); - } else { - tablet_index = _partition_to_tablet_map[partition->id]; + const VOlapTablePartition* partition = nullptr; + uint32_t tablet_index = 0; + bool is_continue = false; + RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, tablet_index, stop_processing, + is_continue)); + if (is_continue) { + continue; } - } else { - tablet_index = _vpartition->find_tablet(&block_row, *partition); - } - for (int j = 0; j < partition->indexes.size(); ++j) { - auto tid = partition->indexes[j].tablets[tablet_index]; - auto it = _channels[j]->_channels_by_tablet.find(tid); - DCHECK(it != _channels[j]->_channels_by_tablet.end()) - << "unknown tablet, tablet_id=" << tablet_index; - for (const auto& channel : it->second) { - if (channel_to_payload[j].count(channel.get()) < 1) { - channel_to_payload[j].insert( - {channel.get(), - std::pair, - std::vector> { - std::unique_ptr( - new vectorized::IColumn::Selector()), - std::vector()}}); + for (int j = 0; j < partition->indexes.size(); ++j) { + auto tid = partition->indexes[j].tablets[tablet_index]; + auto it = _channels[j]->_channels_by_tablet.find(tid); + DCHECK(it != _channels[j]->_channels_by_tablet.end()) + << "unknown tablet, tablet_id=" << tablet_index; + for (const auto& channel : it->second) { + if (channel_to_payload[j].count(channel.get()) < 1) { + channel_to_payload[j].insert( + {channel.get(), + std::pair, + std::vector> { + std::unique_ptr( + new vectorized::IColumn::Selector()), + std::vector()}}); + } + channel_to_payload[j][channel.get()].first->push_back(i); + channel_to_payload[j][channel.get()].second.push_back(tid); } - channel_to_payload[j][channel.get()].first->push_back(i); - channel_to_payload[j][channel.get()].second.push_back(tid); + _number_output_rows++; } - _number_output_rows++; } - } - for (size_t i = 0; i < _channels.size(); i++) { - for (const auto& entry : channel_to_payload[i]) { - // if this node channel is already failed, this add_row will be skipped - auto st = entry.first->add_block(&block, entry.second); - if (!st.ok()) { - _channels[i]->mark_as_failed(entry.first->node_id(), entry.first->host(), - st.get_error_msg()); + for (size_t i = 0; i < _channels.size(); i++) { + for (const auto& entry : channel_to_payload[i]) { + // if this node channel is already failed, this add_row will be skipped + auto st = entry.first->add_block(&block, entry.second); + if (!st.ok()) { + _channels[i]->mark_as_failed(entry.first->node_id(), entry.first->host(), + st.get_error_msg()); + } + } + } + } else { + size_t MAX_PENDING_BYTES = _load_mem_limit / 3; + while (get_pending_bytes() > MAX_PENDING_BYTES && !state->is_cancelled()) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + + for (int i = 0; i < num_rows; ++i) { + if (filtered_rows > 0 && _filter_bitmap.Get(i)) { + continue; + } + const VOlapTablePartition* partition = nullptr; + uint32_t tablet_index = 0; + BlockRow block_row; + block_row = {&block, i}; + bool is_continue = false; + RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, tablet_index, stop_processing, + is_continue)); + if (is_continue) { + continue; + } + + for (int j = 0; j < partition->indexes.size(); ++j) { + int64_t tablet_id = partition->indexes[j].tablets[tablet_index]; + _channels[j]->add_row(block_row, tablet_id); + _number_output_rows++; } } } + // check intolerable failure for (const auto& index_channel : _channels) { RETURN_IF_ERROR(index_channel->check_intolerable_failure()); diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index bc24303fdc..857c43722b 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -110,6 +110,10 @@ private: // so here need to do the convert operation void _convert_to_dest_desc_block(vectorized::Block* block); + Status find_tablet(RuntimeState* state, vectorized::Block* block, int row_index, + const VOlapTablePartition** partition, uint32_t& tablet_index, + bool& stop_processing, bool& is_continue); + VOlapTablePartitionParam* _vpartition = nullptr; std::vector _output_vexpr_ctxs; }; diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp index f2b7248fce..ae1f615035 100644 --- a/be/test/vec/exec/vtablet_sink_test.cpp +++ b/be/test/vec/exec/vtablet_sink_test.cpp @@ -33,6 +33,7 @@ #include "runtime/runtime_state.h" #include "runtime/stream_load/load_stream_mgr.h" #include "runtime/thread_resource_mgr.h" +#include "runtime/tuple_row.h" #include "runtime/types.h" #include "service/brpc.h" #include "util/brpc_client_cache.h" @@ -46,47 +47,6 @@ namespace stream_load { extern Status k_add_batch_status; -class VOlapTableSinkTest : public testing::Test { -public: - VOlapTableSinkTest() {} - virtual ~VOlapTableSinkTest() {} - void SetUp() override { - k_add_batch_status = Status::OK(); - _env = ExecEnv::GetInstance(); - _env->_thread_mgr = new ThreadResourceMgr(); - _env->_master_info = new TMasterInfo(); - _env->_load_stream_mgr = new LoadStreamMgr(); - _env->_internal_client_cache = new BrpcClientCache(); - _env->_function_client_cache = new BrpcClientCache(); - _env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool(); - ThreadPoolBuilder("SendBatchThreadPool") - .set_min_threads(1) - .set_max_threads(5) - .set_max_queue_size(100) - .build(&_env->_send_batch_thread_pool); - config::tablet_writer_open_rpc_timeout_sec = 60; - config::max_send_batch_parallelism_per_job = 1; - } - - void TearDown() override { - SAFE_DELETE(_env->_internal_client_cache); - SAFE_DELETE(_env->_function_client_cache); - SAFE_DELETE(_env->_load_stream_mgr); - SAFE_DELETE(_env->_master_info); - SAFE_DELETE(_env->_thread_mgr); - SAFE_DELETE(_env->_task_pool_mem_tracker_registry); - if (_server) { - _server->Stop(100); - _server->Join(); - SAFE_DELETE(_server); - } - } - -private: - ExecEnv* _env = nullptr; - brpc::Server* _server = nullptr; -}; - TDataSink get_data_sink(TDescriptorTable* desc_tbl); TDataSink get_decimal_sink(TDescriptorTable* desc_tbl); @@ -111,6 +71,31 @@ public: status.to_protobuf(response->mutable_status()); } + void tablet_writer_add_batch(google::protobuf::RpcController* controller, + const PTabletWriterAddBatchRequest* request, + PTabletWriterAddBatchResult* response, + google::protobuf::Closure* done) override { + brpc::ClosureGuard done_guard(done); + { + std::lock_guard l(_lock); + _row_counters += request->tablet_ids_size(); + if (request->eos()) { + _eof_counters++; + } + k_add_batch_status.to_protobuf(response->mutable_status()); + + if (request->has_row_batch() && _row_desc != nullptr) { + brpc::Controller* cntl = static_cast(controller); + attachment_transfer_request_row_batch(request, cntl); + RowBatch batch(*_row_desc, request->row_batch()); + for (int i = 0; i < batch.num_rows(); ++i) { + LOG(INFO) << batch.get_row(i)->to_string(*_row_desc); + _output_set->emplace(batch.get_row(i)->to_string(*_row_desc)); + } + } + } + } + void tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, @@ -160,107 +145,157 @@ public: std::set* _output_set = nullptr; }; +class VOlapTableSinkTest : public testing::Test { +public: + VOlapTableSinkTest() {} + virtual ~VOlapTableSinkTest() {} + void SetUp() override { + k_add_batch_status = Status::OK(); + _env = ExecEnv::GetInstance(); + _env->_thread_mgr = new ThreadResourceMgr(); + _env->_master_info = new TMasterInfo(); + _env->_load_stream_mgr = new LoadStreamMgr(); + _env->_internal_client_cache = new BrpcClientCache(); + _env->_function_client_cache = new BrpcClientCache(); + _env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool(); + ThreadPoolBuilder("SendBatchThreadPool") + .set_min_threads(1) + .set_max_threads(5) + .set_max_queue_size(100) + .build(&_env->_send_batch_thread_pool); + config::tablet_writer_open_rpc_timeout_sec = 60; + config::max_send_batch_parallelism_per_job = 1; + } + + void TearDown() override { + SAFE_DELETE(_env->_internal_client_cache); + SAFE_DELETE(_env->_function_client_cache); + SAFE_DELETE(_env->_load_stream_mgr); + SAFE_DELETE(_env->_master_info); + SAFE_DELETE(_env->_thread_mgr); + SAFE_DELETE(_env->_task_pool_mem_tracker_registry); + if (_server) { + _server->Stop(100); + _server->Join(); + SAFE_DELETE(_server); + } + } + + void test_normal(int be_exec_version) { + // start brpc service first + _server = new brpc::Server(); + auto service = new VTestInternalService(); + ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); + brpc::ServerOptions options; + { + debug::ScopedLeakCheckDisabler disable_lsan; + _server->Start(4356, &options); + } + + TUniqueId fragment_id; + TQueryOptions query_options; + query_options.batch_size = 1; + query_options.be_exec_version = be_exec_version; + RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env); + state.init_mem_trackers(TUniqueId()); + + ObjectPool obj_pool; + TDescriptorTable tdesc_tbl; + auto t_data_sink = get_data_sink(&tdesc_tbl); + + // crate desc_tabl + DescriptorTbl* desc_tbl = nullptr; + auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); + ASSERT_TRUE(st.ok()); + state._desc_tbl = desc_tbl; + + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string(); + + RowDescriptor row_desc(*desc_tbl, {0}, {false}); + service->_row_desc = &row_desc; + std::set output_set; + service->_output_set = &output_set; + + VOlapTableSink sink(&obj_pool, row_desc, {}, &st); + ASSERT_TRUE(st.ok()); + + // init + st = sink.init(t_data_sink); + ASSERT_TRUE(st.ok()); + // prepare + st = sink.prepare(&state); + ASSERT_TRUE(st.ok()); + // open + st = sink.open(&state); + ASSERT_TRUE(st.ok()); + + int slot_count = tuple_desc->slots().size(); + std::vector columns(slot_count); + for (int i = 0; i < slot_count; i++) { + columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column(); + } + + int col_idx = 0; + auto* column_ptr = columns[col_idx++].get(); + auto column_vector_int = column_ptr; + int int_val = 12; + column_vector_int->insert_data((const char*)&int_val, 0); + int_val = 13; + column_vector_int->insert_data((const char*)&int_val, 0); + int_val = 14; + column_vector_int->insert_data((const char*)&int_val, 0); + + column_ptr = columns[col_idx++].get(); + auto column_vector_bigint = column_ptr; + int64_t int64_val = 9; + column_vector_bigint->insert_data((const char*)&int64_val, 0); + int64_val = 25; + column_vector_bigint->insert_data((const char*)&int64_val, 0); + int64_val = 50; + column_vector_bigint->insert_data((const char*)&int64_val, 0); + + column_ptr = columns[col_idx++].get(); + auto column_vector_str = column_ptr; + column_vector_str->insert_data("abc", 3); + column_vector_str->insert_data("abcd", 4); + column_vector_str->insert_data("abcde1234567890", 15); + + vectorized::Block block; + col_idx = 0; + for (const auto slot_desc : tuple_desc->slots()) { + block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + + // send + st = sink.send(&state, &block); + ASSERT_TRUE(st.ok()); + // close + st = sink.close(&state, Status::OK()); + ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close failed. ") + << st.to_string(); + + // each node has a eof + ASSERT_EQ(2, service->_eof_counters); + ASSERT_EQ(2 * 2, service->_row_counters); + + // 2node * 2 + ASSERT_EQ(1, state.num_rows_load_filtered()); + } + +private: + ExecEnv* _env = nullptr; + brpc::Server* _server = nullptr; +}; + TEST_F(VOlapTableSinkTest, normal) { - // start brpc service first - _server = new brpc::Server(); - auto service = new VTestInternalService(); - ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); - brpc::ServerOptions options; - { - debug::ScopedLeakCheckDisabler disable_lsan; - _server->Start(4356, &options); - } + test_normal(1); +} - TUniqueId fragment_id; - TQueryOptions query_options; - query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env); - state.init_mem_trackers(TUniqueId()); - - ObjectPool obj_pool; - TDescriptorTable tdesc_tbl; - auto t_data_sink = get_data_sink(&tdesc_tbl); - - // crate desc_tabl - DescriptorTbl* desc_tbl = nullptr; - auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); - ASSERT_TRUE(st.ok()); - state._desc_tbl = desc_tbl; - - TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); - LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string(); - - RowDescriptor row_desc(*desc_tbl, {0}, {false}); - service->_row_desc = &row_desc; - std::set output_set; - service->_output_set = &output_set; - - VOlapTableSink sink(&obj_pool, row_desc, {}, &st); - ASSERT_TRUE(st.ok()); - - // init - st = sink.init(t_data_sink); - ASSERT_TRUE(st.ok()); - // prepare - st = sink.prepare(&state); - ASSERT_TRUE(st.ok()); - // open - st = sink.open(&state); - ASSERT_TRUE(st.ok()); - - int slot_count = tuple_desc->slots().size(); - std::vector columns(slot_count); - for (int i = 0; i < slot_count; i++) { - columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column(); - } - - int col_idx = 0; - auto* column_ptr = columns[col_idx++].get(); - auto column_vector_int = column_ptr; - int int_val = 12; - column_vector_int->insert_data((const char*)&int_val, 0); - int_val = 13; - column_vector_int->insert_data((const char*)&int_val, 0); - int_val = 14; - column_vector_int->insert_data((const char*)&int_val, 0); - - column_ptr = columns[col_idx++].get(); - auto column_vector_bigint = column_ptr; - int64_t int64_val = 9; - column_vector_bigint->insert_data((const char*)&int64_val, 0); - int64_val = 25; - column_vector_bigint->insert_data((const char*)&int64_val, 0); - int64_val = 50; - column_vector_bigint->insert_data((const char*)&int64_val, 0); - - column_ptr = columns[col_idx++].get(); - auto column_vector_str = column_ptr; - column_vector_str->insert_data("abc", 3); - column_vector_str->insert_data("abcd", 4); - column_vector_str->insert_data("abcde1234567890", 15); - - vectorized::Block block; - col_idx = 0; - for (const auto slot_desc : tuple_desc->slots()) { - block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); - } - - // send - st = sink.send(&state, &block); - ASSERT_TRUE(st.ok()); - // close - st = sink.close(&state, Status::OK()); - ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close failed. ") - << st.to_string(); - - // each node has a eof - ASSERT_EQ(2, service->_eof_counters); - ASSERT_EQ(2 * 2, service->_row_counters); - - // 2node * 2 - ASSERT_EQ(1, state.num_rows_load_filtered()); +TEST_F(VOlapTableSinkTest, fallback) { + test_normal(0); } TEST_F(VOlapTableSinkTest, convert) { @@ -277,6 +312,7 @@ TEST_F(VOlapTableSinkTest, convert) { TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1024; + query_options.be_exec_version = 1; RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env); state.init_mem_trackers(TUniqueId()); @@ -406,6 +442,7 @@ TEST_F(VOlapTableSinkTest, add_block_failed) { TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; + query_options.be_exec_version = 1; RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env); state.init_mem_trackers(TUniqueId()); @@ -519,6 +556,7 @@ TEST_F(VOlapTableSinkTest, decimal) { TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; + query_options.be_exec_version = 1; RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env); state.init_mem_trackers(TUniqueId());