cherry-pick #36300
This commit is contained in:
@ -30,6 +30,7 @@
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
|
||||
#include "bvar/bvar.h"
|
||||
#include "common/signal_handler.h"
|
||||
#include "exec/tablet_info.h"
|
||||
#include "gutil/ref_counted.h"
|
||||
@ -48,6 +49,7 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
bvar::Adder<int64_t> g_load_stream_cnt("load_stream_count");
|
||||
bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
|
||||
bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads");
|
||||
|
||||
@ -330,6 +332,7 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
|
||||
// 2. There are some problems in _profile->to_thrift()
|
||||
LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile)
|
||||
: _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) {
|
||||
g_load_stream_cnt << 1;
|
||||
_profile = std::make_unique<RuntimeProfile>("LoadStream");
|
||||
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
|
||||
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
|
||||
@ -354,6 +357,7 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool e
|
||||
}
|
||||
|
||||
LoadStream::~LoadStream() {
|
||||
g_load_stream_cnt << -1;
|
||||
LOG(INFO) << "load stream is deconstructed " << *this;
|
||||
}
|
||||
|
||||
|
||||
@ -28,6 +28,7 @@
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include "bvar/bvar.h"
|
||||
#include "common/compiler_util.h" // IWYU pragma: keep
|
||||
#include "common/config.h"
|
||||
#include "common/logging.h"
|
||||
@ -69,14 +70,20 @@
|
||||
namespace doris {
|
||||
using namespace ErrorCode;
|
||||
|
||||
bvar::Adder<int64_t> g_load_stream_writer_cnt("load_stream_writer_count");
|
||||
bvar::Adder<int64_t> g_load_stream_file_writer_cnt("load_stream_file_writer_count");
|
||||
|
||||
LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profile)
|
||||
: _req(*context), _rowset_writer(nullptr) {
|
||||
_rowset_builder =
|
||||
std::make_unique<RowsetBuilder>(*StorageEngine::instance(), *context, profile);
|
||||
_query_thread_context.init(); // from load stream
|
||||
g_load_stream_writer_cnt << 1;
|
||||
}
|
||||
|
||||
LoadStreamWriter::~LoadStreamWriter() = default;
|
||||
LoadStreamWriter::~LoadStreamWriter() {
|
||||
g_load_stream_writer_cnt << -1;
|
||||
}
|
||||
|
||||
Status LoadStreamWriter::init() {
|
||||
RETURN_IF_ERROR(_rowset_builder->init());
|
||||
@ -101,6 +108,7 @@ Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOB
|
||||
return st;
|
||||
}
|
||||
_segment_file_writers.push_back(std::move(file_writer));
|
||||
g_load_stream_file_writer_cnt << 1;
|
||||
}
|
||||
}
|
||||
|
||||
@ -145,6 +153,7 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
|
||||
_is_canceled = true;
|
||||
return st;
|
||||
}
|
||||
g_load_stream_file_writer_cnt << -1;
|
||||
LOG(INFO) << "segment " << segid << " path " << file_writer->path().native()
|
||||
<< "closed, written " << file_writer->bytes_appended() << " bytes";
|
||||
if (file_writer->bytes_appended() == 0) {
|
||||
|
||||
Reference in New Issue
Block a user