diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 299e1c5376..f558678308 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -32,7 +32,6 @@ namespace doris {
 
 class ObjectPool;
-class RowBatch;
 class RuntimeProfile;
 class RuntimeState;
 class TPlanFragmentExecParams;
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 616bf9f5e0..ab9f52f4af 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -64,51 +64,6 @@ namespace doris {
 
 const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";
 
-ExecNode::RowBatchQueue::RowBatchQueue(int max_batches) : BlockingQueue<RowBatch*>(max_batches) {}
-
-ExecNode::RowBatchQueue::~RowBatchQueue() {
-    DCHECK(cleanup_queue_.empty());
-}
-
-void ExecNode::RowBatchQueue::AddBatch(RowBatch* batch) {
-    if (!blocking_put(batch)) {
-        std::lock_guard lock(lock_);
-        cleanup_queue_.push_back(batch);
-    }
-}
-
-bool ExecNode::RowBatchQueue::AddBatchWithTimeout(RowBatch* batch, int64_t timeout_micros) {
-    // return blocking_put_with_timeout(batch, timeout_micros);
-    return blocking_put(batch);
-}
-
-RowBatch* ExecNode::RowBatchQueue::GetBatch() {
-    RowBatch* result = nullptr;
-    if (blocking_get(&result)) {
-        return result;
-    }
-    return nullptr;
-}
-
-int ExecNode::RowBatchQueue::Cleanup() {
-    int num_io_buffers = 0;
-
-    // RowBatch* batch = nullptr;
-    // while ((batch = GetBatch()) != nullptr) {
-    //     num_io_buffers += batch->num_io_buffers();
-    //     delete batch;
-    // }
-
-    std::lock_guard l(lock_);
-    for (std::list<RowBatch*>::iterator it = cleanup_queue_.begin(); it != cleanup_queue_.end();
-         ++it) {
-        // num_io_buffers += (*it)->num_io_buffers();
-        delete *it;
-    }
-    cleanup_queue_.clear();
-    return num_io_buffers;
-}
-
 ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
         : _id(tnode.node_id),
           _type(tnode.node_type),
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index f5af72ac61..ff95d96934 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -40,7 +40,6 @@ class Expr;
 class ExprContext;
 class ObjectPool;
 class Counters;
-class RowBatch;
 class RuntimeState;
 class TPlan;
 class TupleRow;
@@ -266,50 +265,6 @@ protected:
     /// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc
     Status do_projections(vectorized::Block* origin_block, vectorized::Block* output_block);
 
-    /// Extends blocking queue for row batches. Row batches have a property that
-    /// they must be processed in the order they were produced, even in cancellation
-    /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
-    /// and we need to make sure those ptrs stay valid.
-    /// Row batches that are added after Shutdown() are queued in another queue, which can
-    /// be cleaned up during Close().
-    /// All functions are thread safe.
-    class RowBatchQueue : public BlockingQueue<RowBatch*> {
-    public:
-        /// max_batches is the maximum number of row batches that can be queued.
-        /// When the queue is full, producers will block.
-        RowBatchQueue(int max_batches);
-        ~RowBatchQueue();
-
-        /// Adds a batch to the queue. This is blocking if the queue is full.
-        void AddBatch(RowBatch* batch);
-
-        /// Adds a batch to the queue. If the queue is full, this blocks until space becomes
-        /// available or 'timeout_micros' has elapsed.
-        /// Returns true if the element was added to the queue, false if it wasn't. If this
-        /// method returns false, the queue didn't take ownership of the batch and it must be
-        /// managed externally.
-        bool AddBatchWithTimeout(RowBatch* batch, int64_t timeout_micros);
-
-        /// Gets a row batch from the queue. Returns nullptr if there are no more.
-        /// This function blocks.
-        /// Returns nullptr after Shutdown().
-        RowBatch* GetBatch();
-
-        /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch()
-        /// after this is called.
-        /// Returns the number of io buffers that were released (for debug tracking)
-        int Cleanup();
-
-    private:
-        /// Lock protecting cleanup_queue_
-        // SpinLock lock_;
-        // TODO(dhc): need to modify spinlock
-        std::mutex lock_;
-
-        /// Queue of orphaned row batches
-        std::list<RowBatch*> cleanup_queue_;
-    };
-
     int _id; // unique w/in single plan tree
     TPlanNodeType::type _type;
     ObjectPool* _pool;
diff --git a/be/src/exec/row_batch_list.h b/be/src/exec/row_batch_list.h
deleted file mode 100644
index a81f0aae86..0000000000
--- a/be/src/exec/row_batch_list.h
+++ /dev/null
@@ -1,130 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// https://github.com/apache/impala/blob/branch-2.9.0/be/src/exec/row-batch-list.h
-// and modified by Doris
-
-#pragma once
-
-#include
-#include
-
-#include "runtime/row_batch.h"
-#include "runtime/tuple_row.h"
-
-namespace doris {
-
-class TupleRow;
-class RowDescriptor;
-class MemPool;
-
-// A simple list structure for RowBatches that provides an interface for
-// iterating over the TupleRows.
-class RowBatchList {
-public:
-    RowBatchList() : _total_num_rows(0) {}
-    virtual ~RowBatchList() {}
-
-    // A simple iterator used to scan over all the rows stored in the list.
-    class TupleRowIterator {
-    public:
-        // Dummy constructor
-        TupleRowIterator() : _list(nullptr), _row_idx(0) {}
-        virtual ~TupleRowIterator() {}
-
-        // Returns true if this iterator is at the end, i.e. get_row() cannot be called.
-        bool at_end() { return _batch_it == _list->_row_batches.end(); }
-
-        // Returns the current row. Callers must check the iterator is not at_end() before
-        // calling get_row().
-        TupleRow* get_row() {
-            DCHECK(!at_end());
-            return (*_batch_it)->get_row(_row_idx);
-        }
-
-        // Increments the iterator. No-op if the iterator is at the end.
-        void next() {
-            if (_batch_it == _list->_row_batches.end()) {
-                return;
-            }
-
-            if (++_row_idx == (*_batch_it)->num_rows()) {
-                ++_batch_it;
-                _row_idx = 0;
-            }
-        }
-
-    private:
-        friend class RowBatchList;
-
-        TupleRowIterator(RowBatchList* list)
-                : _list(list), _batch_it(list->_row_batches.begin()), _row_idx(0) {}
-
-        RowBatchList* _list;
-        std::vector<RowBatch*>::iterator _batch_it;
-        int64_t _row_idx;
-    };
-
-    // Add the 'row_batch' to the list. The RowBatch* and all of its resources are owned
-    // by the caller.
-    void add_row_batch(RowBatch* row_batch) {
-        if (row_batch->num_rows() == 0) {
-            return;
-        }
-
-        _row_batches.push_back(row_batch);
-        _total_num_rows += row_batch->num_rows();
-    }
-
-    // Resets the list.
-    void reset() {
-        _row_batches.clear();
-        _total_num_rows = 0;
-    }
-
-    // Outputs a debug string containing the contents of the list.
-    std::string debug_string(const RowDescriptor& desc) {
-        std::stringstream out;
-        out << "RowBatchList(";
-        out << "num_rows=" << _total_num_rows << "; ";
-        RowBatchList::TupleRowIterator it = iterator();
-
-        while (!it.at_end()) {
-            out << " " << it.get_row()->to_string(desc);
-            it.next();
-        }
-
-        out << " )";
-        return out.str();
-    }
-
-    // Returns the total number of rows in all row batches.
-    int64_t total_num_rows() { return _total_num_rows; }
-
-    // Returns a new iterator over all the tuple rows.
-    TupleRowIterator iterator() { return TupleRowIterator(this); }
-
-private:
-    friend class TupleRowIterator;
-
-    std::vector<RowBatch*> _row_batches;
-
-    // Total number of rows
-    int64_t _total_num_rows;
-};
-
-} // namespace doris
diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp
index fa725832d3..e342f9abd6 100644
--- a/be/src/exec/table_connector.cpp
+++ b/be/src/exec/table_connector.cpp
@@ -45,115 +45,6 @@ std::u16string TableConnector::utf8_to_u16string(const char* first, const char*
     return utf8_utf16_cvt.from_bytes(first, last);
 }
 
-Status TableConnector::append(const std::string& table_name, RowBatch* batch,
-                              const std::vector<ExprContext*>& output_expr_ctxs,
-                              uint32_t start_send_row, uint32* num_rows_sent) {
-    _insert_stmt_buffer.clear();
-    std::u16string insert_stmt;
-    {
-        SCOPED_TIMER(_convert_tuple_timer);
-        fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name);
-
-        int num_rows = batch->num_rows();
-        for (int i = start_send_row; i < num_rows; ++i) {
-            auto row = batch->get_row(i);
-            (*num_rows_sent)++;
-
-            // Construct insert statement of odbc table
-            int num_columns = output_expr_ctxs.size();
-            for (int j = 0; j < num_columns; ++j) {
-                if (j != 0) {
-                    fmt::format_to(_insert_stmt_buffer, "{}", ", ");
-                }
-                void* item = output_expr_ctxs[j]->get_value(row);
-                if (item == nullptr) {
-                    fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
-                    continue;
-                }
-                switch (output_expr_ctxs[j]->root()->type().type) {
-                case TYPE_BOOLEAN:
-                case TYPE_TINYINT:
-                    fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int8_t*>(item));
-                    break;
-                case TYPE_SMALLINT:
-                    fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int16_t*>(item));
-                    break;
-                case TYPE_INT:
-                    fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int32_t*>(item));
-                    break;
-                case TYPE_BIGINT:
-                    fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int64_t*>(item));
-                    break;
-                case TYPE_FLOAT:
-                    fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<float*>(item));
-                    break;
-                case TYPE_DOUBLE:
-                    fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<double*>(item));
-                    break;
-                case TYPE_DATE:
-                case TYPE_DATETIME: {
-                    char buf[64];
-                    const auto* time_val = (const DateTimeValue*)(item);
-                    time_val->to_string(buf);
-                    fmt::format_to(_insert_stmt_buffer, "'{}'", buf);
-                    break;
-                }
-                case TYPE_VARCHAR:
-                case TYPE_CHAR:
-                case TYPE_STRING: {
-                    const auto* string_val = (const StringValue*)(item);
-
-                    if (string_val->ptr == nullptr) {
-                        if (string_val->len == 0) {
-                            fmt::format_to(_insert_stmt_buffer, "{}", "''");
-                        } else {
-                            fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
-                        }
-                    } else {
-                        fmt::format_to(_insert_stmt_buffer, "'{}'",
-                                       fmt::basic_string_view(string_val->ptr, string_val->len));
-                    }
-                    break;
-                }
-                case TYPE_DECIMALV2: {
-                    const DecimalV2Value decimal_val(
-                            reinterpret_cast<const PackedInt128*>(item)->value);
-                    char buffer[MAX_DECIMAL_WIDTH];
-                    int output_scale = output_expr_ctxs[j]->root()->output_scale();
-                    int len = decimal_val.to_buffer(buffer, output_scale);
-                    _insert_stmt_buffer.append(buffer, buffer + len);
-                    break;
-                }
-                case TYPE_LARGEINT: {
-                    fmt::format_to(_insert_stmt_buffer, "{}",
-                                   reinterpret_cast<const PackedInt128*>(item)->value);
-                    break;
-                }
-                default: {
-                    return Status::InternalError("can't convert this type to mysql type. type = {}",
-                                                 output_expr_ctxs[j]->root()->type().type);
-                }
-                }
-            }
-
-            if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
-                fmt::format_to(_insert_stmt_buffer, "{}", "),(");
-            } else {
-                // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt
-                fmt::format_to(_insert_stmt_buffer, "{}", ")");
-                break;
-            }
-        }
-        // Translate utf8 string to utf16 to use unicode encoding
-        insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(),
-                                        _insert_stmt_buffer.data() + _insert_stmt_buffer.size());
-    }
-
-    RETURN_IF_ERROR(exec_write_sql(insert_stmt, _insert_stmt_buffer));
-    COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
-    return Status::OK();
-}
-
 Status TableConnector::append(const std::string& table_name, vectorized::Block* block,
                               const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs,
                               uint32_t start_send_row, uint32_t* num_rows_sent,
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index 3fa9f5f5b1..a6077b3227 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -49,10 +49,6 @@ public:
     virtual Status exec_write_sql(const std::u16string& insert_stmt,
                                   const fmt::memory_buffer& _insert_stmt_buffer) = 0;
 
-    //write data into table row batch
-    Status append(const std::string& table_name, RowBatch* batch,
-                  const std::vector<ExprContext*>& _output_expr_ctxs, uint32_t start_send_row,
-                  uint32_t* num_rows_sent);
 
     //write data into table vectorized
     Status append(const std::string& table_name, vectorized::Block* block,
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 247cd1a4f1..0b6fd00dbb 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -36,7 +36,6 @@ namespace doris {
 
 class MemPool;
-class RowBatch;
 
 struct OlapTableIndexSchema {
     int64_t index_id;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 2a0fded006..aa05bed875 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -183,32 +183,6 @@ Status DeltaWriter::write(Tuple* tuple) {
     return Status::OK();
 }
 
-Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row_idxs) {
-    std::lock_guard l(_lock);
-    if (!_is_init && !_is_cancelled) {
-        RETURN_NOT_OK(init());
-    }
-
-    if (_is_cancelled) {
-        return _cancel_status;
-    }
-
-    _total_received_rows += row_idxs.size();
-    for (const auto& row_idx : row_idxs) {
-        _mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
-    }
-
-    if (_mem_table->memory_usage() >= config::write_buffer_size) {
-        auto s = _flush_memtable_async();
-        _reset_mem_table();
-        if (OLAP_UNLIKELY(!s.ok())) {
-            return s;
-        }
-    }
-
-    return Status::OK();
-}
-
 Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs) {
     if (UNLIKELY(row_idxs.empty())) {
         return Status::OK();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index dafd77a8f8..eac3ea75cc 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -27,7 +27,6 @@ namespace doris {
 class FlushToken;
 class MemTable;
 class MemTracker;
-class RowBatch;
 class Schema;
 class StorageEngine;
 class Tuple;
@@ -64,7 +63,6 @@ public:
     Status init();
 
     Status write(Tuple* tuple);
-    Status write(const RowBatch* row_batch, const std::vector<int>& row_idxs);
     Status write(const vectorized::Block* block, const std::vector<int>& row_idxs);
 
     // flush the last memtable to flush queue, must call it before close_wait()
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0beccbdf9c..134df9f221 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1010,7 +1010,6 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
     per_node_scan_ranges.insert(std::make_pair((::doris::TPlanNodeId)0, scan_ranges));
     fragment_exec_params.per_node_scan_ranges = per_node_scan_ranges;
     exec_fragment_params.__set_params(fragment_exec_params);
-    // batch_size for one RowBatch
     TQueryOptions query_options;
    query_options.batch_size = params.batch_size;
     query_options.query_timeout = params.query_timeout;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 9d8e5f2f33..3581363517 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -153,15 +153,9 @@ Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
         return st;
     }
 
-    // 2. add batch to tablets channel
-    if constexpr (std::is_same_v<TabletWriterAddRequest, PTabletWriterAddBatchRequest>) {
-        if (request.has_row_batch()) {
-            RETURN_IF_ERROR(channel->add_batch(request, response));
-        }
-    } else {
-        if (request.has_block()) {
-            RETURN_IF_ERROR(channel->add_batch(request, response));
-        }
-    }
+    // 2. add block to tablets channel
+    if (request.has_block()) {
+        RETURN_IF_ERROR(channel->add_batch(request, response));
+    }
 
     // 3. handle eos
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 013c56471f..22bbe785b2 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -36,7 +36,6 @@ namespace doris {
 class QueryFragmentsCtx;
 class ExecNode;
 class RowDescriptor;
-class RowBatch;
 class DataSink;
 class DataStreamMgr;
 class RuntimeProfile;
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index a77956c0c4..4a14a66637 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -23,7 +23,6 @@ namespace doris {
 
 class Status;
-class RowBatch;
 class RuntimeState;
 
 struct TypeDescriptor;
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 6fdc42f2c4..7856864df4 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -466,13 +466,7 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
         }
     }
 
-    auto get_send_data = [&]() {
-        if constexpr (std::is_same_v<TabletWriterAddRequest, PTabletWriterAddBatchRequest>) {
-            return RowBatch(*_row_desc, request.row_batch());
-        } else {
-            return vectorized::Block(request.block());
-        }
-    };
+    auto get_send_data = [&]() { return vectorized::Block(request.block()); };
 
     auto send_data = get_send_data();
     google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
@@ -507,9 +501,6 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
     return Status::OK();
 }
 
-template Status
-TabletsChannel::add_batch<PTabletWriterAddBatchRequest, PTabletWriterAddBatchResult>(
-        PTabletWriterAddBatchRequest const&, PTabletWriterAddBatchResult*);
 template Status
 TabletsChannel::add_batch<PTabletWriterAddBlockRequest, PTabletWriterAddBlockResult>(
         PTabletWriterAddBlockRequest const&, PTabletWriterAddBlockResult*);
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index e01b83209e..af956b58ca 100644
---
a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -231,66 +231,6 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl }); } -void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController* cntl_base, - const PTabletWriterAddBatchRequest* request, - PTabletWriterAddBatchResult* response, - google::protobuf::Closure* done) { - google::protobuf::Closure* new_done = new NewHttpClosure(done); - _tablet_writer_add_batch(cntl_base, request, response, new_done); -} - -void PInternalServiceImpl::tablet_writer_add_batch_by_http( - google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, - PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { - PTabletWriterAddBatchRequest* new_request = new PTabletWriterAddBatchRequest(); - google::protobuf::Closure* new_done = - new NewHttpClosure(new_request, done); - brpc::Controller* cntl = static_cast(cntl_base); - Status st = attachment_extract_request_contain_tuple(new_request, - cntl); - if (st.ok()) { - _tablet_writer_add_batch(cntl_base, new_request, response, new_done); - } else { - st.to_protobuf(response->mutable_status()); - } -} - -void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcController* cntl_base, - const PTabletWriterAddBatchRequest* request, - PTabletWriterAddBatchResult* response, - google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer add batch, id=" << request->id() - << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() - << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); - // add batch maybe cost a lot of time, and this callback thread will be held. - // this will influence query execution, because the pthreads under bthread may be - // exhausted, so we put this to a local thread pool to process - int64_t submit_task_time_ns = MonotonicNanos(); - _tablet_worker_pool.offer([cntl_base, request, response, done, submit_task_time_ns, this]() { - int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; - brpc::ClosureGuard closure_guard(done); - int64_t execution_time_ns = 0; - { - SCOPED_RAW_TIMER(&execution_time_ns); - - // TODO(zxy) delete in 1.2 version - brpc::Controller* cntl = static_cast(cntl_base); - attachment_transfer_request_row_batch(request, cntl); - - auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); - if (!st.ok()) { - LOG(WARNING) << "tablet writer add batch failed, message=" << st - << ", id=" << request->id() << ", index_id=" << request->index_id() - << ", sender_id=" << request->sender_id() - << ", backend id=" << request->backend_id(); - } - st.to_protobuf(response->mutable_status()); - } - response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); - response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); - }); -} - void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* controller, const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 3ea3655974..9b2a4db254 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -78,16 +78,6 @@ public: PTabletWriterOpenResult* response, google::protobuf::Closure* done) override; - void tablet_writer_add_batch(google::protobuf::RpcController* controller, - const PTabletWriterAddBatchRequest* request, - PTabletWriterAddBatchResult* response, - 
google::protobuf::Closure* done) override; - - void tablet_writer_add_batch_by_http(google::protobuf::RpcController* controller, - const ::doris::PEmptyRequest* request, - PTabletWriterAddBatchResult* response, - google::protobuf::Closure* done) override; - void tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, @@ -178,11 +168,6 @@ private: ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done, const Status& extract_st); - void _tablet_writer_add_batch(google::protobuf::RpcController* controller, - const PTabletWriterAddBatchRequest* request, - PTabletWriterAddBatchResult* response, - google::protobuf::Closure* done); - void _tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 0f4fe065f2..571a391eb0 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -793,46 +793,6 @@ Status Block::serialize(int be_exec_version, PBlock* pblock, return Status::OK(); } -void Block::serialize(RowBatch* output_batch, const RowDescriptor& row_desc) { - auto num_rows = rows(); - auto mem_pool = output_batch->tuple_data_pool(); - - for (int i = 0; i < num_rows; ++i) { - auto tuple_row = output_batch->get_row(i); - const auto& tuple_descs = row_desc.tuple_descriptors(); - auto column_offset = 0; - - for (int j = 0; j < tuple_descs.size(); ++j) { - auto tuple_desc = tuple_descs[j]; - tuple_row->set_tuple(j, deep_copy_tuple(*tuple_desc, mem_pool, i, column_offset)); - column_offset += tuple_desc->slots().size(); - } - output_batch->commit_last_row(); - } -} - -doris::Tuple* Block::deep_copy_tuple(const doris::TupleDescriptor& desc, MemPool* pool, int row, - int column_offset, bool padding_char) { - auto dst = reinterpret_cast(pool->allocate(desc.byte_size())); - - for (int i = 0; i < desc.slots().size(); ++i) { - auto slot_desc = desc.slots()[i]; - auto& type_desc = slot_desc->type(); - const auto& column = get_by_position(column_offset + i).column; - const auto& data_ref = - type_desc.type != TYPE_ARRAY ? column->get_data_at(row) : StringRef(); - bool is_null = is_column_data_null(slot_desc->type(), data_ref, column, row); - if (is_null) { - dst->set_null(slot_desc->null_indicator_offset()); - } else { - dst->set_not_null(slot_desc->null_indicator_offset()); - deep_copy_slot(dst->get_slot(slot_desc->tuple_offset()), pool, type_desc, data_ref, - column.get(), row, padding_char); - } - } - return dst; -} - inline bool Block::is_column_data_null(const doris::TypeDescriptor& type_desc, const StringRef& data_ref, const IColumn* column, int row) { if (type_desc.type != TYPE_ARRAY) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 6b7cc9d5a1..803fb82f97 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -40,7 +40,6 @@ namespace doris { class MemPool; -class RowBatch; class RowDescriptor; class Status; class Tuple; @@ -280,9 +279,6 @@ public: size_t* compressed_bytes, segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data = false) const; - // serialize block to PRowbatch - void serialize(RowBatch*, const RowDescriptor&); - std::unique_ptr create_same_struct_block(size_t size) const; /** Compares (*this) n-th row and rhs m-th row. 
@@ -346,9 +342,6 @@ public: return res; } - doris::Tuple* deep_copy_tuple(const TupleDescriptor&, MemPool*, int, int, - bool padding_char = false); - // for String type or Array type void shrink_char_type_column_suffix_zero(const std::vector& char_type_idx); diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp index e79d3ce104..ac96f97965 100644 --- a/be/src/vec/exec/vbroker_scan_node.cpp +++ b/be/src/vec/exec/vbroker_scan_node.cpp @@ -211,8 +211,6 @@ Status VBrokerScanNode::close(RuntimeState* state) { _scanner_threads[i].join(); } - // Close - _batch_queue.clear(); return ExecNode::close(state); } diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h index 9c5e436b19..de46104088 100644 --- a/be/src/vec/exec/vbroker_scan_node.h +++ b/be/src/vec/exec/vbroker_scan_node.h @@ -88,7 +88,6 @@ private: std::mutex _batch_queue_lock; std::condition_variable _queue_reader_cond; std::condition_variable _queue_writer_cond; - std::deque> _batch_queue; int _num_running_scanners; diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h index 85eccc047a..e28a700f46 100644 --- a/be/src/vec/exec/vtable_function_node.h +++ b/be/src/vec/exec/vtable_function_node.h @@ -110,7 +110,6 @@ private: std::vector _child_slots; std::vector _output_slots; int64_t _cur_child_offset = 0; - std::shared_ptr _cur_child_batch; std::vector _fn_ctxs; std::vector _vfn_ctxs; diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 974b2f6096..24a9ce7602 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -23,7 +23,6 @@ namespace doris { -class RowBatch; class RuntimeProfile; namespace vectorized { @@ -54,11 +53,6 @@ public: // Return the next block of sorted rows from this merger. 
Status get_next(Block* output_block, bool* eos); - // Do not support now - virtual Status get_batch(RowBatch** output_batch) { - return Status::InternalError("no support method get_batch(RowBatch** output_batch)"); - } - protected: const std::vector& _ordering_expr; const std::vector& _is_asc_order; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 69cd1ecc9b..2163d26c4b 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -35,7 +35,6 @@ namespace doris { class ObjectPool; -class RowBatch; class RuntimeState; class RuntimeProfile; class BufferControlBlock; diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h index e566a30213..3f79f0e2d6 100644 --- a/be/src/vec/sink/vmysql_result_writer.h +++ b/be/src/vec/sink/vmysql_result_writer.h @@ -24,7 +24,6 @@ namespace doris { class BufferControlBlock; -class RowBatch; class MysqlRowBuffer; class TFetchDataResult; diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h index 63441e3179..66fb675def 100644 --- a/be/src/vec/sink/vresult_sink.h +++ b/be/src/vec/sink/vresult_sink.h @@ -21,7 +21,6 @@ namespace doris { class ObjectPool; -class RowBatch; class RuntimeState; class RuntimeProfile; class BufferControlBlock; diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 13aa7661ea..f042ce50b4 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -1247,7 +1247,6 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { } Expr::close(_output_expr_ctxs, state); - _output_batch.reset(); _close_status = status; DataSink::close(state, exec_status); diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index 097bb1e6b7..1c0a1be8d1 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -303,12 +303,6 @@ protected: // rows number received per tablet, tablet_id -> rows_num std::vector> _tablets_received_rows; - std::unique_ptr _cur_batch; - PTabletWriterAddBatchRequest _cur_add_batch_request; - using AddBatchReq = std::pair, PTabletWriterAddBatchRequest>; - std::queue _pending_batches; - ReusableClosure* _add_batch_closure = nullptr; - std::unique_ptr _cur_mutable_block; PTabletWriterAddBlockRequest _cur_add_block_request; @@ -543,7 +537,6 @@ private: OlapTablePartitionParam* _partition = nullptr; std::vector _output_expr_ctxs; - std::unique_ptr _output_batch; VOlapTablePartitionParam* _vpartition = nullptr; std::vector _output_vexpr_ctxs; diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index f4a218b020..ce33cd1fda 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -151,8 +151,6 @@ set(OLAP_TEST_FILES set(RUNTIME_TEST_FILES # runtime/buffer_control_block_test.cpp # runtime/result_buffer_mgr_test.cpp - # runtime/result_sink_test.cpp - # runtime/data_stream_test.cpp # runtime/parallel_executor_test.cpp # runtime/datetime_value_test.cpp # runtime/dpp_sink_internal_test.cpp diff --git a/be/test/runtime/data_stream_test.cpp b/be/test/runtime/data_stream_test.cpp deleted file mode 100644 index e3b6695dfa..0000000000 --- a/be/test/runtime/data_stream_test.cpp +++ /dev/null @@ -1,667 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include - -#include -#include - -#include "common/status.h" -#include "exprs/slot_ref.h" -#include "gen_cpp/BackendService.h" -#include "gen_cpp/Descriptors_types.h" -#include "gen_cpp/Types_types.h" -#include "runtime/client_cache.h" -#include "runtime/data_stream_mgr.h" -#include "runtime/data_stream_recvr.h" -#include "runtime/data_stream_sender.h" -#include "runtime/descriptors.h" -#include "runtime/raw_value.h" -#include "runtime/row_batch.h" -#include "runtime/runtime_state.h" -#include "util/cpu_info.h" -#include "util/debug_util.h" -#include "util/disk_info.h" -#include "util/mem_info.h" -#include "util/thrift_server.h" - -using std::string; -using std::vector; -using std::multiset; - -using std::unique_ptr; -using std::thread; - -namespace doris { - -class DorisTestBackend : public BackendServiceIf { -public: - DorisTestBackend(DataStreamMgr* stream_mgr) : _mgr(stream_mgr) {} - virtual ~DorisTestBackend() {} - - virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val, - const TExecPlanFragmentParams& params) {} - - virtual void cancel_plan_fragment(TCancelPlanFragmentResult& return_val, - const TCancelPlanFragmentParams& params) {} - - virtual void transmit_data(TTransmitDataResult& return_val, const TTransmitDataParams& params) { - /* - LOG(ERROR) << "transmit_data(): instance_id=" << params.dest_fragment_instance_id - << " node_id=" << params.dest_node_id - << " #rows=" << params.row_batch.num_rows - << " eos=" << (params.eos ? 
"true" : "false"); - if (!params.eos) { - _mgr->add_data( - params.dest_fragment_instance_id, - params.dest_node_id, - params.row_batch, - params.sender_id).set_t_status(&return_val); - } else { - Status status = _mgr->close_sender( - params.dest_fragment_instance_id, params.dest_node_id, params.sender_id, params.be_number); - status.set_t_status(&return_val); - LOG(ERROR) << "close_sender status: " << status; - } - */ - } - - virtual void fetch_data(TFetchDataResult& return_val, const TFetchDataParams& params) {} - - virtual void submit_tasks(TAgentResult& return_val, - const std::vector& tasks) {} - - virtual void make_snapshot(TAgentResult& return_val, const TSnapshotRequest& snapshot_request) { - } - - virtual void release_snapshot(TAgentResult& return_val, const std::string& snapshot_path) {} - - virtual void publish_cluster_state(TAgentResult& return_val, - const TAgentPublishRequest& request) {} - - virtual void register_pull_load_task(TStatus& _return, const TUniqueId& id, - const int32_t num_senders) {} - - virtual void deregister_pull_load_task(TStatus& _return, const TUniqueId& id) {} - - virtual void report_pull_load_sub_task_info(TStatus& _return, - const TPullLoadSubTaskInfo& task_info) {} - - virtual void fetch_pull_load_task_info(TFetchPullLoadTaskInfoResult& _return, - const TUniqueId& id) {} - - virtual void fetch_all_pull_load_task_infos(TFetchAllPullLoadTaskInfosResult& _return) {} - -private: - DataStreamMgr* _mgr; -}; - -class DataStreamTest : public testing::Test { -protected: - DataStreamTest() : _runtime_state(TUniqueId(), TQueryOptions(), "", &_exec_env), _next_val(0) { - _exec_env.init_for_tests(); - _runtime_state.init_mem_trackers(TUniqueId()); - } - // null dtor to pass codestyle check - ~DataStreamTest() {} - - virtual void SetUp() { - create_row_desc(); - create_tuple_comparator(); - create_row_batch(); - - _next_instance_id.lo = 0; - _next_instance_id.hi = 0; - _stream_mgr = new DataStreamMgr(); - - _broadcast_sink.dest_node_id = DEST_NODE_ID; - _broadcast_sink.output_partition.type = TPartitionType::UNPARTITIONED; - - _random_sink.dest_node_id = DEST_NODE_ID; - _random_sink.output_partition.type = TPartitionType::RANDOM; - - _hash_sink.dest_node_id = DEST_NODE_ID; - _hash_sink.output_partition.type = TPartitionType::HASH_PARTITIONED; - // there's only one column to partition on - TExprNode expr_node; - expr_node.node_type = TExprNodeType::SLOT_REF; - expr_node.type.types.push_back(TTypeNode()); - expr_node.type.types.back().__isset.scalar_type = true; - expr_node.type.types.back().scalar_type.type = TPrimitiveType::BIGINT; - expr_node.num_children = 0; - TSlotRef slot_ref; - slot_ref.slot_id = 0; - expr_node.__set_slot_ref(slot_ref); - TExpr expr; - expr.nodes.push_back(expr_node); - _hash_sink.output_partition.__isset.partition_exprs = true; - _hash_sink.output_partition.partition_exprs.push_back(expr); - - // Ensure that individual sender info addresses don't change - _sender_info.reserve(MAX_SENDERS); - _receiver_info.reserve(MAX_RECEIVERS); - start_backend(); - } - - const TDataStreamSink& get_sink(TPartitionType::type partition_type) { - switch (partition_type) { - case TPartitionType::UNPARTITIONED: - return _broadcast_sink; - case TPartitionType::RANDOM: - return _random_sink; - case TPartitionType::HASH_PARTITIONED: - return _hash_sink; - default: - DCHECK(false) << "Unhandled sink type: " << partition_type; - } - // Should never reach this. 
- return _broadcast_sink; - } - - virtual void TearDown() { - _lhs_slot_ctx->close(nullptr); - _rhs_slot_ctx->close(nullptr); - _exec_env.client_cache()->test_shutdown(); - stop_backend(); - } - - void reset() { - _sender_info.clear(); - _receiver_info.clear(); - _dest.clear(); - } - - // We reserve contiguous memory for senders in SetUp. If a test uses more - // senders, a DCHECK will fail and you should increase this value. - static const int MAX_SENDERS = 16; - static const int MAX_RECEIVERS = 16; - static const PlanNodeId DEST_NODE_ID = 1; - static const int BATCH_CAPACITY = 100; // rows - static const int PER_ROW_DATA = 8; - static const int TOTAL_DATA_SIZE = 8 * 1024; - static const int NUM_BATCHES = TOTAL_DATA_SIZE / BATCH_CAPACITY / PER_ROW_DATA; - - ObjectPool _obj_pool; - DescriptorTbl* _desc_tbl; - const RowDescriptor* _row_desc; - TupleRowComparator* _less_than; - ExecEnv _exec_env; - RuntimeState _runtime_state; - TUniqueId _next_instance_id; - string _stmt; - - // RowBatch generation - std::unique_ptr _batch; - int _next_val; - int64_t* _tuple_mem; - - // receiving node - DataStreamMgr* _stream_mgr; - ThriftServer* _server; - - // sending node(s) - TDataStreamSink _broadcast_sink; - TDataStreamSink _random_sink; - TDataStreamSink _hash_sink; - std::vector _dest; - - struct SenderInfo { - thread* thread_handle; - Status status; - int num_bytes_sent; - - SenderInfo() : thread_handle(nullptr), num_bytes_sent(0) {} - }; - std::vector _sender_info; - - struct ReceiverInfo { - TPartitionType::type stream_type; - int num_senders; - int receiver_num; - - thread* thread_handle; - std::shared_ptr stream_recvr; - Status status; - int num_rows_received; - multiset data_values; - - ReceiverInfo(TPartitionType::type stream_type, int num_senders, int receiver_num) - : stream_type(stream_type), - num_senders(num_senders), - receiver_num(receiver_num), - thread_handle(nullptr), - stream_recvr(nullptr), - num_rows_received(0) {} - - ~ReceiverInfo() { - delete thread_handle; - stream_recvr.reset(); - } - }; - std::vector _receiver_info; - - // Create an instance id and add it to _dest - void get_next_instance_id(TUniqueId* instance_id) { - _dest.push_back(TPlanFragmentDestination()); - TPlanFragmentDestination& dest = _dest.back(); - dest.fragment_instance_id = _next_instance_id; - dest.server.hostname = "127.0.0.1"; - dest.server.port = config::port; - *instance_id = _next_instance_id; - ++_next_instance_id.lo; - } - - // RowDescriptor to mimic "select bigint_col from alltypesagg", except the slot - // isn't nullable - void create_row_desc() { - // create DescriptorTbl - TTupleDescriptor tuple_desc; - tuple_desc.__set_id(0); - tuple_desc.__set_byteSize(8); - tuple_desc.__set_numNullBytes(0); - TDescriptorTable thrift_desc_tbl; - thrift_desc_tbl.tupleDescriptors.push_back(tuple_desc); - TSlotDescriptor slot_desc; - slot_desc.__set_id(0); - slot_desc.__set_parent(0); - - slot_desc.slotType.types.push_back(TTypeNode()); - slot_desc.slotType.types.back().__isset.scalar_type = true; - slot_desc.slotType.types.back().scalar_type.type = TPrimitiveType::BIGINT; - - slot_desc.__set_columnPos(0); - slot_desc.__set_byteOffset(0); - slot_desc.__set_nullIndicatorByte(0); - slot_desc.__set_nullIndicatorBit(-1); - slot_desc.__set_slotIdx(0); - slot_desc.__set_isMaterialized(true); - thrift_desc_tbl.slotDescriptors.push_back(slot_desc); - EXPECT_TRUE(DescriptorTbl::create(&_obj_pool, thrift_desc_tbl, &_desc_tbl).ok()); - _runtime_state.set_desc_tbl(_desc_tbl); - - std::vector row_tids; - 
row_tids.push_back(0); - - std::vector nullable_tuples; - nullable_tuples.push_back(false); - _row_desc = _obj_pool.add(new RowDescriptor(*_desc_tbl, row_tids, nullable_tuples)); - } - - // Create a tuple comparator to sort in ascending order on the single bigint column. - void create_tuple_comparator() { - TExprNode expr_node; - expr_node.node_type = TExprNodeType::SLOT_REF; - expr_node.type.types.push_back(TTypeNode()); - expr_node.type.types.back().__isset.scalar_type = true; - expr_node.type.types.back().scalar_type.type = TPrimitiveType::BIGINT; - expr_node.num_children = 0; - TSlotRef slot_ref; - slot_ref.slot_id = 0; - expr_node.__set_slot_ref(slot_ref); - - SlotRef* lhs_slot = _obj_pool.add(new SlotRef(expr_node)); - _lhs_slot_ctx = _obj_pool.add(new ExprContext(lhs_slot)); - SlotRef* rhs_slot = _obj_pool.add(new SlotRef(expr_node)); - _rhs_slot_ctx = _obj_pool.add(new ExprContext(rhs_slot)); - - _lhs_slot_ctx->prepare(&_runtime_state, *_row_desc); - _rhs_slot_ctx->prepare(&_runtime_state, *_row_desc); - _lhs_slot_ctx->open(nullptr); - _rhs_slot_ctx->open(nullptr); - SortExecExprs* sort_exprs = _obj_pool.add(new SortExecExprs()); - sort_exprs->init(vector(1, _lhs_slot_ctx), - std::vector(1, _rhs_slot_ctx)); - _less_than = _obj_pool.add(new TupleRowComparator(*sort_exprs, std::vector(1, true), - std::vector(1, false))); - } - - // Create _batch, but don't fill it with data yet. Assumes we created _row_desc. - RowBatch* create_row_batch() { - RowBatch* batch = new RowBatch(*_row_desc, BATCH_CAPACITY); - int64_t* tuple_mem = - reinterpret_cast(batch->tuple_data_pool()->allocate(BATCH_CAPACITY * 8)); - bzero(tuple_mem, BATCH_CAPACITY * 8); - - for (int i = 0; i < BATCH_CAPACITY; ++i) { - int idx = batch->add_row(); - TupleRow* row = batch->get_row(idx); - row->set_tuple(0, reinterpret_cast(&tuple_mem[i])); - batch->commit_last_row(); - } - - return batch; - } - - void get_next_batch(RowBatch* batch, int* next_val) { - LOG(INFO) << "batch_capacity=" << BATCH_CAPACITY << " next_val=" << *next_val; - for (int i = 0; i < BATCH_CAPACITY; ++i) { - TupleRow* row = batch->get_row(i); - int64_t* val = reinterpret_cast(row->get_tuple(0)->get_slot(0)); - *val = (*next_val)++; - } - } - - // Start receiver (expecting given number of senders) in separate thread. 
- void start_receiver(TPartitionType::type stream_type, int num_senders, int receiver_num, - int buffer_size, bool is_merging, TUniqueId* out_id = nullptr) { - VLOG_QUERY << "start receiver"; - RuntimeProfile* profile = _obj_pool.add(new RuntimeProfile("TestReceiver")); - TUniqueId instance_id; - get_next_instance_id(&instance_id); - _receiver_info.push_back(ReceiverInfo(stream_type, num_senders, receiver_num)); - ReceiverInfo& info = _receiver_info.back(); - info.stream_recvr = - _stream_mgr->create_recvr(&_runtime_state, *_row_desc, instance_id, DEST_NODE_ID, - num_senders, buffer_size, profile, is_merging); - if (!is_merging) { - info.thread_handle = new thread(&DataStreamTest::read_stream, this, &info); - } else { - info.thread_handle = - new thread(&DataStreamTest::read_stream_merging, this, &info, profile); - } - - if (out_id != nullptr) { - *out_id = instance_id; - } - } - - void join_receivers() { - VLOG_QUERY << "join receiver\n"; - - for (int i = 0; i < _receiver_info.size(); ++i) { - _receiver_info[i].thread_handle->join(); - _receiver_info[i].stream_recvr->close(); - } - } - - // Deplete stream and print batches - void read_stream(ReceiverInfo* info) { - RowBatch* batch = nullptr; - VLOG_QUERY << "start reading"; - - while (!(info->status = info->stream_recvr->get_batch(&batch)).is_cancelled() && - (batch != nullptr)) { - VLOG_QUERY << "read batch #rows=" << (batch != nullptr ? batch->num_rows() : 0); - - for (int i = 0; i < batch->num_rows(); ++i) { - TupleRow* row = batch->get_row(i); - info->data_values.insert(*static_cast(row->get_tuple(0)->get_slot(0))); - } - - SleepFor(MonoDelta::FromMilliseconds( - 10)); // slow down receiver to exercise buffering logic - } - - if (info->status.is_cancelled()) { - VLOG_QUERY << "reader is cancelled"; - } - - VLOG_QUERY << "done reading"; - } - - void read_stream_merging(ReceiverInfo* info, RuntimeProfile* profile) { - info->status = info->stream_recvr->create_merger(*_less_than); - if (info->status.is_cancelled()) { - return; - } - RowBatch batch(*_row_desc, 1024); - VLOG_QUERY << "start reading merging"; - bool eos = false; - while (!(info->status = info->stream_recvr->get_next(&batch, &eos)).is_cancelled()) { - VLOG_QUERY << "read batch #rows=" << batch.num_rows(); - for (int i = 0; i < batch.num_rows(); ++i) { - TupleRow* row = batch.get_row(i); - info->data_values.insert(*static_cast(row->get_tuple(0)->get_slot(0))); - } - SleepFor(MonoDelta::FromMilliseconds( - 10)); // slow down receiver to exercise buffering logic - batch.reset(); - if (eos) { - break; - } - } - if (info->status.is_cancelled()) { - VLOG_QUERY << "reader is cancelled"; - } - VLOG_QUERY << "done reading"; - } - - // Verify correctness of receivers' data values. 
- void check_receivers(TPartitionType::type stream_type, int num_senders) { - int64_t total = 0; - multiset all_data_values; - - for (int i = 0; i < _receiver_info.size(); ++i) { - ReceiverInfo& info = _receiver_info[i]; - EXPECT_TRUE(info.status.ok()); - total += info.data_values.size(); - DCHECK_EQ(info.stream_type, stream_type); - DCHECK_EQ(info.num_senders, num_senders); - - if (stream_type == TPartitionType::UNPARTITIONED) { - EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, info.data_values.size()); - } - - all_data_values.insert(info.data_values.begin(), info.data_values.end()); - - int k = 0; - for (multiset::iterator j = info.data_values.begin(); - j != info.data_values.end(); ++j, ++k) { - if (stream_type == TPartitionType::UNPARTITIONED) { - // unpartitioned streams contain all values as many times as there are - // senders - EXPECT_EQ(k / num_senders, *j); - } else if (stream_type == TPartitionType::HASH_PARTITIONED) { - // hash-partitioned streams send values to the right partition - int64_t value = *j; - uint32_t hash_val = RawValue::get_hash_value_fvn(&value, TYPE_BIGINT, 0U); - EXPECT_EQ(hash_val % _receiver_info.size(), info.receiver_num); - } - } - } - - if (stream_type == TPartitionType::HASH_PARTITIONED) { - EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, total); - - int k = 0; - for (multiset::iterator j = all_data_values.begin(); - j != all_data_values.end(); ++j, ++k) { - // each sender sent all values - EXPECT_EQ(k / num_senders, *j); - - if (k / num_senders != *j) { - break; - } - } - } - } - - void check_senders() { - for (int i = 0; i < _sender_info.size(); ++i) { - EXPECT_TRUE(_sender_info[i].status.ok()); - EXPECT_GT(_sender_info[i].num_bytes_sent, 0) << "info i=" << i; - } - } - - // Start backend in separate thread. - void start_backend() { - std::shared_ptr handler(new DorisTestBackend(_stream_mgr)); - std::shared_ptr processor(new BackendServiceProcessor(handler)); - _server = new ThriftServer("DataStreamTest backend", processor, config::port, nullptr); - _server->start(); - } - - void stop_backend() { - VLOG_QUERY << "stop backend\n"; - _server->stop_for_testing(); - delete _server; - } - - void start_sender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED, - int channel_buffer_size = 1024) { - VLOG_QUERY << "start sender"; - int sender_id = _sender_info.size(); - DCHECK_LT(sender_id, MAX_SENDERS); - _sender_info.push_back(SenderInfo()); - SenderInfo& info = _sender_info.back(); - info.thread_handle = new thread(&DataStreamTest::sender, this, sender_id, - channel_buffer_size, partition_type); - } - - void join_senders() { - VLOG_QUERY << "join senders\n"; - for (int i = 0; i < _sender_info.size(); ++i) { - _sender_info[i].thread_handle->join(); - } - } - - void sender(int sender_num, int channel_buffer_size, TPartitionType::type partition_type) { - RuntimeState state(TExecPlanFragmentParams(), TQueryOptions(), "", &_exec_env); - state.set_desc_tbl(_desc_tbl); - state.init_mem_trackers(TUniqueId()); - VLOG_QUERY << "create sender " << sender_num; - const TDataStreamSink& stream_sink = - (partition_type == TPartitionType::UNPARTITIONED ? 
_broadcast_sink : _hash_sink); - DataStreamSender sender(&_obj_pool, sender_num, *_row_desc, stream_sink, _dest, - channel_buffer_size); - - TDataSink data_sink; - data_sink.__set_type(TDataSinkType::DATA_STREAM_SINK); - data_sink.__set_stream_sink(stream_sink); - EXPECT_TRUE(sender.init(data_sink).ok()); - - EXPECT_TRUE(sender.prepare(&state).ok()); - EXPECT_TRUE(sender.open(&state).ok()); - std::unique_ptr batch(create_row_batch()); - SenderInfo& info = _sender_info[sender_num]; - int next_val = 0; - - for (int i = 0; i < NUM_BATCHES; ++i) { - get_next_batch(batch.get(), &next_val); - VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows(); - info.status = sender.send(&state, batch.get()); - - if (!info.status.ok()) { - LOG(WARNING) << "something is wrong when sending: " << info.status; - break; - } - } - - VLOG_QUERY << "closing sender" << sender_num; - info.status = sender.close(&state, Status::OK()); - info.num_bytes_sent = sender.get_num_data_bytes_sent(); - - batch->reset(); - } - - void test_stream(TPartitionType::type stream_type, int num_senders, int num_receivers, - int buffer_size, bool is_merging) { - LOG(INFO) << "Testing stream=" << stream_type << " #senders=" << num_senders - << " #receivers=" << num_receivers << " buffer_size=" << buffer_size; - reset(); - - for (int i = 0; i < num_receivers; ++i) { - start_receiver(stream_type, num_senders, i, buffer_size, is_merging); - } - - for (int i = 0; i < num_senders; ++i) { - start_sender(stream_type, buffer_size); - } - - join_senders(); - check_senders(); - join_receivers(); - check_receivers(stream_type, num_senders); - } - -private: - ExprContext* _lhs_slot_ctx; - ExprContext* _rhs_slot_ctx; -}; - -TEST_F(DataStreamTest, UnknownSenderSmallResult) { - // starting a sender w/o a corresponding receiver does not result in an error because - // we cannot distinguish whether a receiver was never created or the receiver - // willingly tore down the stream - // case 1: entire query result fits in single buffer, close() returns ok - TUniqueId dummy_id; - get_next_instance_id(&dummy_id); - start_sender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024); - join_senders(); - EXPECT_TRUE(_sender_info[0].status.ok()); - EXPECT_GT(_sender_info[0].num_bytes_sent, 0); -} - -TEST_F(DataStreamTest, UnknownSenderLargeResult) { - // case 2: query result requires multiple buffers, send() returns ok - TUniqueId dummy_id; - get_next_instance_id(&dummy_id); - start_sender(); - join_senders(); - EXPECT_TRUE(_sender_info[0].status.ok()); - EXPECT_GT(_sender_info[0].num_bytes_sent, 0); -} - -TEST_F(DataStreamTest, Cancel) { - TUniqueId instance_id; - start_receiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, &instance_id); - _stream_mgr->cancel(instance_id); - start_receiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true, &instance_id); - _stream_mgr->cancel(instance_id); - join_receivers(); - EXPECT_TRUE(_receiver_info[0].status.is_cancelled()); -} - -TEST_F(DataStreamTest, BasicTest) { - // TODO: also test that all client connections have been returned - TPartitionType::type stream_types[] = {TPartitionType::UNPARTITIONED, - TPartitionType::HASH_PARTITIONED}; - int sender_nums[] = {1, 3}; - int receiver_nums[] = {1, 3}; - int buffer_sizes[] = {1024, 1024 * 1024}; - bool merging[] = {false, true}; - - // test_stream(TPartitionType::HASH_PARTITIONED, 1, 3, 1024, true); - for (int i = 0; i < sizeof(stream_types) / sizeof(*stream_types); ++i) { - for (int j = 0; j < sizeof(sender_nums) / sizeof(int); ++j) { - for (int k = 0; k 
< sizeof(receiver_nums) / sizeof(int); ++k) { - for (int l = 0; l < sizeof(buffer_sizes) / sizeof(int); ++l) { - for (int m = 0; m < sizeof(merging) / sizeof(bool); ++m) { - LOG(ERROR) << "before test: stream_type=" << stream_types[i] - << " sender num=" << sender_nums[j] - << " receiver_num=" << receiver_nums[k] - << " buffer_size=" << buffer_sizes[l] - << " merging=" << (merging[m] ? "true" : "false"); - test_stream(stream_types[i], sender_nums[j], receiver_nums[k], - buffer_sizes[l], merging[m]); - LOG(ERROR) << "after test: stream_type=" << stream_types[i] - << " sender num=" << sender_nums[j] - << " receiver_num=" << receiver_nums[k] - << " buffer_size=" << buffer_sizes[l] - << " merging=" << (merging[m] ? "true" : "false"); - } - } - } - } - } -} - -// TODO: more tests: -// - test case for transmission error in last batch -// - receivers getting created concurrently - -} // namespace doris diff --git a/be/test/runtime/load_channel_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp index 3569b30757..d99951d957 100644 --- a/be/test/runtime/load_channel_mgr_test.cpp +++ b/be/test/runtime/load_channel_mgr_test.cpp @@ -60,23 +60,6 @@ Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer) { return open_status; } -Status DeltaWriter::write(Tuple* tuple) { - if (_k_tablet_recorder.find(_req.tablet_id) == std::end(_k_tablet_recorder)) { - _k_tablet_recorder[_req.tablet_id] = 1; - } else { - _k_tablet_recorder[_req.tablet_id]++; - } - return add_status; -} - -Status DeltaWriter::write(const RowBatch* row_batch, const std::vector& row_idxs) { - if (_k_tablet_recorder.find(_req.tablet_id) == std::end(_k_tablet_recorder)) { - _k_tablet_recorder[_req.tablet_id] = 0; - } - _k_tablet_recorder[_req.tablet_id] += row_idxs.size(); - return add_status; -} - Status DeltaWriter::close() { return Status::OK(); } @@ -177,94 +160,6 @@ void create_schema(DescriptorTbl* desc_tbl, POlapTableSchemaParam* pschema) { indexes->set_schema_hash(123); } -TEST_F(LoadChannelMgrTest, normal) { - ExecEnv env; - LoadChannelMgr mgr; - mgr.init(-1); - - auto tdesc_tbl = create_descriptor_table(); - ObjectPool obj_pool; - DescriptorTbl* desc_tbl = nullptr; - DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); - auto tuple_desc = desc_tbl->get_tuple_descriptor(0); - RowDescriptor row_desc(*desc_tbl, {0}, {false}); - PUniqueId load_id; - load_id.set_hi(2); - load_id.set_lo(3); - { - PTabletWriterOpenRequest request; - request.set_allocated_id(&load_id); - request.set_index_id(4); - request.set_txn_id(1); - create_schema(desc_tbl, request.mutable_schema()); - for (int i = 0; i < 2; ++i) { - auto tablet = request.add_tablets(); - tablet->set_partition_id(10 + i); - tablet->set_tablet_id(20 + i); - } - request.set_num_senders(1); - request.set_need_gen_rollup(false); - auto st = mgr.open(request); - request.release_id(); - EXPECT_TRUE(st.ok()); - } - - // add a batch - { - PTabletWriterAddBatchRequest request; - request.set_allocated_id(&load_id); - request.set_index_id(4); - request.set_sender_id(0); - request.set_eos(true); - request.set_packet_seq(0); - - request.add_tablet_ids(20); - request.add_tablet_ids(21); - request.add_tablet_ids(20); - - RowBatch row_batch(row_desc, 1024); - - // row1 - { - auto id = row_batch.add_row(); - auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size()); - row_batch.get_row(id)->set_tuple(0, tuple); - memset(tuple, 0, tuple_desc->byte_size()); - *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 987654; - 
*(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 1234567899876;
-            row_batch.commit_last_row();
-        }
-        // row2
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 12345678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 9876567899876;
-            row_batch.commit_last_row();
-        }
-        // row3
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 876545678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 76543234567;
-            row_batch.commit_last_row();
-        }
-        row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, &compressed_size);
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-    // check content
-    EXPECT_EQ(_k_tablet_recorder[20], 2);
-    EXPECT_EQ(_k_tablet_recorder[21], 1);
-}
-
 TEST_F(LoadChannelMgrTest, cancel) {
     ExecEnv env;
     LoadChannelMgr mgr;
@@ -342,373 +237,4 @@ TEST_F(LoadChannelMgrTest, open_failed) {
     }
 }
 
