[refactor](streamload) refactor stream load executor (#25615)

HHoflittlefish777
2023-10-23 14:34:26 +08:00
committed by GitHub
parent 09b2593035
commit d0da94e22b
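At its core, the refactor replaces two verbatim copies of the fragment-completion callback, one per execution path, with a single exec_fragment lambda that both exec_plan_fragment call sites share. A minimal sketch of that shape, using simplified stand-in types rather than the actual Doris API:

#include <functional>
#include <iostream>
#include <string>

// Stand-ins only: this models the call shape of FragmentMgr::exec_plan_fragment,
// not the real Doris signature.
struct Params {
    std::string name;
};
using FinishCallback = std::function<void(bool ok)>;

void exec_plan_fragment(const Params& params, const FinishCallback& cb) {
    std::cout << "executing " << params.name << "\n";
    cb(/*ok=*/true);
}

int main() {
    // Before: this callback body was written out inline, verbatim, at both
    // call sites below. After: defined once and passed to whichever path runs.
    auto exec_fragment = [](bool ok) {
        std::cout << (ok ? "load finished" : "load failed") << "\n";
    };
    bool has_non_pipeline_params = true;
    if (has_non_pipeline_params) {
        exec_plan_fragment(Params{"plan"}, exec_fragment);
    } else {
        exec_plan_fragment(Params{"pipeline plan"}, exec_fragment);
    }
    return 0;
}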

@@ -71,161 +71,83 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext>
LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id
<< ", query_id=" << print_id(ctx->put_result.params.params.query_id);
Status st;
auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) {
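// For group commit, the label and txn id are produced during execution, so read them back from the runtime state.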
if (ctx->group_commit) {
ctx->label = state->import_label();
ctx->txn_id = state->wal_id();
}
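// Execution is done: deregister this load from the stream manager and take ownership of the tablet commit infos.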
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
ctx->commit_infos = std::move(state->tablet_commit_infos());
if (status->ok()) {
ctx->number_total_rows = state->num_rows_load_total();
ctx->number_loaded_rows = state->num_rows_load_success();
ctx->number_filtered_rows = state->num_rows_load_filtered();
ctx->number_unselected_rows = state->num_rows_load_unselected();
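// Enforce max_filter_ratio against the rows actually selected for loading; group commit skips this check.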
int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
if (!ctx->group_commit && num_selected_rows > 0 &&
(double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
// NOTE: Do not modify the error message here, for historical reasons,
// some users may rely on this error message.
*status = Status::InternalError("too many filtered rows");
}
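// Expose the error log through an HTTP URL so the client can inspect the filtered rows.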
if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
}
if (status->ok()) {
DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
DorisMetrics::instance()->stream_load_rows_total->increment(
ctx->number_loaded_rows);
}
} else {
LOG(WARNING) << "fragment execute failed"
<< ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
<< ", err_msg=" << status->to_string() << ", " << ctx->brief();
// cancel body_sink so the sender knows about the failure
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(status->to_string());
}
switch (ctx->load_src_type) {
// reset the stream load ctx's kafka commit offset
case TLoadSourceType::KAFKA:
ctx->kafka_info->reset_offset();
break;
default:
break;
}
}
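// Record the write cost and publish the final status; the stream load handler blocks on this promise.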
ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
ctx->promise.set_value(*status);
if (!status->ok() && ctx->body_sink != nullptr) {
// In some cases, load execution exits early. For example, when
// max_filter_ratio is 0 and illegal data is encountered during stream
// loading, the entire load is terminated early. However, the http
// connection may still be sending data to stream_load_pipe and waiting
// for it to be consumed. Therefore, we must actively cancel body_sink
// to end the pipe.
ctx->body_sink->cancel(status->to_string());
}
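// Loads that commit their own transaction roll back on failure or a cancelled sink, and commit otherwise.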
if (ctx->need_commit_self && ctx->body_sink != nullptr) {
if (ctx->body_sink->cancelled() || !status->ok()) {
ctx->status = *status;
this->rollback_txn(ctx.get());
} else {
static_cast<void>(this->commit_txn(ctx.get()));
}
}
};
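// Both the non-pipeline and pipeline paths below share exec_fragment as the completion callback.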
if (ctx->put_result.__isset.params) {
st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, exec_fragment);
} else {
st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params,
exec_fragment);
}
if (!st.ok()) {
// no need to check unref's return value
return st;