[Cache][Enhancement] Assure sql cache only one version (#5793)
For PR #5792. This patch add a new param `cache type` to distinguish sql cache and partition cache. When update sql cache, we make assure one sql key only has one version cache.
This commit is contained in:
46
be/src/runtime/cache/result_node.cpp
vendored
46
be/src/runtime/cache/result_node.cpp
vendored
@ -77,7 +77,50 @@ PCacheStatus ResultNode::update_partition(const PUpdateCacheRequest* request,
|
||||
|
||||
//Only one thread per SQL key can update the cache
|
||||
CacheWriteLock write_lock(_node_mtx);
|
||||
if (request->cache_type() == CacheType::SQL_CACHE) {
|
||||
return update_sql_cache(request, is_update_firstkey);
|
||||
} else {
|
||||
return update_partition_cache(request, is_update_firstkey);
|
||||
}
|
||||
}
|
||||
|
||||
PCacheStatus ResultNode::update_sql_cache(const PUpdateCacheRequest *request, bool &is_update_firstkey) {
|
||||
PartitionRowBatch* partition = NULL;
|
||||
if (request->values_size() > 1) {
|
||||
return PCacheStatus::PARAM_ERROR;
|
||||
}
|
||||
is_update_firstkey = true;
|
||||
const PCacheValue& value = request->values(0);
|
||||
PartitionKey partition_key = value.param().partition_key();
|
||||
// no cache exist, create new cache node
|
||||
if (_partition_map.size() == 0) {
|
||||
partition = new PartitionRowBatch(partition_key);
|
||||
partition->set_row_batch(value);
|
||||
_partition_map[partition_key] = partition;
|
||||
_partition_list.push_back(partition);
|
||||
} else {
|
||||
// compatible with previous version
|
||||
for (auto it = _partition_list.begin(); it != _partition_list.end(); it++) {
|
||||
_data_size -= (*it)->get_data_size();
|
||||
}
|
||||
// clear old cache, and create new cache node
|
||||
for (auto it = _partition_list.begin(); it != _partition_list.end();) {
|
||||
(*it)->clear();
|
||||
SAFE_DELETE(*it);
|
||||
it = _partition_list.erase(it);
|
||||
}
|
||||
_partition_map.clear();
|
||||
partition = new PartitionRowBatch(partition_key);
|
||||
partition->set_row_batch(value);
|
||||
_partition_map[partition_key] = partition;
|
||||
_partition_list.push_back(partition);
|
||||
}
|
||||
_data_size += partition->get_data_size();
|
||||
VLOG(1) << "finish update sql cache batches:" << _partition_list.size();
|
||||
return PCacheStatus::CACHE_OK;
|
||||
}
|
||||
|
||||
PCacheStatus ResultNode::update_partition_cache(const PUpdateCacheRequest *request, bool &is_update_firstkey) {
|
||||
PartitionKey first_key = kint64max;
|
||||
if (_partition_list.size() == 0) {
|
||||
is_update_firstkey = true;
|
||||
@ -115,7 +158,7 @@ PCacheStatus ResultNode::update_partition(const PUpdateCacheRequest* request,
|
||||
_data_size += partition->get_data_size();
|
||||
}
|
||||
_partition_list.sort(compare_partition);
|
||||
LOG(INFO) << "finish update batches:" << _partition_list.size();
|
||||
VLOG(1) << "finish update partition cache batches:" << _partition_list.size();
|
||||
while (config::query_cache_max_partition_count > 0 &&
|
||||
_partition_list.size() > config::query_cache_max_partition_count) {
|
||||
if (prune_first() == 0) {
|
||||
@ -247,6 +290,7 @@ size_t ResultNode::prune_first() {
|
||||
PartitionRowBatch* part_node = *_partition_list.begin();
|
||||
size_t prune_size = part_node->get_data_size();
|
||||
_partition_list.erase(_partition_list.begin());
|
||||
_partition_map.erase(part_node->get_partition_key());
|
||||
part_node->clear();
|
||||
SAFE_DELETE(part_node);
|
||||
_data_size -= prune_size;
|
||||
|
||||
Reference in New Issue
Block a user