-TEST_F(LoadChannelMgrTest, add_failed) {
-    ExecEnv env;
-    LoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBatchRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(21);
-        request.add_tablet_ids(20);
-
-        RowBatch row_batch(row_desc, 1024);
-
-        // row1
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 987654;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 1234567899876;
-            row_batch.commit_last_row();
-        }
-        // row2
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 12345678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 9876567899876;
-            row_batch.commit_last_row();
-        }
-        // row3
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 876545678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 76543234567;
-            row_batch.commit_last_row();
-        }
-        row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, &compressed_size);
-        // DeltaWriter's write will return -215
-        add_status = Status::Error();
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        request.release_id();
-        // st is still ok.
-        EXPECT_TRUE(st.ok());
-        EXPECT_EQ(2, response.tablet_errors().size());
-    }
-}
-
-TEST_F(LoadChannelMgrTest, close_failed) {
-    ExecEnv env;
-    LoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBatchRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(21);
-        request.add_tablet_ids(20);
-
-        request.add_partition_ids(10);
-        request.add_partition_ids(11);
-
-        RowBatch row_batch(row_desc, 1024);
-
-        // row1
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 987654;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 1234567899876;
-            row_batch.commit_last_row();
-        }
-        // row2
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 12345678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 9876567899876;
-            row_batch.commit_last_row();
-        }
-        // row3
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 876545678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 76543234567;
-            row_batch.commit_last_row();
-        }
-        row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, &compressed_size);
-        close_status = Status::Error();
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        request.release_id();
-        // even if delta close failed, the return status is still ok, but tablet_vec is empty
-        EXPECT_TRUE(st.ok());
-        EXPECT_TRUE(response.tablet_vec().empty());
-    }
-}
-
-TEST_F(LoadChannelMgrTest, unknown_tablet) {
-    ExecEnv env;
-    LoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBatchRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(22);
-        request.add_tablet_ids(20);
-
-        RowBatch row_batch(row_desc, 1024);
-
-        // row1
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 987654;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 1234567899876;
-            row_batch.commit_last_row();
-        }
-        // row2
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 12345678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 9876567899876;
-            row_batch.commit_last_row();
-        }
-        // row3
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 876545678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 76543234567;
-            row_batch.commit_last_row();
-        }
-        row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, &compressed_size);
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        request.release_id();
-        EXPECT_FALSE(st.ok());
-    }
-}
-
-TEST_F(LoadChannelMgrTest, duplicate_packet) {
-    ExecEnv env;
-    LoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBatchRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(false);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(21);
-        request.add_tablet_ids(20);
-
-        RowBatch row_batch(row_desc, 1024);
-
-        // row1
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 987654;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 1234567899876;
-            row_batch.commit_last_row();
-        }
-        // row2
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 12345678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 9876567899876;
-            row_batch.commit_last_row();
-        }
-        // row3
-        {
-            auto id = row_batch.add_row();
-            auto tuple = (Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 876545678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 76543234567;
-            row_batch.commit_last_row();
-        }
-        row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, &compressed_size);
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        EXPECT_TRUE(st.ok());
-        PTabletWriterAddBatchResult response2;
-        st = mgr.add_batch(request, &response2);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-    // close
-    {
-        PTabletWriterAddBatchRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-    // check content
-    EXPECT_EQ(_k_tablet_recorder[20], 2);
-    EXPECT_EQ(_k_tablet_recorder[21], 1);
-}
-
 } // namespace doris
