From 4fcd6cd2365b038a5ef3830003d9ca28af960237 Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Fri, 10 Feb 2023 07:46:18 +0800 Subject: [PATCH] [refactor](remove unused code) remove load stream mgr (#16580) remove old stream load pipe remove old stream load manager --------- Co-authored-by: yiguolei --- be/src/http/action/stream_load.cpp | 2 +- be/src/io/file_factory.cpp | 10 - be/src/io/file_factory.h | 4 - be/src/runtime/CMakeLists.txt | 1 - be/src/runtime/exec_env.h | 3 - be/src/runtime/exec_env_init.cpp | 3 - be/src/runtime/fragment_mgr.cpp | 2 +- .../routine_load/kafka_consumer_pipe.h | 54 ---- .../runtime/stream_load/load_stream_mgr.cpp | 36 --- be/src/runtime/stream_load/load_stream_mgr.h | 72 ----- .../runtime/stream_load/new_load_stream_mgr.h | 1 - .../runtime/stream_load/stream_load_context.h | 1 - be/src/runtime/stream_load/stream_load_pipe.h | 282 ------------------ be/test/CMakeLists.txt | 3 - be/test/http/stream_load_test.cpp | 234 --------------- be/test/runtime/kafka_consumer_pipe_test.cpp | 66 ---- be/test/runtime/stream_load_pipe_test.cpp | 261 ---------------- be/test/vec/exec/vtablet_sink_test.cpp | 3 - 18 files changed, 2 insertions(+), 1036 deletions(-) delete mode 100644 be/src/runtime/routine_load/kafka_consumer_pipe.h delete mode 100644 be/src/runtime/stream_load/load_stream_mgr.cpp delete mode 100644 be/src/runtime/stream_load/load_stream_mgr.h delete mode 100644 be/src/runtime/stream_load/stream_load_pipe.h delete mode 100644 be/test/http/stream_load_test.cpp delete mode 100644 be/test/runtime/kafka_consumer_pipe_test.cpp delete mode 100644 be/test/runtime/stream_load_pipe_test.cpp diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 8281064881..a55c0e22c2 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -397,7 +397,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_loadId(ctx->id.to_thrift()); if (ctx->use_streaming) { auto pipe = std::make_shared( - kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, ctx->body_bytes /* total_length */); RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe)); diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 42b14f3572..9324a83f27 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -37,7 +37,6 @@ #include "io/s3_writer.h" #include "olap/iterators.h" #include "runtime/exec_env.h" -#include "runtime/stream_load/load_stream_mgr.h" #include "runtime/stream_load/new_load_stream_mgr.h" namespace doris { @@ -198,15 +197,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS return Status::OK(); } -Status FileFactory::create_pipe_reader(const TUniqueId& load_id, - std::shared_ptr& file_reader) { - file_reader = ExecEnv::GetInstance()->load_stream_mgr()->get(load_id); - if (!file_reader) { - return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); - } - return Status::OK(); -} - Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, std::shared_ptr* hdfs_file_system, io::FileReaderSPtr* reader, diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 63ab0f2a83..c78504f9c4 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -80,10 +80,6 @@ public: // Create FileReader for stream load pipe static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader); - // [deprecated] Create FileReader for stream load pipe - static Status create_pipe_reader(const TUniqueId& load_id, - std::shared_ptr& file_reader); - static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, std::shared_ptr* hdfs_file_system, io::FileReaderSPtr* reader, diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 94b95a6fe6..a768c16a55 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -59,7 +59,6 @@ set(RUNTIME_FILES stream_load/stream_load_context.cpp stream_load/stream_load_executor.cpp stream_load/stream_load_recorder.cpp - stream_load/load_stream_mgr.cpp stream_load/new_load_stream_mgr.cpp routine_load/data_consumer.cpp routine_load/data_consumer_group.cpp diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index b646f67589..4d2163cf39 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -47,7 +47,6 @@ class ExternalScanContextMgr; class FragmentMgr; class ResultCache; class LoadPathMgr; -class LoadStreamMgr; class NewLoadStreamMgr; class MemTrackerLimiter; class MemTracker; @@ -156,7 +155,6 @@ public: return _function_client_cache; } LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; } - LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; } NewLoadStreamMgr* new_load_stream_mgr() { return _new_load_stream_mgr; } SmallFileMgr* small_file_mgr() { return _small_file_mgr; } BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; } @@ -230,7 +228,6 @@ private: BfdParser* _bfd_parser = nullptr; BrokerMgr* _broker_mgr = nullptr; LoadChannelMgr* _load_channel_mgr = nullptr; - LoadStreamMgr* _load_stream_mgr = nullptr; NewLoadStreamMgr* _new_load_stream_mgr = nullptr; BrpcClientCache* _internal_client_cache = nullptr; BrpcClientCache* _function_client_cache = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 62330854bb..8b3146cdc2 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -42,7 +42,6 @@ #include "runtime/result_queue_mgr.h" #include "runtime/routine_load/routine_load_task_executor.h" #include "runtime/small_file_mgr.h" -#include "runtime/stream_load/load_stream_mgr.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" #include "runtime/tmp_file_mgr.h" @@ -116,7 +115,6 @@ Status ExecEnv::_init(const std::vector& store_paths) { _bfd_parser = BfdParser::create(); _broker_mgr = new BrokerMgr(this); _load_channel_mgr = new LoadChannelMgr(); - _load_stream_mgr = new LoadStreamMgr(); _new_load_stream_mgr = new NewLoadStreamMgr(); _internal_client_cache = new BrpcClientCache(); _function_client_cache = new BrpcClientCache(); @@ -313,7 +311,6 @@ void ExecEnv::_destroy() { _deregister_metrics(); SAFE_DELETE(_internal_client_cache); SAFE_DELETE(_function_client_cache); - SAFE_DELETE(_load_stream_mgr); SAFE_DELETE(_load_channel_mgr); SAFE_DELETE(_broker_mgr); SAFE_DELETE(_bfd_parser); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index e1bd3651db..d705ff1141 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -506,7 +506,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { stream_load_ctx->need_commit_self = true; stream_load_ctx->need_rollback = true; auto pipe = std::make_shared( - kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, -1 /* total_length */, true /* use_proto */); stream_load_ctx->body_sink = pipe; stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio; diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe.h b/be/src/runtime/routine_load/kafka_consumer_pipe.h deleted file mode 100644 index 6d01bbe091..0000000000 --- a/be/src/runtime/routine_load/kafka_consumer_pipe.h +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include -#include -#include - -#include "io/file_reader.h" -#include "librdkafka/rdkafka.h" -#include "runtime/message_body_sink.h" -#include "runtime/stream_load/stream_load_pipe.h" - -namespace doris { - -class KafkaConsumerPipe : public StreamLoadPipe { -public: - KafkaConsumerPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024) - : StreamLoadPipe(max_buffered_bytes, min_chunk_size) {} - - virtual ~KafkaConsumerPipe() {} - - Status append_with_line_delimiter(const char* data, size_t size) { - Status st = append(data, size); - if (!st.ok()) { - return st; - } - - // append the line delimiter - st = append("\n", 1); - return st; - } - - Status append_json(const char* data, size_t size) { return append_and_flush(data, size); } -}; - -} // end namespace doris diff --git a/be/src/runtime/stream_load/load_stream_mgr.cpp b/be/src/runtime/stream_load/load_stream_mgr.cpp deleted file mode 100644 index c486dccd8d..0000000000 --- a/be/src/runtime/stream_load/load_stream_mgr.cpp +++ /dev/null @@ -1,36 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/stream_load/load_stream_mgr.h" - -namespace doris { - -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(stream_load_pipe_count, MetricUnit::NOUNIT); - -LoadStreamMgr::LoadStreamMgr() { - // Each StreamLoadPipe has a limited buffer size (default 1M), it's not needed to count the - // actual size of all StreamLoadPipe. - REGISTER_HOOK_METRIC(stream_load_pipe_count, [this]() { - // std::lock_guard l(_lock); - return _stream_map.size(); - }); -} - -LoadStreamMgr::~LoadStreamMgr() { - DEREGISTER_HOOK_METRIC(stream_load_pipe_count); -} -} // namespace doris diff --git a/be/src/runtime/stream_load/load_stream_mgr.h b/be/src/runtime/stream_load/load_stream_mgr.h deleted file mode 100644 index b374fe0c32..0000000000 --- a/be/src/runtime/stream_load/load_stream_mgr.h +++ /dev/null @@ -1,72 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include - -#include "runtime/stream_load/stream_load_pipe.h" // for StreamLoadPipe -#include "util/doris_metrics.h" -#include "util/uid_util.h" // for std::hash for UniqueId - -namespace doris { - -// used to register all streams in process so that other module can get this stream -class LoadStreamMgr { -public: - LoadStreamMgr(); - ~LoadStreamMgr(); - - Status put(const UniqueId& id, std::shared_ptr stream) { - std::lock_guard l(_lock); - auto it = _stream_map.find(id); - if (it != std::end(_stream_map)) { - return Status::InternalError("id already exist"); - } - _stream_map.emplace(id, stream); - VLOG_NOTICE << "put stream load pipe: " << id; - return Status::OK(); - } - - std::shared_ptr get(const UniqueId& id) { - std::lock_guard l(_lock); - auto it = _stream_map.find(id); - if (it == std::end(_stream_map)) { - return nullptr; - } - auto stream = it->second; - _stream_map.erase(it); - return stream; - } - - void remove(const UniqueId& id) { - std::lock_guard l(_lock); - auto it = _stream_map.find(id); - if (it != std::end(_stream_map)) { - _stream_map.erase(it); - VLOG_NOTICE << "remove stream load pipe: " << id; - } - } - -private: - std::mutex _lock; - std::unordered_map> _stream_map; -}; - -} // namespace doris diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.h b/be/src/runtime/stream_load/new_load_stream_mgr.h index 60cd3aa446..9ab2030487 100644 --- a/be/src/runtime/stream_load/new_load_stream_mgr.h +++ b/be/src/runtime/stream_load/new_load_stream_mgr.h @@ -28,7 +28,6 @@ namespace doris { // used to register all streams in process so that other module can get this stream -// TODO(ftw): should be renamed to `LoadStreamMgr` after new file reader is ready. class NewLoadStreamMgr { public: NewLoadStreamMgr(); diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 0fc27a27f9..ef3602ab8f 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -28,7 +28,6 @@ #include "gen_cpp/BackendService_types.h" #include "gen_cpp/FrontendService_types.h" #include "runtime/exec_env.h" -#include "runtime/stream_load/load_stream_mgr.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" #include "service/backend_options.h" diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h deleted file mode 100644 index f32048da54..0000000000 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ /dev/null @@ -1,282 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include - -#include "gen_cpp/internal_service.pb.h" -#include "io/file_reader.h" -#include "runtime/message_body_sink.h" -#include "runtime/thread_context.h" -#include "util/bit_util.h" -#include "util/byte_buffer.h" - -namespace doris { - -const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; -// StreamLoadPipe use to transfer data from producer to consumer -// Data in pip is stored in chunks. - -class StreamLoadPipe : public MessageBodySink, public FileReader { -public: - StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes, - size_t min_chunk_size = 64 * 1024, int64_t total_length = -1, - bool use_proto = false) - : _buffered_bytes(0), - _proto_buffered_bytes(0), - _max_buffered_bytes(max_buffered_bytes), - _min_chunk_size(min_chunk_size), - _total_length(total_length), - _use_proto(use_proto) {} - - ~StreamLoadPipe() override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - while (!_buf_queue.empty()) { - _buf_queue.pop_front(); - } - } - - Status open() override { return Status::OK(); } - - Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0) { - ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1)); - buf->put_bytes(data, size); - buf->flip(); - return _append(buf, proto_byte_size); - } - - Status append(const char* data, size_t size) override { - size_t pos = 0; - if (_write_buf != nullptr) { - if (size < _write_buf->remaining()) { - _write_buf->put_bytes(data, size); - return Status::OK(); - } else { - pos = _write_buf->remaining(); - _write_buf->put_bytes(data, pos); - - _write_buf->flip(); - RETURN_IF_ERROR(_append(_write_buf)); - _write_buf.reset(); - } - } - // need to allocate a new chunk, min chunk is 64k - size_t chunk_size = std::max(_min_chunk_size, size - pos); - chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); - _write_buf = ByteBuffer::allocate(chunk_size); - _write_buf->put_bytes(data + pos, size - pos); - return Status::OK(); - } - - Status append(const ByteBufferPtr& buf) override { - if (_write_buf != nullptr) { - _write_buf->flip(); - RETURN_IF_ERROR(_append(_write_buf)); - _write_buf.reset(); - } - return _append(buf); - } - - // If _total_length == -1, this should be a Kafka routine load task, - // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data. - // Otherwise, this should be a stream load task that needs to read the specified amount of data. - Status read_one_message(std::unique_ptr* data, int64_t* length) override { - if (_total_length < -1) { - return Status::InternalError("invalid, _total_length is: {}", _total_length); - } else if (_total_length == 0) { - // no data - *length = 0; - return Status::OK(); - } - - if (_total_length == -1) { - return _read_next_buffer(data, length); - } - - // _total_length > 0, read the entire data - data->reset(new uint8_t[_total_length]); - bool eof = false; - Status st = read(data->get(), _total_length, length, &eof); - if (eof) { - *length = 0; - } - return st; - } - - Status read(uint8_t* data, int64_t data_size, int64_t* bytes_read, bool* eof) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - *bytes_read = 0; - while (*bytes_read < data_size) { - std::unique_lock l(_lock); - while (!_cancelled && !_finished && _buf_queue.empty()) { - _get_cond.wait(l); - } - // cancelled - if (_cancelled) { - return Status::InternalError("cancelled: {}", _cancelled_reason); - } - // finished - if (_buf_queue.empty()) { - DCHECK(_finished); - data_size = *bytes_read; - *eof = (*bytes_read == 0); - return Status::OK(); - } - auto buf = _buf_queue.front(); - int64_t copy_size = std::min(data_size - *bytes_read, (int64_t)buf->remaining()); - buf->get_bytes((char*)data + *bytes_read, copy_size); - *bytes_read += copy_size; - if (!buf->has_remaining()) { - _buf_queue.pop_front(); - _buffered_bytes -= buf->limit; - _put_cond.notify_one(); - } - } - DCHECK(*bytes_read == data_size) - << "*bytes_read=" << *bytes_read << ", data_size=" << data_size; - *eof = false; - return Status::OK(); - } - - Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override { - return Status::InternalError("Not implemented"); - } - - int64_t size() override { return 0; } - - Status seek(int64_t position) override { return Status::InternalError("Not implemented"); } - - Status tell(int64_t* position) override { return Status::InternalError("Not implemented"); } - - // called when consumer finished - void close() override { cancel("closed"); } - - bool closed() override { return _cancelled; } - - // called when producer finished - Status finish() override { - if (_write_buf != nullptr) { - _write_buf->flip(); - _append(_write_buf); - _write_buf.reset(); - } - { - std::lock_guard l(_lock); - _finished = true; - } - _get_cond.notify_all(); - return Status::OK(); - } - - // called when producer/consumer failed - void cancel(const std::string& reason) override { - { - std::lock_guard l(_lock); - _cancelled = true; - _cancelled_reason = reason; - } - _get_cond.notify_all(); - _put_cond.notify_all(); - } - -private: - // read the next buffer from _buf_queue - Status _read_next_buffer(std::unique_ptr* data, int64_t* length) { - std::unique_lock l(_lock); - while (!_cancelled && !_finished && _buf_queue.empty()) { - _get_cond.wait(l); - } - // cancelled - if (_cancelled) { - return Status::InternalError("cancelled: {}", _cancelled_reason); - } - // finished - if (_buf_queue.empty()) { - DCHECK(_finished); - data->reset(); - *length = 0; - return Status::OK(); - } - auto buf = _buf_queue.front(); - *length = buf->remaining(); - data->reset(new uint8_t[*length]); - buf->get_bytes((char*)(data->get()), *length); - _buf_queue.pop_front(); - _buffered_bytes -= buf->limit; - if (_use_proto) { - PDataRow** ptr = reinterpret_cast(data->get()); - _proto_buffered_bytes -= (sizeof(PDataRow*) + (*ptr)->GetCachedSize()); - } - _put_cond.notify_one(); - return Status::OK(); - } - - Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0) { - { - std::unique_lock l(_lock); - // if _buf_queue is empty, we append this buf without size check - if (_use_proto) { - while (!_cancelled && !_buf_queue.empty() && - (_proto_buffered_bytes + proto_byte_size > _max_buffered_bytes)) { - _put_cond.wait(l); - } - } else { - while (!_cancelled && !_buf_queue.empty() && - _buffered_bytes + buf->remaining() > _max_buffered_bytes) { - _put_cond.wait(l); - } - } - if (_cancelled) { - return Status::InternalError("cancelled: {}", _cancelled_reason); - } - _buf_queue.push_back(buf); - if (_use_proto) { - _proto_buffered_bytes += proto_byte_size; - } else { - _buffered_bytes += buf->remaining(); - } - } - _get_cond.notify_one(); - return Status::OK(); - } - - // Blocking queue - std::mutex _lock; - size_t _buffered_bytes; - size_t _proto_buffered_bytes; - size_t _max_buffered_bytes; - size_t _min_chunk_size; - // The total amount of data expected to be read. - // In some scenarios, such as loading json format data through stream load, - // the data needs to be completely read before it can be parsed, - // so the total size of the data needs to be known. - // The default is -1, which means that the data arrives in a stream - // and the length is unknown. - // size_t is unsigned, so use int64_t - int64_t _total_length = -1; - bool _use_proto = false; - std::deque _buf_queue; - std::condition_variable _put_cond; - std::condition_variable _get_cond; - - ByteBufferPtr _write_buf; -}; - -} // namespace doris diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 444416bce3..7fa7493bf6 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -66,7 +66,6 @@ set(HTTP_TEST_FILES http/http_utils_test.cpp http/http_client_test.cpp # TODO this will overide HttpChannel and make other test failed - # http/stream_load_test.cpp # http/metrics_action_test.cpp ) set(IO_TEST_FILES @@ -145,10 +144,8 @@ set(RUNTIME_TEST_FILES runtime/string_value_test.cpp runtime/fragment_mgr_test.cpp runtime/mem_limit_test.cpp - runtime/stream_load_pipe_test.cpp runtime/snapshot_loader_test.cpp runtime/user_function_cache_test.cpp - runtime/kafka_consumer_pipe_test.cpp runtime/routine_load_task_executor_test.cpp runtime/small_file_mgr_test.cpp runtime/heartbeat_flags_test.cpp diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp deleted file mode 100644 index 1609eedf80..0000000000 --- a/be/test/http/stream_load_test.cpp +++ /dev/null @@ -1,234 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "http/action/stream_load.h" - -#include -#include -#include -#include - -#include "exec/schema_scanner/schema_helper.h" -#include "gen_cpp/HeartbeatService_types.h" -#include "http/http_channel.h" -#include "http/http_request.h" -#include "runtime/exec_env.h" -#include "runtime/stream_load/load_stream_mgr.h" -#include "runtime/stream_load/stream_load_executor.h" -#include "util/brpc_client_cache.h" -#include "util/cpu_info.h" - -struct mg_connection; - -namespace doris { - -std::string k_response_str; - -// Send Unauthorized status with basic challenge -void HttpChannel::send_basic_challenge(HttpRequest* req, const std::string& realm) {} - -void HttpChannel::send_error(HttpRequest* request, HttpStatus status) {} - -void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) {} - -void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std::string& content) { - k_response_str = content; -} - -void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size) {} - -extern TLoadTxnBeginResult k_stream_load_begin_result; -extern TLoadTxnCommitResult k_stream_load_commit_result; -extern TLoadTxnRollbackResult k_stream_load_rollback_result; -extern TStreamLoadPutResult k_stream_load_put_result; -extern Status k_stream_load_plan_status; - -class StreamLoadActionTest : public testing::Test { -public: - StreamLoadActionTest() {} - virtual ~StreamLoadActionTest() {} - void SetUp() override { - k_stream_load_begin_result = TLoadTxnBeginResult(); - k_stream_load_commit_result = TLoadTxnCommitResult(); - k_stream_load_rollback_result = TLoadTxnRollbackResult(); - k_stream_load_put_result = TStreamLoadPutResult(); - k_stream_load_plan_status = Status::OK(); - k_response_str = ""; - config::streaming_load_max_mb = 1; - - _env._master_info = new TMasterInfo(); - _env._load_stream_mgr = new LoadStreamMgr(); - _env._internal_client_cache = new BrpcClientCache(); - _env._function_client_cache = new BrpcClientCache(); - _env._stream_load_executor = new StreamLoadExecutor(&_env); - - _evhttp_req = evhttp_request_new(nullptr, nullptr); - } - void TearDown() override { - delete _env._internal_client_cache; - _env._internal_client_cache = nullptr; - delete _env._function_client_cache; - _env._function_client_cache = nullptr; - delete _env._load_stream_mgr; - _env._load_stream_mgr = nullptr; - delete _env._master_info; - _env._master_info = nullptr; - delete _env._stream_load_executor; - _env._stream_load_executor = nullptr; - - if (_evhttp_req != nullptr) { - evhttp_request_free(_evhttp_req); - } - } - -private: - ExecEnv _env; - evhttp_request* _evhttp_req = nullptr; -}; - -TEST_F(StreamLoadActionTest, no_auth) { - StreamLoadAction action(&_env); - - HttpRequest request(_evhttp_req); - request.set_handler(&action); - action.on_header(&request); - action.handle(&request); - - rapidjson::Document doc; - doc.Parse(k_response_str.c_str()); - EXPECT_STREQ("Fail", doc["Status"].GetString()); -} - -TEST_F(StreamLoadActionTest, normal) { - StreamLoadAction action(&_env); - - HttpRequest request(_evhttp_req); - - struct evhttp_request ev_req; - ev_req.remote_host = nullptr; - request._ev_req = &ev_req; - - request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); - request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "0"); - request.set_handler(&action); - action.on_header(&request); - action.handle(&request); - - rapidjson::Document doc; - doc.Parse(k_response_str.c_str()); - EXPECT_STREQ("Success", doc["Status"].GetString()); -} - -TEST_F(StreamLoadActionTest, put_fail) { - StreamLoadAction action(&_env); - - HttpRequest request(_evhttp_req); - - struct evhttp_request ev_req; - ev_req.remote_host = nullptr; - request._ev_req = &ev_req; - - request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); - request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16"); - Status status = Status::InternalError("TestFail"); - status.to_thrift(&k_stream_load_put_result.status); - request.set_handler(&action); - action.on_header(&request); - action.handle(&request); - - rapidjson::Document doc; - doc.Parse(k_response_str.c_str()); - EXPECT_STREQ("Fail", doc["Status"].GetString()); -} - -TEST_F(StreamLoadActionTest, commit_fail) { - StreamLoadAction action(&_env); - - HttpRequest request(_evhttp_req); - struct evhttp_request ev_req; - ev_req.remote_host = nullptr; - request._ev_req = &ev_req; - request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); - request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16"); - Status status = Status::InternalError("TestFail"); - status.to_thrift(&k_stream_load_commit_result.status); - request.set_handler(&action); - action.on_header(&request); - action.handle(&request); - - rapidjson::Document doc; - doc.Parse(k_response_str.c_str()); - EXPECT_STREQ("Fail", doc["Status"].GetString()); -} - -TEST_F(StreamLoadActionTest, begin_fail) { - StreamLoadAction action(&_env); - - HttpRequest request(_evhttp_req); - struct evhttp_request ev_req; - ev_req.remote_host = nullptr; - request._ev_req = &ev_req; - request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); - request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16"); - Status status = Status::InternalError("TestFail"); - status.to_thrift(&k_stream_load_begin_result.status); - request.set_handler(&action); - action.on_header(&request); - action.handle(&request); - - rapidjson::Document doc; - doc.Parse(k_response_str.c_str()); - EXPECT_STREQ("Fail", doc["Status"].GetString()); -} - -#if 0 -TEST_F(StreamLoadActionTest, receive_failed) { - StreamLoadAction action(&_env); - - HttpRequest request(_evhttp_req); - request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); - request._headers.emplace(HttpHeaders::TRANSFER_ENCODING, "chunked"); - request.set_handler(&action); - action.on_header(&request); - action.handle(&request); - - rapidjson::Document doc; - doc.Parse(k_response_str.c_str()); - EXPECT_STREQ("Fail", doc["Status"].GetString()); -} -#endif - -TEST_F(StreamLoadActionTest, plan_fail) { - StreamLoadAction action(&_env); - - HttpRequest request(_evhttp_req); - struct evhttp_request ev_req; - ev_req.remote_host = nullptr; - request._ev_req = &ev_req; - request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); - request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16"); - k_stream_load_plan_status = Status::InternalError("TestFail"); - request.set_handler(&action); - action.on_header(&request); - action.handle(&request); - - rapidjson::Document doc; - doc.Parse(k_response_str.c_str()); - EXPECT_STREQ("Fail", doc["Status"].GetString()); -} - -} // namespace doris diff --git a/be/test/runtime/kafka_consumer_pipe_test.cpp b/be/test/runtime/kafka_consumer_pipe_test.cpp deleted file mode 100644 index ee10e42eee..0000000000 --- a/be/test/runtime/kafka_consumer_pipe_test.cpp +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/routine_load/kafka_consumer_pipe.h" - -#include - -namespace doris { - -class KafkaConsumerPipeTest : public testing::Test { -public: - KafkaConsumerPipeTest() {} - virtual ~KafkaConsumerPipeTest() {} - - void SetUp() override {} - - void TearDown() override {} - -private: -}; - -TEST_F(KafkaConsumerPipeTest, append_read) { - KafkaConsumerPipe k_pipe(1024 * 1024, 64 * 1024); - - std::string msg1 = "i have a dream"; - std::string msg2 = "This is from kafka"; - - Status st; - st = k_pipe.append_with_line_delimiter(msg1.c_str(), msg1.length()); - EXPECT_TRUE(st.ok()); - st = k_pipe.append_with_line_delimiter(msg2.c_str(), msg2.length()); - EXPECT_TRUE(st.ok()); - st = k_pipe.finish(); - EXPECT_TRUE(st.ok()); - - char buf[1024]; - int64_t data_size = 1024; - int64_t read_bytes = 0; - bool eof = false; - st = k_pipe.read((uint8_t*)buf, data_size, &read_bytes, &eof); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(read_bytes, msg1.length() + msg2.length() + 2); - EXPECT_EQ(eof, false); - - data_size = 1024; - st = k_pipe.read((uint8_t*)buf, data_size, &read_bytes, &eof); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(read_bytes, 0); - EXPECT_EQ(eof, true); -} - -} // namespace doris diff --git a/be/test/runtime/stream_load_pipe_test.cpp b/be/test/runtime/stream_load_pipe_test.cpp deleted file mode 100644 index 93c2cd91c2..0000000000 --- a/be/test/runtime/stream_load_pipe_test.cpp +++ /dev/null @@ -1,261 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/stream_load/stream_load_pipe.h" - -#include - -#include - -namespace doris { - -class StreamLoadPipeTest : public testing::Test { -public: - StreamLoadPipeTest() {} - virtual ~StreamLoadPipeTest() {} - void SetUp() override {} -}; - -TEST_F(StreamLoadPipeTest, append_buffer) { - StreamLoadPipe pipe(66, 64); - - auto appender = [&pipe] { - int k = 0; - for (int i = 0; i < 2; ++i) { - auto byte_buf = ByteBuffer::allocate(64); - char buf[64]; - for (int j = 0; j < 64; ++j) { - buf[j] = '0' + (k++ % 10); - } - byte_buf->put_bytes(buf, 64); - byte_buf->flip(); - pipe.append(byte_buf); - } - pipe.finish(); - }; - std::thread t1(appender); - - char buf[256]; - int64_t buf_len = 256; - int64_t read_bytes = 0; - bool eof = false; - auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(128, read_bytes); - EXPECT_FALSE(eof); - for (int i = 0; i < 128; ++i) { - EXPECT_EQ('0' + (i % 10), buf[i]); - } - st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(0, read_bytes); - EXPECT_TRUE(eof); - - t1.join(); -} - -TEST_F(StreamLoadPipeTest, append_bytes) { - StreamLoadPipe pipe(66, 64); - - auto appender = [&pipe] { - for (int i = 0; i < 128; ++i) { - char buf = '0' + (i % 10); - pipe.append(&buf, 1); - } - pipe.finish(); - }; - std::thread t1(appender); - - char buf[256]; - int64_t buf_len = 256; - int64_t read_bytes = 0; - bool eof = false; - auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(128, read_bytes); - EXPECT_FALSE(eof); - for (int i = 0; i < 128; ++i) { - EXPECT_EQ('0' + (i % 10), buf[i]); - } - st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(0, read_bytes); - EXPECT_TRUE(eof); - - t1.join(); -} - -TEST_F(StreamLoadPipeTest, append_bytes2) { - StreamLoadPipe pipe(66, 64); - - auto appender = [&pipe] { - for (int i = 0; i < 128; ++i) { - char buf = '0' + (i % 10); - pipe.append(&buf, 1); - } - pipe.finish(); - }; - std::thread t1(appender); - - char buf[128]; - int64_t buf_len = 62; - int64_t read_bytes = 0; - bool eof = false; - auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(62, read_bytes); - EXPECT_FALSE(eof); - for (int i = 0; i < 62; ++i) { - EXPECT_EQ('0' + (i % 10), buf[i]); - } - for (int i = 62; i < 128; ++i) { - char ch; - buf_len = 1; - auto st = pipe.read((uint8_t*)&ch, buf_len, &read_bytes, &eof); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(1, read_bytes); - EXPECT_FALSE(eof); - EXPECT_EQ('0' + (i % 10), ch); - } - st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(0, read_bytes); - EXPECT_TRUE(eof); - - t1.join(); -} - -TEST_F(StreamLoadPipeTest, append_mix) { - StreamLoadPipe pipe(66, 64); - - auto appender = [&pipe] { - // 10 - int k = 0; - for (int i = 0; i < 10; ++i) { - char buf = '0' + (k++ % 10); - pipe.append(&buf, 1); - } - // 60 - { - auto byte_buf = ByteBuffer::allocate(60); - char buf[60]; - for (int j = 0; j < 60; ++j) { - buf[j] = '0' + (k++ % 10); - } - byte_buf->put_bytes(buf, 60); - byte_buf->flip(); - pipe.append(byte_buf); - } - // 8 - for (int i = 0; i < 8; ++i) { - char buf = '0' + (k++ % 10); - pipe.append(&buf, 1); - } - // 50 - { - auto byte_buf = ByteBuffer::allocate(50); - char buf[50]; - for (int j = 0; j < 50; ++j) { - buf[j] = '0' + (k++ % 10); - } - byte_buf->put_bytes(buf, 50); - byte_buf->flip(); - pipe.append(byte_buf); - } - pipe.finish(); - }; - std::thread t1(appender); - - char buf[128]; - int64_t buf_len = 128; - int64_t read_bytes = 0; - bool eof = false; - auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(128, read_bytes); - EXPECT_FALSE(eof); - for (int i = 0; i < 128; ++i) { - EXPECT_EQ('0' + (i % 10), buf[i]); - } - st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(0, read_bytes); - EXPECT_TRUE(eof); - - t1.join(); -} - -TEST_F(StreamLoadPipeTest, cancel) { - StreamLoadPipe pipe(66, 64); - - auto appender = [&pipe] { - int k = 0; - for (int i = 0; i < 10; ++i) { - char buf = '0' + (k++ % 10); - pipe.append(&buf, 1); - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - pipe.cancel("test"); - }; - std::thread t1(appender); - - char buf[128]; - int64_t buf_len = 128; - int64_t read_bytes = 0; - bool eof = false; - auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); - EXPECT_FALSE(st.ok()); - t1.join(); -} - -TEST_F(StreamLoadPipeTest, close) { - StreamLoadPipe pipe(66, 64); - - auto appender = [&pipe] { - int k = 0; - { - auto byte_buf = ByteBuffer::allocate(64); - char buf[64]; - for (int j = 0; j < 64; ++j) { - buf[j] = '0' + (k++ % 10); - } - byte_buf->put_bytes(buf, 64); - byte_buf->flip(); - pipe.append(byte_buf); - } - { - auto byte_buf = ByteBuffer::allocate(64); - char buf[64]; - for (int j = 0; j < 64; ++j) { - buf[j] = '0' + (k++ % 10); - } - byte_buf->put_bytes(buf, 64); - byte_buf->flip(); - auto st = pipe.append(byte_buf); - EXPECT_FALSE(st.ok()); - } - }; - std::thread t1(appender); - - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - - pipe.close(); - - t1.join(); -} - -} // namespace doris diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp index 05fca396dc..1182e1cb8e 100644 --- a/be/test/vec/exec/vtablet_sink_test.cpp +++ b/be/test/vec/exec/vtablet_sink_test.cpp @@ -30,7 +30,6 @@ #include "runtime/exec_env.h" #include "runtime/result_queue_mgr.h" #include "runtime/runtime_state.h" -#include "runtime/stream_load/load_stream_mgr.h" #include "runtime/types.h" #include "service/brpc.h" #include "util/brpc_client_cache.h" @@ -344,7 +343,6 @@ public: k_add_batch_status = Status::OK(); _env = ExecEnv::GetInstance(); _env->_master_info = new TMasterInfo(); - _env->_load_stream_mgr = new LoadStreamMgr(); _env->_internal_client_cache = new BrpcClientCache(); _env->_function_client_cache = new BrpcClientCache(); ThreadPoolBuilder("SendBatchThreadPool") @@ -359,7 +357,6 @@ public: void TearDown() override { SAFE_DELETE(_env->_internal_client_cache); SAFE_DELETE(_env->_function_client_cache); - SAFE_DELETE(_env->_load_stream_mgr); SAFE_DELETE(_env->_master_info); if (_server) { _server->Stop(100);