backport #41550
This commit is contained in:
@ -135,7 +135,7 @@ LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
|
||||
_is_incremental(incremental) {};
|
||||
|
||||
LoadStreamStub::~LoadStreamStub() {
|
||||
if (_is_init.load() && !_is_closed.load()) {
|
||||
if (_is_open.load() && !_is_closed.load()) {
|
||||
auto ret = brpc::StreamClose(_stream_id);
|
||||
LOG(INFO) << *this << " is deconstructed, close " << (ret == 0 ? "success" : "failed");
|
||||
}
|
||||
@ -149,8 +149,9 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
|
||||
int64_t idle_timeout_ms, bool enable_profile) {
|
||||
std::unique_lock<bthread::Mutex> lock(_open_mutex);
|
||||
if (_is_init.load()) {
|
||||
return _init_st;
|
||||
return _status;
|
||||
}
|
||||
_is_init.store(true);
|
||||
_dst_id = node_info.id;
|
||||
brpc::StreamOptions opt;
|
||||
opt.max_buf_size = config::load_stream_max_buf_size;
|
||||
@ -160,8 +161,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
|
||||
brpc::Controller cntl;
|
||||
if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) {
|
||||
delete opt.handler;
|
||||
_init_st = Status::Error<true>(ret, "Failed to create stream");
|
||||
return _init_st;
|
||||
_status = Status::Error<true>(ret, "Failed to create stream");
|
||||
return _status;
|
||||
}
|
||||
cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
|
||||
POpenLoadStreamRequest request;
|
||||
@ -174,8 +175,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
|
||||
} else if (total_streams > 0) {
|
||||
request.set_total_streams(total_streams);
|
||||
} else {
|
||||
_init_st = Status::InternalError("total_streams should be greator than 0");
|
||||
return _init_st;
|
||||
_status = Status::InternalError("total_streams should be greator than 0");
|
||||
return _status;
|
||||
}
|
||||
request.set_idle_timeout_ms(idle_timeout_ms);
|
||||
schema.to_protobuf(request.mutable_schema());
|
||||
@ -199,13 +200,13 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
|
||||
}
|
||||
if (cntl.Failed()) {
|
||||
brpc::StreamClose(_stream_id);
|
||||
_init_st = Status::InternalError("Failed to connect to backend {}: {}", _dst_id,
|
||||
cntl.ErrorText());
|
||||
return _init_st;
|
||||
_status = Status::InternalError("Failed to connect to backend {}: {}", _dst_id,
|
||||
cntl.ErrorText());
|
||||
return _status;
|
||||
}
|
||||
LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port
|
||||
<< ", " << *this;
|
||||
_is_init.store(true);
|
||||
_is_open.store(true);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -213,9 +214,9 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
|
||||
Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
|
||||
int64_t segment_id, uint64_t offset, std::span<const Slice> data,
|
||||
bool segment_eos) {
|
||||
if (!_is_init.load()) {
|
||||
add_failed_tablet(tablet_id, _init_st);
|
||||
return _init_st;
|
||||
if (!_is_open.load()) {
|
||||
add_failed_tablet(tablet_id, _status);
|
||||
return _status;
|
||||
}
|
||||
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
|
||||
if (segment_id != 0) {
|
||||
@ -239,9 +240,9 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64
|
||||
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
|
||||
int64_t segment_id, const SegmentStatistics& segment_stat,
|
||||
TabletSchemaSPtr flush_schema) {
|
||||
if (!_is_init.load()) {
|
||||
add_failed_tablet(tablet_id, _init_st);
|
||||
return _init_st;
|
||||
if (!_is_open.load()) {
|
||||
add_failed_tablet(tablet_id, _status);
|
||||
return _status;
|
||||
}
|
||||
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
|
||||
if (segment_id != 0) {
|
||||
@ -265,8 +266,8 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
|
||||
|
||||
// CLOSE_LOAD
|
||||
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
|
||||
if (!_is_init.load()) {
|
||||
return _init_st;
|
||||
if (!_is_open.load()) {
|
||||
return _status;
|
||||
}
|
||||
PStreamHeader header;
|
||||
*header.mutable_load_id() = _load_id;
|
||||
@ -275,10 +276,10 @@ Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commi
|
||||
for (const auto& tablet : tablets_to_commit) {
|
||||
*header.add_tablets() = tablet;
|
||||
}
|
||||
_close_st = _encode_and_send(header);
|
||||
if (!_close_st.ok()) {
|
||||
LOG(WARNING) << "stream " << _stream_id << " close failed: " << _close_st;
|
||||
return _close_st;
|
||||
_status = _encode_and_send(header);
|
||||
if (!_status.ok()) {
|
||||
LOG(WARNING) << "stream " << _stream_id << " close failed: " << _status;
|
||||
return _status;
|
||||
}
|
||||
_is_closing.store(true);
|
||||
return Status::OK();
|
||||
@ -286,8 +287,8 @@ Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commi
|
||||
|
||||
// GET_SCHEMA
|
||||
Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
|
||||
if (!_is_init.load()) {
|
||||
return _init_st;
|
||||
if (!_is_open.load()) {
|
||||
return _status;
|
||||
}
|
||||
PStreamHeader header;
|
||||
*header.mutable_load_id() = _load_id;
|
||||
@ -309,8 +310,8 @@ Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
|
||||
|
||||
Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
|
||||
int64_t timeout_ms) {
|
||||
if (!_is_init.load()) {
|
||||
return _init_st;
|
||||
if (!_is_open.load()) {
|
||||
return _status;
|
||||
}
|
||||
if (_tablet_schema_for_index->contains(index_id)) {
|
||||
return Status::OK();
|
||||
@ -337,11 +338,8 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i
|
||||
|
||||
Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
|
||||
DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
|
||||
if (!_is_init.load()) {
|
||||
return _init_st;
|
||||
}
|
||||
if (!_is_closing.load()) {
|
||||
return _close_st;
|
||||
return _status;
|
||||
}
|
||||
if (_is_closed.load()) {
|
||||
return _check_cancel();
|
||||
@ -370,7 +368,7 @@ Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
|
||||
|
||||
void LoadStreamStub::cancel(Status reason) {
|
||||
LOG(WARNING) << *this << " is cancelled because of " << reason;
|
||||
if (_is_init.load()) {
|
||||
if (_is_open.load()) {
|
||||
brpc::StreamClose(_stream_id);
|
||||
}
|
||||
{
|
||||
|
||||
@ -195,7 +195,7 @@ public:
|
||||
|
||||
int64_t dst_id() const { return _dst_id; }
|
||||
|
||||
bool is_inited() const { return _is_init.load(); }
|
||||
bool is_open() const { return _is_open.load(); }
|
||||
|
||||
bool is_incremental() const { return _is_incremental; }
|
||||
|
||||
@ -231,6 +231,7 @@ private:
|
||||
|
||||
protected:
|
||||
std::atomic<bool> _is_init;
|
||||
std::atomic<bool> _is_open;
|
||||
std::atomic<bool> _is_closing;
|
||||
std::atomic<bool> _is_closed;
|
||||
std::atomic<bool> _is_cancelled;
|
||||
@ -240,8 +241,7 @@ protected:
|
||||
brpc::StreamId _stream_id;
|
||||
int64_t _src_id = -1; // source backend_id
|
||||
int64_t _dst_id = -1; // destination backend_id
|
||||
Status _init_st = Status::InternalError<false>("Stream is not open");
|
||||
Status _close_st;
|
||||
Status _status = Status::InternalError<false>("Stream is not open");
|
||||
Status _cancel_st;
|
||||
|
||||
bthread::Mutex _open_mutex;
|
||||
|
||||
@ -389,7 +389,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id,
|
||||
VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id);
|
||||
_tablets_for_node[node_id].emplace(tablet_id, tablet);
|
||||
auto stream = _load_stream_map->at(node_id)->at(_stream_index);
|
||||
for (int i = 1; i < _stream_per_node && !stream->is_inited(); i++) {
|
||||
for (int i = 1; i < _stream_per_node && !stream->is_open(); i++) {
|
||||
stream = _load_stream_map->at(node_id)->at((_stream_index + i) % _stream_per_node);
|
||||
}
|
||||
streams.emplace_back(std::move(stream));
|
||||
|
||||
Reference in New Issue
Block a user