From 63c99eb4cbb9f1a42c0c41b5c6d65c36de8825dd Mon Sep 17 00:00:00 2001 From: xinghuayu007 <1450306854@qq.com> Date: Fri, 28 May 2021 13:45:47 +0800 Subject: [PATCH] [Cache][Enhancement] Assure sql cache only one version (#5793) For PR #5792. This patch adds a new param `cache_type` to distinguish the SQL cache from the partition cache. When updating the SQL cache, we ensure that each SQL key has only one cached version. --- be/src/runtime/cache/result_node.cpp | 46 +++++++++++++++- be/src/runtime/cache/result_node.h | 2 + .../runtime/cache/partition_cache_test.cpp | 55 ++++++++++++++----- .../doris/qe/cache/RowBatchBuilder.java | 8 ++- gensrc/proto/internal_service.proto | 6 ++ 5 files changed, 99 insertions(+), 18 deletions(-) diff --git a/be/src/runtime/cache/result_node.cpp b/be/src/runtime/cache/result_node.cpp index f188c89484..4a56349e40 100644 --- a/be/src/runtime/cache/result_node.cpp +++ b/be/src/runtime/cache/result_node.cpp @@ -77,7 +77,50 @@ PCacheStatus ResultNode::update_partition(const PUpdateCacheRequest* request, //Only one thread per SQL key can update the cache CacheWriteLock write_lock(_node_mtx); + if (request->cache_type() == CacheType::SQL_CACHE) { + return update_sql_cache(request, is_update_firstkey); + } else { + return update_partition_cache(request, is_update_firstkey); + } +} +PCacheStatus ResultNode::update_sql_cache(const PUpdateCacheRequest *request, bool &is_update_firstkey) { + PartitionRowBatch* partition = NULL; + if (request->values_size() > 1) { + return PCacheStatus::PARAM_ERROR; + } + is_update_firstkey = true; + const PCacheValue& value = request->values(0); + PartitionKey partition_key = value.param().partition_key(); + // no cache exist, create new cache node + if (_partition_map.size() == 0) { + partition = new PartitionRowBatch(partition_key); + partition->set_row_batch(value); + _partition_map[partition_key] = partition; + _partition_list.push_back(partition); + } else { + // compatible with previous version + for (auto it = 
_partition_list.begin(); it != _partition_list.end(); it++) { + _data_size -= (*it)->get_data_size(); + } + // clear old cache, and create new cache node + for (auto it = _partition_list.begin(); it != _partition_list.end();) { + (*it)->clear(); + SAFE_DELETE(*it); + it = _partition_list.erase(it); + } + _partition_map.clear(); + partition = new PartitionRowBatch(partition_key); + partition->set_row_batch(value); + _partition_map[partition_key] = partition; + _partition_list.push_back(partition); + } + _data_size += partition->get_data_size(); + VLOG(1) << "finish update sql cache batches:" << _partition_list.size(); + return PCacheStatus::CACHE_OK; +} + +PCacheStatus ResultNode::update_partition_cache(const PUpdateCacheRequest *request, bool &is_update_firstkey) { PartitionKey first_key = kint64max; if (_partition_list.size() == 0) { is_update_firstkey = true; @@ -115,7 +158,7 @@ PCacheStatus ResultNode::update_partition(const PUpdateCacheRequest* request, _data_size += partition->get_data_size(); } _partition_list.sort(compare_partition); - LOG(INFO) << "finish update batches:" << _partition_list.size(); + VLOG(1) << "finish update partition cache batches:" << _partition_list.size(); while (config::query_cache_max_partition_count > 0 && _partition_list.size() > config::query_cache_max_partition_count) { if (prune_first() == 0) { @@ -247,6 +290,7 @@ size_t ResultNode::prune_first() { PartitionRowBatch* part_node = *_partition_list.begin(); size_t prune_size = part_node->get_data_size(); _partition_list.erase(_partition_list.begin()); + _partition_map.erase(part_node->get_partition_key()); part_node->clear(); SAFE_DELETE(part_node); _data_size -= prune_size; diff --git a/be/src/runtime/cache/result_node.h b/be/src/runtime/cache/result_node.h index 2d5ca0de06..d2c5d1d614 100644 --- a/be/src/runtime/cache/result_node.h +++ b/be/src/runtime/cache/result_node.h @@ -128,6 +128,8 @@ public: PCacheStatus update_partition(const PUpdateCacheRequest* request, bool& 
is_update_firstkey); PCacheStatus fetch_partition(const PFetchCacheRequest* request, PartitionRowBatchList& rowBatchList, bool& is_hit_firstkey); + PCacheStatus update_sql_cache(const PUpdateCacheRequest* request, bool& is_update_firstkey); + PCacheStatus update_partition_cache(const PUpdateCacheRequest* request, bool& is_update_firstkey); size_t prune_first(); void clear(); diff --git a/be/test/runtime/cache/partition_cache_test.cpp b/be/test/runtime/cache/partition_cache_test.cpp index 5ae9eefbc7..999909014d 100644 --- a/be/test/runtime/cache/partition_cache_test.cpp +++ b/be/test/runtime/cache/partition_cache_test.cpp @@ -46,7 +46,7 @@ private: } void init(int max_size, int ela_size); void clear(); - PCacheStatus init_batch_data(int sql_num, int part_begin, int part_num); + PCacheStatus init_batch_data(int sql_num, int part_begin, int part_num, CacheType cache_type); ResultCache* _cache; PUpdateCacheRequest* _update_request; PCacheResponse* _update_response; @@ -84,7 +84,7 @@ void set_sql_key(PUniqueId* sql_key, int64 hi, int64 lo) { sql_key->set_lo(lo); } -PCacheStatus PartitionCacheTest::init_batch_data(int sql_num, int part_begin, int part_num) { +PCacheStatus PartitionCacheTest::init_batch_data(int sql_num, int part_begin, int part_num, CacheType cache_type) { LOG(WARNING) << "init data, sql_num:" << sql_num << ",part_num:" << part_num; PUpdateCacheRequest* up_req = NULL; PCacheResponse* up_res = NULL; @@ -103,6 +103,7 @@ PCacheStatus PartitionCacheTest::init_batch_data(int sql_num, int part_begin, in value->set_data_size(16); value->add_rows("0123456789abcdef"); //16 byte } + up_req->set_cache_type(cache_type); _cache->update(up_req, up_res); LOG(WARNING) << "finish update data"; st = up_res->status(); @@ -114,7 +115,7 @@ PCacheStatus PartitionCacheTest::init_batch_data(int sql_num, int part_begin, in TEST_F(PartitionCacheTest, update_data) { init_default(); - PCacheStatus st = init_batch_data(1, 1, 1); + PCacheStatus st = init_batch_data(1, 1, 1, 
CacheType::SQL_CACHE); ASSERT_TRUE(st == PCacheStatus::CACHE_OK); LOG(WARNING) << "clear cache"; clear(); @@ -122,14 +123,14 @@ TEST_F(PartitionCacheTest, update_data) { TEST_F(PartitionCacheTest, update_over_partition) { init_default(); - PCacheStatus st = init_batch_data(1, 1, config::query_cache_max_partition_count + 1); + PCacheStatus st = init_batch_data(1, 1, config::query_cache_max_partition_count + 1, CacheType::PARTITION_CACHE); ASSERT_TRUE(st == PCacheStatus::PARAM_ERROR); clear(); } TEST_F(PartitionCacheTest, cache_clear) { init_default(); - init_batch_data(1, 1, 1); + init_batch_data(1, 1, 1, CacheType::SQL_CACHE); _cache->clear(_clear_request, _clear_response); ASSERT_EQ(_cache->get_cache_size(), 0); clear(); @@ -137,7 +138,7 @@ TEST_F(PartitionCacheTest, cache_clear) { TEST_F(PartitionCacheTest, fetch_simple_data) { init_default(); - init_batch_data(1, 1, 1); + init_batch_data(1, 1, 1, CacheType::SQL_CACHE); LOG(WARNING) << "finish init\n"; set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); @@ -159,7 +160,7 @@ TEST_F(PartitionCacheTest, fetch_simple_data) { TEST_F(PartitionCacheTest, fetch_not_sqlid) { init_default(); - init_batch_data(1, 1, 1); + init_batch_data(1, 1, 1, CacheType::SQL_CACHE); set_sql_key(_fetch_request->mutable_sql_key(), 2, 2); PCacheParam* p1 = _fetch_request->add_params(); @@ -174,7 +175,7 @@ TEST_F(PartitionCacheTest, fetch_not_sqlid) { TEST_F(PartitionCacheTest, fetch_range_data) { init_default(); - init_batch_data(1, 1, 3); + init_batch_data(1, 1, 3, CacheType::PARTITION_CACHE); set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); PCacheParam* p1 = _fetch_request->add_params(); @@ -195,7 +196,7 @@ TEST_F(PartitionCacheTest, fetch_range_data) { TEST_F(PartitionCacheTest, fetch_invalid_right_range) { init_default(); - init_batch_data(1, 1, 3); + init_batch_data(1, 1, 3, CacheType::PARTITION_CACHE); set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); PCacheParam* p1 = _fetch_request->add_params(); @@ -215,7 +216,7 @@ 
TEST_F(PartitionCacheTest, fetch_invalid_right_range) { TEST_F(PartitionCacheTest, fetch_invalid_left_range) { init_default(); - init_batch_data(1, 1, 3); + init_batch_data(1, 1, 3, CacheType::PARTITION_CACHE); set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); PCacheParam* p1 = _fetch_request->add_params(); @@ -231,7 +232,7 @@ TEST_F(PartitionCacheTest, fetch_invalid_left_range) { TEST_F(PartitionCacheTest, fetch_invalid_key_range) { init_default(); - init_batch_data(1, 2, 1); + init_batch_data(1, 2, 1, CacheType::PARTITION_CACHE); set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); PCacheParam* p1 = _fetch_request->add_params(); @@ -256,7 +257,7 @@ TEST_F(PartitionCacheTest, fetch_invalid_key_range) { TEST_F(PartitionCacheTest, fetch_data_overdue) { init_default(); - init_batch_data(1, 1, 1); + init_batch_data(1, 1, 1, CacheType::PARTITION_CACHE); set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); PCacheParam* p1 = _fetch_request->add_params(); @@ -276,15 +277,15 @@ TEST_F(PartitionCacheTest, fetch_data_overdue) { TEST_F(PartitionCacheTest, prune_data) { init(1, 1); - init_batch_data(LOOP_LESS_OR_MORE(10, 129), 1, 1024); // 16*1024*128=2M + init_batch_data(LOOP_LESS_OR_MORE(10, 129), 1, 1024, CacheType::PARTITION_CACHE); // 16*1024*128=2M ASSERT_LE(_cache->get_cache_size(), 1 * 1024 * 1024); //cache_size <= 1M clear(); } TEST_F(PartitionCacheTest, fetch_not_continue_partition) { init_default(); - init_batch_data(1, 1, 1); - init_batch_data(1, 3, 1); + init_batch_data(1, 1, 1, CacheType::PARTITION_CACHE); + init_batch_data(1, 3, 1, CacheType::PARTITION_CACHE); set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); PCacheParam* p1 = _fetch_request->add_params(); p1->set_partition_key(1); @@ -306,6 +307,30 @@ TEST_F(PartitionCacheTest, fetch_not_continue_partition) { clear(); } +TEST_F(PartitionCacheTest, update_sql_cache) { + init_default(); + init_batch_data(1, 1, 1, CacheType::SQL_CACHE); + set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); + 
PCacheParam* p1 = _fetch_request->add_params(); + p1->set_partition_key(1); + p1->set_last_version(1); + p1->set_last_version_time(1); + _cache->fetch(_fetch_request, _fetch_result); + ASSERT_TRUE(_fetch_result->status() == PCacheStatus::CACHE_OK); + ASSERT_EQ(_fetch_result->values_size(), 1); + ASSERT_EQ(_fetch_result->values(0).rows(0), "0123456789abcdef"); + // update sql cache and fetch cache again + init_batch_data(1, 2, 1, CacheType::SQL_CACHE); + set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); + p1 = _fetch_request->add_params(); + p1->set_partition_key(1); + p1->set_last_version(1); + p1->set_last_version_time(1); + _cache->fetch(_fetch_request, _fetch_result); + ASSERT_TRUE(_fetch_result->status() == PCacheStatus::NO_PARTITION_KEY); + clear(); +} + } // namespace doris int main(int argc, char** argv) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java index 444cef24bc..c468bac4be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java @@ -102,7 +102,9 @@ public class RowBatchBuilder { public InternalService.PUpdateCacheRequest buildSqlUpdateRequest(String sql, long partitionKey, long lastVersion, long lastestTime) { if (updateRequest == null) { - updateRequest = InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build(); + updateRequest = InternalService.PUpdateCacheRequest.newBuilder() + .setSqlKey(CacheProxy.getMd5(sql)) + .setCacheType(InternalService.CacheType.SQL_CACHE).build(); } updateRequest = updateRequest.toBuilder() .addValues(InternalService.PCacheValue.newBuilder() @@ -139,7 +141,9 @@ public class RowBatchBuilder { */ public InternalService.PUpdateCacheRequest buildPartitionUpdateRequest(String sql) { if (updateRequest == null) { - updateRequest = 
InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build(); + updateRequest = InternalService.PUpdateCacheRequest.newBuilder() + .setSqlKey(CacheProxy.getMd5(sql)) + .setCacheType(InternalService.CacheType.PARTITION_CACHE).build(); } HashMap> partRowMap = new HashMap<>(); List partitionRowList; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 437fc2f3dd..6aaf40da2d 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -164,6 +164,11 @@ enum PCacheStatus { EMPTY_DATA = 8; }; +enum CacheType { + SQL_CACHE = 1; + PARTITION_CACHE = 2; +}; + message PCacheParam { required int64 partition_key = 1; optional int64 last_version = 2; @@ -184,6 +189,7 @@ message PCacheResponse { message PUpdateCacheRequest{ required PUniqueId sql_key = 1; repeated PCacheValue values = 2; + optional CacheType cache_type = 3; }; message PFetchCacheRequest {