# [branch-2.1](move-memtable) fix move memtable core dump when using multi-table load (#37370)
## Proposed changes

Pick https://github.com/apache/doris/pull/35458.

Fixes a core dump in multi-table load with move memtable: instead of planning all pending tables in one shot under the shared `StreamLoadContext` and resolving pipes through a fragment-keyed `_pipe_map`, each table now carries its own `StreamLoadContext` with its own load id, registered in `new_load_stream_mgr` and planned by a separate `streamLoadMultiTablePut` request.
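The heart of the change: `MultiTablePipe` stops tracking bare `KafkaConsumerPipePtr`s plus a shared fragment-keyed `_pipe_map`, and instead gives every table its own `StreamLoadContext` that owns the pipe and carries a per-table load id. A minimal sketch of that ownership shape (stand-in types, not the Doris sources):

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

// Stand-ins for doris::StreamLoadContext and io::KafkaConsumerPipe;
// only the fields this sketch needs.
struct KafkaConsumerPipe {};
struct StreamLoadContext {
    long id = 0;                              // per-table load id (a UniqueId in Doris)
    std::shared_ptr<KafkaConsumerPipe> pipe;  // the pipe now lives on the context
};

int main() {
    // Before: unordered_map<string, KafkaConsumerPipePtr> plus a shared,
    // fragment-keyed _pipe_map. After: one context per table, owning both.
    std::unordered_map<std::string, std::shared_ptr<StreamLoadContext>> unplanned_tables;

    auto ctx = std::make_shared<StreamLoadContext>();
    ctx->id = 42;  // UniqueId::gen_uid() in the real code
    ctx->pipe = std::make_shared<KafkaConsumerPipe>();
    unplanned_tables.emplace("tbl1", ctx);

    // Consumers recover the pipe through the table's own context.
    std::cout << "tbl1 pipe=" << unplanned_tables.at("tbl1")->pipe.get() << "\n";
    return 0;
}
```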
In `FileFactory::create_pipe_reader`, the fragment-keyed pipe lookup is deleted; the pipe found on the stream load context is used as-is:

```diff
@@ -174,26 +174,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
         *file_reader = stream_load_ctx->pipe;
     }
 
-    if (file_reader->get() == nullptr) {
-        return Status::OK();
-    }
-
-    auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader);
-    if (multi_table_pipe == nullptr || runtime_state == nullptr) {
-        return Status::OK();
-    }
-
-    TUniqueId pipe_id;
-    if (runtime_state->enable_pipeline_exec()) {
-        pipe_id = io::StreamLoadPipe::calculate_pipe_id(runtime_state->query_id(),
-                                                        runtime_state->fragment_id());
-    } else {
-        pipe_id = runtime_state->fragment_instance_id();
-    }
-    *file_reader = multi_table_pipe->get_pipe(pipe_id);
-    LOG(INFO) << "create pipe reader for fragment instance: " << pipe_id
-              << " pipe: " << (*file_reader).get();
-
     return Status::OK();
 }
 
```
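With contexts registered per table, the reader side no longer needs the `MultiTablePipe::get_pipe(pipe_id)` detour deleted above: the context fetched by load id already holds the right pipe. Roughly, with hypothetical stand-ins rather than the real `FileFactory` signature:

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-ins for io::FileReader and doris::StreamLoadContext.
struct FileReader {};
struct StreamLoadContext {
    std::shared_ptr<FileReader> pipe;
};

// The surviving logic of create_pipe_reader: take the pipe straight off the
// per-table context; no dynamic_pointer_cast, no fragment-id lookup.
std::shared_ptr<FileReader> create_pipe_reader(
        const std::shared_ptr<StreamLoadContext>& stream_load_ctx) {
    return stream_load_ctx->pipe;
}

int main() {
    auto ctx = std::make_shared<StreamLoadContext>();
    ctx->pipe = std::make_shared<FileReader>();
    assert(create_pipe_reader(ctx) == ctx->pipe);
    return 0;
}
```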
In the `MultiTablePipe` implementation, the pipe maps become context maps:

```diff
@@ -61,9 +61,9 @@ Status MultiTablePipe::append_json(const char* data, size_t size) {
 }
 
 KafkaConsumerPipePtr MultiTablePipe::get_pipe_by_table(const std::string& table) {
-    auto pipe = _planned_pipes.find(table);
-    DCHECK(pipe != _planned_pipes.end());
-    return pipe->second;
+    auto pair = _planned_tables.find(table);
+    DCHECK(pair != _planned_tables.end());
+    return std::static_pointer_cast<io::KafkaConsumerPipe>(pair->second->pipe);
 }
 
 static std::string_view get_first_part(const char* dat, char delimiter) {
@@ -78,15 +78,15 @@ static std::string_view get_first_part(const char* dat, char delimiter) {
 }
 
 Status MultiTablePipe::finish() {
-    for (auto& pair : _planned_pipes) {
-        RETURN_IF_ERROR(pair.second->finish());
+    for (auto& pair : _planned_tables) {
+        RETURN_IF_ERROR(pair.second->pipe->finish());
     }
     return Status::OK();
 }
 
 void MultiTablePipe::cancel(const std::string& reason) {
-    for (auto& pair : _planned_pipes) {
-        pair.second->cancel(reason);
+    for (auto& pair : _planned_tables) {
+        pair.second->pipe->cancel(reason);
     }
 }
 
@@ -101,19 +101,29 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
         return Status::InternalError("empty data");
     }
     KafkaConsumerPipePtr pipe = nullptr;
-    auto iter = _planned_pipes.find(table);
-    if (iter != _planned_pipes.end()) {
-        pipe = iter->second;
+    auto iter = _planned_tables.find(table);
+    if (iter != _planned_tables.end()) {
+        pipe = std::static_pointer_cast<io::KafkaConsumerPipe>(iter->second->pipe);
         RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
                                        "append failed in planned kafka pipe");
     } else {
-        iter = _unplanned_pipes.find(table);
-        if (iter == _unplanned_pipes.end()) {
+        iter = _unplanned_tables.find(table);
+        if (iter == _unplanned_tables.end()) {
+            std::shared_ptr<StreamLoadContext> ctx =
+                    std::make_shared<StreamLoadContext>(doris::ExecEnv::GetInstance());
+            ctx->id = UniqueId::gen_uid();
             pipe = std::make_shared<io::KafkaConsumerPipe>();
-            LOG(INFO) << "create new unplanned pipe: " << pipe.get() << ", ctx: " << _ctx->brief();
-            _unplanned_pipes.emplace(table, pipe);
+            ctx->pipe = pipe;
+#ifndef BE_TEST
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    doris::ExecEnv::GetInstance()->new_load_stream_mgr()->put(ctx->id, ctx),
+                    "put stream load ctx error");
+#endif
+            _unplanned_tables.emplace(table, ctx);
+            LOG(INFO) << "create new unplanned table ctx, table: " << table
+                      << "load id: " << ctx->id << ", txn id: " << _ctx->txn_id;
        } else {
-            pipe = iter->second;
+            pipe = std::static_pointer_cast<io::KafkaConsumerPipe>(iter->second->pipe);
        }
 
        // It is necessary to determine whether the sum of pipe_current_capacity and size is greater than pipe_max_capacity,
```
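`dispatch` now lazily builds a full `StreamLoadContext` the first time a table appears, not just a pipe, registers it with the load stream manager so planned fragments can later find it by load id, and parks it in `_unplanned_tables`. Condensed to stand-in types (the bare map `g_load_stream_mgr` stands in for `new_load_stream_mgr()`):

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

// Stand-ins; the real types are io::KafkaConsumerPipe and doris::StreamLoadContext.
struct KafkaConsumerPipe {
    bool append(const char* data, std::size_t size) { return data != nullptr && size > 0; }
};
struct StreamLoadContext {
    int64_t id = 0;
    std::shared_ptr<KafkaConsumerPipe> pipe;
};

// Bare map standing in for ExecEnv::GetInstance()->new_load_stream_mgr().
std::unordered_map<int64_t, std::shared_ptr<StreamLoadContext>> g_load_stream_mgr;

using TableMap = std::unordered_map<std::string, std::shared_ptr<StreamLoadContext>>;

bool dispatch(TableMap& planned, TableMap& unplanned, const std::string& table,
              const char* data, std::size_t size) {
    std::shared_ptr<KafkaConsumerPipe> pipe;
    if (auto it = planned.find(table); it != planned.end()) {
        pipe = it->second->pipe;   // already planned: append straight to its pipe
    } else if (auto it2 = unplanned.find(table); it2 != unplanned.end()) {
        pipe = it2->second->pipe;  // seen before, still waiting to be planned
    } else {
        // First sight of this table: build a context with its own load id,
        // register it so fragments can find it later, park it as unplanned.
        auto ctx = std::make_shared<StreamLoadContext>();
        static int64_t next_id = 1;
        ctx->id = next_id++;  // UniqueId::gen_uid() in the real code
        ctx->pipe = std::make_shared<KafkaConsumerPipe>();
        g_load_stream_mgr.emplace(ctx->id, ctx);  // new_load_stream_mgr()->put()
        unplanned.emplace(table, ctx);
        pipe = ctx->pipe;
    }
    return pipe->append(data, size);
}

int main() {
    TableMap planned, unplanned;
    std::cout << dispatch(planned, unplanned, "tbl1", "row", 3) << "\n";  // 1
    std::cout << "unplanned tables: " << unplanned.size() << "\n";        // 1
    return 0;
}
```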
Still in `MultiTablePipe::dispatch` and the planning path:

```diff
@@ -124,7 +134,7 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
        auto pipe_current_capacity = pipe->current_capacity();
        auto pipe_max_capacity = pipe->max_capacity();
        if (_unplanned_row_cnt >= _row_threshold ||
-           _unplanned_pipes.size() >= _wait_tables_threshold ||
+           _unplanned_tables.size() >= _wait_tables_threshold ||
            pipe_current_capacity + size > pipe_max_capacity) {
            LOG(INFO) << fmt::format(
                    "unplanned row cnt={} reach row_threshold={} or "
@@ -151,109 +161,104 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
 
 #ifndef BE_TEST
 Status MultiTablePipe::request_and_exec_plans() {
-    if (_unplanned_pipes.empty()) {
+    if (_unplanned_tables.empty()) {
         return Status::OK();
     }
 
     // get list of table names in unplanned pipes
-    std::vector<std::string> tables;
     fmt::memory_buffer log_buffer;
     log_buffer.clear();
-    fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_pipes.size());
-    for (auto& pair : _unplanned_pipes) {
-        tables.push_back(pair.first);
+    fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_tables.size());
+    for (auto& pair : _unplanned_tables) {
         fmt::format_to(log_buffer, "{} ", pair.first);
     }
     fmt::format_to(log_buffer, "]");
     LOG(INFO) << fmt::to_string(log_buffer);
 
-    TStreamLoadPutRequest request;
-    set_request_auth(&request, _ctx->auth);
-    request.db = _ctx->db;
-    request.table_names = tables;
-    request.__isset.table_names = true;
-    request.txnId = _ctx->txn_id;
-    request.formatType = _ctx->format;
-    request.__set_compress_type(_ctx->compress_type);
-    request.__set_header_type(_ctx->header_type);
-    request.__set_loadId(_ctx->id.to_thrift());
-    request.fileType = TFileType::FILE_STREAM;
-    request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
-    request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
-    // no need to register new_load_stream_mgr coz it is already done in routineload submit task
-
-    // plan this load
-    ExecEnv* exec_env = doris::ExecEnv::GetInstance();
-    TNetworkAddress master_addr = exec_env->master_info()->network_address;
-    int64_t stream_load_put_start_time = MonotonicNanos();
-    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
-            master_addr.hostname, master_addr.port,
-            [&request, this](FrontendServiceConnection& client) {
-                client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request);
-            }));
-    _ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
-
-    Status plan_status(Status::create(_ctx->multi_table_put_result.status));
-    if (!plan_status.ok()) {
-        LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief();
-        return plan_status;
-    }
-
     Status st;
-    if (_ctx->multi_table_put_result.__isset.params &&
-        !_ctx->multi_table_put_result.__isset.pipeline_params) {
-        st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
-    } else if (!_ctx->multi_table_put_result.__isset.params &&
-               _ctx->multi_table_put_result.__isset.pipeline_params) {
-        st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params);
-    } else {
-        return Status::Aborted("too many or too few params are set in multi_table_put_result.");
-    }
+    for (auto& pair : _unplanned_tables) {
+        TStreamLoadPutRequest request;
+        set_request_auth(&request, _ctx->auth);
+        std::vector<std::string> tables;
+        tables.push_back(pair.first);
+        request.db = _ctx->db;
+        request.table_names = tables;
+        request.__isset.table_names = true;
+        request.txnId = _ctx->txn_id;
+        request.formatType = _ctx->format;
+        request.__set_compress_type(_ctx->compress_type);
+        request.__set_header_type(_ctx->header_type);
+        request.__set_loadId((pair.second->id).to_thrift());
+        request.fileType = TFileType::FILE_STREAM;
+        request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
+        request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
+        // no need to register new_load_stream_mgr coz it is already done in routineload submit task
+
+        // plan this load
+        ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+        TNetworkAddress master_addr = exec_env->master_info()->network_address;
+        int64_t stream_load_put_start_time = MonotonicNanos();
+        RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+                master_addr.hostname, master_addr.port,
+                [&request, this](FrontendServiceConnection& client) {
+                    client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request);
+                }));
+        _ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
+
+        Status plan_status(Status::create(_ctx->multi_table_put_result.status));
+        if (!plan_status.ok()) {
+            LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief();
+            return plan_status;
+        }
+
+        if (_ctx->multi_table_put_result.__isset.params &&
+            !_ctx->multi_table_put_result.__isset.pipeline_params) {
+            st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
+        } else if (!_ctx->multi_table_put_result.__isset.params &&
+                   _ctx->multi_table_put_result.__isset.pipeline_params) {
+            st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params);
+        } else {
+            return Status::Aborted("too many or too few params are set in multi_table_put_result.");
+        }
+        if (!st.ok()) {
+            return st;
+        }
+    }
+    _unplanned_tables.clear();
     return st;
 }
 
 template <typename ExecParam>
 Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) {
     // put unplanned pipes into planned pipes and clear unplanned pipes
-    for (auto& pipe : _unplanned_pipes) {
-        _ctx->table_list.push_back(pipe.first);
-        _planned_pipes.emplace(pipe.first, pipe.second);
+    for (auto& pair : _unplanned_tables) {
+        _ctx->table_list.push_back(pair.first);
+        _planned_tables.emplace(pair.first, pair.second);
     }
     LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, returned plan cnt={}",
-                             _unplanned_pipes.size(), _planned_pipes.size(), params.size())
+                             _unplanned_tables.size(), _planned_tables.size(), params.size())
               << ", ctx: " << _ctx->brief();
-    _unplanned_pipes.clear();
 
     for (auto& plan : params) {
         DBUG_EXECUTE_IF("MultiTablePipe.exec_plans.failed",
                         { return Status::Aborted("MultiTablePipe.exec_plans.failed"); });
         if (!plan.__isset.table_name ||
-            _planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
+            _unplanned_tables.find(plan.table_name) == _unplanned_tables.end()) {
            return Status::Aborted("Missing vital param: table_name");
        }
 
-        if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) {
-            RETURN_IF_ERROR(
-                    put_pipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]));
-            LOG(INFO) << "fragment_instance_id=" << print_id(plan.params.fragment_instance_id)
-                      << " table=" << plan.table_name << ", ctx: " << _ctx->brief();
-        } else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) {
-            auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id);
-            RETURN_IF_ERROR(put_pipe(pipe_id, _planned_pipes[plan.table_name]));
-            LOG(INFO) << "pipe_id=" << pipe_id << ", table=" << plan.table_name
-                      << ", ctx: " << _ctx->brief();
-        } else {
-            LOG(WARNING) << "illegal exec param type, need `TExecPlanFragmentParams` or "
-                            "`TPipelineFragmentParams`, will crash"
-                         << ", ctx: " << _ctx->brief();
-            CHECK(false);
-        }
-
        _inflight_cnt++;
 
        RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment(
-                plan, [this](RuntimeState* state, Status* status) {
+                plan, [this, plan](RuntimeState* state, Status* status) {
                    DCHECK(state);
+                   auto pair = _planned_tables.find(plan.table_name);
+                   if (pair == _planned_tables.end()) {
+                       LOG(WARNING) << "failed to get ctx, table: " << plan.table_name;
+                   } else {
+                       doris::ExecEnv::GetInstance()->new_load_stream_mgr()->remove(
+                               pair->second->id);
+                   }
+
                    {
                        std::lock_guard<std::mutex> l(_tablet_commit_infos_lock);
                        _tablet_commit_infos.insert(_tablet_commit_infos.end(),
```
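The planning side changes shape: instead of one `streamLoadMultiTablePut` RPC covering every pending table under the shared load id, `request_and_exec_plans` now loops over `_unplanned_tables`, sends one put request per table under that table's own load id, executes the returned plan, stops at the first failure, and clears the map only after every table is planned. A reduced sketch of that loop, with hypothetical helpers (`plan_one_table`, `exec_one_plan`) in place of the Thrift plumbing:

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-ins for the Thrift request and status types.
struct PutRequest {
    std::string table;
    int64_t load_id = 0;
};
struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
};

// Stands in for the streamLoadMultiTablePut RPC: plan one table per call.
Status plan_one_table(const PutRequest& req) {
    return req.table.empty() ? Status::Error("no table") : Status::OK();
}
// Stands in for exec_plans() over the returned fragment params.
Status exec_one_plan(const PutRequest& req) {
    std::cout << "exec plan: table=" << req.table << " load_id=" << req.load_id << "\n";
    return Status::OK();
}

Status request_and_exec_plans(std::map<std::string, int64_t>& unplanned_tables) {
    Status st = Status::OK();
    for (auto& [table, load_id] : unplanned_tables) {
        PutRequest req{table, load_id};  // one request per table, its own load id
        Status plan = plan_one_table(req);
        if (!plan.ok) return plan;       // first failed plan aborts the whole batch
        st = exec_one_plan(req);
        if (!st.ok) return st;
    }
    unplanned_tables.clear();            // only once every table has been planned
    return st;
}

int main() {
    std::map<std::string, int64_t> pending{{"tbl1", 1}, {"tbl2", 2}};
    Status st = request_and_exec_plans(pending);
    std::cout << "ok=" << st.ok << " remaining=" << pending.size() << "\n";
    return 0;
}
```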
The `BE_TEST` stub gets the same rename, and the now-dead pipe-map helpers are removed:

```diff
@@ -297,12 +302,12 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
 #else
 Status MultiTablePipe::request_and_exec_plans() {
     // put unplanned pipes into planned pipes
-    for (auto& pipe : _unplanned_pipes) {
-        _planned_pipes.emplace(pipe.first, pipe.second);
+    for (auto& pipe : _unplanned_tables) {
+        _planned_tables.emplace(pipe.first, pipe.second);
     }
     LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}",
-                             _unplanned_pipes.size(), _planned_pipes.size());
-    _unplanned_pipes.clear();
+                             _unplanned_tables.size(), _planned_tables.size());
+    _unplanned_tables.clear();
     return Status::OK();
 }
 
@@ -327,35 +332,6 @@ void MultiTablePipe::_handle_consumer_finished() {
     _ctx->promise.set_value(_status); // when all done, finish the routine load task
 }
 
-Status MultiTablePipe::put_pipe(const TUniqueId& pipe_id,
-                                std::shared_ptr<io::StreamLoadPipe> pipe) {
-    std::lock_guard<std::mutex> l(_pipe_map_lock);
-    auto it = _pipe_map.find(pipe_id);
-    if (it != std::end(_pipe_map)) {
-        return Status::InternalError("id already exist");
-    }
-    _pipe_map.emplace(pipe_id, pipe);
-    return Status::OK();
-}
-
-std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::get_pipe(const TUniqueId& pipe_id) {
-    std::lock_guard<std::mutex> l(_pipe_map_lock);
-    auto it = _pipe_map.find(pipe_id);
-    if (it == std::end(_pipe_map)) {
-        return {};
-    }
-    return it->second;
-}
-
-void MultiTablePipe::remove_pipe(const TUniqueId& pipe_id) {
-    std::lock_guard<std::mutex> l(_pipe_map_lock);
-    auto it = _pipe_map.find(pipe_id);
-    if (it != std::end(_pipe_map)) {
-        _pipe_map.erase(it);
-        VLOG_NOTICE << "remove stream load pipe: " << pipe_id;
-    }
-}
-
 template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
                                            std::vector<TExecPlanFragmentParams> params);
 template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
```
And in the `MultiTablePipe` header, the public pipe-map API goes away and the maps change type:

```diff
@@ -60,13 +60,6 @@ public:
 
     void cancel(const std::string& reason) override;
 
-    // register <instance id, pipe> pair
-    Status put_pipe(const TUniqueId& pipe_id, std::shared_ptr<io::StreamLoadPipe> pipe);
-
-    std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId& pipe_id);
-
-    void remove_pipe(const TUniqueId& pipe_id);
-
 private:
     // parse table name from data
     std::string parse_dst_table(const char* data, size_t size);
@@ -82,8 +75,8 @@ private:
     void _handle_consumer_finished();
 
 private:
-    std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _planned_pipes;
-    std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _unplanned_pipes;
+    std::unordered_map<std::string /*table*/, std::shared_ptr<StreamLoadContext>> _planned_tables;
+    std::unordered_map<std::string /*table*/, std::shared_ptr<StreamLoadContext>> _unplanned_tables;
     std::atomic<uint64_t> _unplanned_row_cnt {0}; // trigger plan request when exceed threshold
     // inflight count, when it is zero, means consume and all plans is finished
     std::atomic<uint64_t> _inflight_cnt {1};
```
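Net effect on the class surface, as the header hunks show: both maps now hold whole contexts, and `put_pipe`/`get_pipe`/`remove_pipe` (with their `TUniqueId`-keyed `_pipe_map`) disappear, since lookup goes through the load stream manager by load id. A compilable summary of the new members (a sketch, not the full header):

```cpp
#include <memory>
#include <string>
#include <unordered_map>

struct StreamLoadContext;  // forward declaration, as in the real header

// Sketch of the reworked private state of MultiTablePipe.
class MultiTablePipeSketch {
private:
    // Both maps now hold contexts instead of bare KafkaConsumerPipePtrs.
    std::unordered_map<std::string /*table*/, std::shared_ptr<StreamLoadContext>> _planned_tables;
    std::unordered_map<std::string /*table*/, std::shared_ptr<StreamLoadContext>> _unplanned_tables;
    // put_pipe/get_pipe/remove_pipe and the TUniqueId-keyed _pipe_map were removed.
};

int main() {
    MultiTablePipeSketch sketch;  // members default-construct to empty maps
    (void)sketch;
    return 0;
}
```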