[Bug] Use object to replace pointer to avoid BE crash (#7024)
use `NodeInfo _node_info` to replace `NodeInfo *_node_info`
This commit is contained in:
@ -243,6 +243,8 @@ struct NodeInfo {
|
||||
std::string host;
|
||||
int32_t brpc_port;
|
||||
|
||||
NodeInfo() = default;
|
||||
|
||||
NodeInfo(const TNodeInfo& tnode)
|
||||
: id(tnode.id),
|
||||
option(tnode.option),
|
||||
|
||||
@ -62,22 +62,24 @@ NodeChannel::~NodeChannel() {
|
||||
// returned directly via "TabletSink::prepare()" method.
|
||||
Status NodeChannel::init(RuntimeState* state) {
|
||||
_tuple_desc = _parent->_output_tuple_desc;
|
||||
_node_info = _parent->_nodes_info->find_node(_node_id);
|
||||
if (_node_info == nullptr) {
|
||||
auto node = _parent->_nodes_info->find_node(_node_id);
|
||||
if (node == nullptr) {
|
||||
std::stringstream ss;
|
||||
ss << "unknown node id, id=" << _node_id;
|
||||
_cancelled = true;
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
|
||||
_node_info = *node;
|
||||
|
||||
_row_desc.reset(new RowDescriptor(_tuple_desc, false));
|
||||
_batch_size = state->batch_size();
|
||||
_cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker.get()));
|
||||
|
||||
_stub = state->exec_env()->brpc_stub_cache()->get_stub(_node_info->host, _node_info->brpc_port);
|
||||
if (!_stub) {
|
||||
LOG(WARNING) << "Get rpc stub failed, host=" << _node_info->host
|
||||
<< ", port=" << _node_info->brpc_port;
|
||||
_stub = state->exec_env()->brpc_stub_cache()->get_stub(_node_info.host, _node_info.brpc_port);
|
||||
if (_stub == nullptr) {
|
||||
LOG(WARNING) << "Get rpc stub failed, host=" << _node_info.host
|
||||
<< ", port=" << _node_info.brpc_port;
|
||||
_cancelled = true;
|
||||
return Status::InternalError("get rpc stub failed");
|
||||
}
|
||||
@ -143,8 +145,8 @@ void NodeChannel::_cancel_with_msg(const std::string& msg) {
|
||||
Status NodeChannel::open_wait() {
|
||||
_open_closure->join();
|
||||
if (_open_closure->cntl.Failed()) {
|
||||
if (!ExecEnv::GetInstance()->brpc_stub_cache()->available(_stub, _node_info->host,
|
||||
_node_info->brpc_port)) {
|
||||
if (!ExecEnv::GetInstance()->brpc_stub_cache()->available(_stub, _node_info.host,
|
||||
_node_info.brpc_port)) {
|
||||
ExecEnv::GetInstance()->brpc_stub_cache()->erase(_open_closure->cntl.remote_side());
|
||||
}
|
||||
std::stringstream ss;
|
||||
|
||||
@ -203,7 +203,7 @@ public:
|
||||
// FIXME(cmy): There is a problem that when calling node_info, the node_info seems not initialized.
|
||||
// But I don't know why. so here I print node_info->id instead of node_info->host
|
||||
// to avoid BE crash. It needs further observation.
|
||||
return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info->id, _node_info->brpc_port);
|
||||
return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.id, _node_info.brpc_port);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -218,7 +218,7 @@ private:
|
||||
std::string _name;
|
||||
|
||||
TupleDescriptor* _tuple_desc = nullptr;
|
||||
const NodeInfo* _node_info = nullptr;
|
||||
NodeInfo _node_info;
|
||||
|
||||
// this should be set in init() using config
|
||||
int _rpc_timeout_ms = 60000;
|
||||
|
||||
Reference in New Issue
Block a user