[Fix](lazy_open) Fix lazy open null point (#19829)
This commit is contained in:
committed by
GitHub
parent
30417e06d4
commit
fe111207a9
@ -87,6 +87,15 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
|
||||
|
||||
Status LoadChannel::open_partition(const OpenPartitionRequest& params) {
|
||||
int64_t index_id = params.index_id();
|
||||
|
||||
// check finish
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
auto it = _finished_channel_ids.find(index_id);
|
||||
if (it != _finished_channel_ids.end()) {
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
std::shared_ptr<TabletsChannel> channel;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
@ -94,14 +103,13 @@ Status LoadChannel::open_partition(const OpenPartitionRequest& params) {
|
||||
if (it != _tablets_channels.end()) {
|
||||
channel = it->second;
|
||||
} else {
|
||||
// create a new tablets channel
|
||||
TabletsChannelKey key(params.id(), index_id);
|
||||
channel = std::make_shared<TabletsChannel>(key, _load_id, _is_high_priority,
|
||||
_self_profile);
|
||||
{
|
||||
std::lock_guard<SpinLock> l(_tablets_channels_lock);
|
||||
_tablets_channels.insert({index_id, channel});
|
||||
fmt::memory_buffer buf;
|
||||
for (auto tablet : params.tablets()) {
|
||||
fmt::format_to(buf, "tablet id:{}", tablet.tablet_id());
|
||||
}
|
||||
LOG(WARNING) << "should be opened partition index id=" << params.index_id()
|
||||
<< "tablet ids=" << fmt::to_string(buf);
|
||||
return Status::InternalError("Partition should be opened");
|
||||
}
|
||||
}
|
||||
RETURN_IF_ERROR(channel->open_all_writers_for_partition(params));
|
||||
|
||||
@ -1107,10 +1107,13 @@ Status VOlapTableSink::open(RuntimeState* state) {
|
||||
SCOPED_TIMER(_open_timer);
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
|
||||
fmt::memory_buffer buf;
|
||||
for (auto index_channel : _channels) {
|
||||
fmt::format_to(buf, "index id:{}", index_channel->_index_id);
|
||||
index_channel->for_each_node_channel(
|
||||
[](const std::shared_ptr<VNodeChannel>& ch) { ch->open(); });
|
||||
}
|
||||
LOG(INFO) << "list of open index id = " << fmt::to_string(buf);
|
||||
|
||||
for (auto index_channel : _channels) {
|
||||
index_channel->for_each_node_channel([&index_channel](
|
||||
@ -1145,7 +1148,9 @@ void VOlapTableSink::_open_partition(const VOlapTablePartition* partition) {
|
||||
auto it = _opened_partitions.find(id);
|
||||
if (it == _opened_partitions.end()) {
|
||||
_opened_partitions.insert(id);
|
||||
fmt::memory_buffer buf;
|
||||
for (int j = 0; j < partition->indexes.size(); ++j) {
|
||||
fmt::format_to(buf, "index id:{}", partition->indexes[j].index_id);
|
||||
for (const auto& tid : partition->indexes[j].tablets) {
|
||||
auto it = _channels[j]->_channels_by_tablet.find(tid);
|
||||
for (const auto& channel : it->second) {
|
||||
@ -1153,6 +1158,7 @@ void VOlapTableSink::_open_partition(const VOlapTablePartition* partition) {
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG(INFO) << "list of lazy open index id = " << fmt::to_string(buf);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user