diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 037d4764f5..266a4b9718 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -28,6 +28,7 @@ #include #include +#include #include "common/signal_handler.h" #include "exec/tablet_info.h" @@ -124,8 +125,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data for (size_t index = origin_size; index <= segid; index++) { mapping->at(index) = _next_segid; _next_segid++; - LOG(INFO) << "src_id=" << src_id << ", segid=" << index << " to " - << " segid=" << _next_segid - 1; + VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to " + << " segid=" << _next_segid - 1 << ", " << *this; } } } @@ -383,7 +384,7 @@ Status LoadStream::close(int64_t src_id, const std::vector& tablets_t } _close_load_cnt++; LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining " - << _total_streams - _close_load_cnt << " senders"; + << _total_streams - _close_load_cnt << " senders, " << *this; _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(), tablets_to_commit.end()); @@ -432,14 +433,14 @@ void LoadStream::_report_result(StreamId stream, const Status& status, if (st.ok()) { response.set_load_stream_profile(buf, len); } else { - LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, errmsg=" << st; + LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << st << ", " << *this; } } buf.append(response.SerializeAsString()); auto wst = _write_stream(stream, buf); if (!wst.ok()) { - LOG(WARNING) << *this << " report result failed with " << wst; + LOG(WARNING) << " report result failed with " << wst << ", " << *this; } } @@ -464,7 +465,7 @@ void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) { buf.append(response.SerializeAsString()); auto wst = _write_stream(stream, buf); if (!wst.ok()) { - LOG(WARNING) << *this << " report result failed with " << wst; + LOG(WARNING) << " report result failed with " << wst << ", " << *this; } } @@ -592,26 +593,31 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* _report_schema(id, hdr); } break; default: - LOG(WARNING) << "unexpected stream message " << hdr.opcode(); + LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this; DCHECK(false); } } void LoadStream::on_idle_timeout(StreamId id) { - LOG(WARNING) << "closing load stream on idle timeout, load_id=" << print_id(_load_id); + LOG(WARNING) << "closing load stream on idle timeout, " << *this; brpc::StreamClose(id); } void LoadStream::on_closed(StreamId id) { + // `this` may be freed by other threads after increasing `_close_rpc_cnt`, + // format string first to prevent use-after-free + std::stringstream ss; + ss << *this; auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1; - LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams; + LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams << ", " + << ss.str(); if (remaining_streams == 0) { _load_stream_mgr->clear_load(_load_id); } } inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) { - ostr << "load_id=" << UniqueId(load_stream._load_id) << ", txn_id=" << load_stream._txn_id; + ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << load_stream._txn_id; return ostr; } diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 414e253c15..6d661a4c88 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -305,6 +305,9 @@ Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) { //the query maybe cancel, so need check after wait 1s timeout_sec = timeout_sec - 1; + LOG(INFO) << "close waiting, " << *this << ", timeout_sec=" << timeout_sec + << ", is_closed=" << _is_closed.load() + << ", is_cancelled=" << state->get_query_ctx()->is_cancelled(); int ret = _close_cv.wait_for(lock, 1000000); if (ret != 0 && timeout_sec <= 0) { return Status::InternalError( diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index af1caefa43..f05400fc6e 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -246,6 +246,8 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { RETURN_IF_ERROR(_init(state, profile)); + LOG(INFO) << "opening olap table sink, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id + << ", sink_id=" << _sender_id; _timeout_watch.start(); SCOPED_TIMER(_profile->total_time_counter()); SCOPED_TIMER(_open_timer); @@ -462,7 +464,8 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block Status VTabletWriterV2::_cancel(Status status) { LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id << ", due to error: " << status; + << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id + << ", due to error: " << status; if (_delta_writer_for_tablet) { _delta_writer_for_tablet->cancel(status); _delta_writer_for_tablet.reset(); @@ -503,6 +506,8 @@ Status VTabletWriterV2::close(Status exec_status) { if (_is_closed) { return _close_status; } + LOG(INFO) << "closing olap table sink, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id + << ", sink_id=" << _sender_id << ", status=" << exec_status.to_string(); SCOPED_TIMER(_close_timer); Status status = exec_status; @@ -625,6 +630,8 @@ Status VTabletWriterV2::close(Status exec_status) { } void VTabletWriterV2::_calc_tablets_to_commit() { + LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id + << ", sink_id=" << _sender_id; for (const auto& [dst_id, tablets] : _tablets_for_node) { std::vector tablets_to_commit; std::vector partition_ids;