From 05e365ea137eb8c92b8e7eedc7d1435e83f065ae Mon Sep 17 00:00:00 2001
From: zzzzzzzs <1443539042@qq.com>
Date: Thu, 14 Sep 2023 21:41:11 +0800
Subject: [PATCH] [fix] fix http_stream retry mechanism (#23969)

Co-authored-by: yiguolei <676222867@qq.com>
---
 be/src/http/action/http_stream.cpp            |  2 --
 be/src/io/file_factory.cpp                    |  5 ++---
 be/src/io/file_factory.h                      |  2 +-
 .../runtime/stream_load/stream_load_context.h |  1 -
 be/src/service/internal_service.cpp           |  8 --------
 be/src/vec/exec/format/csv/csv_reader.cpp     |  6 ++++--
 .../vec/exec/format/json/new_json_reader.cpp  |  3 ++-
 .../org/apache/doris/qe/ConnectContext.java   | 19 -------------------
 gensrc/thrift/FrontendService.thrift          |  9 ---------
 9 files changed, 9 insertions(+), 46 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index f8068389b2..25b1f188f9 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -241,7 +241,6 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
         if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) {
             ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
         } else {
-            ctx->need_schema = true;
             ctx->is_read_schema = false;
             ctx->status = _process_put(req, ctx);
         }
@@ -256,7 +255,6 @@
     }
     // after all the data has been read and it has not reached 1M, it will execute here
     if (ctx->is_read_schema) {
-        ctx->need_schema = true;
         ctx->is_read_schema = false;
         ctx->status = _process_put(req, ctx);
     }
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index c75ec280b8..55b6f181a9 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -141,12 +141,12 @@ Status FileFactory::create_file_reader(const io::FileSystemProperties& system_pr

 // file scan node/stream load pipe
 Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
-                                       RuntimeState* runtime_state) {
+                                       RuntimeState* runtime_state, bool need_schema) {
     auto stream_load_ctx = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
     if (!stream_load_ctx) {
         return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
     }
-    if (stream_load_ctx->need_schema == true) {
+    if (need_schema == true) {
         auto pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
                 stream_load_ctx->schema_buffer->pos /* total_length */);
@@ -154,7 +154,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
         pipe->append(stream_load_ctx->schema_buffer);
         pipe->finish();
         *file_reader = std::move(pipe);
-        stream_load_ctx->need_schema = false;
     } else {
         *file_reader = stream_load_ctx->pipe;
     }
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 06bf8e0cc4..7c51118fc5 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -85,7 +85,7 @@ public:

     // Create FileReader for stream load pipe
     static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
-                                     RuntimeState* runtime_state);
+                                     RuntimeState* runtime_state, bool need_schema);

     static Status create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd,
                                      const io::FileReaderOptions& reader_options,
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index e5eda74964..70aeb18fee 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -161,7 +161,6 @@ public:
     int64_t txn_id = -1;

     // http stream
-    bool need_schema = false;
     bool is_read_schema = true;

     std::string txn_operation = "";
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 9b923aec7d..0dcc26f45d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -680,14 +680,6 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
             st.to_protobuf(result->mutable_status());
             return;
         }
-        if (params.file_type == TFileType::FILE_STREAM) {
-            auto stream_load_ctx =
-                    ExecEnv::GetInstance()->new_load_stream_mgr()->get(params.load_id);
-            if (!stream_load_ctx) {
-                st = Status::InternalError("unknown stream load id: {}",
-                                           UniqueId(params.load_id).to_string());
-            }
-        }
         result->set_column_nums(col_names.size());
         for (size_t idx = 0; idx < col_names.size(); ++idx) {
             result->add_column_names(col_names[idx]);
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index cbb89b28b5..7abe510000 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -276,7 +276,8 @@ Status CsvReader::init_reader(bool is_load) {
     }

     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state));
+        RETURN_IF_ERROR(
+                FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false));
     } else {
         _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
         io::FileReaderOptions reader_options =
@@ -769,7 +770,8 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
     io::FileReaderOptions reader_options =
             FileFactory::get_reader_options(_state, _file_description);
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_params.load_id, &_file_reader, _state));
+        RETURN_IF_ERROR(
+                FileFactory::create_pipe_reader(_params.load_id, &_file_reader, _state, true));
     } else {
         RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description,
                                                         reader_options, &_file_system,
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 907dbd2350..af46d853d2 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -382,7 +382,8 @@ Status NewJsonReader::_open_file_reader() {
     _current_offset = start_offset;

     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state));
+        RETURN_IF_ERROR(
+                FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false));
     } else {
         _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
         io::FileReaderOptions reader_options =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 7c7aa7e2da..99afa091d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -41,7 +41,6 @@ import org.apache.doris.resource.Tag;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.Histogram;
 import org.apache.doris.system.Backend;
-import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionEntry;
 import org.apache.doris.transaction.TransactionStatus;
@@ -78,9 +77,7 @@ public class ConnectContext {
     protected volatile long forwardedStmtId;

     // set for http_stream
-    protected volatile TUniqueId loadId;
     protected volatile long backendId;
-    protected volatile LoadTaskInfo streamLoadInfo;

     protected volatile TUniqueId queryId;
     protected volatile String traceId;
@@ -338,22 +335,6 @@ public class ConnectContext {
         this.backendId = backendId;
     }

-    public TUniqueId getLoadId() {
-        return loadId;
-    }
-
-    public void setLoadId(TUniqueId loadId) {
-        this.loadId = loadId;
-    }
-
-    public void setStreamLoadInfo(LoadTaskInfo streamLoadInfo) {
-        this.streamLoadInfo = streamLoadInfo;
-    }
-
-    public LoadTaskInfo getStreamLoadInfo() {
-        return streamLoadInfo;
-    }
-
     public void setStmtId(long stmtId) {
         this.stmtId = stmtId;
     }
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 65bd2b7151..1edaa202a0 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -647,15 +647,6 @@ struct TStreamLoadMultiTablePutResult {
     3: optional list<PaloInternalService.TPipelineFragmentParams> pipeline_params
 }

-struct TStreamLoadWithLoadStatusResult {
-    1: optional Status.TStatus status
-    2: optional i64 txn_id
-    3: optional i64 total_rows
-    4: optional i64 loaded_rows
-    5: optional i64 filtered_rows
-    6: optional i64 unselected_rows
-}
-
 struct TKafkaRLTaskProgress {
     1: required map<i32, i64> partitionCmtOffset
 }
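
Note (reviewer addition, not part of the patch): the retry bug came from `need_schema` living as one-shot state on StreamLoadContext. create_pipe_reader() consumed the flag by resetting it to false, so a retried fetch_table_schema RPC fell through to the live pipe instead of re-reading the buffered schema bytes. Passing `need_schema` explicitly at each call site removes that hidden state: CsvReader::_prepare_parse passes true (schema inference over schema_buffer), while CsvReader::init_reader and NewJsonReader pass false (normal load over the live pipe), matching the hunks above. The following is a minimal standalone C++ sketch of the idea only; `Pipe` and `Context` are hypothetical stand-ins, not the real io::StreamLoadPipe / StreamLoadContext classes.

    #include <cstdio>
    #include <memory>
    #include <string>

    // Toy stand-ins for io::StreamLoadPipe / StreamLoadContext (illustrative only).
    struct Pipe {
        std::string data;
    };

    struct Context {
        std::string schema_buffer;  // bytes buffered for schema inference
        std::shared_ptr<Pipe> pipe = std::make_shared<Pipe>();  // live load pipe
    };

    // After this patch: the caller states what it wants on every call, so a
    // retried schema fetch behaves exactly like the first attempt.
    std::shared_ptr<Pipe> create_pipe_reader(Context& ctx, bool need_schema) {
        if (need_schema) {
            // Build a fresh pipe over the buffered schema bytes. No context
            // state is mutated, so the call is safe to repeat on retry.
            auto pipe = std::make_shared<Pipe>();
            pipe->data = ctx.schema_buffer;
            return pipe;
        }
        return ctx.pipe;  // normal load path: hand back the live pipe
    }

    int main() {
        Context ctx;
        ctx.schema_buffer = "c1,c2,c3";
        // Both the first and the retried schema read see the buffered bytes.
        std::printf("try 1: %s\n", create_pipe_reader(ctx, true)->data.c_str());
        std::printf("try 2: %s\n", create_pipe_reader(ctx, true)->data.c_str());
        // Before the patch, create_pipe_reader() reset ctx->need_schema to
        // false, so a retry silently fell through to the live pipe instead.
        return 0;
    }

The same reasoning explains the deletions in internal_service.cpp and ConnectContext.java: once the flag travels with the call, the FE no longer needs to stash loadId/streamLoadInfo per connection, and the BE-side FILE_STREAM lookup in fetch_table_schema becomes redundant with the check already inside create_pipe_reader.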