[fix](stream-load) fix bug that stream load may be blocked with unqualified data (#8176)

Co-authored-by: morningman <chenmingyu@baidu.com>
This commit is contained in:
Mingyu Chen
2022-02-22 09:26:23 +08:00
committed by GitHub
parent 47067e40a6
commit 6e8d52f3fc
6 changed files with 24 additions and 17 deletions

View File

@ -511,7 +511,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_cxt->id, pipe));
RETURN_IF_ERROR(
_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt, pipe));
_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt));
set_pipe(params.params.fragment_instance_id, pipe);
return Status::OK();
} else {

View File

@ -31,6 +31,14 @@ public:
virtual Status finish() { return Status::OK(); }
// Called when reading from HTTP has failed.
// `reason` is presumably a human-readable description of the failure
// (TODO confirm against callers). This default implementation does nothing.
virtual void cancel(const std::string& reason) {}
// Returns the current value of the _finished flag.
bool finished() const { return _finished; }
// Returns the current value of the _cancelled flag.
bool cancelled() const { return _cancelled; }
protected:
bool _finished = false;
bool _cancelled = false;
std::string _cancelled_reason = "";
};
// write message to a local file

View File

@ -296,7 +296,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
_collect_query_statistics();
}
auto st =_sink->send(runtime_state(), block);
auto st = _sink->send(runtime_state(), block);
if (st.is_end_of_file()) {
break;
}

View File

@ -38,11 +38,6 @@ Status k_stream_load_plan_status;
#endif
Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
return execute_plan_fragment(ctx, nullptr);
}
Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx,
std::shared_ptr<StreamLoadPipe> pipe) {
DorisMetrics::instance()->txn_exec_plan_total->increment(1);
// submit this params
#ifndef BE_TEST
@ -51,7 +46,7 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx,
LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id
<< ", query_id=" << print_id(ctx->put_result.params.params.query_id);
auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
ctx->put_result.params, [ctx, pipe, this](PlanFragmentExecutor* executor) {
ctx->put_result.params, [ctx, this](PlanFragmentExecutor* executor) {
ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos());
Status status = executor->status();
if (status.ok()) {
@ -103,8 +98,18 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx,
ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
ctx->promise.set_value(status);
if (ctx->need_commit_self && pipe != nullptr) {
if (pipe->closed() || !status.ok()) {
if (!status.ok() && ctx->body_sink != nullptr) {
// In some cases, the load execution exits early.
// For example, when max_filter_ratio is 0 and illegal data is encountered
// during stream loading, the entire load process is terminated early.
// However, the HTTP connection may still be sending data to stream_load_pipe
// and waiting for it to be consumed.
// Therefore, we need to actively cancel the body sink to end the pipe.
ctx->body_sink->cancel(status.get_error_msg());
}
if (ctx->need_commit_self && ctx->body_sink != nullptr) {
if (ctx->body_sink->cancelled() || !status.ok()) {
ctx->status = status;
this->rollback_txn(ctx);
} else {

View File

@ -50,7 +50,6 @@ public:
Status execute_plan_fragment(StreamLoadContext* ctx);
Status execute_plan_fragment(StreamLoadContext* ctx, std::shared_ptr<StreamLoadPipe> pipe);
private:
// collect the load statistics from context and set them to stat
// return true if stat is set, otherwise, return false

View File

@ -40,9 +40,7 @@ public:
_max_buffered_bytes(max_buffered_bytes),
_min_chunk_size(min_chunk_size),
_total_length(total_length),
_use_proto(use_proto),
_finished(false),
_cancelled(false) {}
_use_proto(use_proto) {}
virtual ~StreamLoadPipe() {}
Status open() override { return Status::OK(); }
@ -270,9 +268,6 @@ private:
std::condition_variable _put_cond;
std::condition_variable _get_cond;
bool _finished;
bool _cancelled;
std::string _cancelled_reason = "";
ByteBufferPtr _write_buf;
};