diff --git a/be/test/runtime/result_sink_test.cpp b/be/test/runtime/result_sink_test.cpp
deleted file mode 100644
index 05a3caee03..0000000000
--- a/be/test/runtime/result_sink_test.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/result_sink.h"
-
-#include
-#include
-#include
-
-#include
-
-#include "exprs/bool_literal.h"
-#include "exprs/expr.h"
-#include "exprs/float_literal.h"
-#include "exprs/int_literal.h"
-#include "exprs/string_literal.h"
-#include "exprs/timestamp_literal.h"
-#include "gen_cpp/Exprs_types.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/Types_types.h"
-#include "runtime/buffer_control_block.h"
-#include "runtime/primitive_type.h"
-#include "runtime/result_buffer_mgr.h"
-#include "runtime/row_batch.h"
-#include "runtime/runtime_state.h"
-#include "runtime/tuple_row.h"
-#include "util/cpu_info.h"
-#include "util/mysql_row_buffer.h"
-
-namespace doris {
-
-class ResultSinkTest : public testing::Test {
-public:
-    ResultSinkTest() {
-        _runtime_state = new RuntimeState("ResultWriterTest");
-        _runtime_state->_exec_env = &_exec_env;
-
-        {
-            TExpr expr;
-            {
-                TExprNode node;
-
-                node.node_type = TExprNodeType::INT_LITERAL;
-                node.type = to_tcolumn_type_thrift(TPrimitiveType::TINYINT);
-                node.num_children = 0;
-                TIntLiteral data;
-                data.value = 1;
-                node.__set_int_literal(data);
-                expr.nodes.push_back(node);
-            }
-            _exprs.push_back(expr);
-        }
-    }
-    virtual ~ResultSinkTest() { delete _runtime_state; }
-
-protected:
-    virtual void SetUp() {}
-
-private:
-    ExecEnv _exec_env;
-    std::vector<TExpr> _exprs;
-    RuntimeState* _runtime_state;
-    RowDescriptor _row_desc;
-    TResultSink _tsink;
-};
-
-TEST_F(ResultSinkTest, init_normal) {
-    ResultSink sink(_row_desc, _exprs, _tsink, 1024);
-    EXPECT_TRUE(sink.init(_runtime_state).ok());
-    RowBatch row_batch(_row_desc, 1024);
-    row_batch.add_row();
-    row_batch.commit_last_row();
-    EXPECT_TRUE(sink.send(_runtime_state, &row_batch).ok());
-    EXPECT_TRUE(sink.close(_runtime_state, Status::OK()).ok());
-}
-
-} // namespace doris
-
-/* vim: set ts=4 sw=4 sts=4 tw=100 */
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp
index a987f30150..ef7ac73ce7 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -289,31 +289,6 @@ public:
         status.to_protobuf(response->mutable_status());
     }
 
