diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp index 6726933513..9b2125c944 100644 --- a/be/src/io/fs/stream_sink_file_writer.cpp +++ b/be/src/io/fs/stream_sink_file_writer.cpp @@ -21,6 +21,7 @@ #include "olap/olap_common.h" #include "olap/rowset/beta_rowset_writer.h" +#include "util/uid_util.h" #include "vec/sink/load_stream_stub.h" namespace doris { @@ -45,9 +46,9 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { } _bytes_appended += bytes_req; - VLOG_DEBUG << "writer appendv, load_id: " << UniqueId(_load_id).to_string() - << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id - << ", segment_id: " << _segment_id << ", data_length: " << bytes_req; + VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ", index_id: " << _index_id + << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id + << ", data_length: " << bytes_req; std::span slices {data, data_cnt}; for (auto& stream : _streams) { @@ -58,9 +59,8 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { } Status StreamSinkFileWriter::finalize() { - VLOG_DEBUG << "writer finalize, load_id: " << UniqueId(_load_id).to_string() - << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id - << ", segment_id: " << _segment_id; + VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id + << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id; // TODO(zhengyu): update get_inverted_index_file_size into stat for (auto& stream : _streams) { RETURN_IF_ERROR( diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 25e79ccab5..b313307c70 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -306,8 +306,8 @@ Status LoadStream::close(int64_t src_id, const std::vector& tablets_t } } LOG(INFO) << "close load " << *this - << ", failed_tablet_num=" << failed_tablet_ids->size() - << ", success_tablet_num=" << success_tablet_ids->size(); + << ", success_tablet_num=" << success_tablet_ids->size() + << ", failed_tablet_num=" << failed_tablet_ids->size(); std::unique_lock lock(mutex); cond.notify_one(); }); @@ -324,7 +324,7 @@ Status LoadStream::close(int64_t src_id, const std::vector& tablets_t void LoadStream::_report_result(StreamId stream, const Status& st, const std::vector& success_tablet_ids, const std::vector& failed_tablet_ids) { - LOG(INFO) << "report result, success tablet num " << success_tablet_ids.size() + LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size() << ", failed tablet num " << failed_tablet_ids.size(); butil::IOBuf buf; PWriteStreamSinkResponse response; @@ -456,9 +456,8 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id() << " with tablet " << hdr.tablet_id(); if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) { - Status st = Status::Error("invalid load id {}, expected {}", - UniqueId(hdr.load_id()).to_string(), - UniqueId(_load_id).to_string()); + Status st = Status::Error( + "invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id)); _report_failure(id, st, hdr); return; } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index ce249536e7..751b246b9c 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -359,8 +359,8 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con brpc::Controller* cntl = static_cast(controller); brpc::StreamOptions stream_options; - LOG(INFO) << "open load stream, load_id = " << request->load_id() - << ", src_id = " << request->src_id(); + LOG(INFO) << "open load stream, load_id=" << request->load_id() + << ", src_id=" << request->src_id(); for (const auto& req : request->tablets()) { TabletManager* tablet_mgr = StorageEngine::instance()->tablet_manager(); diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp index dc9a2765a5..df9c0fc1c8 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.cpp +++ b/be/src/vec/sink/delta_writer_v2_pool.cpp @@ -41,13 +41,13 @@ DeltaWriterV2* DeltaWriterV2Map::get_or_create( Status DeltaWriterV2Map::close(RuntimeProfile* profile) { int num_use = --_use_cnt; if (num_use > 0) { - LOG(INFO) << "not closing DeltaWriterV2Map << " << _load_id << " , use_cnt = " << num_use; + LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , use_cnt=" << num_use; return Status::OK(); } - LOG(INFO) << "closing DeltaWriterV2Map " << _load_id; if (_pool != nullptr) { _pool->erase(_load_id); } + LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id; Status status = Status::OK(); _map.for_each([&status](auto& entry) { if (status.ok()) { @@ -57,6 +57,7 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) { if (!status.ok()) { return status; } + LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id; _map.for_each([&status, profile](auto& entry) { if (status.ok()) { status = entry.second->close_wait(profile); @@ -67,7 +68,7 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) { void DeltaWriterV2Map::cancel(Status status) { int num_use = --_use_cnt; - LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt = " << num_use; + LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt=" << num_use; if (num_use == 0 && _pool != nullptr) { _pool->erase(_load_id); } @@ -95,7 +96,7 @@ std::shared_ptr DeltaWriterV2Pool::get_or_create(PUniqueId loa void DeltaWriterV2Pool::erase(UniqueId load_id) { std::lock_guard lock(_mutex); - LOG(INFO) << "erasing DeltaWriterV2Map, load_id = " << load_id; + LOG(INFO) << "erasing DeltaWriterV2Map, load_id=" << load_id; _pool.erase(load_id); } diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 8319def542..793098a3e9 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -23,6 +23,7 @@ #include "util/brpc_client_cache.h" #include "util/network_util.h" #include "util/thrift_util.h" +#include "util/uid_util.h" namespace doris { @@ -37,12 +38,15 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId Status st = Status::create(response.status()); std::stringstream ss; - ss << "received response from backend " << _dst_id; + ss << "on_received_messages, load_id=" << _load_id << ", backend_id=" << _dst_id; if (response.success_tablet_ids_size() > 0) { ss << ", success tablet ids:"; for (auto tablet_id : response.success_tablet_ids()) { ss << " " << tablet_id; } + if (response.success_tablet_ids_size() == 0) { + ss << " none"; + } std::lock_guard lock(_success_tablets_mutex); for (auto tablet_id : response.success_tablet_ids()) { _success_tablets.push_back(tablet_id); @@ -53,6 +57,9 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId for (auto tablet_id : response.failed_tablet_ids()) { ss << " " << tablet_id; } + if (response.failed_tablet_ids_size() == 0) { + ss << " none"; + } std::lock_guard lock(_failed_tablets_mutex); for (auto tablet_id : response.failed_tablet_ids()) { _failed_tablets.push_back(tablet_id); @@ -86,6 +93,7 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId } void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) { + LOG(INFO) << "on_closed, load_id=" << _load_id << ", stream_id=" << id; std::lock_guard lock(_mutex); _is_closed.store(true); _close_cv.notify_all(); @@ -123,6 +131,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, } _dst_id = node_info.id; _handler.set_dst_id(_dst_id); + _handler.set_load_id(_load_id); std::string host_port = get_host_port(node_info.host, node_info.brpc_port); brpc::StreamOptions opt; opt.max_buf_size = 20 << 20; // 20MB @@ -160,8 +169,8 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, return Status::InternalError("Failed to connect to backend {}: {}", _dst_id, cntl.ErrorText()); } - LOG(INFO) << "Opened stream " << _stream_id << " for backend " << _dst_id << " (" << host_port - << ")"; + LOG(INFO) << "open load stream " << _stream_id << " load_id=" << print_id(_load_id) + << " for backend " << _dst_id << " (" << host_port << ")"; _is_init.store(true); return Status::OK(); } @@ -227,12 +236,15 @@ Status LoadStreamStub::get_schema(const std::vector& tablets) { header.set_src_id(_src_id); header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); std::ostringstream oss; - oss << "fetching tablet schema from stream " << _stream_id << ", load id: " << _load_id - << ", tablet id:"; + oss << "fetching tablet schema from stream " << _stream_id + << ", load id: " << print_id(_load_id) << ", tablet id:"; for (const auto& tablet : tablets) { *header.add_tablets() = tablet; oss << " " << tablet.tablet_id(); } + if (tablets.size() == 0) { + oss << " none"; + } LOG(INFO) << oss.str(); return _encode_and_send(header); } @@ -318,12 +330,12 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { const timespec time = butil::seconds_from_now(60); int wait_ret = brpc::StreamWait(_stream_id, &time); if (wait_ret != 0) { - return Status::InternalError("StreamWait failed, err = ", wait_ret); + return Status::InternalError("StreamWait failed, err=", wait_ret); } break; } default: - return Status::InternalError("StreamWrite failed, err = {}", ret); + return Status::InternalError("StreamWrite failed, err={}", ret); } } } diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 8a2c9dfce5..bbed172f8a 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -121,8 +121,10 @@ private: } void set_dst_id(int64_t dst_id) { _dst_id = dst_id; } + void set_load_id(PUniqueId load_id) { _load_id = UniqueId(load_id); } private: + UniqueId _load_id; // for logging int64_t _dst_id = -1; // for logging std::atomic _is_closed; bthread::Mutex _mutex; diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp index ec7e53211f..240e44ef38 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -30,11 +30,11 @@ LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStre void LoadStreams::release() { int num_use = --_use_cnt; if (num_use == 0) { - LOG(INFO) << "releasing streams for load_id = " << _load_id << ", dst_id = " << _dst_id; + LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" << _dst_id; _pool->erase(_load_id, _dst_id); } else { - LOG(INFO) << "no releasing streams for load_id = " << _load_id << ", dst_id = " << _dst_id - << ", use_cnt = " << num_use; + LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" << _dst_id + << ", use_cnt=" << num_use; } } diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 19ed6caf29..8bc65a4cba 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -187,11 +187,12 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { _stream_per_node = state->load_stream_per_node(); _total_streams = state->total_load_streams(); _num_local_sink = state->num_local_sink(); + LOG(INFO) << "init olap tablet sink, load_id: " << print_id(_load_id) + << ", num senders: " << _num_senders << ", stream per node: " << _stream_per_node + << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink; DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0"; DCHECK(_total_streams > 0) << "total load streams should be greator than 0"; DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0"; - LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << _stream_per_node - << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink; _is_high_priority = (state->execution_timeout() <= config::load_task_high_priority_threshold_second);