[improve](move-memtable) avoid using heavy work pool during append data (#28745)
This commit is contained in:
@ -45,7 +45,11 @@ namespace doris {
|
||||
|
||||
TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
|
||||
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
|
||||
: _id(id), _next_segid(0), _load_id(load_id), _txn_id(txn_id) {
|
||||
: _id(id),
|
||||
_next_segid(0),
|
||||
_load_id(load_id),
|
||||
_txn_id(txn_id),
|
||||
_load_stream_mgr(load_stream_mgr) {
|
||||
load_stream_mgr->create_tokens(_flush_tokens);
|
||||
_failed_st = std::make_shared<Status>();
|
||||
_profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
|
||||
@ -125,6 +129,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
|
||||
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
|
||||
butil::IOBuf buf = data->movable();
|
||||
auto flush_func = [this, new_segid, eos, buf, header]() {
|
||||
signal::set_signal_task_id(_load_id);
|
||||
auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf);
|
||||
if (eos && st.ok()) {
|
||||
st = _load_stream_writer->close_segment(new_segid);
|
||||
@ -166,6 +171,7 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
|
||||
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
|
||||
|
||||
auto add_segment_func = [this, new_segid, stat, flush_schema]() {
|
||||
signal::set_signal_task_id(_load_id);
|
||||
auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
|
||||
if (!st.ok() && _failed_st->ok()) {
|
||||
_failed_st = std::make_shared<Status>(st);
|
||||
@ -181,13 +187,44 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
|
||||
|
||||
Status TabletStream::close() {
|
||||
SCOPED_TIMER(_close_wait_timer);
|
||||
for (auto& token : _flush_tokens) {
|
||||
token->wait();
|
||||
bthread::Mutex mu;
|
||||
std::unique_lock<bthread::Mutex> lock(mu);
|
||||
bthread::ConditionVariable cv;
|
||||
auto wait_func = [this, &mu, &cv] {
|
||||
signal::set_signal_task_id(_load_id);
|
||||
for (auto& token : _flush_tokens) {
|
||||
token->wait();
|
||||
}
|
||||
std::lock_guard<bthread::Mutex> lock(mu);
|
||||
cv.notify_one();
|
||||
};
|
||||
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
|
||||
if (ret) {
|
||||
cv.wait(lock);
|
||||
} else {
|
||||
return Status::Error<ErrorCode::INTERNAL_ERROR>(
|
||||
"there is not enough thread resource for close load");
|
||||
}
|
||||
|
||||
if (!_failed_st->ok()) {
|
||||
return *_failed_st;
|
||||
}
|
||||
return _load_stream_writer->close();
|
||||
|
||||
Status st = Status::OK();
|
||||
auto close_func = [this, &mu, &cv, &st]() {
|
||||
signal::set_signal_task_id(_load_id);
|
||||
st = _load_stream_writer->close();
|
||||
std::lock_guard<bthread::Mutex> lock(mu);
|
||||
cv.notify_one();
|
||||
};
|
||||
ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
|
||||
if (ret) {
|
||||
cv.wait(lock);
|
||||
} else {
|
||||
return Status::Error<ErrorCode::INTERNAL_ERROR>(
|
||||
"there is not enough thread resource for close load");
|
||||
}
|
||||
return st;
|
||||
}
|
||||
|
||||
IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
|
||||
@ -244,13 +281,13 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& it : _tablet_streams_map) {
|
||||
auto st = it.second->close();
|
||||
for (auto& [_, tablet_stream] : _tablet_streams_map) {
|
||||
auto st = tablet_stream->close();
|
||||
if (st.ok()) {
|
||||
success_tablet_ids->push_back(it.second->id());
|
||||
success_tablet_ids->push_back(tablet_stream->id());
|
||||
} else {
|
||||
LOG(INFO) << "close tablet stream " << *it.second << ", status=" << st;
|
||||
failed_tablet_ids->push_back(it.second->id());
|
||||
LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
|
||||
failed_tablet_ids->push_back(tablet_stream->id());
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
@ -308,37 +345,13 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status st = Status::OK();
|
||||
{
|
||||
bthread::Mutex mutex;
|
||||
std::unique_lock<bthread::Mutex> lock(mutex);
|
||||
bthread::ConditionVariable cond;
|
||||
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
|
||||
[this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond, &st]() {
|
||||
signal::set_signal_task_id(_load_id);
|
||||
for (auto& it : _index_streams_map) {
|
||||
st = it.second->close(_tablets_to_commit, success_tablet_ids,
|
||||
failed_tablet_ids);
|
||||
if (!st.ok()) {
|
||||
std::unique_lock<bthread::Mutex> lock(mutex);
|
||||
cond.notify_one();
|
||||
return;
|
||||
}
|
||||
}
|
||||
LOG(INFO) << "close load " << *this
|
||||
<< ", success_tablet_num=" << success_tablet_ids->size()
|
||||
<< ", failed_tablet_num=" << failed_tablet_ids->size();
|
||||
std::unique_lock<bthread::Mutex> lock(mutex);
|
||||
cond.notify_one();
|
||||
});
|
||||
if (ret) {
|
||||
cond.wait(lock);
|
||||
} else {
|
||||
return Status::Error<ErrorCode::INTERNAL_ERROR>(
|
||||
"there is not enough thread resource for close load");
|
||||
}
|
||||
for (auto& [_, index_stream] : _index_streams_map) {
|
||||
RETURN_IF_ERROR(
|
||||
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablet_ids));
|
||||
}
|
||||
return st;
|
||||
LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
|
||||
<< ", failed_tablet_num=" << failed_tablet_ids->size();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void LoadStream::_report_result(StreamId stream, const Status& st,
|
||||
@ -424,26 +437,7 @@ Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data)
|
||||
index_stream = it->second;
|
||||
}
|
||||
|
||||
Status st = Status::OK();
|
||||
{
|
||||
bthread::Mutex mutex;
|
||||
std::unique_lock<bthread::Mutex> lock(mutex);
|
||||
bthread::ConditionVariable cond;
|
||||
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
|
||||
[this, &index_stream, &header, &data, &mutex, &cond, &st] {
|
||||
signal::set_signal_task_id(_load_id);
|
||||
st = index_stream->append_data(header, data);
|
||||
std::unique_lock<bthread::Mutex> lock(mutex);
|
||||
cond.notify_one();
|
||||
});
|
||||
if (ret) {
|
||||
cond.wait(lock);
|
||||
} else {
|
||||
return Status::Error<ErrorCode::INTERNAL_ERROR>(
|
||||
"there is not enough thread resource for append data");
|
||||
}
|
||||
}
|
||||
return st;
|
||||
return index_stream->append_data(header, data);
|
||||
}
|
||||
|
||||
int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
|
||||
|
||||
@ -69,6 +69,7 @@ private:
|
||||
RuntimeProfile::Counter* _append_data_timer = nullptr;
|
||||
RuntimeProfile::Counter* _add_segment_timer = nullptr;
|
||||
RuntimeProfile::Counter* _close_wait_timer = nullptr;
|
||||
LoadStreamMgr* _load_stream_mgr = nullptr;
|
||||
};
|
||||
|
||||
using TabletStreamSharedPtr = std::shared_ptr<TabletStream>;
|
||||
|
||||
Reference in New Issue
Block a user