From dec1eb360c60de7f86f5757e45acfbe76ae671c1 Mon Sep 17 00:00:00 2001 From: luozenglin <37725793+luozenglin@users.noreply.github.com> Date: Fri, 30 Dec 2022 15:37:33 +0800 Subject: [PATCH] [fix](brokerload) be core dump caused by broker load orc format file nullptr pointer (#15460) --- be/src/io/fs/broker_file_reader.cpp | 21 +++++++++------------ be/src/io/fs/broker_file_reader.h | 3 ++- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp index e8e54cb736..4ddd0ece2c 100644 --- a/be/src/io/fs/broker_file_reader.cpp +++ b/be/src/io/fs/broker_file_reader.cpp @@ -28,13 +28,14 @@ namespace io { BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path, size_t file_size, TBrokerFD fd, BrokerFileSystem* fs) - : _path(path), _file_size(file_size), _broker_addr(broker_addr), _fd(fd), _fs(fs) { + : _path(path), _file_size(file_size), _broker_addr(broker_addr), _fd(fd) { + fs->get_client(&_client); DorisMetrics::instance()->broker_file_open_reading->increment(1); DorisMetrics::instance()->broker_file_reader_total->increment(1); } BrokerFileReader::~BrokerFileReader() { - close(); + BrokerFileReader::close(); } Status BrokerFileReader::close() { @@ -46,14 +47,12 @@ Status BrokerFileReader::close() { TBrokerOperationStatus response; try { - std::shared_ptr client; - RETURN_IF_ERROR(_fs->get_client(&client)); try { - (*client)->closeReader(response, request); + (*_client)->closeReader(response, request); } catch (apache::thrift::transport::TTransportException& e) { std::this_thread::sleep_for(std::chrono::seconds(1)); - RETURN_IF_ERROR((*client).reopen()); - (*client)->closeReader(response, request); + RETURN_IF_ERROR((*_client).reopen()); + (*_client)->closeReader(response, request); } } catch (apache::thrift::TException& e) { std::stringstream ss; @@ -90,18 +89,16 @@ Status BrokerFileReader::read_at(size_t offset, Slice result, const IOContext& / request.__set_length(bytes_req); TBrokerReadResponse response; - std::shared_ptr client; - RETURN_IF_ERROR(_fs->get_client(&client)); try { VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset << ", read bytes length:" << bytes_req; try { - (*client)->pread(response, request); + (*_client)->pread(response, request); } catch (apache::thrift::transport::TTransportException& e) { std::this_thread::sleep_for(std::chrono::seconds(1)); - RETURN_IF_ERROR((*client).reopen()); + RETURN_IF_ERROR((*_client).reopen()); LOG(INFO) << "retry reading from broker: " << _broker_addr << ". reason: " << e.what(); - (*client)->pread(response, request); + (*_client)->pread(response, request); } } catch (apache::thrift::TException& e) { std::stringstream ss; diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h index 70b417b058..8f60d8266c 100644 --- a/be/src/io/fs/broker_file_reader.h +++ b/be/src/io/fs/broker_file_reader.h @@ -23,6 +23,7 @@ #include #include "io/fs/file_reader.h" +#include "runtime/client_cache.h" namespace doris { namespace io { @@ -53,8 +54,8 @@ private: const TNetworkAddress& _broker_addr; TBrokerFD _fd; - BrokerFileSystem* _fs; std::atomic _closed = false; + std::shared_ptr _client; }; } // namespace io } // namespace doris