diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 0b2d5bf614..fb316ad7c2 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -784,6 +784,10 @@ DEFINE_String(kafka_broker_version_fallback, "0.10.0"); // Change this size to 0 to fix it temporarily. DEFINE_Int32(routine_load_consumer_pool_size, "10"); +// Used in single-stream-multi-table load. When receiving a batch of messages from Kafka, +// if the batch size exceeds this threshold, we will request plans for all related tables. +DEFINE_Int32(multi_table_batch_plan_threshold, "200"); + // When the timeout of a load task is less than this threshold, // Doris treats it as a high priority task. // high priority tasks use a separate thread pool for flush and do not block rpc by memory cleanup logic. diff --git a/be/src/common/config.h b/be/src/common/config.h index 895783ebfc..ff6003b43e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -821,6 +821,10 @@ DECLARE_String(kafka_broker_version_fallback); // Change this size to 0 to fix it temporarily. DECLARE_Int32(routine_load_consumer_pool_size); +// Used in single-stream-multi-table load. When receiving a batch of messages from Kafka, +// if the batch size exceeds this threshold, we will request plans for all related tables. +DECLARE_Int32(multi_table_batch_plan_threshold); + // When the timeout of a load task is less than this threshold, // Doris treats it as a high priority task. // high priority tasks use a separate thread pool for flush and do not block rpc by memory cleanup logic. diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt index 45f66901c4..5efcf3cdab 100644 --- a/be/src/io/CMakeLists.txt +++ b/be/src/io/CMakeLists.txt @@ -43,6 +43,7 @@ set(IO_FILES fs/broker_file_writer.cpp fs/buffered_reader.cpp fs/stream_load_pipe.cpp + fs/multi_table_pipe.cpp fs/err_utils.cpp fs/fs_utils.cpp cache/dummy_file_cache.cpp diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 65bc429d12..fc91c4c9dd 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -29,6 +29,7 @@ #include "io/fs/file_reader_options.h" #include "io/fs/hdfs_file_system.h" #include "io/fs/local_file_system.h" +#include "io/fs/multi_table_pipe.h" #include "io/fs/s3_file_system.h" #include "io/fs/stream_load_pipe.h" #include "io/hdfs_builder.h" @@ -144,12 +145,24 @@ Status FileFactory::create_file_reader(RuntimeProfile* profile, } // file scan node/stream load pipe -Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader) { +Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader, + const TUniqueId& fragment_instance_id) { auto stream_load_ctx = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id); if (!stream_load_ctx) { return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); } + *file_reader = stream_load_ctx->pipe; + + if (file_reader->get() != nullptr) { + auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader); + if (multi_table_pipe != nullptr) { + *file_reader = multi_table_pipe->getPipe(fragment_instance_id); + LOG(INFO) << "create pipe reader for fragment instance: " << fragment_instance_id + << " pipe: " << (*file_reader).get(); + } + } + return Status::OK(); } diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 2e034b1ab8..5f7360c372 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -76,7 +76,8 @@ public: io::FileReaderOptions
reader_options = NO_CACHE_READER_OPTIONS); // Create FileReader for stream load pipe - static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader); + static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader, + const TUniqueId& fragment_instance_id); static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, std::shared_ptr* hdfs_file_system, diff --git a/be/src/io/fs/kafka_consumer_pipe.h b/be/src/io/fs/kafka_consumer_pipe.h index 6aab83c3b2..d13004a46b 100644 --- a/be/src/io/fs/kafka_consumer_pipe.h +++ b/be/src/io/fs/kafka_consumer_pipe.h @@ -28,7 +28,7 @@ public: ~KafkaConsumerPipe() override = default; - Status append_with_line_delimiter(const char* data, size_t size) { + virtual Status append_with_line_delimiter(const char* data, size_t size) { Status st = append(data, size); if (!st.ok()) { return st; @@ -39,7 +39,9 @@ public: return st; } - Status append_json(const char* data, size_t size) { return append_and_flush(data, size); } + virtual Status append_json(const char* data, size_t size) { + return append_and_flush(data, size); + } }; } // namespace io } // end namespace doris diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp new file mode 100644 index 0000000000..773188118d --- /dev/null +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -0,0 +1,288 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "multi_table_pipe.h" + +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "util/thrift_rpc_helper.h" +#include "util/thrift_util.h" +#include "util/time.h" + +namespace doris { +namespace io { + +Status MultiTablePipe::append_with_line_delimiter(const char* data, size_t size) { + const std::string& table = parse_dst_table(data, size); + if (table.empty()) { + return Status::InternalError("table name is empty"); + } + size_t prefix_len = table.length() + 1; + AppendFunc cb = &KafkaConsumerPipe::append_with_line_delimiter; + return dispatch(table, data + prefix_len, size - prefix_len, cb); +} + +Status MultiTablePipe::append_json(const char* data, size_t size) { + const std::string& table = parse_dst_table(data, size); + if (table.empty()) { + return Status::InternalError("table name is empty"); + } + size_t prefix_len = table.length() + 1; + AppendFunc cb = &KafkaConsumerPipe::append_json; + return dispatch(table, data + prefix_len, size - prefix_len, cb); +} + +KafkaConsumerPipePtr MultiTablePipe::get_pipe_by_table(const std::string& table) { + auto pipe = _planned_pipes.find(table); + DCHECK(pipe != _planned_pipes.end()); + return pipe->second; +} + +static std::string_view get_first_part(const char* dat, char delimiter) { + const char* delimiterPos = std::strchr(dat, delimiter); + + if (delimiterPos != nullptr) { + std::ptrdiff_t length = delimiterPos - dat; + return std::string_view(dat, length); + } else { + return std::string_view(dat); + } +} + +Status MultiTablePipe::finish() { + for (auto& pair : _planned_pipes) { + RETURN_IF_ERROR(pair.second->finish()); + } + return Status::OK(); +} + +void MultiTablePipe::cancel(const std::string& reason) { + for (auto& pair : _planned_pipes) { + pair.second->cancel(reason); + } +} + +std::string MultiTablePipe::parse_dst_table(const char* data, size_t size) { + return std::string(get_first_part(data, '|')); +} + +Status MultiTablePipe::dispatch(const std::string& table, const char* data, size_t size, + AppendFunc cb) { + if (size == 0 || strlen(data) == 0) { + LOG(WARNING) << "empty data for table: " << table; + return Status::InternalError("empty data"); + } + KafkaConsumerPipePtr pipe = nullptr; + auto iter = _planned_pipes.find(table); + if (iter != _planned_pipes.end()) { + pipe = iter->second; + LOG(INFO) << "dispatch for planned pipe: " << pipe.get(); + RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size), + "append failed in planned kafka pipe"); + } else { + iter = _unplanned_pipes.find(table); + if (iter == _unplanned_pipes.end()) { + pipe = std::make_shared(); + LOG(INFO) << "create new unplanned pipe: " << pipe.get(); + _unplanned_pipes.emplace(table, pipe); + } else { + pipe = iter->second; + } + LOG(INFO) << "dispatch for unplanned pipe: " << pipe.get(); + RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size), + "append failed in unplanned kafka pipe"); + + ++_unplanned_row_cnt; + size_t threshold = config::multi_table_batch_plan_threshold; + if (_unplanned_row_cnt >= threshold) { + LOG(INFO) << fmt::format("unplanned row cnt={} reach threshold={}, plan them", + _unplanned_row_cnt, threshold); + Status st = request_and_exec_plans(); + _unplanned_row_cnt = 0; + if (!st.ok()) { + return st; + } + } + } + return Status::OK(); +} + +#ifndef BE_TEST +Status MultiTablePipe::request_and_exec_plans() { + if 
(_unplanned_pipes.empty()) return Status::OK(); + + // get list of table names in unplanned pipes + std::vector tables; + fmt::memory_buffer log_buffer; + log_buffer.clear(); + fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_pipes.size()); + for (auto& pair : _unplanned_pipes) { + tables.push_back(pair.first); + fmt::format_to(log_buffer, "{} ", pair.first); + } + fmt::format_to(log_buffer, "]"); + LOG(INFO) << fmt::to_string(log_buffer); + + TStreamLoadPutRequest request; + set_request_auth(&request, _ctx->auth); + request.db = _ctx->db; + request.table_names = tables; + request.__isset.table_names = true; + request.txnId = _ctx->txn_id; + request.formatType = _ctx->format; + request.__set_compress_type(_ctx->compress_type); + request.__set_header_type(_ctx->header_type); + request.__set_loadId(_ctx->id.to_thrift()); + request.fileType = TFileType::FILE_STREAM; + request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); + // no need to register new_load_stream_mgr coz it is already done in routineload submit task + + // plan this load + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + TNetworkAddress master_addr = exec_env->master_info()->network_address; + int64_t stream_load_put_start_time = MonotonicNanos(); + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, this](FrontendServiceConnection& client) { + client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request); + })); + _ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time; + + Status plan_status(_ctx->multi_table_put_result.status); + if (!plan_status.ok()) { + LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief(); + return plan_status; + } + + // put unplanned pipes into planned pipes and clear unplanned pipes + for (auto& pipe : _unplanned_pipes) { + _ctx->table_list.push_back(pipe.first); + _planned_pipes.emplace(pipe.first, pipe.second); + } + LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, returned plan cnt={}", + _unplanned_pipes.size(), _planned_pipes.size(), + _ctx->multi_table_put_result.params.size()); + _unplanned_pipes.clear(); + + for (auto& plan : _ctx->multi_table_put_result.params) { + // TODO: use pipeline in the future (currently is buggy for load) + ++_inflight_plan_cnt; + DCHECK_EQ(plan.__isset.table_name, true); + DCHECK(_planned_pipes.find(plan.table_name) != _planned_pipes.end()); + putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]); + LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id + << " table=" << plan.table_name; + exec_env->fragment_mgr()->exec_plan_fragment(plan, [this](RuntimeState* state, + Status* status) { + --_inflight_plan_cnt; + _tablet_commit_infos.insert(_tablet_commit_infos.end(), + state->tablet_commit_infos().begin(), + state->tablet_commit_infos().end()); + _number_total_rows += state->num_rows_load_total(); + _number_loaded_rows += state->num_rows_load_success(); + _number_filtered_rows += state->num_rows_load_filtered(); + _number_unselected_rows += state->num_rows_load_unselected(); + + // check filtered ratio for this plan fragment + int64_t num_selected_rows = + state->num_rows_load_total() - state->num_rows_load_unselected(); + if (num_selected_rows > 0 && + (double)state->num_rows_load_filtered() / num_selected_rows > + _ctx->max_filter_ratio) { + *status = Status::InternalError("too many filtered rows"); + } + if (_number_filtered_rows > 0 && 
!state->get_error_log_file_path().empty()) { + _ctx->error_url = to_load_error_http_path(state->get_error_log_file_path()); + } + + // if any of the plan fragment exec failed, set the status to the first failed plan + if (!status->ok()) { + LOG(WARNING) << "plan fragment exec failed. errmsg=" << *status << _ctx->brief(); + _status = *status; + } + + if (_inflight_plan_cnt == 0 && is_consume_finished()) { + _ctx->number_total_rows = _number_total_rows; + _ctx->number_loaded_rows = _number_loaded_rows; + _ctx->number_filtered_rows = _number_filtered_rows; + _ctx->number_unselected_rows = _number_unselected_rows; + _ctx->commit_infos = _tablet_commit_infos; + LOG(INFO) << "all plan for multi-table load complete. number_total_rows=" + << _ctx->number_total_rows + << " number_loaded_rows=" << _ctx->number_loaded_rows + << " number_filtered_rows=" << _ctx->number_filtered_rows + << " number_unselected_rows=" << _ctx->number_unselected_rows; + _ctx->promise.set_value(_status); // when all done, finish the routine load task + } + }); + } + + return Status::OK(); +} +#else +Status MultiTablePipe::request_and_exec_plans() { + // put unplanned pipes into planned pipes + for (auto& pipe : _unplanned_pipes) { + _planned_pipes.emplace(pipe.first, pipe.second); + } + LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}", + _unplanned_pipes.size(), _planned_pipes.size()); + _unplanned_pipes.clear(); + return Status::OK(); +} +#endif + +Status MultiTablePipe::putPipe(const TUniqueId& fragment_instance_id, + std::shared_ptr pipe) { + std::lock_guard l(_pipe_map_lock); + auto it = _pipe_map.find(fragment_instance_id); + if (it != std::end(_pipe_map)) { + return Status::InternalError("id already exist"); + } + _pipe_map.emplace(fragment_instance_id, pipe); + return Status::OK(); +} + +std::shared_ptr MultiTablePipe::getPipe(const TUniqueId& fragment_instance_id) { + std::lock_guard l(_pipe_map_lock); + auto it = _pipe_map.find(fragment_instance_id); + if (it == std::end(_pipe_map)) { + return std::shared_ptr(nullptr); + } + return it->second; +} + +void MultiTablePipe::removePipe(const TUniqueId& fragment_instance_id) { + std::lock_guard l(_pipe_map_lock); + auto it = _pipe_map.find(fragment_instance_id); + if (it != std::end(_pipe_map)) { + _pipe_map.erase(it); + VLOG_NOTICE << "remove stream load pipe: " << fragment_instance_id; + } +} + +} // namespace io +} // namespace doris \ No newline at end of file diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h new file mode 100644 index 0000000000..cdf8db052a --- /dev/null +++ b/be/src/io/fs/multi_table_pipe.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "io/fs/kafka_consumer_pipe.h" +#include "io/fs/multi_table_pipe.h" +#include "runtime/stream_load/stream_load_context.h" + +namespace doris { +namespace io { + +class MultiTablePipe; +using AppendFunc = Status (KafkaConsumerPipe::*)(const char* data, size_t size); +using KafkaConsumerPipePtr = std::shared_ptr; + +class MultiTablePipe : public KafkaConsumerPipe { +public: + MultiTablePipe(std::shared_ptr ctx, size_t max_buffered_bytes = 1024 * 1024, + size_t min_chunk_size = 64 * 1024) + : KafkaConsumerPipe(max_buffered_bytes, min_chunk_size), _ctx(ctx) {} + + ~MultiTablePipe() override = default; + + Status append_with_line_delimiter(const char* data, size_t size) override; + + Status append_json(const char* data, size_t size) override; + + // for pipe consumers, i.e. scanners, to get underlying KafkaConsumerPipes + KafkaConsumerPipePtr get_pipe_by_table(const std::string& table); + + // request and execute plans for unplanned pipes + Status request_and_exec_plans(); + + void set_consume_finished() { _consume_finished.store(true, std::memory_order_release); } + + bool is_consume_finished() { return _consume_finished.load(std::memory_order_acquire); } + + Status finish() override; + + void cancel(const std::string& reason) override; + + // register pair + Status putPipe(const TUniqueId& fragment_instance_id, std::shared_ptr pipe); + + std::shared_ptr getPipe(const TUniqueId& fragment_instance_id); + + void removePipe(const TUniqueId& fragment_instance_id); + +private: + // parse table name from data + std::string parse_dst_table(const char* data, size_t size); + + // [thread-unsafe] dispatch data to corresponding KafkaConsumerPipe + Status dispatch(const std::string& table, const char* data, size_t size, AppendFunc cb); + +private: + std::unordered_map _planned_pipes; + std::unordered_map _unplanned_pipes; + std::atomic _unplanned_row_cnt {0}; // trigger plan request when exceed threshold + std::atomic _inflight_plan_cnt {0}; // how many plan fragment are executing? 
+ std::atomic _consume_finished {false}; + std::shared_ptr _ctx; + Status _status; // save the first error status of all executing plan fragment +#ifndef BE_TEST + std::vector _tablet_commit_infos; // collect from each plan fragment + int64_t _number_total_rows = 0; + int64_t _number_loaded_rows = 0; + int64_t _number_filtered_rows = 0; + int64_t _number_unselected_rows = 0; +#endif + std::mutex _pipe_map_lock; + std::unordered_map> _pipe_map; +}; +} // namespace io +} // end namespace doris diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h index 9222e12c73..848175ce9a 100644 --- a/be/src/io/fs/stream_load_pipe.h +++ b/be/src/io/fs/stream_load_pipe.h @@ -67,15 +67,17 @@ public: bool closed() const override { return _cancelled; } // called when producer finished - Status finish() override; + virtual Status finish() override; // called when producer/consumer failed - void cancel(const std::string& reason) override; + virtual void cancel(const std::string& reason) override; Status read_one_message(std::unique_ptr* data, size_t* length); FileSystemSPtr fs() const override { return nullptr; } + size_t get_queue_size() { return _buf_queue.size(); } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 543b10f32a..f61eae31b9 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -774,6 +774,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, auto iter = _fragment_map.find(fragment_instance_id); if (iter != _fragment_map.end()) { // Duplicated + LOG(WARNING) << "duplicate fragment instance id: " << print_id(fragment_instance_id); return Status::OK(); } } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index e70633ea8a..fa6f9ec499 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -333,7 +333,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() { } } } - { _collect_query_statistics(); Status status; @@ -347,7 +346,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() { // Setting to NULL ensures that the d'tor won't double-close the sink. 
_sink.reset(nullptr); _done = true; - return Status::OK(); } diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 7837c240e0..72285930b4 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -40,6 +40,7 @@ #include "common/status.h" #include "common/utils.h" #include "io/fs/kafka_consumer_pipe.h" +#include "io/fs/multi_table_pipe.h" #include "io/fs/stream_load_pipe.h" #include "runtime/exec_env.h" #include "runtime/message_body_sink.h" @@ -207,8 +208,11 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { if (task.__isset.max_batch_size) { ctx->max_batch_size = task.max_batch_size; } + if (task.__isset.is_multi_table && task.is_multi_table) { + ctx->is_multi_table = true; + } - // set execute plan params + // set execute plan params (only for non-single-stream-multi-table load) TStreamLoadPutResult put_result; TStatus tstatus; tstatus.status_code = TStatusCode::OK; @@ -288,7 +292,12 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, std::shared_ptr pipe; switch (ctx->load_src_type) { case TLoadSourceType::KAFKA: { - pipe = std::make_shared(); + if (ctx->is_multi_table) { + LOG(INFO) << "recv single-stream-multi-table request, ctx=" << ctx->brief(); + pipe = std::make_shared(ctx); + } else { + pipe = std::make_shared(); + } Status st = std::static_pointer_cast(consumer_grp) ->assign_topic_partitions(ctx); if (!st.ok()) { @@ -312,18 +321,29 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, // must put pipe before executing plan fragment HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx), "failed to add pipe"); + if (!ctx->is_multi_table) { + // only for normal load, single-stream-multi-table load will be planned during consuming #ifndef BE_TEST - // execute plan fragment, async - HANDLE_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx), - "failed to execute plan fragment"); + // execute plan fragment, async + HANDLE_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx), + "failed to execute plan fragment"); #else - // only for test - HANDLE_ERROR(_execute_plan_for_test(ctx), "test failed"); + // only for test + HANDLE_ERROR(_execute_plan_for_test(ctx), "test failed"); #endif + } // start to consume, this may block a while HANDLE_ERROR(consumer_grp->start_all(ctx), "consuming failed"); + if (ctx->is_multi_table) { + // plan the rest of unplanned data + auto multi_table_pipe = std::static_pointer_cast(ctx->body_sink); + multi_table_pipe->request_and_exec_plans(); + // need memory order + multi_table_pipe->set_consume_finished(); + } + // wait for all consumers finished HANDLE_ERROR(ctx->future.get(), "consume failed"); @@ -335,7 +355,6 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, // commit txn HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()), "commit failed"); - // commit kafka offset switch (ctx->load_src_type) { case TLoadSourceType::KAFKA: { diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 7a94c67012..0e004b12f5 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -170,6 +170,7 @@ public: std::shared_ptr pipe; TStreamLoadPutResult put_result; + TStreamLoadMultiTablePutResult multi_table_put_result; std::vector commit_infos; @@ -210,6 +211,12 @@ 
public: // csv with header type std::string header_type = ""; + // is this load single-stream-multi-table? + bool is_multi_table = false; + + // for single-stream-multi-table, we have table list + std::vector table_list; + public: ExecEnv* exec_env() { return _exec_env; } diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 05c68941b9..80228ec6e9 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -44,6 +44,7 @@ #include "runtime/runtime_state.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_context.h" +#include "thrift/protocol/TDebugProtocol.h" #include "util/doris_metrics.h" #include "util/thrift_rpc_helper.h" #include "util/time.h" @@ -359,6 +360,10 @@ void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx, request.commitInfos = std::move(ctx->commit_infos); request.__isset.commitInfos = true; request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms); + request.tbls = ctx->table_list; + request.__isset.tbls = true; + + VLOG_DEBUG << "commit txn request:" << apache::thrift::ThriftDebugString(request); // set attachment if has TTxnCommitAttachment attachment; @@ -417,6 +422,8 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { request.tbl = ctx->table; request.txnId = ctx->txn_id; request.__set_reason(ctx->status.to_string()); + request.tbls = ctx->table_list; + request.__isset.tbls = true; // set attachment if has TTxnCommitAttachment attachment; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 09de2c6479..995825888f 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -166,7 +166,8 @@ Status CsvReader::init_reader(bool is_load) { _file_description.start_offset = start_offset; if (_params.file_type == TFileType::FILE_STREAM) { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); + RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, + _state->fragment_instance_id())); } else { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); reader_options.modification_time = diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 5925515200..9da0aefdf0 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -373,7 +373,8 @@ Status NewJsonReader::_open_file_reader() { _file_description.start_offset = start_offset; if (_params.file_type == TFileType::FILE_STREAM) { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); + RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, + _state->fragment_instance_id())); } else { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); reader_options.modification_time = diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index e98961f03a..ad048d4656 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -72,6 +72,7 @@ set(IO_TEST_FILES io/fs/local_file_system_test.cpp io/fs/remote_file_system_test.cpp io/fs/buffered_reader_test.cpp + io/fs/multi_table_pipe_test.cpp ) set(OLAP_TEST_FILES olap/engine_storage_migration_task_test.cpp diff --git a/be/test/io/fs/multi_table_pipe_test.cpp b/be/test/io/fs/multi_table_pipe_test.cpp new 
file mode 100644 index 0000000000..ddc894d45f --- /dev/null +++ b/be/test/io/fs/multi_table_pipe_test.cpp @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/fs/multi_table_pipe.h" + +#include +#include + +#include + +#include "gtest/gtest_pred_impl.h" +#include "runtime/stream_load/stream_load_context.h" + +namespace doris { +using namespace doris::io; +using namespace testing; +class MultiTablePipeTest : public testing::Test { +public: + MultiTablePipeTest() {} + +protected: + virtual void SetUp() {} + + virtual void TearDown() {} +}; + +TEST_F(MultiTablePipeTest, append_json) { + config::multi_table_batch_plan_threshold = 3; + + std::string data1 = "test_table_1|data1"; + std::string data2 = "test_table_2|data2"; + std::string data3 = "test_table_1|data3"; + std::string data4 = "test_table_1|data4"; + std::string data5 = "test_table_3|data5"; + std::string data6 = "test_table_2|data6"; + std::string data7 = "test_table_3|data7"; + std::string data8 = "test_table_3|data8"; + + auto exec_env = doris::ExecEnv::GetInstance(); + std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(exec_env); + MultiTablePipe pipe(ctx); + + pipe.append_json(data1.c_str(), data1.size()); + pipe.append_json(data2.c_str(), data2.size()); + pipe.append_json(data3.c_str(), data3.size()); // should trigger 1st plan, for table 1&2 + EXPECT_EQ(pipe.get_pipe_by_table("test_table_1")->get_queue_size(), 2); + EXPECT_EQ(pipe.get_pipe_by_table("test_table_2")->get_queue_size(), 1); + pipe.append_json(data4.c_str(), data4.size()); + pipe.append_json(data5.c_str(), data5.size()); + pipe.append_json(data6.c_str(), data6.size()); + pipe.append_json(data7.c_str(), data7.size()); + pipe.append_json(data8.c_str(), data8.size()); // should trigger 2nd plan, for table 3 + EXPECT_EQ(pipe.get_pipe_by_table("test_table_1")->get_queue_size(), 3); + EXPECT_EQ(pipe.get_pipe_by_table("test_table_2")->get_queue_size(), 2); + EXPECT_EQ(pipe.get_pipe_by_table("test_table_3")->get_queue_size(), 3); +} + +} // end namespace doris diff --git a/docs/en/docs/admin-manual/config/be-config.md index 787cbdc06f..af658ca537 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -718,6 +718,12 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in * Description: The number of caches for the data consumer used by the routine load. * Default value: 10 +#### `multi_table_batch_plan_threshold` + +* Type: int32 +* Description: Used in single-stream-multi-table load. When a batch of messages is received from Kafka, if the batch size exceeds this threshold, plans are requested for all related tables.
+* Default value: 200 + #### `single_replica_load_download_num_workers` * Type: int32 diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 2c97ac1960..2f6f1622c0 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -732,6 +732,12 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in * 描述:routine load 所使用的 data consumer 的缓存数量。 * 默认值:10 +#### `multi_table_batch_plan_threshold` + +* 类型:int32 +* 描述:一流多表使用该配置,表示攒多少条数据再进行规划。过小的值会导致规划频繁,多大的值会增加内存压力和导入延迟。 +* 默认值:200 + #### `single_replica_load_download_num_workers` * 类型: int32 * 描述: 单副本数据导入功能中,Slave副本通过HTTP从Master副本下载数据文件的工作线程数。导入并发增大时,可以适当调大该参数来保证Slave副本及时同步Master副本数据。必要时也应相应地调大`webserver_num_workers`来提高IO效率。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index b650100823..32b5fec8eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -144,6 +144,7 @@ import org.apache.doris.thrift.TShowVariableResult; import org.apache.doris.thrift.TSnapshotLoaderReportRequest; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TStreamLoadMultiTablePutResult; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TStreamLoadPutResult; import org.apache.doris.thrift.TTableIndexQueryStats; @@ -1587,6 +1588,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + @Override + public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequest request) { + // placeholder + TStreamLoadMultiTablePutResult result = new TStreamLoadMultiTablePutResult(); + return result; + } + private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request) throws UserException { String cluster = request.getCluster(); if (Strings.isNullOrEmpty(cluster)) { diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 62f7909304..3d77eab4ca 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -35,7 +35,7 @@ struct TTabletStat { 2: optional i64 data_size 3: optional i64 row_num 4: optional i64 version_count - 5: optional i64 remote_data_size + 5: optional i64 remote_data_size } struct TTabletStatResult { @@ -66,6 +66,7 @@ struct TRoutineLoadTask { 13: optional PaloInternalService.TExecPlanFragmentParams params 14: optional PlanNodes.TFileFormatType format 15: optional PaloInternalService.TPipelineFragmentParams pipeline_params + 16: optional bool is_multi_table } struct TKafkaMetaProxyRequest { @@ -168,9 +169,9 @@ service BackendService { Status.TStatus erase_export_task(1:Types.TUniqueId task_id); TTabletStatResult get_tablet_stat(); - + i64 get_trash_used_capacity(); - + list get_disk_trash_used_capacity(); Status.TStatus submit_routine_load_task(1:list tasks); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 5aecdc4ba2..852d11493e 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -61,7 +61,7 @@ struct TColumnDef { 2: optional string comment } -// Arguments to DescribeTable, which returns a list of column descriptors for a +// Arguments to DescribeTable, which returns a list 
of column descriptors for a // given table struct TDescribeTableParams { 1: optional string db @@ -316,14 +316,14 @@ struct TGetDbsResult { 2: optional list catalogs } -// Arguments to getTableNames, which returns a list of tables that match an +// Arguments to getTableNames, which returns a list of tables that match an // optional pattern. struct TGetTablesParams { // If not set, match tables in all DBs - 1: optional string db + 1: optional string db // If not set, match every table - 2: optional string pattern + 2: optional string pattern 3: optional string user // deprecated 4: optional string user_ip // deprecated 5: optional Types.TUserIdentity current_user_ident // to replace the user and user ip @@ -378,7 +378,7 @@ enum FrontendServiceVersion { V1 } -// The results of an INSERT query, sent to the coordinator as part of +// The results of an INSERT query, sent to the coordinator as part of // TReportExecStatusParams struct TReportExecStatusParams { 1: required FrontendServiceVersion protocol_version @@ -404,9 +404,9 @@ struct TReportExecStatusParams { // cumulative profile // required in V1 7: optional RuntimeProfile.TRuntimeProfileTree profile - + // New errors that have not been reported to the coordinator - // optional in V1 + // optional in V1 9: optional list error_log // URL of files need to load @@ -416,7 +416,7 @@ struct TReportExecStatusParams { 12: optional string tracking_url // export files - 13: optional list export_files + 13: optional list export_files 14: optional list commitInfos @@ -445,7 +445,7 @@ struct TFeResult { struct TMasterOpRequest { 1: required string user 2: required string db - 3: required string sql + 3: required string sql // Deprecated 4: optional Types.TResourceInfo resourceInfo 5: optional string cluster @@ -603,6 +603,7 @@ struct TStreamLoadPutRequest { 43: optional i32 skip_lines // csv skip line num, only used when csv header_type is not set. 
44: optional bool enable_profile 45: optional bool partial_update + 46: optional list table_names } struct TStreamLoadPutResult { @@ -612,6 +613,12 @@ struct TStreamLoadPutResult { 3: optional PaloInternalService.TPipelineFragmentParams pipeline_params } +struct TStreamLoadMultiTablePutResult { + 1: required Status.TStatus status + // valid when status is OK + 2: optional list params +} + struct TKafkaRLTaskProgress { 1: required map partitionCmtOffset } @@ -633,7 +640,7 @@ struct TRLTaskTxnCommitAttachment { struct TTxnCommitAttachment { 1: required Types.TLoadType loadType 2: optional TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment -// 3: optional TMiniLoadTxnCommitAttachment mlTxnCommitAttachment +// 3: optional TMiniLoadTxnCommitAttachment mlTxnCommitAttachment } struct TLoadTxnCommitRequest { @@ -651,6 +658,7 @@ struct TLoadTxnCommitRequest { 12: optional i64 thrift_rpc_timeout_ms 13: optional string token 14: optional i64 db_id + 15: optional list tbls } struct TLoadTxnCommitResult { @@ -724,6 +732,7 @@ struct TLoadTxnRollbackRequest { 10: optional TTxnCommitAttachment txnCommitAttachment 11: optional string token 12: optional i64 db_id + 13: optional list tbls } struct TLoadTxnRollbackResult { @@ -815,7 +824,7 @@ struct TAddColumnsRequest { 2: optional list addColumns 3: optional string table_name 4: optional string db_name - 5: optional bool allow_type_conflict + 5: optional bool allow_type_conflict } // Only support base table add columns @@ -1011,6 +1020,8 @@ service FrontendService { TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request) + TStreamLoadMultiTablePutResult streamLoadMultiTablePut(1: TStreamLoadPutRequest request) + Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request) TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request) @@ -1028,6 +1039,6 @@ service FrontendService { TCheckAuthResult checkAuth(1: TCheckAuthRequest request) TQueryStatsResult getQueryStats(1: TGetQueryStatsRequest request) - + TGetTabletReplicaInfosResult getTabletReplicaInfos(1: TGetTabletReplicaInfosRequest request) } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index ca14a548b0..e556ee46f1 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -176,7 +176,7 @@ struct TQueryOptions { 51: optional bool enable_new_shuffle_hash_method 52: optional i32 be_exec_version = 0 - + 53: optional i32 partitioned_hash_join_rows_threshold = 0 54: optional bool enable_share_hash_table_for_broadcast_join @@ -196,7 +196,7 @@ struct TQueryOptions { 60: optional i32 partitioned_hash_agg_rows_threshold = 0 61: optional bool enable_file_cache = false - + 62: optional i32 insert_timeout = 14400 63: optional i32 execution_timeout = 3600 @@ -226,7 +226,7 @@ struct TQueryOptions { 75: optional bool enable_insert_strict = false; } - + // A scan range plus the parameters needed to execute that scan. struct TScanRangeParams { @@ -307,7 +307,7 @@ struct TQueryGlobals { 1: required string now_string // To support timezone in Doris. timestamp_ms is the millisecond uinix timestamp for - // this query to calculate time zone relative function + // this query to calculate time zone relative function 2: optional i64 timestamp_ms // time_zone is the timezone this query used. 
@@ -419,6 +419,7 @@ struct TExecPlanFragmentParams { 21: optional bool build_hash_table_for_broadcast_join = false; 22: optional list instances_sharing_hash_table; + 23: optional string table_name; } struct TExecPlanFragmentParamsList { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 5806483cba..91074669e7 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -225,7 +225,7 @@ enum TStmtType { QUERY, DDL, // Data definition, e.g. CREATE TABLE (includes read-only functions e.g. SHOW) DML, // Data modification e.g. INSERT - EXPLAIN // EXPLAIN + EXPLAIN // EXPLAIN } // level of verboseness for "explain" output @@ -451,7 +451,7 @@ struct TJavaUdfExecutorCtorParams { 9: optional i64 output_intermediate_state_ptr 10: optional i64 batch_size_ptr - + // this is used to pass place or places to FE, which could help us call jni // only once and can process a batch size data in JAVA-Udaf 11: optional i64 input_places_ptr @@ -576,7 +576,7 @@ enum TLoadJobState { LOADING, FINISHED, CANCELLED -} +} enum TEtlState { RUNNING, @@ -653,7 +653,7 @@ struct TTabletCommitInfo { 2: required i64 backendId // Every load job should check if the global dict is valid, if the global dict // is invalid then should sent the invalid column names to FE - 3: optional list invalid_dict_cols + 3: optional list invalid_dict_cols } struct TErrorTabletInfo { @@ -670,6 +670,7 @@ enum TLoadType { enum TLoadSourceType { RAW, KAFKA, + MULTI_TABLE, } enum TMergeType { @@ -680,7 +681,7 @@ enum TMergeType { enum TSortType { LEXICAL, - ZORDER, + ZORDER, } enum TMetadataType {
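
Note (reviewer sketch, not part of the patch): the core of this change is MultiTablePipe::dispatch(), which splits each Kafka record at the first '|' into a destination table and a payload, buffers rows for tables that have no plan yet, and asks the FE for plans via streamLoadMultiTablePut once config::multi_table_batch_plan_threshold unplanned rows accumulate. The standalone C++ sketch below mirrors only that framing and threshold behavior; the class and function names in it are hypothetical and it does not use any Doris API.

// Standalone sketch of the "table|payload" framing and batching threshold.
#include <cstddef>
#include <iostream>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

namespace sketch {

constexpr size_t kBatchPlanThreshold = 3; // stands in for config::multi_table_batch_plan_threshold

class MultiTableDispatcher {
public:
    // A record arrives as "<table>|<payload>"; everything before the first '|' names the table.
    void append(std::string_view record) {
        size_t sep = record.find('|');
        if (sep == std::string_view::npos || sep == 0) {
            std::cerr << "bad record, missing table prefix\n";
            return;
        }
        std::string table(record.substr(0, sep));
        std::string_view payload = record.substr(sep + 1);

        // Rows for tables that already have a plan go straight to their per-table buffer.
        if (auto it = planned_.find(table); it != planned_.end()) {
            it->second.push_back(std::string(payload));
            return;
        }
        // Otherwise buffer as "unplanned" and count rows toward the planning threshold.
        unplanned_[table].push_back(std::string(payload));
        if (++unplanned_rows_ >= kBatchPlanThreshold) {
            plan_unplanned_tables();
            unplanned_rows_ = 0;
        }
    }

    // Stand-in for request_and_exec_plans(): in the real patch this asks the FE for one
    // plan per table and launches a plan fragment per table; here we just mark them planned.
    void plan_unplanned_tables() {
        for (auto& [table, rows] : unplanned_) {
            std::cout << "planning table " << table << " with " << rows.size() << " buffered rows\n";
            auto& dst = planned_[table];
            dst.insert(dst.end(), rows.begin(), rows.end());
        }
        unplanned_.clear();
    }

private:
    std::unordered_map<std::string, std::vector<std::string>> planned_;
    std::unordered_map<std::string, std::vector<std::string>> unplanned_;
    size_t unplanned_rows_ = 0;
};

} // namespace sketch

int main() {
    sketch::MultiTableDispatcher d;
    for (const char* rec : {"t1|a", "t2|b", "t1|c", "t3|d"}) { // the 3rd record crosses the threshold
        d.append(rec);
    }
    d.plan_unplanned_tables(); // flush leftovers, mirroring the call made after consuming finishes
}

Buffering unplanned rows and planning them in batches trades a little latency for far fewer FE planning RPCs, which is why the threshold is exposed as a BE config option.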