[fix](brokerload) be core dump caused by broker load orc format file nullptr pointer (#15460)

This commit is contained in:
luozenglin
2022-12-30 15:37:33 +08:00
committed by GitHub
parent 2f572ccc43
commit dec1eb360c
2 changed files with 11 additions and 13 deletions

View File

@ -28,13 +28,14 @@ namespace io {
BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path,
size_t file_size, TBrokerFD fd, BrokerFileSystem* fs)
: _path(path), _file_size(file_size), _broker_addr(broker_addr), _fd(fd), _fs(fs) {
: _path(path), _file_size(file_size), _broker_addr(broker_addr), _fd(fd) {
fs->get_client(&_client);
DorisMetrics::instance()->broker_file_open_reading->increment(1);
DorisMetrics::instance()->broker_file_reader_total->increment(1);
}
BrokerFileReader::~BrokerFileReader() {
close();
BrokerFileReader::close();
}
Status BrokerFileReader::close() {
@ -46,14 +47,12 @@ Status BrokerFileReader::close() {
TBrokerOperationStatus response;
try {
std::shared_ptr<BrokerServiceConnection> client;
RETURN_IF_ERROR(_fs->get_client(&client));
try {
(*client)->closeReader(response, request);
(*_client)->closeReader(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
std::this_thread::sleep_for(std::chrono::seconds(1));
RETURN_IF_ERROR((*client).reopen());
(*client)->closeReader(response, request);
RETURN_IF_ERROR((*_client).reopen());
(*_client)->closeReader(response, request);
}
} catch (apache::thrift::TException& e) {
std::stringstream ss;
@ -90,18 +89,16 @@ Status BrokerFileReader::read_at(size_t offset, Slice result, const IOContext& /
request.__set_length(bytes_req);
TBrokerReadResponse response;
std::shared_ptr<BrokerServiceConnection> client;
RETURN_IF_ERROR(_fs->get_client(&client));
try {
VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset
<< ", read bytes length:" << bytes_req;
try {
(*client)->pread(response, request);
(*_client)->pread(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
std::this_thread::sleep_for(std::chrono::seconds(1));
RETURN_IF_ERROR((*client).reopen());
RETURN_IF_ERROR((*_client).reopen());
LOG(INFO) << "retry reading from broker: " << _broker_addr << ". reason: " << e.what();
(*client)->pread(response, request);
(*_client)->pread(response, request);
}
} catch (apache::thrift::TException& e) {
std::stringstream ss;

View File

@ -23,6 +23,7 @@
#include <atomic>
#include "io/fs/file_reader.h"
#include "runtime/client_cache.h"
namespace doris {
namespace io {
@ -53,8 +54,8 @@ private:
const TNetworkAddress& _broker_addr;
TBrokerFD _fd;
BrokerFileSystem* _fs;
std::atomic<bool> _closed = false;
std::shared_ptr<BrokerServiceConnection> _client;
};
} // namespace io
} // namespace doris