diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 32e4d76dc7..3dd39e56f6 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -71,161 +71,83 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx) {
+    auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) {
+        if (ctx->group_commit) {
+            ctx->label = state->import_label();
+            ctx->txn_id = state->wal_id();
+        }
+        ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
+        ctx->commit_infos = std::move(state->tablet_commit_infos());
+        if (status->ok()) {
+            ctx->number_total_rows = state->num_rows_load_total();
+            ctx->number_loaded_rows = state->num_rows_load_success();
+            ctx->number_filtered_rows = state->num_rows_load_filtered();
+            ctx->number_unselected_rows = state->num_rows_load_unselected();
+
+            int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
+            if (!ctx->group_commit && num_selected_rows > 0 &&
+                (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
+                // NOTE: Do not modify the error message here, for historical reasons,
+                // some users may rely on this error message.
+                *status = Status::InternalError("too many filtered rows");
+            }
+            if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
+                ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
+            }
+
+            if (status->ok()) {
+                DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
+                DorisMetrics::instance()->stream_load_rows_total->increment(
+                        ctx->number_loaded_rows);
+            }
+        } else {
+            LOG(WARNING) << "fragment execute failed"
+                         << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
+                         << ", err_msg=" << status->to_string() << ", " << ctx->brief();
+            // cancel body_sink, make sender known it
+            if (ctx->body_sink != nullptr) {
+                ctx->body_sink->cancel(status->to_string());
+            }
+
+            switch (ctx->load_src_type) {
+            // reset the stream load ctx's kafka commit offset
+            case TLoadSourceType::KAFKA:
+                ctx->kafka_info->reset_offset();
+                break;
+            default:
+                break;
+            }
+        }
+        ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
+        ctx->promise.set_value(*status);
+
+        if (!status->ok() && ctx->body_sink != nullptr) {
+            // In some cases, the load execution is exited early.
+            // For example, when max_filter_ratio is 0 and illegal data is encountered
+            // during stream loading, the entire load process is terminated early.
+            // However, the http connection may still be sending data to stream_load_pipe
+            // and waiting for it to be consumed.
+            // Therefore, we need to actively cancel to end the pipe.
+            ctx->body_sink->cancel(status->to_string());
+        }
+
+        if (ctx->need_commit_self && ctx->body_sink != nullptr) {
+            if (ctx->body_sink->cancelled() || !status->ok()) {
+                ctx->status = *status;
+                this->rollback_txn(ctx.get());
+            } else {
+                static_cast<void>(this->commit_txn(ctx.get()));
+            }
+        }
+    };
     if (ctx->put_result.__isset.params) {
-        st = _exec_env->fragment_mgr()->exec_plan_fragment(
-                ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) {
-                    if (ctx->group_commit) {
-                        ctx->label = state->import_label();
-                        ctx->txn_id = state->wal_id();
-                    }
-                    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
-                    ctx->commit_infos = std::move(state->tablet_commit_infos());
-                    if (status->ok()) {
-                        ctx->number_total_rows = state->num_rows_load_total();
-                        ctx->number_loaded_rows = state->num_rows_load_success();
-                        ctx->number_filtered_rows = state->num_rows_load_filtered();
-                        ctx->number_unselected_rows = state->num_rows_load_unselected();
-
-                        int64_t num_selected_rows =
-                                ctx->number_total_rows - ctx->number_unselected_rows;
-                        if (!ctx->group_commit && num_selected_rows > 0 &&
-                            (double)ctx->number_filtered_rows / num_selected_rows >
-                                    ctx->max_filter_ratio) {
-                            // NOTE: Do not modify the error message here, for historical reasons,
-                            // some users may rely on this error message.
-                            *status = Status::InternalError("too many filtered rows");
-                        }
-                        if (ctx->number_filtered_rows > 0 &&
-                            !state->get_error_log_file_path().empty()) {
-                            ctx->error_url =
-                                    to_load_error_http_path(state->get_error_log_file_path());
-                        }
-
-                        if (status->ok()) {
-                            DorisMetrics::instance()->stream_receive_bytes_total->increment(
-                                    ctx->receive_bytes);
-                            DorisMetrics::instance()->stream_load_rows_total->increment(
-                                    ctx->number_loaded_rows);
-                        }
-                    } else {
-                        LOG(WARNING)
-                                << "fragment execute failed"
-                                << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
-                                << ", err_msg=" << status->to_string() << ", " << ctx->brief();
-                        // cancel body_sink, make sender known it
-                        if (ctx->body_sink != nullptr) {
-                            ctx->body_sink->cancel(status->to_string());
-                        }
-
-                        switch (ctx->load_src_type) {
-                        // reset the stream load ctx's kafka commit offset
-                        case TLoadSourceType::KAFKA:
-                            ctx->kafka_info->reset_offset();
-                            break;
-                        default:
-                            break;
-                        }
-                    }
-                    ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
-                    ctx->promise.set_value(*status);
-
-                    if (!status->ok() && ctx->body_sink != nullptr) {
-                        // In some cases, the load execution is exited early.
-                        // For example, when max_filter_ratio is 0 and illegal data is encountered
-                        // during stream loading, the entire load process is terminated early.
-                        // However, the http connection may still be sending data to stream_load_pipe
-                        // and waiting for it to be consumed.
-                        // Therefore, we need to actively cancel to end the pipe.
-                        ctx->body_sink->cancel(status->to_string());
-                    }
-
-                    if (ctx->need_commit_self && ctx->body_sink != nullptr) {
-                        if (ctx->body_sink->cancelled() || !status->ok()) {
-                            ctx->status = *status;
-                            this->rollback_txn(ctx.get());
-                        } else {
-                            static_cast<void>(this->commit_txn(ctx.get()));
-                        }
-                    }
-                });
+        st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, exec_fragment);
     } else {
-        st = _exec_env->fragment_mgr()->exec_plan_fragment(
-                ctx->put_result.pipeline_params, [ctx, this](RuntimeState* state, Status* status) {
-                    if (ctx->group_commit) {
-                        ctx->label = state->import_label();
-                        ctx->txn_id = state->wal_id();
-                    }
-                    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
-                    ctx->commit_infos = std::move(state->tablet_commit_infos());
-                    if (status->ok()) {
-                        ctx->number_total_rows = state->num_rows_load_total();
-                        ctx->number_loaded_rows = state->num_rows_load_success();
-                        ctx->number_filtered_rows = state->num_rows_load_filtered();
-                        ctx->number_unselected_rows = state->num_rows_load_unselected();
-
-                        int64_t num_selected_rows =
-                                ctx->number_total_rows - ctx->number_unselected_rows;
-                        if (!ctx->group_commit && num_selected_rows > 0 &&
-                            (double)ctx->number_filtered_rows / num_selected_rows >
-                                    ctx->max_filter_ratio) {
-                            // NOTE: Do not modify the error message here, for historical reasons,
-                            // some users may rely on this error message.
-                            *status = Status::InternalError("too many filtered rows");
-                        }
-                        if (ctx->number_filtered_rows > 0 &&
-                            !state->get_error_log_file_path().empty()) {
-                            ctx->error_url =
-                                    to_load_error_http_path(state->get_error_log_file_path());
-                        }
-
-                        if (status->ok()) {
-                            DorisMetrics::instance()->stream_receive_bytes_total->increment(
-                                    ctx->receive_bytes);
-                            DorisMetrics::instance()->stream_load_rows_total->increment(
-                                    ctx->number_loaded_rows);
-                        }
-                    } else {
-                        LOG(WARNING)
-                                << "fragment execute failed"
-                                << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
-                                << ", err_msg=" << status->to_string() << ", " << ctx->brief();
-                        // cancel body_sink, make sender known it
-                        if (ctx->body_sink != nullptr) {
-                            ctx->body_sink->cancel(status->to_string());
-                        }
-
-                        switch (ctx->load_src_type) {
-                        // reset the stream load ctx's kafka commit offset
-                        case TLoadSourceType::KAFKA:
-                            ctx->kafka_info->reset_offset();
-                            break;
-                        default:
-                            break;
-                        }
-                    }
-                    ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
-                    ctx->promise.set_value(*status);
-
-                    if (!status->ok() && ctx->body_sink != nullptr) {
-                        // In some cases, the load execution is exited early.
-                        // For example, when max_filter_ratio is 0 and illegal data is encountered
-                        // during stream loading, the entire load process is terminated early.
-                        // However, the http connection may still be sending data to stream_load_pipe
-                        // and waiting for it to be consumed.
-                        // Therefore, we need to actively cancel to end the pipe.
-                        ctx->body_sink->cancel(status->to_string());
-                    }
-
-                    if (ctx->need_commit_self && ctx->body_sink != nullptr) {
-                        if (ctx->body_sink->cancelled() || !status->ok()) {
-                            ctx->status = *status;
-                            this->rollback_txn(ctx.get());
-                        } else {
-                            static_cast<void>(this->commit_txn(ctx.get()));
-                        }
-                    }
-                });
+        st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params,
+                                                           exec_fragment);
     }
+
     if (!st.ok()) {
         // no need to check unref's return value
         return st;
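Note on the pattern: both branches previously built an identical completion callback inline; the change hoists that callback into a single named lambda, exec_fragment, and passes it to both exec_plan_fragment() call sites. The standalone sketch below illustrates the same technique outside Doris. Every name in it (run_fragment, Params, PipelineParams, FinishCb) is a hypothetical stand-in, not a Doris API; compile with -std=c++17 or newer.

// Illustrative sketch only: a duplicated completion callback hoisted into one
// named lambda that both overloads accept. All types here are hypothetical.
#include <functional>
#include <iostream>
#include <string>

struct Status {
    bool ok = true;
    std::string msg;
};

struct Params {};         // stand-in for ctx->put_result.params
struct PipelineParams {}; // stand-in for ctx->put_result.pipeline_params

// Both overloads take the same callback type, mirroring the two
// exec_plan_fragment() call sites in the diff above.
using FinishCb = std::function<void(const Status&)>;

Status run_fragment(const Params&, const FinishCb& cb) {
    cb(Status {true, "non-pipeline fragment finished"});
    return Status {};
}

Status run_fragment(const PipelineParams&, const FinishCb& cb) {
    cb(Status {true, "pipeline fragment finished"});
    return Status {};
}

int main() {
    const bool use_pipeline = true;

    // Shared completion logic, written once (the role exec_fragment plays above).
    auto on_finish = [](const Status& s) {
        std::cout << (s.ok ? "ok: " : "failed: ") << s.msg << '\n';
    };

    Status st = use_pipeline ? run_fragment(PipelineParams {}, on_finish)
                             : run_fragment(Params {}, on_finish);
    return st.ok ? 0 : 1;
}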