-    void tablet_writer_add_batch(google::protobuf::RpcController* controller,
-                                 const PTabletWriterAddBatchRequest* request,
-                                 PTabletWriterAddBatchResult* response,
-                                 google::protobuf::Closure* done) override {
-        brpc::ClosureGuard done_guard(done);
-        {
-            std::lock_guard l(_lock);
-            _row_counters += request->tablet_ids_size();
-            if (request->eos()) {
-                _eof_counters++;
-            }
-            k_add_batch_status.to_protobuf(response->mutable_status());
-
-            if (request->has_row_batch() && _row_desc != nullptr) {
-                brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-                attachment_transfer_request_row_batch(request, cntl);
-                RowBatch batch(*_row_desc, request->row_batch());
-                for (int i = 0; i < batch.num_rows(); ++i) {
-                    LOG(INFO) << batch.get_row(i)->to_string(*_row_desc);
-                    _output_set->emplace(batch.get_row(i)->to_string(*_row_desc));
-                }
-            }
-        }
-    }
-
     void tablet_writer_add_block(google::protobuf::RpcController* controller,
                                  const PTabletWriterAddBlockRequest* request,
                                  PTabletWriterAddBlockResult* response,
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index e49503ca04..6a47156b39 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -370,12 +370,6 @@ public class MockedBackendFactory {
            responseObserver.onCompleted();
        }
 
-        @Override
-        public void tabletWriterAddBatch(InternalService.PTabletWriterAddBatchRequest request, StreamObserver<InternalService.PTabletWriterAddBatchResult> responseObserver) {
-            responseObserver.onNext(null);
-            responseObserver.onCompleted();
-        }
-
         @Override
         public void tabletWriterCancel(InternalService.PTabletWriterCancelRequest request, StreamObserver<InternalService.PTabletWriterCancelResult> responseObserver) {
             responseObserver.onNext(null);
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 552786f313..50bdb1bb68 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -551,8 +551,6 @@ service PBackendService {
     rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult);
     rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult);
-    rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns (PTabletWriterAddBatchResult);
-    rpc tablet_writer_add_batch_by_http(PEmptyRequest) returns (PTabletWriterAddBatchResult);
     rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns (PTabletWriterAddBlockResult);
     rpc tablet_writer_add_block_by_http(PEmptyRequest) returns (PTabletWriterAddBlockResult);
     rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult);