[refactor](reader) refactor broker_file_reader to get _client in the constructor (#16021)
This commit is contained in:
@ -34,6 +34,7 @@ BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Pat
|
||||
_broker_addr(broker_addr),
|
||||
_fd(fd),
|
||||
_fs(std::move(fs)) {
|
||||
_fs->get_client(&_client);
|
||||
DorisMetrics::instance()->broker_file_open_reading->increment(1);
|
||||
DorisMetrics::instance()->broker_file_reader_total->increment(1);
|
||||
}
|
||||
@ -51,14 +52,12 @@ Status BrokerFileReader::close() {
|
||||
|
||||
TBrokerOperationStatus response;
|
||||
try {
|
||||
std::shared_ptr<BrokerServiceConnection> client;
|
||||
RETURN_IF_ERROR(_fs->get_client(&client));
|
||||
try {
|
||||
(*client)->closeReader(response, request);
|
||||
(*_client)->closeReader(response, request);
|
||||
} catch (apache::thrift::transport::TTransportException& e) {
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
RETURN_IF_ERROR((*client).reopen());
|
||||
(*client)->closeReader(response, request);
|
||||
RETURN_IF_ERROR((*_client).reopen());
|
||||
(*_client)->closeReader(response, request);
|
||||
}
|
||||
} catch (apache::thrift::TException& e) {
|
||||
std::stringstream ss;
|
||||
@ -95,18 +94,16 @@ Status BrokerFileReader::read_at(size_t offset, Slice result, const IOContext& /
|
||||
request.__set_length(bytes_req);
|
||||
|
||||
TBrokerReadResponse response;
|
||||
std::shared_ptr<BrokerServiceConnection> client;
|
||||
RETURN_IF_ERROR(_fs->get_client(&client));
|
||||
try {
|
||||
VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset
|
||||
<< ", read bytes length:" << bytes_req;
|
||||
try {
|
||||
(*client)->pread(response, request);
|
||||
(*_client)->pread(response, request);
|
||||
} catch (apache::thrift::transport::TTransportException& e) {
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
RETURN_IF_ERROR((*client).reopen());
|
||||
RETURN_IF_ERROR((*_client).reopen());
|
||||
LOG(INFO) << "retry reading from broker: " << _broker_addr << ". reason: " << e.what();
|
||||
(*client)->pread(response, request);
|
||||
(*_client)->pread(response, request);
|
||||
}
|
||||
} catch (apache::thrift::TException& e) {
|
||||
std::stringstream ss;
|
||||
|
||||
@ -58,6 +58,7 @@ private:
|
||||
TBrokerFD _fd;
|
||||
|
||||
std::shared_ptr<BrokerFileSystem> _fs;
|
||||
std::shared_ptr<BrokerServiceConnection> _client;
|
||||
std::atomic<bool> _closed = false;
|
||||
};
|
||||
} // namespace io
|
||||
|
||||
Reference in New Issue
Block a user