[feature](load) introduce single-stream-multi-table load (#20006)

For routine load (Kafka load), users can produce data for different tables into a
single topic, and Doris will dispatch each row to its corresponding table.
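
A minimal sketch of the topic layout, assuming the '|' separator used by MultiTablePipe::parse_dst_table (table names and payloads below are hypothetical):

tbl_orders|{"id": 1, "amount": 9.9}
tbl_users|{"id": 100, "name": "alice"}

Each message carries the destination table name as a prefix; MultiTablePipe strips the prefix and appends the remaining bytes to that table's KafkaConsumerPipe.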

Signed-off-by: freemandealer <freeman.zhang1992@gmail.com>
zhengyu
2023-06-07 17:55:25 +08:00
committed by GitHub
parent fbbf4c420e
commit 09344eaab5
25 changed files with 588 additions and 42 deletions

View File

@ -784,6 +784,10 @@ DEFINE_String(kafka_broker_version_fallback, "0.10.0");
// Change this size to 0 to fix it temporarily.
DEFINE_Int32(routine_load_consumer_pool_size, "10");
// Used in single-stream-multi-table load. When a batch of messages is received from kafka,
// if the batch size exceeds this threshold, we will request plans for all related tables.
DEFINE_Int32(multi_table_batch_plan_threshold, "200");
// When the timeout of a load task is less than this threshold,
// Doris treats it as a high priority task.
// high priority tasks use a separate thread pool for flush and do not block rpc by memory cleanup logic.

View File

@ -821,6 +821,10 @@ DECLARE_String(kafka_broker_version_fallback);
// Change this size to 0 to fix it temporarily.
DECLARE_Int32(routine_load_consumer_pool_size);
// Used in single-stream-multi-table load. When a batch of messages is received from kafka,
// if the batch size exceeds this threshold, we will request plans for all related tables.
DECLARE_Int32(multi_table_batch_plan_threshold);
// When the timeout of a load task is less than this threshold,
// Doris treats it as a high priority task.
// high priority tasks use a separate thread pool for flush and do not block rpc by memory cleanup logic.

View File

@ -43,6 +43,7 @@ set(IO_FILES
fs/broker_file_writer.cpp
fs/buffered_reader.cpp
fs/stream_load_pipe.cpp
fs/multi_table_pipe.cpp
fs/err_utils.cpp
fs/fs_utils.cpp
cache/dummy_file_cache.cpp

View File

@ -29,6 +29,7 @@
#include "io/fs/file_reader_options.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/multi_table_pipe.h"
#include "io/fs/s3_file_system.h"
#include "io/fs/stream_load_pipe.h"
#include "io/hdfs_builder.h"
@ -144,12 +145,24 @@ Status FileFactory::create_file_reader(RuntimeProfile* profile,
}
// file scan node/stream load pipe
Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader) {
Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
const TUniqueId& fragment_instance_id) {
auto stream_load_ctx = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
if (!stream_load_ctx) {
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}
*file_reader = stream_load_ctx->pipe;
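// For single-stream-multi-table load, the context's pipe is a MultiTablePipe; resolve the
// per-fragment pipe that MultiTablePipe::putPipe registered for this fragment instance.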
if (file_reader->get() != nullptr) {
auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader);
if (multi_table_pipe != nullptr) {
*file_reader = multi_table_pipe->getPipe(fragment_instance_id);
LOG(INFO) << "create pipe reader for fragment instance: " << fragment_instance_id
<< " pipe: " << (*file_reader).get();
}
}
return Status::OK();
}

View File

@ -76,7 +76,8 @@ public:
io::FileReaderOptions reader_options = NO_CACHE_READER_OPTIONS);
// Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader);
static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
const TUniqueId& fragment_instance_id);
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
std::shared_ptr<io::FileSystem>* hdfs_file_system,

View File

@ -28,7 +28,7 @@ public:
~KafkaConsumerPipe() override = default;
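// The append_* methods are made virtual so that MultiTablePipe can override them and
// route each row to a per-table pipe.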
Status append_with_line_delimiter(const char* data, size_t size) {
virtual Status append_with_line_delimiter(const char* data, size_t size) {
Status st = append(data, size);
if (!st.ok()) {
return st;
@ -39,7 +39,9 @@ public:
return st;
}
Status append_json(const char* data, size_t size) { return append_and_flush(data, size); }
virtual Status append_json(const char* data, size_t size) {
return append_and_flush(data, size);
}
};
} // namespace io
} // end namespace doris

View File

@ -0,0 +1,288 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "multi_table_pipe.h"
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/Types_types.h>
#include <thrift/protocol/TDebugProtocol.h>
#include "common/status.h"
#include "runtime/client_cache.h"
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "util/thrift_rpc_helper.h"
#include "util/thrift_util.h"
#include "util/time.h"
namespace doris {
namespace io {
Status MultiTablePipe::append_with_line_delimiter(const char* data, size_t size) {
const std::string& table = parse_dst_table(data, size);
if (table.empty()) {
return Status::InternalError("table name is empty");
}
size_t prefix_len = table.length() + 1;
AppendFunc cb = &KafkaConsumerPipe::append_with_line_delimiter;
return dispatch(table, data + prefix_len, size - prefix_len, cb);
}
Status MultiTablePipe::append_json(const char* data, size_t size) {
const std::string& table = parse_dst_table(data, size);
if (table.empty()) {
return Status::InternalError("table name is empty");
}
size_t prefix_len = table.length() + 1;
AppendFunc cb = &KafkaConsumerPipe::append_json;
return dispatch(table, data + prefix_len, size - prefix_len, cb);
}
KafkaConsumerPipePtr MultiTablePipe::get_pipe_by_table(const std::string& table) {
auto pipe = _planned_pipes.find(table);
DCHECK(pipe != _planned_pipes.end());
return pipe->second;
}
static std::string_view get_first_part(const char* dat, char delimiter) {
const char* delimiterPos = std::strchr(dat, delimiter);
if (delimiterPos != nullptr) {
std::ptrdiff_t length = delimiterPos - dat;
return std::string_view(dat, length);
} else {
return std::string_view(dat);
}
}
Status MultiTablePipe::finish() {
for (auto& pair : _planned_pipes) {
RETURN_IF_ERROR(pair.second->finish());
}
return Status::OK();
}
void MultiTablePipe::cancel(const std::string& reason) {
for (auto& pair : _planned_pipes) {
pair.second->cancel(reason);
}
}
std::string MultiTablePipe::parse_dst_table(const char* data, size_t size) {
return std::string(get_first_part(data, '|'));
}
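// Route one row to its per-table pipe: tables that already have an executing plan fragment
// use _planned_pipes; otherwise rows are buffered in _unplanned_pipes until _unplanned_row_cnt
// reaches multi_table_batch_plan_threshold, which triggers request_and_exec_plans().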
Status MultiTablePipe::dispatch(const std::string& table, const char* data, size_t size,
AppendFunc cb) {
if (size == 0 || strlen(data) == 0) {
LOG(WARNING) << "empty data for table: " << table;
return Status::InternalError("empty data");
}
KafkaConsumerPipePtr pipe = nullptr;
auto iter = _planned_pipes.find(table);
if (iter != _planned_pipes.end()) {
pipe = iter->second;
LOG(INFO) << "dispatch for planned pipe: " << pipe.get();
RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
"append failed in planned kafka pipe");
} else {
iter = _unplanned_pipes.find(table);
if (iter == _unplanned_pipes.end()) {
pipe = std::make_shared<io::KafkaConsumerPipe>();
LOG(INFO) << "create new unplanned pipe: " << pipe.get();
_unplanned_pipes.emplace(table, pipe);
} else {
pipe = iter->second;
}
LOG(INFO) << "dispatch for unplanned pipe: " << pipe.get();
RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
"append failed in unplanned kafka pipe");
++_unplanned_row_cnt;
size_t threshold = config::multi_table_batch_plan_threshold;
if (_unplanned_row_cnt >= threshold) {
LOG(INFO) << fmt::format("unplanned row cnt={} reach threshold={}, plan them",
_unplanned_row_cnt, threshold);
Status st = request_and_exec_plans();
_unplanned_row_cnt = 0;
if (!st.ok()) {
return st;
}
}
}
return Status::OK();
}
#ifndef BE_TEST
Status MultiTablePipe::request_and_exec_plans() {
if (_unplanned_pipes.empty()) return Status::OK();
// get list of table names in unplanned pipes
std::vector<std::string> tables;
fmt::memory_buffer log_buffer;
log_buffer.clear();
fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_pipes.size());
for (auto& pair : _unplanned_pipes) {
tables.push_back(pair.first);
fmt::format_to(log_buffer, "{} ", pair.first);
}
fmt::format_to(log_buffer, "]");
LOG(INFO) << fmt::to_string(log_buffer);
TStreamLoadPutRequest request;
set_request_auth(&request, _ctx->auth);
request.db = _ctx->db;
request.table_names = tables;
request.__isset.table_names = true;
request.txnId = _ctx->txn_id;
request.formatType = _ctx->format;
request.__set_compress_type(_ctx->compress_type);
request.__set_header_type(_ctx->header_type);
request.__set_loadId(_ctx->id.to_thrift());
request.fileType = TFileType::FILE_STREAM;
request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
// no need to register new_load_stream_mgr because it is already done when the routine load task is submitted
// plan this load
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
TNetworkAddress master_addr = exec_env->master_info()->network_address;
int64_t stream_load_put_start_time = MonotonicNanos();
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, this](FrontendServiceConnection& client) {
client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request);
}));
_ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
Status plan_status(_ctx->multi_table_put_result.status);
if (!plan_status.ok()) {
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief();
return plan_status;
}
// put unplanned pipes into planned pipes and clear unplanned pipes
for (auto& pipe : _unplanned_pipes) {
_ctx->table_list.push_back(pipe.first);
_planned_pipes.emplace(pipe.first, pipe.second);
}
LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, returned plan cnt={}",
_unplanned_pipes.size(), _planned_pipes.size(),
_ctx->multi_table_put_result.params.size());
_unplanned_pipes.clear();
for (auto& plan : _ctx->multi_table_put_result.params) {
// TODO: use pipeline in the future (currently buggy for load)
++_inflight_plan_cnt;
DCHECK_EQ(plan.__isset.table_name, true);
DCHECK(_planned_pipes.find(plan.table_name) != _planned_pipes.end());
putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]);
LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id
<< " table=" << plan.table_name;
exec_env->fragment_mgr()->exec_plan_fragment(plan, [this](RuntimeState* state,
Status* status) {
--_inflight_plan_cnt;
_tablet_commit_infos.insert(_tablet_commit_infos.end(),
state->tablet_commit_infos().begin(),
state->tablet_commit_infos().end());
_number_total_rows += state->num_rows_load_total();
_number_loaded_rows += state->num_rows_load_success();
_number_filtered_rows += state->num_rows_load_filtered();
_number_unselected_rows += state->num_rows_load_unselected();
// check filtered ratio for this plan fragment
int64_t num_selected_rows =
state->num_rows_load_total() - state->num_rows_load_unselected();
if (num_selected_rows > 0 &&
(double)state->num_rows_load_filtered() / num_selected_rows >
_ctx->max_filter_ratio) {
*status = Status::InternalError("too many filtered rows");
}
if (_number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
_ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
}
// if any of the plan fragments failed, set the status to the first failed plan
if (!status->ok()) {
LOG(WARNING) << "plan fragment exec failed. errmsg=" << *status << _ctx->brief();
_status = *status;
}
if (_inflight_plan_cnt == 0 && is_consume_finished()) {
_ctx->number_total_rows = _number_total_rows;
_ctx->number_loaded_rows = _number_loaded_rows;
_ctx->number_filtered_rows = _number_filtered_rows;
_ctx->number_unselected_rows = _number_unselected_rows;
_ctx->commit_infos = _tablet_commit_infos;
LOG(INFO) << "all plan for multi-table load complete. number_total_rows="
<< _ctx->number_total_rows
<< " number_loaded_rows=" << _ctx->number_loaded_rows
<< " number_filtered_rows=" << _ctx->number_filtered_rows
<< " number_unselected_rows=" << _ctx->number_unselected_rows;
_ctx->promise.set_value(_status); // when all done, finish the routine load task
}
});
}
return Status::OK();
}
#else
Status MultiTablePipe::request_and_exec_plans() {
// put unplanned pipes into planned pipes
for (auto& pipe : _unplanned_pipes) {
_planned_pipes.emplace(pipe.first, pipe.second);
}
LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}",
_unplanned_pipes.size(), _planned_pipes.size());
_unplanned_pipes.clear();
return Status::OK();
}
#endif
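// The fragment_instance_id -> pipe map lets FileFactory::create_pipe_reader() hand each plan
// fragment's scanner the pipe of its own table.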
Status MultiTablePipe::putPipe(const TUniqueId& fragment_instance_id,
std::shared_ptr<io::StreamLoadPipe> pipe) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(fragment_instance_id);
if (it != std::end(_pipe_map)) {
return Status::InternalError("id already exist");
}
_pipe_map.emplace(fragment_instance_id, pipe);
return Status::OK();
}
std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId& fragment_instance_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(fragment_instance_id);
if (it == std::end(_pipe_map)) {
return std::shared_ptr<io::StreamLoadPipe>(nullptr);
}
return it->second;
}
void MultiTablePipe::removePipe(const TUniqueId& fragment_instance_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(fragment_instance_id);
if (it != std::end(_pipe_map)) {
_pipe_map.erase(it);
VLOG_NOTICE << "remove stream load pipe: " << fragment_instance_id;
}
}
} // namespace io
} // namespace doris

View File

@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "io/fs/kafka_consumer_pipe.h"
#include "io/fs/multi_table_pipe.h"
#include "runtime/stream_load/stream_load_context.h"
namespace doris {
namespace io {
class MultiTablePipe;
using AppendFunc = Status (KafkaConsumerPipe::*)(const char* data, size_t size);
using KafkaConsumerPipePtr = std::shared_ptr<io::KafkaConsumerPipe>;
class MultiTablePipe : public KafkaConsumerPipe {
public:
MultiTablePipe(std::shared_ptr<StreamLoadContext> ctx, size_t max_buffered_bytes = 1024 * 1024,
size_t min_chunk_size = 64 * 1024)
: KafkaConsumerPipe(max_buffered_bytes, min_chunk_size), _ctx(ctx) {}
~MultiTablePipe() override = default;
Status append_with_line_delimiter(const char* data, size_t size) override;
Status append_json(const char* data, size_t size) override;
// for pipe consumers, i.e. scanners, to get underlying KafkaConsumerPipes
KafkaConsumerPipePtr get_pipe_by_table(const std::string& table);
// request and execute plans for unplanned pipes
Status request_and_exec_plans();
void set_consume_finished() { _consume_finished.store(true, std::memory_order_release); }
bool is_consume_finished() { return _consume_finished.load(std::memory_order_acquire); }
Status finish() override;
void cancel(const std::string& reason) override;
// register <instance id, pipe> pair
Status putPipe(const TUniqueId& fragment_instance_id, std::shared_ptr<io::StreamLoadPipe> pipe);
std::shared_ptr<io::StreamLoadPipe> getPipe(const TUniqueId& fragment_instance_id);
void removePipe(const TUniqueId& fragment_instance_id);
private:
// parse table name from data
std::string parse_dst_table(const char* data, size_t size);
// [thread-unsafe] dispatch data to corresponding KafkaConsumerPipe
Status dispatch(const std::string& table, const char* data, size_t size, AppendFunc cb);
private:
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _planned_pipes;
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _unplanned_pipes;
std::atomic<uint64_t> _unplanned_row_cnt {0}; // triggers a plan request when it exceeds the threshold
std::atomic<uint64_t> _inflight_plan_cnt {0}; // how many plan fragments are executing
std::atomic<bool> _consume_finished {false};
std::shared_ptr<StreamLoadContext> _ctx;
Status _status; // saves the first error status among all executing plan fragments
#ifndef BE_TEST
std::vector<TTabletCommitInfo> _tablet_commit_infos; // collect from each plan fragment
int64_t _number_total_rows = 0;
int64_t _number_loaded_rows = 0;
int64_t _number_filtered_rows = 0;
int64_t _number_unselected_rows = 0;
#endif
std::mutex _pipe_map_lock;
std::unordered_map<TUniqueId /*instance id*/, std::shared_ptr<io::StreamLoadPipe>> _pipe_map;
};
} // namespace io
} // end namespace doris

View File

@ -67,15 +67,17 @@ public:
bool closed() const override { return _cancelled; }
// called when producer finished
Status finish() override;
virtual Status finish() override;
// called when producer/consumer failed
void cancel(const std::string& reason) override;
virtual void cancel(const std::string& reason) override;
Status read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length);
FileSystemSPtr fs() const override { return nullptr; }
size_t get_queue_size() { return _buf_queue.size(); }
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;

View File

@ -774,6 +774,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
auto iter = _fragment_map.find(fragment_instance_id);
if (iter != _fragment_map.end()) {
// Duplicated
LOG(WARNING) << "duplicate fragment instance id: " << print_id(fragment_instance_id);
return Status::OK();
}
}

View File

@ -333,7 +333,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
}
}
}
{
_collect_query_statistics();
Status status;
@ -347,7 +346,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
// Setting to NULL ensures that the d'tor won't double-close the sink.
_sink.reset(nullptr);
_done = true;
return Status::OK();
}

View File

@ -40,6 +40,7 @@
#include "common/status.h"
#include "common/utils.h"
#include "io/fs/kafka_consumer_pipe.h"
#include "io/fs/multi_table_pipe.h"
#include "io/fs/stream_load_pipe.h"
#include "runtime/exec_env.h"
#include "runtime/message_body_sink.h"
@ -207,8 +208,11 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
if (task.__isset.max_batch_size) {
ctx->max_batch_size = task.max_batch_size;
}
if (task.__isset.is_multi_table && task.is_multi_table) {
ctx->is_multi_table = true;
}
// set execute plan params
// set execute plan params (only for non-single-stream-multi-table load)
TStreamLoadPutResult put_result;
TStatus tstatus;
tstatus.status_code = TStatusCode::OK;
@ -288,7 +292,12 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
std::shared_ptr<io::StreamLoadPipe> pipe;
switch (ctx->load_src_type) {
case TLoadSourceType::KAFKA: {
pipe = std::make_shared<io::KafkaConsumerPipe>();
if (ctx->is_multi_table) {
LOG(INFO) << "recv single-stream-multi-table request, ctx=" << ctx->brief();
pipe = std::make_shared<io::MultiTablePipe>(ctx);
} else {
pipe = std::make_shared<io::KafkaConsumerPipe>();
}
Status st = std::static_pointer_cast<KafkaDataConsumerGroup>(consumer_grp)
->assign_topic_partitions(ctx);
if (!st.ok()) {
@ -312,18 +321,29 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
// must put pipe before executing plan fragment
HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx), "failed to add pipe");
if (!ctx->is_multi_table) {
// only for normal load, single-stream-multi-table load will be planned during consuming
#ifndef BE_TEST
// execute plan fragment, async
HANDLE_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx),
"failed to execute plan fragment");
#else
// only for test
HANDLE_ERROR(_execute_plan_for_test(ctx), "test failed");
#endif
}
// start to consume, this may block a while
HANDLE_ERROR(consumer_grp->start_all(ctx), "consuming failed");
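// For single-stream-multi-table load, rows still buffered below the plan threshold are
// planned after consuming ends; set_consume_finished() then lets the last plan-fragment
// callback fulfill ctx->promise (see MultiTablePipe::request_and_exec_plans).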
if (ctx->is_multi_table) {
// plan the rest of unplanned data
auto multi_table_pipe = std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink);
multi_table_pipe->request_and_exec_plans();
// need memory order
multi_table_pipe->set_consume_finished();
}
// wait for all consumers finished
HANDLE_ERROR(ctx->future.get(), "consume failed");
@ -335,7 +355,6 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
// commit txn
HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()), "commit failed");
// commit kafka offset
switch (ctx->load_src_type) {
case TLoadSourceType::KAFKA: {

View File

@ -170,6 +170,7 @@ public:
std::shared_ptr<io::StreamLoadPipe> pipe;
TStreamLoadPutResult put_result;
TStreamLoadMultiTablePutResult multi_table_put_result;
std::vector<TTabletCommitInfo> commit_infos;
@ -210,6 +211,12 @@ public:
// csv with header type
std::string header_type = "";
// is this load single-stream-multi-table?
bool is_multi_table = false;
// for single-stream-multi-table, we have table list
std::vector<std::string> table_list;
public:
ExecEnv* exec_env() { return _exec_env; }

View File

@ -44,6 +44,7 @@
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "thrift/protocol/TDebugProtocol.h"
#include "util/doris_metrics.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
@ -359,6 +360,10 @@ void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx,
request.commitInfos = std::move(ctx->commit_infos);
request.__isset.commitInfos = true;
request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms);
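// single-stream-multi-table load commits one txn covering several tables, so pass the
// table list collected during planning (ctx->table_list) to the FE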
request.tbls = ctx->table_list;
request.__isset.tbls = true;
VLOG_DEBUG << "commit txn request:" << apache::thrift::ThriftDebugString(request);
// set attachment if has
TTxnCommitAttachment attachment;
@ -417,6 +422,8 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
request.tbl = ctx->table;
request.txnId = ctx->txn_id;
request.__set_reason(ctx->status.to_string());
request.tbls = ctx->table_list;
request.__isset.tbls = true;
// set attachment if has
TTxnCommitAttachment attachment;

View File

@ -166,7 +166,8 @@ Status CsvReader::init_reader(bool is_load) {
_file_description.start_offset = start_offset;
if (_params.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader));
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader,
_state->fragment_instance_id()));
} else {
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
reader_options.modification_time =

View File

@ -373,7 +373,8 @@ Status NewJsonReader::_open_file_reader() {
_file_description.start_offset = start_offset;
if (_params.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader));
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader,
_state->fragment_instance_id()));
} else {
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
reader_options.modification_time =

View File

@ -72,6 +72,7 @@ set(IO_TEST_FILES
io/fs/local_file_system_test.cpp
io/fs/remote_file_system_test.cpp
io/fs/buffered_reader_test.cpp
io/fs/multi_table_pipe_test.cpp
)
set(OLAP_TEST_FILES
olap/engine_storage_migration_task_test.cpp

View File

@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/multi_table_pipe.h"
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <memory>
#include "gtest/gtest_pred_impl.h"
#include "runtime/stream_load/stream_load_context.h"
namespace doris {
using namespace doris::io;
using namespace testing;
class MultiTablePipeTest : public testing::Test {
public:
MultiTablePipeTest() {}
protected:
virtual void SetUp() {}
virtual void TearDown() {}
};
TEST_F(MultiTablePipeTest, append_json) {
config::multi_table_batch_plan_threshold = 3;
std::string data1 = "test_table_1|data1";
std::string data2 = "test_table_2|data2";
std::string data3 = "test_table_1|data3";
std::string data4 = "test_table_1|data4";
std::string data5 = "test_table_3|data5";
std::string data6 = "test_table_2|data6";
std::string data7 = "test_table_3|data7";
std::string data8 = "test_table_3|data8";
auto exec_env = doris::ExecEnv::GetInstance();
std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(exec_env);
MultiTablePipe pipe(ctx);
pipe.append_json(data1.c_str(), data1.size());
pipe.append_json(data2.c_str(), data2.size());
pipe.append_json(data3.c_str(), data3.size()); // should trigger 1st plan, for table 1&2
EXPECT_EQ(pipe.get_pipe_by_table("test_table_1")->get_queue_size(), 2);
EXPECT_EQ(pipe.get_pipe_by_table("test_table_2")->get_queue_size(), 1);
pipe.append_json(data4.c_str(), data4.size());
pipe.append_json(data5.c_str(), data5.size());
pipe.append_json(data6.c_str(), data6.size());
pipe.append_json(data7.c_str(), data7.size());
pipe.append_json(data8.c_str(), data8.size()); // should trigger 2nd plan, for table 3
EXPECT_EQ(pipe.get_pipe_by_table("test_table_1")->get_queue_size(), 3);
EXPECT_EQ(pipe.get_pipe_by_table("test_table_2")->get_queue_size(), 2);
EXPECT_EQ(pipe.get_pipe_by_table("test_table_3")->get_queue_size(), 3);
}
} // end namespace doris

View File

@ -718,6 +718,12 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
* Description: The number of cached data consumers used by routine load.
* Default value: 10
#### `multi_table_batch_plan_threshold`
* Type: int32
* Description: Used for single-stream-multi-table load. When a batch of messages is received from Kafka, if the batch size exceeds this threshold, plans are requested for all related tables.
* Default value: 200
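A hedged example of raising the threshold in be.conf (the value 500 is only illustrative):

multi_table_batch_plan_threshold = 500

A larger value means fewer plan requests to the FE, but more rows are buffered in memory before planning, which increases memory pressure and load latency.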
#### `single_replica_load_download_num_workers`
* Type: int32

View File

@ -732,6 +732,12 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
* Description: The number of cached data consumers used by routine load.
* Default value: 10
#### `multi_table_batch_plan_threshold`
* Type: int32
* Description: Used by single-stream-multi-table load; it controls how many rows are accumulated before planning. A value that is too small causes frequent planning, while a value that is too large increases memory pressure and load latency.
* Default value: 200
#### `single_replica_load_download_num_workers`
* Type: int32
* Description: In single-replica load, the number of worker threads the slave replica uses to download data files from the master replica over HTTP. When load concurrency increases, this parameter can be raised to ensure slave replicas sync data from master replicas in time. If necessary, `webserver_num_workers` should also be increased accordingly to improve IO efficiency.

View File

@ -144,6 +144,7 @@ import org.apache.doris.thrift.TShowVariableResult;
import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TStreamLoadPutResult;
import org.apache.doris.thrift.TTableIndexQueryStats;
@ -1587,6 +1588,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
@Override
public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequest request) {
// placeholder
TStreamLoadMultiTablePutResult result = new TStreamLoadMultiTablePutResult();
return result;
}
private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request) throws UserException {
String cluster = request.getCluster();
if (Strings.isNullOrEmpty(cluster)) {

View File

@ -35,7 +35,7 @@ struct TTabletStat {
2: optional i64 data_size
3: optional i64 row_num
4: optional i64 version_count
5: optional i64 remote_data_size
}
struct TTabletStatResult {
@ -66,6 +66,7 @@ struct TRoutineLoadTask {
13: optional PaloInternalService.TExecPlanFragmentParams params
14: optional PlanNodes.TFileFormatType format
15: optional PaloInternalService.TPipelineFragmentParams pipeline_params
16: optional bool is_multi_table
}
struct TKafkaMetaProxyRequest {
@ -168,9 +169,9 @@ service BackendService {
Status.TStatus erase_export_task(1:Types.TUniqueId task_id);
TTabletStatResult get_tablet_stat();
i64 get_trash_used_capacity();
list<TDiskTrashInfo> get_disk_trash_used_capacity();
Status.TStatus submit_routine_load_task(1:list<TRoutineLoadTask> tasks);

View File

@ -61,7 +61,7 @@ struct TColumnDef {
2: optional string comment
}
// Arguments to DescribeTable, which returns a list of column descriptors for a
// given table
struct TDescribeTableParams {
1: optional string db
@ -316,14 +316,14 @@ struct TGetDbsResult {
2: optional list<string> catalogs
}
// Arguments to getTableNames, which returns a list of tables that match an
// optional pattern.
struct TGetTablesParams {
// If not set, match tables in all DBs
1: optional string db
// If not set, match every table
2: optional string pattern
3: optional string user // deprecated
4: optional string user_ip // deprecated
5: optional Types.TUserIdentity current_user_ident // to replace the user and user ip
@ -378,7 +378,7 @@ enum FrontendServiceVersion {
V1
}
// The results of an INSERT query, sent to the coordinator as part of
// TReportExecStatusParams
struct TReportExecStatusParams {
1: required FrontendServiceVersion protocol_version
@ -404,9 +404,9 @@ struct TReportExecStatusParams {
// cumulative profile
// required in V1
7: optional RuntimeProfile.TRuntimeProfileTree profile
// New errors that have not been reported to the coordinator
// optional in V1
9: optional list<string> error_log
// URL of files need to load
@ -416,7 +416,7 @@ struct TReportExecStatusParams {
12: optional string tracking_url
// export files
13: optional list<string> export_files
14: optional list<Types.TTabletCommitInfo> commitInfos
@ -445,7 +445,7 @@ struct TFeResult {
struct TMasterOpRequest {
1: required string user
2: required string db
3: required string sql
// Deprecated
4: optional Types.TResourceInfo resourceInfo
5: optional string cluster
@ -603,6 +603,7 @@ struct TStreamLoadPutRequest {
43: optional i32 skip_lines // csv skip line num, only used when csv header_type is not set.
44: optional bool enable_profile
45: optional bool partial_update
46: optional list<string> table_names
}
struct TStreamLoadPutResult {
@ -612,6 +613,12 @@ struct TStreamLoadPutResult {
3: optional PaloInternalService.TPipelineFragmentParams pipeline_params
}
struct TStreamLoadMultiTablePutResult {
1: required Status.TStatus status
// valid when status is OK
2: optional list<PaloInternalService.TExecPlanFragmentParams> params
}
struct TKafkaRLTaskProgress {
1: required map<i32,i64> partitionCmtOffset
}
@ -633,7 +640,7 @@ struct TRLTaskTxnCommitAttachment {
struct TTxnCommitAttachment {
1: required Types.TLoadType loadType
2: optional TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment
// 3: optional TMiniLoadTxnCommitAttachment mlTxnCommitAttachment
}
struct TLoadTxnCommitRequest {
@ -651,6 +658,7 @@ struct TLoadTxnCommitRequest {
12: optional i64 thrift_rpc_timeout_ms
13: optional string token
14: optional i64 db_id
15: optional list<string> tbls
}
struct TLoadTxnCommitResult {
@ -724,6 +732,7 @@ struct TLoadTxnRollbackRequest {
10: optional TTxnCommitAttachment txnCommitAttachment
11: optional string token
12: optional i64 db_id
13: optional list<string> tbls
}
struct TLoadTxnRollbackResult {
@ -815,7 +824,7 @@ struct TAddColumnsRequest {
2: optional list<TColumnDef> addColumns
3: optional string table_name
4: optional string db_name
5: optional bool allow_type_conflict
}
// Only support base table add columns
@ -1011,6 +1020,8 @@ service FrontendService {
TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request)
TStreamLoadMultiTablePutResult streamLoadMultiTablePut(1: TStreamLoadPutRequest request)
Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)
TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request)
@ -1028,6 +1039,6 @@ service FrontendService {
TCheckAuthResult checkAuth(1: TCheckAuthRequest request)
TQueryStatsResult getQueryStats(1: TGetQueryStatsRequest request)
TGetTabletReplicaInfosResult getTabletReplicaInfos(1: TGetTabletReplicaInfosRequest request)
}

View File

@ -176,7 +176,7 @@ struct TQueryOptions {
51: optional bool enable_new_shuffle_hash_method
52: optional i32 be_exec_version = 0
53: optional i32 partitioned_hash_join_rows_threshold = 0
54: optional bool enable_share_hash_table_for_broadcast_join
@ -196,7 +196,7 @@ struct TQueryOptions {
60: optional i32 partitioned_hash_agg_rows_threshold = 0
61: optional bool enable_file_cache = false
62: optional i32 insert_timeout = 14400
63: optional i32 execution_timeout = 3600
@ -226,7 +226,7 @@ struct TQueryOptions {
75: optional bool enable_insert_strict = false;
}
// A scan range plus the parameters needed to execute that scan.
struct TScanRangeParams {
@ -307,7 +307,7 @@ struct TQueryGlobals {
1: required string now_string
// To support timezone in Doris. timestamp_ms is the millisecond uinix timestamp for
// this query to calculate time zone relative function
2: optional i64 timestamp_ms
// time_zone is the timezone this query used.
@ -419,6 +419,7 @@ struct TExecPlanFragmentParams {
21: optional bool build_hash_table_for_broadcast_join = false;
22: optional list<Types.TUniqueId> instances_sharing_hash_table;
23: optional string table_name;
}
struct TExecPlanFragmentParamsList {

View File

@ -225,7 +225,7 @@ enum TStmtType {
QUERY,
DDL, // Data definition, e.g. CREATE TABLE (includes read-only functions e.g. SHOW)
DML, // Data modification e.g. INSERT
EXPLAIN // EXPLAIN
}
// level of verboseness for "explain" output
@ -451,7 +451,7 @@ struct TJavaUdfExecutorCtorParams {
9: optional i64 output_intermediate_state_ptr
10: optional i64 batch_size_ptr
// this is used to pass place or places to FE, which could help us call jni
// only once and can process a batch size data in JAVA-Udaf
11: optional i64 input_places_ptr
@ -576,7 +576,7 @@ enum TLoadJobState {
LOADING,
FINISHED,
CANCELLED
}
enum TEtlState {
RUNNING,
@ -653,7 +653,7 @@ struct TTabletCommitInfo {
2: required i64 backendId
// Every load job should check if the global dict is valid, if the global dict
// is invalid then should sent the invalid column names to FE
3: optional list<string> invalid_dict_cols
}
struct TErrorTabletInfo {
@ -670,6 +670,7 @@ enum TLoadType {
enum TLoadSourceType {
RAW,
KAFKA,
MULTI_TABLE,
}
enum TMergeType {
@ -680,7 +681,7 @@ enum TMergeType {
enum TSortType {
LEXICAL,
ZORDER,
}
enum TMetadataType {