|
|
|
|
@ -62,7 +62,6 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
|
|
|
|
|
_txn_id(txn_id),
|
|
|
|
|
_load_stream_mgr(load_stream_mgr) {
|
|
|
|
|
load_stream_mgr->create_tokens(_flush_tokens);
|
|
|
|
|
_status = Status::OK();
|
|
|
|
|
_profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
|
|
|
|
|
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
|
|
|
|
|
_add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
|
|
|
|
|
@ -71,7 +70,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
|
|
|
|
|
|
|
|
|
|
inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
|
|
|
|
|
ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id
|
|
|
|
|
<< ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status;
|
|
|
|
|
<< ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status();
|
|
|
|
|
return ostr;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -89,19 +88,19 @@ Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t
|
|
|
|
|
|
|
|
|
|
_load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
|
|
|
|
|
DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
|
|
|
|
|
_status = Status::Uninitialized("fault injection");
|
|
|
|
|
return _status;
|
|
|
|
|
_status.update(Status::Uninitialized("fault injection"));
|
|
|
|
|
return _status.status();
|
|
|
|
|
});
|
|
|
|
|
_status = _load_stream_writer->init();
|
|
|
|
|
_status.update(_load_stream_writer->init());
|
|
|
|
|
if (!_status.ok()) {
|
|
|
|
|
LOG(INFO) << "failed to init rowset builder due to " << *this;
|
|
|
|
|
}
|
|
|
|
|
return _status;
|
|
|
|
|
return _status.status();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
|
|
|
|
|
if (!_status.ok()) {
|
|
|
|
|
return _status;
|
|
|
|
|
return _status.status();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// dispatch add_segment request
|
|
|
|
|
@ -150,8 +149,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
|
|
|
|
|
if (eos && st.ok()) {
|
|
|
|
|
st = _load_stream_writer->close_segment(new_segid);
|
|
|
|
|
}
|
|
|
|
|
if (!st.ok() && _status.ok()) {
|
|
|
|
|
_status = st;
|
|
|
|
|
if (!st.ok()) {
|
|
|
|
|
_status.update(st);
|
|
|
|
|
LOG(WARNING) << "write data failed " << st << ", " << *this;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
@ -167,11 +166,11 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
|
|
|
|
|
timer.start();
|
|
|
|
|
while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
|
|
|
|
|
if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
|
|
|
|
|
_status = Status::Error<true>(
|
|
|
|
|
"wait flush token back pressure time is more than "
|
|
|
|
|
"load_stream_max_wait_flush_token_time {}",
|
|
|
|
|
load_stream_max_wait_flush_token_time_ms);
|
|
|
|
|
return _status;
|
|
|
|
|
_status.update(
|
|
|
|
|
Status::Error<true>("wait flush token back pressure time is more than "
|
|
|
|
|
"load_stream_max_wait_flush_token_time {}",
|
|
|
|
|
load_stream_max_wait_flush_token_time_ms));
|
|
|
|
|
return _status.status();
|
|
|
|
|
}
|
|
|
|
|
bthread_usleep(2 * 1000); // 2ms
|
|
|
|
|
}
|
|
|
|
|
@ -181,14 +180,14 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
|
|
|
|
|
g_load_stream_flush_running_threads << 1;
|
|
|
|
|
auto st = flush_token->submit_func(flush_func);
|
|
|
|
|
if (!st.ok()) {
|
|
|
|
|
_status = st;
|
|
|
|
|
_status.update(st);
|
|
|
|
|
}
|
|
|
|
|
return _status;
|
|
|
|
|
return _status.status();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
|
|
|
|
|
if (!_status.ok()) {
|
|
|
|
|
return _status;
|
|
|
|
|
return _status.status();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SCOPED_TIMER(_add_segment_timer);
|
|
|
|
|
@ -207,17 +206,17 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard lock_guard(_lock);
|
|
|
|
|
if (!_segids_mapping.contains(src_id)) {
|
|
|
|
|
_status = Status::InternalError(
|
|
|
|
|
_status.update(Status::InternalError(
|
|
|
|
|
"add segment failed, no segment written by this src be yet, src_id={}, "
|
|
|
|
|
"segment_id={}",
|
|
|
|
|
src_id, segid);
|
|
|
|
|
return _status;
|
|
|
|
|
src_id, segid));
|
|
|
|
|
return _status.status();
|
|
|
|
|
}
|
|
|
|
|
if (segid >= _segids_mapping[src_id]->size()) {
|
|
|
|
|
_status = Status::InternalError(
|
|
|
|
|
_status.update(Status::InternalError(
|
|
|
|
|
"add segment failed, segment is never written, src_id={}, segment_id={}",
|
|
|
|
|
src_id, segid);
|
|
|
|
|
return _status;
|
|
|
|
|
src_id, segid));
|
|
|
|
|
return _status.status();
|
|
|
|
|
}
|
|
|
|
|
new_segid = _segids_mapping[src_id]->at(segid);
|
|
|
|
|
}
|
|
|
|
|
@ -226,76 +225,76 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
|
|
|
|
|
auto add_segment_func = [this, new_segid, stat, flush_schema]() {
|
|
|
|
|
signal::set_signal_task_id(_load_id);
|
|
|
|
|
auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
|
|
|
|
|
if (!st.ok() && _status.ok()) {
|
|
|
|
|
_status = st;
|
|
|
|
|
if (!st.ok()) {
|
|
|
|
|
_status.update(st);
|
|
|
|
|
LOG(INFO) << "add segment failed " << *this;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
|
|
|
|
|
auto st = flush_token->submit_func(add_segment_func);
|
|
|
|
|
if (!st.ok()) {
|
|
|
|
|
_status = st;
|
|
|
|
|
_status.update(st);
|
|
|
|
|
}
|
|
|
|
|
return _status;
|
|
|
|
|
return _status.status();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status TabletStream::close() {
|
|
|
|
|
if (!_status.ok()) {
|
|
|
|
|
return _status;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SCOPED_TIMER(_close_wait_timer);
|
|
|
|
|
Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
|
|
|
|
|
bthread::Mutex mu;
|
|
|
|
|
std::unique_lock<bthread::Mutex> lock(mu);
|
|
|
|
|
bthread::ConditionVariable cv;
|
|
|
|
|
auto wait_func = [this, &mu, &cv] {
|
|
|
|
|
auto st = Status::OK();
|
|
|
|
|
auto func = [this, &mu, &cv, &st, &fn] {
|
|
|
|
|
signal::set_signal_task_id(_load_id);
|
|
|
|
|
for (auto& token : _flush_tokens) {
|
|
|
|
|
token->wait();
|
|
|
|
|
}
|
|
|
|
|
st = fn();
|
|
|
|
|
std::lock_guard<bthread::Mutex> lock(mu);
|
|
|
|
|
cv.notify_one();
|
|
|
|
|
};
|
|
|
|
|
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
|
|
|
|
|
if (ret) {
|
|
|
|
|
cv.wait(lock);
|
|
|
|
|
} else {
|
|
|
|
|
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
|
|
|
|
|
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
|
|
|
|
|
if (!ret) {
|
|
|
|
|
return Status::Error<ErrorCode::INTERNAL_ERROR>(
|
|
|
|
|
"there is not enough thread resource for close load");
|
|
|
|
|
return _status;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (_check_num_segments && (_next_segid.load() != _num_segments)) {
|
|
|
|
|
_status = Status::Corruption(
|
|
|
|
|
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
|
|
|
|
|
_num_segments, _next_segid.load(), print_id(_load_id));
|
|
|
|
|
return _status;
|
|
|
|
|
}
|
|
|
|
|
cv.wait(lock);
|
|
|
|
|
return st;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void TabletStream::pre_close() {
|
|
|
|
|
if (!_status.ok()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SCOPED_TIMER(_close_wait_timer);
|
|
|
|
|
_status.update(_run_in_heavy_work_pool([this]() {
|
|
|
|
|
for (auto& token : _flush_tokens) {
|
|
|
|
|
token->wait();
|
|
|
|
|
}
|
|
|
|
|
return Status::OK();
|
|
|
|
|
}));
|
|
|
|
|
// it is necessary to check status after wait_func,
|
|
|
|
|
// for create_rowset could fail during add_segment when loading to MOW table,
|
|
|
|
|
// in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
|
|
|
|
|
if (!_status.ok()) {
|
|
|
|
|
return _status;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto close_func = [this, &mu, &cv]() {
|
|
|
|
|
signal::set_signal_task_id(_load_id);
|
|
|
|
|
auto st = _load_stream_writer->close();
|
|
|
|
|
if (!st.ok() && _status.ok()) {
|
|
|
|
|
_status = st;
|
|
|
|
|
}
|
|
|
|
|
std::lock_guard<bthread::Mutex> lock(mu);
|
|
|
|
|
cv.notify_one();
|
|
|
|
|
};
|
|
|
|
|
ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
|
|
|
|
|
if (ret) {
|
|
|
|
|
cv.wait(lock);
|
|
|
|
|
} else {
|
|
|
|
|
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
|
|
|
|
|
"there is not enough thread resource for close load");
|
|
|
|
|
if (_check_num_segments && (_next_segid.load() != _num_segments)) {
|
|
|
|
|
_status.update(Status::Corruption(
|
|
|
|
|
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
|
|
|
|
|
_num_segments, _next_segid.load(), print_id(_load_id)));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
return _status;
|
|
|
|
|
|
|
|
|
|
_status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status TabletStream::close() {
|
|
|
|
|
if (!_status.ok()) {
|
|
|
|
|
return _status.status();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SCOPED_TIMER(_close_wait_timer);
|
|
|
|
|
_status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); }));
|
|
|
|
|
return _status.status();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
|
|
|
|
|
@ -363,6 +362,10 @@ void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (auto& [_, tablet_stream] : _tablet_streams_map) {
|
|
|
|
|
tablet_stream->pre_close();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (auto& [_, tablet_stream] : _tablet_streams_map) {
|
|
|
|
|
auto st = tablet_stream->close();
|
|
|
|
|
if (st.ok()) {
|
|
|
|
|
|