[improve](move-memtable) improve logging messages (#27443)
This commit is contained in:
@ -21,6 +21,7 @@
|
||||
|
||||
#include "olap/olap_common.h"
|
||||
#include "olap/rowset/beta_rowset_writer.h"
|
||||
#include "util/uid_util.h"
|
||||
#include "vec/sink/load_stream_stub.h"
|
||||
|
||||
namespace doris {
|
||||
@ -45,9 +46,9 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
|
||||
}
|
||||
_bytes_appended += bytes_req;
|
||||
|
||||
VLOG_DEBUG << "writer appendv, load_id: " << UniqueId(_load_id).to_string()
|
||||
<< ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
|
||||
<< ", segment_id: " << _segment_id << ", data_length: " << bytes_req;
|
||||
VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ", index_id: " << _index_id
|
||||
<< ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id
|
||||
<< ", data_length: " << bytes_req;
|
||||
|
||||
std::span<const Slice> slices {data, data_cnt};
|
||||
for (auto& stream : _streams) {
|
||||
@ -58,9 +59,8 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
|
||||
}
|
||||
|
||||
Status StreamSinkFileWriter::finalize() {
|
||||
VLOG_DEBUG << "writer finalize, load_id: " << UniqueId(_load_id).to_string()
|
||||
<< ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
|
||||
<< ", segment_id: " << _segment_id;
|
||||
VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id
|
||||
<< ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id;
|
||||
// TODO(zhengyu): update get_inverted_index_file_size into stat
|
||||
for (auto& stream : _streams) {
|
||||
RETURN_IF_ERROR(
|
||||
|
||||
@ -306,8 +306,8 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
|
||||
}
|
||||
}
|
||||
LOG(INFO) << "close load " << *this
|
||||
<< ", failed_tablet_num=" << failed_tablet_ids->size()
|
||||
<< ", success_tablet_num=" << success_tablet_ids->size();
|
||||
<< ", success_tablet_num=" << success_tablet_ids->size()
|
||||
<< ", failed_tablet_num=" << failed_tablet_ids->size();
|
||||
std::unique_lock<bthread::Mutex> lock(mutex);
|
||||
cond.notify_one();
|
||||
});
|
||||
@ -324,7 +324,7 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
|
||||
void LoadStream::_report_result(StreamId stream, const Status& st,
|
||||
const std::vector<int64_t>& success_tablet_ids,
|
||||
const std::vector<int64_t>& failed_tablet_ids) {
|
||||
LOG(INFO) << "report result, success tablet num " << success_tablet_ids.size()
|
||||
LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size()
|
||||
<< ", failed tablet num " << failed_tablet_ids.size();
|
||||
butil::IOBuf buf;
|
||||
PWriteStreamSinkResponse response;
|
||||
@ -456,9 +456,8 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
|
||||
VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
|
||||
<< " with tablet " << hdr.tablet_id();
|
||||
if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
|
||||
Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid load id {}, expected {}",
|
||||
UniqueId(hdr.load_id()).to_string(),
|
||||
UniqueId(_load_id).to_string());
|
||||
Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>(
|
||||
"invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id));
|
||||
_report_failure(id, st, hdr);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -359,8 +359,8 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
|
||||
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
|
||||
brpc::StreamOptions stream_options;
|
||||
|
||||
LOG(INFO) << "open load stream, load_id = " << request->load_id()
|
||||
<< ", src_id = " << request->src_id();
|
||||
LOG(INFO) << "open load stream, load_id=" << request->load_id()
|
||||
<< ", src_id=" << request->src_id();
|
||||
|
||||
for (const auto& req : request->tablets()) {
|
||||
TabletManager* tablet_mgr = StorageEngine::instance()->tablet_manager();
|
||||
|
||||
@ -41,13 +41,13 @@ DeltaWriterV2* DeltaWriterV2Map::get_or_create(
|
||||
Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
|
||||
int num_use = --_use_cnt;
|
||||
if (num_use > 0) {
|
||||
LOG(INFO) << "not closing DeltaWriterV2Map << " << _load_id << " , use_cnt = " << num_use;
|
||||
LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , use_cnt=" << num_use;
|
||||
return Status::OK();
|
||||
}
|
||||
LOG(INFO) << "closing DeltaWriterV2Map " << _load_id;
|
||||
if (_pool != nullptr) {
|
||||
_pool->erase(_load_id);
|
||||
}
|
||||
LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id;
|
||||
Status status = Status::OK();
|
||||
_map.for_each([&status](auto& entry) {
|
||||
if (status.ok()) {
|
||||
@ -57,6 +57,7 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
|
||||
_map.for_each([&status, profile](auto& entry) {
|
||||
if (status.ok()) {
|
||||
status = entry.second->close_wait(profile);
|
||||
@ -67,7 +68,7 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
|
||||
|
||||
void DeltaWriterV2Map::cancel(Status status) {
|
||||
int num_use = --_use_cnt;
|
||||
LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt = " << num_use;
|
||||
LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt=" << num_use;
|
||||
if (num_use == 0 && _pool != nullptr) {
|
||||
_pool->erase(_load_id);
|
||||
}
|
||||
@ -95,7 +96,7 @@ std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId loa
|
||||
|
||||
void DeltaWriterV2Pool::erase(UniqueId load_id) {
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
LOG(INFO) << "erasing DeltaWriterV2Map, load_id = " << load_id;
|
||||
LOG(INFO) << "erasing DeltaWriterV2Map, load_id=" << load_id;
|
||||
_pool.erase(load_id);
|
||||
}
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@
|
||||
#include "util/brpc_client_cache.h"
|
||||
#include "util/network_util.h"
|
||||
#include "util/thrift_util.h"
|
||||
#include "util/uid_util.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -37,12 +38,15 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
|
||||
Status st = Status::create(response.status());
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "received response from backend " << _dst_id;
|
||||
ss << "on_received_messages, load_id=" << _load_id << ", backend_id=" << _dst_id;
|
||||
if (response.success_tablet_ids_size() > 0) {
|
||||
ss << ", success tablet ids:";
|
||||
for (auto tablet_id : response.success_tablet_ids()) {
|
||||
ss << " " << tablet_id;
|
||||
}
|
||||
if (response.success_tablet_ids_size() == 0) {
|
||||
ss << " none";
|
||||
}
|
||||
std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
|
||||
for (auto tablet_id : response.success_tablet_ids()) {
|
||||
_success_tablets.push_back(tablet_id);
|
||||
@ -53,6 +57,9 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
|
||||
for (auto tablet_id : response.failed_tablet_ids()) {
|
||||
ss << " " << tablet_id;
|
||||
}
|
||||
if (response.failed_tablet_ids_size() == 0) {
|
||||
ss << " none";
|
||||
}
|
||||
std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
|
||||
for (auto tablet_id : response.failed_tablet_ids()) {
|
||||
_failed_tablets.push_back(tablet_id);
|
||||
@ -86,6 +93,7 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
|
||||
}
|
||||
|
||||
void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
|
||||
LOG(INFO) << "on_closed, load_id=" << _load_id << ", stream_id=" << id;
|
||||
std::lock_guard<bthread::Mutex> lock(_mutex);
|
||||
_is_closed.store(true);
|
||||
_close_cv.notify_all();
|
||||
@ -123,6 +131,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
|
||||
}
|
||||
_dst_id = node_info.id;
|
||||
_handler.set_dst_id(_dst_id);
|
||||
_handler.set_load_id(_load_id);
|
||||
std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
|
||||
brpc::StreamOptions opt;
|
||||
opt.max_buf_size = 20 << 20; // 20MB
|
||||
@ -160,8 +169,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
|
||||
return Status::InternalError("Failed to connect to backend {}: {}", _dst_id,
|
||||
cntl.ErrorText());
|
||||
}
|
||||
LOG(INFO) << "Opened stream " << _stream_id << " for backend " << _dst_id << " (" << host_port
|
||||
<< ")";
|
||||
LOG(INFO) << "open load stream " << _stream_id << " load_id=" << print_id(_load_id)
|
||||
<< " for backend " << _dst_id << " (" << host_port << ")";
|
||||
_is_init.store(true);
|
||||
return Status::OK();
|
||||
}
|
||||
@ -227,12 +236,15 @@ Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
|
||||
header.set_src_id(_src_id);
|
||||
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
|
||||
std::ostringstream oss;
|
||||
oss << "fetching tablet schema from stream " << _stream_id << ", load id: " << _load_id
|
||||
<< ", tablet id:";
|
||||
oss << "fetching tablet schema from stream " << _stream_id
|
||||
<< ", load id: " << print_id(_load_id) << ", tablet id:";
|
||||
for (const auto& tablet : tablets) {
|
||||
*header.add_tablets() = tablet;
|
||||
oss << " " << tablet.tablet_id();
|
||||
}
|
||||
if (tablets.size() == 0) {
|
||||
oss << " none";
|
||||
}
|
||||
LOG(INFO) << oss.str();
|
||||
return _encode_and_send(header);
|
||||
}
|
||||
@ -318,12 +330,12 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
|
||||
const timespec time = butil::seconds_from_now(60);
|
||||
int wait_ret = brpc::StreamWait(_stream_id, &time);
|
||||
if (wait_ret != 0) {
|
||||
return Status::InternalError("StreamWait failed, err = ", wait_ret);
|
||||
return Status::InternalError("StreamWait failed, err=", wait_ret);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
return Status::InternalError("StreamWrite failed, err = {}", ret);
|
||||
return Status::InternalError("StreamWrite failed, err={}", ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -121,8 +121,10 @@ private:
|
||||
}
|
||||
|
||||
void set_dst_id(int64_t dst_id) { _dst_id = dst_id; }
|
||||
void set_load_id(PUniqueId load_id) { _load_id = UniqueId(load_id); }
|
||||
|
||||
private:
|
||||
UniqueId _load_id; // for logging
|
||||
int64_t _dst_id = -1; // for logging
|
||||
std::atomic<bool> _is_closed;
|
||||
bthread::Mutex _mutex;
|
||||
|
||||
@ -30,11 +30,11 @@ LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStre
|
||||
void LoadStreams::release() {
|
||||
int num_use = --_use_cnt;
|
||||
if (num_use == 0) {
|
||||
LOG(INFO) << "releasing streams for load_id = " << _load_id << ", dst_id = " << _dst_id;
|
||||
LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" << _dst_id;
|
||||
_pool->erase(_load_id, _dst_id);
|
||||
} else {
|
||||
LOG(INFO) << "no releasing streams for load_id = " << _load_id << ", dst_id = " << _dst_id
|
||||
<< ", use_cnt = " << num_use;
|
||||
LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" << _dst_id
|
||||
<< ", use_cnt=" << num_use;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -187,11 +187,12 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
|
||||
_stream_per_node = state->load_stream_per_node();
|
||||
_total_streams = state->total_load_streams();
|
||||
_num_local_sink = state->num_local_sink();
|
||||
LOG(INFO) << "init olap tablet sink, load_id: " << print_id(_load_id)
|
||||
<< ", num senders: " << _num_senders << ", stream per node: " << _stream_per_node
|
||||
<< ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink;
|
||||
DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0";
|
||||
DCHECK(_total_streams > 0) << "total load streams should be greator than 0";
|
||||
DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
|
||||
LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << _stream_per_node
|
||||
<< ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink;
|
||||
_is_high_priority =
|
||||
(state->execution_timeout() <= config::load_task_high_priority_threshold_second);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user