[fix](streamload) fix http_stream retry mechanism (#24978)

If a failure occurs, Doris may retry. Because ctx->is_read_schema is a global variable that was not reset in a timely manner, a retry could cause exceptions.


---------

Co-authored-by: yiguolei <676222867@qq.com>
This commit is contained in:
zzzzzzzs
2023-10-08 11:16:21 +08:00
committed by GitHub
parent feb1cbe9ed
commit 6fe060b79e
8 changed files with 21 additions and 24 deletions

View File

@ -245,13 +245,13 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
} else {
ctx->need_schema = true;
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = _process_put(req, ctx);
}
}
if (!st.ok()) {
if (!st.ok() && !ctx->status.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
@ -260,7 +260,8 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
}
// after all the data has been read and it has not reached 1M, it will execute here
if (ctx->is_read_schema) {
ctx->need_schema = true;
LOG(INFO) << "after all the data has been read and it has not reached 1M, it will execute "
<< "here";
ctx->is_read_schema = false;
ctx->status = _process_put(req, ctx);
}

View File

@ -141,12 +141,13 @@ Status FileFactory::create_file_reader(const io::FileSystemProperties& system_pr
// file scan node/stream load pipe
Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
RuntimeState* runtime_state) {
RuntimeState* runtime_state, bool need_schema) {
auto stream_load_ctx = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
if (!stream_load_ctx) {
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}
if (stream_load_ctx->need_schema == true) {
if (need_schema == true) {
// Here, a portion of the data is processed to parse column information
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
stream_load_ctx->schema_buffer->pos /* total_length */);
@ -154,7 +155,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
static_cast<void>(pipe->append(stream_load_ctx->schema_buffer));
static_cast<void>(pipe->finish());
*file_reader = std::move(pipe);
stream_load_ctx->need_schema = false;
} else {
*file_reader = stream_load_ctx->pipe;
}

View File

@ -85,7 +85,7 @@ public:
// Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
RuntimeState* runtime_state);
RuntimeState* runtime_state, bool need_schema);
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd,
const io::FileReaderOptions& reader_options,

View File

@ -165,7 +165,6 @@ public:
int64_t txn_id = default_txn_id;
// http stream
bool need_schema = false;
bool is_read_schema = true;
std::string txn_operation = "";

View File

@ -682,14 +682,6 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
st.to_protobuf(result->mutable_status());
return;
}
if (params.file_type == TFileType::FILE_STREAM) {
auto stream_load_ctx =
ExecEnv::GetInstance()->new_load_stream_mgr()->get(params.load_id);
if (!stream_load_ctx) {
st = Status::InternalError("unknown stream load id: {}",
UniqueId(params.load_id).to_string());
}
}
result->set_column_nums(col_names.size());
for (size_t idx = 0; idx < col_names.size(); ++idx) {
result->add_column_names(col_names[idx]);

View File

@ -294,7 +294,8 @@ Status CsvReader::init_reader(bool is_load) {
}
if (_params.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state));
RETURN_IF_ERROR(
FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false));
} else {
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
io::FileReaderOptions reader_options =
@ -824,7 +825,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
if (_params.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_params.load_id, &_file_reader, _state));
// Because http_stream needs to pre-read a portion of the data to parse column information, need_schema is set to true here
RETURN_IF_ERROR(
FileFactory::create_pipe_reader(_params.load_id, &_file_reader, _state, true));
} else {
RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description,
reader_options, &_file_system,

View File

@ -170,7 +170,7 @@ Status NewJsonReader::init_reader(
}
RETURN_IF_ERROR(_get_range_params());
RETURN_IF_ERROR(_open_file_reader());
RETURN_IF_ERROR(_open_file_reader(false));
if (_read_json_by_line) {
RETURN_IF_ERROR(_open_line_reader());
}
@ -237,7 +237,7 @@ Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) {
RETURN_IF_ERROR(_get_range_params());
RETURN_IF_ERROR(_open_file_reader());
RETURN_IF_ERROR(_open_file_reader(true));
if (_read_json_by_line) {
RETURN_IF_ERROR(_open_line_reader());
}
@ -373,7 +373,7 @@ Status NewJsonReader::_get_range_params() {
return Status::OK();
}
Status NewJsonReader::_open_file_reader() {
Status NewJsonReader::_open_file_reader(bool need_schema) {
int64_t start_offset = _range.start_offset;
if (start_offset != 0) {
start_offset -= 1;
@ -382,7 +382,9 @@ Status NewJsonReader::_open_file_reader() {
_current_offset = start_offset;
if (_params.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state));
// Because http_stream needs to pre-read a portion of the data to parse column information, need_schema is set to true here
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
need_schema));
} else {
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
io::FileReaderOptions reader_options =
@ -978,7 +980,7 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
Status NewJsonReader::_simdjson_init_reader() {
RETURN_IF_ERROR(_get_range_params());
RETURN_IF_ERROR(_open_file_reader());
RETURN_IF_ERROR(_open_file_reader(false));
if (_read_json_by_line) {
RETURN_IF_ERROR(_open_line_reader());
}

View File

@ -98,7 +98,7 @@ private:
Status _get_range_params();
void _init_system_properties();
void _init_file_description();
Status _open_file_reader();
Status _open_file_reader(bool need_schema);
Status _open_line_reader();
Status _parse_jsonpath_and_json_root();