[Bug](storage )fix dead lock when create_tablet need lock two tablet && update mv_p0… (#21969)

fix dead lock when create_tablet need lock two tablet && update mv_p0/ssb case
This commit is contained in:
Pxl
2023-07-22 15:27:05 +08:00
committed by GitHub
parent 50c8563f35
commit ae809fbeba
10 changed files with 450 additions and 33 deletions

View File

@ -97,15 +97,14 @@ class TExpr;
namespace stream_load {
IndexChannel::~IndexChannel() {}
IndexChannel::~IndexChannel() = default;
Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) {
SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
for (auto& tablet : tablets) {
auto location = _parent->_location->find_tablet(tablet.tablet_id);
if (location == nullptr) {
LOG(WARNING) << "unknown tablet, tablet_id=" << tablet.tablet_id;
return Status::InternalError("unknown tablet");
return Status::InternalError("unknown tablet, tablet_id={}", tablet.tablet_id);
}
std::vector<std::shared_ptr<VNodeChannel>> channels;
for (auto& node_id : location->node_ids) {
@ -209,14 +208,12 @@ Status IndexChannel::check_tablet_received_rows_consistency() {
continue;
}
if (tablet.second[i].second != tablet.second[0].second) {
LOG(WARNING) << "rows num doest't match, load_id: " << _parent->_load_id
<< ", txn_id: " << std::to_string(_parent->_txn_id)
<< ", tablt_id: " << tablet.first
<< ", node_id: " << tablet.second[i].first
<< ", rows_num: " << tablet.second[i].second
<< ", node_id: " << tablet.second[0].first
<< ", rows_num: " << tablet.second[0].second;
return Status::InternalError("rows num written by multi replicas doest't match");
return Status::InternalError(
"rows num written by multi replicas doest't match, load_id={}, txn_id={}, "
"tablt_id={}, node_id={}, rows_num={}, node_id={}, rows_num={}",
print_id(_parent->_load_id), _parent->_txn_id, tablet.first,
tablet.second[i].first, tablet.second[i].second, tablet.second[0].first,
tablet.second[0].second);
}
}
}
@ -278,10 +275,9 @@ Status VNodeChannel::init(RuntimeState* state) {
_stub = state->exec_env()->brpc_internal_client_cache()->get_client(_node_info.host,
_node_info.brpc_port);
if (_stub == nullptr) {
LOG(WARNING) << "Get rpc stub failed, host=" << _node_info.host
<< ", port=" << _node_info.brpc_port << ", " << channel_info();
_cancelled = true;
return Status::InternalError("get rpc stub failed");
return Status::InternalError("Get rpc stub failed, host={}, port={}, info={}",
_node_info.host, _node_info.brpc_port, channel_info());
}
_rpc_timeout_ms = state->execution_timeout() * 1000;
@ -351,19 +347,17 @@ Status VNodeChannel::open_wait() {
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
_open_closure->cntl.remote_side());
}
std::stringstream ss;
ss << "failed to open tablet writer, error=" << berror(_open_closure->cntl.ErrorCode())
<< ", error_text=" << _open_closure->cntl.ErrorText();
_cancelled = true;
LOG(WARNING) << ss.str() << " " << channel_info();
auto error_code = _open_closure->cntl.ErrorCode();
auto error_text = _open_closure->cntl.ErrorText();
if (_open_closure->unref()) {
delete _open_closure;
}
_open_closure = nullptr;
return Status::InternalError("failed to open tablet writer, error={}, error_text={}",
berror(error_code), error_text);
return Status::InternalError(
"failed to open tablet writer, error={}, error_text={}, info={}",
berror(error_code), error_text, channel_info());
}
Status status(Status::create(_open_closure->result.status()));
if (_open_closure->unref()) {