diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 14baf8ae06..6e60c27cb9 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -28,7 +28,6 @@
 #include "exec/exchange_node.h"
 #include "exec/exec_node.h"
 #include "exec/scan_node.h"
-#include "exprs/expr.h"
 #include "runtime/data_stream_mgr.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
@@ -40,8 +39,6 @@
 #include "runtime/thread_context.h"
 #include "util/container_util.hpp"
 #include "util/defer_op.h"
-#include "util/mem_info.h"
-#include "util/parse_util.h"
 #include "util/pretty_printer.h"
 #include "util/telemetry/telemetry.h"
 #include "util/uid_util.h"
@@ -68,8 +65,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
           _is_report_success(false),
           _is_report_on_cancel(true),
           _collect_query_statistics_with_every_batch(false),
-          _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR),
-          _cancel_msg("") {}
+          _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {}
 
 PlanFragmentExecutor::~PlanFragmentExecutor() {
     close();
@@ -227,7 +223,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
     _prepared = true;
 
     _query_statistics.reset(new QueryStatistics());
-    if (_sink.get() != nullptr) {
+    if (_sink != nullptr) {
         _sink->set_query_statistics(_query_statistics);
     }
     return Status::OK();
@@ -301,7 +297,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
             RETURN_IF_ERROR(get_vectorized_internal(&block));
         }
 
-        if (block == NULL) {
+        if (block == nullptr) {
             break;
         }
 
@@ -373,7 +369,7 @@ Status PlanFragmentExecutor::open_internal() {
         RETURN_IF_ERROR(_plan->open(_runtime_state.get()));
     }
 
-    if (_sink.get() == nullptr) {
+    if (_sink == nullptr) {
         return Status::OK();
     }
     {
@@ -669,13 +665,13 @@ void PlanFragmentExecutor::close() {
     _row_batch.reset(nullptr);
 
     // Prepare may not have been called, which sets _runtime_state
-    if (_runtime_state.get() != nullptr) {
+    if (_runtime_state != nullptr) {
         // _runtime_state init failed
         if (_plan != nullptr) {
             _plan->close(_runtime_state.get());
         }
 
-        if (_sink.get() != nullptr) {
+        if (_sink != nullptr) {
             if (_prepared) {
                 Status status;
                 {
diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index a9f490b181..821636c4f9 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -73,6 +73,13 @@ public:
     }
 
     Status compress(const Slice& input, faststring* output) override {
+        if (input.size > INT_MAX) {
+            return Status::InvalidArgument(
+                    "LZ4 not support those case(input.size>INT_MAX), maybe you should change "
+                    "fragment_transmission_compression_codec to snappy, size={}",
+                    input.size);
+        }
+
         Context* context;
         RETURN_IF_ERROR(_acquire_compression_ctx(&context));
         bool compress_failed = false;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 1bbdececd5..752e393386 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -737,9 +737,9 @@ Status Block::serialize(PBlock* pblock,
         RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec));
 
         faststring buf_compressed;
-        codec->compress(Slice(column_values.data(), content_uncompressed_size), &buf_compressed);
+        RETURN_IF_ERROR(codec->compress(Slice(column_values.data(), content_uncompressed_size),
+                                        &buf_compressed));
         size_t compressed_size = buf_compressed.size();
-
         if (LIKELY(compressed_size < content_uncompressed_size)) {
             pblock->set_column_values(buf_compressed.data(), buf_compressed.size());
             pblock->set_compressed(true);
diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp
index 96b887fd7b..a741bd3553 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -17,8 +17,6 @@
 
 #include "vec/exec/vtable_function_node.h"
 
-#include "exprs/expr.h"
-#include "exprs/expr_context.h"
 #include "exprs/table_function/table_function.h"
 #include "exprs/table_function/table_function_factory.h"
 #include "vec/exprs/vexpr.h"
@@ -109,7 +107,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
         }
     }
 
-    while (true) {
+    while (columns[_child_slots.size()]->size() < state->batch_size()) {
         RETURN_IF_CANCELLED(state);
         RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch."));
 
@@ -133,7 +131,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
         }
 
         bool skip_child_row = false;
-        while (true) {
+        while (columns[_child_slots.size()]->size() < state->batch_size()) {
             int idx = _find_last_fn_eos_idx();
             if (idx == 0 || skip_child_row) {
                 // all table functions' results are exhausted, process next child row.
@@ -187,10 +185,6 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
 
             bool tmp = false;
             _fns[_fn_num - 1]->forward(&tmp);
-
-            if (columns[_child_slots.size()]->size() >= state->batch_size()) {
-                break;
-            }
         }
     }
 
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 94a4e99163..383f2ba51d 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -22,12 +22,12 @@
 
 #include 
 
-#include "runtime/client_cache.h"
 #include "runtime/dpp_sink_internal.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
+#include "util/brpc_client_cache.h"
 #include "util/proto_util.h"
 #include "vec/common/sip_hash.h"
 #include "vec/runtime/vdata_stream_mgr.h"
@@ -174,7 +174,8 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) {
                 brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
         _closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
         _closure->cntl.http_request().set_content_type("application/json");
-        _brpc_http_stub->transmit_block_by_http(&_closure->cntl, NULL, &_closure->result, _closure);
+        _brpc_http_stub->transmit_block_by_http(&_closure->cntl, nullptr, &_closure->result,
+                                                _closure);
     } else {
         _closure->cntl.http_request().Clear();
         _brpc_stub->transmit_block(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
@@ -190,7 +191,7 @@ Status VDataStreamSender::Channel::add_rows(Block* block, const std::vector
         return Status::OK();
     }
 
-    if (_mutable_block.get() == nullptr) {
+    if (_mutable_block == nullptr) {
         _mutable_block.reset(new MutableBlock(block->clone_empty()));
     }
 
@@ -398,7 +399,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
     }
     std::string title = fmt::format("VDataStreamSender (dst_id={}, dst_fragments=[{}])",
                                     _dest_node_id, instances);
-    _profile = _pool->add(new RuntimeProfile(std::move(title)));
+    _profile = _pool->add(new RuntimeProfile(title));
     SCOPED_TIMER(_profile->total_time_counter());
     _mem_tracker = std::make_unique<MemTracker>(
             "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile);
@@ -468,7 +469,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) {
         // 3. rollover block
         int local_size = 0;
         for (auto channel : _channels) {
-            if (channel->is_local()) local_size++;
+            if (channel->is_local()) {
+                local_size++;
+            }
         }
         if (local_size == _channels.size()) {
             for (auto channel : _channels) {
@@ -562,7 +565,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) {
 }
 
 Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
-    if (_closed) return Status::OK();
+    if (_closed) {
+        return Status::OK();
+    }
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VDataStreamSender::close");
     Status final_st = Status::OK();
 
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 0d44d3152b..f3b561c625 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -24,12 +24,9 @@
 #include "gen_cpp/internal_service.pb.h"
 #include "runtime/descriptors.h"
 #include "service/backend_options.h"
-#include "service/brpc.h"
-#include "util/brpc_client_cache.h"
-#include "util/network_util.h"
 #include "util/ref_count_closure.h"
 #include "util/uid_util.h"
-#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
 
 namespace doris {
 class ObjectPool;
@@ -59,18 +56,18 @@ public:
     VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc,
                       int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
 
-    ~VDataStreamSender();
+    ~VDataStreamSender() override;
 
-    virtual Status init(const TDataSink& thrift_sink) override;
+    Status init(const TDataSink& thrift_sink) override;
 
-    virtual Status prepare(RuntimeState* state) override;
-    virtual Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
 
-    virtual Status send(RuntimeState* state, RowBatch* batch) override;
-    virtual Status send(RuntimeState* state, Block* block) override;
+    Status send(RuntimeState* state, RowBatch* batch) override;
+    Status send(RuntimeState* state, Block* block) override;
 
-    virtual Status close(RuntimeState* state, Status exec_status) override;
-    virtual RuntimeProfile* profile() override { return _profile; }
+    Status close(RuntimeState* state, Status exec_status) override;
+    RuntimeProfile* profile() override { return _profile; }
 
     RuntimeState* state() { return _state; }
 
@@ -89,8 +86,8 @@ protected:
     }
 
    template <typename Channels>
-    Status channel_add_rows(Channels& channels, int num_channels, uint64_t* channel_ids, int rows,
-                            Block* block);
+    Status channel_add_rows(Channels& channels, int num_channels, const uint64_t* channel_ids,
+                            int rows, Block* block);
 
     struct hash_128 {
         uint64_t high;
@@ -246,7 +243,9 @@ public:
 private:
     Status _wait_last_brpc() {
         SCOPED_TIMER(_parent->_brpc_wait_timer);
-        if (_closure == nullptr) return Status::OK();
+        if (_closure == nullptr) {
+            return Status::OK();
+        }
         auto cntl = &_closure->cntl;
         auto call_id = _closure->cntl.call_id();
         brpc::Join(call_id);
@@ -315,7 +314,7 @@ private:
 
 template <typename Channels>
 Status VDataStreamSender::channel_add_rows(Channels& channels, int num_channels,
-                                           uint64_t* __restrict channel_ids, int rows,
+                                           const uint64_t* __restrict channel_ids, int rows,
                                            Block* block) {
     std::vector channel2rows[num_channels];
 
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index cbb8d8810c..7bfc8b4c8a 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -18,15 +18,14 @@
 #include "vec/sink/vresult_file_sink.h"
 
 #include "common/config.h"
-#include "exprs/expr.h"
 #include "runtime/buffer_control_block.h"
 #include "runtime/exec_env.h"
 #include "runtime/file_result_writer.h"
 #include "runtime/result_buffer_mgr.h"
-#include "runtime/result_file_sink.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "util/uid_util.h"
+#include "vec/exprs/vexpr.h"
 #include "vec/runtime/vfile_result_writer.h"
 
 namespace doris::vectorized {
diff --git a/be/test/vec/runtime/vdata_stream_test.cpp b/be/test/vec/runtime/vdata_stream_test.cpp
index c16224f470..175fa64ee3 100644
--- a/be/test/vec/runtime/vdata_stream_test.cpp
+++ b/be/test/vec/runtime/vdata_stream_test.cpp
@@ -24,8 +24,8 @@
 #include "runtime/exec_env.h"
 #include "service/brpc.h"
 #include "testutil/desc_tbl_builder.h"
+#include "util/brpc_client_cache.h"
 #include "util/proto_util.h"
-#include "vec/columns/columns_number.h"
 #include "vec/data_types/data_type_number.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 #include "vec/runtime/vdata_stream_recvr.h"
diff --git a/regression-test/data/query_p1/lateral_view/load_from_big_lateral_view.out b/regression-test/data/query_p1/lateral_view/load_from_big_lateral_view.out
new file mode 100644
index 0000000000..cb4189820b
--- /dev/null
+++ b/regression-test/data/query_p1/lateral_view/load_from_big_lateral_view.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+100000000
+
diff --git a/regression-test/suites/query_p1/lateral_view/load_from_big_lateral_view.groovy b/regression-test/suites/query_p1/lateral_view/load_from_big_lateral_view.groovy
new file mode 100644
index 0000000000..4a7bb4a196
--- /dev/null
+++ b/regression-test/suites/query_p1/lateral_view/load_from_big_lateral_view.groovy
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("query_p1") {
+    def dbName = "regression_load_from_big_lateral_view"
+    sql "DROP DATABASE IF EXISTS ${dbName}"
+    sql "CREATE DATABASE ${dbName}"
+    sql "USE ${dbName}"
+    sql """
+        CREATE TABLE `test` (
+            `k1` smallint NULL,
+            `k2` int NULL,
+            `k3` bigint NULL,
+            `k4` largeint NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`,`k2`,`k3`,`k4`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES("replication_num" = "1");
+    """
+
+    sql """insert into test select e1,e1,e1,e1 from (select 1 k1) as t lateral view explode_numbers(100000000) tmp1 as e1;"""
+
+    qt_sql """select count(*) from test;"""
+}
+