[fix](tablet sink) fallback to non-vectorized interface in tablet_sink if is in progress of upgrding from 1.1-lts to 1.2-lts (#13966)
This commit is contained in:
@ -338,6 +338,49 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Used for vectorized engine.
|
||||
// TODO(cmy): deprecated, need refactor
|
||||
Status NodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
|
||||
// If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
|
||||
auto st = none_of({_cancelled, _eos_is_produced});
|
||||
if (!st.ok()) {
|
||||
if (_cancelled) {
|
||||
std::lock_guard<SpinLock> l(_cancel_msg_lock);
|
||||
return Status::InternalError("add row failed. " + _cancel_msg);
|
||||
} else {
|
||||
return std::move(st.prepend("already stopped, can't add row. cancelled/eos: "));
|
||||
}
|
||||
}
|
||||
|
||||
constexpr size_t BATCH_SIZE_FOR_SEND = 2 * 1024 * 1024; //2M
|
||||
auto row_no = _cur_batch->add_row();
|
||||
if (row_no == RowBatch::INVALID_ROW_INDEX ||
|
||||
_cur_batch->tuple_data_pool()->total_allocated_bytes() > BATCH_SIZE_FOR_SEND) {
|
||||
{
|
||||
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
|
||||
std::lock_guard<std::mutex> l(_pending_batches_lock);
|
||||
_pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes();
|
||||
//To simplify the add_row logic, postpone adding batch into req until the time of sending req
|
||||
_pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request);
|
||||
_pending_batches_num++;
|
||||
}
|
||||
|
||||
_cur_batch.reset(new RowBatch(*_row_desc, _batch_size));
|
||||
_cur_add_batch_request.clear_tablet_ids();
|
||||
|
||||
row_no = _cur_batch->add_row();
|
||||
}
|
||||
DCHECK_NE(row_no, RowBatch::INVALID_ROW_INDEX);
|
||||
|
||||
_cur_batch->get_row(row_no)->set_tuple(
|
||||
0, block_row.first->deep_copy_tuple(*_tuple_desc, _cur_batch->tuple_data_pool(),
|
||||
block_row.second, 0, true));
|
||||
_cur_batch->commit_last_row();
|
||||
_cur_add_batch_request.add_tablet_ids(tablet_id);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void NodeChannel::mark_close() {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
|
||||
auto st = none_of({_cancelled, _eos_is_produced});
|
||||
@ -883,6 +926,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
|
||||
_load_mem_limit = state->get_load_mem_limit();
|
||||
|
||||
// open all channels
|
||||
bool use_vec = _is_vectorized && state->be_exec_version() > 0;
|
||||
const auto& partitions = _partition->get_partitions();
|
||||
for (int i = 0; i < _schema->indexes().size(); ++i) {
|
||||
// collect all tablets belong to this rollup
|
||||
@ -896,7 +940,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
|
||||
tablets.emplace_back(std::move(tablet_with_partition));
|
||||
}
|
||||
}
|
||||
_channels.emplace_back(new IndexChannel(this, index->index_id, _is_vectorized));
|
||||
_channels.emplace_back(new IndexChannel(this, index->index_id, use_vec));
|
||||
RETURN_IF_ERROR(_channels.back()->init(state, tablets));
|
||||
}
|
||||
|
||||
|
||||
@ -187,6 +187,8 @@ public:
|
||||
|
||||
Status add_row(Tuple* tuple, int64_t tablet_id);
|
||||
|
||||
Status add_row(const BlockRow& block_row, int64_t tablet_id);
|
||||
|
||||
virtual Status add_block(vectorized::Block* block,
|
||||
const std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
|
||||
std::vector<int64_t>>& payload) {
|
||||
@ -236,7 +238,7 @@ public:
|
||||
|
||||
void clear_all_batches();
|
||||
|
||||
virtual void clear_all_blocks() { LOG(FATAL) << "NodeChannel::clear_all_blocks not supported"; }
|
||||
virtual void clear_all_blocks() {}
|
||||
|
||||
std::string channel_info() const {
|
||||
return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.host,
|
||||
|
||||
@ -446,6 +446,46 @@ size_t VOlapTableSink::get_pending_bytes() const {
|
||||
return mem_consumption;
|
||||
}
|
||||
|
||||
Status VOlapTableSink::find_tablet(RuntimeState* state, vectorized::Block* block, int row_index,
|
||||
const VOlapTablePartition** partition, uint32_t& tablet_index,
|
||||
bool& stop_processing, bool& is_continue) {
|
||||
Status status = Status::OK();
|
||||
*partition = nullptr;
|
||||
tablet_index = 0;
|
||||
BlockRow block_row;
|
||||
block_row = {block, row_index};
|
||||
if (!_vpartition->find_partition(&block_row, partition)) {
|
||||
RETURN_IF_ERROR(state->append_error_msg_to_file(
|
||||
[]() -> std::string { return ""; },
|
||||
[&]() -> std::string {
|
||||
fmt::memory_buffer buf;
|
||||
fmt::format_to(buf, "no partition for this tuple. tuple={}",
|
||||
block->dump_data(row_index, 1));
|
||||
return fmt::to_string(buf);
|
||||
},
|
||||
&stop_processing));
|
||||
_number_filtered_rows++;
|
||||
if (stop_processing) {
|
||||
return Status::EndOfFile("Encountered unqualified data, stop processing");
|
||||
}
|
||||
is_continue = true;
|
||||
return status;
|
||||
}
|
||||
_partition_ids.emplace((*partition)->id);
|
||||
if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) {
|
||||
if (_partition_to_tablet_map.find((*partition)->id) == _partition_to_tablet_map.end()) {
|
||||
tablet_index = _vpartition->find_tablet(&block_row, **partition);
|
||||
_partition_to_tablet_map.emplace((*partition)->id, tablet_index);
|
||||
} else {
|
||||
tablet_index = _partition_to_tablet_map[(*partition)->id];
|
||||
}
|
||||
} else {
|
||||
tablet_index = _vpartition->find_tablet(&block_row, **partition);
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) {
|
||||
INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VOlapTableSink::send");
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
@ -493,7 +533,6 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
|
||||
_convert_to_dest_desc_block(&block);
|
||||
}
|
||||
|
||||
BlockRow block_row;
|
||||
SCOPED_RAW_TIMER(&_send_data_ns);
|
||||
// This is just for passing compilation.
|
||||
bool stop_processing = false;
|
||||
@ -501,76 +540,85 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
|
||||
_partition_to_tablet_map.clear();
|
||||
}
|
||||
|
||||
std::vector<std::unordered_map<
|
||||
NodeChannel*,
|
||||
std::pair<std::unique_ptr<vectorized::IColumn::Selector>, std::vector<int64_t>>>>
|
||||
channel_to_payload;
|
||||
channel_to_payload.resize(_channels.size());
|
||||
for (int i = 0; i < num_rows; ++i) {
|
||||
if (filtered_rows > 0 && _filter_bitmap.Get(i)) {
|
||||
continue;
|
||||
}
|
||||
const VOlapTablePartition* partition = nullptr;
|
||||
uint32_t tablet_index = 0;
|
||||
block_row = {&block, i};
|
||||
if (!_vpartition->find_partition(&block_row, &partition)) {
|
||||
RETURN_IF_ERROR(state->append_error_msg_to_file(
|
||||
[]() -> std::string { return ""; },
|
||||
[&]() -> std::string {
|
||||
fmt::memory_buffer buf;
|
||||
fmt::format_to(buf, "no partition for this tuple. tuple={}",
|
||||
block.dump_data(i, 1));
|
||||
return fmt::to_string(buf);
|
||||
},
|
||||
&stop_processing));
|
||||
_number_filtered_rows++;
|
||||
if (stop_processing) {
|
||||
return Status::EndOfFile("Encountered unqualified data, stop processing");
|
||||
bool use_vec = _is_vectorized && state->be_exec_version() > 0;
|
||||
if (use_vec) {
|
||||
std::vector<std::unordered_map<
|
||||
NodeChannel*,
|
||||
std::pair<std::unique_ptr<vectorized::IColumn::Selector>, std::vector<int64_t>>>>
|
||||
channel_to_payload;
|
||||
channel_to_payload.resize(_channels.size());
|
||||
for (int i = 0; i < num_rows; ++i) {
|
||||
if (filtered_rows > 0 && _filter_bitmap.Get(i)) {
|
||||
continue;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
_partition_ids.emplace(partition->id);
|
||||
if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) {
|
||||
if (_partition_to_tablet_map.find(partition->id) == _partition_to_tablet_map.end()) {
|
||||
tablet_index = _vpartition->find_tablet(&block_row, *partition);
|
||||
_partition_to_tablet_map.emplace(partition->id, tablet_index);
|
||||
} else {
|
||||
tablet_index = _partition_to_tablet_map[partition->id];
|
||||
const VOlapTablePartition* partition = nullptr;
|
||||
uint32_t tablet_index = 0;
|
||||
bool is_continue = false;
|
||||
RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, tablet_index, stop_processing,
|
||||
is_continue));
|
||||
if (is_continue) {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
tablet_index = _vpartition->find_tablet(&block_row, *partition);
|
||||
}
|
||||
for (int j = 0; j < partition->indexes.size(); ++j) {
|
||||
auto tid = partition->indexes[j].tablets[tablet_index];
|
||||
auto it = _channels[j]->_channels_by_tablet.find(tid);
|
||||
DCHECK(it != _channels[j]->_channels_by_tablet.end())
|
||||
<< "unknown tablet, tablet_id=" << tablet_index;
|
||||
for (const auto& channel : it->second) {
|
||||
if (channel_to_payload[j].count(channel.get()) < 1) {
|
||||
channel_to_payload[j].insert(
|
||||
{channel.get(),
|
||||
std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
|
||||
std::vector<int64_t>> {
|
||||
std::unique_ptr<vectorized::IColumn::Selector>(
|
||||
new vectorized::IColumn::Selector()),
|
||||
std::vector<int64_t>()}});
|
||||
for (int j = 0; j < partition->indexes.size(); ++j) {
|
||||
auto tid = partition->indexes[j].tablets[tablet_index];
|
||||
auto it = _channels[j]->_channels_by_tablet.find(tid);
|
||||
DCHECK(it != _channels[j]->_channels_by_tablet.end())
|
||||
<< "unknown tablet, tablet_id=" << tablet_index;
|
||||
for (const auto& channel : it->second) {
|
||||
if (channel_to_payload[j].count(channel.get()) < 1) {
|
||||
channel_to_payload[j].insert(
|
||||
{channel.get(),
|
||||
std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
|
||||
std::vector<int64_t>> {
|
||||
std::unique_ptr<vectorized::IColumn::Selector>(
|
||||
new vectorized::IColumn::Selector()),
|
||||
std::vector<int64_t>()}});
|
||||
}
|
||||
channel_to_payload[j][channel.get()].first->push_back(i);
|
||||
channel_to_payload[j][channel.get()].second.push_back(tid);
|
||||
}
|
||||
channel_to_payload[j][channel.get()].first->push_back(i);
|
||||
channel_to_payload[j][channel.get()].second.push_back(tid);
|
||||
_number_output_rows++;
|
||||
}
|
||||
_number_output_rows++;
|
||||
}
|
||||
}
|
||||
for (size_t i = 0; i < _channels.size(); i++) {
|
||||
for (const auto& entry : channel_to_payload[i]) {
|
||||
// if this node channel is already failed, this add_row will be skipped
|
||||
auto st = entry.first->add_block(&block, entry.second);
|
||||
if (!st.ok()) {
|
||||
_channels[i]->mark_as_failed(entry.first->node_id(), entry.first->host(),
|
||||
st.get_error_msg());
|
||||
for (size_t i = 0; i < _channels.size(); i++) {
|
||||
for (const auto& entry : channel_to_payload[i]) {
|
||||
// if this node channel is already failed, this add_row will be skipped
|
||||
auto st = entry.first->add_block(&block, entry.second);
|
||||
if (!st.ok()) {
|
||||
_channels[i]->mark_as_failed(entry.first->node_id(), entry.first->host(),
|
||||
st.get_error_msg());
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
size_t MAX_PENDING_BYTES = _load_mem_limit / 3;
|
||||
while (get_pending_bytes() > MAX_PENDING_BYTES && !state->is_cancelled()) {
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(100));
|
||||
}
|
||||
|
||||
for (int i = 0; i < num_rows; ++i) {
|
||||
if (filtered_rows > 0 && _filter_bitmap.Get(i)) {
|
||||
continue;
|
||||
}
|
||||
const VOlapTablePartition* partition = nullptr;
|
||||
uint32_t tablet_index = 0;
|
||||
BlockRow block_row;
|
||||
block_row = {&block, i};
|
||||
bool is_continue = false;
|
||||
RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, tablet_index, stop_processing,
|
||||
is_continue));
|
||||
if (is_continue) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int j = 0; j < partition->indexes.size(); ++j) {
|
||||
int64_t tablet_id = partition->indexes[j].tablets[tablet_index];
|
||||
_channels[j]->add_row(block_row, tablet_id);
|
||||
_number_output_rows++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check intolerable failure
|
||||
for (const auto& index_channel : _channels) {
|
||||
RETURN_IF_ERROR(index_channel->check_intolerable_failure());
|
||||
|
||||
@ -110,6 +110,10 @@ private:
|
||||
// so here need to do the convert operation
|
||||
void _convert_to_dest_desc_block(vectorized::Block* block);
|
||||
|
||||
Status find_tablet(RuntimeState* state, vectorized::Block* block, int row_index,
|
||||
const VOlapTablePartition** partition, uint32_t& tablet_index,
|
||||
bool& stop_processing, bool& is_continue);
|
||||
|
||||
VOlapTablePartitionParam* _vpartition = nullptr;
|
||||
std::vector<vectorized::VExprContext*> _output_vexpr_ctxs;
|
||||
};
|
||||
|
||||
@ -33,6 +33,7 @@
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/stream_load/load_stream_mgr.h"
|
||||
#include "runtime/thread_resource_mgr.h"
|
||||
#include "runtime/tuple_row.h"
|
||||
#include "runtime/types.h"
|
||||
#include "service/brpc.h"
|
||||
#include "util/brpc_client_cache.h"
|
||||
@ -46,47 +47,6 @@ namespace stream_load {
|
||||
|
||||
extern Status k_add_batch_status;
|
||||
|
||||
class VOlapTableSinkTest : public testing::Test {
|
||||
public:
|
||||
VOlapTableSinkTest() {}
|
||||
virtual ~VOlapTableSinkTest() {}
|
||||
void SetUp() override {
|
||||
k_add_batch_status = Status::OK();
|
||||
_env = ExecEnv::GetInstance();
|
||||
_env->_thread_mgr = new ThreadResourceMgr();
|
||||
_env->_master_info = new TMasterInfo();
|
||||
_env->_load_stream_mgr = new LoadStreamMgr();
|
||||
_env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
|
||||
_env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
|
||||
_env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool();
|
||||
ThreadPoolBuilder("SendBatchThreadPool")
|
||||
.set_min_threads(1)
|
||||
.set_max_threads(5)
|
||||
.set_max_queue_size(100)
|
||||
.build(&_env->_send_batch_thread_pool);
|
||||
config::tablet_writer_open_rpc_timeout_sec = 60;
|
||||
config::max_send_batch_parallelism_per_job = 1;
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
SAFE_DELETE(_env->_internal_client_cache);
|
||||
SAFE_DELETE(_env->_function_client_cache);
|
||||
SAFE_DELETE(_env->_load_stream_mgr);
|
||||
SAFE_DELETE(_env->_master_info);
|
||||
SAFE_DELETE(_env->_thread_mgr);
|
||||
SAFE_DELETE(_env->_task_pool_mem_tracker_registry);
|
||||
if (_server) {
|
||||
_server->Stop(100);
|
||||
_server->Join();
|
||||
SAFE_DELETE(_server);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
ExecEnv* _env = nullptr;
|
||||
brpc::Server* _server = nullptr;
|
||||
};
|
||||
|
||||
TDataSink get_data_sink(TDescriptorTable* desc_tbl);
|
||||
TDataSink get_decimal_sink(TDescriptorTable* desc_tbl);
|
||||
|
||||
@ -111,6 +71,31 @@ public:
|
||||
status.to_protobuf(response->mutable_status());
|
||||
}
|
||||
|
||||
void tablet_writer_add_batch(google::protobuf::RpcController* controller,
|
||||
const PTabletWriterAddBatchRequest* request,
|
||||
PTabletWriterAddBatchResult* response,
|
||||
google::protobuf::Closure* done) override {
|
||||
brpc::ClosureGuard done_guard(done);
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
_row_counters += request->tablet_ids_size();
|
||||
if (request->eos()) {
|
||||
_eof_counters++;
|
||||
}
|
||||
k_add_batch_status.to_protobuf(response->mutable_status());
|
||||
|
||||
if (request->has_row_batch() && _row_desc != nullptr) {
|
||||
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
|
||||
attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl);
|
||||
RowBatch batch(*_row_desc, request->row_batch());
|
||||
for (int i = 0; i < batch.num_rows(); ++i) {
|
||||
LOG(INFO) << batch.get_row(i)->to_string(*_row_desc);
|
||||
_output_set->emplace(batch.get_row(i)->to_string(*_row_desc));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void tablet_writer_add_block(google::protobuf::RpcController* controller,
|
||||
const PTabletWriterAddBlockRequest* request,
|
||||
PTabletWriterAddBlockResult* response,
|
||||
@ -160,107 +145,157 @@ public:
|
||||
std::set<std::string>* _output_set = nullptr;
|
||||
};
|
||||
|
||||
class VOlapTableSinkTest : public testing::Test {
|
||||
public:
|
||||
VOlapTableSinkTest() {}
|
||||
virtual ~VOlapTableSinkTest() {}
|
||||
void SetUp() override {
|
||||
k_add_batch_status = Status::OK();
|
||||
_env = ExecEnv::GetInstance();
|
||||
_env->_thread_mgr = new ThreadResourceMgr();
|
||||
_env->_master_info = new TMasterInfo();
|
||||
_env->_load_stream_mgr = new LoadStreamMgr();
|
||||
_env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
|
||||
_env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
|
||||
_env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool();
|
||||
ThreadPoolBuilder("SendBatchThreadPool")
|
||||
.set_min_threads(1)
|
||||
.set_max_threads(5)
|
||||
.set_max_queue_size(100)
|
||||
.build(&_env->_send_batch_thread_pool);
|
||||
config::tablet_writer_open_rpc_timeout_sec = 60;
|
||||
config::max_send_batch_parallelism_per_job = 1;
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
SAFE_DELETE(_env->_internal_client_cache);
|
||||
SAFE_DELETE(_env->_function_client_cache);
|
||||
SAFE_DELETE(_env->_load_stream_mgr);
|
||||
SAFE_DELETE(_env->_master_info);
|
||||
SAFE_DELETE(_env->_thread_mgr);
|
||||
SAFE_DELETE(_env->_task_pool_mem_tracker_registry);
|
||||
if (_server) {
|
||||
_server->Stop(100);
|
||||
_server->Join();
|
||||
SAFE_DELETE(_server);
|
||||
}
|
||||
}
|
||||
|
||||
void test_normal(int be_exec_version) {
|
||||
// start brpc service first
|
||||
_server = new brpc::Server();
|
||||
auto service = new VTestInternalService();
|
||||
ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
|
||||
brpc::ServerOptions options;
|
||||
{
|
||||
debug::ScopedLeakCheckDisabler disable_lsan;
|
||||
_server->Start(4356, &options);
|
||||
}
|
||||
|
||||
TUniqueId fragment_id;
|
||||
TQueryOptions query_options;
|
||||
query_options.batch_size = 1;
|
||||
query_options.be_exec_version = be_exec_version;
|
||||
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
|
||||
state.init_mem_trackers(TUniqueId());
|
||||
|
||||
ObjectPool obj_pool;
|
||||
TDescriptorTable tdesc_tbl;
|
||||
auto t_data_sink = get_data_sink(&tdesc_tbl);
|
||||
|
||||
// crate desc_tabl
|
||||
DescriptorTbl* desc_tbl = nullptr;
|
||||
auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
|
||||
ASSERT_TRUE(st.ok());
|
||||
state._desc_tbl = desc_tbl;
|
||||
|
||||
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
|
||||
LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
|
||||
|
||||
RowDescriptor row_desc(*desc_tbl, {0}, {false});
|
||||
service->_row_desc = &row_desc;
|
||||
std::set<std::string> output_set;
|
||||
service->_output_set = &output_set;
|
||||
|
||||
VOlapTableSink sink(&obj_pool, row_desc, {}, &st);
|
||||
ASSERT_TRUE(st.ok());
|
||||
|
||||
// init
|
||||
st = sink.init(t_data_sink);
|
||||
ASSERT_TRUE(st.ok());
|
||||
// prepare
|
||||
st = sink.prepare(&state);
|
||||
ASSERT_TRUE(st.ok());
|
||||
// open
|
||||
st = sink.open(&state);
|
||||
ASSERT_TRUE(st.ok());
|
||||
|
||||
int slot_count = tuple_desc->slots().size();
|
||||
std::vector<vectorized::MutableColumnPtr> columns(slot_count);
|
||||
for (int i = 0; i < slot_count; i++) {
|
||||
columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
|
||||
}
|
||||
|
||||
int col_idx = 0;
|
||||
auto* column_ptr = columns[col_idx++].get();
|
||||
auto column_vector_int = column_ptr;
|
||||
int int_val = 12;
|
||||
column_vector_int->insert_data((const char*)&int_val, 0);
|
||||
int_val = 13;
|
||||
column_vector_int->insert_data((const char*)&int_val, 0);
|
||||
int_val = 14;
|
||||
column_vector_int->insert_data((const char*)&int_val, 0);
|
||||
|
||||
column_ptr = columns[col_idx++].get();
|
||||
auto column_vector_bigint = column_ptr;
|
||||
int64_t int64_val = 9;
|
||||
column_vector_bigint->insert_data((const char*)&int64_val, 0);
|
||||
int64_val = 25;
|
||||
column_vector_bigint->insert_data((const char*)&int64_val, 0);
|
||||
int64_val = 50;
|
||||
column_vector_bigint->insert_data((const char*)&int64_val, 0);
|
||||
|
||||
column_ptr = columns[col_idx++].get();
|
||||
auto column_vector_str = column_ptr;
|
||||
column_vector_str->insert_data("abc", 3);
|
||||
column_vector_str->insert_data("abcd", 4);
|
||||
column_vector_str->insert_data("abcde1234567890", 15);
|
||||
|
||||
vectorized::Block block;
|
||||
col_idx = 0;
|
||||
for (const auto slot_desc : tuple_desc->slots()) {
|
||||
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
|
||||
slot_desc->get_data_type_ptr(),
|
||||
slot_desc->col_name()));
|
||||
}
|
||||
|
||||
// send
|
||||
st = sink.send(&state, &block);
|
||||
ASSERT_TRUE(st.ok());
|
||||
// close
|
||||
st = sink.close(&state, Status::OK());
|
||||
ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close failed. ")
|
||||
<< st.to_string();
|
||||
|
||||
// each node has a eof
|
||||
ASSERT_EQ(2, service->_eof_counters);
|
||||
ASSERT_EQ(2 * 2, service->_row_counters);
|
||||
|
||||
// 2node * 2
|
||||
ASSERT_EQ(1, state.num_rows_load_filtered());
|
||||
}
|
||||
|
||||
private:
|
||||
ExecEnv* _env = nullptr;
|
||||
brpc::Server* _server = nullptr;
|
||||
};
|
||||
|
||||
TEST_F(VOlapTableSinkTest, normal) {
|
||||
// start brpc service first
|
||||
_server = new brpc::Server();
|
||||
auto service = new VTestInternalService();
|
||||
ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
|
||||
brpc::ServerOptions options;
|
||||
{
|
||||
debug::ScopedLeakCheckDisabler disable_lsan;
|
||||
_server->Start(4356, &options);
|
||||
}
|
||||
test_normal(1);
|
||||
}
|
||||
|
||||
TUniqueId fragment_id;
|
||||
TQueryOptions query_options;
|
||||
query_options.batch_size = 1;
|
||||
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
|
||||
state.init_mem_trackers(TUniqueId());
|
||||
|
||||
ObjectPool obj_pool;
|
||||
TDescriptorTable tdesc_tbl;
|
||||
auto t_data_sink = get_data_sink(&tdesc_tbl);
|
||||
|
||||
// crate desc_tabl
|
||||
DescriptorTbl* desc_tbl = nullptr;
|
||||
auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
|
||||
ASSERT_TRUE(st.ok());
|
||||
state._desc_tbl = desc_tbl;
|
||||
|
||||
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
|
||||
LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
|
||||
|
||||
RowDescriptor row_desc(*desc_tbl, {0}, {false});
|
||||
service->_row_desc = &row_desc;
|
||||
std::set<std::string> output_set;
|
||||
service->_output_set = &output_set;
|
||||
|
||||
VOlapTableSink sink(&obj_pool, row_desc, {}, &st);
|
||||
ASSERT_TRUE(st.ok());
|
||||
|
||||
// init
|
||||
st = sink.init(t_data_sink);
|
||||
ASSERT_TRUE(st.ok());
|
||||
// prepare
|
||||
st = sink.prepare(&state);
|
||||
ASSERT_TRUE(st.ok());
|
||||
// open
|
||||
st = sink.open(&state);
|
||||
ASSERT_TRUE(st.ok());
|
||||
|
||||
int slot_count = tuple_desc->slots().size();
|
||||
std::vector<vectorized::MutableColumnPtr> columns(slot_count);
|
||||
for (int i = 0; i < slot_count; i++) {
|
||||
columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
|
||||
}
|
||||
|
||||
int col_idx = 0;
|
||||
auto* column_ptr = columns[col_idx++].get();
|
||||
auto column_vector_int = column_ptr;
|
||||
int int_val = 12;
|
||||
column_vector_int->insert_data((const char*)&int_val, 0);
|
||||
int_val = 13;
|
||||
column_vector_int->insert_data((const char*)&int_val, 0);
|
||||
int_val = 14;
|
||||
column_vector_int->insert_data((const char*)&int_val, 0);
|
||||
|
||||
column_ptr = columns[col_idx++].get();
|
||||
auto column_vector_bigint = column_ptr;
|
||||
int64_t int64_val = 9;
|
||||
column_vector_bigint->insert_data((const char*)&int64_val, 0);
|
||||
int64_val = 25;
|
||||
column_vector_bigint->insert_data((const char*)&int64_val, 0);
|
||||
int64_val = 50;
|
||||
column_vector_bigint->insert_data((const char*)&int64_val, 0);
|
||||
|
||||
column_ptr = columns[col_idx++].get();
|
||||
auto column_vector_str = column_ptr;
|
||||
column_vector_str->insert_data("abc", 3);
|
||||
column_vector_str->insert_data("abcd", 4);
|
||||
column_vector_str->insert_data("abcde1234567890", 15);
|
||||
|
||||
vectorized::Block block;
|
||||
col_idx = 0;
|
||||
for (const auto slot_desc : tuple_desc->slots()) {
|
||||
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
|
||||
slot_desc->get_data_type_ptr(),
|
||||
slot_desc->col_name()));
|
||||
}
|
||||
|
||||
// send
|
||||
st = sink.send(&state, &block);
|
||||
ASSERT_TRUE(st.ok());
|
||||
// close
|
||||
st = sink.close(&state, Status::OK());
|
||||
ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close failed. ")
|
||||
<< st.to_string();
|
||||
|
||||
// each node has a eof
|
||||
ASSERT_EQ(2, service->_eof_counters);
|
||||
ASSERT_EQ(2 * 2, service->_row_counters);
|
||||
|
||||
// 2node * 2
|
||||
ASSERT_EQ(1, state.num_rows_load_filtered());
|
||||
TEST_F(VOlapTableSinkTest, fallback) {
|
||||
test_normal(0);
|
||||
}
|
||||
|
||||
TEST_F(VOlapTableSinkTest, convert) {
|
||||
@ -277,6 +312,7 @@ TEST_F(VOlapTableSinkTest, convert) {
|
||||
TUniqueId fragment_id;
|
||||
TQueryOptions query_options;
|
||||
query_options.batch_size = 1024;
|
||||
query_options.be_exec_version = 1;
|
||||
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
|
||||
state.init_mem_trackers(TUniqueId());
|
||||
|
||||
@ -406,6 +442,7 @@ TEST_F(VOlapTableSinkTest, add_block_failed) {
|
||||
TUniqueId fragment_id;
|
||||
TQueryOptions query_options;
|
||||
query_options.batch_size = 1;
|
||||
query_options.be_exec_version = 1;
|
||||
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
|
||||
state.init_mem_trackers(TUniqueId());
|
||||
|
||||
@ -519,6 +556,7 @@ TEST_F(VOlapTableSinkTest, decimal) {
|
||||
TUniqueId fragment_id;
|
||||
TQueryOptions query_options;
|
||||
query_options.batch_size = 1;
|
||||
query_options.be_exec_version = 1;
|
||||
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
|
||||
state.init_mem_trackers(TUniqueId());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user