From 4df70becb9dedbc659ec76c5fe3af5d1055b3f8f Mon Sep 17 00:00:00 2001 From: luozenglin <37725793+luozenglin@users.noreply.github.com> Date: Fri, 3 Feb 2023 16:51:19 +0800 Subject: [PATCH] [refactor](reader) refactor broker_file_reader to get _client in the constructor (#16021) --- be/src/io/fs/broker_file_reader.cpp | 17 +++++++---------- be/src/io/fs/broker_file_reader.h | 1 + 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp index 3d3dbfd802..b708627660 100644 --- a/be/src/io/fs/broker_file_reader.cpp +++ b/be/src/io/fs/broker_file_reader.cpp @@ -34,6 +34,7 @@ BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Pat _broker_addr(broker_addr), _fd(fd), _fs(std::move(fs)) { + _fs->get_client(&_client); DorisMetrics::instance()->broker_file_open_reading->increment(1); DorisMetrics::instance()->broker_file_reader_total->increment(1); } @@ -51,14 +52,12 @@ Status BrokerFileReader::close() { TBrokerOperationStatus response; try { - std::shared_ptr client; - RETURN_IF_ERROR(_fs->get_client(&client)); try { - (*client)->closeReader(response, request); + (*_client)->closeReader(response, request); } catch (apache::thrift::transport::TTransportException& e) { std::this_thread::sleep_for(std::chrono::seconds(1)); - RETURN_IF_ERROR((*client).reopen()); - (*client)->closeReader(response, request); + RETURN_IF_ERROR((*_client).reopen()); + (*_client)->closeReader(response, request); } } catch (apache::thrift::TException& e) { std::stringstream ss; @@ -95,18 +94,16 @@ Status BrokerFileReader::read_at(size_t offset, Slice result, const IOContext& / request.__set_length(bytes_req); TBrokerReadResponse response; - std::shared_ptr client; - RETURN_IF_ERROR(_fs->get_client(&client)); try { VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset << ", read bytes length:" << bytes_req; try { - (*client)->pread(response, request); + (*_client)->pread(response, request); } catch (apache::thrift::transport::TTransportException& e) { std::this_thread::sleep_for(std::chrono::seconds(1)); - RETURN_IF_ERROR((*client).reopen()); + RETURN_IF_ERROR((*_client).reopen()); LOG(INFO) << "retry reading from broker: " << _broker_addr << ". reason: " << e.what(); - (*client)->pread(response, request); + (*_client)->pread(response, request); } } catch (apache::thrift::TException& e) { std::stringstream ss; diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h index ae6b630597..5a753462bd 100644 --- a/be/src/io/fs/broker_file_reader.h +++ b/be/src/io/fs/broker_file_reader.h @@ -58,6 +58,7 @@ private: TBrokerFD _fd; std::shared_ptr _fs; + std::shared_ptr _client; std::atomic _closed = false; }; } // namespace io