pick https://github.com/apache/doris/pull/38605
This commit is contained in:
@ -215,6 +215,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para
|
||||
ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id())
|
||||
<< " incremental open delta writer: ";
|
||||
|
||||
// every change will hold _lock. this find in under _lock too. so no need _tablet_writers_lock again.
|
||||
for (const auto& tablet : params.tablets()) {
|
||||
if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) {
|
||||
continue;
|
||||
@ -238,6 +239,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para
|
||||
_profile, _load_id);
|
||||
ss << "[" << tablet.tablet_id() << "]";
|
||||
{
|
||||
// here we modify _tablet_writers. so need lock.
|
||||
std::lock_guard<SpinLock> l(_tablet_writers_lock);
|
||||
_tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer));
|
||||
}
|
||||
@ -479,6 +481,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req
|
||||
#endif
|
||||
|
||||
int tablet_cnt = 0;
|
||||
// under _lock. no need _tablet_writers_lock again.
|
||||
for (const auto& tablet : request.tablets()) {
|
||||
if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) {
|
||||
continue;
|
||||
@ -578,6 +581,11 @@ Status BaseTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request
|
||||
std::function<Status(BaseDeltaWriter * writer)> write_func) {
|
||||
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
|
||||
response->mutable_tablet_errors();
|
||||
|
||||
// add_batch may concurrency with inc_open but not under _lock.
|
||||
// so need to protect it with _tablet_writers_lock.
|
||||
std::lock_guard<SpinLock> l(_tablet_writers_lock);
|
||||
|
||||
auto tablet_writer_it = _tablet_writers.find(tablet_id);
|
||||
if (tablet_writer_it == _tablet_writers.end()) {
|
||||
return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
|
||||
|
||||
@ -136,11 +136,8 @@ protected:
|
||||
// id of this load channel
|
||||
TabletsChannelKey _key;
|
||||
|
||||
// make execute sequence
|
||||
// protect _state change. open and close. when add_batch finished, lock to change _next_seqs also
|
||||
std::mutex _lock;
|
||||
|
||||
SpinLock _tablet_writers_lock;
|
||||
|
||||
enum State {
|
||||
kInitialized,
|
||||
kOpened,
|
||||
@ -166,8 +163,10 @@ protected:
|
||||
// currently it's OK.
|
||||
Status _close_status;
|
||||
|
||||
// tablet_id -> TabletChannel
|
||||
// tablet_id -> TabletChannel. it will only be changed in open() or inc_open()
|
||||
std::unordered_map<int64_t, std::unique_ptr<BaseDeltaWriter>> _tablet_writers;
|
||||
// protect _tablet_writers
|
||||
SpinLock _tablet_writers_lock;
|
||||
// broken tablet ids.
|
||||
// If a tablet write fails, it's id will be added to this set.
|
||||
// So that following batch will not handle this tablet anymore.
|
||||
|
||||
Reference in New Issue
Block a user