From 6e8d52f3fcf5a4f3068dbfeb46428846c558eba6 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 22 Feb 2022 09:26:23 +0800 Subject: [PATCH] [fix](stream-load) fix bug that stream load may be blocked with unqualified data (#8176) Co-authored-by: morningman --- be/src/runtime/fragment_mgr.cpp | 2 +- be/src/runtime/message_body_sink.h | 8 +++++++ be/src/runtime/plan_fragment_executor.cpp | 2 +- .../stream_load/stream_load_executor.cpp | 21 ++++++++++++------- .../stream_load/stream_load_executor.h | 1 - be/src/runtime/stream_load/stream_load_pipe.h | 7 +------ 6 files changed, 24 insertions(+), 17 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index dcff85c2d5..450aa67343 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -511,7 +511,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_cxt->id, pipe)); RETURN_IF_ERROR( - _exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt, pipe)); + _exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt)); set_pipe(params.params.fragment_instance_id, pipe); return Status::OK(); } else { diff --git a/be/src/runtime/message_body_sink.h b/be/src/runtime/message_body_sink.h index d5460a64ae..2c4df30976 100644 --- a/be/src/runtime/message_body_sink.h +++ b/be/src/runtime/message_body_sink.h @@ -31,6 +31,14 @@ public: virtual Status finish() { return Status::OK(); } // called when read HTTP failed virtual void cancel(const std::string& reason) {} + + bool finished() const { return _finished; } + bool cancelled() const { return _cancelled; } + +protected: + bool _finished = false; + bool _cancelled = false; + std::string _cancelled_reason = ""; }; // write message to a local file diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 3966c8d952..554f0a3133 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -296,7 +296,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() { _collect_query_statistics(); } - auto st =_sink->send(runtime_state(), block); + auto st = _sink->send(runtime_state(), block); if (st.is_end_of_file()) { break; } diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 7fa22456ec..67b8336fbe 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -38,11 +38,6 @@ Status k_stream_load_plan_status; #endif Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { - return execute_plan_fragment(ctx, nullptr); -} - -Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx, - std::shared_ptr pipe) { DorisMetrics::instance()->txn_exec_plan_total->increment(1); // submit this params #ifndef BE_TEST @@ -51,7 +46,7 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx, LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << print_id(ctx->put_result.params.params.query_id); auto st = _exec_env->fragment_mgr()->exec_plan_fragment( - ctx->put_result.params, [ctx, pipe, this](PlanFragmentExecutor* executor) { + ctx->put_result.params, [ctx, this](PlanFragmentExecutor* executor) { ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos()); Status status = executor->status(); if (status.ok()) { @@ -103,8 +98,18 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx, ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos; ctx->promise.set_value(status); - if (ctx->need_commit_self && pipe != nullptr) { - if (pipe->closed() || !status.ok()) { + if (!status.ok() && ctx->body_sink != nullptr) { + // In some cases, the load execution is exited early. + // For example, when max_filter_ratio is 0 and illegal data is encountered + // during stream loading, the entire load process is terminated early. + // However, the http connection may still be sending data to stream_load_pipe + // and waiting for it to be consumed. + // Therefore, we need to actively cancel to end the pipe. + ctx->body_sink->cancel(status.get_error_msg()); + } + + if (ctx->need_commit_self && ctx->body_sink != nullptr) { + if (ctx->body_sink->cancelled() || !status.ok()) { ctx->status = status; this->rollback_txn(ctx); } else { diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h index 8adcde1ebe..fd6ea7e39f 100644 --- a/be/src/runtime/stream_load/stream_load_executor.h +++ b/be/src/runtime/stream_load/stream_load_executor.h @@ -50,7 +50,6 @@ public: Status execute_plan_fragment(StreamLoadContext* ctx); - Status execute_plan_fragment(StreamLoadContext* ctx, std::shared_ptr pipe); private: // collect the load statistics from context and set them to stat // return true if stat is set, otherwise, return false diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index 7f9cb5110e..d5d3006aed 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -40,9 +40,7 @@ public: _max_buffered_bytes(max_buffered_bytes), _min_chunk_size(min_chunk_size), _total_length(total_length), - _use_proto(use_proto), - _finished(false), - _cancelled(false) {} + _use_proto(use_proto) {} virtual ~StreamLoadPipe() {} Status open() override { return Status::OK(); } @@ -270,9 +268,6 @@ private: std::condition_variable _put_cond; std::condition_variable _get_cond; - bool _finished; - bool _cancelled; - std::string _cancelled_reason = ""; ByteBufferPtr _write_buf; };