diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 266a4b9718..87898d95a4 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -30,6 +30,7 @@ #include #include +#include "bvar/bvar.h" #include "common/signal_handler.h" #include "exec/tablet_info.h" #include "gutil/ref_counted.h" @@ -48,6 +49,7 @@ namespace doris { +bvar::Adder g_load_stream_cnt("load_stream_count"); bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms"); bvar::Adder g_load_stream_flush_running_threads("load_stream_flush_wait_threads"); @@ -330,6 +332,7 @@ Status IndexStream::close(const std::vector& tablets_to_commit, // 2. There are some problems in _profile->to_thrift() LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile) : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) { + g_load_stream_cnt << 1; _profile = std::make_unique("LoadStream"); _append_data_timer = ADD_TIMER(_profile, "AppendDataTime"); _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); @@ -354,6 +357,7 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool e } LoadStream::~LoadStream() { + g_load_stream_cnt << -1; LOG(INFO) << "load stream is deconstructed " << *this; } diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index 90de07556e..79f2482d16 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -28,6 +28,7 @@ #include #include +#include "bvar/bvar.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" @@ -69,14 +70,20 @@ namespace doris { using namespace ErrorCode; +bvar::Adder g_load_stream_writer_cnt("load_stream_writer_count"); +bvar::Adder g_load_stream_file_writer_cnt("load_stream_file_writer_count"); + LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profile) : _req(*context), _rowset_writer(nullptr) { _rowset_builder = std::make_unique(*StorageEngine::instance(), *context, profile); _query_thread_context.init(); // from load stream + g_load_stream_writer_cnt << 1; } -LoadStreamWriter::~LoadStreamWriter() = default; +LoadStreamWriter::~LoadStreamWriter() { + g_load_stream_writer_cnt << -1; +} Status LoadStreamWriter::init() { RETURN_IF_ERROR(_rowset_builder->init()); @@ -101,6 +108,7 @@ Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOB return st; } _segment_file_writers.push_back(std::move(file_writer)); + g_load_stream_file_writer_cnt << 1; } } @@ -145,6 +153,7 @@ Status LoadStreamWriter::close_segment(uint32_t segid) { _is_canceled = true; return st; } + g_load_stream_file_writer_cnt << -1; LOG(INFO) << "segment " << segid << " path " << file_writer->path().native() << "closed, written " << file_writer->bytes_appended() << " bytes"; if (file_writer->bytes_appended() == 0) {