From 6fe060b79ed77e8549bd2e32e668b0599483ffaf Mon Sep 17 00:00:00 2001 From: zzzzzzzs <1443539042@qq.com> Date: Sun, 8 Oct 2023 11:16:21 +0800 Subject: [PATCH] [fix](streamload) fix http_stream retry mechanism (#24978) If a failure occurs, doris may retry. Due to ctx->is_read_schema is a global variable that has not been reset in a timely manner, which may cause exceptions. --------- Co-authored-by: yiguolei <676222867@qq.com> --- be/src/http/action/http_stream.cpp | 7 ++++--- be/src/io/file_factory.cpp | 6 +++--- be/src/io/file_factory.h | 2 +- be/src/runtime/stream_load/stream_load_context.h | 1 - be/src/service/internal_service.cpp | 8 -------- be/src/vec/exec/format/csv/csv_reader.cpp | 7 +++++-- be/src/vec/exec/format/json/new_json_reader.cpp | 12 +++++++----- be/src/vec/exec/format/json/new_json_reader.h | 2 +- 8 files changed, 21 insertions(+), 24 deletions(-) diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 1b59bf0b6b..e215543e1e 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -245,13 +245,13 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) { ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes); } else { - ctx->need_schema = true; + LOG(INFO) << "use a portion of data to request fe to obtain column information"; ctx->is_read_schema = false; ctx->status = _process_put(req, ctx); } } - if (!st.ok()) { + if (!st.ok() && !ctx->status.ok()) { LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); ctx->status = st; return; @@ -260,7 +260,8 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { } // after all the data has been read and it has not reached 1M, it will execute here if (ctx->is_read_schema) { - ctx->need_schema = true; + LOG(INFO) << "after all the data has been read and it has not reached 1M, it will execute " + << "here"; ctx->is_read_schema = false; ctx->status = _process_put(req, ctx); } diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 12037c1b62..afbf18c189 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -141,12 +141,13 @@ Status FileFactory::create_file_reader(const io::FileSystemProperties& system_pr // file scan node/stream load pipe Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader, - RuntimeState* runtime_state) { + RuntimeState* runtime_state, bool need_schema) { auto stream_load_ctx = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id); if (!stream_load_ctx) { return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); } - if (stream_load_ctx->need_schema == true) { + if (need_schema == true) { + // Here, a portion of the data is processed to parse column information auto pipe = std::make_shared( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, stream_load_ctx->schema_buffer->pos /* total_length */); @@ -154,7 +155,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS static_cast(pipe->append(stream_load_ctx->schema_buffer)); static_cast(pipe->finish()); *file_reader = std::move(pipe); - stream_load_ctx->need_schema = false; } else { *file_reader = stream_load_ctx->pipe; } diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 06bf8e0cc4..7c51118fc5 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -85,7 +85,7 @@ public: // Create FileReader for stream load pipe static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader, - RuntimeState* runtime_state); + RuntimeState* runtime_state, bool need_schema); static Status create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd, const io::FileReaderOptions& reader_options, diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index ab8cc6be04..ffbed37fe3 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -165,7 +165,6 @@ public: int64_t txn_id = default_txn_id; // http stream - bool need_schema = false; bool is_read_schema = true; std::string txn_operation = ""; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 4e770cd9b8..02ab1b4450 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -682,14 +682,6 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c st.to_protobuf(result->mutable_status()); return; } - if (params.file_type == TFileType::FILE_STREAM) { - auto stream_load_ctx = - ExecEnv::GetInstance()->new_load_stream_mgr()->get(params.load_id); - if (!stream_load_ctx) { - st = Status::InternalError("unknown stream load id: {}", - UniqueId(params.load_id).to_string()); - } - } result->set_column_nums(col_names.size()); for (size_t idx = 0; idx < col_names.size(); ++idx) { result->add_column_names(col_names[idx]); diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index bdfc685a3a..2382cb92de 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -294,7 +294,8 @@ Status CsvReader::init_reader(bool is_load) { } if (_params.file_type == TFileType::FILE_STREAM) { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state)); + RETURN_IF_ERROR( + FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false)); } else { _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; io::FileReaderOptions reader_options = @@ -824,7 +825,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state, _file_description); if (_params.file_type == TFileType::FILE_STREAM) { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_params.load_id, &_file_reader, _state)); + // Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here + RETURN_IF_ERROR( + FileFactory::create_pipe_reader(_params.load_id, &_file_reader, _state, true)); } else { RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description, reader_options, &_file_system, diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 6ad72de22a..86b58fdc0a 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -170,7 +170,7 @@ Status NewJsonReader::init_reader( } RETURN_IF_ERROR(_get_range_params()); - RETURN_IF_ERROR(_open_file_reader()); + RETURN_IF_ERROR(_open_file_reader(false)); if (_read_json_by_line) { RETURN_IF_ERROR(_open_line_reader()); } @@ -237,7 +237,7 @@ Status NewJsonReader::get_parsed_schema(std::vector* col_names, std::vector* col_types) { RETURN_IF_ERROR(_get_range_params()); - RETURN_IF_ERROR(_open_file_reader()); + RETURN_IF_ERROR(_open_file_reader(true)); if (_read_json_by_line) { RETURN_IF_ERROR(_open_line_reader()); } @@ -373,7 +373,7 @@ Status NewJsonReader::_get_range_params() { return Status::OK(); } -Status NewJsonReader::_open_file_reader() { +Status NewJsonReader::_open_file_reader(bool need_schema) { int64_t start_offset = _range.start_offset; if (start_offset != 0) { start_offset -= 1; @@ -382,7 +382,9 @@ Status NewJsonReader::_open_file_reader() { _current_offset = start_offset; if (_params.file_type == TFileType::FILE_STREAM) { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state)); + // Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here + RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, + need_schema)); } else { _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; io::FileReaderOptions reader_options = @@ -978,7 +980,7 @@ Status NewJsonReader::_read_one_message(std::unique_ptr* file_buf, si Status NewJsonReader::_simdjson_init_reader() { RETURN_IF_ERROR(_get_range_params()); - RETURN_IF_ERROR(_open_file_reader()); + RETURN_IF_ERROR(_open_file_reader(false)); if (_read_json_by_line) { RETURN_IF_ERROR(_open_line_reader()); } diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index 2f4acf91c6..f40fe4c9f9 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -98,7 +98,7 @@ private: Status _get_range_params(); void _init_system_properties(); void _init_file_description(); - Status _open_file_reader(); + Status _open_file_reader(bool need_schema); Status _open_line_reader(); Status _parse_jsonpath_and_json_root();